Add rough, but fairly well-debugged implementation of many CML
primitives.
This commit is contained in:
parent
13d95988e4
commit
91d2bb15c8
|
@ -0,0 +1,38 @@
|
|||
(define-record-type :async-channel
|
||||
(really-make-async-channel in-channel out-channel)
|
||||
async-channel?
|
||||
(in-channel async-channel-in-channel)
|
||||
(out-channel async-channel-out-channel))
|
||||
|
||||
(define (make-async-channel)
|
||||
(let ((in-channel (make-channel))
|
||||
(out-channel (make-channel)))
|
||||
(spawn
|
||||
(lambda ()
|
||||
(let ((queue (make-queue)))
|
||||
(let loop ()
|
||||
(if (queue-empty? queue)
|
||||
(begin
|
||||
(enqueue! queue (receive in-channel))
|
||||
(loop))
|
||||
(select
|
||||
(list
|
||||
(wrap (receive-rv in-channel)
|
||||
(lambda (message)
|
||||
(enqueue! queue message)
|
||||
(loop)))
|
||||
(wrap (send-rv out-channel (queue-front queue))
|
||||
(lambda (ignore)
|
||||
(dequeue! queue)
|
||||
(loop))))))))))
|
||||
(really-make-async-channel in-channel
|
||||
out-channel)))
|
||||
|
||||
(define (send-async channel message)
|
||||
(send (async-channel-in-channel channel) message))
|
||||
|
||||
(define (receive-async-rv channel)
|
||||
(receive-rv (async-channel-out-channel channel)))
|
||||
|
||||
(define (receive-async channel)
|
||||
(sync (receive-async-rv channel)))
|
|
@ -0,0 +1,100 @@
|
|||
(define-record-type :channel
|
||||
(really-make-channel priority in out)
|
||||
channel?
|
||||
(priority channel-priority set-channel-priority!)
|
||||
;; queue of trans-id * #f
|
||||
(in channel-in)
|
||||
;; queue of trans-id * message
|
||||
(out channel-out))
|
||||
|
||||
(define-record-type :q-item
|
||||
(make-q-item trans-id message cleanup-thunk wrap-proc)
|
||||
q-item?
|
||||
(trans-id q-item-trans-id)
|
||||
(message q-item-message)
|
||||
(cleanup-thunk q-item-cleanup-thunk)
|
||||
(wrap-proc q-item-wrap-proc))
|
||||
|
||||
(define (make-channel)
|
||||
(really-make-channel 1 (make-queue) (make-queue)))
|
||||
|
||||
(define (channel=? channel-1 channel-2)
|
||||
(eq? channel-1 channel-2))
|
||||
|
||||
(define (clean-and-enqueue! queue value)
|
||||
(clean-queue-head! queue)
|
||||
(enqueue! queue value))
|
||||
|
||||
(define (clean-and-dequeue! queue)
|
||||
(let loop ()
|
||||
(if (queue-empty? queue)
|
||||
#f
|
||||
(let ((front (dequeue! queue)))
|
||||
(if (trans-id-cancelled? (q-item-trans-id front))
|
||||
(loop)
|
||||
front)))))
|
||||
|
||||
(define (clean-queue-head! queue)
|
||||
(let loop ()
|
||||
(if (not (queue-empty? queue))
|
||||
(let ((front (queue-front queue)))
|
||||
(if (trans-id-cancelled? (q-item-trans-id front))
|
||||
(begin
|
||||
(dequeue! queue)
|
||||
(loop)))))))
|
||||
|
||||
(define (send-rv channel message)
|
||||
(make-base
|
||||
(lambda ()
|
||||
(let ((in (channel-in channel)))
|
||||
(clean-queue-head! in)
|
||||
(if (queue-empty? in)
|
||||
(make-blocked (lambda (trans-id cleanup-thunk wrap-proc)
|
||||
(clean-and-enqueue! (channel-out channel)
|
||||
(make-q-item trans-id
|
||||
message
|
||||
cleanup-thunk
|
||||
wrap-proc))))
|
||||
(let ((priority (channel-priority channel)))
|
||||
(set-channel-priority! channel (+ 1 priority))
|
||||
(make-enabled
|
||||
priority
|
||||
(lambda ()
|
||||
(let ((q-item (dequeue! in)))
|
||||
(set-channel-priority! channel 1)
|
||||
((q-item-cleanup-thunk q-item))
|
||||
(cr-trans-id-wakeup (q-item-trans-id q-item)
|
||||
(cons message
|
||||
(q-item-wrap-proc q-item)))
|
||||
(unspecific))))))))))
|
||||
|
||||
(define (send channel message)
|
||||
(sync (send-rv channel message)))
|
||||
|
||||
(define (receive-rv channel)
|
||||
(make-base
|
||||
(lambda ()
|
||||
(let ((out (channel-out channel)))
|
||||
(clean-queue-head! out)
|
||||
(if (queue-empty? out)
|
||||
(make-blocked (lambda (trans-id cleanup-thunk wrap-proc)
|
||||
(clean-and-enqueue! (channel-in channel)
|
||||
(make-q-item trans-id
|
||||
#f
|
||||
cleanup-thunk
|
||||
wrap-proc))))
|
||||
(let ((priority (channel-priority channel)))
|
||||
(set-channel-priority! channel (+ 1 priority))
|
||||
(make-enabled
|
||||
priority
|
||||
(lambda ()
|
||||
(let ((q-item (dequeue! out)))
|
||||
(set-channel-priority! channel 1)
|
||||
((q-item-cleanup-thunk q-item))
|
||||
(cr-trans-id-wakeup (q-item-trans-id q-item)
|
||||
(cons (unspecific)
|
||||
(q-item-wrap-proc q-item)))
|
||||
(q-item-message q-item))))))))))
|
||||
|
||||
(define (receive channel)
|
||||
(sync (receive-rv channel)))
|
|
@ -0,0 +1,102 @@
|
|||
; Jars (multiple-assignment cells for use with threads)
|
||||
; these are equivalent to ID-90 M-structures
|
||||
|
||||
(define-record-type :jar
|
||||
(really-make-jar priority queue value id)
|
||||
jar?
|
||||
(priority jar-priority set-jar-priority!)
|
||||
(queue jar-queue)
|
||||
(value jar-value set-jar-value!)
|
||||
(id jar-id))
|
||||
|
||||
(define the-empty-jar-value (list 'empty-jar))
|
||||
|
||||
(define (empty-jar-value? thing)
|
||||
(eq? thing the-empty-jar-value))
|
||||
|
||||
(define-record-discloser :jar
|
||||
(lambda (jar)
|
||||
(cons 'jar
|
||||
(if (jar-id jar)
|
||||
(list (jar-id jar))
|
||||
'()))))
|
||||
|
||||
(define-record-type :q-item
|
||||
(make-q-item trans-id cleanup-thunk wrap-proc)
|
||||
q-item?
|
||||
(trans-id q-item-trans-id)
|
||||
(cleanup-thunk q-item-cleanup-thunk)
|
||||
(wrap-proc q-item-wrap-proc))
|
||||
|
||||
(define (clean-and-enqueue! queue value)
|
||||
(clean-queue-head! queue)
|
||||
(enqueue! queue value))
|
||||
|
||||
(define (clean-and-dequeue! queue)
|
||||
(let loop ()
|
||||
(if (queue-empty? queue)
|
||||
#f
|
||||
(let ((front (dequeue! queue)))
|
||||
(if (trans-id-cancelled? (q-item-trans-id front))
|
||||
(loop)
|
||||
front)))))
|
||||
|
||||
(define (clean-queue-head! queue)
|
||||
(let loop ()
|
||||
(if (not (queue-empty? queue))
|
||||
(let ((front (queue-front queue)))
|
||||
(if (trans-id-cancelled? (q-item-trans-id front))
|
||||
(begin
|
||||
(dequeue! queue)
|
||||
(loop)))))))
|
||||
|
||||
(define (make-jar . id-option)
|
||||
(really-make-jar 0
|
||||
(make-queue)
|
||||
the-empty-jar-value
|
||||
(if (null? id-option)
|
||||
#f
|
||||
(car id-option))))
|
||||
|
||||
(define (jar-take-rv jar)
|
||||
(make-base
|
||||
(lambda ()
|
||||
(cond
|
||||
((empty-jar-value? (jar-value jar))
|
||||
(make-blocked
|
||||
(lambda (trans-id cleanup-thunk wrap-proc)
|
||||
(clean-and-enqueue! (jar-queue jar)
|
||||
(make-q-item trans-id
|
||||
cleanup-thunk
|
||||
wrap-proc)))))
|
||||
(else
|
||||
(let ((priority (jar-priority jar)))
|
||||
(set-jar-priority! jar (+ 1 priority))
|
||||
(make-enabled
|
||||
priority
|
||||
(lambda ()
|
||||
(let ((value (jar-value jar)))
|
||||
(set-jar-value! jar the-empty-jar-value)
|
||||
value)))))))))
|
||||
|
||||
(define (jar-put! jar value)
|
||||
(enter-cr!)
|
||||
(cond
|
||||
((empty-jar-value? (jar-value jar))
|
||||
(cond
|
||||
((clean-and-dequeue! (jar-queue jar))
|
||||
=> (lambda (q-item)
|
||||
((q-item-cleanup-thunk q-item))
|
||||
(cr-trans-id-wakeup (q-item-trans-id q-item)
|
||||
(cons value
|
||||
(q-item-wrap-proc q-item)))))
|
||||
(else
|
||||
(set-jar-value! jar value)))
|
||||
(leave-cr!)
|
||||
(unspecific))
|
||||
(else
|
||||
(leave-cr!)
|
||||
(error "jar is already full" jar value))))
|
||||
|
||||
(define (jar-take jar)
|
||||
(sync (jar-take-rv jar)))
|
|
@ -0,0 +1,96 @@
|
|||
(define-interface trans-ids-interface
|
||||
(export enter-cr! leave-cr!
|
||||
leave-cr-and-block!
|
||||
trans-id?
|
||||
make-trans-id
|
||||
cr-trans-id-wait cr-trans-id-wakeup cr-maybe-trans-id-wakeup
|
||||
trans-id-thread-uid trans-id-cancelled?))
|
||||
|
||||
(define-interface rendezvous-interface
|
||||
(export always-rv never-rv
|
||||
guard with-nack choose wrap
|
||||
sync
|
||||
select))
|
||||
|
||||
(define-interface make-rendezvous-interface
|
||||
(export make-blocked make-enabled make-base))
|
||||
|
||||
(define-interface rendezvous-channels-interface
|
||||
(export make-channel
|
||||
channel?
|
||||
send-rv send
|
||||
receive-rv receive))
|
||||
|
||||
(define-interface rendezvous-async-channels-interface
|
||||
(export make-async-channel
|
||||
async-channel?
|
||||
send-async
|
||||
receive-async-rv
|
||||
receive-async))
|
||||
|
||||
(define-interface rendezvous-placeholders-interface
|
||||
(export make-placeholder
|
||||
placeholder?
|
||||
placeholder-value
|
||||
placeholder-set!
|
||||
placeholder-value-rv))
|
||||
|
||||
(define-interface rendezvous-jars-interface
|
||||
(export make-jar
|
||||
jar?
|
||||
jar-take
|
||||
jar-put!
|
||||
jar-take-rv))
|
||||
|
||||
(define-structure trans-ids trans-ids-interface
|
||||
(open scheme
|
||||
srfi-9 big-util
|
||||
threads threads-internal interrupts
|
||||
locks placeholders)
|
||||
(files trans-id))
|
||||
|
||||
(define-structures ((rendezvous rendezvous-interface)
|
||||
(make-rendezvous make-rendezvous-interface))
|
||||
(open scheme
|
||||
srfi-9 (subset define-record-types (define-record-discloser))
|
||||
trans-ids
|
||||
threads threads-internal
|
||||
big-util
|
||||
(subset util (unspecific)))
|
||||
(files rendezvous))
|
||||
|
||||
(define-structure rendezvous-channels rendezvous-channels-interface
|
||||
(open scheme
|
||||
srfi-9
|
||||
trans-ids rendezvous make-rendezvous
|
||||
queues
|
||||
big-util
|
||||
(subset util (unspecific)))
|
||||
(files channel))
|
||||
|
||||
(define-structure rendezvous-async-channels rendezvous-async-channels-interface
|
||||
(open scheme
|
||||
rendezvous
|
||||
rendezvous-channels
|
||||
threads
|
||||
queues
|
||||
srfi-9)
|
||||
(files async-channels))
|
||||
|
||||
(define-structure rendezvous-placeholders rendezvous-placeholders-interface
|
||||
(open scheme
|
||||
srfi-9 (subset define-record-types (define-record-discloser))
|
||||
trans-ids rendezvous make-rendezvous
|
||||
queues
|
||||
signals
|
||||
(subset util (unspecific)))
|
||||
(files placeholder))
|
||||
|
||||
(define-structure rendezvous-jars rendezvous-jars-interface
|
||||
(open scheme
|
||||
srfi-9 (subset define-record-types (define-record-discloser))
|
||||
trans-ids rendezvous make-rendezvous
|
||||
queues
|
||||
signals
|
||||
(subset util (unspecific)))
|
||||
(files jar))
|
|
@ -0,0 +1,101 @@
|
|||
; Placeholders (single-assignment cells for use with threads)
|
||||
|
||||
(define-record-type :placeholder
|
||||
(really-make-placeholder priority queue value id)
|
||||
placeholder?
|
||||
(priority placeholder-priority set-placeholder-priority!)
|
||||
(queue placeholder-queue set-placeholder-queue!)
|
||||
(value placeholder-value-internal set-placeholder-value!)
|
||||
(id placeholder-id))
|
||||
|
||||
(define-record-discloser :placeholder
|
||||
(lambda (placeholder)
|
||||
(cons 'placeholder
|
||||
(if (placeholder-id placeholder)
|
||||
(list (placeholder-id placeholder))
|
||||
'()))))
|
||||
|
||||
(define-record-type :q-item
|
||||
(make-q-item trans-id cleanup-thunk wrap-proc)
|
||||
q-item?
|
||||
(trans-id q-item-trans-id)
|
||||
(cleanup-thunk q-item-cleanup-thunk)
|
||||
(wrap-proc q-item-wrap-proc))
|
||||
|
||||
(define (clean-and-enqueue! queue value)
|
||||
(clean-queue-head! queue)
|
||||
(enqueue! queue value))
|
||||
|
||||
(define (clean-and-dequeue! queue)
|
||||
(let loop ()
|
||||
(if (queue-empty? queue)
|
||||
#f
|
||||
(let ((front (dequeue! queue)))
|
||||
(if (trans-id-cancelled? (q-item-trans-id front))
|
||||
(loop)
|
||||
front)))))
|
||||
|
||||
(define (clean-queue-head! queue)
|
||||
(let loop ()
|
||||
(if (not (queue-empty? queue))
|
||||
(let ((front (queue-front queue)))
|
||||
(if (trans-id-cancelled? (q-item-trans-id front))
|
||||
(begin
|
||||
(dequeue! queue)
|
||||
(loop)))))))
|
||||
|
||||
(define (make-placeholder . id-option)
|
||||
(really-make-placeholder 0
|
||||
(make-queue)
|
||||
(unspecific)
|
||||
(if (null? id-option)
|
||||
#f
|
||||
(car id-option))))
|
||||
|
||||
(define (placeholder-value-rv placeholder)
|
||||
(make-base
|
||||
(lambda ()
|
||||
(cond
|
||||
((placeholder-queue placeholder)
|
||||
=> (lambda (queue)
|
||||
(make-blocked
|
||||
(lambda (trans-id cleanup-thunk wrap-proc)
|
||||
(clean-and-enqueue! queue
|
||||
(make-q-item trans-id
|
||||
cleanup-thunk
|
||||
wrap-proc))))))
|
||||
(else
|
||||
(let ((priority (placeholder-priority placeholder)))
|
||||
(set-placeholder-priority! placeholder (+ 1 priority))
|
||||
(make-enabled
|
||||
priority
|
||||
(lambda ()
|
||||
(placeholder-value-internal placeholder)))))))))
|
||||
|
||||
(define (placeholder-set! placeholder value)
|
||||
(enter-cr!)
|
||||
(cond
|
||||
((placeholder-queue placeholder)
|
||||
=> (lambda (queue)
|
||||
(set-placeholder-value! placeholder value)
|
||||
(set-placeholder-queue! placeholder #f)
|
||||
(let loop ()
|
||||
(cond
|
||||
((clean-and-dequeue! queue)
|
||||
=> (lambda (q-item)
|
||||
((q-item-cleanup-thunk q-item))
|
||||
(cr-trans-id-wakeup (q-item-trans-id q-item)
|
||||
(cons value
|
||||
(q-item-wrap-proc q-item)))
|
||||
(loop)))))
|
||||
(leave-cr!)
|
||||
(unspecific)))
|
||||
(else
|
||||
(leave-cr!)
|
||||
(error "placeholder is already assigned" placeholder value))))
|
||||
|
||||
(define (placeholder-value placeholder)
|
||||
(sync (placeholder-value-rv placeholder)))
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,619 @@
|
|||
(define-record-type :prim-rv
|
||||
(really-make-prim-rv wrap-proc poll-thunk)
|
||||
prim-rv?
|
||||
(wrap-proc prim-rv-wrap-proc)
|
||||
(poll-thunk prim-rv-poll-thunk))
|
||||
|
||||
(define (make-prim-rv poll-thunk)
|
||||
(really-make-prim-rv identity poll-thunk))
|
||||
|
||||
(define-record-type :enabled
|
||||
(make-enabled priority do-thunk)
|
||||
enabled?
|
||||
(priority enabled-priority)
|
||||
(do-thunk enabled-do-thunk))
|
||||
|
||||
;; PROC is a procedure with two arguments:
|
||||
;; a TRANS-ID and a WRAP-PROC.
|
||||
|
||||
;; TRANS-ID is the transaction ID of the blocked thread. WRAP-PROC is
|
||||
;; the complete, composed-together chain of WRAP procedures of the
|
||||
;; event.
|
||||
|
||||
;; The TRANS-ID should be fed, when it's woken up, a pair
|
||||
;; consisting of a return value and a wrap-proc procedure.
|
||||
|
||||
(define-record-type :blocked
|
||||
(make-blocked proc)
|
||||
blocked?
|
||||
(proc blocked-proc))
|
||||
|
||||
(define-record-type :base
|
||||
(really-make-base prim-rvs)
|
||||
base?
|
||||
(prim-rvs base-prim-rvs))
|
||||
|
||||
(define (make-base poll-thunk)
|
||||
(really-make-base (list (make-prim-rv poll-thunk))))
|
||||
|
||||
(define-record-type :choose
|
||||
(make-choose rvs)
|
||||
choose?
|
||||
(rvs choose-rvs))
|
||||
|
||||
(define-record-type :guard
|
||||
(make-guard thunk)
|
||||
guard?
|
||||
(thunk guard-thunk))
|
||||
|
||||
(define-record-type :with-nack
|
||||
(make-nack proc)
|
||||
nack?
|
||||
(proc nack-proc))
|
||||
|
||||
;; Condition variables for internal use
|
||||
|
||||
(define-record-type :cvar
|
||||
(really-make-cvar state)
|
||||
cvar?
|
||||
;; this can be one of the two below:
|
||||
(state cvar-state set-cvar-state!))
|
||||
|
||||
(define-record-type :cvar-unset-state
|
||||
(make-cvar-unset-state blocked)
|
||||
cvar-unset-state?
|
||||
;; this is a list of :CVAR-ITEM
|
||||
(blocked cvar-unset-state-blocked set-cvar-unset-state-blocked!))
|
||||
|
||||
(define-record-type :cvar-item
|
||||
(make-cvar-item trans-id cleanup-thunk wrap-proc)
|
||||
cvar-item?
|
||||
(trans-id cvar-item-trans-id)
|
||||
(cleanup-thunk cvar-item-cleanup-thunk)
|
||||
(wrap-proc cvar-item-wrap-proc))
|
||||
|
||||
(define-record-type :cvar-set-state
|
||||
(make-cvar-set-state priority)
|
||||
cvar-set-state?
|
||||
(priority cvar-set-state-priority set-cvar-set-state-priority!))
|
||||
|
||||
(define (make-cvar)
|
||||
(really-make-cvar (make-cvar-unset-state '())))
|
||||
|
||||
(define (cr-cvar-set! cvar)
|
||||
(let ((state (cvar-state cvar)))
|
||||
(cond
|
||||
((cvar-unset-state? state)
|
||||
(for-each (lambda (cvar-item)
|
||||
((cvar-item-cleanup-thunk cvar-item))
|
||||
(cr-maybe-trans-id-wakeup (cvar-item-trans-id cvar-item)
|
||||
(cons (unspecific)
|
||||
(cvar-item-wrap-proc cvar-item))))
|
||||
(cvar-unset-state-blocked state))
|
||||
(set-cvar-state! cvar (make-cvar-set-state 1)))
|
||||
(else
|
||||
(error "cvar already set")))))
|
||||
|
||||
(define (cvar-get-rv cvar)
|
||||
(make-base
|
||||
(lambda ()
|
||||
(let ((state (cvar-state cvar)))
|
||||
(cond
|
||||
((cvar-set-state? state)
|
||||
(let ((priority (cvar-set-state-priority state)))
|
||||
(set-cvar-set-state-priority! state (+ 1 priority))
|
||||
(make-enabled priority
|
||||
(lambda ()
|
||||
(set-cvar-set-state-priority! state 1)
|
||||
(unspecific)))))
|
||||
(else
|
||||
(make-blocked
|
||||
(lambda (trans-id cleanup-thunk wrap-proc)
|
||||
(set-cvar-unset-state-blocked!
|
||||
state
|
||||
(cons (make-cvar-item trans-id cleanup-thunk wrap-proc)
|
||||
(cvar-unset-state-blocked state)))))))))))
|
||||
|
||||
(define (always-rv value)
|
||||
(make-base
|
||||
(lambda ()
|
||||
(make-enabled -1
|
||||
(lambda ()
|
||||
value)))))
|
||||
|
||||
(define (never-rv)
|
||||
(really-make-base '()))
|
||||
|
||||
(define (guard rv)
|
||||
(make-guard rv))
|
||||
|
||||
(define (with-nack rv)
|
||||
(make-nack rv))
|
||||
|
||||
(define (gather-prim-rvs rev-rvs prim-rvs)
|
||||
(cond
|
||||
((null? rev-rvs) (really-make-base prim-rvs))
|
||||
((not (base? (car rev-rvs)))
|
||||
(if (null? prim-rvs)
|
||||
(gather rev-rvs '())
|
||||
(gather rev-rvs (list (really-make-base prim-rvs)))))
|
||||
;; (car rev-rvs) is base
|
||||
(else
|
||||
(gather-prim-rvs (cdr rev-rvs)
|
||||
(append (base-prim-rvs (car rev-rvs))
|
||||
prim-rvs)))))
|
||||
|
||||
(define (gather rev-rvs rvs)
|
||||
(cond
|
||||
((not (null? rev-rvs))
|
||||
(let ((rv (car rev-rvs)))
|
||||
(cond
|
||||
((choose? rv)
|
||||
(gather (cdr rev-rvs) (append (choose-rvs rv) rvs)))
|
||||
((and (base? rv)
|
||||
(not (null? rvs))
|
||||
(base? (car rvs)))
|
||||
(gather (cdr rev-rvs)
|
||||
(cons (really-make-base (append (base-prim-rvs rv)
|
||||
(base-prim-rvs (car rvs))))
|
||||
(cdr rvs))))
|
||||
(else
|
||||
(gather (cdr rev-rvs) (cons rv rvs))))))
|
||||
((null? (cdr rvs)) (car rvs))
|
||||
(else (make-choose rvs))))
|
||||
|
||||
(define (choose . rvs)
|
||||
(gather-prim-rvs (reverse rvs) '()))
|
||||
|
||||
(define (compose f g)
|
||||
(lambda (x)
|
||||
(f (g x))))
|
||||
|
||||
(define (wrap-prim-rv prim-rv wrap-proc)
|
||||
(really-make-prim-rv (compose wrap-proc
|
||||
(prim-rv-wrap-proc prim-rv))
|
||||
(prim-rv-poll-thunk prim-rv)))
|
||||
|
||||
(define (wrap rv wrap-proc)
|
||||
(cond
|
||||
((base? rv)
|
||||
(really-make-base (map (lambda (prim-rv)
|
||||
(wrap-prim-rv prim-rv wrap-proc))
|
||||
(base-prim-rvs rv))))
|
||||
((choose? rv)
|
||||
(make-choose (map (lambda (rv)
|
||||
(wrap rv wrap-proc))
|
||||
(choose-rvs rv))))
|
||||
((guard? rv)
|
||||
(make-guard (lambda ()
|
||||
(wrap ((guard-thunk rv)) wrap-proc))))
|
||||
((nack? rv)
|
||||
(make-nack (lambda (nack-rv)
|
||||
(wrap ((nack-proc rv) nack-rv) wrap-proc))))))
|
||||
|
||||
(define-record-type :base-group
|
||||
(really-make-base-group prim-rvs)
|
||||
base-group?
|
||||
(prim-rvs base-group-prim-rvs))
|
||||
|
||||
(define-record-discloser :base-group
|
||||
(lambda (base-group)
|
||||
(cons 'base-group
|
||||
(base-group-prim-rvs base-group))))
|
||||
|
||||
(define-record-type :choose-group
|
||||
(make-choose-group groups)
|
||||
choose-group?
|
||||
(groups choose-group-groups))
|
||||
|
||||
(define-record-discloser :choose-group
|
||||
(lambda (choose-group)
|
||||
(cons 'choose-group
|
||||
(choose-group-groups choose-group))))
|
||||
|
||||
(define-record-type :nack-group
|
||||
(make-nack-group cvar group)
|
||||
nack-group?
|
||||
(cvar nack-group-cvar)
|
||||
(group nack-group-group))
|
||||
|
||||
(define-record-discloser :nack-group
|
||||
(lambda (nack-group)
|
||||
(list 'nack-group
|
||||
(nack-group-group nack-group))))
|
||||
|
||||
(define (force-rv rv)
|
||||
(cond
|
||||
((base? rv)
|
||||
(really-make-base-group (base-prim-rvs rv)))
|
||||
(else
|
||||
(really-force-rv rv))))
|
||||
|
||||
(define (force-prim-rvs rvs prim-rvs)
|
||||
(if (null? rvs)
|
||||
(really-make-base-group prim-rvs)
|
||||
(let* ((rv (car rvs))
|
||||
(group (really-force-rv rv)))
|
||||
(cond
|
||||
((base-group? group)
|
||||
(force-prim-rvs (cdr rvs)
|
||||
(append (base-group-prim-rvs group)
|
||||
prim-rvs)))
|
||||
((choose-group? group)
|
||||
(force-rvs (cdr rvs)
|
||||
(append (choose-group-groups group)
|
||||
(list (really-make-base-group prim-rvs)))))
|
||||
(else
|
||||
(force-rvs (cdr rvs)
|
||||
(list group (really-make-base-group prim-rvs))))))))
|
||||
|
||||
(define (force-rvs rvs groups)
|
||||
(cond
|
||||
((not (null? rvs))
|
||||
(let* ((rv (car rvs))
|
||||
(group (really-force-rv rv)))
|
||||
(cond
|
||||
((and (base-group? group)
|
||||
(not (null? groups))
|
||||
(base-group? (car groups)))
|
||||
(force-rvs (cdr rvs)
|
||||
(cons (really-make-base-group
|
||||
(append (base-group-prim-rvs group)
|
||||
(base-group-prim-rvs (car groups))))
|
||||
(cdr groups))))
|
||||
((choose-group? group)
|
||||
(force-rvs (cdr rvs)
|
||||
(append (choose-group-groups group)
|
||||
groups)))
|
||||
(else
|
||||
(force-rvs (cdr rvs) (cons group groups))))))
|
||||
((null? (cdr groups))
|
||||
(car groups))
|
||||
(else
|
||||
(make-choose-group groups))))
|
||||
|
||||
;; this corresponds to force' in Reppy's implementation
|
||||
|
||||
(define (really-force-rv rv)
|
||||
(cond
|
||||
((guard? rv)
|
||||
(really-force-rv ((guard-thunk rv))))
|
||||
((nack? rv)
|
||||
(let ((cvar (make-cvar)))
|
||||
(make-nack-group cvar
|
||||
(really-force-rv
|
||||
((nack-proc rv)
|
||||
(cvar-get-rv cvar))))))
|
||||
((base? rv)
|
||||
(really-make-base-group (base-prim-rvs rv)))
|
||||
((choose? rv)
|
||||
(force-prim-rvs (choose-rvs rv) '()))))
|
||||
|
||||
(define (sync-prim-rv prim-rv)
|
||||
(let ((poll-thunk (prim-rv-poll-thunk prim-rv))
|
||||
(wrap-proc (prim-rv-wrap-proc prim-rv)))
|
||||
(enter-cr!)
|
||||
(let ((status ((prim-rv-poll-thunk prim-rv))))
|
||||
(cond
|
||||
((enabled? status)
|
||||
(let ((value ((enabled-do-thunk status))))
|
||||
(leave-cr!)
|
||||
(wrap-proc value)))
|
||||
((blocked? status)
|
||||
(let ((trans-id (make-trans-id)))
|
||||
((blocked-proc status) trans-id values wrap-proc)
|
||||
(let ((pair (cr-trans-id-wait trans-id)))
|
||||
((cdr pair) (car pair)))))))))
|
||||
|
||||
(define (select-do-thunk priority+do-list n)
|
||||
(cond
|
||||
((null? (cdr priority+do-list))
|
||||
(cdar priority+do-list))
|
||||
(else
|
||||
(let ((priority
|
||||
(lambda (p)
|
||||
(if (= p -1)
|
||||
n
|
||||
p))))
|
||||
(let max ((rest priority+do-list)
|
||||
(maximum 0)
|
||||
(k 0) ; (length do-thunks)
|
||||
(do-list '())) ; #### list of pairs do-thunk * wrap-proc
|
||||
(cond
|
||||
((not (null? rest))
|
||||
(let* ((pair (car rest))
|
||||
(p (priority (car pair)))
|
||||
(stuff (cdr pair)))
|
||||
(cond
|
||||
((> p maximum)
|
||||
(max (cdr rest) p 1 (list stuff)))
|
||||
((= p maximum)
|
||||
(max (cdr rest) maximum (+ 1 k) (cons stuff do-list)))
|
||||
(else
|
||||
(max (cdr rest) maximum k do-list)))))
|
||||
((null? (cdr do-list))
|
||||
(car do-list))
|
||||
(else
|
||||
;; List.nth(doFns, random k)
|
||||
(car do-list))))))))
|
||||
|
||||
(define (sync-prim-rvs prim-rvs)
|
||||
(cond
|
||||
((null? prim-rvs) (block))
|
||||
((null? (cdr prim-rvs)) (sync-prim-rv (car prim-rvs)))
|
||||
(else
|
||||
(let ()
|
||||
|
||||
(define (find-enabled prim-rvs block-procs wrap-procs)
|
||||
(if (null? prim-rvs)
|
||||
(let ((trans-id (make-trans-id)))
|
||||
(for-each (lambda (block-proc wrap-proc)
|
||||
(block-proc trans-id values wrap-proc))
|
||||
block-procs wrap-procs)
|
||||
(let ((pair (cr-trans-id-wait trans-id)))
|
||||
((cdr pair) (car pair))))
|
||||
(let* ((prim-rv (car prim-rvs))
|
||||
(poll-thunk (prim-rv-poll-thunk prim-rv))
|
||||
(wrap-proc (prim-rv-wrap-proc prim-rv))
|
||||
(status (poll-thunk)))
|
||||
(cond
|
||||
((enabled? status)
|
||||
(handle-enabled (cdr prim-rvs)
|
||||
(list
|
||||
(cons (enabled-priority status)
|
||||
(cons (enabled-do-thunk status)
|
||||
wrap-proc)))
|
||||
1))
|
||||
((blocked? status)
|
||||
(find-enabled (cdr prim-rvs)
|
||||
(cons (blocked-proc status)
|
||||
block-procs)
|
||||
(cons wrap-proc wrap-procs)))))))
|
||||
|
||||
(define (handle-enabled prim-rvs priority+do-list priority)
|
||||
(if (null? prim-rvs)
|
||||
(let* ((stuff (select-do-thunk priority+do-list priority))
|
||||
(do-thunk (car stuff))
|
||||
(wrap-proc (cdr stuff)))
|
||||
(let ((value (do-thunk)))
|
||||
(leave-cr!)
|
||||
(wrap-proc value)))
|
||||
(let* ((prim-rv (car prim-rvs))
|
||||
(poll-thunk (prim-rv-poll-thunk prim-rv))
|
||||
(wrap-proc (prim-rv-wrap-proc prim-rv))
|
||||
(status (poll-thunk)))
|
||||
(cond
|
||||
((enabled? status)
|
||||
(handle-enabled (cdr prim-rvs)
|
||||
(cons (cons (enabled-priority status)
|
||||
(cons (enabled-do-thunk status)
|
||||
wrap-proc))
|
||||
priority+do-list)
|
||||
(+ 1 priority)))
|
||||
(else
|
||||
(handle-enabled (cdr prim-rvs)
|
||||
priority+do-list
|
||||
priority))))))
|
||||
|
||||
(enter-cr!)
|
||||
(find-enabled prim-rvs '() '())))))
|
||||
|
||||
(define (sync rv)
|
||||
(let ((group (force-rv rv)))
|
||||
(cond
|
||||
((base-group? group)
|
||||
(sync-prim-rvs (base-group-prim-rvs group)))
|
||||
(else
|
||||
(sync-group group)))))
|
||||
|
||||
(define-record-type :ack-flag
|
||||
(really-make-ack-flag acked?)
|
||||
ack-flag?
|
||||
(acked? flag-acked? set-flag-acked?!))
|
||||
|
||||
(define (make-ack-flag)
|
||||
(really-make-ack-flag #f))
|
||||
|
||||
(define (ack-flag! ack-flag)
|
||||
(set-flag-acked?! ack-flag #t))
|
||||
|
||||
(define-record-type :flag-set
|
||||
(make-flag-set cvar ack-flags)
|
||||
flag-set?
|
||||
(cvar flag-set-cvar)
|
||||
(ack-flags flag-set-ack-flags))
|
||||
|
||||
(define (check-cvars! flag-sets)
|
||||
(for-each check-cvar! flag-sets))
|
||||
|
||||
(define (check-cvar! flag-set)
|
||||
(let loop ((ack-flags (flag-set-ack-flags flag-set)))
|
||||
(cond
|
||||
((null? ack-flags)
|
||||
(cr-cvar-set! (flag-set-cvar flag-set)))
|
||||
((flag-acked? (car ack-flags))
|
||||
(values))
|
||||
(else
|
||||
(loop (cdr ack-flags))))))
|
||||
|
||||
;; this corresponds to syncOnGrp from Reppy's code
|
||||
(define (sync-group group)
|
||||
(call-with-values
|
||||
(lambda () (collect-group group))
|
||||
(lambda (prim-rv+ack-flag-list flag-sets)
|
||||
(if (null? (cdr prim-rv+ack-flag-list))
|
||||
(sync-prim-rv (caar prim-rv+ack-flag-list))
|
||||
(really-sync-group prim-rv+ack-flag-list flag-sets)))))
|
||||
|
||||
;; This is analogous to SYNC-PRIM-RVS
|
||||
|
||||
(define (really-sync-group prim-rv+ack-flag-list flag-sets)
|
||||
|
||||
(define (find-enabled prim-rv+ack-flag-list
|
||||
block-proc+ack-flag-list
|
||||
wrap-procs)
|
||||
(if (null? prim-rv+ack-flag-list)
|
||||
(let ((trans-id (make-trans-id)))
|
||||
(for-each (lambda (block-proc+ack-flag wrap-proc)
|
||||
(let ((block-proc (car block-proc+ack-flag))
|
||||
(ack-flag (cdr block-proc+ack-flag)))
|
||||
(block-proc trans-id
|
||||
(lambda ()
|
||||
(ack-flag! ack-flag)
|
||||
(check-cvars! flag-sets))
|
||||
wrap-proc)))
|
||||
block-proc+ack-flag-list wrap-procs)
|
||||
(let ((pair (cr-trans-id-wait trans-id)))
|
||||
((cdr pair) (car pair))))
|
||||
(let* ((prim-rv (caar prim-rv+ack-flag-list))
|
||||
(ack-flag (cdar prim-rv+ack-flag-list))
|
||||
(poll-thunk (prim-rv-poll-thunk prim-rv))
|
||||
(wrap-proc (prim-rv-wrap-proc prim-rv))
|
||||
(status (poll-thunk)))
|
||||
(cond
|
||||
((enabled? status)
|
||||
(handle-enabled (cdr prim-rv+ack-flag-list)
|
||||
(list
|
||||
(cons (enabled-priority status)
|
||||
(cons (cons (enabled-do-thunk status) ack-flag)
|
||||
wrap-proc)))
|
||||
1))
|
||||
((blocked? status)
|
||||
(find-enabled (cdr prim-rv+ack-flag-list)
|
||||
(cons (cons (blocked-proc status) ack-flag)
|
||||
block-proc+ack-flag-list)
|
||||
(cons wrap-proc wrap-procs)))))))
|
||||
|
||||
(define (handle-enabled prim-rv+ack-flag-list priority+do-list priority)
|
||||
(if (null? prim-rv+ack-flag-list)
|
||||
(let* ((stuff (select-do-thunk priority+do-list priority))
|
||||
(more-stuff (car stuff))
|
||||
(do-thunk (car more-stuff))
|
||||
(ack-flag (cdr more-stuff))
|
||||
(wrap-proc (cdr stuff)))
|
||||
(ack-flag! ack-flag)
|
||||
(check-cvars! flag-sets)
|
||||
(let ((value (do-thunk)))
|
||||
(leave-cr!)
|
||||
(wrap-proc value)))
|
||||
(let* ((prim-rv+ack-flag (car prim-rv+ack-flag-list))
|
||||
(prim-rv (car prim-rv+ack-flag))
|
||||
(ack-flag (cdr prim-rv+ack-flag))
|
||||
(poll-thunk (prim-rv-poll-thunk prim-rv))
|
||||
(wrap-proc (prim-rv-wrap-proc prim-rv))
|
||||
(status (poll-thunk)))
|
||||
(cond
|
||||
((enabled? status)
|
||||
(handle-enabled (cdr prim-rv+ack-flag-list)
|
||||
(cons (cons (enabled-priority status)
|
||||
(cons (cons (enabled-do-thunk status) ack-flag)
|
||||
wrap-proc))
|
||||
priority+do-list)
|
||||
(+ 1 priority)))
|
||||
(else
|
||||
(handle-enabled (cdr prim-rv+ack-flag-list)
|
||||
priority+do-list
|
||||
priority))))))
|
||||
|
||||
(enter-cr!)
|
||||
(find-enabled prim-rv+ack-flag-list '() '()))
|
||||
|
||||
(define (collect-group group)
|
||||
(cond
|
||||
((choose-group? group)
|
||||
(gather-choose-group group))
|
||||
(else
|
||||
(gather-wrapped group '() '()))))
|
||||
|
||||
(define (gather-choose-group group)
|
||||
(let ((ack-flag (make-ack-flag)))
|
||||
(let gather ((group group)
|
||||
(prim-rv+ack-flag-list '())
|
||||
(flag-sets '()))
|
||||
(cond
|
||||
((base-group? group)
|
||||
(let append ((prim-rvs (base-group-prim-rvs group))
|
||||
(prim-rv+ack-flag-list prim-rv+ack-flag-list))
|
||||
(if (null? prim-rvs)
|
||||
(values prim-rv+ack-flag-list flag-sets)
|
||||
(append (cdr prim-rvs)
|
||||
(cons (cons (car prim-rvs) ack-flag)
|
||||
prim-rv+ack-flag-list)))))
|
||||
((choose-group? group)
|
||||
;; fold-left
|
||||
(let loop ((groups (choose-group-groups group))
|
||||
(prim-rv+ack-flag-list prim-rv+ack-flag-list)
|
||||
(flag-sets flag-sets))
|
||||
(if (null? groups)
|
||||
(values prim-rv+ack-flag-list flag-sets)
|
||||
(call-with-values
|
||||
(lambda ()
|
||||
(gather (car groups)
|
||||
prim-rv+ack-flag-list
|
||||
flag-sets))
|
||||
(lambda (prim-rv+ack-flag-list flag-sets)
|
||||
(loop (cdr groups)
|
||||
prim-rv+ack-flag-list
|
||||
flag-sets))))))
|
||||
((nack-group? group)
|
||||
(gather-wrapped group prim-rv+ack-flag-list flag-sets))))))
|
||||
|
||||
(define (gather-wrapped group prim-rv+ack-flag-list flag-sets)
|
||||
(call-with-values
|
||||
(lambda ()
|
||||
(let gather ((group group)
|
||||
(prim-rv+ack-flag-list prim-rv+ack-flag-list)
|
||||
(all-flags '())
|
||||
(flag-sets flag-sets))
|
||||
(cond
|
||||
((base-group? group)
|
||||
(let append ((prim-rvs (base-group-prim-rvs group))
|
||||
(prim-rv+ack-flag-list prim-rv+ack-flag-list)
|
||||
(all-flags all-flags))
|
||||
(if (null? prim-rvs)
|
||||
(values prim-rv+ack-flag-list
|
||||
all-flags
|
||||
flag-sets)
|
||||
(let ((ack-flag (make-ack-flag)))
|
||||
(append (cdr prim-rvs)
|
||||
(cons (cons (car prim-rvs) ack-flag)
|
||||
prim-rv+ack-flag-list)
|
||||
(cons ack-flag all-flags))))))
|
||||
((choose-group? group)
|
||||
;; fold-left
|
||||
(let loop ((groups (choose-group-groups group))
|
||||
(prim-rv+ack-flag-list prim-rv+ack-flag-list)
|
||||
(all-flags all-flags)
|
||||
(flag-sets flag-sets))
|
||||
(if (null? groups)
|
||||
(values prim-rv+ack-flag-list
|
||||
all-flags
|
||||
flag-sets)
|
||||
(call-with-values
|
||||
(lambda ()
|
||||
(gather (car groups)
|
||||
prim-rv+ack-flag-list
|
||||
all-flags
|
||||
flag-sets))
|
||||
(lambda (prim-rv+ack-flag-list all-flags flag-sets)
|
||||
(loop (cdr groups)
|
||||
prim-rv+ack-flag-list all-flags flag-sets))))))
|
||||
((nack-group? group)
|
||||
(call-with-values
|
||||
(lambda ()
|
||||
(gather (nack-group-group group)
|
||||
prim-rv+ack-flag-list
|
||||
'()
|
||||
flag-sets))
|
||||
(lambda (prim-rv+ack-flag-list all-flags-new flag-sets)
|
||||
(values prim-rv+ack-flag-list
|
||||
(append all-flags-new all-flags)
|
||||
(cons (make-flag-set (nack-group-cvar group)
|
||||
all-flags-new)
|
||||
flag-sets))))))))
|
||||
(lambda (prim-rv+ack-flag-list all-flags flag-sets)
|
||||
(values prim-rv+ack-flag-list flag-sets))))
|
||||
|
||||
|
||||
(define (select . rvs)
|
||||
(sync (apply choose rvs)))
|
|
@ -0,0 +1,70 @@
|
|||
(define *cr-lock* (make-lock))
|
||||
|
||||
(define *cr-interrupt-mask* #f)
|
||||
|
||||
(define (enter-cr!)
|
||||
(let ((old-enabled (set-enabled-interrupts! no-interrupts)))
|
||||
(if (= old-enabled no-interrupts)
|
||||
(error "tried to enter critical region from critical region")
|
||||
(set! *cr-interrupt-mask* old-enabled))))
|
||||
|
||||
(define (leave-cr!)
|
||||
(set-enabled-interrupts! *cr-interrupt-mask*))
|
||||
|
||||
(define (leave-cr-and-block!)
|
||||
(leave-cr!)
|
||||
(block))
|
||||
|
||||
(define (in-cr?)
|
||||
(let* ((old-enabled (set-enabled-interrupts! no-interrupts))
|
||||
(yes? (= old-enabled no-interrupts)))
|
||||
(set-enabled-interrupts! old-enabled)
|
||||
yes?))
|
||||
|
||||
|
||||
;; This replaces trans-id REF in Reppy's code
|
||||
|
||||
(define-record-type :trans-id
|
||||
(really-make-trans-id maybe-thread-uid placeholder)
|
||||
trans-id?
|
||||
(maybe-thread-uid trans-id-maybe-thread-uid
|
||||
set-trans-id-maybe-thread-uid!)
|
||||
(placeholder trans-id-placeholder set-trans-id-placeholder!))
|
||||
|
||||
(define (make-trans-id)
|
||||
(really-make-trans-id (thread-uid (current-thread))
|
||||
(make-placeholder)))
|
||||
|
||||
;; this stuff needs to move into WAKEUP
|
||||
(define (cr-trans-id-wait trans-id)
|
||||
(if (not (in-cr?))
|
||||
(error "not in critical region"))
|
||||
(if (trans-id-cancelled? trans-id)
|
||||
(error "wait on cancelled trans-id"))
|
||||
(let ((placeholder (trans-id-placeholder trans-id)))
|
||||
(leave-cr!)
|
||||
(placeholder-value placeholder)))
|
||||
|
||||
(define (cr-trans-id-wakeup trans-id value)
|
||||
(if (not (in-cr?))
|
||||
(error "not in critical region"))
|
||||
(if (trans-id-cancelled? trans-id)
|
||||
(error "wakeup on cancelled trans-id"))
|
||||
(let ((placeholder (trans-id-placeholder trans-id)))
|
||||
(set-trans-id-maybe-thread-uid! trans-id #f)
|
||||
(set-trans-id-placeholder! trans-id 'no-placeholder)
|
||||
(placeholder-set! placeholder value)))
|
||||
|
||||
(define (cr-maybe-trans-id-wakeup trans-id value)
|
||||
(if (not (in-cr?))
|
||||
(error "not in critical region"))
|
||||
(if (not (trans-id-cancelled? trans-id))
|
||||
(cr-trans-id-wakeup trans-id value)))
|
||||
|
||||
(define (trans-id-cancelled? trans-id)
|
||||
(not (trans-id-maybe-thread-uid trans-id)))
|
||||
|
||||
(define (trans-id-thread-uid trans-id)
|
||||
(if (trans-id-cancelled? trans-id)
|
||||
(error "trans-id cancelled"))
|
||||
(trans-id-maybe-thread-uid trans-id))
|
Loading…
Reference in New Issue