sunterlib/s48/cml/channel.scm

101 lines
2.7 KiB
Scheme
Raw Normal View History

(define-record-type :channel
(really-make-channel priority in out)
channel?
(priority channel-priority set-channel-priority!)
;; queue of trans-id * #f
(in channel-in)
;; queue of trans-id * message
(out channel-out))
(define-record-type :q-item
(make-q-item trans-id message cleanup-thunk wrap-proc)
q-item?
(trans-id q-item-trans-id)
(message q-item-message)
(cleanup-thunk q-item-cleanup-thunk)
(wrap-proc q-item-wrap-proc))
(define (make-channel)
(really-make-channel 1 (make-queue) (make-queue)))
(define (channel=? channel-1 channel-2)
(eq? channel-1 channel-2))
(define (clean-and-enqueue! queue value)
(clean-queue-head! queue)
(enqueue! queue value))
(define (clean-and-dequeue! queue)
(let loop ()
(if (queue-empty? queue)
#f
(let ((front (dequeue! queue)))
(if (trans-id-cancelled? (q-item-trans-id front))
(loop)
front)))))
(define (clean-queue-head! queue)
(let loop ()
(if (not (queue-empty? queue))
(let ((front (queue-front queue)))
(if (trans-id-cancelled? (q-item-trans-id front))
(begin
(dequeue! queue)
(loop)))))))
(define (send-rv channel message)
(make-base
(lambda ()
(let ((in (channel-in channel)))
(clean-queue-head! in)
(if (queue-empty? in)
(make-blocked (lambda (trans-id cleanup-thunk wrap-proc)
(clean-and-enqueue! (channel-out channel)
(make-q-item trans-id
message
cleanup-thunk
wrap-proc))))
(let ((priority (channel-priority channel)))
(set-channel-priority! channel (+ 1 priority))
(make-enabled
priority
(lambda ()
(let ((q-item (dequeue! in)))
(set-channel-priority! channel 1)
((q-item-cleanup-thunk q-item))
(cr-trans-id-wakeup (q-item-trans-id q-item)
(cons message
(q-item-wrap-proc q-item)))
(unspecific))))))))))
(define (send channel message)
(sync (send-rv channel message)))
(define (receive-rv channel)
(make-base
(lambda ()
(let ((out (channel-out channel)))
(clean-queue-head! out)
(if (queue-empty? out)
(make-blocked (lambda (trans-id cleanup-thunk wrap-proc)
(clean-and-enqueue! (channel-in channel)
(make-q-item trans-id
#f
cleanup-thunk
wrap-proc))))
(let ((priority (channel-priority channel)))
(set-channel-priority! channel (+ 1 priority))
(make-enabled
priority
(lambda ()
(let ((q-item (dequeue! out)))
(set-channel-priority! channel 1)
((q-item-cleanup-thunk q-item))
(cr-trans-id-wakeup (q-item-trans-id q-item)
(cons (unspecific)
(q-item-wrap-proc q-item)))
(q-item-message q-item))))))))))
(define (receive channel)
(sync (receive-rv channel)))