;;; Message wrappers ---------------------------------------------------------

;;; A payload (`stripped') paired with a `tag'.  In this file the tag is
;;; always the uid of the tail-element thread that wrapped the payload.
(define-record-type :tagged-msg
  (make-tagged-msg tag stripped)
  is-tagged-msg?
  (tag tagged-msg-tag)
  (stripped tagged-msg-stripped))

;;; A command (`cmd') together with its argument (`data').  The only
;;; command built in this file is (collect-cmd make-link), whose data is a
;;; list of three channels.
(define-record-type :cmd-msg
  (make-cmd-msg cmd data)
  is-cmd-msg?
  (cmd cmd-msg-cmd)
  (data cmd-msg-data))

;;; Small helpers ------------------------------------------------------------

;;; Write a ">>> TUID : EVENT [NAME]" trace line to the current error port.
(define (print-info tuid event name)
  (let ((port (current-error-port)))
    (format port ">>> ~a : ~a [~a]~%" tuid event name)))

;;; Identity transformation on a message.
(define (no-modify msg)
  msg)

;;; Predicate accepting every message.
(define (always msg)
  #t)

;;; Predicate rejecting every message.
(define (never msg)
  #f)

;;; Disabled code --------------------------------------------------------------
;;; The definitions below are commented out in the source.  They build
;;; `tail-element' out of synchronous-channel tees; the live definition that
;;; follows uses asynchronous channels and a single thread instead.
;;;
;;; (define (cond-sink pred modify in out name)
;;;   (let ((tmp-ch (cml-sync-ch/make-channel)))
;;;     (spawn
;;;      (lambda ()
;;;        (cml-sync-ch/send tmp-ch (thread-uid (current-thread)))
;;;        (let cond-sink-lp ((msg (cml-sync-ch/receive in)))
;;;          (if (pred msg)
;;;              (cml-sync-ch/send out (modify msg)))
;;;          (cond-sink-lp (cml-sync-ch/receive in))))
;;;      name)
;;;     (cml-sync-ch/receive tmp-ch)))
;;;
;;; (define (sink in out) (cond-sink never no-modify in out 'sink))
;;;
;;; (define (cond-tee pred modify in out alt name)
;;;   (let ((tmp-ch (cml-sync-ch/make-channel)))
;;;     (spawn
;;;      (lambda ()
;;;        (cml-sync-ch/send tmp-ch (thread-uid (current-thread)))
;;;        (let cond-tee-lp ((msg (cml-sync-ch/receive in)))
;;;          (if (pred msg)
;;;              (cml-sync-ch/send out (modify msg))
;;;              (cml-sync-ch/send alt msg))
;;;          (cond-tee-lp (cml-sync-ch/receive in))))
;;;      name)
;;;     (cml-sync-ch/receive tmp-ch)))
;;;
;;; (define (tee in out) (cond-tee always no-modify in out #f 'tee))
;;;
;;; (define (tail-element from-head to-head from-sink to-sink in out)
;;;   (let* ((id (tee from-sink to-head))
;;;          (tag-msg (lambda (msg) (make-tagged-msg id msg)))
;;;          (pred (lambda (tmsg) (eq? (tagged-msg-tag tmsg) id))))
;;;     (cond-tee pred tagged-msg-stripped from-head out to-sink
;;;               (string->symbol (string-append "tail-switch " (number->string id))))
;;;     (cond-tee always tag-msg in to-head #f
;;;               (string->symbol (string-append "tail-insert " (number->string id))))
;;;     id))

;;; Tail element ---------------------------------------------------------------

;;; Spawn a thread that shuttles messages between six asynchronous channels
;;; and return that thread's uid, which doubles as the element's tag.
;;;
;;;   in        -> wrapped in a tagged-msg carrying this element's uid and
;;;                sent on TO-HEAD
;;;   from-sink -> sent on TO-HEAD unchanged
;;;   from-head -> must be a tagged-msg; if its tag is this element's uid the
;;;                stripped payload is sent on OUT, otherwise the message is
;;;                sent on TO-SINK unchanged
;;;
;;; The uid is handed back to the caller over a private synchronous channel
;;; before the thread enters its service loop.
(define (tail-element from-head to-head from-sink to-sink in out)
  (let ((uid-ch (cml-sync-ch/make-channel)))
    (spawn
     (lambda ()
       (let ((self (thread-uid (current-thread))))

         ;; Tag a locally inserted payload and push it towards the head.
         (define (on-insert payload)
           (cml-async-ch/send-async to-head (make-tagged-msg self payload)))

         ;; Pass traffic from further down the chain towards the head as is.
         (define (on-forward msg)
           (cml-async-ch/send-async to-head msg))

         ;; Deliver messages addressed to this element, hand on the rest.
         (define (on-deliver tmsg)
           (if (eq? (tagged-msg-tag tmsg) self)
               (cml-async-ch/send-async out (tagged-msg-stripped tmsg))
               (cml-async-ch/send-async to-sink tmsg)))

         (cml-sync-ch/send uid-ch self)

         ;; The three receive rendezvous are built once and reused on every
         ;; pass through the loop.
         (let ((insert-rv (cml-async-ch/receive-async-rv in))
               (forward-rv (cml-async-ch/receive-async-rv from-sink))
               (deliver-rv (cml-async-ch/receive-async-rv from-head)))
           (let serve ()
             (cml-rv/select
              (cml-rv/wrap insert-rv on-insert)
              (cml-rv/wrap forward-rv on-forward)
              (cml-rv/wrap deliver-rv on-deliver))
             (serve))))))
    (cml-sync-ch/receive uid-ch)))

;;; Commands understood by a head element.
(define-enumerated-type collect-cmd :collect-cmd
  is-collect-cmd?
  the-collect-cmds
  collect-cmd-name
  collect-cmd-index
  (make-link))

;;; Head element ---------------------------------------------------------------

;;; Spawn a thread called NAME that owns the head of a chain of tail elements
;;; and return that thread's uid.
;;;
;;; The loop state is the pair of channels (from-tail, to-tail) connecting the
;;; head to the most recently added tail element; it starts out as HEAD-IN and
;;; HEAD-OUT.  On every pass one message is taken from either CMD-IN or
;;; from-tail:
;;;
;;;   cmd-in, make-link command -> a new tail element is created in front of
;;;                                the current chain; its uid is sent on the
;;;                                third channel of the command's data and the
;;;                                loop continues with the new channel pair
;;;   cmd-in, tagged-msg        -> (MODIFY msg) is sent on to-tail
;;;   cmd-in, anything else     -> error "head-element: wrong type"
;;;   from-tail                 -> (MODIFY msg) is sent on CMD-OUT
;;;
;;; NOTE(review): the original carries a commented-out `(sink head-out head-in)'
;;; right after the uid hand-off; nothing in this file reads from HEAD-OUT --
;;; confirm that is intended.
(define (head-element modify cmd-in cmd-out head-in head-out name)

  ;; True for a cmd-msg whose command is (collect-cmd make-link).
  (define (link-request? msg)
    (and (is-cmd-msg? msg)
         (is-collect-cmd? (cmd-msg-cmd msg))
         (eq? (cmd-msg-cmd msg) (collect-cmd make-link))))

  ;; #f for a link request, #t for a tagged message, error for anything else.
  (define (forwardable? msg)
    (if (link-request? msg)
        #f
        (if (is-tagged-msg? msg)
            #t
            (error "head-element: wrong type" msg))))

  (let ((uid-ch (cml-sync-ch/make-channel)))
    (spawn
     (lambda ()
       (cml-sync-ch/send uid-ch (thread-uid (current-thread)))
       (let serve ((from-tail head-in)
                   (to-tail head-out))

         ;; Send the modified message on CH; the chain itself is unchanged.
         (define (relay ch msg)
           (cml-async-ch/send-async ch (modify msg))
           (cons from-tail to-tail))

         ;; Insert a new tail element between the head and the current chain
         ;; and report its uid to whoever issued the request.
         (define (grow request)
           (let* ((spec (cmd-msg-data request))
                  (link-in (list-ref spec 0))
                  (link-out (list-ref spec 1))
                  (reply-ch (list-ref spec 2))
                  (next-from-tail (cml-async-ch/make-async-channel))
                  (next-to-tail (cml-async-ch/make-async-channel))
                  (uid (tail-element next-to-tail next-from-tail
                                     from-tail to-tail
                                     link-in link-out)))
             (cml-async-ch/send-async reply-ch uid)
             (cons next-from-tail next-to-tail)))

         (define (on-command msg)
           (if (forwardable? msg)
               (relay to-tail msg)
               (grow msg)))

         (define (on-tail msg)
           (relay cmd-out msg))

         (let ((next (cml-rv/select
                      (cml-rv/wrap (cml-async-ch/receive-async-rv cmd-in)
                                   on-command)
                      (cml-rv/wrap (cml-async-ch/receive-async-rv from-tail)
                                   on-tail))))
           (serve (car next) (cdr next)))))
     name)
    (cml-sync-ch/receive uid-ch)))

;;; collect&reply channels -------------------------------------------------------

;;; User-visible handle: the command-in / command-out channels of a head
;;; element.
(define-record-type :collect&reply-channel
  (collect&reply/really-make-channel cmd-in cmd-out)
  is-collect&reply-channel?
  (cmd-in collect&reply-channel-cmd-in)
  (cmd-out collect&reply-channel-cmd-out))

;;; Create a collect&reply channel backed by a fresh head element named
;;; `collect&reply' that forwards messages unmodified.
(define (collect&reply/make-channel)
  (let ((cmd-in (cml-async-ch/make-async-channel))
        (cmd-out (cml-async-ch/make-async-channel)))
    (head-element no-modify
                  cmd-in
                  cmd-out
                  (cml-async-ch/make-async-channel) ; head-in
                  (cml-async-ch/make-async-channel) ; head-out
                  'collect&reply)
    (collect&reply/really-make-channel cmd-in cmd-out)))

;;; Linking ----------------------------------------------------------------------

;;; Connect the send&collect channel FROM with the collect&reply channel TO.
;;;
;;; Two fresh asynchronous channels are created, one per direction.  Each side
;;; receives a make-link command whose data lists (its inbound channel, its
;;; outbound channel, a reply channel).  The result is a pair
;;; (uid-on-FROM-side . uid-on-TO-side) built from the two replies, whichever
;;; arrives first.
;;;
;;; Signals "make-link: wrong type" unless FROM is a send&collect channel and
;;; TO is a collect&reply channel.
(define (make-link from to)
  (let* ((downstream (cml-async-ch/make-async-channel)) ; from --> to
         (upstream (cml-async-ch/make-async-channel))   ; from <-- to
         (to-uid-ch (cml-async-ch/make-async-channel))
         (from-uid-ch (cml-async-ch/make-async-channel))
         (request-for-to
          (make-cmd-msg (collect-cmd make-link)
                        (list downstream upstream to-uid-ch)))
         (request-for-from
          (make-cmd-msg (collect-cmd make-link)
                        (list upstream downstream from-uid-ch)))
         (await
          (lambda (ch)
            (cml-rv/sync (cml-async-ch/receive-async-rv ch)))))
    (if (not (and (is-send&collect-channel? from)
                  (is-collect&reply-channel? to)))
        (error "make-link: wrong type" from to)
        (begin
          (collect&reply/send to request-for-to)
          (send&collect/send from request-for-from)
          ;; Take whichever uid shows up first, then wait for the other one.
          (cml-rv/select
           (cml-rv/wrap (cml-async-ch/receive-async-rv from-uid-ch)
                        (lambda (from-uid)
                          (cons from-uid (await to-uid-ch))))
           (cml-rv/wrap (cml-async-ch/receive-async-rv to-uid-ch)
                        (lambda (to-uid)
                          (cons (await from-uid-ch) to-uid))))))))

;;; send&collect channels --------------------------------------------------------

;;; User-visible handle: the command-in / command-out channels of a head
;;; element.
(define-record-type :send&collect-channel
  (send&collect/really-make-channel cmd-in cmd-out)
  is-send&collect-channel?
  (cmd-in send&collect-channel-cmd-in)
  (cmd-out send&collect-channel-cmd-out))

;;; Create a send&collect channel backed by a fresh head element named
;;; `send&collect' that forwards messages unmodified.
(define (send&collect/make-channel)
  (let ((cmd-in (cml-async-ch/make-async-channel))
        (cmd-out (cml-async-ch/make-async-channel)))
    (head-element no-modify
                  cmd-in
                  cmd-out
                  (cml-async-ch/make-async-channel) ; head-in
                  (cml-async-ch/make-async-channel) ; head-out
                  'send&collect)
    (send&collect/really-make-channel cmd-in cmd-out)))

;;; Channel operations -----------------------------------------------------------

;;; Block until a message is available on CH's command-out channel and
;;; return it.
(define (collect&reply/receive ch)
  (cml-rv/sync (collect&reply/receive-rv ch)))

;;; Rendezvous for the next message on CH's command-out channel.
(define (collect&reply/receive-rv ch)
  (let ((outbound (collect&reply-channel-cmd-out ch)))
    (cml-async-ch/receive-async-rv outbound)))

;;; Queue MSG on CH's command-in channel.
(define (collect&reply/send ch msg)
  (let ((inbound (collect&reply-channel-cmd-in ch)))
    (cml-async-ch/send-async inbound msg)))

;;; Queue MSG on CH's command-in channel.
(define (send&collect/send ch msg)
  (let ((inbound (send&collect-channel-cmd-in ch)))
    (cml-async-ch/send-async inbound msg)))

;;; Block until a message is available on CH's command-out channel and
;;; return it.
(define (send&collect/receive ch)
  (cml-rv/sync (send&collect/receive-rv ch)))

;;; Rendezvous for the next message on CH's command-out channel.
(define (send&collect/receive-rv ch)
  (let ((outbound (send&collect-channel-cmd-out ch)))
    (cml-async-ch/receive-async-rv outbound)))