101 lines
2.7 KiB
Scheme
101 lines
2.7 KiB
Scheme
;; Record type for a rendezvous channel.
;;
;; Fields:
;;   priority -- mutable integer.  Elsewhere in this file it starts at 1,
;;               is incremented each time a poll finds a waiting partner,
;;               and is reset to 1 when a rendezvous is committed.
;;   in       -- queue of q-items enqueued by blocked receivers; their
;;               message slot is #f.
;;   out      -- queue of q-items enqueued by blocked senders; their
;;               message slot holds the value being sent.
(define-record-type :channel
  (really-make-channel priority in out)
  channel?
  (priority channel-priority set-channel-priority!)
  (in       channel-in)
  (out      channel-out))
|
|
|
|
;; Record type for one entry waiting in a channel queue.
;;
;; Fields (all read-only):
;;   trans-id      -- transaction id of the blocked party; checked with
;;                    trans-id-cancelled? when queues are cleaned.
;;   message       -- the value offered by a sender, or #f for a receiver.
;;   cleanup-thunk -- zero-argument procedure invoked when this entry is
;;                    matched with a partner.
;;   wrap-proc     -- passed along, paired with the result value, to
;;                    cr-trans-id-wakeup when the entry is matched.
(define-record-type :q-item
  (make-q-item trans-id message cleanup-thunk wrap-proc)
  q-item?
  (trans-id      q-item-trans-id)
  (message       q-item-message)
  (cleanup-thunk q-item-cleanup-thunk)
  (wrap-proc     q-item-wrap-proc))
|
|
|
|
;; Create a fresh channel: priority 1 and two new, separate queues for
;; the waiting receivers (in) and the waiting senders (out).
(define (make-channel)
  (let* ((receivers (make-queue))
         (senders   (make-queue)))
    (really-make-channel 1 receivers senders)))
|
|
|
|
;; Channel equality is object identity: two channels are equal only when
;; they are the very same object (compared with eq?).
(define channel=?
  (lambda (channel-1 channel-2)
    (eq? channel-1 channel-2)))
|
|
|
|
;; Drop cancelled entries from the front of QUEUE, then append VALUE at
;; the back.  The result is whatever enqueue! returns.
(define (clean-and-enqueue! queue value)
  (begin
    (clean-queue-head! queue)
    (enqueue! queue value)))
