;;; scsh-make/collect-channels.scm
;;;
;;; 212 lines
;;; 7.5 KiB
;;; Scheme

;;; A message paired with a tag.
;;;   tag      -- in this file, the id of the tail-element that inserted
;;;               the message (see tail-element)
;;;   stripped -- the original, untagged message
(define-record-type :tagged-msg
(make-tagged-msg tag stripped)
is-tagged-msg?
(tag tagged-msg-tag)
(stripped tagged-msg-stripped))
;;; A control message addressed to a head-element.
;;;   cmd  -- the command; head-element only acts on (collect-cmd make-link)
;;;   data -- command payload; for make-link, a list of three channels
;;;           (link-in link-out reply-channel-for-the-new-id)
(define-record-type :cmd-msg
(make-cmd-msg cmd data)
is-cmd-msg?
(cmd cmd-msg-cmd)
(data cmd-msg-data))
;;; Write a one-line trace of the form ">>> TUID : EVENT [NAME]" to the
;;; current error port.
(define (print-info tuid event name)
  (let ((port (current-error-port)))
    (format port ">>> ~a : ~a [~a]~%" tuid event name)))
;;; Identity transformation: hands MSG back untouched.
(define no-modify
  (lambda (msg) msg))
;;; Predicate that accepts every message.
(define always
  (lambda (msg) #t))
;;; Predicate that rejects every message.
(define never
  (lambda (msg) #f))
;;; (define (cond-sink pred modify in out name)
;;; (let ((tmp-ch (cml-sync-ch/make-channel)))
;;; (spawn
;;; (lambda ()
;;; (cml-sync-ch/send tmp-ch (thread-uid (current-thread)))
;;; (let cond-sink-lp ((msg (cml-sync-ch/receive in)))
;;; (if (pred msg)
;;; (cml-sync-ch/send out (modify msg)))
;;; (cond-sink-lp (cml-sync-ch/receive in))))
;;; name)
;;; (cml-sync-ch/receive tmp-ch)))
;;;
;;; (define (sink in out) (cond-sink never no-modify in out 'sink))
;;;
;;; (define (cond-tee pred modify in out alt name)
;;; (let ((tmp-ch (cml-sync-ch/make-channel)))
;;; (spawn
;;; (lambda ()
;;; (cml-sync-ch/send tmp-ch (thread-uid (current-thread)))
;;; (let cond-tee-lp ((msg (cml-sync-ch/receive in)))
;;; (if (pred msg)
;;; (cml-sync-ch/send out (modify msg))
;;; (cml-sync-ch/send alt msg))
;;; (cond-tee-lp (cml-sync-ch/receive in))))
;;; name)
;;; (cml-sync-ch/receive tmp-ch)))
;;;
;;; (define (tee in out) (cond-tee always no-modify in out #f 'tee))
;;;
;;; (define (tail-element from-head to-head from-sink to-sink in out)
;;; (let* ((id (tee from-sink to-head))
;;; (tag-msg (lambda (msg) (make-tagged-msg id msg)))
;;; (pred (lambda (tmsg) (eq? (tagged-msg-tag tmsg) id))))
;;; (cond-tee pred tagged-msg-stripped from-head out to-sink
;;; (string->symbol (string-append "tail-switch " (number->string id))))
;;; (cond-tee always tag-msg in to-head #f
;;; (string->symbol (string-append "tail-insert " (number->string id))))
;;; id))
;;; Spawn a tail-element thread and return its id (the uid of that thread).
;;;
;;; The thread loops forever, selecting among three async channels:
;;;   IN         -- the message is tagged with this element's id and sent
;;;                 to TO-HEAD
;;;   FROM-SINK  -- the message is passed on to TO-HEAD unchanged
;;;   FROM-HEAD  -- if the message carries this element's id, its stripped
;;;                 payload is sent to OUT; otherwise the message is passed
;;;                 on to TO-SINK unchanged
;;;
;;; The id is handed back to the caller over a temporary synchronous
;;; channel, so this procedure returns only after the thread has started.
(define (tail-element from-head to-head from-sink to-sink in out)
  (let ((id-res-ch (cml-sync-ch/make-channel)))
    (spawn
     (lambda ()
       (let* ((id (thread-uid (current-thread)))
              (tag-msg (lambda (msg) (make-tagged-msg id msg)))
              ;; Tags are thread uids.  Assuming these are numbers (TODO
              ;; confirm), eq? is unspecified on them, so compare with
              ;; eqv?, which is also correct for non-numeric tags.
              (pred (lambda (tmsg) (eqv? (tagged-msg-tag tmsg) id))))
         ;; Report the id before entering the service loop.
         (cml-sync-ch/send id-res-ch id)
         (let ((insert-msg (lambda (msg)
                             (cml-async-ch/send-async to-head (tag-msg msg))))
               (insert-rv (cml-async-ch/receive-async-rv in))
               (forward-msg (lambda (msg)
                              (cml-async-ch/send-async to-head msg)))
               (forward-rv (cml-async-ch/receive-async-rv from-sink))
               (deliver-msg (lambda (msg)
                              (if (pred msg)
                                  (let ((stripped-msg (tagged-msg-stripped msg)))
                                    (cml-async-ch/send-async out stripped-msg))
                                  (cml-async-ch/send-async to-sink msg))))
               (deliver-rv (cml-async-ch/receive-async-rv from-head)))
           ;; Serve whichever channel has a message, then repeat.
           (let receive+send-lp ()
             (cml-rv/select
              (cml-rv/wrap insert-rv insert-msg)
              (cml-rv/wrap forward-rv forward-msg)
              (cml-rv/wrap deliver-rv deliver-msg))
             (receive+send-lp))))))
    (cml-sync-ch/receive id-res-ch)))
;;; Enumeration of the commands that can be carried in a cmd-msg.
;;; Currently there is exactly one: make-link, written (collect-cmd make-link).
(define-enumerated-type collect-cmd :collect-cmd
is-collect-cmd?
the-collect-cmds
collect-cmd-name
collect-cmd-index
(make-link))
;;; Spawn a head-element thread called NAME and return its thread uid.
;;;
;;; The thread repeatedly selects between CMD-IN and the channel coming
;;; from the current first tail-element (initially HEAD-IN):
;;;   * a make-link command on CMD-IN splices a fresh tail-element in front
;;;     of the current chain and reports the new element's id on the reply
;;;     channel carried in the command;
;;;   * a tagged message on CMD-IN is passed through MODIFY and sent
;;;     towards the tails;
;;;   * anything else on CMD-IN is an error;
;;;   * a message coming from the tails is passed through MODIFY and sent
;;;     to CMD-OUT.
(define (head-element modify cmd-in cmd-out head-in head-out name)
  ;; True iff MSG is a cmd-msg carrying the make-link command.
  (define (link-request? msg)
    (and (is-cmd-msg? msg)
         (is-collect-cmd? (cmd-msg-cmd msg))
         (eq? (cmd-msg-cmd msg) (collect-cmd make-link))))

  ;; #f for link requests, #t for tagged messages, error otherwise.
  (define (forwardable? msg)
    (cond ((link-request? msg) #f)
          ((is-tagged-msg? msg) #t)
          (else (error "head-element: wrong type" msg))))

  ;; One service step; FROM-TAIL / TO-TAIL are the channels connecting
  ;; the head to the current first tail-element.
  (define (serve from-tail to-tail)
    ;; Send the modified MSG on CH; the tail channels stay as they are.
    (define (relay ch msg)
      (cml-async-ch/send-async ch (modify msg))
      (cons from-tail to-tail))

    ;; Insert a new tail-element between the head and the current chain;
    ;; the result is the pair of channels leading to the new element.
    (define (splice-tail msg)
      (let* ((link-chs (cmd-msg-data msg))
             (fresh-from-tail (cml-async-ch/make-async-channel))
             (fresh-to-tail (cml-async-ch/make-async-channel))
             (link-in (list-ref link-chs 0))
             (link-out (list-ref link-chs 1))
             (reply-ch (list-ref link-chs 2))
             (tail-id (tail-element fresh-to-tail fresh-from-tail
                                    from-tail to-tail
                                    link-in link-out)))
        (cml-async-ch/send-async reply-ch tail-id)
        (cons fresh-from-tail fresh-to-tail)))

    (let ((next
           (cml-rv/select
            (cml-rv/wrap (cml-async-ch/receive-async-rv cmd-in)
                         (lambda (msg)
                           (if (forwardable? msg)
                               (relay to-tail msg)
                               (splice-tail msg))))
            (cml-rv/wrap (cml-async-ch/receive-async-rv from-tail)
                         (lambda (msg) (relay cmd-out msg))))))
      (serve (car next) (cdr next))))

  (let ((id-res-ch (cml-sync-ch/make-channel)))
    (spawn
     (lambda ()
       ;; Report our uid to the creator, then serve forever.
       (cml-sync-ch/send id-res-ch (thread-uid (current-thread)))
       (serve head-in head-out))
     name)
    (cml-sync-ch/receive id-res-ch)))
;;; A collect&reply channel: the pair of async channels that connect a
;;; user to the head-element thread started by collect&reply/make-channel.
;;;   cmd-in  -- channel collect&reply/send writes to
;;;   cmd-out -- channel collect&reply/receive reads from
(define-record-type :collect&reply-channel
(collect&reply/really-make-channel cmd-in cmd-out)
is-collect&reply-channel?
(cmd-in collect&reply-channel-cmd-in)
(cmd-out collect&reply-channel-cmd-out))
;;; Create a collect&reply channel: allocate four async channels, start a
;;; head-element thread named collect&reply on them (messages pass through
;;; unmodified), and wrap the two user-facing channels in a record.
(define (collect&reply/make-channel)
  (let* ((fresh cml-async-ch/make-async-channel)
         (user->head (fresh))
         (head->user (fresh))
         (tail->head (fresh))
         (head->tail (fresh)))
    (head-element no-modify
                  user->head head->user
                  tail->head head->tail
                  'collect&reply)
    (collect&reply/really-make-channel user->head head->user)))
;;; Link the send&collect channel FROM to the collect&reply channel TO.
;;;
;;; Two fresh async channels (one per direction) are handed to the head
;;; elements of both sides via make-link commands.  Each head answers with
;;; the id of the tail-element it created.  The result is the pair
;;; (id-on-FROM-side . id-on-TO-side).
;;;
;;; Signals an error if FROM / TO are not of the expected channel types.
(define (make-link from to)
  (let* ((downstream (cml-async-ch/make-async-channel)) ; FROM --> TO
         (upstream (cml-async-ch/make-async-channel))   ; FROM <-- TO
         (to-id-ch (cml-async-ch/make-async-channel))
         (from-id-ch (cml-async-ch/make-async-channel))
         (request-for-to
          (make-cmd-msg (collect-cmd make-link)
                        (list downstream upstream to-id-ch)))
         (request-for-from
          (make-cmd-msg (collect-cmd make-link)
                        (list upstream downstream from-id-ch)))
         ;; Block until an id shows up on CH.
         (await (lambda (ch)
                  (cml-rv/sync (cml-async-ch/receive-async-rv ch)))))
    (if (not (and (is-send&collect-channel? from)
                  (is-collect&reply-channel? to)))
        (error "make-link: wrong type" from to)
        (begin
          (collect&reply/send to request-for-to)
          (send&collect/send from request-for-from)
          ;; Take whichever id arrives first, then wait for the other one.
          (cml-rv/select
           (cml-rv/wrap (cml-async-ch/receive-async-rv from-id-ch)
                        (lambda (id-from)
                          (cons id-from (await to-id-ch))))
           (cml-rv/wrap (cml-async-ch/receive-async-rv to-id-ch)
                        (lambda (id-to)
                          (cons (await from-id-ch) id-to))))))))
;;; A send&collect channel: the pair of async channels that connect a
;;; user to the head-element thread started by send&collect/make-channel.
;;;   cmd-in  -- channel send&collect/send writes to
;;;   cmd-out -- channel send&collect/receive reads from
(define-record-type :send&collect-channel
(send&collect/really-make-channel cmd-in cmd-out)
is-send&collect-channel?
(cmd-in send&collect-channel-cmd-in)
(cmd-out send&collect-channel-cmd-out))
;;; Create a send&collect channel: allocate four async channels, start a
;;; head-element thread named send&collect on them (messages pass through
;;; unmodified), and wrap the two user-facing channels in a record.
(define (send&collect/make-channel)
  (let* ((fresh cml-async-ch/make-async-channel)
         (user->head (fresh))
         (head->user (fresh))
         (tail->head (fresh))
         (head->tail (fresh)))
    (head-element no-modify
                  user->head head->user
                  tail->head head->tail
                  'send&collect)
    (send&collect/really-make-channel user->head head->user)))
;;; Block until a message is available on the cmd-out side of the
;;; collect&reply channel CH and return it.
(define (collect&reply/receive ch)
  (let* ((outgoing (collect&reply-channel-cmd-out ch))
         (pending (cml-async-ch/receive-async-rv outgoing)))
    (cml-rv/sync pending)))
;;; Return the receive rendezvous for the cmd-out side of the
;;; collect&reply channel CH, without synchronizing on it.
(define (collect&reply/receive-rv ch)
  (let ((outgoing (collect&reply-channel-cmd-out ch)))
    (cml-async-ch/receive-async-rv outgoing)))
;;; Asynchronously send MSG on the cmd-in side of the collect&reply
;;; channel CH.
(define (collect&reply/send ch msg)
  (let ((incoming (collect&reply-channel-cmd-in ch)))
    (cml-async-ch/send-async incoming msg)))
;;; Asynchronously send MSG on the cmd-in side of the send&collect
;;; channel CH.
(define (send&collect/send ch msg)
  (let ((incoming (send&collect-channel-cmd-in ch)))
    (cml-async-ch/send-async incoming msg)))
;;; Block until a message is available on the cmd-out side of the
;;; send&collect channel CH and return it.
(define (send&collect/receive ch)
  (let* ((outgoing (send&collect-channel-cmd-out ch))
         (pending (cml-async-ch/receive-async-rv outgoing)))
    (cml-rv/sync pending)))
;;; Return the receive rendezvous for the cmd-out side of the
;;; send&collect channel CH, without synchronizing on it.
(define (send&collect/receive-rv ch)
  (let ((outgoing (send&collect-channel-cmd-out ch)))
    (cml-async-ch/receive-async-rv outgoing)))