;;; Scheme source -- 277 lines, 10 KiB.
|
;;; Record bundling the four channels of a collect&reply endpoint.
;;; collect&reply/make-channel fills every field with a fresh
;;; cml-sync-ch channel.
(define-record-type :collect&reply-channel
  (collect&reply/really-make-channel cmd-in cmd-out from-server to-server)
  is-collect&reply-channel?
  ;; Channel the server thread reads; collect&reply/send and make-link
  ;; write to it.
  (cmd-in      collect&reply-channel-cmd-in)
  ;; Channel the server thread writes; collect&reply/receive reads it.
  (cmd-out     collect&reply-channel-cmd-out)
  ;; Channel pair initially shared by the server thread and the sink tee.
  (from-server collect&reply-channel-from-server)
  (to-server   collect&reply-channel-to-server))
|
||
|
|
||
|
;;; Record bundling the four channels of a send&collect endpoint.
;;; send&collect/make-channel fills every field with a fresh
;;; cml-sync-ch channel.
(define-record-type :send&collect-channel
  (send&collect/really-make-channel cmd-in cmd-out from-server to-server)
  is-send&collect-channel?
  ;; Channel the server thread reads; send&collect/send and make-link
  ;; write to it.
  (cmd-in      send&collect-channel-cmd-in)
  ;; Channel the server thread writes; send&collect/receive reads it.
  (cmd-out     send&collect-channel-cmd-out)
  ;; Channel pair initially shared by the server thread and the sink tee.
  (from-server send&collect-channel-from-server)
  (to-server   send&collect-channel-to-server))
|
||
|
|
||
|
;;; Control commands understood by the collect&reply server thread.
;;; The only member is make-link; make-link (the procedure) sends
;;; (collect&reply-cmd make-link) and the server recognises it through
;;; collect&reply-cmd-name.
(define-enumerated-type collect&reply-cmd :collect&reply-cmd
  is-collect&reply-cmd?        ; predicate
  the-collect&reply-cmds       ; collection of all members
  collect&reply-cmd-name       ; member -> name
  collect&reply-cmd-index      ; member -> index
  (make-link))
|
||
|
|
||
|
;;; Control commands understood by the send&collect server thread.
;;; The only member is make-link; make-link (the procedure) sends
;;; (send&collect-cmd make-link) and the server recognises it through
;;; send&collect-cmd-name.
(define-enumerated-type send&collect-cmd :send&collect-cmd
  is-send&collect-cmd?         ; predicate
  the-send&collect-cmds        ; collection of all members
  send&collect-cmd-name        ; member -> name
  send&collect-cmd-index       ; member -> index
  (make-link))
|
||
|
|
||
|
;;; A message paired with a tag.  The tee threads build these with
;;; their own thread uid as the tag and compare the tag with eq? to
;;; decide whether a message is addressed to them.
(define-record-type :tagged-msg
  (make-tagged-msg tag stripped)
  is-tagged-msg?
  ;; Value identifying the tee thread that tagged the message.
  (tag      tagged-msg-tag)
  ;; The message as it was before tagging.
  (stripped tagged-msg-stripped))
