scsh-make/jobd.scm

114 lines
3.6 KiB
Scheme

(define-record-type :jobd
(really-make-jobd version-s job-c sig-mc)
jobd?
(job-c jobd-job-c)
(sig-mc jobd-sig-mc))
(define-enumerated-type jobber-sig :jobber-sig
jobber-sig?
the-jobber-sigs
jobber-sig-name
jobber-sig-index
(shutdown stop continue))
(define (cml-fork/collecting->rv id job-desc sig-ch)
(let* ((ch (cml-sync-ch/make-channel))
(cwd (job-desc-wd job-desc))
(env (job-desc-env job-desc))
(cmd (job-desc-cmd job-desc))
(fds (list 1 2))
(thunk (lambda () (with-total-env ,env (with-cwd cwd cmd))))
(res-rv (cml-fork/collecting fds sig-ch thunk)))
(spawn
(lambda ()
(let ((results (cml-rv/sync res-rv)))
(cml-sync-ch/send ch (make-job-res (list-ref results 0)
(list-ref results 1)
(list-ref results 2)))))
(format #t "cml-fork/collecting->rv (no. ~a)\n" id))
(cml-sync-ch/receive-rv ch)))
;;; ->alist?
(define (jobber-sig->signal sig to-process-element)
(cond
((jobber-sig? sig)
(cond
((eq? (jobber-sig-name sig) 'shutdown)
(cml-sync-ch/send to-process-element signal/kill))
((eq? (jobber-sig-name sig) 'stop)
(cml-sync-ch/send to-process-element signal/stop))
((eq? (jobber-sig-name sig) 'continue)
(cml-sync-ch/send to-process-element signal/cont))
(else (error "jobber-sig->signal: unknown jobber-sig."))))
(else (error "jobber-sig->signal: unknown object."))))
(define (job-desc->job-res id sig-mport j-des+res-ch)
(let* ((j-des (car j-des+res-ch))
(res-ch (cdr j-des+res-ch))
(to-process-element (cml-sync-ch/make-channel))
(sig-rcv-rv (cml-mcast-ch/mcast-port-receive-rv sig-mport))
(job-res-rv (cml-fork/collecting->rv id j-des to-process-element)))
(let finish-job ()
(cml-rv/select
(cml-rv/wrap sig-rcv-rv
(lambda (sig)
(jobber-sig->signal sig to-process-element)
(finish-job)))
(cml-rv/wrap job-res-rv
(lambda (res)
(cml-async-ch/send-async res-ch res)))))))
(define (jobber id job-ch sig-mport)
(spawn
(lambda ()
(let loop ()
(let ((new-job-rv (cml-async-ch/receive-async-rv job-ch))
(sig-rcv-rv (cml-mcast-ch/mcast-port-receive-rv sig-mport)))
(cml-rv/select
(cml-rv/wrap new-job-rv
(lambda (j-des+res-ch)
(job-desc->job-res id sig-mport j-des+res-ch)))
(cml-rv/wrap sig-rcv-rv
(lambda (sig)
(if (eq? (jobber-sig-name sig) 'shutdown)
(terminate-current-thread)))))
(loop))))
(format #t "jobber (no. ~a)\n" id)))
(define (make-jobd)
(let* ((job-ch (cml-async-ch/make-async-channel))
(sig-m-ch (cml-mcast-ch/make-mcast-channel))
(start-jobber (lambda (id)
(let ((new-mport (cml-mcast-ch/mcast-port sig-m-ch)))
(jobber id job-ch new-mport)))))
(for-each start-jobber (enumerate jobbers))
(really-make-jobd job-ch sig-m-ch))))
(define (execute-rv job-desc jobd)
(let ((res-ch (cml-async-ch/make-async-channel)))
(cml-async-ch/send-async (jobd-job-c jobd) (cons job-desc res-ch))
(cml-async-ch/receive-async-rv res-ch)))
(define (execute job-desc jobd)
(cml-rv/sync (execute-rv job-desc jobd)))
(define (shutdown jobd)
(cml-mcast-ch/mcast (jobd-sig-mc jobd) (jobber-sig shutdown)))
(define (stop jobd)
(cml-mcast-ch/mcast (jobd-sig-mc jobd) (jobber-sig stop)))
(define (continue jobd)
(cml-mcast-ch/mcast (jobd-sig-mc jobd) (jobber-sig continue)))
(define (enumerate n-max)
(cond
((> n-max 1) (append (enumerate (- n-max 1)) (list n-max)))
((= n-max 1) (list n-max))
(else (error "n-max < 0"))))
(define jobbers 2)
(define (set-jobbers! n-of)
(set! jobbers n-of))