162 lines
4.9 KiB
Scheme
162 lines
4.9 KiB
Scheme
; Copyright (c) 1993-1999 by Richard Kelsey and Jonathan Rees. See file COPYING.
|
|
|
|
; Channel interrupt stuff.
|
|
|
|
; Install an interrupt handler that queues up the results of completed I/O
|
|
; operations and spawn a thread to cope with them. This is written so as
|
|
; to avoid having state in top-level variables, because their values are
|
|
; saved in dumped images.
|
|
|
|
(define (initialize-channel-i/o!)
|
|
(session-data-set! channel-wait-queues-slot '())
|
|
(session-data-set! channel-wait-count-slot 0)
|
|
(set-interrupt-handler! (enum interrupt i/o-completion)
|
|
i/o-completion-handler))
|
|
|
|
; The warning message is printed using DEBUG-MESSAGE because to try to make
|
|
; sure it appears in spite of whatever problem's the I/O system is having.
|
|
;
|
|
; Called with interrupts disabled.
|
|
|
|
(define (i/o-completion-handler channel status enabled-interrupts)
|
|
(let ((queue (fetch-channel-wait-queue! channel)))
|
|
(if queue
|
|
(begin
|
|
(decrement-channel-wait-count!)
|
|
(make-ready (dequeue-thread! queue) status))
|
|
(debug-message "Warning: dropping ignored channel i/o result {Channel "
|
|
(channel-os-index channel)
|
|
" "
|
|
(channel-id channel)
|
|
"}"))))
|
|
|
|
; Exported procedure
|
|
|
|
(define (waiting-for-i/o?)
|
|
(< 0 (channel-wait-count)))
|
|
|
|
; Block until the current I/O operation on CHANNEL has completed.
|
|
; This returns the result of the operation.
|
|
;
|
|
; This needs to be called with interrupts disabled.
|
|
;
|
|
; We install a DYNAMIC-WIND to abort the operation if the waiting thread is
|
|
; terminated.
|
|
|
|
(define (wait-for-channel channel)
|
|
(let ((queue (fetch-channel-wait-queue! channel)))
|
|
(if queue
|
|
(begin
|
|
(add-channel-wait-queue! channel queue)
|
|
(warn "channel has two pending operations" channel)
|
|
(terminate-current-thread))
|
|
(let ((queue (make-thread-queue)))
|
|
(increment-channel-wait-count!)
|
|
(enqueue-thread! queue (current-thread))
|
|
(add-channel-wait-queue! channel queue)
|
|
(dynamic-wind nothing
|
|
block
|
|
(lambda ()
|
|
(disable-interrupts!)
|
|
(let ((new-queue (fetch-channel-wait-queue! channel)))
|
|
(cond ((eq? queue new-queue)
|
|
(channel-abort channel)
|
|
(wait-for-channel channel))
|
|
(new-queue
|
|
(add-channel-wait-queue! channel new-queue)))
|
|
(enable-interrupts!))))))))
|
|
|
|
(define (nothing) #f)
|
|
|
|
; Abort any pending operation on by OWNER on CHANNEL.
|
|
; Called with interrupts disabled.
|
|
|
|
(define (steal-channel! channel owner)
|
|
(let ((queue (fetch-channel-wait-queue! channel)))
|
|
(if queue
|
|
(let ((thread (dequeue-thread! queue)))
|
|
(cond ((eq? thread owner)
|
|
(decrement-channel-wait-count!)
|
|
(channel-abort channel))
|
|
(else
|
|
(warn "channel in use by other than port owner"
|
|
channel thread owner)
|
|
(enqueue-thread! queue thread)
|
|
#f)))
|
|
#f)))
|
|
|
|
; Have CHANNEL-READ and CHANNEL-WRITE wait if a pending-channel-i/o
|
|
; exception occurs.
|
|
|
|
(define-exception-handler (enum op channel-maybe-read)
|
|
(lambda (opcode reason buffer start count wait? channel . maybe-os-message)
|
|
(if (= reason (enum exception pending-channel-i/o))
|
|
(wait-for-channel channel)
|
|
(begin
|
|
(enable-interrupts!)
|
|
(apply signal-exception
|
|
opcode reason buffer start count wait? channel
|
|
maybe-os-message)))))
|
|
|
|
(define-exception-handler (enum op channel-maybe-write)
|
|
(lambda (opcode reason buffer start count channel . maybe-os-message)
|
|
(if (= reason (enum exception pending-channel-i/o))
|
|
(wait-for-channel channel)
|
|
(begin
|
|
(enable-interrupts!)
|
|
(apply signal-exception
|
|
opcode reason buffer start count channel
|
|
maybe-os-message)))))
|
|
|
|
; Two session slots
|
|
; - the number of threads waiting for I/O completion events
|
|
; - an alist mapping channels to queues for waiting threads
|
|
|
|
(define channel-wait-count-slot (make-session-data-slot! 0))
|
|
|
|
(define (channel-wait-count)
|
|
(session-data-ref channel-wait-count-slot))
|
|
|
|
(define (increment-channel-wait-count!)
|
|
(session-data-set! channel-wait-count-slot (+ (channel-wait-count) 1)))
|
|
|
|
(define (decrement-channel-wait-count!)
|
|
(session-data-set! channel-wait-count-slot (- (channel-wait-count) 1)))
|
|
|
|
(define channel-wait-queues-slot (make-session-data-slot! '()))
|
|
|
|
; Adding a queue and channel - the caller has already determined there is no
|
|
; existing queue for this channel.
|
|
|
|
(define (add-channel-wait-queue! channel queue)
|
|
(session-data-set! channel-wait-queues-slot
|
|
(cons (cons channel queue)
|
|
(session-data-ref channel-wait-queues-slot))))
|
|
|
|
; This is just deleting from an a-list.
|
|
|
|
(define (fetch-channel-wait-queue! channel)
|
|
(let* ((queues (session-data-ref channel-wait-queues-slot))
|
|
(queue (cond ((null? queues)
|
|
#f)
|
|
((eq? channel (caar queues))
|
|
(session-data-set! channel-wait-queues-slot
|
|
(cdr queues))
|
|
(cdar queues))
|
|
(else
|
|
(let loop ((queues (cdr queues)) (prev queues))
|
|
(cond ((null? queues)
|
|
#f)
|
|
((eq? channel (caar queues))
|
|
(set-cdr! prev (cdr queues))
|
|
(cdar queues))
|
|
(else
|
|
(loop (cdr queues) queues))))))))
|
|
(if (or (not queue)
|
|
(thread-queue-empty? queue))
|
|
#f
|
|
queue)))
|
|
|
|
|
|
|