scsh-make/mcast-channels.scm

71 lines
1.9 KiB
Scheme
Raw Permalink Normal View History

(define-record-type :mcast-channel
(really-make-mcast-channel msg-in fan-out reply-ch)
mcast-channel?
(msg-in mcast-channel-msg-in)
(fan-out mcast-channel-fan-out)
(reply-ch mcast-channel-reply-ch))
(define-record-type :mcast-port
(really-make-mcast-port channel)
mcast-port?
(channel channel->mcast-port))
(define-enumerated-type mcast-cmd :mcast-cmd
mcast-cmd?
the-mcast-cmds
mcast-cmd-name
mcast-cmd-index
(new-port new-sink))
(define (tee from-server to-sink)
(let ((to-mbox (make-channel)))
(spawn (lambda ()
(let drink-tee ((msg (receive from-server)))
(cond
((channel? to-sink)
(send to-sink msg)
(send to-mbox msg)))
(drink-tee (receive from-server))))
'mcast-channel/tee)
to-mbox))
(define (sink from-server)
(tee from-server #f))
(define (make-mcast-channel)
(let ((m-in (make-channel))
(f-out (make-channel))
(r-ch (make-channel)))
(spawn (lambda ()
(sink f-out)
(let lp ((msg (receive m-in)))
(cond
((eq? (mcast-cmd-name msg) 'new-port)
(let ((new-f-out (make-channel)))
(send r-ch (tee new-f-out f-out))
(set! f-out new-f-out)))
((eq? (mcast-cmd-name msg) 'new-sink)
(send r-ch (sink f-out)))
(else (send f-out msg)))
(lp (receive m-in))))
'mcast-channel/server)
(really-make-mcast-channel m-in f-out r-ch)))
(define (mcast mcast-ch msg)
(send (mcast-channel-msg-in mcast-ch) msg))
(define (mcast-port mcast-ch)
(send (mcast-channel-msg-in mcast-ch) (mcast-cmd new-port))
(really-make-mcast-port (receive (mcast-channel-reply-ch mcast-ch))))
(define (mcast-port-receive-rv mc-port)
(receive-rv (channel->mcast-port mc-port)))
(define (mcast-port-receive mc-port)
(sync (mcast-port-receive-rv mc-port)))
;; attach the sink to the first port after the server
(define (mcast-new-sink mcast-ch)
(send (mcast-channel-msg-in mcast-ch) (mcast-cmd new-sink))
(receive (mcast-channel-reply-ch mcast-ch)))