(define-record-type :tagged-msg (make-tagged-msg tag stripped) is-tagged-msg? (tag tagged-msg-tag) (stripped tagged-msg-stripped)) (define-record-type :cmd-msg (make-cmd-msg cmd data) is-cmd-msg? (cmd cmd-msg-cmd) (data cmd-msg-data)) (define (print-info tuid event name) (display ">>> ") (display tuid) (display " : ") (display event) (display " [") (display name) (display "]") (newline)) (define (no-modify msg) msg) (define (always msg) #t) (define (never msg) #f) (define (cond-sink pred modify in out name) (let ((tmp-ch (cml-sync-ch/make-channel))) (spawn (lambda () (cml-sync-ch/send tmp-ch (thread-uid (current-thread))) (let cond-sink-lp ((msg (cml-sync-ch/receive in))) (if (pred msg) ; (begin ; (print-info (thread-uid (current-thread)) ; "cond-sink, forward" (symbol->string name)) (cml-sync-ch/send out (modify msg))) ; (print-info (thread-uid (current-thread)) ; "cond-sink, shredder" (symbol->string name))) (cond-sink-lp (cml-sync-ch/receive in)))) name) (cml-sync-ch/receive tmp-ch))) (define (sink in out) (cond-sink never no-modify in out 'sink)) (define (cond-tee pred modify in out alt name) (let ((tmp-ch (cml-sync-ch/make-channel))) (spawn (lambda () (cml-sync-ch/send tmp-ch (thread-uid (current-thread))) (let cond-tee-lp ((msg (cml-sync-ch/receive in))) (if (pred msg) ; (begin ; (print-info (thread-uid (current-thread)) ; "cond-tee, default" (symbol->string name)) (cml-sync-ch/send out (modify msg)) ; (begin ; (print-info (thread-uid (current-thread)) ; "cond-tee, alternate" (symbol->string name)) (cml-sync-ch/send alt msg)) (cond-tee-lp (cml-sync-ch/receive in)))) name) (cml-sync-ch/receive tmp-ch))) (define (tee in out) (cond-tee always no-modify in out #f 'tee)) (define (tail-element from-head to-head from-sink to-sink in out) (let* ((id (tee from-sink to-head)) (tag-msg (lambda (msg) (make-tagged-msg id msg))) (pred (lambda (tmsg) (eq? (tagged-msg-tag tmsg) id)))) (cond-tee pred tagged-msg-stripped from-head out to-sink 'tail-element-switch) (cond-tee always tag-msg in to-head #f 'tail-element-insert) id)) (define-enumerated-type collect-cmd :collect-cmd is-collect-cmd? the-collect-cmds collect-cmd-name collect-cmd-index (make-link)) (define (head-element modify cmd-in cmd-out head-in head-out name) (let ((id-res-ch (cml-sync-ch/make-channel)) (pred (lambda (msg) (cond ((and (is-cmd-msg? msg) (is-collect-cmd? (cmd-msg-cmd msg)) (eq? (cmd-msg-cmd msg) (collect-cmd make-link))) #f) ((is-tagged-msg? msg) #t) (else (error "head-element: wrong type" msg)))))) (spawn (lambda () (cml-sync-ch/send id-res-ch (thread-uid (current-thread))) (sink head-out head-in) (let head-element-lp ((from-tail head-in) (to-tail head-out)) (let* ((->cmd-out (lambda (msg) ; (print-info (thread-uid (current-thread)) ; "head-element, ->cmd-out" ; (symbol->string name)) (cml-sync-ch/send cmd-out (modify msg)) (cons from-tail to-tail))) (->to-tail (lambda (msg) ; (print-info (thread-uid (current-thread)) ; "head-element, ->to-tail" ; (symbol->string name)) (cml-sync-ch/send to-tail (modify msg)) (cons from-tail to-tail))) (new-tail-el (lambda (msg) (let* ((chs (cmd-msg-data msg)) (new-from-tail (cml-sync-ch/make-channel)) (new-to-tail (cml-sync-ch/make-channel)) (link-in (list-ref chs 0)) (link-out (list-ref chs 1)) (tmp-ch (list-ref chs 2)) (id (tail-element new-to-tail new-from-tail from-tail to-tail link-in link-out))) ; (print-info (thread-uid (current-thread)) ; "head-element, new-tail-el" ; (symbol->string name)) (cml-sync-ch/send tmp-ch id) (cons new-from-tail new-to-tail)))) (chs (cml-rv/select (cml-rv/wrap (cml-sync-ch/receive-rv cmd-in) (lambda (msg) (if (pred msg) (->to-tail msg) (new-tail-el msg)))) (cml-rv/wrap (cml-sync-ch/receive-rv from-tail) (lambda (msg) (->cmd-out msg)))))) (head-element-lp (car chs) (cdr chs))))) name) (cml-sync-ch/receive id-res-ch))) (define-record-type :collect&reply-channel (collect&reply/really-make-channel cmd-in cmd-out) is-collect&reply-channel? (cmd-in collect&reply-channel-cmd-in) (cmd-out collect&reply-channel-cmd-out)) (define (collect&reply/make-channel) (let ((cmd-in (cml-sync-ch/make-channel)) (cmd-out (cml-sync-ch/make-channel)) (head-in (cml-sync-ch/make-channel)) (head-out (cml-sync-ch/make-channel))) (head-element no-modify cmd-in cmd-out head-in head-out 'collect&reply) (collect&reply/really-make-channel cmd-in cmd-out))) (define (make-link from to) (let* ((from-->to (cml-sync-ch/make-channel)) (from<--to (cml-sync-ch/make-channel)) (to-tmp-ch (cml-sync-ch/make-channel)) (from-tmp-ch (cml-sync-ch/make-channel)) (chs-for-to (make-cmd-msg (collect-cmd make-link) (list from-->to from<--to to-tmp-ch))) (chs-for-from (make-cmd-msg (collect-cmd make-link) (list from<--to from-->to from-tmp-ch)))) (cond ((and (is-send&collect-channel? from) (is-collect&reply-channel? to)) (cml-sync-ch/send (collect&reply-channel-cmd-in to) chs-for-to) (cml-sync-ch/send (send&collect-channel-cmd-in from) chs-for-from) (cons (cml-sync-ch/receive from-tmp-ch) (cml-sync-ch/receive to-tmp-ch))) (else (error "make-link: wrong type" from to))))) (define-record-type :send&collect-channel (send&collect/really-make-channel cmd-in cmd-out) is-send&collect-channel? (cmd-in send&collect-channel-cmd-in) (cmd-out send&collect-channel-cmd-out)) (define (send&collect/make-channel) (let ((cmd-in (cml-sync-ch/make-channel)) (cmd-out (cml-sync-ch/make-channel)) (head-in (cml-sync-ch/make-channel)) (head-out (cml-sync-ch/make-channel))) (head-element no-modify cmd-in cmd-out head-in head-out 'send&collect) (send&collect/really-make-channel cmd-in cmd-out))) (define (collect&reply/receive ch) (cml-sync-ch/receive (collect&reply-channel-cmd-out ch))) (define (collect&reply/receive-rv ch) (cml-sync-ch/receive-rv (collect&reply-channel-cmd-out ch))) (define (collect&reply/send ch msg) (cml-sync-ch/send (collect&reply-channel-cmd-in ch) msg)) (define (collect&reply/send-rv ch msg) (cml-sync-ch/send-rv (collect&reply-channel-cmd-in ch) msg)) (define (send&collect/send ch msg) (cml-sync-ch/send (send&collect-channel-cmd-in ch) msg)) (define (send&collect/send-rv ch msg) (cml-sync-ch/send-rv (send&collect-channel-cmd-in ch) msg)) (define (send&collect/receive ch) (cml-sync-ch/receive (send&collect-channel-cmd-out ch))) (define (send&collect/receive-rv ch) (cml-sync-ch/receive-rv (send&collect-channel-cmd-out ch)))