;; Record describing the collect&reply end of a link.
;;   cmd-in      -- channel on which clients send commands / tagged messages
;;   cmd-out     -- channel on which clients receive collected requests
;;   from-server -- channel the server thread sends tagged messages on
;;   to-server   -- channel the server thread receives on
(define-record-type :collect&reply-channel
  (collect&reply/really-make-channel cmd-in cmd-out from-server to-server)
  is-collect&reply-channel?
  (cmd-in collect&reply-channel-cmd-in)
  (cmd-out collect&reply-channel-cmd-out)
  (from-server collect&reply-channel-from-server)
  (to-server collect&reply-channel-to-server))

;; Record describing the send&collect end of a link; same four fields as
;; :collect&reply-channel.
(define-record-type :send&collect-channel
  (send&collect/really-make-channel cmd-in cmd-out from-server to-server)
  is-send&collect-channel?
  (cmd-in send&collect-channel-cmd-in)
  (cmd-out send&collect-channel-cmd-out)
  (from-server send&collect-channel-from-server)
  (to-server send&collect-channel-to-server))

;; Commands understood by a collect&reply server.  The only command is
;; make-link.
(define-enumerated-type collect&reply-cmd :collect&reply-cmd
  is-collect&reply-cmd?
  the-collect&reply-cmds
  collect&reply-cmd-name
  collect&reply-cmd-index
  (make-link))

;; Commands understood by a send&collect server.  The only command is
;; make-link.
(define-enumerated-type send&collect-cmd :send&collect-cmd
  is-send&collect-cmd?
  the-send&collect-cmds
  send&collect-cmd-name
  send&collect-cmd-index
  (make-link))

;; A message paired with a tag.  The tee threads below use the thread uid
;; of the tee that created the message as the tag.
(define-record-type :tagged-msg
  (make-tagged-msg tag stripped)
  is-tagged-msg?
  (tag tagged-msg-tag)
  (stripped tagged-msg-stripped))