|
|
|
|
;; Remove entries from the front of QUEUE until one is found whose
;; transaction id is not cancelled, and return that entry.  Cancelled
;; entries are discarded along the way.  Returns #f once the queue is
;; exhausted without finding a live entry.
(define (clean-and-dequeue! queue)
  (let next-live ()
    (cond
     ((queue-empty? queue) #f)
     (else
      (let ((candidate (dequeue! queue)))
        (if (trans-id-cancelled? (q-item-trans-id candidate))
            (next-live)
            candidate))))))
|
|
|
|
;; Discard entries from the front of QUEUE for as long as the front entry's
;; transaction id is cancelled.  Stops at the first live entry (which is
;; left in place) or when the queue becomes empty.  The return value is
;; unspecified; the procedure is called for its effect on QUEUE.
(define (clean-queue-head! queue)
  (let drop-cancelled ()
    ;; `and' short-circuits, so queue-front is only consulted when the
    ;; queue is known to be non-empty.
    (if (and (not (queue-empty? queue))
             (trans-id-cancelled? (q-item-trans-id (queue-front queue))))
        (begin
          (dequeue! queue)
          (drop-cancelled)))))
|
|
|
|
;; Build the rendezvous for sending MESSAGE over CHANNEL.
;;
;; The result is (make-base POLL), where POLL is a thunk that inspects the
;; channel's queue of waiting receivers:
;;
;;  * No live receiver waiting: returns a make-blocked status whose
;;    procedure, given a trans-id, cleanup thunk and wrap procedure,
;;    enqueues a q-item carrying MESSAGE on the channel's out queue.
;;
;;  * A receiver is waiting: returns a make-enabled status carrying the
;;    channel's current priority (the stored priority is bumped by one as
;;    a side effect of polling).  Its thunk dequeues the receiver, resets
;;    the channel priority to 1, runs the receiver's cleanup thunk, wakes
;;    the receiver with (MESSAGE . wrap-proc), and yields (unspecific).
;;
;; NOTE(review): the priority bump/reset looks like a fairness mechanism
;; for choosing among several enabled rendezvous -- confirm against the
;; rendezvous core that consumes make-enabled.
(define (send-rv channel message)
  (define (poll)
    (let ((receivers (channel-in channel)))
      (clean-queue-head! receivers)
      (cond
       ((queue-empty? receivers)
        (make-blocked
         (lambda (trans-id cleanup-thunk wrap-proc)
           (clean-and-enqueue!
            (channel-out channel)
            (make-q-item trans-id message cleanup-thunk wrap-proc)))))
       (else
        (let ((priority (channel-priority channel)))
          (set-channel-priority! channel (+ 1 priority))
          (make-enabled
           priority
           (lambda ()
             (let ((receiver (dequeue! receivers)))
               (set-channel-priority! channel 1)
               ((q-item-cleanup-thunk receiver))
               (cr-trans-id-wakeup
                (q-item-trans-id receiver)
                (cons message (q-item-wrap-proc receiver)))
               (unspecific)))))))))
  (make-base poll))
|
|
|
|
;; Send MESSAGE over CHANNEL by synchronizing on the corresponding send
;; rendezvous; returns whatever sync returns for it.
(define (send channel message)
  (let ((rendezvous (send-rv channel message)))
    (sync rendezvous)))
|
|
|
|
;; Build the rendezvous for receiving a message from CHANNEL.
;;
;; The result is (make-base POLL), where POLL is a thunk that inspects the
;; channel's queue of waiting senders:
;;
;;  * No live sender waiting: returns a make-blocked status whose
;;    procedure, given a trans-id, cleanup thunk and wrap procedure,
;;    enqueues a q-item (with #f in the message slot) on the channel's
;;    in queue.
;;
;;  * A sender is waiting: returns a make-enabled status carrying the
;;    channel's current priority (the stored priority is bumped by one as
;;    a side effect of polling).  Its thunk dequeues the sender, resets the
;;    channel priority to 1, runs the sender's cleanup thunk, wakes the
;;    sender with ((unspecific) . wrap-proc), and yields the sender's
;;    message.
;;
;; NOTE(review): the priority bump/reset looks like a fairness mechanism
;; for choosing among several enabled rendezvous -- confirm against the
;; rendezvous core that consumes make-enabled.
(define (receive-rv channel)
  (define (poll)
    (let ((senders (channel-out channel)))
      (clean-queue-head! senders)
      (cond
       ((queue-empty? senders)
        (make-blocked
         (lambda (trans-id cleanup-thunk wrap-proc)
           (clean-and-enqueue!
            (channel-in channel)
            (make-q-item trans-id #f cleanup-thunk wrap-proc)))))
       (else
        (let ((priority (channel-priority channel)))
          (set-channel-priority! channel (+ 1 priority))
          (make-enabled
           priority
           (lambda ()
             (let ((sender (dequeue! senders)))
               (set-channel-priority! channel 1)
               ((q-item-cleanup-thunk sender))
               (cr-trans-id-wakeup
                (q-item-trans-id sender)
                (cons (unspecific) (q-item-wrap-proc sender)))
               (q-item-message sender)))))))))
  (make-base poll))
|
|
|
|
;; Receive from CHANNEL by synchronizing on the corresponding receive
;; rendezvous; returns whatever sync returns for it.
(define (receive channel)
  (let ((rendezvous (receive-rv channel)))
    (sync rendezvous)))
|