Replace Martin's implementation of selective blocking by the one in
Scheme 48 1.0.1. Namely, instead of associating a list of queues with every thread, we associate a single cell, holding the thread. That cell is stored in thread queues, and once a thread is made runnable again, the cell is set to #f. The thread-queue accessors ignore cells containing #f. Implement an experimental OBTAIN-LOCK-MULTIPLE to test the whole thing.
This commit is contained in:
parent
51230dfab1
commit
ac343ba970
|
@ -16,17 +16,14 @@
|
|||
'()))))
|
||||
|
||||
(define (make-placeholder . id-option)
|
||||
(really-make-placeholder (make-thread-queue)
|
||||
(really-make-placeholder (make-queue)
|
||||
(if (null? id-option) #f (car id-option))))
|
||||
|
||||
(define (placeholder-value placeholder)
|
||||
(with-interrupts-inhibited
|
||||
(lambda ()
|
||||
(if (placeholder-queue placeholder)
|
||||
(begin
|
||||
(enqueue-thread! (placeholder-queue placeholder)
|
||||
(current-thread))
|
||||
(block)))
|
||||
(block-on-queue (placeholder-queue placeholder)))
|
||||
(placeholder-real-value placeholder))))
|
||||
|
||||
(define (placeholder-set! placeholder value)
|
||||
|
@ -36,11 +33,13 @@
|
|||
(cond (queue
|
||||
(set-placeholder-value! placeholder value)
|
||||
(set-placeholder-queue! placeholder #f)
|
||||
(do ((waiters '() (cons (dequeue-thread! queue)
|
||||
waiters)))
|
||||
((thread-queue-empty? queue)
|
||||
waiters)))
|
||||
(else #f)))))))
|
||||
(let loop ((waiters '()))
|
||||
(cond
|
||||
((maybe-dequeue-thread! queue)
|
||||
=> (lambda (thread)
|
||||
(loop (cons thread waiters))))
|
||||
(else
|
||||
waiters))))))))))
|
||||
(if waiters
|
||||
(for-each make-ready waiters)
|
||||
(if (not (eq? value (placeholder-value placeholder)))
|
||||
|
|
|
@ -26,9 +26,11 @@
|
|||
; The procedures for manipulating queues.
|
||||
|
||||
(define (queue-empty? q)
|
||||
;; (debug-message "queue-empty?" (queue? q))
|
||||
(null? (queue-head q)))
|
||||
|
||||
(define (enqueue! q v)
|
||||
;; (debug-message "enqueue!" (queue? q))
|
||||
(let ((p (cons v '())))
|
||||
(if (null? (queue-head q)) ;(queue-empty? q)
|
||||
(set-queue-head! q p)
|
||||
|
@ -36,11 +38,13 @@
|
|||
(set-queue-tail! q p)))
|
||||
|
||||
(define (queue-front q)
|
||||
;; (debug-message "queue-front" (queue? q))
|
||||
(if (queue-empty? q)
|
||||
(error "queue is empty" q)
|
||||
(car (queue-head q))))
|
||||
|
||||
(define (dequeue! q)
|
||||
;; (debug-message "dequeue!" (queue? q))
|
||||
(let ((pair (queue-head q)))
|
||||
(cond ((null? pair) ;(queue-empty? q)
|
||||
(error "empty queue" q))
|
||||
|
@ -52,7 +56,25 @@
|
|||
(set-queue-tail! q '())) ; don't retain pointers
|
||||
value)))))
|
||||
|
||||
; Same again, except that we return #F if the queue is empty.
|
||||
; This is a simple way of avoiding a race condition if the queue is known
|
||||
; not to contain #F.
|
||||
|
||||
(define (maybe-dequeue! q)
|
||||
;; (debug-message "maybe-dequeue!" (queue? q))
|
||||
(let ((pair (queue-head q)))
|
||||
(cond ((null? pair) ;(queue-empty? q)
|
||||
#f)
|
||||
(else
|
||||
(let ((value (car pair))
|
||||
(next (cdr pair)))
|
||||
(set-queue-head! q next)
|
||||
(if (null? next)
|
||||
(set-queue-tail! q '())) ; don't retain pointers
|
||||
value)))))
|
||||
|
||||
(define (on-queue? v q)
|
||||
;; (debug-message "on-queue!" (queue? q))
|
||||
(memq v (queue-head q)))
|
||||
|
||||
; This removes the first occurrence of V from Q.
|
||||
|
@ -61,6 +83,7 @@
|
|||
(delete-from-queue-if! q (lambda (x) (eq? x v))))
|
||||
|
||||
(define (delete-from-queue-if! q pred)
|
||||
;; (debug-message "delete-from-queue-if!" (queue? q))
|
||||
(let ((list (queue-head q)))
|
||||
(cond ((null? list)
|
||||
#f)
|
||||
|
|
|
@ -152,7 +152,7 @@
|
|||
; lazily generated list of this level's threads
|
||||
|
||||
(define (make-command-level repl-thunk repl-data dynamic-env levels throw)
|
||||
(let ((level (really-make-command-level (make-thread-queue)
|
||||
(let ((level (really-make-command-level (make-queue)
|
||||
(make-counter)
|
||||
dynamic-env
|
||||
levels
|
||||
|
@ -172,7 +172,7 @@
|
|||
(let ((thread (make-thread thunk (command-level-dynamic-env level) id)))
|
||||
(set-thread-scheduler! thread (command-thread))
|
||||
(set-thread-data! thread level)
|
||||
(enqueue-thread! (command-level-queue level) thread)
|
||||
(enqueue! (command-level-queue level) thread)
|
||||
(increment-counter! (command-level-thread-counter level))
|
||||
thread))
|
||||
|
||||
|
@ -294,12 +294,7 @@
|
|||
(*out?* #f))
|
||||
(for-each (lambda (thread)
|
||||
(if (thread-continuation thread)
|
||||
(begin
|
||||
(remove-thread-from-queues! thread)
|
||||
(interrupt-thread thread
|
||||
(lambda ignore
|
||||
(terminate-current-thread)))
|
||||
(enqueue-thread! queue thread))))
|
||||
(terminate-level-thread thread level)))
|
||||
threads)
|
||||
(dynamic-wind
|
||||
(lambda ()
|
||||
|
@ -313,6 +308,16 @@
|
|||
(if (not (null? levels))
|
||||
(reset-command-input! (car levels))))))))
|
||||
|
||||
; Put the thread on the runnable queue if it is not already there and then
|
||||
; terminate it. Termination removes the thread from any blocking queues
|
||||
; and interrupts with a throw that will run any pending dynamic-winds.
|
||||
|
||||
(define (terminate-level-thread thread level)
|
||||
(let ((queue (command-level-queue level)))
|
||||
(if (not (on-queue? thread queue))
|
||||
(enqueue! queue thread))
|
||||
(terminate-thread! thread)))
|
||||
|
||||
(define (reset-command-input! level)
|
||||
(let ((repl (command-level-repl-thread level)))
|
||||
(if repl
|
||||
|
@ -367,8 +372,7 @@
|
|||
(error "non-command-level thread restarted on a command level"
|
||||
thread))
|
||||
((memq level levels)
|
||||
(enqueue-thread! (command-level-queue level)
|
||||
thread))
|
||||
(enqueue! (command-level-queue level) thread))
|
||||
(else
|
||||
(warn "dropping thread from exited command level"
|
||||
thread)))
|
||||
|
@ -447,7 +451,7 @@
|
|||
(if repl-thread
|
||||
(begin
|
||||
(set-command-level-repl-thread! level #f)
|
||||
(kill-thread! repl-thread)))))
|
||||
(terminate-level-thread repl-thread level)))))
|
||||
((eq? token repl-data-token)
|
||||
(command-level-repl-data level))
|
||||
((eq? token set-repl-data!-token)
|
||||
|
@ -524,14 +528,10 @@
|
|||
|
||||
(define (kill-paused-thread! level)
|
||||
(let ((paused (command-level-paused-thread level)))
|
||||
(if (not paused)
|
||||
(error "level has no paused thread" level))
|
||||
(if (eq? paused (command-level-repl-thread level))
|
||||
(spawn-repl-thread! level))
|
||||
(interrupt-thread paused terminate-current-thread)
|
||||
; (lambda ignore
|
||||
; (terminate-current-thread)))
|
||||
;(enqueue-thread! (command-level-queue level) paused)
|
||||
(set-command-level-paused-thread! level #f)))
|
||||
|
||||
|
||||
(if paused
|
||||
(begin
|
||||
(if (eq? paused (command-level-repl-thread level))
|
||||
(spawn-repl-thread! level))
|
||||
(terminate-thread! paused) ; it's already running, so no enqueue
|
||||
(set-command-level-paused-thread! level #f))
|
||||
(warn "level has no paused thread" level))))
|
||||
|
|
|
@ -197,6 +197,7 @@
|
|||
(export lock?
|
||||
make-lock
|
||||
obtain-lock
|
||||
obtain-lock-multiple
|
||||
maybe-obtain-lock
|
||||
release-lock
|
||||
lock-owner-uid)) ;really should be internal
|
||||
|
@ -568,12 +569,10 @@
|
|||
|
||||
current-thread
|
||||
|
||||
make-thread-queue
|
||||
thread-queue-empty?
|
||||
enqueue-thread!
|
||||
multiple-enqueue-thread!
|
||||
dequeue-thread!
|
||||
remove-thread-from-queues!
|
||||
thread-queue-empty?
|
||||
maybe-dequeue-thread!
|
||||
block-on-queue
|
||||
|
||||
event-pending?
|
||||
get-next-event!
|
||||
|
@ -590,6 +589,7 @@
|
|||
upcall propogate-upcall
|
||||
interrupt-thread
|
||||
kill-thread!
|
||||
terminate-thread!
|
||||
|
||||
wake-some-threads
|
||||
|
||||
|
@ -609,8 +609,9 @@
|
|||
decrement-counter!))
|
||||
|
||||
(define-interface queues-interface
|
||||
(export make-queue enqueue! dequeue! queue-empty?
|
||||
queue? queue->list queue-length delete-from-queue!))
|
||||
(export make-queue enqueue! dequeue! maybe-dequeue! queue-empty?
|
||||
queue? queue->list queue-front queue-length
|
||||
delete-from-queue! on-queue?))
|
||||
|
||||
(define-interface exceptions-interface
|
||||
(export define-exception-handler
|
||||
|
|
|
@ -63,6 +63,7 @@
|
|||
session-data
|
||||
define-record-types
|
||||
threads threads-internal
|
||||
queues
|
||||
scheduler
|
||||
interrupts
|
||||
weak
|
||||
|
@ -408,7 +409,7 @@
|
|||
|
||||
(define-structure placeholders placeholder-interface
|
||||
(open scheme-level-1 define-record-types
|
||||
threads threads-internal
|
||||
threads threads-internal queues
|
||||
interrupts
|
||||
signals)
|
||||
(files (big placeholder))
|
||||
|
|
|
@ -224,7 +224,7 @@
|
|||
|
||||
(define-structures ((rts-sigevents rts-sigevents-interface)
|
||||
(rts-sigevents-internal rts-sigevents-internal-interface))
|
||||
(open scheme-level-1 define-record-types
|
||||
(open scheme-level-1 define-record-types queues
|
||||
threads threads-internal
|
||||
interrupts
|
||||
architecture)
|
||||
|
@ -236,7 +236,7 @@
|
|||
|
||||
(define-structures ((threads threads-interface)
|
||||
(threads-internal threads-internal-interface))
|
||||
(open scheme-level-1 enumerated define-record-types queues
|
||||
(open scheme-level-1 enumerated define-record-types queues cells
|
||||
interrupts
|
||||
wind
|
||||
fluids
|
||||
|
@ -258,6 +258,7 @@
|
|||
(define-structure scheduler scheduler-interface
|
||||
(open scheme-level-1 threads threads-internal locks
|
||||
enumerated enum-case
|
||||
queues
|
||||
debug-messages
|
||||
signals) ;error
|
||||
(files (rts scheduler)))
|
||||
|
@ -267,6 +268,7 @@
|
|||
scheme-exit-now
|
||||
call-when-deadlocked!)
|
||||
(open scheme-level-1 threads threads-internal scheduler structure-refs
|
||||
queues
|
||||
session-data
|
||||
signals ;error
|
||||
handle ;with-handler
|
||||
|
@ -300,7 +302,8 @@
|
|||
(unspecific))))))
|
||||
|
||||
(define-structure queues queues-interface
|
||||
(open scheme-level-1 define-record-types signals)
|
||||
(open scheme-level-1 define-record-types signals
|
||||
debug-messages)
|
||||
(files (big queue))
|
||||
(optimize auto-integrate))
|
||||
|
||||
|
@ -314,7 +317,7 @@
|
|||
; (optimize auto-integrate))
|
||||
|
||||
(define-structure locks locks-interface
|
||||
(open scheme-level-1 define-record-types interrupts threads threads-internal)
|
||||
(open scheme-level-1 define-record-types queues interrupts threads threads-internal)
|
||||
(optimize auto-integrate)
|
||||
(files (rts lock)))
|
||||
|
||||
|
|
|
@ -23,7 +23,7 @@
|
|||
(if queue
|
||||
(begin
|
||||
(decrement-channel-wait-count!)
|
||||
(make-ready (dequeue-thread! queue) status))
|
||||
(make-ready (maybe-dequeue-thread! queue) status))
|
||||
(debug-message "Warning: dropping ignored channel i/o result {Channel "
|
||||
(channel-os-index channel)
|
||||
" "
|
||||
|
@ -50,7 +50,7 @@
|
|||
(add-channel-wait-queue! channel queue)
|
||||
(warn "channel has two pending operations" channel)
|
||||
(terminate-current-thread))
|
||||
(let ((queue (make-thread-queue)))
|
||||
(let ((queue (make-queue)))
|
||||
(increment-channel-wait-count!)
|
||||
(enqueue-thread! queue (current-thread))
|
||||
(add-channel-wait-queue! channel queue)
|
||||
|
@ -74,7 +74,7 @@
|
|||
(define (steal-channel! channel owner)
|
||||
(let ((queue (fetch-channel-wait-queue! channel)))
|
||||
(if queue
|
||||
(let ((thread (dequeue-thread! queue)))
|
||||
(let ((thread (maybe-dequeue-thread! queue)))
|
||||
(cond ((eq? thread owner)
|
||||
(decrement-channel-wait-count!)
|
||||
(channel-abort channel))
|
||||
|
@ -156,6 +156,7 @@
|
|||
(thread-queue-empty? queue))
|
||||
#f
|
||||
queue)))
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -20,15 +20,13 @@
|
|||
(define (make-lock)
|
||||
(let ((uid *lock-uid*))
|
||||
(set! *lock-uid* (+ uid 1))
|
||||
(really-make-lock #f (make-thread-queue) uid)))
|
||||
(really-make-lock #f (make-queue) uid)))
|
||||
|
||||
(define (obtain-lock lock)
|
||||
(with-interrupts-inhibited
|
||||
(lambda ()
|
||||
(if (lock-owner-uid lock)
|
||||
(begin
|
||||
(enqueue-thread! (lock-queue lock) (current-thread))
|
||||
(block))
|
||||
(block-on-queue (lock-queue lock))
|
||||
(set-lock-owner-uid! lock (thread-uid (current-thread)))))))
|
||||
|
||||
(define (maybe-obtain-lock lock)
|
||||
|
@ -40,17 +38,34 @@
|
|||
(set-lock-owner-uid! lock (thread-uid (current-thread)))
|
||||
#t)))))
|
||||
|
||||
(define (obtain-lock-multiple . all-locks)
|
||||
(with-interrupts-inhibited
|
||||
(lambda ()
|
||||
(let loop ((locks all-locks))
|
||||
(cond
|
||||
((null? locks)
|
||||
(for-each (lambda (lock)
|
||||
(enqueue-thread! (lock-queue lock) (current-thread)))
|
||||
all-locks)
|
||||
(block))
|
||||
((lock-owner-uid (car locks))
|
||||
(loop (cdr locks)))
|
||||
(else
|
||||
(set-lock-owner-uid! (car locks)
|
||||
(thread-uid (current-thread)))))))))
|
||||
|
||||
; Returns #t if the lock has no new owner.
|
||||
|
||||
(define (release-lock lock)
|
||||
(with-interrupts-inhibited
|
||||
(lambda ()
|
||||
(let ((queue (lock-queue lock)))
|
||||
(if (thread-queue-empty? queue)
|
||||
(begin
|
||||
(set-lock-owner-uid! lock #f)
|
||||
#t)
|
||||
(let ((next (dequeue-thread! queue)))
|
||||
(set-lock-owner-uid! lock (thread-uid next))
|
||||
(make-ready next)
|
||||
#f))))))
|
||||
(cond
|
||||
((maybe-dequeue-thread! queue)
|
||||
=> (lambda (next)
|
||||
(set-lock-owner-uid! lock (thread-uid next))
|
||||
(make-ready next)
|
||||
#f))
|
||||
(else
|
||||
(set-lock-owner-uid! lock #f)
|
||||
#t))))))
|
||||
|
|
|
@ -25,14 +25,14 @@
|
|||
; are handled specially. The only upcall is for aborting execution.
|
||||
|
||||
(define (make-root-event-handler thunk quantum abort)
|
||||
(let ((runnable (make-thread-queue))
|
||||
(let ((runnable (make-queue))
|
||||
(thread-count (make-counter))
|
||||
(safe-dynamic-env (with-handler root-handler get-dynamic-env))
|
||||
(thread (make-thread thunk
|
||||
(get-dynamic-env)
|
||||
'scheduler-initial-thread)))
|
||||
(increment-counter! thread-count)
|
||||
(enqueue-thread! runnable thread)
|
||||
(enqueue! runnable thread)
|
||||
(round-robin-event-handler
|
||||
runnable quantum safe-dynamic-env thread-count
|
||||
(lambda args #f) ; we handle no events
|
||||
|
|
|
@ -74,7 +74,7 @@
|
|||
(decrement-counter! thread-count)
|
||||
(next-thread))
|
||||
((out-of-time)
|
||||
(enqueue-thread! runnable thread)
|
||||
(enqueue! runnable thread)
|
||||
(next-thread))
|
||||
|
||||
;; the thread keeps running
|
||||
|
@ -94,13 +94,13 @@
|
|||
(or (event-handler event event-data)
|
||||
(enum-case event-type event
|
||||
((runnable)
|
||||
(enqueue-thread! runnable (car event-data)))
|
||||
(enqueue! runnable (car event-data)))
|
||||
((spawned)
|
||||
(increment-counter! thread-count)
|
||||
(enqueue-thread! runnable
|
||||
(make-thread (car event-data)
|
||||
dynamic-env
|
||||
(cadr event-data))))
|
||||
(enqueue! runnable
|
||||
(make-thread (car event-data)
|
||||
dynamic-env
|
||||
(cadr event-data))))
|
||||
((narrowed)
|
||||
(handle-narrow-event quantum dynamic-env event-data))
|
||||
((no-event)
|
||||
|
@ -112,7 +112,7 @@
|
|||
event-handler)))))
|
||||
|
||||
(define (next-thread)
|
||||
(if (thread-queue-empty? runnable)
|
||||
(if (queue-empty? runnable)
|
||||
(call-with-values
|
||||
get-next-event!
|
||||
(lambda (event . data)
|
||||
|
@ -123,7 +123,7 @@
|
|||
(next-thread))
|
||||
(else
|
||||
(values #f 0))))) ; scheduler quits
|
||||
(values (dequeue-thread! runnable)
|
||||
(values (dequeue! runnable)
|
||||
quantum)))
|
||||
|
||||
thread-event-handler)
|
||||
|
@ -134,13 +134,13 @@
|
|||
(obtain-lock lock)
|
||||
(spawn
|
||||
(lambda ()
|
||||
(let ((runnable (make-thread-queue))
|
||||
(let ((runnable (make-queue))
|
||||
(thread (make-thread (car event-data)
|
||||
dynamic-env
|
||||
(cadr event-data)))
|
||||
(thread-count (make-counter)))
|
||||
|
||||
(enqueue-thread! runnable thread)
|
||||
(enqueue! runnable thread)
|
||||
(increment-counter! thread-count)
|
||||
|
||||
(run-threads
|
||||
|
|
|
@ -30,8 +30,7 @@
|
|||
(if (type-in-set? (sigevent-type sigevent) set)
|
||||
sigevent
|
||||
(lp sigevent))
|
||||
(begin (enqueue-thread! sigevent-thread-queue (current-thread))
|
||||
(block)
|
||||
(begin (block-on-queue sigevent-thread-queue)
|
||||
(lp pre-sigevent))))))))
|
||||
|
||||
; same as above, but don't block
|
||||
|
@ -53,7 +52,7 @@
|
|||
(lambda ()
|
||||
(set-sigevent-next! *most-recent-sigevent* (make-sigevent type))
|
||||
(set! *most-recent-sigevent* (sigevent-next *most-recent-sigevent*))
|
||||
(do ((waiters '() (cons (dequeue-thread! sigevent-thread-queue)
|
||||
(do ((waiters '() (cons (maybe-dequeue-thread! sigevent-thread-queue)
|
||||
waiters)))
|
||||
((thread-queue-empty? sigevent-thread-queue)
|
||||
waiters))))))
|
||||
|
@ -64,7 +63,7 @@
|
|||
(not (thread-queue-empty? sigevent-thread-queue)))
|
||||
|
||||
(define (initialize-sigevents!)
|
||||
(set! sigevent-thread-queue (make-thread-queue))
|
||||
(set! sigevent-thread-queue (make-queue))
|
||||
(set-interrupt-handler! (enum interrupt os-signal)
|
||||
(lambda (type enabled-interrupts)
|
||||
; type is already set in the unix signal handler
|
||||
|
|
|
@ -7,12 +7,12 @@
|
|||
(cond ((not n)
|
||||
(call-error "wrong type argument" sleep user-n))
|
||||
((< 0 n)
|
||||
(let ((queue (make-thread-queue))) ; only one entry, but it must be a queue
|
||||
(let ((cell (make-cell (current-thread))))
|
||||
(disable-interrupts!)
|
||||
(enqueue-thread! queue (current-thread))
|
||||
(set-thread-cell! (current-thread) cell)
|
||||
(set! *dozers*
|
||||
(insert (cons (+ (real-time) n)
|
||||
queue)
|
||||
cell)
|
||||
*dozers*
|
||||
(lambda (frob1 frob2)
|
||||
(< (car frob1) (car frob2)))))
|
||||
|
@ -29,7 +29,7 @@
|
|||
#f))
|
||||
#f))
|
||||
|
||||
(define *dozers* '()) ; List of (wakeup-time . queue)
|
||||
(define *dozers* '()) ; List of (wakeup-time . cell)
|
||||
|
||||
(define (insert x l <)
|
||||
(cond ((null? l) (list x))
|
||||
|
@ -50,12 +50,13 @@
|
|||
(begin
|
||||
(set! *dozers* '())
|
||||
(values woke? #f))
|
||||
(let ((next (car dozers)))
|
||||
(cond ((thread-queue-empty? (cdr next))
|
||||
(let* ((next (car dozers))
|
||||
(thread (cell-ref (cdr next))))
|
||||
(cond ((not thread)
|
||||
(loop (cdr dozers) woke?))
|
||||
((< time (car next))
|
||||
(set! *dozers* dozers)
|
||||
(values woke? (- (car next) time)))
|
||||
(else
|
||||
(make-ready (dequeue-thread! (cdr next)))
|
||||
(make-ready thread)
|
||||
(loop (cdr dozers) #t)))))))))
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
; saved interrupt mask
|
||||
; scheduler, which is the thread that RUNs this one
|
||||
; remaining time in clock ticks ('waiting = waiting for events)
|
||||
; queue that is holding this thread, if any
|
||||
; cell that is holding this thread, if any
|
||||
; arguments waiting to be passed to the thread when it is next run
|
||||
; dynamic environment
|
||||
; dynamic point
|
||||
|
@ -41,7 +41,7 @@
|
|||
(define-record-type thread :thread
|
||||
(really-make-thread dynamic-env dynamic-point cell-env
|
||||
continuation scheduler
|
||||
queues arguments
|
||||
cell arguments
|
||||
events current-task uid name)
|
||||
thread?
|
||||
(dynamic-env thread-dynamic-env) ;Must be first! (See fluid.scm)
|
||||
|
@ -49,7 +49,7 @@
|
|||
;Must be second! (See fluid.scm)
|
||||
(cell-env thread-cell-env) ;Must be fourth! (See thread-env.scm)
|
||||
(continuation thread-continuation set-thread-continuation!)
|
||||
(queues thread-queues set-thread-queues!)
|
||||
(cell thread-cell set-thread-cell!)
|
||||
(arguments thread-arguments set-thread-arguments!)
|
||||
(time thread-time set-thread-time!)
|
||||
(scheduler thread-scheduler set-thread-scheduler!)
|
||||
|
@ -77,7 +77,7 @@
|
|||
(thunk->continuation
|
||||
(thread-top-level thunk))
|
||||
(current-thread) ; scheduler
|
||||
#f ; queue
|
||||
#f ; cell
|
||||
'() ; arguments
|
||||
#f ; events
|
||||
#f ; current-task
|
||||
|
@ -180,32 +180,31 @@
|
|||
; Rename the queue operations as thread-specific ones (both for clarity
|
||||
; and because we will want to use priority queues in the future).
|
||||
|
||||
(define make-thread-queue make-queue)
|
||||
(define thread-queue-empty? queue-empty?)
|
||||
|
||||
(define (enqueue-thread! queue thread)
|
||||
(if (thread-queues thread)
|
||||
(error "enqueued thread being added to another queue" thread queue))
|
||||
(set-thread-queues! thread (list queue))
|
||||
(enqueue! queue thread))
|
||||
(let ((cell (make-cell thread)))
|
||||
(enqueue! queue cell)
|
||||
(set-thread-cell! thread cell)))
|
||||
|
||||
(define (multiple-enqueue-thread! queues thread)
|
||||
(if (thread-queues thread)
|
||||
(error "enqueued thread being added to another queue" thread queues))
|
||||
(set-thread-queues! thread queues)
|
||||
(for-each (lambda (q) (enqueue! q thread)) queues))
|
||||
(define (maybe-dequeue-thread! queue)
|
||||
(let loop ()
|
||||
(let ((cell (maybe-dequeue! queue)))
|
||||
(if cell
|
||||
(or (cell-ref cell)
|
||||
(loop))
|
||||
#f))))
|
||||
|
||||
(define (dequeue-thread! queue)
|
||||
(let ((thread (dequeue! queue)))
|
||||
(for-each (lambda (q) (delete-from-queue! q thread)) (thread-queues thread))
|
||||
(set-thread-queues! thread #f)
|
||||
thread))
|
||||
; Look for a non-empty cell.
|
||||
|
||||
(define (thread-queue-empty? queue)
|
||||
(let loop ()
|
||||
(cond ((queue-empty? queue)
|
||||
#t)
|
||||
((cell-ref (queue-front queue))
|
||||
#f)
|
||||
(else
|
||||
(dequeue! queue)
|
||||
(loop)))))
|
||||
|
||||
(define (remove-thread-from-queues! thread)
|
||||
(if (thread-queues thread)
|
||||
(begin
|
||||
(for-each (lambda (q) (delete-from-queue! q thread)) (thread-queues thread))
|
||||
(set-thread-queues! thread #f))))
|
||||
;----------------
|
||||
|
||||
(define current-thread (structure-ref primitives current-thread))
|
||||
|
@ -257,13 +256,9 @@
|
|||
((not (eq? (thread-scheduler thread) scheduler))
|
||||
(enable-interrupts!)
|
||||
(error "thread run by wrong scheduler" thread scheduler))
|
||||
((thread-queues thread)
|
||||
((thread-cell thread)
|
||||
(enable-interrupts!)
|
||||
(apply debug-message (list "thread run while still on a queue "
|
||||
(thread-uid thread)
|
||||
(thread-name thread)
|
||||
(thread-queues thread)))
|
||||
(error "thread run while still on a queue" thread))
|
||||
(error "thread run while still blocked" thread))
|
||||
((and (thread-current-task thread)
|
||||
(not (null? (thread-arguments thread))))
|
||||
(enable-interrupts!)
|
||||
|
@ -443,6 +438,12 @@
|
|||
(define (relinquish-timeslice)
|
||||
(suspend (enum event-type out-of-time) '()))
|
||||
|
||||
; Utility procedure for the common case of blocking on a queue.
|
||||
|
||||
(define (block-on-queue queue)
|
||||
(enqueue-thread! queue (current-thread))
|
||||
(block))
|
||||
|
||||
; Send the upcall to the current scheduler and check the return value(s)
|
||||
; to see if it was handled properly.
|
||||
|
||||
|
@ -465,6 +466,13 @@
|
|||
(lambda ignored
|
||||
(exit (enum event-type killed) '()))))
|
||||
|
||||
; Also ends the thread, but lets it run any pending dynamic-winds.
|
||||
|
||||
(define (terminate-thread! thread)
|
||||
(let ((interrupts (set-enabled-interrupts! no-interrupts)))
|
||||
(clear-thread-cell! thread)
|
||||
(interrupt-thread thread terminate-current-thread)))
|
||||
|
||||
;----------------
|
||||
; Make THREAD execute PROC the next time it is run. The thread's own
|
||||
; continuation is passed whatever PROC returns.
|
||||
|
@ -596,10 +604,7 @@
|
|||
; Enqueue a RUNNABLE for THREAD's scheduler.
|
||||
|
||||
(define (make-ready thread . args)
|
||||
(if (thread-queues thread)
|
||||
(error "trying to schedule a queued thread" thread))
|
||||
; (if (not (null? (thread-arguments thread)))
|
||||
; (error "trying to replace thread arguments"))
|
||||
(clear-thread-cell! thread)
|
||||
(set-thread-arguments! thread args)
|
||||
(if (thread-scheduler thread)
|
||||
(schedule-event (thread-scheduler thread)
|
||||
|
@ -607,6 +612,13 @@
|
|||
thread)
|
||||
(error "MAKE-READY thread has no scheduler" thread)))
|
||||
|
||||
(define (clear-thread-cell! thread)
|
||||
(let ((cell (thread-cell thread)))
|
||||
(if cell
|
||||
(begin
|
||||
(set-thread-cell! thread #f)
|
||||
(cell-set! cell #f)))))
|
||||
|
||||
;----------------
|
||||
|
||||
(define (schedule-interrupt! time)
|
||||
|
|
|
@ -292,7 +292,7 @@
|
|||
usual-resumer ; usual-resumer
|
||||
environments ; with-interaction-environment
|
||||
fluids-internal ; JMG: get-dynamic-env
|
||||
threads threads-internal scheduler
|
||||
threads threads-internal queues scheduler
|
||||
structure-refs
|
||||
scsh-utilities
|
||||
interrupts
|
||||
|
|
|
@ -65,7 +65,7 @@
|
|||
(lambda ()
|
||||
(let ((dynamic-env (get-dynamic-env))
|
||||
(*result* 4711))
|
||||
(let ((runnable (make-thread-queue))
|
||||
(let ((runnable (make-queue))
|
||||
(thread (make-thread (lambda ()
|
||||
(set! *result*
|
||||
(start (command-line))))
|
||||
|
@ -73,7 +73,7 @@
|
|||
'scsh-initial-thread))
|
||||
(thread-count (make-counter)))
|
||||
|
||||
(enqueue-thread! runnable thread)
|
||||
(enqueue! runnable thread)
|
||||
(increment-counter! thread-count)
|
||||
|
||||
(run-threads
|
||||
|
|
|
@ -117,12 +117,7 @@
|
|||
(lambda () #t)
|
||||
thunk2
|
||||
(lambda ()
|
||||
(savely-kill-thread! (placeholder-value thread))))))
|
||||
|
||||
(define (savely-kill-thread! thread)
|
||||
(remove-thread-from-queues! thread)
|
||||
(kill-thread! thread)
|
||||
(make-ready thread))
|
||||
(terminate-thread! (placeholder-value thread))))))
|
||||
|
||||
(define (obtain-all-or-none . locks)
|
||||
(let lp ((obtained '()) (needed locks))
|
||||
|
|
Loading…
Reference in New Issue