diff --git a/collect-channels.scm b/collect-channels.scm index d068e4c..c574146 100644 --- a/collect-channels.scm +++ b/collect-channels.scm @@ -1,255 +1,176 @@ -(define-record-type :collect&reply-channel - (collect&reply/really-make-channel cmd-in cmd-out from-server to-server) - is-collect&reply-channel? - (cmd-in collect&reply-channel-cmd-in) - (cmd-out collect&reply-channel-cmd-out) - (from-server collect&reply-channel-from-server) - (to-server collect&reply-channel-to-server)) - -(define-record-type :send&collect-channel - (send&collect/really-make-channel cmd-in cmd-out from-server to-server) - is-send&collect-channel? - (cmd-in send&collect-channel-cmd-in) - (cmd-out send&collect-channel-cmd-out) - (from-server send&collect-channel-from-server) - (to-server send&collect-channel-to-server)) - -(define-enumerated-type collect&reply-cmd :collect&reply-cmd - is-collect&reply-cmd? - the-collect&reply-cmds - collect&reply-cmd-name - collect&reply-cmd-index - (make-link)) - -(define-enumerated-type send&collect-cmd :send&collect-cmd - is-send&collect-cmd? - the-send&collect-cmds - send&collect-cmd-name - send&collect-cmd-index - (make-link)) - (define-record-type :tagged-msg (make-tagged-msg tag stripped) is-tagged-msg? (tag tagged-msg-tag) (stripped tagged-msg-stripped)) -(define (collect&reply/tee2 from-server to-sink from-sink to-server in out) - (let ((tmp-ch (cml-sync-ch/make-channel))) - (spawn - (lambda () - (let ((tuid (thread-uid (current-thread)))) - (cml-sync-ch/send tmp-ch tuid) - (let drink-tee ((collect-rv (cml-sync-ch/receive-rv from-sink)) - (reply-rv (cml-sync-ch/receive-rv from-server)) - (request-rv (cml-sync-ch/receive-rv in))) - (cml-rv/select - (cml-rv/wrap collect-rv - (lambda (tmsg) -;;; (display "tuid: ") (display tuid) -;;; (display ". collect&reply/tee2: collect-rv.\n") - (cml-sync-ch/send to-server tmsg))) - (cml-rv/wrap reply-rv - (lambda (tmsg) - (let ((msg (tagged-msg-stripped tmsg)) - (tag (tagged-msg-tag tmsg))) -;;; (display "tuid: ") (display tuid) -;;; (display ". collect&reply/tee2: reply-rv.\n") - (if (eq? tag tuid) - (cml-sync-ch/send out msg) - (if to-sink - (cml-sync-ch/send to-sink tmsg)))))) - (cml-rv/wrap request-rv - (lambda (msg) -;;; (display "tuid: ") (display tuid) -;;; (display ". collect&reply/tee2: request-rv.\n") - (let ((tmsg (make-tagged-msg tuid msg))) - (cml-sync-ch/send to-server tmsg))))) - (drink-tee (cml-sync-ch/receive-rv from-sink) - (cml-sync-ch/receive-rv from-server) - (cml-sync-ch/receive-rv in)))) - 'collect&reply/tee2)) - (cml-sync-ch/receive tmp-ch))) +(define-record-type :cmd-msg + (make-cmd-msg cmd data) + is-cmd-msg? + (cmd cmd-msg-cmd) + (data cmd-msg-data)) -(define (send&collect/tee2 from-server to-sink from-sink to-server in out) +(define (print-info tuid event name) + (display ">>> ") (display tuid) (display " : ") + (display event) (display " [") (display name) (display "]") (newline)) + +(define (no-modify msg) msg) +(define (always msg) #t) +(define (never msg) #f) + +(define (cond-sink pred modify in out name) (let ((tmp-ch (cml-sync-ch/make-channel))) (spawn - (lambda () - (let ((tuid (thread-uid (current-thread)))) - (cml-sync-ch/send tmp-ch tuid) - (let drink-tee ((collect-rv (cml-sync-ch/receive-rv from-sink)) - (send-rv (cml-sync-ch/receive-rv from-server)) - (reply-rv (cml-sync-ch/receive-rv in))) - (cml-rv/select - (cml-rv/wrap collect-rv - (lambda (tmsg) -;;; (display "tuid: ") (display tuid) -;;; (display ". send&collect/tee2: collect-rv.\n") - (cml-sync-ch/send to-server tmsg))) - (cml-rv/wrap send-rv - (lambda (tmsg) - (let ((msg (tagged-msg-stripped tmsg)) - (tag (tagged-msg-tag tmsg))) -;;; (display "tuid: ") (display tuid) -;;; (display ". send&collect/tee2: send-rv.\n") - (if (eq? tag tuid) - (cml-sync-ch/send out msg) - (if to-sink - (cml-sync-ch/send to-sink tmsg)))))) - (cml-rv/wrap reply-rv - (lambda (msg) -;;; (display "tuid: ") (display tuid) -;;; (display ". send&collect/tee2: reply-rv.\n") - (let ((tmsg (make-tagged-msg tuid msg))) - (cml-sync-ch/send to-server tmsg))))) - (drink-tee (cml-sync-ch/receive-rv from-sink) - (cml-sync-ch/receive-rv from-server) - (cml-sync-ch/receive-rv in))))) - 'send&collect/tee2) + (lambda () + (cml-sync-ch/send tmp-ch (thread-uid (current-thread))) + (let cond-sink-lp ((msg (cml-sync-ch/receive in))) + (if (pred msg) +; (begin +; (print-info (thread-uid (current-thread)) +; "cond-sink, forward" (symbol->string name)) + (cml-sync-ch/send out (modify msg))) +; (print-info (thread-uid (current-thread)) +; "cond-sink, shredder" (symbol->string name))) + (cond-sink-lp (cml-sync-ch/receive in)))) + name) (cml-sync-ch/receive tmp-ch))) -(define (collect&reply/server cmd-in cmd-out from-server to-server) - (spawn - (lambda () - (let collect-or-reply ((cmd-rv (cml-sync-ch/receive-rv cmd-in)) - (collect-rv (cml-sync-ch/receive-rv to-server))) - (cml-rv/select - (cml-rv/wrap cmd-rv - (lambda (cmd) - (cond - ((and (is-collect&reply-cmd? cmd) - (eq? (collect&reply-cmd-name cmd) 'make-link)) - (let* ((link-in (cml-sync-ch/receive cmd-in)) - (link-out (cml-sync-ch/receive cmd-in)) - (new-from-server (cml-sync-ch/make-channel)) - (new-to-server (cml-sync-ch/make-channel)) - (tuid (collect&reply/tee2 new-from-server - from-server - to-server - new-to-server - link-in - link-out)) - (tmp-ch (cml-sync-ch/receive cmd-in))) -;;; (display "collect&reply/server: cmd-rv, tuid: ") -;;; (display (thread-uid (current-thread))) -;;; (newline) - (set! from-server new-from-server) - (set! to-server new-to-server) - (cml-sync-ch/send tmp-ch tuid))) - ((is-tagged-msg? cmd) -;;; (display "collect&reply/server: cmd-rv, tuid: ") -;;; (display (thread-uid (current-thread))) -;;; (newline) - (cml-sync-ch/send from-server cmd)) - (else - (error "collect&reply: unsupported message type."))))) - (cml-rv/wrap collect-rv - (lambda (request) -;;; (display "collect&reply/server: collect-rv, tuid: ") -;;; (display (thread-uid (current-thread))) -;;; (newline) - (cml-sync-ch/send cmd-out request)))) - (collect-or-reply (cml-sync-ch/receive-rv cmd-in) - (cml-sync-ch/receive-rv to-server)))) - 'collect&reply/server)) +(define (sink in out) (cond-sink never no-modify in out 'sink)) -(define (send&collect/server cmd-in cmd-out from-server to-server) - (spawn - (lambda () - (let send-or-collect ((cmd-rv (cml-sync-ch/receive-rv cmd-in)) - (reply-rv (cml-sync-ch/receive-rv to-server))) - (cml-rv/select - (cml-rv/wrap cmd-rv - (lambda (cmd) - (cond - ((and (is-send&collect-cmd? cmd) - (eq? (send&collect-cmd-name cmd) 'make-link)) - (let* ((link-in (cml-sync-ch/receive cmd-in)) - (link-out (cml-sync-ch/receive cmd-in)) - (new-from-server (cml-sync-ch/make-channel)) - (new-to-server (cml-sync-ch/make-channel)) - (tuid (send&collect/tee2 new-from-server - from-server - to-server - new-to-server - link-in - link-out)) - (tmp-ch (cml-sync-ch/receive cmd-in))) -;;; (display "send&collect/server: cmd-rv, tuid: ") -;;; (display (thread-uid (current-thread))) -;;; (newline) - (set! from-server new-from-server) - (set! to-server new-to-server) - (cml-sync-ch/send tmp-ch tuid))) - ((is-tagged-msg? cmd) -;;; (display "send&collect/server: cmd-rv, tuid: ") -;;; (display (thread-uid (current-thread))) -;;; (newline) - (cml-sync-ch/send from-server cmd)) - (else - (error "send&collect: unsupported message type."))))) - (cml-rv/wrap reply-rv - (lambda (reply) -;;; (display "send&collect/server: reply-rv, tuid: ") -;;; (display (thread-uid (current-thread))) -;;; (newline) - (cml-sync-ch/send cmd-out reply)))) - (send-or-collect (cml-sync-ch/receive-rv cmd-in) - (cml-sync-ch/receive-rv to-server)))) - 'send&collect/server)) +(define (cond-tee pred modify in out alt name) + (let ((tmp-ch (cml-sync-ch/make-channel))) + (spawn + (lambda () + (cml-sync-ch/send tmp-ch (thread-uid (current-thread))) + (let cond-tee-lp ((msg (cml-sync-ch/receive in))) + (if (pred msg) +; (begin +; (print-info (thread-uid (current-thread)) +; "cond-tee, default" (symbol->string name)) + (cml-sync-ch/send out (modify msg)) +; (begin +; (print-info (thread-uid (current-thread)) +; "cond-tee, alternate" (symbol->string name)) + (cml-sync-ch/send alt msg)) + (cond-tee-lp (cml-sync-ch/receive in)))) + name) + (cml-sync-ch/receive tmp-ch))) -(define (collect&reply/make-sink from-server to-server) - (let ((to-sink #f) - (from-sink (cml-sync-ch/make-channel)) - (link-in (cml-sync-ch/make-channel)) - (link-out (cml-sync-ch/make-channel))) - (collect&reply/tee2 from-server to-sink from-sink to-server link-in link-out))) +(define (tee in out) (cond-tee always no-modify in out #f 'tee)) + +(define (tail-element from-head to-head from-sink to-sink in out) + (let* ((id (tee from-sink to-head)) + (tag-msg (lambda (msg) (make-tagged-msg id msg))) + (pred (lambda (tmsg) (eq? (tagged-msg-tag tmsg) id)))) + (cond-tee pred tagged-msg-stripped from-head out to-sink 'tail-element-switch) + (cond-tee always tag-msg in to-head #f 'tail-element-insert) + id)) + +(define-enumerated-type collect-cmd :collect-cmd + is-collect-cmd? + the-collect-cmds + collect-cmd-name + collect-cmd-index + (make-link)) + +(define (head-element modify cmd-in cmd-out head-in head-out name) + (let ((id-res-ch (cml-sync-ch/make-channel)) + (pred (lambda (msg) + (cond + ((and (is-cmd-msg? msg) + (is-collect-cmd? (cmd-msg-cmd msg)) + (eq? (cmd-msg-cmd msg) (collect-cmd make-link))) #f) + ((is-tagged-msg? msg) #t) + (else (error "head-element: wrong type" msg)))))) + (spawn + (lambda () + (cml-sync-ch/send id-res-ch (thread-uid (current-thread))) + (sink head-out head-in) + (let head-element-lp ((from-tail head-in) + (to-tail head-out)) + (let* ((->cmd-out (lambda (msg) +; (print-info (thread-uid (current-thread)) +; "head-element, ->cmd-out" +; (symbol->string name)) + (cml-sync-ch/send cmd-out (modify msg)) + (cons from-tail to-tail))) + (->to-tail (lambda (msg) +; (print-info (thread-uid (current-thread)) +; "head-element, ->to-tail" +; (symbol->string name)) + (cml-sync-ch/send to-tail (modify msg)) + (cons from-tail to-tail))) + (new-tail-el (lambda (msg) + (let* ((chs (cmd-msg-data msg)) + (new-from-tail (cml-sync-ch/make-channel)) + (new-to-tail (cml-sync-ch/make-channel)) + (link-in (list-ref chs 0)) + (link-out (list-ref chs 1)) + (tmp-ch (list-ref chs 2)) + (id (tail-element new-to-tail new-from-tail + from-tail to-tail + link-in link-out))) +; (print-info (thread-uid (current-thread)) +; "head-element, new-tail-el" +; (symbol->string name)) + (cml-sync-ch/send tmp-ch id) + (cons new-from-tail new-to-tail)))) + (chs (cml-rv/select + (cml-rv/wrap (cml-sync-ch/receive-rv cmd-in) + (lambda (msg) + (if (pred msg) + (->to-tail msg) + (new-tail-el msg)))) + (cml-rv/wrap (cml-sync-ch/receive-rv from-tail) + (lambda (msg) (->cmd-out msg)))))) + (head-element-lp (car chs) (cdr chs))))) + name) + (cml-sync-ch/receive id-res-ch))) + +(define-record-type :collect&reply-channel + (collect&reply/really-make-channel cmd-in cmd-out) + is-collect&reply-channel? + (cmd-in collect&reply-channel-cmd-in) + (cmd-out collect&reply-channel-cmd-out)) (define (collect&reply/make-channel) (let ((cmd-in (cml-sync-ch/make-channel)) (cmd-out (cml-sync-ch/make-channel)) - (from-server (cml-sync-ch/make-channel)) - (to-server (cml-sync-ch/make-channel))) - (collect&reply/make-sink from-server to-server) - (collect&reply/server cmd-in cmd-out from-server to-server) - (collect&reply/really-make-channel cmd-in cmd-out from-server to-server))) + (head-in (cml-sync-ch/make-channel)) + (head-out (cml-sync-ch/make-channel))) + (head-element no-modify cmd-in cmd-out head-in head-out 'collect&reply) + (collect&reply/really-make-channel cmd-in cmd-out))) -(define (send&collect/make-sink from-server to-server) - (let ((to-sink #f) - (from-sink (cml-sync-ch/make-channel)) - (link-in (cml-sync-ch/make-channel)) - (link-out (cml-sync-ch/make-channel))) - (send&collect/tee2 from-server to-sink from-sink to-server link-in link-out))) +(define (make-link from to) + (let* ((from-->to (cml-sync-ch/make-channel)) + (from<--to (cml-sync-ch/make-channel)) + (to-tmp-ch (cml-sync-ch/make-channel)) + (from-tmp-ch (cml-sync-ch/make-channel)) + (chs-for-to (make-cmd-msg (collect-cmd make-link) + (list from-->to from<--to to-tmp-ch))) + (chs-for-from (make-cmd-msg (collect-cmd make-link) + (list from<--to from-->to from-tmp-ch)))) + (cond + ((and (is-send&collect-channel? from) + (is-collect&reply-channel? to)) + (cml-sync-ch/send (collect&reply-channel-cmd-in to) chs-for-to) + (cml-sync-ch/send (send&collect-channel-cmd-in from) chs-for-from) + (cons (cml-sync-ch/receive from-tmp-ch) (cml-sync-ch/receive to-tmp-ch))) + (else (error "make-link: wrong type" from to))))) + +(define-record-type :send&collect-channel + (send&collect/really-make-channel cmd-in cmd-out) + is-send&collect-channel? + (cmd-in send&collect-channel-cmd-in) + (cmd-out send&collect-channel-cmd-out)) (define (send&collect/make-channel) (let ((cmd-in (cml-sync-ch/make-channel)) (cmd-out (cml-sync-ch/make-channel)) - (from-server (cml-sync-ch/make-channel)) - (to-server (cml-sync-ch/make-channel))) - (send&collect/make-sink from-server to-server) - (send&collect/server cmd-in cmd-out from-server to-server) - (send&collect/really-make-channel cmd-in cmd-out from-server to-server))) - -(define (make-link from to) - (let ((from-->to (cml-sync-ch/make-channel)) - (from<--to (cml-sync-ch/make-channel)) - (tmp-ch (cml-sync-ch/make-channel))) - (cond - ((and (is-send&collect-channel? from) - (is-collect&reply-channel? to)) - (cml-sync-ch/send (collect&reply-channel-cmd-in to) - (collect&reply-cmd make-link)) - (cml-sync-ch/send (collect&reply-channel-cmd-in to) from-->to) - (cml-sync-ch/send (collect&reply-channel-cmd-in to) from<--to) - (cml-sync-ch/send (collect&reply-channel-cmd-in to) tmp-ch) - (cml-sync-ch/receive tmp-ch) - (cml-sync-ch/send (send&collect-channel-cmd-in from) - (send&collect-cmd make-link)) - (cml-sync-ch/send (send&collect-channel-cmd-in from) from<--to) - (cml-sync-ch/send (send&collect-channel-cmd-in from) from-->to) - (cml-sync-ch/send (send&collect-channel-cmd-in from) tmp-ch) - (cml-sync-ch/receive tmp-ch)) - (else (error "make-link: from/to has/have wrong type."))))) + (head-in (cml-sync-ch/make-channel)) + (head-out (cml-sync-ch/make-channel))) + (head-element no-modify cmd-in cmd-out head-in head-out 'send&collect) + (send&collect/really-make-channel cmd-in cmd-out))) (define (collect&reply/receive ch) (cml-sync-ch/receive (collect&reply-channel-cmd-out ch)))