|
||
|
|
||
|
;;; Spawn a tee thread for the collect&reply side and return its
;;; thread uid.
;;;
;;; The thread loops forever, selecting over three rendezvous:
;;;   from-sink   -> the tagged message is forwarded unchanged to TO-SERVER;
;;;   from-server -> if the message's tag is this thread's uid, the
;;;                  stripped message is sent on OUT; otherwise the
;;;                  tagged message is forwarded to TO-SINK when TO-SINK
;;;                  is not #f, and dropped when it is #f;
;;;   in          -> the message is tagged with this thread's uid and
;;;                  sent to TO-SERVER.
;;;
;;; The symbol collect&reply/tee2 is passed to SPAWN as its second
;;; argument, matching send&collect/tee2 and the server procedures.
;;; (Previously a misplaced parenthesis left the symbol inside the
;;; thunk, after the endless loop, where it was never reached.)
(define (collect&reply/tee2 from-server to-sink from-sink to-server in out)
  (let ((tmp-ch (cml-sync-ch/make-channel)))
    (spawn
     (lambda ()
       (let ((tuid (thread-uid (current-thread))))
         ;; Report our uid to the creator, which is blocked on tmp-ch.
         (cml-sync-ch/send tmp-ch tuid)
         (let drink-tee ((collect-rv (cml-sync-ch/receive-rv from-sink))
                         (reply-rv (cml-sync-ch/receive-rv from-server))
                         (request-rv (cml-sync-ch/receive-rv in)))
           (cml-rv/select
            ;; Traffic from the sink side: pass it on untouched.
            (cml-rv/wrap collect-rv
                         (lambda (tmsg)
                           (cml-sync-ch/send to-server tmsg)))
            ;; Traffic from the server side: keep it if it carries our
            ;; tag, otherwise hand it further towards the sink.
            (cml-rv/wrap reply-rv
                         (lambda (tmsg)
                           (let ((msg (tagged-msg-stripped tmsg))
                                 (tag (tagged-msg-tag tmsg)))
                             (if (eq? tag tuid)
                                 (cml-sync-ch/send out msg)
                                 (if to-sink
                                     (cml-sync-ch/send to-sink tmsg))))))
            ;; Traffic from our own link: tag it so the answer can be
            ;; matched against tuid later.
            (cml-rv/wrap request-rv
                         (lambda (msg)
                           (let ((tmsg (make-tagged-msg tuid msg)))
                             (cml-sync-ch/send to-server tmsg)))))
           ;; Build fresh rendezvous for the next round.
           (drink-tee (cml-sync-ch/receive-rv from-sink)
                      (cml-sync-ch/receive-rv from-server)
                      (cml-sync-ch/receive-rv in)))))
     'collect&reply/tee2)
    ;; Block until the new thread has told us its uid; that is the result.
    (cml-sync-ch/receive tmp-ch)))
|
||
|
|
||
|
;;; Spawn a tee thread (named send&collect/tee2) for the send&collect
;;; side and return its thread uid.
;;;
;;; The thread repeatedly selects over FROM-SINK, FROM-SERVER and IN:
;;; sink traffic is forwarded to TO-SERVER as is; server traffic is
;;; delivered stripped on OUT when its tag is this thread's uid, and is
;;; otherwise forwarded to TO-SINK (or dropped when TO-SINK is #f);
;;; traffic from IN is tagged with this thread's uid and sent to
;;; TO-SERVER.
(define (send&collect/tee2 from-server to-sink from-sink to-server in out)
  (let ((uid-ch (cml-sync-ch/make-channel)))

    (define (tee-thread)
      (let ((self-uid (thread-uid (current-thread))))

        ;; Sink-side message: relay untouched.
        (define (relay-upstream tmsg)
          (cml-sync-ch/send to-server tmsg))

        ;; Server-side message: ours goes to OUT, anything else moves on
        ;; towards the sink if there is one.
        (define (dispatch tmsg)
          (let ((payload (tagged-msg-stripped tmsg))
                (owner (tagged-msg-tag tmsg)))
            (cond ((eq? owner self-uid)
                   (cml-sync-ch/send out payload))
                  (to-sink
                   (cml-sync-ch/send to-sink tmsg)))))

        ;; Message from our own link: stamp it with our uid first.
        (define (tag-and-send msg)
          (cml-sync-ch/send to-server (make-tagged-msg self-uid msg)))

        ;; Let the creator know who we are before starting to serve.
        (cml-sync-ch/send uid-ch self-uid)

        (let pump ((sink-rv (cml-sync-ch/receive-rv from-sink))
                   (server-rv (cml-sync-ch/receive-rv from-server))
                   (link-rv (cml-sync-ch/receive-rv in)))
          (cml-rv/select
           (cml-rv/wrap sink-rv relay-upstream)
           (cml-rv/wrap server-rv dispatch)
           (cml-rv/wrap link-rv tag-and-send))
          (pump (cml-sync-ch/receive-rv from-sink)
                (cml-sync-ch/receive-rv from-server)
                (cml-sync-ch/receive-rv in)))))

    (spawn tee-thread 'send&collect/tee2)
    ;; The uid reported by the new thread is the return value.
    (cml-sync-ch/receive uid-ch)))
|
||
|
|
||
|
;;; Spawn the collect&reply server thread (named collect&reply/server)
;;; and return whatever SPAWN returns.
;;;
;;; The thread loops forever, selecting between CMD-IN and TO-SERVER:
;;;   * a make-link command on CMD-IN is followed by three more
;;;     receives on CMD-IN (link-in, link-out, acknowledgement channel);
;;;     a new tee is spawned in front of the current from-server /
;;;     to-server pair, the server switches to the new pair, and the
;;;     tee's uid is sent on the acknowledgement channel;
;;;   * a tagged message on CMD-IN is sent to the current from-server;
;;;   * anything else on CMD-IN raises an error;
;;;   * whatever arrives on the current to-server is sent to CMD-OUT.
(define (collect&reply/server cmd-in cmd-out from-server to-server)

  ;; Handle the remainder of a make-link request.  Rebinds the
  ;; FROM-SERVER / TO-SERVER parameters so later iterations use the
  ;; channels that lead to the newly created tee.
  (define (install-link!)
    (let* ((link-in (cml-sync-ch/receive cmd-in))
           (link-out (cml-sync-ch/receive cmd-in))
           (fresh-from-server (cml-sync-ch/make-channel))
           (fresh-to-server (cml-sync-ch/make-channel))
           (tee-uid (collect&reply/tee2 fresh-from-server
                                        from-server
                                        to-server
                                        fresh-to-server
                                        link-in
                                        link-out))
           (ack-ch (cml-sync-ch/receive cmd-in)))
      (set! from-server fresh-from-server)
      (set! to-server fresh-to-server)
      (cml-sync-ch/send ack-ch tee-uid)))

  (define (make-link-command? cmd)
    (and (is-collect&reply-cmd? cmd)
         (eq? (collect&reply-cmd-name cmd) 'make-link)))

  (define (handle-command cmd)
    (cond ((make-link-command? cmd)
           (install-link!))
          ((is-tagged-msg? cmd)
           (cml-sync-ch/send from-server cmd))
          (else
           (error "collect&reply: unsupported message type."))))

  (define (handle-request request)
    (cml-sync-ch/send cmd-out request))

  (define (serve)
    (let next ((command-rv (cml-sync-ch/receive-rv cmd-in))
               (request-rv (cml-sync-ch/receive-rv to-server)))
      (cml-rv/select
       (cml-rv/wrap command-rv handle-command)
       (cml-rv/wrap request-rv handle-request))
      ;; TO-SERVER may have been rebound by install-link!, so the
      ;; rendezvous are rebuilt on every round.
      (next (cml-sync-ch/receive-rv cmd-in)
            (cml-sync-ch/receive-rv to-server))))

  (spawn serve 'collect&reply/server))
|
||
|
|
||
|
;;; Spawn the send&collect server thread (named send&collect/server)
;;; and return whatever SPAWN returns.
;;;
;;; The thread loops forever, selecting between CMD-IN and TO-SERVER:
;;;   * a make-link command on CMD-IN is followed by three more
;;;     receives on CMD-IN (link-in, link-out, acknowledgement channel);
;;;     a new tee is spawned in front of the current from-server /
;;;     to-server pair, the server switches to the new pair, and the
;;;     tee's uid is sent on the acknowledgement channel;
;;;   * a tagged message on CMD-IN is sent to the current from-server;
;;;   * anything else on CMD-IN raises an error;
;;;   * whatever arrives on the current to-server is sent to CMD-OUT.
(define (send&collect/server cmd-in cmd-out from-server to-server)

  ;; Handle the remainder of a make-link request.  Rebinds the
  ;; FROM-SERVER / TO-SERVER parameters so later iterations use the
  ;; channels that lead to the newly created tee.
  (define (install-link!)
    (let* ((link-in (cml-sync-ch/receive cmd-in))
           (link-out (cml-sync-ch/receive cmd-in))
           (fresh-from-server (cml-sync-ch/make-channel))
           (fresh-to-server (cml-sync-ch/make-channel))
           (tee-uid (send&collect/tee2 fresh-from-server
                                       from-server
                                       to-server
                                       fresh-to-server
                                       link-in
                                       link-out))
           (ack-ch (cml-sync-ch/receive cmd-in)))
      (set! from-server fresh-from-server)
      (set! to-server fresh-to-server)
      (cml-sync-ch/send ack-ch tee-uid)))

  (define (make-link-command? cmd)
    (and (is-send&collect-cmd? cmd)
         (eq? (send&collect-cmd-name cmd) 'make-link)))

  (define (handle-command cmd)
    (cond ((make-link-command? cmd)
           (install-link!))
          ((is-tagged-msg? cmd)
           (cml-sync-ch/send from-server cmd))
          (else
           (error "send&collect: unsupported message type."))))

  (define (handle-reply reply)
    (cml-sync-ch/send cmd-out reply))

  (define (serve)
    (let next ((command-rv (cml-sync-ch/receive-rv cmd-in))
               (answer-rv (cml-sync-ch/receive-rv to-server)))
      (cml-rv/select
       (cml-rv/wrap command-rv handle-command)
       (cml-rv/wrap answer-rv handle-reply))
      ;; TO-SERVER may have been rebound by install-link!, so the
      ;; rendezvous are rebuilt on every round.
      (next (cml-sync-ch/receive-rv cmd-in)
            (cml-sync-ch/receive-rv to-server))))

  (spawn serve 'send&collect/server))
|
||
|
|
||
|
;;; Start the terminal tee of a collect&reply chain and return its
;;; thread uid.  The tee gets #f as its to-sink, and three fresh
;;; channels (from-sink, in, out) that are created here and handed to
;;; nothing but the tee.
(define (collect&reply/make-sink from-server to-server)
  (let* ((private-from-sink (cml-sync-ch/make-channel))
         (private-in (cml-sync-ch/make-channel))
         (private-out (cml-sync-ch/make-channel)))
    (collect&reply/tee2 from-server
                        #f
                        private-from-sink
                        to-server
                        private-in
                        private-out)))
|
||
|
|
||
|
;;; Build a complete collect&reply endpoint: create its four channels,
;;; start the sink tee and the server thread on them, and return the
;;; :collect&reply-channel record that holds the channels.
(define (collect&reply/make-channel)
  (let* ((commands (cml-sync-ch/make-channel))
         (results (cml-sync-ch/make-channel))
         (downstream (cml-sync-ch/make-channel))
         (upstream (cml-sync-ch/make-channel)))
    ;; The sink must exist before the server starts forwarding to it.
    (collect&reply/make-sink downstream upstream)
    (collect&reply/server commands results downstream upstream)
    (collect&reply/really-make-channel commands results downstream upstream)))
|
||
|
|
||
|
;;; Start the terminal tee of a send&collect chain and return its
;;; thread uid.  The tee gets #f as its to-sink, and three fresh
;;; channels (from-sink, in, out) that are created here and handed to
;;; nothing but the tee.
(define (send&collect/make-sink from-server to-server)
  (let* ((private-from-sink (cml-sync-ch/make-channel))
         (private-in (cml-sync-ch/make-channel))
         (private-out (cml-sync-ch/make-channel)))
    (send&collect/tee2 from-server
                       #f
                       private-from-sink
                       to-server
                       private-in
                       private-out)))
|
||
|
|
||
|
;;; Build a complete send&collect endpoint: create its four channels,
;;; start the sink tee and the server thread on them, and return the
;;; :send&collect-channel record that holds the channels.
(define (send&collect/make-channel)
  (let* ((commands (cml-sync-ch/make-channel))
         (results (cml-sync-ch/make-channel))
         (downstream (cml-sync-ch/make-channel))
         (upstream (cml-sync-ch/make-channel)))
    ;; The sink must exist before the server starts forwarding to it.
    (send&collect/make-sink downstream upstream)
    (send&collect/server commands results downstream upstream)
    (send&collect/really-make-channel commands results downstream upstream)))
|
||
|
|
||
|
;;; Connect the send&collect channel FROM to the collect&reply channel
;;; TO.  Two fresh channels (one per direction) are created; each
;;; endpoint's server is asked, through its cmd-in channel, to splice in
;;; a tee that uses them.  TO's server is contacted first, then FROM's,
;;; with the two link channels swapped so that one side's link-out is
;;; the other side's link-in.
;;;
;;; Returns the value received as acknowledgement from FROM's server
;;; (the uid of the tee it created).  Signals an error when FROM is not
;;; a send&collect channel or TO is not a collect&reply channel.
(define (make-link from to)
  (let ((forward-ch (cml-sync-ch/make-channel))
        (backward-ch (cml-sync-ch/make-channel))
        (ack-ch (cml-sync-ch/make-channel)))

    ;; Run the four-message make-link handshake on SERVER-IN and wait
    ;; for the server's answer on the acknowledgement channel.
    (define (request-link server-in command link-in link-out)
      (cml-sync-ch/send server-in command)
      (cml-sync-ch/send server-in link-in)
      (cml-sync-ch/send server-in link-out)
      (cml-sync-ch/send server-in ack-ch)
      (cml-sync-ch/receive ack-ch))

    (if (not (and (is-send&collect-channel? from)
                  (is-collect&reply-channel? to)))
        (error "make-link: from/to has/have wrong type.")
        (begin
          (request-link (collect&reply-channel-cmd-in to)
                        (collect&reply-cmd make-link)
                        forward-ch
                        backward-ch)
          (request-link (send&collect-channel-cmd-in from)
                        (send&collect-cmd make-link)
                        backward-ch
                        forward-ch)))))
|
||
|
|
||
|
;;; Blocking receive of the next message on CH's cmd-out channel.
(define (collect&reply/receive ch)
  (let ((outlet (collect&reply-channel-cmd-out ch)))
    (cml-sync-ch/receive outlet)))
|
||
|
|
||
|
;;; Rendezvous for receiving on CH's cmd-out channel.
(define (collect&reply/receive-rv ch)
  (let ((outlet (collect&reply-channel-cmd-out ch)))
    (cml-sync-ch/receive-rv outlet)))
|
||
|
|
||
|
;;; Blocking send of MSG on CH's cmd-in channel.
(define (collect&reply/send ch msg)
  (let ((inlet (collect&reply-channel-cmd-in ch)))
    (cml-sync-ch/send inlet msg)))
|
||
|
|
||
|
;;; Rendezvous for sending MSG on CH's cmd-in channel.
(define (collect&reply/send-rv ch msg)
  (let ((inlet (collect&reply-channel-cmd-in ch)))
    (cml-sync-ch/send-rv inlet msg)))
|
||
|
|
||
|
;;; Blocking send of MSG on CH's cmd-in channel.
(define (send&collect/send ch msg)
  (let ((inlet (send&collect-channel-cmd-in ch)))
    (cml-sync-ch/send inlet msg)))
|
||
|
|
||
|
;;; Rendezvous for sending MSG on CH's cmd-in channel.
(define (send&collect/send-rv ch msg)
  (let ((inlet (send&collect-channel-cmd-in ch)))
    (cml-sync-ch/send-rv inlet msg)))
|
||
|
|
||
|
;;; Blocking receive of the next message on CH's cmd-out channel.
(define (send&collect/receive ch)
  (let ((outlet (send&collect-channel-cmd-out ch)))
    (cml-sync-ch/receive outlet)))
|
||
|
|
||
|
;;; Rendezvous for receiving on CH's cmd-out channel.
(define (send&collect/receive-rv ch)
  (let ((outlet (send&collect-channel-cmd-out ch)))
    (cml-sync-ch/receive-rv outlet)))
|