;;; Job daemon: a pool of worker threads ("jobbers") that take job
;;; descriptions from a shared asynchronous channel, run each one through
;;; cml-fork/collecting, and deliver the result on a per-job reply channel.
;;; Control signals (shutdown / stop / continue) are multicast to the workers.

;; Handle on a job daemon.
;;   job-c   asynchronous channel on which (job-desc . reply-channel) pairs
;;           are submitted
;;   sig-mc  multicast channel used to broadcast jobber-sig values
;; The constructor takes exactly the two declared fields; this matches the
;; only call site in make-jobd.
(define-record-type :jobd
  (really-make-jobd job-c sig-mc)
  jobd?
  (job-c jobd-job-c)
  (sig-mc jobd-sig-mc))

;; Control signals understood by the jobbers.
(define-enumerated-type jobber-sig :jobber-sig
  jobber-sig?
  the-jobber-sigs
  jobber-sig-name
  jobber-sig-index
  (shutdown stop continue))

;; Start the job described by JOB-DESC via cml-fork/collecting (collecting
;; file descriptors 1 and 2, with SIG-CH handed through as its signal
;; channel) and return a rendezvous that yields the job result built with
;; make-job-res from the first three elements of the collected result list.
;; ID is only used in the message printed by format.
(define (cml-fork/collecting->rv id job-desc sig-ch)
  (let* ((ch (cml-sync-ch/make-channel))
         (cwd (job-desc-wd job-desc))
         (env (job-desc-env job-desc))
         (cmd (job-desc-cmd job-desc))
         (fds (list 1 2))
         ;; with-total-env implicitly quasiquotes its environment argument,
         ;; hence the unquote on env.
         ;; NOTE(review): the body of with-cwd is the bare variable `cmd`,
         ;; i.e. it is evaluated but not called here -- confirm against
         ;; what job-desc-cmd returns and what cml-fork/collecting expects.
         (thunk (lambda ()
                  (with-total-env ,env
                    (with-cwd cwd cmd))))
         (res-rv (cml-fork/collecting fds sig-ch thunk)))
    ;; A helper thread waits for the collected results and forwards them,
    ;; repackaged as a job-res, over the private channel ch.
    ;; NOTE(review): the second argument to spawn is the value returned by
    ;; (format #t ...), which prints the message; if a thread name string
    ;; was intended this should presumably be (format #f ...) -- confirm.
    (spawn (lambda ()
             (let ((results (cml-rv/sync res-rv)))
               (cml-sync-ch/send ch
                                 (make-job-res (list-ref results 0)
                                               (list-ref results 1)
                                               (list-ref results 2)))))
           (format #t "cml-fork/collecting->rv (no. ~a)\n" id))
    (cml-sync-ch/receive-rv ch)))

;;; ->alist?
;; Translate the jobber signal SIG into the corresponding process signal
;; and send it synchronously on the channel TO-PROCESS-ELEMENT:
;;   shutdown -> signal/kill, stop -> signal/stop, continue -> signal/cont.
;; Signals an error if SIG is not a jobber-sig or has an unknown name.
(define (jobber-sig->signal sig to-process-element)
  (if (jobber-sig? sig)
      (case (jobber-sig-name sig)
        ((shutdown) (cml-sync-ch/send to-process-element signal/kill))
        ((stop)     (cml-sync-ch/send to-process-element signal/stop))
        ((continue) (cml-sync-ch/send to-process-element signal/cont))
        (else (error "jobber-sig->signal: unknown jobber-sig.")))
      (error "jobber-sig->signal: unknown object.")))

;; Run one job to completion.
;; J-DES+RES-CH is a pair (job-desc . reply-channel).  While the job is
;; running, every signal received on SIG-MPORT is forwarded to the job via
;; jobber-sig->signal.  When the job result arrives it is sent
;; asynchronously on the reply channel.
;; Returns #t if a shutdown signal was received while the job was running,
;; #f otherwise.  That signal is consumed from SIG-MPORT here, so the
;; caller's own receive on the same port will not see it again; the return
;; value is how the caller learns about it.
(define (job-desc->job-res id sig-mport j-des+res-ch)
  (let* ((j-des (car j-des+res-ch))
         (res-ch (cdr j-des+res-ch))
         (to-process-element (cml-sync-ch/make-channel))
         (sig-rcv-rv (cml-mcast-ch/mcast-port-receive-rv sig-mport))
         (job-res-rv (cml-fork/collecting->rv id j-des to-process-element)))
    (let finish-job ((shutdown-seen? #f))
      (cml-rv/select
       (cml-rv/wrap sig-rcv-rv
                    (lambda (sig)
                      (jobber-sig->signal sig to-process-element)
                      (finish-job (or shutdown-seen?
                                      (eq? (jobber-sig-name sig) 'shutdown)))))
       (cml-rv/wrap job-res-rv
                    (lambda (res)
                      (cml-async-ch/send-async res-ch res)
                      shutdown-seen?))))))

;; Spawn worker thread number ID.  The worker loops forever, either taking
;; the next (job-desc . reply-channel) pair from JOB-CH and running it, or
;; receiving a signal from SIG-MPORT.  It terminates itself when it receives
;; a shutdown signal while idle, or once the current job has finished if the
;; shutdown arrived while that job was running.
(define (jobber id job-ch sig-mport)
  ;; NOTE(review): as in cml-fork/collecting->rv, the second argument to
  ;; spawn is the value of (format #t ...) -- confirm this is intended.
  (spawn (lambda ()
           (let loop ()
             (let ((new-job-rv (cml-async-ch/receive-async-rv job-ch))
                   (sig-rcv-rv (cml-mcast-ch/mcast-port-receive-rv sig-mport)))
               (cml-rv/select
                (cml-rv/wrap new-job-rv
                             (lambda (j-des+res-ch)
                               ;; A shutdown consumed during the job must
                               ;; still end this worker.
                               (if (job-desc->job-res id sig-mport j-des+res-ch)
                                   (terminate-current-thread))))
                (cml-rv/wrap sig-rcv-rv
                             (lambda (sig)
                               (if (eq? (jobber-sig-name sig) 'shutdown)
                                   (terminate-current-thread))))))
             (loop)))
         (format #t "jobber (no. ~a)\n" id)))

;; Create a job daemon: one shared job channel, one multicast signal
;; channel, and `jobbers` worker threads, each with its own port on the
;; signal channel.  Returns the jobd record.
(define (make-jobd)
  (let* ((job-ch (cml-async-ch/make-async-channel))
         (sig-m-ch (cml-mcast-ch/make-mcast-channel))
         (start-jobber
          (lambda (id)
            (let ((new-mport (cml-mcast-ch/mcast-port sig-m-ch)))
              (jobber id job-ch new-mport)))))
    (for-each start-jobber (enumerate jobbers))
    (really-make-jobd job-ch sig-m-ch)))

;; Submit JOB-DESC to JOBD and return a rendezvous for its result.
(define (execute-rv job-desc jobd)
  (let ((res-ch (cml-async-ch/make-async-channel)))
    (cml-async-ch/send-async (jobd-job-c jobd) (cons job-desc res-ch))
    (cml-async-ch/receive-async-rv res-ch)))

;; Submit JOB-DESC to JOBD and block until its result is available.
(define (execute job-desc jobd)
  (cml-rv/sync (execute-rv job-desc jobd)))

;; Broadcast the shutdown signal to all jobbers of JOBD.
(define (shutdown jobd)
  (cml-mcast-ch/mcast (jobd-sig-mc jobd) (jobber-sig shutdown)))

;; Broadcast the stop signal to all jobbers of JOBD.
(define (stop jobd)
  (cml-mcast-ch/mcast (jobd-sig-mc jobd) (jobber-sig stop)))

;; Broadcast the continue signal to all jobbers of JOBD.
(define (continue jobd)
  (cml-mcast-ch/mcast (jobd-sig-mc jobd) (jobber-sig continue)))

;; Return the list (1 2 ... N-MAX).  Signals an error unless the countdown
;; from N-MAX in steps of one reaches exactly 1, i.e. for N-MAX < 1.
;; Built back to front with an accumulator, so it runs in linear time.
(define (enumerate n-max)
  (let loop ((i n-max) (acc '()))
    (cond ((> i 1) (loop (- i 1) (cons i acc)))
          ((= i 1) (cons i acc))
          (else (error "n-max < 1")))))

;; Number of worker threads started by make-jobd.
(define jobbers 2)

;; Set the number of worker threads used by subsequent calls to make-jobd.
(define (set-jobbers! n-of)
  (set! jobbers n-of))