From 91d2bb15c856214c18dd8de1f52908928196e3a4 Mon Sep 17 00:00:00 2001 From: Mike Sperber Date: Mon, 12 May 2003 07:38:33 +0000 Subject: [PATCH] Add rough, but fairly well-debugged implementation of many CML primitives. --- cml/async-channels.scm | 38 +++ cml/channel.scm | 100 +++++++ cml/jar.scm | 102 +++++++ cml/packages.scm | 96 +++++++ cml/placeholder.scm | 101 +++++++ cml/rendezvous.scm | 619 +++++++++++++++++++++++++++++++++++++++++ cml/trans-id.scm | 70 +++++ 7 files changed, 1126 insertions(+) create mode 100644 cml/async-channels.scm create mode 100644 cml/channel.scm create mode 100644 cml/jar.scm create mode 100644 cml/packages.scm create mode 100644 cml/placeholder.scm create mode 100644 cml/rendezvous.scm create mode 100644 cml/trans-id.scm diff --git a/cml/async-channels.scm b/cml/async-channels.scm new file mode 100644 index 0000000..3313dd9 --- /dev/null +++ b/cml/async-channels.scm @@ -0,0 +1,38 @@ +(define-record-type :async-channel + (really-make-async-channel in-channel out-channel) + async-channel? + (in-channel async-channel-in-channel) + (out-channel async-channel-out-channel)) + +(define (make-async-channel) + (let ((in-channel (make-channel)) + (out-channel (make-channel))) + (spawn + (lambda () + (let ((queue (make-queue))) + (let loop () + (if (queue-empty? queue) + (begin + (enqueue! queue (receive in-channel)) + (loop)) + (select + (list + (wrap (receive-rv in-channel) + (lambda (message) + (enqueue! queue message) + (loop))) + (wrap (send-rv out-channel (queue-front queue)) + (lambda (ignore) + (dequeue! queue) + (loop)))))))))) + (really-make-async-channel in-channel + out-channel))) + +(define (send-async channel message) + (send (async-channel-in-channel channel) message)) + +(define (receive-async-rv channel) + (receive-rv (async-channel-out-channel channel))) + +(define (receive-async channel) + (sync (receive-async-rv channel))) diff --git a/cml/channel.scm b/cml/channel.scm new file mode 100644 index 0000000..66abd70 --- /dev/null +++ b/cml/channel.scm @@ -0,0 +1,100 @@ +(define-record-type :channel + (really-make-channel priority in out) + channel? + (priority channel-priority set-channel-priority!) + ;; queue of trans-id * #f + (in channel-in) + ;; queue of trans-id * message + (out channel-out)) + +(define-record-type :q-item + (make-q-item trans-id message cleanup-thunk wrap-proc) + q-item? + (trans-id q-item-trans-id) + (message q-item-message) + (cleanup-thunk q-item-cleanup-thunk) + (wrap-proc q-item-wrap-proc)) + +(define (make-channel) + (really-make-channel 1 (make-queue) (make-queue))) + +(define (channel=? channel-1 channel-2) + (eq? channel-1 channel-2)) + +(define (clean-and-enqueue! queue value) + (clean-queue-head! queue) + (enqueue! queue value)) + +(define (clean-and-dequeue! queue) + (let loop () + (if (queue-empty? queue) + #f + (let ((front (dequeue! queue))) + (if (trans-id-cancelled? (q-item-trans-id front)) + (loop) + front))))) + +(define (clean-queue-head! queue) + (let loop () + (if (not (queue-empty? queue)) + (let ((front (queue-front queue))) + (if (trans-id-cancelled? (q-item-trans-id front)) + (begin + (dequeue! queue) + (loop))))))) + +(define (send-rv channel message) + (make-base + (lambda () + (let ((in (channel-in channel))) + (clean-queue-head! in) + (if (queue-empty? in) + (make-blocked (lambda (trans-id cleanup-thunk wrap-proc) + (clean-and-enqueue! (channel-out channel) + (make-q-item trans-id + message + cleanup-thunk + wrap-proc)))) + (let ((priority (channel-priority channel))) + (set-channel-priority! channel (+ 1 priority)) + (make-enabled + priority + (lambda () + (let ((q-item (dequeue! in))) + (set-channel-priority! channel 1) + ((q-item-cleanup-thunk q-item)) + (cr-trans-id-wakeup (q-item-trans-id q-item) + (cons message + (q-item-wrap-proc q-item))) + (unspecific)))))))))) + +(define (send channel message) + (sync (send-rv channel message))) + +(define (receive-rv channel) + (make-base + (lambda () + (let ((out (channel-out channel))) + (clean-queue-head! out) + (if (queue-empty? out) + (make-blocked (lambda (trans-id cleanup-thunk wrap-proc) + (clean-and-enqueue! (channel-in channel) + (make-q-item trans-id + #f + cleanup-thunk + wrap-proc)))) + (let ((priority (channel-priority channel))) + (set-channel-priority! channel (+ 1 priority)) + (make-enabled + priority + (lambda () + (let ((q-item (dequeue! out))) + (set-channel-priority! channel 1) + ((q-item-cleanup-thunk q-item)) + (cr-trans-id-wakeup (q-item-trans-id q-item) + (cons (unspecific) + (q-item-wrap-proc q-item))) + (q-item-message q-item)))))))))) + +(define (receive channel) + (sync (receive-rv channel))) diff --git a/cml/jar.scm b/cml/jar.scm new file mode 100644 index 0000000..1f8e5af --- /dev/null +++ b/cml/jar.scm @@ -0,0 +1,102 @@ +; Jars (multiple-assignment cells for use with threads) +; these are equivalent to ID-90 M-structures + +(define-record-type :jar + (really-make-jar priority queue value id) + jar? + (priority jar-priority set-jar-priority!) + (queue jar-queue) + (value jar-value set-jar-value!) + (id jar-id)) + +(define the-empty-jar-value (list 'empty-jar)) + +(define (empty-jar-value? thing) + (eq? thing the-empty-jar-value)) + +(define-record-discloser :jar + (lambda (jar) + (cons 'jar + (if (jar-id jar) + (list (jar-id jar)) + '())))) + +(define-record-type :q-item + (make-q-item trans-id cleanup-thunk wrap-proc) + q-item? + (trans-id q-item-trans-id) + (cleanup-thunk q-item-cleanup-thunk) + (wrap-proc q-item-wrap-proc)) + +(define (clean-and-enqueue! queue value) + (clean-queue-head! queue) + (enqueue! queue value)) + +(define (clean-and-dequeue! queue) + (let loop () + (if (queue-empty? queue) + #f + (let ((front (dequeue! queue))) + (if (trans-id-cancelled? (q-item-trans-id front)) + (loop) + front))))) + +(define (clean-queue-head! queue) + (let loop () + (if (not (queue-empty? queue)) + (let ((front (queue-front queue))) + (if (trans-id-cancelled? (q-item-trans-id front)) + (begin + (dequeue! queue) + (loop))))))) + +(define (make-jar . id-option) + (really-make-jar 0 + (make-queue) + the-empty-jar-value + (if (null? id-option) + #f + (car id-option)))) + +(define (jar-take-rv jar) + (make-base + (lambda () + (cond + ((empty-jar-value? (jar-value jar)) + (make-blocked + (lambda (trans-id cleanup-thunk wrap-proc) + (clean-and-enqueue! (jar-queue jar) + (make-q-item trans-id + cleanup-thunk + wrap-proc))))) + (else + (let ((priority (jar-priority jar))) + (set-jar-priority! jar (+ 1 priority)) + (make-enabled + priority + (lambda () + (let ((value (jar-value jar))) + (set-jar-value! jar the-empty-jar-value) + value))))))))) + +(define (jar-put! jar value) + (enter-cr!) + (cond + ((empty-jar-value? (jar-value jar)) + (cond + ((clean-and-dequeue! (jar-queue jar)) + => (lambda (q-item) + ((q-item-cleanup-thunk q-item)) + (cr-trans-id-wakeup (q-item-trans-id q-item) + (cons value + (q-item-wrap-proc q-item))))) + (else + (set-jar-value! jar value))) + (leave-cr!) + (unspecific)) + (else + (leave-cr!) + (error "jar is already full" jar value)))) + +(define (jar-take jar) + (sync (jar-take-rv jar))) diff --git a/cml/packages.scm b/cml/packages.scm new file mode 100644 index 0000000..3e32002 --- /dev/null +++ b/cml/packages.scm @@ -0,0 +1,96 @@ +(define-interface trans-ids-interface + (export enter-cr! leave-cr! + leave-cr-and-block! + trans-id? + make-trans-id + cr-trans-id-wait cr-trans-id-wakeup cr-maybe-trans-id-wakeup + trans-id-thread-uid trans-id-cancelled?)) + +(define-interface rendezvous-interface + (export always-rv never-rv + guard with-nack choose wrap + sync + select)) + +(define-interface make-rendezvous-interface + (export make-blocked make-enabled make-base)) + +(define-interface rendezvous-channels-interface + (export make-channel + channel? + send-rv send + receive-rv receive)) + +(define-interface rendezvous-async-channels-interface + (export make-async-channel + async-channel? + send-async + receive-async-rv + receive-async)) + +(define-interface rendezvous-placeholders-interface + (export make-placeholder + placeholder? + placeholder-value + placeholder-set! + placeholder-value-rv)) + +(define-interface rendezvous-jars-interface + (export make-jar + jar? + jar-take + jar-put! + jar-take-rv)) + +(define-structure trans-ids trans-ids-interface + (open scheme + srfi-9 big-util + threads threads-internal interrupts + locks placeholders) + (files trans-id)) + +(define-structures ((rendezvous rendezvous-interface) + (make-rendezvous make-rendezvous-interface)) + (open scheme + srfi-9 (subset define-record-types (define-record-discloser)) + trans-ids + threads threads-internal + big-util + (subset util (unspecific))) + (files rendezvous)) + +(define-structure rendezvous-channels rendezvous-channels-interface + (open scheme + srfi-9 + trans-ids rendezvous make-rendezvous + queues + big-util + (subset util (unspecific))) + (files channel)) + +(define-structure rendezvous-async-channels rendezvous-async-channels-interface + (open scheme + rendezvous + rendezvous-channels + threads + queues + srfi-9) + (files async-channels)) + +(define-structure rendezvous-placeholders rendezvous-placeholders-interface + (open scheme + srfi-9 (subset define-record-types (define-record-discloser)) + trans-ids rendezvous make-rendezvous + queues + signals + (subset util (unspecific))) + (files placeholder)) + +(define-structure rendezvous-jars rendezvous-jars-interface + (open scheme + srfi-9 (subset define-record-types (define-record-discloser)) + trans-ids rendezvous make-rendezvous + queues + signals + (subset util (unspecific))) + (files jar)) diff --git a/cml/placeholder.scm b/cml/placeholder.scm new file mode 100644 index 0000000..b4e0e75 --- /dev/null +++ b/cml/placeholder.scm @@ -0,0 +1,101 @@ +; Placeholders (single-assignment cells for use with threads) + +(define-record-type :placeholder + (really-make-placeholder priority queue value id) + placeholder? + (priority placeholder-priority set-placeholder-priority!) + (queue placeholder-queue set-placeholder-queue!) + (value placeholder-value-internal set-placeholder-value!) + (id placeholder-id)) + +(define-record-discloser :placeholder + (lambda (placeholder) + (cons 'placeholder + (if (placeholder-id placeholder) + (list (placeholder-id placeholder)) + '())))) + +(define-record-type :q-item + (make-q-item trans-id cleanup-thunk wrap-proc) + q-item? + (trans-id q-item-trans-id) + (cleanup-thunk q-item-cleanup-thunk) + (wrap-proc q-item-wrap-proc)) + +(define (clean-and-enqueue! queue value) + (clean-queue-head! queue) + (enqueue! queue value)) + +(define (clean-and-dequeue! queue) + (let loop () + (if (queue-empty? queue) + #f + (let ((front (dequeue! queue))) + (if (trans-id-cancelled? (q-item-trans-id front)) + (loop) + front))))) + +(define (clean-queue-head! queue) + (let loop () + (if (not (queue-empty? queue)) + (let ((front (queue-front queue))) + (if (trans-id-cancelled? (q-item-trans-id front)) + (begin + (dequeue! queue) + (loop))))))) + +(define (make-placeholder . id-option) + (really-make-placeholder 0 + (make-queue) + (unspecific) + (if (null? id-option) + #f + (car id-option)))) + +(define (placeholder-value-rv placeholder) + (make-base + (lambda () + (cond + ((placeholder-queue placeholder) + => (lambda (queue) + (make-blocked + (lambda (trans-id cleanup-thunk wrap-proc) + (clean-and-enqueue! queue + (make-q-item trans-id + cleanup-thunk + wrap-proc)))))) + (else + (let ((priority (placeholder-priority placeholder))) + (set-placeholder-priority! placeholder (+ 1 priority)) + (make-enabled + priority + (lambda () + (placeholder-value-internal placeholder))))))))) + +(define (placeholder-set! placeholder value) + (enter-cr!) + (cond + ((placeholder-queue placeholder) + => (lambda (queue) + (set-placeholder-value! placeholder value) + (set-placeholder-queue! placeholder #f) + (let loop () + (cond + ((clean-and-dequeue! queue) + => (lambda (q-item) + ((q-item-cleanup-thunk q-item)) + (cr-trans-id-wakeup (q-item-trans-id q-item) + (cons value + (q-item-wrap-proc q-item))) + (loop))))) + (leave-cr!) + (unspecific))) + (else + (leave-cr!) + (error "placeholder is already assigned" placeholder value)))) + +(define (placeholder-value placeholder) + (sync (placeholder-value-rv placeholder))) + + + \ No newline at end of file diff --git a/cml/rendezvous.scm b/cml/rendezvous.scm new file mode 100644 index 0000000..c29f87f --- /dev/null +++ b/cml/rendezvous.scm @@ -0,0 +1,619 @@ +(define-record-type :prim-rv + (really-make-prim-rv wrap-proc poll-thunk) + prim-rv? + (wrap-proc prim-rv-wrap-proc) + (poll-thunk prim-rv-poll-thunk)) + +(define (make-prim-rv poll-thunk) + (really-make-prim-rv identity poll-thunk)) + +(define-record-type :enabled + (make-enabled priority do-thunk) + enabled? + (priority enabled-priority) + (do-thunk enabled-do-thunk)) + +;; PROC is a procedure with two arguments: +;; a TRANS-ID and a WRAP-PROC. + +;; TRANS-ID is the transaction ID of the blocked thread. WRAP-PROC is +;; the complete, composed-together chain of WRAP procedures of the +;; event. + +;; The TRANS-ID should be fed, when it's woken up, a pair +;; consisting of a return value and a wrap-proc procedure. + +(define-record-type :blocked + (make-blocked proc) + blocked? + (proc blocked-proc)) + +(define-record-type :base + (really-make-base prim-rvs) + base? + (prim-rvs base-prim-rvs)) + +(define (make-base poll-thunk) + (really-make-base (list (make-prim-rv poll-thunk)))) + +(define-record-type :choose + (make-choose rvs) + choose? + (rvs choose-rvs)) + +(define-record-type :guard + (make-guard thunk) + guard? + (thunk guard-thunk)) + +(define-record-type :with-nack + (make-nack proc) + nack? + (proc nack-proc)) + +;; Condition variables for internal use + +(define-record-type :cvar + (really-make-cvar state) + cvar? + ;; this can be one of the two below: + (state cvar-state set-cvar-state!)) + +(define-record-type :cvar-unset-state + (make-cvar-unset-state blocked) + cvar-unset-state? + ;; this is a list of :CVAR-ITEM + (blocked cvar-unset-state-blocked set-cvar-unset-state-blocked!)) + +(define-record-type :cvar-item + (make-cvar-item trans-id cleanup-thunk wrap-proc) + cvar-item? + (trans-id cvar-item-trans-id) + (cleanup-thunk cvar-item-cleanup-thunk) + (wrap-proc cvar-item-wrap-proc)) + +(define-record-type :cvar-set-state + (make-cvar-set-state priority) + cvar-set-state? + (priority cvar-set-state-priority set-cvar-set-state-priority!)) + +(define (make-cvar) + (really-make-cvar (make-cvar-unset-state '()))) + +(define (cr-cvar-set! cvar) + (let ((state (cvar-state cvar))) + (cond + ((cvar-unset-state? state) + (for-each (lambda (cvar-item) + ((cvar-item-cleanup-thunk cvar-item)) + (cr-maybe-trans-id-wakeup (cvar-item-trans-id cvar-item) + (cons (unspecific) + (cvar-item-wrap-proc cvar-item)))) + (cvar-unset-state-blocked state)) + (set-cvar-state! cvar (make-cvar-set-state 1))) + (else + (error "cvar already set"))))) + +(define (cvar-get-rv cvar) + (make-base + (lambda () + (let ((state (cvar-state cvar))) + (cond + ((cvar-set-state? state) + (let ((priority (cvar-set-state-priority state))) + (set-cvar-set-state-priority! state (+ 1 priority)) + (make-enabled priority + (lambda () + (set-cvar-set-state-priority! state 1) + (unspecific))))) + (else + (make-blocked + (lambda (trans-id cleanup-thunk wrap-proc) + (set-cvar-unset-state-blocked! + state + (cons (make-cvar-item trans-id cleanup-thunk wrap-proc) + (cvar-unset-state-blocked state))))))))))) + +(define (always-rv value) + (make-base + (lambda () + (make-enabled -1 + (lambda () + value))))) + +(define (never-rv) + (really-make-base '())) + +(define (guard rv) + (make-guard rv)) + +(define (with-nack rv) + (make-nack rv)) + +(define (gather-prim-rvs rev-rvs prim-rvs) + (cond + ((null? rev-rvs) (really-make-base prim-rvs)) + ((not (base? (car rev-rvs))) + (if (null? prim-rvs) + (gather rev-rvs '()) + (gather rev-rvs (list (really-make-base prim-rvs))))) + ;; (car rev-rvs) is base + (else + (gather-prim-rvs (cdr rev-rvs) + (append (base-prim-rvs (car rev-rvs)) + prim-rvs))))) + +(define (gather rev-rvs rvs) + (cond + ((not (null? rev-rvs)) + (let ((rv (car rev-rvs))) + (cond + ((choose? rv) + (gather (cdr rev-rvs) (append (choose-rvs rv) rvs))) + ((and (base? rv) + (not (null? rvs)) + (base? (car rvs))) + (gather (cdr rev-rvs) + (cons (really-make-base (append (base-prim-rvs rv) + (base-prim-rvs (car rvs)))) + (cdr rvs)))) + (else + (gather (cdr rev-rvs) (cons rv rvs)))))) + ((null? (cdr rvs)) (car rvs)) + (else (make-choose rvs)))) + +(define (choose . rvs) + (gather-prim-rvs (reverse rvs) '())) + +(define (compose f g) + (lambda (x) + (f (g x)))) + +(define (wrap-prim-rv prim-rv wrap-proc) + (really-make-prim-rv (compose wrap-proc + (prim-rv-wrap-proc prim-rv)) + (prim-rv-poll-thunk prim-rv))) + +(define (wrap rv wrap-proc) + (cond + ((base? rv) + (really-make-base (map (lambda (prim-rv) + (wrap-prim-rv prim-rv wrap-proc)) + (base-prim-rvs rv)))) + ((choose? rv) + (make-choose (map (lambda (rv) + (wrap rv wrap-proc)) + (choose-rvs rv)))) + ((guard? rv) + (make-guard (lambda () + (wrap ((guard-thunk rv)) wrap-proc)))) + ((nack? rv) + (make-nack (lambda (nack-rv) + (wrap ((nack-proc rv) nack-rv) wrap-proc)))))) + +(define-record-type :base-group + (really-make-base-group prim-rvs) + base-group? + (prim-rvs base-group-prim-rvs)) + +(define-record-discloser :base-group + (lambda (base-group) + (cons 'base-group + (base-group-prim-rvs base-group)))) + +(define-record-type :choose-group + (make-choose-group groups) + choose-group? + (groups choose-group-groups)) + +(define-record-discloser :choose-group + (lambda (choose-group) + (cons 'choose-group + (choose-group-groups choose-group)))) + +(define-record-type :nack-group + (make-nack-group cvar group) + nack-group? + (cvar nack-group-cvar) + (group nack-group-group)) + +(define-record-discloser :nack-group + (lambda (nack-group) + (list 'nack-group + (nack-group-group nack-group)))) + +(define (force-rv rv) + (cond + ((base? rv) + (really-make-base-group (base-prim-rvs rv))) + (else + (really-force-rv rv)))) + +(define (force-prim-rvs rvs prim-rvs) + (if (null? rvs) + (really-make-base-group prim-rvs) + (let* ((rv (car rvs)) + (group (really-force-rv rv))) + (cond + ((base-group? group) + (force-prim-rvs (cdr rvs) + (append (base-group-prim-rvs group) + prim-rvs))) + ((choose-group? group) + (force-rvs (cdr rvs) + (append (choose-group-groups group) + (list (really-make-base-group prim-rvs))))) + (else + (force-rvs (cdr rvs) + (list group (really-make-base-group prim-rvs)))))))) + +(define (force-rvs rvs groups) + (cond + ((not (null? rvs)) + (let* ((rv (car rvs)) + (group (really-force-rv rv))) + (cond + ((and (base-group? group) + (not (null? groups)) + (base-group? (car groups))) + (force-rvs (cdr rvs) + (cons (really-make-base-group + (append (base-group-prim-rvs group) + (base-group-prim-rvs (car groups)))) + (cdr groups)))) + ((choose-group? group) + (force-rvs (cdr rvs) + (append (choose-group-groups group) + groups))) + (else + (force-rvs (cdr rvs) (cons group groups)))))) + ((null? (cdr groups)) + (car groups)) + (else + (make-choose-group groups)))) + +;; this corresponds to force' in Reppy's implementation + +(define (really-force-rv rv) + (cond + ((guard? rv) + (really-force-rv ((guard-thunk rv)))) + ((nack? rv) + (let ((cvar (make-cvar))) + (make-nack-group cvar + (really-force-rv + ((nack-proc rv) + (cvar-get-rv cvar)))))) + ((base? rv) + (really-make-base-group (base-prim-rvs rv))) + ((choose? rv) + (force-prim-rvs (choose-rvs rv) '())))) + +(define (sync-prim-rv prim-rv) + (let ((poll-thunk (prim-rv-poll-thunk prim-rv)) + (wrap-proc (prim-rv-wrap-proc prim-rv))) + (enter-cr!) + (let ((status ((prim-rv-poll-thunk prim-rv)))) + (cond + ((enabled? status) + (let ((value ((enabled-do-thunk status)))) + (leave-cr!) + (wrap-proc value))) + ((blocked? status) + (let ((trans-id (make-trans-id))) + ((blocked-proc status) trans-id values wrap-proc) + (let ((pair (cr-trans-id-wait trans-id))) + ((cdr pair) (car pair))))))))) + +(define (select-do-thunk priority+do-list n) + (cond + ((null? (cdr priority+do-list)) + (cdar priority+do-list)) + (else + (let ((priority + (lambda (p) + (if (= p -1) + n + p)))) + (let max ((rest priority+do-list) + (maximum 0) + (k 0) ; (length do-thunks) + (do-list '())) ; #### list of pairs do-thunk * wrap-proc + (cond + ((not (null? rest)) + (let* ((pair (car rest)) + (p (priority (car pair))) + (stuff (cdr pair))) + (cond + ((> p maximum) + (max (cdr rest) p 1 (list stuff))) + ((= p maximum) + (max (cdr rest) maximum (+ 1 k) (cons stuff do-list))) + (else + (max (cdr rest) maximum k do-list))))) + ((null? (cdr do-list)) + (car do-list)) + (else + ;; List.nth(doFns, random k) + (car do-list)))))))) + +(define (sync-prim-rvs prim-rvs) + (cond + ((null? prim-rvs) (block)) + ((null? (cdr prim-rvs)) (sync-prim-rv (car prim-rvs))) + (else + (let () + + (define (find-enabled prim-rvs block-procs wrap-procs) + (if (null? prim-rvs) + (let ((trans-id (make-trans-id))) + (for-each (lambda (block-proc wrap-proc) + (block-proc trans-id values wrap-proc)) + block-procs wrap-procs) + (let ((pair (cr-trans-id-wait trans-id))) + ((cdr pair) (car pair)))) + (let* ((prim-rv (car prim-rvs)) + (poll-thunk (prim-rv-poll-thunk prim-rv)) + (wrap-proc (prim-rv-wrap-proc prim-rv)) + (status (poll-thunk))) + (cond + ((enabled? status) + (handle-enabled (cdr prim-rvs) + (list + (cons (enabled-priority status) + (cons (enabled-do-thunk status) + wrap-proc))) + 1)) + ((blocked? status) + (find-enabled (cdr prim-rvs) + (cons (blocked-proc status) + block-procs) + (cons wrap-proc wrap-procs))))))) + + (define (handle-enabled prim-rvs priority+do-list priority) + (if (null? prim-rvs) + (let* ((stuff (select-do-thunk priority+do-list priority)) + (do-thunk (car stuff)) + (wrap-proc (cdr stuff))) + (let ((value (do-thunk))) + (leave-cr!) + (wrap-proc value))) + (let* ((prim-rv (car prim-rvs)) + (poll-thunk (prim-rv-poll-thunk prim-rv)) + (wrap-proc (prim-rv-wrap-proc prim-rv)) + (status (poll-thunk))) + (cond + ((enabled? status) + (handle-enabled (cdr prim-rvs) + (cons (cons (enabled-priority status) + (cons (enabled-do-thunk status) + wrap-proc)) + priority+do-list) + (+ 1 priority))) + (else + (handle-enabled (cdr prim-rvs) + priority+do-list + priority)))))) + + (enter-cr!) + (find-enabled prim-rvs '() '()))))) + +(define (sync rv) + (let ((group (force-rv rv))) + (cond + ((base-group? group) + (sync-prim-rvs (base-group-prim-rvs group))) + (else + (sync-group group))))) + +(define-record-type :ack-flag + (really-make-ack-flag acked?) + ack-flag? + (acked? flag-acked? set-flag-acked?!)) + +(define (make-ack-flag) + (really-make-ack-flag #f)) + +(define (ack-flag! ack-flag) + (set-flag-acked?! ack-flag #t)) + +(define-record-type :flag-set + (make-flag-set cvar ack-flags) + flag-set? + (cvar flag-set-cvar) + (ack-flags flag-set-ack-flags)) + +(define (check-cvars! flag-sets) + (for-each check-cvar! flag-sets)) + +(define (check-cvar! flag-set) + (let loop ((ack-flags (flag-set-ack-flags flag-set))) + (cond + ((null? ack-flags) + (cr-cvar-set! (flag-set-cvar flag-set))) + ((flag-acked? (car ack-flags)) + (values)) + (else + (loop (cdr ack-flags)))))) + +;; this corresponds to syncOnGrp from Reppy's code +(define (sync-group group) + (call-with-values + (lambda () (collect-group group)) + (lambda (prim-rv+ack-flag-list flag-sets) + (if (null? (cdr prim-rv+ack-flag-list)) + (sync-prim-rv (caar prim-rv+ack-flag-list)) + (really-sync-group prim-rv+ack-flag-list flag-sets))))) + +;; This is analogous to SYNC-PRIM-RVS + +(define (really-sync-group prim-rv+ack-flag-list flag-sets) + + (define (find-enabled prim-rv+ack-flag-list + block-proc+ack-flag-list + wrap-procs) + (if (null? prim-rv+ack-flag-list) + (let ((trans-id (make-trans-id))) + (for-each (lambda (block-proc+ack-flag wrap-proc) + (let ((block-proc (car block-proc+ack-flag)) + (ack-flag (cdr block-proc+ack-flag))) + (block-proc trans-id + (lambda () + (ack-flag! ack-flag) + (check-cvars! flag-sets)) + wrap-proc))) + block-proc+ack-flag-list wrap-procs) + (let ((pair (cr-trans-id-wait trans-id))) + ((cdr pair) (car pair)))) + (let* ((prim-rv (caar prim-rv+ack-flag-list)) + (ack-flag (cdar prim-rv+ack-flag-list)) + (poll-thunk (prim-rv-poll-thunk prim-rv)) + (wrap-proc (prim-rv-wrap-proc prim-rv)) + (status (poll-thunk))) + (cond + ((enabled? status) + (handle-enabled (cdr prim-rv+ack-flag-list) + (list + (cons (enabled-priority status) + (cons (cons (enabled-do-thunk status) ack-flag) + wrap-proc))) + 1)) + ((blocked? status) + (find-enabled (cdr prim-rv+ack-flag-list) + (cons (cons (blocked-proc status) ack-flag) + block-proc+ack-flag-list) + (cons wrap-proc wrap-procs))))))) + + (define (handle-enabled prim-rv+ack-flag-list priority+do-list priority) + (if (null? prim-rv+ack-flag-list) + (let* ((stuff (select-do-thunk priority+do-list priority)) + (more-stuff (car stuff)) + (do-thunk (car more-stuff)) + (ack-flag (cdr more-stuff)) + (wrap-proc (cdr stuff))) + (ack-flag! ack-flag) + (check-cvars! flag-sets) + (let ((value (do-thunk))) + (leave-cr!) + (wrap-proc value))) + (let* ((prim-rv+ack-flag (car prim-rv+ack-flag-list)) + (prim-rv (car prim-rv+ack-flag)) + (ack-flag (cdr prim-rv+ack-flag)) + (poll-thunk (prim-rv-poll-thunk prim-rv)) + (wrap-proc (prim-rv-wrap-proc prim-rv)) + (status (poll-thunk))) + (cond + ((enabled? status) + (handle-enabled (cdr prim-rv+ack-flag-list) + (cons (cons (enabled-priority status) + (cons (cons (enabled-do-thunk status) ack-flag) + wrap-proc)) + priority+do-list) + (+ 1 priority))) + (else + (handle-enabled (cdr prim-rv+ack-flag-list) + priority+do-list + priority)))))) + + (enter-cr!) + (find-enabled prim-rv+ack-flag-list '() '())) + +(define (collect-group group) + (cond + ((choose-group? group) + (gather-choose-group group)) + (else + (gather-wrapped group '() '())))) + +(define (gather-choose-group group) + (let ((ack-flag (make-ack-flag))) + (let gather ((group group) + (prim-rv+ack-flag-list '()) + (flag-sets '())) + (cond + ((base-group? group) + (let append ((prim-rvs (base-group-prim-rvs group)) + (prim-rv+ack-flag-list prim-rv+ack-flag-list)) + (if (null? prim-rvs) + (values prim-rv+ack-flag-list flag-sets) + (append (cdr prim-rvs) + (cons (cons (car prim-rvs) ack-flag) + prim-rv+ack-flag-list))))) + ((choose-group? group) + ;; fold-left + (let loop ((groups (choose-group-groups group)) + (prim-rv+ack-flag-list prim-rv+ack-flag-list) + (flag-sets flag-sets)) + (if (null? groups) + (values prim-rv+ack-flag-list flag-sets) + (call-with-values + (lambda () + (gather (car groups) + prim-rv+ack-flag-list + flag-sets)) + (lambda (prim-rv+ack-flag-list flag-sets) + (loop (cdr groups) + prim-rv+ack-flag-list + flag-sets)))))) + ((nack-group? group) + (gather-wrapped group prim-rv+ack-flag-list flag-sets)))))) + +(define (gather-wrapped group prim-rv+ack-flag-list flag-sets) + (call-with-values + (lambda () + (let gather ((group group) + (prim-rv+ack-flag-list prim-rv+ack-flag-list) + (all-flags '()) + (flag-sets flag-sets)) + (cond + ((base-group? group) + (let append ((prim-rvs (base-group-prim-rvs group)) + (prim-rv+ack-flag-list prim-rv+ack-flag-list) + (all-flags all-flags)) + (if (null? prim-rvs) + (values prim-rv+ack-flag-list + all-flags + flag-sets) + (let ((ack-flag (make-ack-flag))) + (append (cdr prim-rvs) + (cons (cons (car prim-rvs) ack-flag) + prim-rv+ack-flag-list) + (cons ack-flag all-flags)))))) + ((choose-group? group) + ;; fold-left + (let loop ((groups (choose-group-groups group)) + (prim-rv+ack-flag-list prim-rv+ack-flag-list) + (all-flags all-flags) + (flag-sets flag-sets)) + (if (null? groups) + (values prim-rv+ack-flag-list + all-flags + flag-sets) + (call-with-values + (lambda () + (gather (car groups) + prim-rv+ack-flag-list + all-flags + flag-sets)) + (lambda (prim-rv+ack-flag-list all-flags flag-sets) + (loop (cdr groups) + prim-rv+ack-flag-list all-flags flag-sets)))))) + ((nack-group? group) + (call-with-values + (lambda () + (gather (nack-group-group group) + prim-rv+ack-flag-list + '() + flag-sets)) + (lambda (prim-rv+ack-flag-list all-flags-new flag-sets) + (values prim-rv+ack-flag-list + (append all-flags-new all-flags) + (cons (make-flag-set (nack-group-cvar group) + all-flags-new) + flag-sets)))))))) + (lambda (prim-rv+ack-flag-list all-flags flag-sets) + (values prim-rv+ack-flag-list flag-sets)))) + + +(define (select . rvs) + (sync (apply choose rvs))) diff --git a/cml/trans-id.scm b/cml/trans-id.scm new file mode 100644 index 0000000..81a125f --- /dev/null +++ b/cml/trans-id.scm @@ -0,0 +1,70 @@ +(define *cr-lock* (make-lock)) + +(define *cr-interrupt-mask* #f) + +(define (enter-cr!) + (let ((old-enabled (set-enabled-interrupts! no-interrupts))) + (if (= old-enabled no-interrupts) + (error "tried to enter critical region from critical region") + (set! *cr-interrupt-mask* old-enabled)))) + +(define (leave-cr!) + (set-enabled-interrupts! *cr-interrupt-mask*)) + +(define (leave-cr-and-block!) + (leave-cr!) + (block)) + +(define (in-cr?) + (let* ((old-enabled (set-enabled-interrupts! no-interrupts)) + (yes? (= old-enabled no-interrupts))) + (set-enabled-interrupts! old-enabled) + yes?)) + + +;; This replaces trans-id REF in Reppy's code + +(define-record-type :trans-id + (really-make-trans-id maybe-thread-uid placeholder) + trans-id? + (maybe-thread-uid trans-id-maybe-thread-uid + set-trans-id-maybe-thread-uid!) + (placeholder trans-id-placeholder set-trans-id-placeholder!)) + +(define (make-trans-id) + (really-make-trans-id (thread-uid (current-thread)) + (make-placeholder))) + +;; this stuff needs to move into WAKEUP +(define (cr-trans-id-wait trans-id) + (if (not (in-cr?)) + (error "not in critical region")) + (if (trans-id-cancelled? trans-id) + (error "wait on cancelled trans-id")) + (let ((placeholder (trans-id-placeholder trans-id))) + (leave-cr!) + (placeholder-value placeholder))) + +(define (cr-trans-id-wakeup trans-id value) + (if (not (in-cr?)) + (error "not in critical region")) + (if (trans-id-cancelled? trans-id) + (error "wakeup on cancelled trans-id")) + (let ((placeholder (trans-id-placeholder trans-id))) + (set-trans-id-maybe-thread-uid! trans-id #f) + (set-trans-id-placeholder! trans-id 'no-placeholder) + (placeholder-set! placeholder value))) + +(define (cr-maybe-trans-id-wakeup trans-id value) + (if (not (in-cr?)) + (error "not in critical region")) + (if (not (trans-id-cancelled? trans-id)) + (cr-trans-id-wakeup trans-id value))) + +(define (trans-id-cancelled? trans-id) + (not (trans-id-maybe-thread-uid trans-id))) + +(define (trans-id-thread-uid trans-id) + (if (trans-id-cancelled? trans-id) + (error "trans-id cancelled")) + (trans-id-maybe-thread-uid trans-id)) \ No newline at end of file