71 lines
1.9 KiB
Scheme
71 lines
1.9 KiB
Scheme
|
(define-record-type :mcast-channel
|
||
|
(really-make-mcast-channel msg-in fan-out reply-ch)
|
||
|
mcast-channel?
|
||
|
(msg-in mcast-channel-msg-in)
|
||
|
(fan-out mcast-channel-fan-out)
|
||
|
(reply-ch mcast-channel-reply-ch))
|
||
|
|
||
|
(define-record-type :mcast-port
|
||
|
(really-make-mcast-port channel)
|
||
|
mcast-port?
|
||
|
(channel channel->mcast-port))
|
||
|
|
||
|
(define-enumerated-type mcast-cmd :mcast-cmd
|
||
|
mcast-cmd?
|
||
|
the-mcast-cmds
|
||
|
mcast-cmd-name
|
||
|
mcast-cmd-index
|
||
|
(new-port new-sink))
|
||
|
|
||
|
(define (tee from-server to-sink)
|
||
|
(let ((to-mbox (make-channel)))
|
||
|
(spawn (lambda ()
|
||
|
(let drink-tee ((msg (receive from-server)))
|
||
|
(cond
|
||
|
((channel? to-sink)
|
||
|
(send to-sink msg)
|
||
|
(send to-mbox msg)))
|
||
|
(drink-tee (receive from-server))))
|
||
|
'mcast-channel/tee)
|
||
|
to-mbox))
|
||
|
|
||
|
(define (sink from-server)
|
||
|
(tee from-server #f))
|
||
|
|
||
|
(define (make-mcast-channel)
|
||
|
(let ((m-in (make-channel))
|
||
|
(f-out (make-channel))
|
||
|
(r-ch (make-channel)))
|
||
|
(spawn (lambda ()
|
||
|
(sink f-out)
|
||
|
(let lp ((msg (receive m-in)))
|
||
|
(cond
|
||
|
((eq? (mcast-cmd-name msg) 'new-port)
|
||
|
(let ((new-f-out (make-channel)))
|
||
|
(send r-ch (tee new-f-out f-out))
|
||
|
(set! f-out new-f-out)))
|
||
|
((eq? (mcast-cmd-name msg) 'new-sink)
|
||
|
(send r-ch (sink f-out)))
|
||
|
(else (send f-out msg)))
|
||
|
(lp (receive m-in))))
|
||
|
'mcast-channel/server)
|
||
|
(really-make-mcast-channel m-in f-out r-ch)))
|
||
|
|
||
|
(define (mcast mcast-ch msg)
|
||
|
(send (mcast-channel-msg-in mcast-ch) msg))
|
||
|
|
||
|
(define (mcast-port mcast-ch)
|
||
|
(send (mcast-channel-msg-in mcast-ch) (mcast-cmd new-port))
|
||
|
(really-make-mcast-port (receive (mcast-channel-reply-ch mcast-ch))))
|
||
|
|
||
|
(define (mcast-port-receive-rv mc-port)
|
||
|
(receive-rv (channel->mcast-port mc-port)))
|
||
|
|
||
|
(define (mcast-port-receive mc-port)
|
||
|
(sync (mcast-port-receive-rv mc-port)))
|
||
|
|
||
|
;; attach the sink to the first port after the server
|
||
|
(define (mcast-new-sink mcast-ch)
|
||
|
(send (mcast-channel-msg-in mcast-ch) (mcast-cmd new-sink))
|
||
|
(receive (mcast-channel-reply-ch mcast-ch)))
|