;; sunterlib/s48/cml/rendezvous.scm
;;
;; 620 lines
;; 17 KiB
;; Scheme

;; A primitive rendezvous: a poll thunk together with the wrap procedure
;; that is applied to the value the rendezvous eventually yields.
(define-record-type :prim-rv
(really-make-prim-rv wrap-proc poll-thunk)
prim-rv?
;; one-argument procedure applied to the value produced by a
;; successful synchronization
(wrap-proc prim-rv-wrap-proc)
;; thunk called by the SYNC procedures after ENTER-CR!; they expect it
;; to return an :ENABLED or a :BLOCKED record
(poll-thunk prim-rv-poll-thunk))
;; Make a primitive rendezvous from POLL-THUNK whose wrap procedure is
;; IDENTITY (defined outside this section).
(define (make-prim-rv poll-thunk)
(really-make-prim-rv identity poll-thunk))
;; Poll result saying that the rendezvous can complete right now.
(define-record-type :enabled
(make-enabled priority do-thunk)
enabled?
;; number used by SELECT-DO-THUNK to choose among several enabled
;; rendezvous; the value -1 is replaced there by the N argument
(priority enabled-priority)
;; thunk that the SYNC procedures call (before LEAVE-CR!) to obtain
;; the value of the rendezvous
(do-thunk enabled-do-thunk))
;; Poll result saying that the rendezvous cannot complete yet.
;; PROC is a procedure with three arguments:
;; a TRANS-ID, a CLEANUP-THUNK and a WRAP-PROC.
;; TRANS-ID is the transaction ID of the blocked thread.  CLEANUP-THUNK
;; is a thunk that is called right before the TRANS-ID is woken up.
;; WRAP-PROC is the complete, composed-together chain of WRAP
;; procedures of the event.
;; The TRANS-ID should be fed, when it's woken up, a pair
;; consisting of a return value and a wrap-proc procedure.
(define-record-type :blocked
(make-blocked proc)
blocked?
(proc blocked-proc))
;; A base rendezvous: a flat list of primitive rendezvous.
(define-record-type :base
(really-make-base prim-rvs)
base?
;; list of :PRIM-RV records; may be empty (see NEVER-RV)
(prim-rvs base-prim-rvs))
;; Make a base rendezvous holding a single primitive rendezvous that is
;; built from POLL-THUNK.
(define (make-base poll-thunk)
(really-make-base (list (make-prim-rv poll-thunk))))
;; A choice among several rendezvous.
(define-record-type :choose
(make-choose rvs)
choose?
;; list of the alternative rendezvous
(rvs choose-rvs))
;; A delayed rendezvous: the thunk is called when the rendezvous is
;; forced (see REALLY-FORCE-RV) and must return a rendezvous.
(define-record-type :guard
(make-guard thunk)
guard?
(thunk guard-thunk))
;; A delayed rendezvous with negative acknowledgement: when forced,
;; PROC is called with a rendezvous made by CVAR-GET-RV on a fresh
;; condition variable and must return a rendezvous.
(define-record-type :with-nack
(make-nack proc)
nack?
(proc nack-proc))
;; Condition variables for internal use
;; A condition variable is a mutable cell holding its current state.
(define-record-type :cvar
(really-make-cvar state)
cvar?
;; this can be one of the two below:
;; a :CVAR-UNSET-STATE or a :CVAR-SET-STATE record
(state cvar-state set-cvar-state!))
;; State of a condition variable that has not been set yet.
(define-record-type :cvar-unset-state
(make-cvar-unset-state blocked)
cvar-unset-state?
;; this is a list of :CVAR-ITEM
;; (one item per waiter registered through CVAR-GET-RV)
(blocked cvar-unset-state-blocked set-cvar-unset-state-blocked!))
;; One waiter blocked on an unset condition variable.
(define-record-type :cvar-item
(make-cvar-item trans-id cleanup-thunk wrap-proc)
cvar-item?
;; transaction ID to wake up when the variable gets set
(trans-id cvar-item-trans-id)
;; thunk called by CR-CVAR-SET! just before the wakeup
(cleanup-thunk cvar-item-cleanup-thunk)
;; wrap procedure handed to the woken transaction along with the value
(wrap-proc cvar-item-wrap-proc))
;; State of a condition variable that has been set.
(define-record-type :cvar-set-state
(make-cvar-set-state priority)
cvar-set-state?
;; starts at 1 (see CR-CVAR-SET!); CVAR-GET-RV increments it on every
;; poll and resets it to 1 when its do-thunk runs
(priority cvar-set-state-priority set-cvar-set-state-priority!))
;; Create a fresh condition variable: unset, with no blocked waiters.
(define (make-cvar)
  (let ((initial-state (make-cvar-unset-state '())))
    (really-make-cvar initial-state)))
;; Set the condition variable CVAR.
;; For every waiter recorded in the unset state, run its cleanup thunk
;; and then wake its transaction with a pair of an unspecific value and
;; the waiter's wrap procedure.  Afterwards CVAR is put into the set
;; state with priority 1.  Signals an error if CVAR is already set.
;; NOTE(review): the CR- prefix suggests this must be called inside the
;; critical region -- confirm against the callers.
(define (cr-cvar-set! cvar)
  (let ((current (cvar-state cvar)))
    (if (cvar-unset-state? current)
        (begin
          (for-each
           (lambda (waiter)
             (let ((run-cleanup (cvar-item-cleanup-thunk waiter)))
               (run-cleanup))
             (cr-maybe-trans-id-wakeup
              (cvar-item-trans-id waiter)
              (cons (unspecific)
                    (cvar-item-wrap-proc waiter))))
           (cvar-unset-state-blocked current))
          ;; the state only flips to "set" after all waiters were notified
          (set-cvar-state! cvar (make-cvar-set-state 1)))
        (error "cvar already set"))))
;; Return a base rendezvous that becomes enabled once CVAR is set.
;; Polling a set CVAR yields an :ENABLED record carrying the current
;; priority (which is bumped by one as a side effect); its do-thunk
;; resets the priority to 1 and returns an unspecific value.
;; Polling an unset CVAR yields a :BLOCKED record whose procedure pushes
;; a new :CVAR-ITEM onto the waiter list of the state seen by the poll.
(define (cvar-get-rv cvar)
  (define (poll)
    (let ((state (cvar-state cvar)))
      (if (cvar-set-state? state)
          (let ((current-priority (cvar-set-state-priority state)))
            (set-cvar-set-state-priority! state (+ 1 current-priority))
            (make-enabled current-priority
                          (lambda ()
                            (set-cvar-set-state-priority! state 1)
                            (unspecific))))
          (make-blocked
           (lambda (trans-id cleanup-thunk wrap-proc)
             (let ((waiter (make-cvar-item trans-id cleanup-thunk wrap-proc)))
               (set-cvar-unset-state-blocked!
                state
                (cons waiter (cvar-unset-state-blocked state)))))))))
  (make-base poll))
;; Return a base rendezvous that is always enabled and yields VALUE.
;; Its priority is -1, which SELECT-DO-THUNK replaces by its N argument.
(define (always-rv value)
  (define (yield-value) value)
  (define (poll)
    (make-enabled -1 yield-value))
  (make-base poll))
;; Return a base rendezvous without any primitive rendezvous.
;; SYNC on it ends up in the empty-list case of SYNC-PRIM-RVS.
(define (never-rv)
  (let ((no-prim-rvs '()))
    (really-make-base no-prim-rvs)))
;; Build a guarded rendezvous.
;; Despite its name, RV is a thunk: it is called with no arguments when
;; the rendezvous is forced or wrapped, and must return a rendezvous.
(define guard
  (lambda (rv)
    (make-guard rv)))
;; Build a rendezvous with negative acknowledgement.
;; Despite its name, RV is a one-argument procedure: it is called with
;; a nack rendezvous when forced, and must return a rendezvous.
(define with-nack
  (lambda (rv)
    (make-nack rv)))
;; First phase of CHOOSE.  REV-RVS is the argument list of CHOOSE in
;; reverse order; PRIM-RVS accumulates the primitive rendezvous of the
;; base rendezvous seen so far.  As long as only base rendezvous show
;; up, their primitive rendezvous are merged; when the list is used up
;; the result is a single base rendezvous.  On the first non-base
;; rendezvous the work is handed over to GATHER, seeded with the base
;; rendezvous collected so far (if there is anything in it).
(define (gather-prim-rvs rev-rvs prim-rvs)
  (if (null? rev-rvs)
      (really-make-base prim-rvs)
      (let ((head (car rev-rvs)))
        (if (base? head)
            (gather-prim-rvs (cdr rev-rvs)
                             (append (base-prim-rvs head) prim-rvs))
            (gather rev-rvs
                    (if (null? prim-rvs)
                        '()
                        (list (really-make-base prim-rvs))))))))
;; Second phase of CHOOSE.  Moves the rendezvous of REV-RVS onto the
;; front of RVS one by one:
;;  - a :CHOOSE is spliced in (its alternatives are prepended),
;;  - a :BASE directly in front of another :BASE is merged with it,
;;  - anything else is consed on.
;; When REV-RVS is used up, a one-element RVS yields that element,
;; otherwise RVS is packed into a :CHOOSE record.
(define (gather rev-rvs rvs)
  (if (null? rev-rvs)
      (if (null? (cdr rvs))
          (car rvs)
          (make-choose rvs))
      (let ((rv (car rev-rvs))
            (remaining (cdr rev-rvs)))
        (cond
         ((choose? rv)
          (gather remaining (append (choose-rvs rv) rvs)))
         ((and (base? rv)
               (not (null? rvs))
               (base? (car rvs)))
          (let ((merged (really-make-base
                         (append (base-prim-rvs rv)
                                 (base-prim-rvs (car rvs))))))
            (gather remaining (cons merged (cdr rvs)))))
         (else
          (gather remaining (cons rv rvs)))))))
;; Combine the rendezvous RVS into one rendezvous offering a choice
;; among them.  With only base rendezvous (or no arguments at all) the
;; result is a single base rendezvous.
(define (choose . rvs)
  (let ((rev-rvs (reverse rvs)))
    (gather-prim-rvs rev-rvs '())))
;; Return the composition of the one-argument procedures F and G:
;; the result applies G first and then F to G's result.
(define (compose f g)
  (define (composed x)
    (let ((inner (g x)))
      (f inner)))
  composed)
;; Return a copy of PRIM-RV whose wrap procedure first runs the old
;; wrap procedure and then WRAP-PROC.  The poll thunk is shared.
(define (wrap-prim-rv prim-rv wrap-proc)
  (let* ((old-wrap (prim-rv-wrap-proc prim-rv))
         (new-wrap (compose wrap-proc old-wrap)))
    (really-make-prim-rv new-wrap
                         (prim-rv-poll-thunk prim-rv))))
;; Return a rendezvous like RV whose result is additionally passed
;; through WRAP-PROC.
;;  - :BASE   -- every primitive rendezvous is wrapped,
;;  - :CHOOSE -- every alternative is wrapped recursively,
;;  - guard / with-nack -- wrapping is deferred until the delayed
;;    rendezvous has been produced.
;; A value of any other kind matches no clause; the result is then
;; unspecified.
(define (wrap rv wrap-proc)
  (define (rewrap inner-rv)
    (wrap inner-rv wrap-proc))
  (cond
   ((base? rv)
    (really-make-base
     (map (lambda (prim-rv)
            (wrap-prim-rv prim-rv wrap-proc))
          (base-prim-rvs rv))))
   ((choose? rv)
    (make-choose (map rewrap (choose-rvs rv))))
   ((guard? rv)
    (let ((thunk (guard-thunk rv)))
      (make-guard (lambda ()
                    (rewrap (thunk))))))
   ((nack? rv)
    (let ((proc (nack-proc rv)))
      (make-nack (lambda (nack-rv)
                   (rewrap (proc nack-rv))))))))
;; Forced form of a base rendezvous: a flat list of primitive rendezvous.
(define-record-type :base-group
(really-make-base-group prim-rvs)
base-group?
;; list of :PRIM-RV records
(prim-rvs base-group-prim-rvs))
;; Disclose a :BASE-GROUP as the symbol BASE-GROUP followed by its
;; primitive rendezvous.
(define-record-discloser :base-group
(lambda (base-group)
(cons 'base-group
(base-group-prim-rvs base-group))))
;; Forced form of a choice: a list of groups.
(define-record-type :choose-group
(make-choose-group groups)
choose-group?
;; list of :BASE-GROUP, :CHOOSE-GROUP or :NACK-GROUP records
(groups choose-group-groups))
;; Disclose a :CHOOSE-GROUP as the symbol CHOOSE-GROUP followed by its
;; member groups.
(define-record-discloser :choose-group
(lambda (choose-group)
(cons 'choose-group
(choose-group-groups choose-group))))
;; Forced form of a with-nack rendezvous.
(define-record-type :nack-group
(make-nack-group cvar group)
nack-group?
;; condition variable behind the nack rendezvous that was handed to
;; the with-nack procedure (see REALLY-FORCE-RV)
(cvar nack-group-cvar)
;; the forced rendezvous returned by the with-nack procedure
(group nack-group-group))
;; Disclose a :NACK-GROUP as a two-element list: the symbol NACK-GROUP
;; and the inner group.  The condition variable is not shown.
(define-record-discloser :nack-group
(lambda (nack-group)
(list 'nack-group
(nack-group-group nack-group))))
;; Force RV into a group (:BASE-GROUP, :CHOOSE-GROUP or :NACK-GROUP).
;; A base rendezvous is converted directly; everything else goes
;; through REALLY-FORCE-RV.
(define (force-rv rv)
  (if (base? rv)
      (really-make-base-group (base-prim-rvs rv))
      (really-force-rv rv)))
;; Force the alternatives RVS of a choice, one after the other.
;; While every forced alternative is a :BASE-GROUP, its primitive
;; rendezvous are accumulated in PRIM-RVS, and the final result is one
;; :BASE-GROUP.  As soon as an alternative forces to something else,
;; the rest is processed by FORCE-RVS, seeded with that group (or its
;; members, for a :CHOOSE-GROUP) followed by a :BASE-GROUP of what has
;; been accumulated so far.
(define (force-prim-rvs rvs prim-rvs)
  (cond
   ((null? rvs)
    (really-make-base-group prim-rvs))
   (else
    (let ((group (really-force-rv (car rvs)))
          (remaining (cdr rvs)))
      (cond
       ((base-group? group)
        (force-prim-rvs remaining
                        (append (base-group-prim-rvs group) prim-rvs)))
       ((choose-group? group)
        (let ((collected (really-make-base-group prim-rvs)))
          (force-rvs remaining
                     (append (choose-group-groups group)
                             (list collected)))))
       (else
        (let ((collected (really-make-base-group prim-rvs)))
          (force-rvs remaining (list group collected)))))))))
;; Force the remaining alternatives RVS and push the results onto
;; GROUPS:
;;  - a :BASE-GROUP directly in front of another :BASE-GROUP is merged
;;    with it,
;;  - the members of a :CHOOSE-GROUP are spliced in,
;;  - anything else is consed on.
;; When RVS is used up, a one-element GROUPS yields that element,
;; otherwise GROUPS is packed into a :CHOOSE-GROUP.
(define (force-rvs rvs groups)
  (if (null? rvs)
      (if (null? (cdr groups))
          (car groups)
          (make-choose-group groups))
      (let ((group (really-force-rv (car rvs)))
            (remaining (cdr rvs)))
        (cond
         ((and (base-group? group)
               (not (null? groups))
               (base-group? (car groups)))
          (let ((merged (really-make-base-group
                         (append (base-group-prim-rvs group)
                                 (base-group-prim-rvs (car groups))))))
            (force-rvs remaining (cons merged (cdr groups)))))
         ((choose-group? group)
          (force-rvs remaining
                     (append (choose-group-groups group) groups)))
         (else
          (force-rvs remaining (cons group groups)))))))
;; This corresponds to force' in Reppy's implementation.
;; Turn RV into a group:
;;  - guard: call the thunk and force what it returns,
;;  - with-nack: create a fresh condition variable, call the procedure
;;    with the rendezvous for that variable, and pair the variable with
;;    the forced result in a :NACK-GROUP,
;;  - base: convert to a :BASE-GROUP,
;;  - choose: force all alternatives via FORCE-PRIM-RVS.
;; A value of any other kind matches no clause; the result is then
;; unspecified.
(define (really-force-rv rv)
  (cond
   ((guard? rv)
    (let ((thunk (guard-thunk rv)))
      (really-force-rv (thunk))))
   ((nack? rv)
    (let* ((cvar (make-cvar))
           (nack-rv (cvar-get-rv cvar))
           (inner-rv ((nack-proc rv) nack-rv)))
      (make-nack-group cvar (really-force-rv inner-rv))))
   ((base? rv)
    (really-make-base-group (base-prim-rvs rv)))
   ((choose? rv)
    (force-prim-rvs (choose-rvs rv) '()))))
;; Synchronize on the single primitive rendezvous PRIM-RV.
;; After ENTER-CR! the poll thunk is called.  If the rendezvous is
;; enabled, its do-thunk is run, LEAVE-CR! is called and the wrapped
;; value is returned.  If it is blocked, the block procedure is
;; registered with a fresh trans-id, and the caller waits on it with
;; CR-TRANS-ID-WAIT; the wakeup delivers a (value . wrap-proc) pair.
;; NOTE(review): the blocked path never calls LEAVE-CR! itself --
;; presumably CR-TRANS-ID-WAIT leaves the critical region; confirm.
(define (sync-prim-rv prim-rv)
  (let ((poll-thunk (prim-rv-poll-thunk prim-rv))
        (wrap-proc (prim-rv-wrap-proc prim-rv)))
    (enter-cr!)
    (let ((status (poll-thunk)))
      (cond
       ((enabled? status)
        (let* ((do-thunk (enabled-do-thunk status))
               (value (do-thunk)))
          (leave-cr!)
          (wrap-proc value)))
       ((blocked? status)
        (let* ((trans-id (make-trans-id))
               (block-proc (blocked-proc status)))
          ;; VALUES is the cleanup thunk: with no arguments it does nothing
          (block-proc trans-id values wrap-proc)
          (let* ((pair (cr-trans-id-wait trans-id))
                 (woken-value (car pair))
                 (woken-wrap (cdr pair)))
            (woken-wrap woken-value))))))))
;; Pick the payload of one entry of PRIORITY+DO-LIST, a non-empty list
;; of (priority . payload) pairs.  A single entry is returned as is.
;; Otherwise the entries with the highest priority are collected, where
;; a priority of -1 counts as N; among several entries of equal highest
;; priority, the one that comes last in the list is returned.
(define (select-do-thunk priority+do-list n)
  (define (effective-priority raw)
    (if (= raw -1) n raw))
  (if (null? (cdr priority+do-list))
      (cdar priority+do-list)
      (let scan ((remaining priority+do-list)
                 (best 0)
                 (count 0)          ; number of entries in CANDIDATES
                 (candidates '()))  ; payloads having priority BEST
        (if (null? remaining)
            (if (null? (cdr candidates))
                (car candidates)
                ;; Reppy's code picks a random candidate here
                ;; (List.nth(doFns, random k)); this one takes the first.
                (car candidates))
            (let* ((entry (car remaining))
                   (p (effective-priority (car entry)))
                   (payload (cdr entry))
                   (tail (cdr remaining)))
              (cond
               ((> p best)
                (scan tail p 1 (list payload)))
               ((= p best)
                (scan tail best (+ 1 count) (cons payload candidates)))
               (else
                (scan tail best count candidates))))))))
;; Synchronize on the list of primitive rendezvous PRIM-RVS.
;; An empty list calls BLOCK; a one-element list is handed to
;; SYNC-PRIM-RV.  Otherwise all rendezvous are polled after ENTER-CR!.
;; If at least one of them is enabled, SELECT-DO-THUNK picks one of the
;; enabled ones, its do-thunk is run, LEAVE-CR! is called and the
;; wrapped value is returned.  If none is enabled, all block procedures
;; are registered under one fresh trans-id and the caller waits on it
;; with CR-TRANS-ID-WAIT.
(define (sync-prim-rvs prim-rvs)
(cond
((null? prim-rvs) (block))
((null? (cdr prim-rvs)) (sync-prim-rv (car prim-rvs)))
(else
(let ()
;; Poll PRIM-RVS until the first enabled one shows up.  BLOCK-PROCS
;; and WRAP-PROCS are parallel lists holding the block procedure and
;; the wrap procedure of every blocked rendezvous seen so far.
(define (find-enabled prim-rvs block-procs wrap-procs)
(if (null? prim-rvs)
;; nothing is enabled: register everywhere, then wait
(let ((trans-id (make-trans-id)))
(for-each (lambda (block-proc wrap-proc)
;; VALUES is the cleanup thunk: with no arguments it does nothing
(block-proc trans-id values wrap-proc))
block-procs wrap-procs)
;; the wakeup delivers a (value . wrap-proc) pair
(let ((pair (cr-trans-id-wait trans-id)))
((cdr pair) (car pair))))
(let* ((prim-rv (car prim-rvs))
(poll-thunk (prim-rv-poll-thunk prim-rv))
(wrap-proc (prim-rv-wrap-proc prim-rv))
(status (poll-thunk)))
(cond
((enabled? status)
;; first enabled rendezvous: the block procedures collected so
;; far are dropped, only enabled ones matter from here on
(handle-enabled (cdr prim-rvs)
(list
(cons (enabled-priority status)
(cons (enabled-do-thunk status)
wrap-proc)))
1))
((blocked? status)
(find-enabled (cdr prim-rvs)
(cons (blocked-proc status)
block-procs)
(cons wrap-proc wrap-procs)))))))
;; Poll the rest of PRIM-RVS, keeping only the enabled ones.
;; PRIORITY+DO-LIST holds entries of the form
;; (priority . (do-thunk . wrap-proc)); PRIORITY is the number of
;; enabled rendezvous found so far and becomes the N argument of
;; SELECT-DO-THUNK.
(define (handle-enabled prim-rvs priority+do-list priority)
(if (null? prim-rvs)
(let* ((stuff (select-do-thunk priority+do-list priority))
(do-thunk (car stuff))
(wrap-proc (cdr stuff)))
;; run the do-thunk before leaving the critical region,
;; apply the wrap procedure afterwards
(let ((value (do-thunk)))
(leave-cr!)
(wrap-proc value)))
(let* ((prim-rv (car prim-rvs))
(poll-thunk (prim-rv-poll-thunk prim-rv))
(wrap-proc (prim-rv-wrap-proc prim-rv))
(status (poll-thunk)))
(cond
((enabled? status)
(handle-enabled (cdr prim-rvs)
(cons (cons (enabled-priority status)
(cons (enabled-do-thunk status)
wrap-proc))
priority+do-list)
(+ 1 priority)))
(else
(handle-enabled (cdr prim-rvs)
priority+do-list
priority))))))
(enter-cr!)
(find-enabled prim-rvs '() '())))))
;; Synchronize on the rendezvous RV and return its (wrapped) value.
;; RV is forced first; a plain :BASE-GROUP goes to SYNC-PRIM-RVS,
;; any other group to SYNC-GROUP.
(define (sync rv)
  (let ((group (force-rv rv)))
    (if (base-group? group)
        (sync-prim-rvs (base-group-prim-rvs group))
        (sync-group group))))
;; A mutable boolean flag; it is set for the primitive rendezvous that
;; gets chosen during a group synchronization (see REALLY-SYNC-GROUP).
(define-record-type :ack-flag
(really-make-ack-flag acked?)
ack-flag?
(acked? flag-acked? set-flag-acked?!))
;; Make a flag that is not acked.
(define (make-ack-flag)
(really-make-ack-flag #f))
;; Mark ACK-FLAG as acked.
(define (ack-flag! ack-flag)
(set-flag-acked?! ack-flag #t))
;; Ties the condition variable of a :NACK-GROUP to the ack flags of the
;; primitive rendezvous inside that group (see GATHER-WRAPPED).
(define-record-type :flag-set
(make-flag-set cvar ack-flags)
flag-set?
;; condition variable that CHECK-CVAR! sets when none of the flags
;; is acked
(cvar flag-set-cvar)
;; list of :ACK-FLAG records
(ack-flags flag-set-ack-flags))
;; Run CHECK-CVAR! on every element of FLAG-SETS.
(define (check-cvars! flag-sets)
  (for-each (lambda (flag-set)
              (check-cvar! flag-set))
            flag-sets))
;; Set the condition variable of FLAG-SET unless one of its ack flags
;; is acked.  If some flag is acked, nothing happens and no values are
;; returned.
(define (check-cvar! flag-set)
  (define (any-acked? flags)
    (cond
     ((null? flags) #f)
     ((flag-acked? (car flags)) #t)
     (else (any-acked? (cdr flags)))))
  (if (any-acked? (flag-set-ack-flags flag-set))
      (values)
      (cr-cvar-set! (flag-set-cvar flag-set))))
;; This corresponds to syncOnGrp from Reppy's code.
;; Synchronize on GROUP, a forced rendezvous that is not a plain
;; :BASE-GROUP.  COLLECT-GROUP flattens GROUP into a list of
;; (prim-rv . ack-flag) pairs plus a list of flag sets.  If exactly one
;; primitive rendezvous was collected, it is handed to SYNC-PRIM-RV;
;; in every other case REALLY-SYNC-GROUP does the work.
(define (sync-group group)
  (call-with-values
      (lambda () (collect-group group))
    (lambda (prim-rv+ack-flag-list flag-sets)
      ;; The list may be empty, e.g. for
      ;; (with-nack (lambda (nack) (never-rv))); taking its CDR would be
      ;; an error, so make sure there is a pair before testing for a
      ;; singleton.  The empty case goes to REALLY-SYNC-GROUP, which
      ;; finds nothing enabled and waits, like the empty case of
      ;; SYNC-PRIM-RVS.
      (if (and (pair? prim-rv+ack-flag-list)
               (null? (cdr prim-rv+ack-flag-list)))
          (sync-prim-rv (caar prim-rv+ack-flag-list))
          (really-sync-group prim-rv+ack-flag-list flag-sets)))))
;; This is analogous to SYNC-PRIM-RVS
;; Synchronize on PRIM-RV+ACK-FLAG-LIST, a list of (prim-rv . ack-flag)
;; pairs, with FLAG-SETS being the flag sets of the enclosing nack
;; groups.  All rendezvous are polled after ENTER-CR!.  Whenever one
;; rendezvous is chosen -- either right away because it is enabled, or
;; later through the cleanup thunk handed to its block procedure -- its
;; ack flag is set and CHECK-CVARS! is run on FLAG-SETS.
(define (really-sync-group prim-rv+ack-flag-list flag-sets)
;; Poll until the first enabled rendezvous shows up.
;; BLOCK-PROC+ACK-FLAG-LIST holds (block-proc . ack-flag) pairs and
;; WRAP-PROCS the matching wrap procedures of the blocked rendezvous
;; seen so far.
(define (find-enabled prim-rv+ack-flag-list
block-proc+ack-flag-list
wrap-procs)
(if (null? prim-rv+ack-flag-list)
;; nothing is enabled: register everywhere, then wait
(let ((trans-id (make-trans-id)))
(for-each (lambda (block-proc+ack-flag wrap-proc)
(let ((block-proc (car block-proc+ack-flag))
(ack-flag (cdr block-proc+ack-flag)))
(block-proc trans-id
;; cleanup thunk: mark this rendezvous as the chosen one and
;; set the condition variables of the flag sets without an
;; acked flag
(lambda ()
(ack-flag! ack-flag)
(check-cvars! flag-sets))
wrap-proc)))
block-proc+ack-flag-list wrap-procs)
;; the wakeup delivers a (value . wrap-proc) pair
(let ((pair (cr-trans-id-wait trans-id)))
((cdr pair) (car pair))))
(let* ((prim-rv (caar prim-rv+ack-flag-list))
(ack-flag (cdar prim-rv+ack-flag-list))
(poll-thunk (prim-rv-poll-thunk prim-rv))
(wrap-proc (prim-rv-wrap-proc prim-rv))
(status (poll-thunk)))
(cond
((enabled? status)
;; first enabled rendezvous: the block procedures collected so far
;; are dropped, only enabled ones matter from here on
(handle-enabled (cdr prim-rv+ack-flag-list)
(list
(cons (enabled-priority status)
(cons (cons (enabled-do-thunk status) ack-flag)
wrap-proc)))
1))
((blocked? status)
(find-enabled (cdr prim-rv+ack-flag-list)
(cons (cons (blocked-proc status) ack-flag)
block-proc+ack-flag-list)
(cons wrap-proc wrap-procs)))))))
;; Poll the rest, keeping only the enabled rendezvous.
;; PRIORITY+DO-LIST holds entries of the form
;; (priority . ((do-thunk . ack-flag) . wrap-proc)); PRIORITY is the
;; number of enabled rendezvous found so far and becomes the N argument
;; of SELECT-DO-THUNK.
(define (handle-enabled prim-rv+ack-flag-list priority+do-list priority)
(if (null? prim-rv+ack-flag-list)
(let* ((stuff (select-do-thunk priority+do-list priority))
(more-stuff (car stuff))
(do-thunk (car more-stuff))
(ack-flag (cdr more-stuff))
(wrap-proc (cdr stuff)))
;; ack the chosen rendezvous and fire the nacks first, then run the
;; do-thunk, then leave the critical region, then wrap the value
(ack-flag! ack-flag)
(check-cvars! flag-sets)
(let ((value (do-thunk)))
(leave-cr!)
(wrap-proc value)))
(let* ((prim-rv+ack-flag (car prim-rv+ack-flag-list))
(prim-rv (car prim-rv+ack-flag))
(ack-flag (cdr prim-rv+ack-flag))
(poll-thunk (prim-rv-poll-thunk prim-rv))
(wrap-proc (prim-rv-wrap-proc prim-rv))
(status (poll-thunk)))
(cond
((enabled? status)
(handle-enabled (cdr prim-rv+ack-flag-list)
(cons (cons (enabled-priority status)
(cons (cons (enabled-do-thunk status) ack-flag)
wrap-proc))
priority+do-list)
(+ 1 priority)))
(else
(handle-enabled (cdr prim-rv+ack-flag-list)
priority+do-list
priority))))))
(enter-cr!)
(find-enabled prim-rv+ack-flag-list '() '()))
;; Flatten GROUP into two values: a list of (prim-rv . ack-flag) pairs
;; and a list of flag sets.  A :CHOOSE-GROUP is handled by
;; GATHER-CHOOSE-GROUP, everything else by GATHER-WRAPPED.
(define (collect-group group)
  (if (choose-group? group)
      (gather-choose-group group)
      (gather-wrapped group '() '())))
;; Flatten GROUP into two values: a list of (prim-rv . ack-flag) pairs
;; and a list of flag sets.  All primitive rendezvous that are not
;; inside a :NACK-GROUP share one single ack flag; each :NACK-GROUP is
;; delegated to GATHER-WRAPPED.
(define (gather-choose-group group)
  (let ((shared-flag (make-ack-flag)))
    (define (walk group pairs flag-sets)
      (cond
       ((base-group? group)
        ;; pair every primitive rendezvous with the shared flag
        (let push ((prims (base-group-prim-rvs group))
                   (pairs pairs))
          (if (null? prims)
              (values pairs flag-sets)
              (push (cdr prims)
                    (cons (cons (car prims) shared-flag)
                          pairs)))))
       ((choose-group? group)
        ;; left fold over the member groups
        (let fold ((members (choose-group-groups group))
                   (pairs pairs)
                   (flag-sets flag-sets))
          (if (null? members)
              (values pairs flag-sets)
              (call-with-values
                  (lambda ()
                    (walk (car members) pairs flag-sets))
                (lambda (pairs flag-sets)
                  (fold (cdr members) pairs flag-sets))))))
       ((nack-group? group)
        (gather-wrapped group pairs flag-sets))))
    (walk group '() '())))
;; Flatten GROUP, adding to PRIM-RV+ACK-FLAG-LIST and FLAG-SETS, and
;; return the two extended lists as two values.  Every primitive
;; rendezvous gets an ack flag of its own.  For every :NACK-GROUP a
;; flag set is recorded that ties the group's condition variable to the
;; flags of all primitive rendezvous inside that group.
(define (gather-wrapped group prim-rv+ack-flag-list flag-sets)
  ;; WALK returns three values: the extended pair list, FLAGS extended
  ;; by the flags created for the walked group, and the extended flag
  ;; sets.
  (define (walk group pairs flags flag-sets)
    (cond
     ((base-group? group)
      (let tag ((prims (base-group-prim-rvs group))
                (pairs pairs)
                (flags flags))
        (if (null? prims)
            (values pairs flags flag-sets)
            (let ((fresh-flag (make-ack-flag)))
              (tag (cdr prims)
                   (cons (cons (car prims) fresh-flag) pairs)
                   (cons fresh-flag flags))))))
     ((choose-group? group)
      ;; left fold over the member groups
      (let fold ((members (choose-group-groups group))
                 (pairs pairs)
                 (flags flags)
                 (flag-sets flag-sets))
        (if (null? members)
            (values pairs flags flag-sets)
            (call-with-values
                (lambda ()
                  (walk (car members) pairs flags flag-sets))
              (lambda (pairs flags flag-sets)
                (fold (cdr members) pairs flags flag-sets))))))
     ((nack-group? group)
      ;; collect the inner flags separately, so that the flag set of
      ;; this group covers exactly the rendezvous inside it
      (call-with-values
          (lambda ()
            (walk (nack-group-group group) pairs '() flag-sets))
        (lambda (pairs inner-flags flag-sets)
          (values pairs
                  (append inner-flags flags)
                  (cons (make-flag-set (nack-group-cvar group)
                                       inner-flags)
                        flag-sets)))))))
  (call-with-values
      (lambda ()
        (walk group prim-rv+ack-flag-list '() flag-sets))
    ;; the list of all flags is only needed while walking
    (lambda (pairs flags flag-sets)
      (values pairs flag-sets))))
;; Synchronize on a choice among the rendezvous RVS;
;; shorthand for (sync (choose rv ...)).
(define (select . rvs)
  (let ((choice (apply choose rvs)))
    (sync choice)))