Revamped the SELECT implementation to properly distinguish between
input and output port. Also, SELECT and SELECT! are no longer squashed into one silly procedure.
This commit is contained in:
parent
a8ad3e0c71
commit
d0977ea4d4
|
@ -733,177 +733,229 @@
|
|||
;;; select
|
||||
;;; -----
|
||||
|
||||
(define (port/fdes->port port/fd)
|
||||
(define (port/fdes->input-port port/fd)
|
||||
(if (port? port/fd)
|
||||
port/fd
|
||||
(fdes->inport port/fd))) ; ####
|
||||
(fdes->inport port/fd)))
|
||||
|
||||
(define (port/fdes-ready? port/fd)
|
||||
(let ((port (port/fdes->port port/fd)))
|
||||
(define (port/fdes->output-port port/fd)
|
||||
(if (port? port/fd)
|
||||
port/fd
|
||||
(fdes->outport port/fd)))
|
||||
|
||||
(define (input-port/fdes-ready? port/fd)
|
||||
(let ((port (port/fdes->input-port port/fd)))
|
||||
((port-handler-ready? (port-handler port)) port)))
|
||||
|
||||
(define (any-ready port/fds)
|
||||
(let loop ((port/fds port/fds))
|
||||
(if (null? port/fds)
|
||||
'()
|
||||
(let ((port/fd (car port/fds)))
|
||||
(if (port/fdes-ready? port/fd)
|
||||
;; one is ready, get them all
|
||||
(let loop ((rest (cdr port/fds))
|
||||
(ready (list port/fd)))
|
||||
(cond
|
||||
((null? rest) (reverse ready))
|
||||
((port/fdes-ready? (car rest))
|
||||
(loop (cdr rest) (cons (car rest) ready)))
|
||||
(else
|
||||
(loop (cdr rest) ready))))
|
||||
(loop (cdr port/fds)))))))
|
||||
(define (output-port/fdes-ready? port/fd)
|
||||
(let ((port (port/fdes->output-port port/fd)))
|
||||
((port-handler-ready? (port-handler port)) port)))
|
||||
|
||||
(define (port/fdes-check-unlocked port/fd)
|
||||
(if (port-locked? (port/fdes->port port/fd))
|
||||
(begin
|
||||
((structure-ref interrupts enable-interrupts!))
|
||||
(error "SELECT on port with pending operation"
|
||||
port/fd))))
|
||||
(define (make-any-ready input?)
|
||||
(let ((port/fdes-ready?
|
||||
(if input?
|
||||
input-port/fdes-ready?
|
||||
output-port/fdes-ready?)))
|
||||
(lambda (port/fds)
|
||||
(let loop ((port/fds port/fds))
|
||||
(if (null? port/fds)
|
||||
'()
|
||||
(let ((port/fd (car port/fds)))
|
||||
(if (port/fdes-ready? port/fd)
|
||||
;; one is ready, get them all
|
||||
(let loop ((rest (cdr port/fds))
|
||||
(ready (list port/fd)))
|
||||
(cond
|
||||
((null? rest) (reverse ready))
|
||||
((port/fdes-ready? (car rest))
|
||||
(loop (cdr rest) (cons (car rest) ready)))
|
||||
(else
|
||||
(loop (cdr rest) ready))))
|
||||
(loop (cdr port/fds)))))))))
|
||||
|
||||
(define (port/fdes->channel port/fd)
|
||||
(define any-input-ready (make-any-ready #t))
|
||||
(define any-output-ready (make-any-ready #f))
|
||||
|
||||
(define (make-port/fdes-check-unlocked input?)
|
||||
(let ((port/fdes->port
|
||||
(if input?
|
||||
port/fdes->input-port
|
||||
port/fdes->output-port)))
|
||||
(lambda (port/fd)
|
||||
(if (port-locked? (port/fdes->port port/fd))
|
||||
(begin
|
||||
((structure-ref interrupts enable-interrupts!))
|
||||
(error "SELECT on port with pending operation"
|
||||
port/fd))))))
|
||||
|
||||
(define input-port/fdes-check-unlocked (make-port/fdes-check-unlocked #t))
|
||||
(define output-port/fdes-check-unlocked (make-port/fdes-check-unlocked #f))
|
||||
|
||||
(define (port/fdes->input-channel port/fd)
|
||||
(fdport-data:channel
|
||||
(fdport-data
|
||||
(port/fdes->port port/fd))))
|
||||
(port/fdes->input-port port/fd))))
|
||||
|
||||
;; this is way too epic and probably should just be split up once the
|
||||
;; dust has settled ---Mike
|
||||
(define (port/fdes->output-channel port/fd)
|
||||
(fdport-data:channel
|
||||
(fdport-data
|
||||
(port/fdes->output-port port/fd))))
|
||||
|
||||
(define (make-select !?)
|
||||
(lambda (read-vec write-vec exception-vec . maybe-timeout)
|
||||
(let ((read-list (vector->list read-vec))
|
||||
(write-list (vector->list write-vec))
|
||||
(timeout (:optional maybe-timeout #f)))
|
||||
(define (select read-vec write-vec exception-vec . maybe-timeout)
|
||||
(let ((read-list (vector->list read-vec))
|
||||
(write-list (vector->list write-vec))
|
||||
(timeout (:optional maybe-timeout #f)))
|
||||
|
||||
((structure-ref interrupts disable-interrupts!))
|
||||
((structure-ref interrupts disable-interrupts!))
|
||||
|
||||
(for-each port/fdes-check-unlocked read-list)
|
||||
(for-each port/fdes-check-unlocked write-list)
|
||||
(for-each input-port/fdes-check-unlocked read-list)
|
||||
(for-each output-port/fdes-check-unlocked write-list)
|
||||
|
||||
(let ((any-read (any-ready read-list))
|
||||
(any-write (any-ready write-list)))
|
||||
(if (or (pair? any-read) (pair? any-write))
|
||||
(begin
|
||||
((structure-ref interrupts enable-interrupts!))
|
||||
(if !? ; we're SELECT!
|
||||
(let ((n-read-ready
|
||||
(let ((length (vector-length read-vec)))
|
||||
(let loop ((i 0) (n 0))
|
||||
(cond
|
||||
((= i length) n)
|
||||
((memq (vector-ref read-vec i) any-read)
|
||||
(loop (+ 1 i) (+ 1 n)))
|
||||
(else
|
||||
(vector-set! read-vec i #f)
|
||||
(loop (+ 1 i) n))))))
|
||||
(n-write-ready
|
||||
(let ((length (vector-length write-vec)))
|
||||
(let loop ((i 0) (n 0))
|
||||
(cond
|
||||
((= i length) n)
|
||||
((memq (vector-ref write-vec i) any-write)
|
||||
(loop (+ 1 i) (+ 1 n)))
|
||||
(else
|
||||
(vector-set! write-vec i #f)
|
||||
(loop (+ 1 i) n)))))))
|
||||
|
||||
;; zero out EXCEPTION-VEC
|
||||
(let ((length (vector-length exception-vec)))
|
||||
(let loop ((i 0))
|
||||
(if (< i length)
|
||||
(begin
|
||||
(vector-set! exception-vec i #f)
|
||||
(loop (+ 1 i))))))
|
||||
|
||||
(values n-read-ready n-write-ready 0))
|
||||
;; we're vanilla SELECT
|
||||
(values (list->vector any-read)
|
||||
(list->vector any-write)
|
||||
(make-vector 0))))
|
||||
(let ((any-read (any-input-ready read-list))
|
||||
(any-write (any-output-ready write-list)))
|
||||
|
||||
(if (or (pair? any-read) (pair? any-write))
|
||||
(begin
|
||||
((structure-ref interrupts enable-interrupts!))
|
||||
(values (list->vector any-read)
|
||||
(list->vector any-write)
|
||||
(make-vector 0)))
|
||||
|
||||
;; we need to block
|
||||
(let ((read-channels (map port/fdes->channel read-list))
|
||||
(write-channels (map port/fdes->channel write-list)))
|
||||
;; we need to block
|
||||
(let ((read-channels (map port/fdes->input-channel read-list))
|
||||
(write-channels (map port/fdes->output-channel write-list)))
|
||||
|
||||
(for-each (lambda (channel)
|
||||
(add-pending-channel channel #t))
|
||||
read-channels)
|
||||
(for-each (lambda (channel)
|
||||
(add-pending-channel channel #t))
|
||||
read-channels)
|
||||
|
||||
(for-each (lambda (channel)
|
||||
(add-pending-channel channel #f))
|
||||
write-channels)
|
||||
(for-each (lambda (channel)
|
||||
(add-pending-channel channel #f))
|
||||
write-channels)
|
||||
|
||||
(call-with-values
|
||||
(lambda ()
|
||||
(wait-for-channels read-channels write-channels timeout))
|
||||
;; re-enables interrupts
|
||||
(lambda (ready-read-channels ready-write-channels)
|
||||
(let ((ready-read-port/fds '())
|
||||
(ready-write-port/fds '()))
|
||||
(for-each (lambda (port/fd channel)
|
||||
(if (memq channel ready-read-channels)
|
||||
(set! ready-read-port/fds
|
||||
(cons port/fd ready-read-port/fds))))
|
||||
read-list read-channels)
|
||||
(for-each (lambda (port/fd channel)
|
||||
(if (memq channel ready-write-channels)
|
||||
(set! ready-write-port/fds
|
||||
(cons port/fd ready-write-port/fds))))
|
||||
write-list write-channels)
|
||||
|
||||
(values (list->vector (reverse ready-read-port/fds))
|
||||
(list->vector (reverse ready-write-port/fds))
|
||||
(make-vector 0))))))))))
|
||||
|
||||
(define (select! read-vec write-vec exception-vec . maybe-timeout)
|
||||
(let ((read-list (vector->list read-vec))
|
||||
(write-list (vector->list write-vec))
|
||||
(timeout (:optional maybe-timeout #f)))
|
||||
|
||||
((structure-ref interrupts disable-interrupts!))
|
||||
|
||||
(for-each input-port/fdes-check-unlocked read-list)
|
||||
(for-each output-port/fdes-check-unlocked write-list)
|
||||
|
||||
(let ((any-read (any-input-ready read-list))
|
||||
(any-write (any-output-ready write-list)))
|
||||
(if (or (pair? any-read) (pair? any-write))
|
||||
(begin
|
||||
((structure-ref interrupts enable-interrupts!))
|
||||
(let ((n-read-ready
|
||||
(let ((length (vector-length read-vec)))
|
||||
(let loop ((i 0) (n 0))
|
||||
(cond
|
||||
((= i length) n)
|
||||
((memq (vector-ref read-vec i) any-read)
|
||||
(loop (+ 1 i) (+ 1 n)))
|
||||
(else
|
||||
(vector-set! read-vec i #f)
|
||||
(loop (+ 1 i) n))))))
|
||||
(n-write-ready
|
||||
(let ((length (vector-length write-vec)))
|
||||
(let loop ((i 0) (n 0))
|
||||
(cond
|
||||
((= i length) n)
|
||||
((memq (vector-ref write-vec i) any-write)
|
||||
(loop (+ 1 i) (+ 1 n)))
|
||||
(else
|
||||
(vector-set! write-vec i #f)
|
||||
(loop (+ 1 i) n)))))))
|
||||
|
||||
;; zero out EXCEPTION-VEC
|
||||
(let ((length (vector-length exception-vec)))
|
||||
(let loop ((i 0))
|
||||
(if (< i length)
|
||||
(begin
|
||||
(vector-set! exception-vec i #f)
|
||||
(loop (+ 1 i))))))
|
||||
|
||||
(values n-read-ready n-write-ready 0)))
|
||||
|
||||
;; we need to block
|
||||
(let ((read-channels (map port/fdes->input-channel read-list))
|
||||
(write-channels (map port/fdes->output-channel write-list)))
|
||||
|
||||
(for-each (lambda (channel)
|
||||
(add-pending-channel channel #t))
|
||||
read-channels)
|
||||
|
||||
(for-each (lambda (channel)
|
||||
(add-pending-channel channel #f))
|
||||
write-channels)
|
||||
|
||||
(call-with-values
|
||||
(lambda ()
|
||||
(wait-for-channels read-channels write-channels timeout))
|
||||
;; re-enables interrupts
|
||||
(lambda (ready-read-channels ready-write-channels)
|
||||
(call-with-values
|
||||
(lambda ()
|
||||
(wait-for-channels read-channels write-channels timeout))
|
||||
;; re-enables interrupts
|
||||
(lambda (ready-read-channels ready-write-channels)
|
||||
|
||||
;; too many free variables ...
|
||||
(if !? ; we're SELECT!
|
||||
(let ((n-read-ready
|
||||
(let loop ((read-channels read-channels)
|
||||
(n-ready 0)
|
||||
(index 0))
|
||||
(if (null? read-channels)
|
||||
n-ready
|
||||
(if (memq (car read-channels) ready-read-channels)
|
||||
(loop (cdr read-channels)
|
||||
(+ 1 n-ready)
|
||||
(+ 1 index))
|
||||
(begin
|
||||
(vector-set! read-vec index #f)
|
||||
(loop (cdr read-channels)
|
||||
n-ready
|
||||
(+ 1 index)))))))
|
||||
(n-write-ready
|
||||
(let loop ((write-channels write-channels)
|
||||
(n-ready 0)
|
||||
(index 0))
|
||||
(if (null? write-channels)
|
||||
n-ready
|
||||
(if (memq (car write-channels) ready-write-channels)
|
||||
(loop (cdr write-channels)
|
||||
(+ 1 n-ready)
|
||||
(+ 1 index))
|
||||
(begin
|
||||
(vector-set! write-vec index #f)
|
||||
(loop (cdr write-channels)
|
||||
n-ready
|
||||
(+ 1 index))))))))
|
||||
;; zero out EXCEPTION-VEC
|
||||
(let ((length (vector-length exception-vec)))
|
||||
(let loop ((i 0))
|
||||
(if (< i length)
|
||||
(begin
|
||||
(vector-set! exception-vec i #f)
|
||||
(loop (+ 1 i))))))
|
||||
(let ((n-read-ready
|
||||
(let loop ((read-channels read-channels)
|
||||
(n-ready 0)
|
||||
(index 0))
|
||||
(if (null? read-channels)
|
||||
n-ready
|
||||
(if (memq (car read-channels) ready-read-channels)
|
||||
(loop (cdr read-channels)
|
||||
(+ 1 n-ready)
|
||||
(+ 1 index))
|
||||
(begin
|
||||
(vector-set! read-vec index #f)
|
||||
(loop (cdr read-channels)
|
||||
n-ready
|
||||
(+ 1 index)))))))
|
||||
(n-write-ready
|
||||
(let loop ((write-channels write-channels)
|
||||
(n-ready 0)
|
||||
(index 0))
|
||||
(if (null? write-channels)
|
||||
n-ready
|
||||
(if (memq (car write-channels) ready-write-channels)
|
||||
(loop (cdr write-channels)
|
||||
(+ 1 n-ready)
|
||||
(+ 1 index))
|
||||
(begin
|
||||
(vector-set! write-vec index #f)
|
||||
(loop (cdr write-channels)
|
||||
n-ready
|
||||
(+ 1 index))))))))
|
||||
;; zero out EXCEPTION-VEC
|
||||
(let ((length (vector-length exception-vec)))
|
||||
(let loop ((i 0))
|
||||
(if (< i length)
|
||||
(begin
|
||||
(vector-set! exception-vec i #f)
|
||||
(loop (+ 1 i))))))
|
||||
|
||||
(values n-read-ready n-write-ready 0))
|
||||
(values n-read-ready n-write-ready 0)))))))))
|
||||
|
||||
;; we're vanilla SELECT
|
||||
(let ((ready-read-port/fds '())
|
||||
(ready-write-port/fds '()))
|
||||
(for-each (lambda (port/fd channel)
|
||||
(if (memq channel ready-read-channels)
|
||||
(set! ready-read-port/fds
|
||||
(cons port/fd ready-read-port/fds))))
|
||||
read-list read-channels)
|
||||
(for-each (lambda (port/fd channel)
|
||||
(if (memq channel ready-write-channels)
|
||||
(set! ready-write-port/fds
|
||||
(cons port/fd ready-write-port/fds))))
|
||||
write-list write-channels)
|
||||
|
||||
(values (list->vector (reverse ready-read-port/fds))
|
||||
(list->vector (reverse ready-write-port/fds))
|
||||
(make-vector 0))))))))))))
|
||||
|
||||
(define select (make-select #f))
|
||||
|
||||
(define select! (make-select #t))
|
||||
|
|
Loading…
Reference in New Issue