263 lines
8.2 KiB
Scheme
263 lines
8.2 KiB
Scheme
; Copyright (c) 1993-1999 by Richard Kelsey and Jonathan Rees. See file COPYING.
|
|
|
|
; Channel interrupt stuff.
|
|
|
|
; Install an interrupt handler that cells up the results of completed I/O
|
|
; operations and spawn a thread to cope with them. This is written so as
|
|
; to avoid having state in top-level variables, because their values are
|
|
; saved in dumped images.
|
|
|
|
(define (initialize-channel-i/o!)
|
|
(session-data-set! channel-wait-cells-slot '())
|
|
(session-data-set! channel-wait-count-slot 0)
|
|
(set-interrupt-handler! (enum interrupt i/o-read-completion)
|
|
(make-i/o-completion-handler
|
|
(lambda (cell channel)
|
|
(let ((old (cell-ref cell)))
|
|
(cell-set! cell
|
|
(cons (cons channel (car old))
|
|
(cdr old)))))))
|
|
(set-interrupt-handler! (enum interrupt i/o-write-completion)
|
|
(make-i/o-completion-handler
|
|
(lambda (cell channel)
|
|
(let ((old (cell-ref cell)))
|
|
(cell-set! cell
|
|
(cons (car old)
|
|
(cons channel (cdr old)))))))))
|
|
|
|
; The warning message is printed using DEBUG-MESSAGE because to try to make
|
|
; sure it appears in spite of whatever problem's the I/O system is having.
|
|
|
|
(define (make-i/o-completion-handler update-ready-cell)
|
|
;; Called with interrupts disabled.
|
|
(lambda (channel status enabled-interrupts)
|
|
(call-with-values
|
|
(lambda () (fetch-channel-wait-cell! channel))
|
|
(lambda (thread-cell maybe-ready-cell)
|
|
(cond
|
|
((and thread-cell (cell-ref thread-cell))
|
|
=> (lambda (thread)
|
|
(decrement-channel-wait-count!)
|
|
(make-ready thread status)))
|
|
;; The problem with the debug message is that
|
|
;; WAIT-FOR-CHANNELS can trigger this warning in a perfectly
|
|
;; legimitate situation because of a race I don't know how to
|
|
;; avoid. --Mike
|
|
; (else
|
|
; (debug-message "Warning: dropping ignored channel i/o result {Channel "
|
|
; (channel-os-index channel)
|
|
; " "
|
|
; (channel-id channel)
|
|
; "}"))
|
|
)
|
|
(cond
|
|
((and maybe-ready-cell
|
|
(cell-ref maybe-ready-cell))
|
|
(update-ready-cell maybe-ready-cell channel)))))))
|
|
|
|
; Exported procedure
|
|
|
|
(define (waiting-for-i/o?)
|
|
(< 0 (channel-wait-count)))
|
|
|
|
; Block until the current I/O operation on CHANNEL has completed.
|
|
; This returns the result of the operation.
|
|
;
|
|
; This needs to be called with interrupts disabled.
|
|
;
|
|
; We install a DYNAMIC-WIND to abort the operation if the waiting thread is
|
|
; terminated.
|
|
|
|
(define (wait-for-channel channel)
|
|
(call-with-values
|
|
(lambda () (fetch-channel-wait-cell! channel))
|
|
(lambda (thread-cell maybe-ready-cell)
|
|
(if (and thread-cell (cell-ref thread-cell))
|
|
(begin
|
|
(add-channel-wait-cell! channel thread-cell #f)
|
|
(warn "channel has two pending operations" channel)
|
|
(terminate-current-thread))
|
|
(let ((cell (make-cell (current-thread))))
|
|
(increment-channel-wait-count!)
|
|
(set-thread-cell! (current-thread) cell)
|
|
(add-channel-wait-cell! channel cell #f)
|
|
(dynamic-wind nothing
|
|
block
|
|
(lambda ()
|
|
(disable-interrupts!)
|
|
(if (cell-ref cell)
|
|
;; we're being terminated
|
|
(begin
|
|
(fetch-channel-wait-cell! channel)
|
|
(channel-abort channel)
|
|
(wait-for-channel channel)))
|
|
(enable-interrupts!))))))))
|
|
|
|
(define (nothing) #f)
|
|
|
|
(define (channel-check-waiter channel)
|
|
(if (channel-has-waiter? channel)
|
|
(begin
|
|
(warn "channel has two pending operations" channel)
|
|
(terminate-current-thread))))
|
|
|
|
(define (wait-for-channels read-channels write-channels timeout)
|
|
;; check if we're borked from the outset
|
|
(for-each channel-check-waiter read-channels)
|
|
(for-each channel-check-waiter write-channels)
|
|
|
|
(let ((thread-cell (make-cell (current-thread)))
|
|
(ready-channels-cell (make-cell (cons '() '())))
|
|
(ready-read-channels #f)
|
|
(ready-write-channels #f))
|
|
|
|
(if (or (not timeout)
|
|
(register-dozer thread-cell timeout))
|
|
(begin
|
|
;; register us with every channel we're waiting for
|
|
(set-thread-cell! (current-thread) thread-cell)
|
|
(let ((signup (lambda (channel)
|
|
(add-channel-wait-cell! channel
|
|
thread-cell ready-channels-cell)
|
|
(increment-channel-wait-count!))))
|
|
(for-each signup read-channels)
|
|
(for-each signup write-channels))
|
|
|
|
;; block
|
|
(dynamic-wind
|
|
nothing
|
|
(lambda ()
|
|
(block)
|
|
(disable-interrupts!)
|
|
(let ((pair (cell-ref ready-channels-cell)))
|
|
(set! ready-read-channels (car pair))
|
|
(set! ready-write-channels (cdr pair)))
|
|
(cell-set! ready-channels-cell #f)
|
|
(enable-interrupts!)
|
|
(values ready-read-channels ready-write-channels))
|
|
;; clean up
|
|
(lambda ()
|
|
(let ((aborting? (and (cell-ref thread-cell) #t)))
|
|
(disable-interrupts!)
|
|
;; this ain't so great ...
|
|
(let ((make-cleanup
|
|
(lambda (ready-channels)
|
|
(lambda (channel)
|
|
(if (memq channel ready-channels)
|
|
(begin
|
|
(fetch-channel-wait-cell! channel)
|
|
(if (not aborting?)
|
|
(decrement-channel-wait-count!)
|
|
(begin
|
|
(channel-abort channel)
|
|
(wait-for-channel channel)))))))))
|
|
(for-each (make-cleanup ready-read-channels) read-channels)
|
|
(for-each (make-cleanup ready-write-channels) write-channels))
|
|
|
|
(enable-interrupts!)))))
|
|
;; the timeout was zero or less
|
|
(enable-interrupts!))))
|
|
|
|
; Abort any pending operation on by OWNER on CHANNEL.
|
|
; Called with interrupts disabled.
|
|
|
|
(define (steal-channel! channel owner)
|
|
(call-with-values
|
|
(lambda () (fetch-channel-wait-cell! channel))
|
|
(lambda (thread-cell maybe-ready-cell)
|
|
(cond
|
|
((cell-ref thread-cell)
|
|
=> (lambda (thread)
|
|
(cond ((eq? thread owner)
|
|
(clear-thread-cell! thread)
|
|
(decrement-channel-wait-count!)
|
|
(channel-abort channel))
|
|
(else
|
|
(warn "channel in use by other than port owner"
|
|
channel thread owner)
|
|
#f))))
|
|
(else #f)))))
|
|
|
|
; Have CHANNEL-READ and CHANNEL-WRITE wait if a pending-channel-i/o
|
|
; exception occurs.
|
|
|
|
(define-exception-handler (enum op channel-maybe-read)
|
|
(lambda (opcode reason buffer start count wait? channel . maybe-os-message)
|
|
(if (= reason (enum exception pending-channel-i/o))
|
|
(wait-for-channel channel)
|
|
(begin
|
|
(enable-interrupts!)
|
|
(apply signal-exception
|
|
opcode reason buffer start count wait? channel
|
|
maybe-os-message)))))
|
|
|
|
(define-exception-handler (enum op channel-maybe-write)
|
|
(lambda (opcode reason buffer start count channel . maybe-os-message)
|
|
(if (= reason (enum exception pending-channel-i/o))
|
|
(wait-for-channel channel)
|
|
(begin
|
|
(enable-interrupts!)
|
|
(apply signal-exception
|
|
opcode reason buffer start count channel
|
|
maybe-os-message)))))
|
|
|
|
; Two session slots
|
|
; - the number of threads waiting for I/O completion events
|
|
; - an alist mapping channels to cells for waiting threads
|
|
|
|
(define channel-wait-count-slot (make-session-data-slot! 0))
|
|
|
|
(define (channel-wait-count)
|
|
(session-data-ref channel-wait-count-slot))
|
|
|
|
(define (increment-channel-wait-count!)
|
|
(session-data-set! channel-wait-count-slot (+ (channel-wait-count) 1)))
|
|
|
|
(define (decrement-channel-wait-count!)
|
|
(session-data-set! channel-wait-count-slot (- (channel-wait-count) 1)))
|
|
|
|
(define channel-wait-cells-slot (make-session-data-slot! '()))
|
|
|
|
; Adding a cell and channel - the caller has already determined there is no
|
|
; existing cell for this channel.
|
|
|
|
(define (add-channel-wait-cell! channel cell maybe-ready-channels-cell)
|
|
(session-data-set! channel-wait-cells-slot
|
|
(cons (cons channel (cons cell maybe-ready-channels-cell))
|
|
(session-data-ref channel-wait-cells-slot))))
|
|
|
|
; This is just deleting from an a-list.
|
|
|
|
(define (fetch-channel-wait-cell! channel)
|
|
(let* ((cells (session-data-ref channel-wait-cells-slot))
|
|
(cell+ready-channels-cell
|
|
(cond ((null? cells)
|
|
#f)
|
|
((eq? channel (caar cells))
|
|
(session-data-set! channel-wait-cells-slot
|
|
(cdr cells))
|
|
(cdar cells))
|
|
(else
|
|
(let loop ((cells (cdr cells)) (prev cells))
|
|
(cond ((null? cells)
|
|
#f)
|
|
((eq? channel (caar cells))
|
|
(set-cdr! prev (cdr cells))
|
|
(cdar cells))
|
|
(else
|
|
(loop (cdr cells) cells))))))))
|
|
(cond
|
|
(cell+ready-channels-cell
|
|
=> (lambda (pair)
|
|
(let ((thread-cell (car pair))
|
|
(ready-cell (cdr pair)))
|
|
(values thread-cell ready-cell))))
|
|
(else
|
|
(values #f #f)))))
|
|
|
|
(define (channel-has-waiter? channel)
|
|
(and (assq channel
|
|
(session-data-ref channel-wait-cells-slot))
|
|
#t))
|
|
|