*** empty log message ***
This commit is contained in:
		
							parent
							
								
									a96da29be7
								
							
						
					
					
						commit
						8cb0012a99
					
				| 
						 | 
					@ -0,0 +1,62 @@
 | 
				
			||||||
 | 
					(define (cml-fork sig-ch thunk)
 | 
				
			||||||
 | 
					  (let* ((ch (cml-sync-ch/make-channel))
 | 
				
			||||||
 | 
						 (res-ch (cml-sync-ch/make-channel))
 | 
				
			||||||
 | 
						 (sig-rv (cml-sync-ch/receive-rv sig-ch))
 | 
				
			||||||
 | 
						 (process (fork thunk))
 | 
				
			||||||
 | 
						 (proc-done-rv (cml-sync-ch/receive-rv ch)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    (spawn 
 | 
				
			||||||
 | 
					      (lambda ()
 | 
				
			||||||
 | 
						(let lp ()
 | 
				
			||||||
 | 
						  (cml-rv/select 
 | 
				
			||||||
 | 
						   (cml-rv/wrap sig-rv
 | 
				
			||||||
 | 
								(lambda (sig) (if (not (wait process wait/poll)) 
 | 
				
			||||||
 | 
										  (begin (signal-process process sig)
 | 
				
			||||||
 | 
											 (lp)))))
 | 
				
			||||||
 | 
						   (cml-rv/wrap proc-done-rv
 | 
				
			||||||
 | 
								(lambda (res) (cml-sync-ch/send res-ch res))))))
 | 
				
			||||||
 | 
					      (format #t "cml-fork: signals (for ~a)\n" (proc:pid process)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    (spawn (lambda ()
 | 
				
			||||||
 | 
						     (cml-sync-ch/send ch (wait process)))
 | 
				
			||||||
 | 
						   (format #t "cml-fork: waiting (for ~a)\n" (proc:pid process)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    (cml-sync-ch/receive-rv res-ch)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(define (cml-fork-collecting fds sig-ch thunk)
 | 
				
			||||||
 | 
					  (let* ((ch (cml-sync-ch/make-channel))
 | 
				
			||||||
 | 
						 (res-ch (cml-sync-ch/make-channel))
 | 
				
			||||||
 | 
						 (sig-rv (cml-sync-ch/receive-rv sig-ch))
 | 
				
			||||||
 | 
						 ;; from scsh-0.6.6/scsh/scsh.scm
 | 
				
			||||||
 | 
						 (channels (map (lambda (ignore)
 | 
				
			||||||
 | 
								  (call-with-values temp-file-channel cons))
 | 
				
			||||||
 | 
								fds))
 | 
				
			||||||
 | 
						 (read-ports (map car channels))
 | 
				
			||||||
 | 
						 (write-ports (map cdr channels))
 | 
				
			||||||
 | 
						 (process (fork (lambda () 
 | 
				
			||||||
 | 
								  (for-each close-input-port read-ports)
 | 
				
			||||||
 | 
								  (for-each move->fdes write-ports fds)
 | 
				
			||||||
 | 
								  (apply exec-path (thunk)))))
 | 
				
			||||||
 | 
						 (proc-done-rv (cml-sync-ch/receive-rv ch)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    (spawn 
 | 
				
			||||||
 | 
					      (lambda ()
 | 
				
			||||||
 | 
						(let ((exitno (wait process)))
 | 
				
			||||||
 | 
						  (cml-sync-ch/send ch (append (list exitno) 
 | 
				
			||||||
 | 
									       (map port->string read-ports))))) 
 | 
				
			||||||
 | 
					      (format #t "cml-fork-collecting: waiting (for ~a)\n" (proc:pid process))) 
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    (spawn 
 | 
				
			||||||
 | 
					      (lambda ()
 | 
				
			||||||
 | 
						(let loop ()
 | 
				
			||||||
 | 
						  (cml-rv/select 
 | 
				
			||||||
 | 
						   (cml-rv/wrap sig-rv
 | 
				
			||||||
 | 
								(lambda (sig) (if (not (wait process wait/poll)) 
 | 
				
			||||||
 | 
										  (begin (signal-process process sig)
 | 
				
			||||||
 | 
											 (loop)))))
 | 
				
			||||||
 | 
						   (cml-rv/wrap proc-done-rv 
 | 
				
			||||||
 | 
								(lambda (res) (cml-sync-ch/send res-ch res))))))
 | 
				
			||||||
 | 
					      (format #t "cml-fork-collecting: signals (for ~a)\n" (proc:pid process)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    (for-each close-output-port write-ports)
 | 
				
			||||||
 | 
					    (cml-sync-ch/receive-rv res-ch)))
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,25 @@
 | 
				
			||||||
 | 
					(define-record-type :job-desc
 | 
				
			||||||
 | 
					  (make-job-desc wd env cmd)
 | 
				
			||||||
 | 
					  job-desc?
 | 
				
			||||||
 | 
					  (wd job-desc-wd)
 | 
				
			||||||
 | 
					  (env job-desc-env)
 | 
				
			||||||
 | 
					  (cmd job-desc-cmd))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(define-record-type :job-res
 | 
				
			||||||
 | 
					  (make-job-res errno stdout stderr)
 | 
				
			||||||
 | 
					  job-res?
 | 
				
			||||||
 | 
					  (errno job-res-errno)
 | 
				
			||||||
 | 
					  (stdout job-res-stdout)
 | 
				
			||||||
 | 
					  (stderr job-res-stderr))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(define (display-job-output j-res)
 | 
				
			||||||
 | 
					  (display 
 | 
				
			||||||
 | 
					   (string-append 
 | 
				
			||||||
 | 
					    "job finished with output exitno:\n" 
 | 
				
			||||||
 | 
					    (number->string (job-res-errno j-res)) "\n"
 | 
				
			||||||
 | 
					    "job finished with output stdout:\n" 
 | 
				
			||||||
 | 
					    (job-res-stdout j-res) "\n"
 | 
				
			||||||
 | 
					    "job finished with output stderr:\n" 
 | 
				
			||||||
 | 
					    (job-res-stderr j-res) "\n"))
 | 
				
			||||||
 | 
					  (newline))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -0,0 +1,116 @@
 | 
				
			||||||
 | 
					(define-record-type :jobd
 | 
				
			||||||
 | 
					  (really-make-jobd version-s job-c sig-mc)
 | 
				
			||||||
 | 
					  jobd?
 | 
				
			||||||
 | 
					  (version-s jobd-version-s)
 | 
				
			||||||
 | 
					  (job-c jobd-job-c)
 | 
				
			||||||
 | 
					  (sig-mc jobd-sig-mc))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(define-enumerated-type jobber-sig :jobber-sig
 | 
				
			||||||
 | 
					  jobber-sig?
 | 
				
			||||||
 | 
					  the-jobber-sigs
 | 
				
			||||||
 | 
					  jobber-sig-name
 | 
				
			||||||
 | 
					  jobber-sig-index
 | 
				
			||||||
 | 
					  (shutdown stop continue))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(define (cml-fork-collecting->rv id job-desc sig-ch)
 | 
				
			||||||
 | 
					  (let* ((ch (cml-sync-ch/make-channel))
 | 
				
			||||||
 | 
						 (cwd (job-desc-wd job-desc))
 | 
				
			||||||
 | 
						 (env (job-desc-env job-desc))
 | 
				
			||||||
 | 
						 (cmd (job-desc-cmd job-desc))
 | 
				
			||||||
 | 
						 (fds (list 1 2))
 | 
				
			||||||
 | 
						 (thunk (lambda () (with-total-env ,env (with-cwd cwd cmd))))
 | 
				
			||||||
 | 
						 (res-rv (cml-fork-collecting fds sig-ch thunk)))
 | 
				
			||||||
 | 
					    (spawn 
 | 
				
			||||||
 | 
					      (lambda ()
 | 
				
			||||||
 | 
						(let ((results (cml-rv/sync res-rv)))
 | 
				
			||||||
 | 
						  (cml-sync-ch/send ch (make-job-res (list-ref results 0)
 | 
				
			||||||
 | 
					                                             (list-ref results 1)
 | 
				
			||||||
 | 
					                                             (list-ref results 2)))))
 | 
				
			||||||
 | 
					      (format #t "cml-fork-collecting->rv (no. ~a)\n" id))
 | 
				
			||||||
 | 
					    (cml-sync-ch/receive-rv ch)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					;;; ->alist?
 | 
				
			||||||
 | 
					(define (jobber-sig->signal sig to-process-element)
 | 
				
			||||||
 | 
					  (cond 
 | 
				
			||||||
 | 
					   ((jobber-sig? sig)
 | 
				
			||||||
 | 
					    (cond 
 | 
				
			||||||
 | 
					     ((eq? (jobber-sig-name sig) 'shutdown) 
 | 
				
			||||||
 | 
						  (cml-sync-ch/send to-process-element signal/kill))
 | 
				
			||||||
 | 
					     ((eq? (jobber-sig-name sig) 'stop) 
 | 
				
			||||||
 | 
						  (cml-sync-ch/send to-process-element signal/stop))
 | 
				
			||||||
 | 
					     ((eq? (jobber-sig-name sig) 'continue) 
 | 
				
			||||||
 | 
						  (cml-sync-ch/send to-process-element signal/cont))
 | 
				
			||||||
 | 
					     (else (error "jobber: jobber-sig->signal received unknown jobber-sig."))))
 | 
				
			||||||
 | 
					   (else (error "jobber: jobber-sig->signal received unknown object."))))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(define (job-desc->job-res id sig-mport j-des+res-ch)
 | 
				
			||||||
 | 
					  (let* ((j-des (car j-des+res-ch))
 | 
				
			||||||
 | 
						 (res-ch (cdr j-des+res-ch))
 | 
				
			||||||
 | 
						 (to-process-element (cml-sync-ch/make-channel))
 | 
				
			||||||
 | 
						 (sig-rcv-rv (cml-mcast-ch/mcast-port-receive-rv sig-mport))
 | 
				
			||||||
 | 
						 (job-res-rv (cml-fork-collecting->rv id j-des to-process-element)))
 | 
				
			||||||
 | 
					    (let finish-job ()
 | 
				
			||||||
 | 
					      (cml-rv/select 
 | 
				
			||||||
 | 
					       (cml-rv/wrap sig-rcv-rv
 | 
				
			||||||
 | 
							    (lambda (sig) 
 | 
				
			||||||
 | 
							      (jobber-sig->signal sig to-process-element)
 | 
				
			||||||
 | 
							      (finish-job)))
 | 
				
			||||||
 | 
					       (cml-rv/wrap job-res-rv
 | 
				
			||||||
 | 
							    (lambda (res) 
 | 
				
			||||||
 | 
							      (cml-async-ch/send-async res-ch res)))))))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(define (jobber id job-ch sig-mport)
 | 
				
			||||||
 | 
					  (spawn 
 | 
				
			||||||
 | 
					    (lambda () 
 | 
				
			||||||
 | 
					      (let loop ()
 | 
				
			||||||
 | 
						(let ((new-job-rv (cml-async-ch/receive-async-rv job-ch))
 | 
				
			||||||
 | 
						      (sig-rcv-rv (cml-mcast-ch/mcast-port-receive-rv sig-mport)))
 | 
				
			||||||
 | 
						  (cml-rv/select 
 | 
				
			||||||
 | 
						   (cml-rv/wrap new-job-rv
 | 
				
			||||||
 | 
								(lambda (j-des+res-ch) 
 | 
				
			||||||
 | 
								  (job-desc->job-res id sig-mport j-des+res-ch)))
 | 
				
			||||||
 | 
						   (cml-rv/wrap sig-rcv-rv
 | 
				
			||||||
 | 
								(lambda (sig) 
 | 
				
			||||||
 | 
								  (if (eq? (jobber-sig-name sig) 'shutdown) 
 | 
				
			||||||
 | 
								      (terminate-current-thread)))))
 | 
				
			||||||
 | 
						   (loop))))
 | 
				
			||||||
 | 
					    (format #t "jobber (no. ~a)\n" id)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(define jobd-vers "jobd-0.0.1")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(define (make-jobd)
 | 
				
			||||||
 | 
					  (let* ((version jobd-vers) 
 | 
				
			||||||
 | 
						 (job-ch (cml-async-ch/make-async-channel))
 | 
				
			||||||
 | 
						 (sig-m-ch (cml-mcast-ch/make-mcast-channel))
 | 
				
			||||||
 | 
						 (start-jobber (lambda (id) 
 | 
				
			||||||
 | 
								 (jobber id job-ch (cml-mcast-ch/mcast-port sig-m-ch)))))
 | 
				
			||||||
 | 
					    (for-each start-jobber (enumerate jobbers))
 | 
				
			||||||
 | 
					    (really-make-jobd version job-ch sig-m-ch)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(define (version jobd)
 | 
				
			||||||
 | 
					  (jobd-version-s jobd))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(define (execute job-desc jobd)
 | 
				
			||||||
 | 
					  (let ((res-ch (cml-async-ch/make-async-channel)))
 | 
				
			||||||
 | 
					    (cml-async-ch/send-async (jobd-job-c jobd) (cons job-desc res-ch))
 | 
				
			||||||
 | 
					    (cml-async-ch/receive-async-rv res-ch)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(define (shutdown jobd)
 | 
				
			||||||
 | 
					  (cml-mcast-ch/mcast (jobd-sig-mc jobd) (jobber-sig shutdown)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(define (stop jobd)
 | 
				
			||||||
 | 
					  (cml-mcast-ch/mcast (jobd-sig-mc jobd) (jobber-sig stop)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(define (continue jobd)
 | 
				
			||||||
 | 
					  (cml-mcast-ch/mcast (jobd-sig-mc jobd) (jobber-sig continue)))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(define (enumerate n-max)
 | 
				
			||||||
 | 
					  (cond 
 | 
				
			||||||
 | 
					   ((> n-max 1) (append (enumerate (- n-max 1)) (list n-max)))
 | 
				
			||||||
 | 
					   ((= n-max 1) (list n-max))
 | 
				
			||||||
 | 
					   (else (error "n-max < 0"))))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(define jobbers 2)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					(define (set-jobbers! n-of)
 | 
				
			||||||
 | 
					  (set! jobbers n-of))
 | 
				
			||||||
		Loading…
	
		Reference in New Issue