From ec29e6728ea2c08afea9f1f4a8cd996f19ff7349 Mon Sep 17 00:00:00 2001 From: jottbee Date: Mon, 17 Jan 2005 07:56:42 +0000 Subject: [PATCH] *** empty log message *** --- mcast-channels.scm | 70 +++++++++++++++++++++++++++++++++++++++++ test-jobd.scm | 34 ++++++++++++++++++++ test-mcast-channels.scm | 21 +++++++++++++ 3 files changed, 125 insertions(+) create mode 100644 mcast-channels.scm create mode 100644 test-jobd.scm create mode 100644 test-mcast-channels.scm diff --git a/mcast-channels.scm b/mcast-channels.scm new file mode 100644 index 0000000..6976ba5 --- /dev/null +++ b/mcast-channels.scm @@ -0,0 +1,70 @@ +(define-record-type :mcast-channel + (really-make-mcast-channel msg-in fan-out reply-ch) + mcast-channel? + (msg-in mcast-channel-msg-in) + (fan-out mcast-channel-fan-out) + (reply-ch mcast-channel-reply-ch)) + +(define-record-type :mcast-port + (really-make-mcast-port channel) + mcast-port? + (channel channel->mcast-port)) + +(define-enumerated-type mcast-cmd :mcast-cmd + mcast-cmd? + the-mcast-cmds + mcast-cmd-name + mcast-cmd-index + (new-port new-sink)) + +(define (tee from-server to-sink) + (let ((to-mbox (make-channel))) + (spawn (lambda () + (let drink-tee ((msg (receive from-server))) + (cond + ((channel? to-sink) + (send to-sink msg) + (send to-mbox msg))) + (drink-tee (receive from-server)))) + 'mcast-channel/tee) + to-mbox)) + +(define (sink from-server) + (tee from-server #f)) + +(define (make-mcast-channel) + (let ((m-in (make-channel)) + (f-out (make-channel)) + (r-ch (make-channel))) + (spawn (lambda () + (sink f-out) + (let lp ((msg (receive m-in))) + (cond + ((eq? (mcast-cmd-name msg) 'new-port) + (let ((new-f-out (make-channel))) + (send r-ch (tee new-f-out f-out)) + (set! f-out new-f-out))) + ((eq? (mcast-cmd-name msg) 'new-sink) + (send r-ch (sink f-out))) + (else (send f-out msg))) + (lp (receive m-in)))) + 'mcast-channel/server) + (really-make-mcast-channel m-in f-out r-ch))) + +(define (mcast mcast-ch msg) + (send (mcast-channel-msg-in mcast-ch) msg)) + +(define (mcast-port mcast-ch) + (send (mcast-channel-msg-in mcast-ch) (mcast-cmd new-port)) + (really-make-mcast-port (receive (mcast-channel-reply-ch mcast-ch)))) + +(define (mcast-port-receive-rv mc-port) + (receive-rv (channel->mcast-port mc-port))) + +(define (mcast-port-receive mc-port) + (sync (mcast-port-receive-rv mc-port))) + +;; attach the sink to the first port after the server +(define (mcast-new-sink mcast-ch) + (send (mcast-channel-msg-in mcast-ch) (mcast-cmd new-sink)) + (receive (mcast-channel-reply-ch mcast-ch))) diff --git a/test-jobd.scm b/test-jobd.scm new file mode 100644 index 0000000..198c873 --- /dev/null +++ b/test-jobd.scm @@ -0,0 +1,34 @@ +(define j-cwd "/home/johannes") +(define j-env (env->alist)) +(define j-cmd "find /afs -name nonexistant") +(define j-descr (make-job-desc j-cwd j-env j-cmd)) + +(define j-cmds + (list (list "leo-dict" "detractors") + (list "ipcs" "-a") + (list "leo-dict" "schism") + (list "find" "studium" "-name" "*.txt") + (list "/bin/sh" "-c" "/bin/cat /var/tmp/*") + (list "ls" "-l") + (list "ps" "-eF"))) + +(define j-descrs + (let ((jdes (lambda (j-cmd) (make-job-desc j-cwd j-env j-cmd)))) + (map jdes j-cmds))) + +(define (do-some-jobs) + (let ((ch (cml-sync-ch/make-channel)) + (start-parallel (jobd/set-jobbers! 3)) + (jobd (jobd/make-jobd))) + (spawn + (lambda () + (let* ((res-rvs (map jobd/execute j-descrs (circular-list jobd))) + (j-res (map cml-rv/sync res-rvs))) + (map display-job-output j-res) + (cml-sync-ch/send ch "done."))) + 'test-jobd) + (sleep 50) + (jobd/stop jobd) + (sleep 40000) + (jobd/continue jobd) + (cml-sync-ch/receive ch))) diff --git a/test-mcast-channels.scm b/test-mcast-channels.scm new file mode 100644 index 0000000..36225b5 --- /dev/null +++ b/test-mcast-channels.scm @@ -0,0 +1,21 @@ +(define (test-it) + (let* ((ls (list "a" "b" "c" "d" "e" "f")) + (m-ch (make-mcast-channel)) + (a-port (mcast-port m-ch)) + (b-port (mcast-port m-ch)) + (c-port (mcast-port m-ch)) + (d-port (mcast-port m-ch)) + (e-port (mcast-port m-ch)) + (f-port (mcast-port m-ch)) + (mcast-ports (list a-port b-port c-port d-port e-port f-port))) + (for-each (lambda (msg) + (spawn (lambda () + (mcast m-ch msg)))) + ls) + (for-each (lambda (mc-port) + (spawn (lambda () + (let rcv-msg ((msg (mcast-port-receive mc-port))) + (display msg) + (newline) + (rcv-msg (mcast-port-receive mc-port)))))) + mcast-ports)))