;; Spawn one collect&reply tee thread and return that thread's uid.
;;
;; The new thread first sends its own uid over a private channel (this is
;; the value returned to the caller) and then loops forever, selecting on:
;;   FROM-SINK   -- a tagged message; forwarded unchanged to TO-SERVER.
;;   FROM-SERVER -- a tagged message; when its tag is this thread's uid the
;;                  stripped payload is sent to OUT, otherwise the tagged
;;                  message is sent to TO-SINK.  When TO-SINK is #f such a
;;                  message is dropped.
;;   IN          -- an untagged message; tagged with this thread's uid and
;;                  sent to TO-SERVER.
;;
;; The thread name is passed to SPAWN as its second argument, matching
;; send&collect/tee2 and the two server procedures.
(define (collect&reply/tee2 from-server to-sink from-sink to-server in out)
  (let ((tmp-ch (cml-sync-ch/make-channel)))
    (spawn
     (lambda ()
       (let ((tuid (thread-uid (current-thread))))
         ;; Hand our uid back to the creator before entering the loop.
         (cml-sync-ch/send tmp-ch tuid)
         (let drink-tee ((collect-rv (cml-sync-ch/receive-rv from-sink))
                         (reply-rv (cml-sync-ch/receive-rv from-server))
                         (request-rv (cml-sync-ch/receive-rv in)))
           (cml-rv/select
            (cml-rv/wrap
             collect-rv
             (lambda (tmsg)
               ;;; (display "tuid: ") (display tuid)
               ;;; (display ". collect&reply/tee2: collect-rv.\n")
               (cml-sync-ch/send to-server tmsg)))
            (cml-rv/wrap
             reply-rv
             (lambda (tmsg)
               (let ((msg (tagged-msg-stripped tmsg))
                     (tag (tagged-msg-tag tmsg)))
                 ;;; (display "tuid: ") (display tuid)
                 ;;; (display ". collect&reply/tee2: reply-rv.\n")
                 (if (eq? tag tuid)
                     (cml-sync-ch/send out msg)
                     (if to-sink
                         (cml-sync-ch/send to-sink tmsg))))))
            (cml-rv/wrap
             request-rv
             (lambda (msg)
               ;;; (display "tuid: ") (display tuid)
               ;;; (display ". collect&reply/tee2: request-rv.\n")
               (let ((tmsg (make-tagged-msg tuid msg)))
                 (cml-sync-ch/send to-server tmsg)))))
           ;; Build fresh rendezvous for the next round.
           (drink-tee (cml-sync-ch/receive-rv from-sink)
                      (cml-sync-ch/receive-rv from-server)
                      (cml-sync-ch/receive-rv in)))))
     'collect&reply/tee2)
    (cml-sync-ch/receive tmp-ch)))

;; Spawn one send&collect tee thread and return that thread's uid.
;;
;; Same protocol as collect&reply/tee2: the thread reports its uid, then
;; loops forever selecting on FROM-SINK (forward to TO-SERVER), FROM-SERVER
;; (payload to OUT when the tag is this thread's uid, otherwise the tagged
;; message to TO-SINK unless TO-SINK is #f) and IN (tag with this thread's
;; uid and send to TO-SERVER).
(define (send&collect/tee2 from-server to-sink from-sink to-server in out)
  (let ((tmp-ch (cml-sync-ch/make-channel)))
    (spawn
     (lambda ()
       (let ((tuid (thread-uid (current-thread))))
         ;; Hand our uid back to the creator before entering the loop.
         (cml-sync-ch/send tmp-ch tuid)
         (let drink-tee ((collect-rv (cml-sync-ch/receive-rv from-sink))
                         (send-rv (cml-sync-ch/receive-rv from-server))
                         (reply-rv (cml-sync-ch/receive-rv in)))
           (cml-rv/select
            (cml-rv/wrap
             collect-rv
             (lambda (tmsg)
               ;;; (display "tuid: ") (display tuid)
               ;;; (display ". send&collect/tee2: collect-rv.\n")
               (cml-sync-ch/send to-server tmsg)))
            (cml-rv/wrap
             send-rv
             (lambda (tmsg)
               (let ((msg (tagged-msg-stripped tmsg))
                     (tag (tagged-msg-tag tmsg)))
                 ;;; (display "tuid: ") (display tuid)
                 ;;; (display ". send&collect/tee2: send-rv.\n")
                 (if (eq? tag tuid)
                     (cml-sync-ch/send out msg)
                     (if to-sink
                         (cml-sync-ch/send to-sink tmsg))))))
            (cml-rv/wrap
             reply-rv
             (lambda (msg)
               ;;; (display "tuid: ") (display tuid)
               ;;; (display ". send&collect/tee2: reply-rv.\n")
               (let ((tmsg (make-tagged-msg tuid msg)))
                 (cml-sync-ch/send to-server tmsg)))))
           ;; Build fresh rendezvous for the next round.
           (drink-tee (cml-sync-ch/receive-rv from-sink)
                      (cml-sync-ch/receive-rv from-server)
                      (cml-sync-ch/receive-rv in)))))
     'send&collect/tee2)
    (cml-sync-ch/receive tmp-ch)))

;; Spawn the server thread of a collect&reply channel.
;;
;; The thread loops forever, selecting on:
;;   CMD-IN    -- either a make-link command or a tagged message.
;;                * make-link: three more values are read from CMD-IN, in
;;                  this order: link-in, link-out, and (after the new tee
;;                  has been spawned) a reply channel.  The new tee is
;;                  placed in front of the current FROM-SERVER/TO-SERVER
;;                  pair, the local FROM-SERVER and TO-SERVER variables are
;;                  rebound to the freshly created channels, and the tee's
;;                  thread uid is sent on the reply channel.
;;                * tagged message: sent on FROM-SERVER.
;;                * anything else: an error is signalled.
;;   TO-SERVER -- any message; sent on CMD-OUT.
(define (collect&reply/server cmd-in cmd-out from-server to-server)
  (spawn
   (lambda ()
     (let collect-or-reply ((cmd-rv (cml-sync-ch/receive-rv cmd-in))
                            (collect-rv (cml-sync-ch/receive-rv to-server)))
       (cml-rv/select
        (cml-rv/wrap
         cmd-rv
         (lambda (cmd)
           (cond
            ((and (is-collect&reply-cmd? cmd)
                  (eq? (collect&reply-cmd-name cmd) 'make-link))
             ;; let* on purpose: the receives and the tee creation must
             ;; happen in exactly this order.
             (let* ((link-in (cml-sync-ch/receive cmd-in))
                    (link-out (cml-sync-ch/receive cmd-in))
                    (new-from-server (cml-sync-ch/make-channel))
                    (new-to-server (cml-sync-ch/make-channel))
                    (tuid (collect&reply/tee2 new-from-server from-server
                                              to-server new-to-server
                                              link-in link-out))
                    (tmp-ch (cml-sync-ch/receive cmd-in)))
               ;;; (display "collect&reply/server: cmd-rv, tuid: ")
               ;;; (display (thread-uid (current-thread)))
               ;;; (newline)
               (set! from-server new-from-server)
               (set! to-server new-to-server)
               (cml-sync-ch/send tmp-ch tuid)))
            ((is-tagged-msg? cmd)
             ;;; (display "collect&reply/server: cmd-rv, tuid: ")
             ;;; (display (thread-uid (current-thread)))
             ;;; (newline)
             (cml-sync-ch/send from-server cmd))
            (else
             (error "collect&reply: unsupported message type.")))))
        (cml-rv/wrap
         collect-rv
         (lambda (request)
           ;;; (display "collect&reply/server: collect-rv, tuid: ")
           ;;; (display (thread-uid (current-thread)))
           ;;; (newline)
           (cml-sync-ch/send cmd-out request))))
       ;; TO-SERVER may have been rebound above, so the rendezvous is
       ;; rebuilt from the current variable on every round.
       (collect-or-reply (cml-sync-ch/receive-rv cmd-in)
                         (cml-sync-ch/receive-rv to-server))))
   'collect&reply/server))

;; Spawn the server thread of a send&collect channel.
;;
;; Same protocol as collect&reply/server, using the send&collect command
;; type and send&collect/tee2: a make-link command on CMD-IN installs a new
;; tee in front of the current FROM-SERVER/TO-SERVER pair and answers with
;; the tee's thread uid; a tagged message on CMD-IN is sent on FROM-SERVER;
;; any other value on CMD-IN signals an error; a message on TO-SERVER is
;; sent on CMD-OUT.
(define (send&collect/server cmd-in cmd-out from-server to-server)
  (spawn
   (lambda ()
     (let send-or-collect ((cmd-rv (cml-sync-ch/receive-rv cmd-in))
                           (reply-rv (cml-sync-ch/receive-rv to-server)))
       (cml-rv/select
        (cml-rv/wrap
         cmd-rv
         (lambda (cmd)
           (cond
            ((and (is-send&collect-cmd? cmd)
                  (eq? (send&collect-cmd-name cmd) 'make-link))
             ;; let* on purpose: the receives and the tee creation must
             ;; happen in exactly this order.
             (let* ((link-in (cml-sync-ch/receive cmd-in))
                    (link-out (cml-sync-ch/receive cmd-in))
                    (new-from-server (cml-sync-ch/make-channel))
                    (new-to-server (cml-sync-ch/make-channel))
                    (tuid (send&collect/tee2 new-from-server from-server
                                             to-server new-to-server
                                             link-in link-out))
                    (tmp-ch (cml-sync-ch/receive cmd-in)))
               ;;; (display "send&collect/server: cmd-rv, tuid: ")
               ;;; (display (thread-uid (current-thread)))
               ;;; (newline)
               (set! from-server new-from-server)
               (set! to-server new-to-server)
               (cml-sync-ch/send tmp-ch tuid)))
            ((is-tagged-msg? cmd)
             ;;; (display "send&collect/server: cmd-rv, tuid: ")
             ;;; (display (thread-uid (current-thread)))
             ;;; (newline)
             (cml-sync-ch/send from-server cmd))
            (else
             (error "send&collect: unsupported message type.")))))
        (cml-rv/wrap
         reply-rv
         (lambda (reply)
           ;;; (display "send&collect/server: reply-rv, tuid: ")
           ;;; (display (thread-uid (current-thread)))
           ;;; (newline)
           (cml-sync-ch/send cmd-out reply))))
       ;; TO-SERVER may have been rebound above, so the rendezvous is
       ;; rebuilt from the current variable on every round.
       (send-or-collect (cml-sync-ch/receive-rv cmd-in)
                        (cml-sync-ch/receive-rv to-server))))
   'send&collect/server))

;; Install the terminal tee of a collect&reply channel: a tee with no
;; TO-SINK (#f) whose FROM-SINK, IN and OUT channels are freshly created
;; and not handed to anyone else.  Returns the tee's thread uid.
(define (collect&reply/make-sink from-server to-server)
  (let ((to-sink #f)
        (from-sink (cml-sync-ch/make-channel))
        (link-in (cml-sync-ch/make-channel))
        (link-out (cml-sync-ch/make-channel)))
    (collect&reply/tee2 from-server to-sink from-sink to-server
                        link-in link-out)))

;; Create a collect&reply channel: allocate its four channels, install the
;; terminal sink tee, start the server thread, and return the record.
(define (collect&reply/make-channel)
  (let ((cmd-in (cml-sync-ch/make-channel))
        (cmd-out (cml-sync-ch/make-channel))
        (from-server (cml-sync-ch/make-channel))
        (to-server (cml-sync-ch/make-channel)))
    (collect&reply/make-sink from-server to-server)
    (collect&reply/server cmd-in cmd-out from-server to-server)
    (collect&reply/really-make-channel cmd-in cmd-out from-server to-server)))

;; Install the terminal tee of a send&collect channel: a tee with no
;; TO-SINK (#f) whose FROM-SINK, IN and OUT channels are freshly created
;; and not handed to anyone else.  Returns the tee's thread uid.
(define (send&collect/make-sink from-server to-server)
  (let ((to-sink #f)
        (from-sink (cml-sync-ch/make-channel))
        (link-in (cml-sync-ch/make-channel))
        (link-out (cml-sync-ch/make-channel)))
    (send&collect/tee2 from-server to-sink from-sink to-server
                       link-in link-out)))

;; Create a send&collect channel: allocate its four channels, install the
;; terminal sink tee, start the server thread, and return the record.
(define (send&collect/make-channel)
  (let ((cmd-in (cml-sync-ch/make-channel))
        (cmd-out (cml-sync-ch/make-channel))
        (from-server (cml-sync-ch/make-channel))
        (to-server (cml-sync-ch/make-channel)))
    (send&collect/make-sink from-server to-server)
    (send&collect/server cmd-in cmd-out from-server to-server)
    (send&collect/really-make-channel cmd-in cmd-out from-server to-server)))

;; Link the send&collect channel FROM to the collect&reply channel TO.
;;
;; Two fresh channels carry traffic between the two sides.  Each server is
;; sent a make-link command followed by its link-in channel, its link-out
;; channel and a reply channel, and the caller waits for each server's
;; answer before continuing.  The TO side reads from from-->to and writes
;; to from<--to; the FROM side uses the same pair the other way round.
;;
;; Returns the value answered by the FROM (send&collect) server, which is
;; the thread uid of the tee it created.  Signals an error unless FROM is a
;; send&collect channel and TO is a collect&reply channel.
(define (make-link from to)
  (let ((from-->to (cml-sync-ch/make-channel))
        (from<--to (cml-sync-ch/make-channel))
        (tmp-ch (cml-sync-ch/make-channel)))
    (cond
     ((and (is-send&collect-channel? from)
           (is-collect&reply-channel? to))
      (let ((to-cmd-in (collect&reply-channel-cmd-in to))
            (from-cmd-in (send&collect-channel-cmd-in from)))
        ;; Collect&reply side first.
        (cml-sync-ch/send to-cmd-in (collect&reply-cmd make-link))
        (cml-sync-ch/send to-cmd-in from-->to)
        (cml-sync-ch/send to-cmd-in from<--to)
        (cml-sync-ch/send to-cmd-in tmp-ch)
        (cml-sync-ch/receive tmp-ch)
        ;; Then the send&collect side, with the two channels swapped.
        (cml-sync-ch/send from-cmd-in (send&collect-cmd make-link))
        (cml-sync-ch/send from-cmd-in from<--to)
        (cml-sync-ch/send from-cmd-in from-->to)
        (cml-sync-ch/send from-cmd-in tmp-ch)
        (cml-sync-ch/receive tmp-ch)))
     (else
      (error "make-link: from/to has/have wrong type.")))))

;; Blocking receive of the next value on CH's cmd-out channel.
(define (collect&reply/receive ch)
  (cml-sync-ch/receive (collect&reply-channel-cmd-out ch)))

;; Rendezvous form of collect&reply/receive.
(define (collect&reply/receive-rv ch)
  (cml-sync-ch/receive-rv (collect&reply-channel-cmd-out ch)))

;; Blocking send of MSG on CH's cmd-in channel.  The server accepts only
;; tagged messages and commands there.
(define (collect&reply/send ch msg)
  (cml-sync-ch/send (collect&reply-channel-cmd-in ch) msg))

;; Rendezvous form of collect&reply/send.
(define (collect&reply/send-rv ch msg)
  (cml-sync-ch/send-rv (collect&reply-channel-cmd-in ch) msg))

;; Blocking send of MSG on CH's cmd-in channel.  The server accepts only
;; tagged messages and commands there.
(define (send&collect/send ch msg)
  (cml-sync-ch/send (send&collect-channel-cmd-in ch) msg))

;; Rendezvous form of send&collect/send.
(define (send&collect/send-rv ch msg)
  (cml-sync-ch/send-rv (send&collect-channel-cmd-in ch) msg))

;; Blocking receive of the next value on CH's cmd-out channel.
(define (send&collect/receive ch)
  (cml-sync-ch/receive (send&collect-channel-cmd-out ch)))

;; Rendezvous form of send&collect/receive.
(define (send&collect/receive-rv ch)
  (cml-sync-ch/receive-rv (send&collect-channel-cmd-out ch)))