diff --git a/Makefile.in b/Makefile.in index 779e2a0..5dd65a5 100644 --- a/Makefile.in +++ b/Makefile.in @@ -225,7 +225,6 @@ scsh/network1o: scsh/network1.h scsh/flock1.o: scsh/flock1.h scsh/fdports1.o scsh/fdports.o: scsh/fdports1.h -#scsh/select1.o scsh/select.o: scsh/select1.h scsh/rx/regexp1.o: c/scheme48.h @@ -436,9 +435,6 @@ clean: clean-cig clean-scsh clean-cig: -rm -f cig/*.o $(CIG) $(CIG).image $(LIBCIG) -clean-scm2c: - rm -f #scsh/select.c - distclean: clean rm -f Makefile config.log config.status c/sysdep.h config.cache \ scsh/machine \ @@ -841,7 +837,6 @@ SCHEME =scsh/awk.scm \ # Explicitly giving the .o/.c dependency also makes it go. ############################################################ cig/libcig.c: cig/libcig.scm -#scsh/select.c: scsh/select.scm scsh/scsh: scsh/scsh-tramp.c $(CC) -o $@ $(CPPFLAGS) $(CFLAGS) \ diff --git a/c/event.h b/c/event.h index 96874e5..b2548b6 100644 --- a/c/event.h +++ b/c/event.h @@ -1,5 +1,7 @@ -enum event_enum { KEYBOARD_INTERRUPT_EVENT, IO_COMPLETION_EVENT, ALARM_EVENT, - OS_SIGNAL_EVENT, ERROR_EVENT, NO_EVENT }; +enum event_enum { KEYBOARD_INTERRUPT_EVENT, + IO_READ_COMPLETION_EVENT, IO_WRITE_COMPLETION_EVENT, + ALARM_EVENT, + OS_SIGNAL_EVENT, ERROR_EVENT, NO_EVENT }; extern bool s48_add_pending_fd(int fd, bool is_input); extern bool s48_remove_fd(int fd); diff --git a/c/unix/event.c b/c/unix/event.c index 99379a9..428a1a1 100644 --- a/c/unix/event.c +++ b/c/unix/event.c @@ -287,7 +287,7 @@ s48_stop_alarm_interrupts(void) * (queue-ready-ports) * (set! *poll-time* (+ *time* *poll-interval*)))) * (cond ((not (queue-empty? ready-ports)) - * (values (enum event-type i/o-completion) + * (values (enum event-type i/o-{read/write}-completion) * (dequeue! ready-ports))) * ((>= *current_time* *alarm-time*) * (set! *alarm-time* max-integer) @@ -302,9 +302,20 @@ s48_stop_alarm_interrupts(void) * (values (enum event-type no-event) #f)))))) */ -static bool there_are_ready_ports(void); -static int next_ready_port(void); -static int queue_ready_ports(bool wait, long seconds, long ticks); +#define FD_QUIESCENT 0 /* idle */ +#define FD_READY 1 /* I/O ready to be performed */ +#define FD_PENDING 2 /* waiting */ + +typedef struct fd_struct { + int fd, /* file descriptor */ + status; /* one of the FD_* constants */ + bool is_input; /* iff input */ + struct fd_struct *next; /* next on same queue */ +} fd_struct; + +static bool there_are_ready_ports(void); +static fd_struct *next_ready_fd_struct(void); +static int queue_ready_ports(bool wait, long seconds, long ticks); int s48_get_next_event(long *ready_fd, long *status) @@ -314,6 +325,8 @@ s48_get_next_event(long *ready_fd, long *status) */ int io_poll_status; + fd_struct *f; + /* fprintf(stderr, "[poll at %d (waiting for %d)]\n", s48_current_time, alarm_time); */ @@ -334,10 +347,14 @@ s48_get_next_event(long *ready_fd, long *status) } } if (there_are_ready_ports()) { - *ready_fd = next_ready_port(); + f = next_ready_fd_struct(); + *ready_fd = f->fd; *status = 0; /* chars read or written */ /* fprintf(stderr, "[i/o completion]\n"); */ - return (IO_COMPLETION_EVENT); + if (f->is_input) + return (IO_READ_COMPLETION_EVENT); + else + return (IO_WRITE_COMPLETION_EVENT); } if (alarm_time != -1 && s48_current_time >= alarm_time) { alarm_time = -1; @@ -364,17 +381,6 @@ s48_get_next_event(long *ready_fd, long *status) * the pending ports and move any that are ready onto the other queue and * signal an event. */ -#define FD_QUIESCENT 0 /* idle */ -#define FD_READY 1 /* I/O ready to be performed */ -#define FD_PENDING 2 /* waiting */ - -typedef struct fd_struct { - int fd, /* file descriptor */ - status; /* one of the FD_* constants */ - bool is_input; /* iff input */ - struct fd_struct *next; /* next on same queue */ -} fd_struct; - /* * A queue of fd_structs is empty iff the first field is NULL. In @@ -459,14 +465,14 @@ there_are_ready_ports(void) } -static int -next_ready_port(void) +static fd_struct * +next_ready_fd_struct(void) { fd_struct *p; p = rmque(&ready.first, &ready); p->status = FD_QUIESCENT; - return (p->fd); + return (p); } diff --git a/c/unix/fd-io.c b/c/unix/fd-io.c index 5f3ba3d..22c8083 100644 --- a/c/unix/fd-io.c +++ b/c/unix/fd-io.c @@ -111,6 +111,16 @@ bool ps_check_fd(long fd_as_long, bool is_read, long *status) return FALSE; } } } +/* + * Return TRUE if successful, and FALSE otherwise. + */ + +bool +ps_add_pending_fd(long fd_as_long, bool is_input) +{ + return s48_add_pending_fd((int) fd_as_long, is_input); +} + long ps_read_fd(long fd_as_long, char *buffer, long max, bool waitp, bool *eofp, bool *pending, long *status) @@ -201,7 +211,7 @@ long ps_abort_fd_op(long fd_as_long) { int fd = (int)fd_as_long; - + fprintf(stderr, "aborting %d\n", fd); if (!s48_remove_fd(fd)) fprintf(stderr, "Error: ps_abort_fd_op, no pending operation on fd %d\n", fd); diff --git a/c/unix/socket.c b/c/unix/socket.c index 2896c50..7acca9e 100644 --- a/c/unix/socket.c +++ b/c/unix/socket.c @@ -36,9 +36,23 @@ static s48_value s48_socket(s48_value server_p), s48_value input_p), s48_get_host_name(void); +s48_value s48_add_pending_channel (s48_value channel) +{ + int socket_fd; + + S48_CHECK_CHANNEL(channel); + socket_fd = S48_UNSAFE_EXTRACT_FIXNUM(S48_UNSAFE_CHANNEL_OS_INDEX(channel)); + + if (! s48_add_pending_fd(socket_fd, 1)) // 1 for: yes, is input + s48_raise_out_of_memory_error(); + + return S48_UNSPECIFIC; +} + /* * Install all exported functions in Scheme48. */ + void s48_init_socket(void) { @@ -50,6 +64,7 @@ s48_init_socket(void) S48_EXPORT_FUNCTION(s48_connect); S48_EXPORT_FUNCTION(s48_close_socket_half); S48_EXPORT_FUNCTION(s48_get_host_name); + S48_EXPORT_FUNCTION(s48_add_pending_channel); } /* diff --git a/scheme/interfaces.scm b/scheme/interfaces.scm index 125f08f..c334e54 100644 --- a/scheme/interfaces.scm +++ b/scheme/interfaces.scm @@ -54,6 +54,7 @@ (define-interface primitives-interface (export add-finalizer! + add-pending-channel call-external-value checked-record-ref checked-record-set! @@ -544,6 +545,7 @@ output-channel+closer->port ;big/socket.scm ; call WAIT-FOR-CHANNEL with interrupts disabled + wait-for-channels wait-for-channel ;big/socket.scm port->channel ;posix @@ -584,6 +586,7 @@ block make-ready + set-thread-cell! clear-thread-cell! spawn-on-scheduler spawn-on-root wait upcall propogate-upcall @@ -592,6 +595,7 @@ terminate-thread! wake-some-threads + register-dozer all-threads ; for command-levels diff --git a/scheme/rts-packages.scm b/scheme/rts-packages.scm index 2edfa8e..4fb954d 100644 --- a/scheme/rts-packages.scm +++ b/scheme/rts-packages.scm @@ -101,7 +101,7 @@ channels low-channels architecture code-vectors wind define-record-types - queues threads threads-internal locks + queues threads threads-internal locks cells exceptions interrupts ascii ports util session-data diff --git a/scheme/rts/channel-io.scm b/scheme/rts/channel-io.scm index 81a8dcd..77bce3b 100644 --- a/scheme/rts/channel-io.scm +++ b/scheme/rts/channel-io.scm @@ -2,33 +2,54 @@ ; Channel interrupt stuff. -; Install an interrupt handler that queues up the results of completed I/O +; Install an interrupt handler that cells up the results of completed I/O ; operations and spawn a thread to cope with them. This is written so as ; to avoid having state in top-level variables, because their values are ; saved in dumped images. (define (initialize-channel-i/o!) - (session-data-set! channel-wait-queues-slot '()) + (session-data-set! channel-wait-cells-slot '()) (session-data-set! channel-wait-count-slot 0) - (set-interrupt-handler! (enum interrupt i/o-completion) - i/o-completion-handler)) + (set-interrupt-handler! (enum interrupt i/o-read-completion) + (make-i/o-completion-handler + (lambda (cell channel) + (let ((old (cell-ref cell))) + (cell-set! cell + (cons (cons channel (car old)) + (cdr old))))))) + (set-interrupt-handler! (enum interrupt i/o-write-completion) + (make-i/o-completion-handler + (make-i/o-completion-handler + (lambda (cell channel) + (let ((old (cell-ref cell))) + (cell-set! cell + (cons (car old) + (cons channel (cdr old)))))))))) ; The warning message is printed using DEBUG-MESSAGE because to try to make ; sure it appears in spite of whatever problem's the I/O system is having. -; -; Called with interrupts disabled. -(define (i/o-completion-handler channel status enabled-interrupts) - (let ((queue (fetch-channel-wait-queue! channel))) - (if queue - (begin - (decrement-channel-wait-count!) - (make-ready (maybe-dequeue-thread! queue) status)) - (debug-message "Warning: dropping ignored channel i/o result {Channel " - (channel-os-index channel) - " " - (channel-id channel) - "}")))) +(define (make-i/o-completion-handler update-ready-cell) + ;; Called with interrupts disabled. + (lambda (channel status enabled-interrupts) + (call-with-values + (lambda () (fetch-channel-wait-cell! channel)) + (lambda (thread-cell maybe-ready-cell) + (cond + ((and thread-cell (cell-ref thread-cell)) + => (lambda (thread) + (decrement-channel-wait-count!) + (make-ready thread status))) + (else + (debug-message "Warning: dropping ignored channel i/o result {Channel " + (channel-os-index channel) + " " + (channel-id channel) + "}"))) + (cond + ((and maybe-ready-cell + (cell-ref maybe-ready-cell)) + (update-ready-cell maybe-ready-cell channel))))))) ; Exported procedure @@ -44,46 +65,114 @@ ; terminated. (define (wait-for-channel channel) - (let ((queue (fetch-channel-wait-queue! channel))) - (if queue - (begin - (add-channel-wait-queue! channel queue) - (warn "channel has two pending operations" channel) - (terminate-current-thread)) - (let ((queue (make-queue))) - (increment-channel-wait-count!) - (enqueue-thread! queue (current-thread)) - (add-channel-wait-queue! channel queue) - (dynamic-wind nothing - block - (lambda () - (disable-interrupts!) - (let ((new-queue (fetch-channel-wait-queue! channel))) - (cond ((eq? queue new-queue) - (channel-abort channel) - (wait-for-channel channel)) - (new-queue - (add-channel-wait-queue! channel new-queue))) - (enable-interrupts!)))))))) + (call-with-values + (lambda () (fetch-channel-wait-cell! channel)) + (lambda (thread-cell maybe-ready-cell) + (if (and thread-cell (cell-ref thread-cell)) + (begin + (add-channel-wait-cell! channel thread-cell #f) + (warn "channel has two pending operations" channel) + (terminate-current-thread)) + (let ((cell (make-cell (current-thread)))) + (increment-channel-wait-count!) + (set-thread-cell! (current-thread) cell) + (add-channel-wait-cell! channel cell #f) + (dynamic-wind nothing + block + (lambda () + (disable-interrupts!) + (if (cell-ref cell) + ;; we're being terminated + (begin + (fetch-channel-wait-cell! channel) + (channel-abort channel) + (wait-for-channel channel))) + (enable-interrupts!)))))))) (define (nothing) #f) +(define (channel-check-waiter channel) + (if (channel-has-waiter? channel) + (begin + (warn "channel has two pending operations" channel) + (terminate-current-thread)))) + +(define (wait-for-channels read-channels write-channels timeout) + ;; check if we're borked from the outset + (for-each channel-has-waiter? read-channels) + (for-each channel-has-waiter? write-channels) + + (let ((thread-cell (make-cell (current-thread))) + (ready-channels-cell (make-cell (cons '() '()))) + (ready-read-channels #f) + (ready-write-channels #f)) + + (if (or (not timeout) + (register-dozer thread-cell timeout)) + (begin + ;; register us with every channel we're waiting for + (set-thread-cell! (current-thread) thread-cell) + (let ((signup (lambda (channel) + (add-channel-wait-cell! channel + thread-cell ready-channels-cell) + (increment-channel-wait-count!)))) + (for-each signup read-channels) + (for-each signup write-channels)) + + ;; block + (dynamic-wind + nothing + (lambda () + (block) + (disable-interrupts!) + (let ((pair (cell-ref ready-channels-cell))) + (set! ready-read-channels (car pair)) + (set! ready-write-channels (cdr pair))) + (cell-set! ready-channels-cell #f) + (enable-interrupts!) + (values ready-read-channels ready-write-channels)) + ;; clean up + (lambda () + (let ((aborting? (and (cell-ref thread-cell) #t))) + (disable-interrupts!) + ;; this ain't so great ... + (let ((make-cleanup + (lambda (ready-channels) + (lambda (channel) + (if (memq channel ready-channels) + (begin + (fetch-channel-wait-cell! channel) + (if (not aborting?) + (decrement-channel-wait-count!) + (begin + (channel-abort channel) + (wait-for-channel channel))))))))) + (for-each (make-cleanup ready-read-channels) read-channels) + (for-each (make-cleanup ready-write-channels) write-channels)) + + (enable-interrupts!))))) + ;; the timeout was zero or less + (enable-interrupts!)))) + ; Abort any pending operation on by OWNER on CHANNEL. ; Called with interrupts disabled. (define (steal-channel! channel owner) - (let ((queue (fetch-channel-wait-queue! channel))) - (if queue - (let ((thread (maybe-dequeue-thread! queue))) - (cond ((eq? thread owner) - (decrement-channel-wait-count!) - (channel-abort channel)) - (else - (warn "channel in use by other than port owner" - channel thread owner) - (enqueue-thread! queue thread) - #f))) - #f))) + (call-with-values + (lambda () (fetch-channel-wait-cell! channel)) + (lambda (thread-cell maybe-ready-cell) + (cond + ((cell-ref thread-cell) + => (lambda (thread) + (cond ((eq? thread owner) + (clear-thread-cell! thread) + (decrement-channel-wait-count!) + (channel-abort channel)) + (else + (warn "channel in use by other than port owner" + channel thread owner) + #f)))) + (else #f))))) ; Have CHANNEL-READ and CHANNEL-WRITE wait if a pending-channel-i/o ; exception occurs. @@ -110,7 +199,7 @@ ; Two session slots ; - the number of threads waiting for I/O completion events -; - an alist mapping channels to queues for waiting threads +; - an alist mapping channels to cells for waiting threads (define channel-wait-count-slot (make-session-data-slot! 0)) @@ -123,40 +212,47 @@ (define (decrement-channel-wait-count!) (session-data-set! channel-wait-count-slot (- (channel-wait-count) 1))) -(define channel-wait-queues-slot (make-session-data-slot! '())) +(define channel-wait-cells-slot (make-session-data-slot! '())) -; Adding a queue and channel - the caller has already determined there is no -; existing queue for this channel. +; Adding a cell and channel - the caller has already determined there is no +; existing cell for this channel. -(define (add-channel-wait-queue! channel queue) - (session-data-set! channel-wait-queues-slot - (cons (cons channel queue) - (session-data-ref channel-wait-queues-slot)))) +(define (add-channel-wait-cell! channel cell maybe-ready-channels-cell) + (session-data-set! channel-wait-cells-slot + (cons (cons channel (cons cell maybe-ready-channels-cell)) + (session-data-ref channel-wait-cells-slot)))) ; This is just deleting from an a-list. -(define (fetch-channel-wait-queue! channel) - (let* ((queues (session-data-ref channel-wait-queues-slot)) - (queue (cond ((null? queues) - #f) - ((eq? channel (caar queues)) - (session-data-set! channel-wait-queues-slot - (cdr queues)) - (cdar queues)) - (else - (let loop ((queues (cdr queues)) (prev queues)) - (cond ((null? queues) - #f) - ((eq? channel (caar queues)) - (set-cdr! prev (cdr queues)) - (cdar queues)) - (else - (loop (cdr queues) queues)))))))) - (if (or (not queue) - (thread-queue-empty? queue)) - #f - queue))) - - +(define (fetch-channel-wait-cell! channel) + (let* ((cells (session-data-ref channel-wait-cells-slot)) + (cell+ready-channels-cell + (cond ((null? cells) + #f) + ((eq? channel (caar cells)) + (session-data-set! channel-wait-cells-slot + (cdr cells)) + (cdar cells)) + (else + (let loop ((cells (cdr cells)) (prev cells)) + (cond ((null? cells) + #f) + ((eq? channel (caar cells)) + (set-cdr! prev (cdr cells)) + (cdar cells)) + (else + (loop (cdr cells) cells)))))))) + (cond + (cell+ready-channels-cell + => (lambda (pair) + (let ((thread-cell (car pair)) + (ready-cell (cdr pair))) + (values thread-cell ready-cell)))) + (else + (values #f #f))))) + +(define (channel-has-waiter? channel) + (and (assq channel + (session-data-ref channel-wait-cells-slot)) + #t)) - diff --git a/scheme/rts/sleep.scm b/scheme/rts/sleep.scm index c954ba2..5970ed7 100644 --- a/scheme/rts/sleep.scm +++ b/scheme/rts/sleep.scm @@ -10,14 +10,26 @@ (let ((cell (make-cell (current-thread)))) (disable-interrupts!) (set-thread-cell! (current-thread) cell) - (set! *dozers* - (insert (cons (+ (real-time) n) - cell) - *dozers* - (lambda (frob1 frob2) - (< (car frob1) (car frob2))))) + (insert-dozer! cell n) (block)))))) +(define (register-dozer cell user-n) + (let ((n (coerce-to-nonnegative-integer user-n))) + (cond ((not n) + (call-error "wrong type argument" sleep user-n)) + ((< 0 n) + (insert-dozer! cell n) + #t) + (else #f)))) + +(define (insert-dozer! cell n) + (set! *dozers* + (insert (cons (+ (real-time) n) + cell) + *dozers* + (lambda (frob1 frob2) + (< (car frob1) (car frob2)))))) + (define (coerce-to-nonnegative-integer n) (if (real? n) (let* ((n (round n)) diff --git a/scheme/vm/arch.scm b/scheme/vm/arch.scm index fe9fbe8..68ef52f 100644 --- a/scheme/vm/arch.scm +++ b/scheme/vm/arch.scm @@ -183,6 +183,7 @@ (close-channel 1) (channel-maybe-read 5) (channel-maybe-write 4) + (add-pending-channel 2) (channel-ready? 1) (channel-abort 1) ; stop channel operation (open-channels-list) ; return a list of the open channels @@ -240,7 +241,8 @@ (alarm ; order matters - higher priority first keyboard post-gc ; handler is passed a list of finalizers - i/o-completion ; handler is passed channel and status + i/o-read-completion ; handler is passed channel and status + i/o-write-completion ; handler is passed channel and status os-signal )) diff --git a/scheme/vm/interfaces.scm b/scheme/vm/interfaces.scm index 2d96f48..0f69338 100644 --- a/scheme/vm/interfaces.scm +++ b/scheme/vm/interfaces.scm @@ -405,6 +405,7 @@ channel-read-block channel-write-block channel-abort + add-pending-channel )) (define external-call-interface diff --git a/scheme/vm/interrupt.scm b/scheme/vm/interrupt.scm index 4bf1856..b066cdc 100644 --- a/scheme/vm/interrupt.scm +++ b/scheme/vm/interrupt.scm @@ -45,7 +45,7 @@ ; For alarm interrupts the interrupted template is passed to the handler ; for use by code profilers. ; For gc interrupts we push the list of things to be finalized. -; For i/o-completion we push the channel and its status. +; For i/o-{read,write}-completion we push the channel and its status. (define (push-interrupt-args pending-interrupt) (cond ((eq? pending-interrupt (enum interrupt alarm)) @@ -58,10 +58,11 @@ (set! *finalize-these* null) (push (enter-fixnum *enabled-interrupts*)) 2) - ((eq? pending-interrupt (enum interrupt i/o-completion)) + ((or (eq? pending-interrupt (enum interrupt i/o-read-completion)) + (eq? pending-interrupt (enum interrupt i/o-write-completion))) (let ((channel (dequeue-channel!))) (if (not (channel-queue-empty?)) - (note-interrupt! (enum interrupt i/o-completion))) + (note-interrupt! pending-interrupt)) (push channel) (push (channel-os-status channel)) (push (enter-fixnum *enabled-interrupts*)) @@ -232,9 +233,12 @@ (interrupt-bit (enum interrupt alarm))) ((eq? event (enum events keyboard-interrupt-event)) (interrupt-bit (enum interrupt keyboard))) - ((eq? event (enum events io-completion-event)) + ((eq? event (enum events io-read-completion-event)) (enqueue-channel! channel status) - (interrupt-bit (enum interrupt i/o-completion))) + (interrupt-bit (enum interrupt i/o-read-completion))) + ((eq? event (enum events io-write-completion-event)) + (enqueue-channel! channel status) + (interrupt-bit (enum interrupt i/o-write-completion))) ((eq? event (enum events os-signal-event)) (interrupt-bit (enum interrupt os-signal))) ((eq? event (enum events no-event)) diff --git a/scheme/vm/prim-io.scm b/scheme/vm/prim-io.scm index 3e225fb..c65cd19 100644 --- a/scheme/vm/prim-io.scm +++ b/scheme/vm/prim-io.scm @@ -190,6 +190,11 @@ (lambda (buffer start count channel key) (do-it buffer start count #f channel key)))) +(define-primitive add-pending-channel (channel-> boolean->) + (lambda (channel input?) + (add-pending-channel (extract-channel channel) input?)) + return-boolean) + (define-primitive channel-abort (channel->) (lambda (channel) (goto return (vm-channel-abort channel)))) diff --git a/scheme/vm/ps-channel.scm b/scheme/vm/ps-channel.scm index 4050375..8f6d871 100644 --- a/scheme/vm/ps-channel.scm +++ b/scheme/vm/ps-channel.scm @@ -64,7 +64,7 @@ ; PENDING? - true if the operation cannot complete immediately ; STATUS - from an enumeration defined as part of Pre-Scheme ; -; Pending i/o operations produce i/o-completion events when they're done. +; Pending i/o operations produce i/o-{read,write}-completion events when they're done. (define channel-read-block (external "ps_read_fd" @@ -77,6 +77,11 @@ (define channel-abort (external "ps_abort_fd_op" (=> (integer) integer))) +; Checking a channel for data + +(define add-pending-channel + (external "ps_add_pending_fd" (=> (integer boolean) boolean))) + ;---------------------------------------------------------------- ; Asynchronous external events @@ -84,7 +89,8 @@ (define-external-enumeration events (keyboard-interrupt-event ; user interrupt - io-completion-event ; a pending i/o operation completed + io-read-completion-event ; a pending read operation completed + io-write-completion-event ; a pending write operation completed alarm-event ; scheduled interrupt os-signal-event ; some OS signal of no interest to the VM occured error-event ; OS error occurred @@ -101,9 +107,10 @@ (define pending-event? (external "pending_eventp" (=> () boolean))) -; Returns the next event. The second return value is the FD for i/o-completion -; events and the third is the status for i/o-completion and error events. -; (currently this is always zero for i/o-completions). +; Returns the next event. The second return value is the FD for +; i/o-{read,write}-completion events and the third is the status for +; i/o-{read,write}-completion and error events. (currently this is +; always zero for i/o-{read,write}-completions). (define get-next-event (external "s48_get_next_event" (=> () integer integer integer))) diff --git a/scheme/vm/s48-channel.scm b/scheme/vm/s48-channel.scm index 143a873..6833207 100644 --- a/scheme/vm/s48-channel.scm +++ b/scheme/vm/s48-channel.scm @@ -130,7 +130,8 @@ (define-enumeration events (keyboard-interrupt-event - io-completion-event + io-read-completion-event + io-write-completion-event alarm-event os-signal-event error-event diff --git a/scheme/vm/vmio.scm b/scheme/vm/vmio.scm index 4c8a2bf..34f9358 100644 --- a/scheme/vm/vmio.scm +++ b/scheme/vm/vmio.scm @@ -210,7 +210,7 @@ res)))) ;---------------------------------------------------------------- -; Handling i/o-completion interrupts +; Handling i/o-{read,write}-completion interrupts ; Currently, because the GC may move buffers, strings, etc. around, the OS ; must buffer the data while waiting for i/o to complete. ; diff --git a/scsh/newports.scm b/scsh/newports.scm index 986275a..0776c34 100644 --- a/scsh/newports.scm +++ b/scsh/newports.scm @@ -730,4 +730,179 @@ ;;; replace rts/channel-port.scm end - \ No newline at end of file +;;; select +;;; ----- + +(define (port/fdes->port port/fd) + (if (port? port/fd) + port/fd + (fdes->inport port/fd))) ; #### + +(define (port/fdes-ready? port/fd) + (let ((port (port/fdes->port port/fd))) + ((port-handler-ready? (port-handler port)) port))) + +(define (any-ready port/fds) + (let loop ((port/fds port/fds)) + (if (null? port/fds) + '() + (let ((port/fd (car port/fds))) + (if (port/fdes-ready? port/fd) + ;; one is ready, get them all + (let loop ((rest (cdr port/fds)) + (ready (list port/fd))) + (cond + ((null? rest) (reverse ready)) + ((port/fdes-ready? (car rest)) + (loop (cdr rest) (cons (car rest) ready))) + (else + (loop (cdr rest) ready)))) + (loop (cdr port/fds))))))) + +(define (port/fdes-check-unlocked port/fd) + (if (port-locked? (port/fdes->port port/fd)) + (begin + ((structure-ref interrupts enable-interrupts!)) + (error "SELECT on port with pending operation" + port/fd)))) + +(define (port/fdes->channel port/fd) + (fdport-data:channel + (fdport-data + (port/fdes->port port/fd)))) + +;; this is way too epic and probably should just be split up once the +;; dust has settled ---Mike + +(define (make-select !?) + (lambda (read-vec write-vec exception-vec . maybe-timeout) + (let ((read-list (vector->list read-vec)) + (write-list (vector->list write-vec))) + + ((structure-ref interrupts disable-interrupts!)) + + (for-each port/fdes-check-unlocked read-list) + (for-each port/fdes-check-unlocked write-list) + + (let ((any-read (any-ready read-list)) + (any-write (any-ready write-list))) + (if (or (pair? any-read) (pair? any-write)) + (begin + ((structure-ref interrupts enable-interrupts!)) + (if !? ; we're SELECT! + (let ((n-read-ready + (let ((length (vector-length read-vec))) + (let loop ((i 0) (n 0)) + (cond + ((= i length) n) + ((memq (vector-ref read-vec i) any-read) + (loop (+ 1 i) (+ 1 n))) + (else + (vector-set! read-vec i #f) + (loop (+ 1 i) n)))))) + (n-write-ready + (let ((length (vector-length write-vec))) + (let loop ((i 0) (n 0)) + (cond + ((= i length) n) + ((memq (vector-ref write-vec i) any-write) + (loop (+ 1 i) (+ 1 n))) + (else + (vector-set! write-vec i #f) + (loop (+ 1 i) n))))))) + + ;; zero out EXCEPTION-VEC + (let ((length (vector-length exception-vec))) + (let loop ((i 0)) + (if (< i length) + (begin + (vector-set! exception-vec i #f) + (loop (+ 1 i)))))) + + (values n-read-ready n-write-ready 0)) + ;; we're vanilla SELECT + (values (list->vector any-read) + (list->vector any-write) + (make-vector 0)))) + + ;; we need to block + (let ((read-channels (map port/fdes->channel read-list)) + (write-channels (map port/fdes->channel write-list))) + + (for-each (lambda (channel) + (add-pending-channel channel #t)) + read-channels) + + (for-each (lambda (channel) + (add-pending-channel channel #f)) + write-channels) + + (call-with-values + (lambda () + (apply wait-for-channels read-channels write-channels maybe-timeout)) + ;; re-enables interrupts + (lambda (ready-read-channels ready-write-channels) + + ;; too many free variables ... + (if !? ; we're SELECT! + (let ((n-read-ready + (let loop ((read-channels read-channels) + (n-ready 0) + (index 0)) + (if (null? read-channels) + n-ready + (if (memq (car read-channels) ready-read-channels) + (loop (cdr read-channels) + (+ 1 n-ready) + (+ 1 index)) + (begin + (vector-set! read-vec index #f) + (loop (cdr read-channels) + n-ready + (+ 1 index))))))) + (n-write-ready + (let loop ((write-channels write-channels) + (n-ready 0) + (index 0)) + (if (null? write-channels) + n-ready + (if (memq (car write-channels) ready-write-channels) + (loop (cdr write-channels) + (+ 1 n-ready) + (+ 1 index)) + (begin + (vector-set! write-vec index #f) + (loop (cdr write-channels) + n-ready + (+ 1 index)))))))) + ;; zero out EXCEPTION-VEC + (let ((length (vector-length exception-vec))) + (let loop ((i 0)) + (if (< i length) + (begin + (vector-set! exception-vec i #f) + (loop (+ 1 i)))))) + + (values n-read-ready n-write-ready 0)) + + ;; we're vanilla SELECT + (let ((ready-read-port/fds '()) + (ready-write-port/fds '())) + (for-each (lambda (port/fd channel) + (if (memq channel ready-read-channels) + (set! ready-read-port/fds + (cons port/fd ready-read-port/fds)))) + read-list read-channels) + (for-each (lambda (port/fd channel) + (if (memq channel ready-write-channels) + (set! ready-write-port/fds + (cons port/fd ready-write-port/fds)))) + write-list write-channels) + + (values (list->vector (reverse ready-read-port/fds)) + (list->vector (reverse ready-write-port/fds)) + (make-vector 0)))))))))))) + +(define select (make-select #f)) + +(define select! (make-select #t)) diff --git a/scsh/scsh-interfaces.scm b/scsh/scsh-interfaces.scm index 9f5a0b8..46e74d8 100644 --- a/scsh/scsh-interfaces.scm +++ b/scsh/scsh-interfaces.scm @@ -222,6 +222,9 @@ read-string! read-string/partial read-string!/partial + + select select! + (write-string (proc (:string &opt :value :exact-integer :exact-integer) :unspecific)) write-string/partial))) diff --git a/scsh/select.c b/scsh/select.c deleted file mode 100644 index 22d06f4..0000000 --- a/scsh/select.c +++ /dev/null @@ -1,61 +0,0 @@ -/* This is an Scheme48/C interface file, -** automatically generated by a hacked version of cig 3.0. -step 4 -*/ - -#include -#include /* For malloc. */ -#include "libcig.h" - -/* Make sure foreign-function stubs interface to the C funs correctly: */ -#include "select1.h" - -s48_value df_select_copyback(s48_value g1, s48_value g2, s48_value g3, s48_value g4, s48_value mv_vec) -{ - extern s48_value select_copyback(s48_value , s48_value , s48_value , s48_value , int *, int *, int *); - s48_value ret1 = S48_FALSE; - S48_DECLARE_GC_PROTECT(2); - s48_value r1; - int r2 = 0; - int r3 = 0; - int r4 = 0; - - - - S48_GC_PROTECT_2(mv_vec,ret1); - r1 = select_copyback(g1, g2, g3, g4, &r2, &r3, &r4); - ret1 = r1; - S48_VECTOR_SET(mv_vec,0,s48_enter_fixnum(r2)); - S48_VECTOR_SET(mv_vec,1,s48_enter_fixnum(r3)); - S48_VECTOR_SET(mv_vec,2,s48_enter_fixnum(r4)); - S48_GC_UNPROTECT(); - return ret1; -} - -s48_value df_select_filter(s48_value g1, s48_value g2, s48_value g3, s48_value g4, s48_value mv_vec) -{ - extern s48_value select_filter(s48_value , s48_value , s48_value , s48_value , int *, int *, int *); - s48_value ret1 = S48_FALSE; - S48_DECLARE_GC_PROTECT(2); - s48_value r1; - int r2 = 0; - int r3 = 0; - int r4 = 0; - - - - S48_GC_PROTECT_2(mv_vec,ret1); - r1 = select_filter(g1, g2, g3, g4, &r2, &r3, &r4); - ret1 = r1; - S48_VECTOR_SET(mv_vec,0,s48_enter_fixnum(r2)); - S48_VECTOR_SET(mv_vec,1,s48_enter_fixnum(r3)); - S48_VECTOR_SET(mv_vec,2,s48_enter_fixnum(r4)); - S48_GC_UNPROTECT(); - return ret1; -} - -void s48_init_select(void) -{ - S48_EXPORT_FUNCTION(df_select_copyback); - S48_EXPORT_FUNCTION(df_select_filter); -} diff --git a/scsh/select.scm b/scsh/select.scm deleted file mode 100644 index ee53fb6..0000000 --- a/scsh/select.scm +++ /dev/null @@ -1,193 +0,0 @@ -;;; select(2) syscall for scsh. -*- Scheme -*- -;;; Copyright (c) 1995 by Olin Shivers. - -(foreign-init-name "select") - - -(foreign-source - "/* Make sure foreign-function stubs interface to the C funs correctly: */" - "#include \"select1.h\"" - "" "") - -;;; TIMEOUT is 0 for immediate, >0 for timeout, #f for infinite; -;;; default is #f. -;;; The sets are vectors of file descriptors & fd ports. -;;; You get three new vectors back. - -; The following routines copy ports to fd's, and copy fd's back to fd's and -; ports, so that select can take numbers and ports, simultaneously. - -; This is a C procedure in scheme. So sue me. At least it's tail-recursive -(define (fd-filter filter-me) - (let* ((len (vector-length filter-me)) - (vector-to-return (make-vector len))) - (let loop ((count (- len 1))) - (if (>= count 0) - (let ((ref (vector-ref filter-me count))) - (if (integer? ref) - (vector-set! vector-to-return count ref) - (vector-set! vector-to-return count (port->fdes ref))) - (loop (- count 1))))) - vector-to-return)) - -; ! means side-effect, the next one is more functional. -(define (fd-copyback! orig form) - (let loop ((count (- (vector-length orig) 1))) - (if (>= count 0) - (begin - (if (not (vector-ref form count)) - (vector-set! orig count #f) - (vector-set! form count (vector-ref orig count))) - (loop (- count 1))))) - orig) - -(define (fd-copyback orig form) - (let* ((len (vector-length orig)) - (vector-to-return (make-vector len #f))) - (let loop ((count (- len 1))) - (if (>= count 0) - (begin - (if (vector-ref form count) - (vector-set! vector-to-return count (vector-ref orig count))) - (loop (- count 1))))) - vector-to-return)) - -(define (select read-vec write-vec exception-vec . maybe-timeout) - (let ((rv (copy-vector read-vec)) - (wv (copy-vector write-vec)) - (ev (copy-vector exception-vec))) - (receive (nr nw ne) (apply select! rv wv ev maybe-timeout) - (values (vector-take-form read-vec rv nr) - (vector-take-form write-vec wv nw) - (vector-take-form exception-vec ev ne))))) - - -(define (select!/copyback read-vec write-vec exception-vec . maybe-timeout) - (receive (errno nr nw ne) - (apply select!/copyback/errno read-vec write-vec exception-vec - maybe-timeout) - (if errno - (apply errno-error errno select!/copyback - read-vec write-vec exception-vec maybe-timeout) - (values nr nw ne)))) - - -(define (select!/copyback/errno read-vec write-vec - exception-vec . maybe-timeout) - (let ((timeout (and (pair? maybe-timeout) - (if (pair? (cdr maybe-timeout)) - (apply error "Too many arguments" - select!/copyback/errno - read-vec write-vec exception-vec - maybe-timeout) - (real->exact-integer (check-arg real? - (car maybe-timeout) - select!/copyback/errno))))) - - (vec-ok? (lambda (v) - (vector-every? (lambda (elt) - (or (and (integer? elt) (>= elt 0)) - (fdport? elt))) - v)))) - ;; Type-check input vectors. - (check-arg vec-ok? read-vec select!/copyback/errno) - (check-arg vec-ok? write-vec select!/copyback/errno) - (check-arg vec-ok? exception-vec select!/copyback/errno) - (check-arg (lambda (x) (or (not x) (integer? x))) timeout - select!/copyback/errno) - - (let ((prop-read-vec (fd-filter read-vec)) - (prop-write-vec (fd-filter write-vec)) - (prop-exception-vec (fd-filter exception-vec))) - (let lp () - (receive (errno nr nw ne) - (%select/copyback/errno prop-read-vec prop-write-vec prop-exception-vec timeout) - (if (and errno (= errno errno/intr)) ; Retry on interrupts. - (lp) - (values errno - (fd-copyback read-vec nr) - (fd-copyback write-vec nw) - (fd-copyback exception-vec ne)))))))) - - -(define-foreign %select/copyback/errno - (select_copyback (vector-desc rvec) - (vector-desc wvec) - (vector-desc evec) - (desc nsecs)) ; Integer or #f for infinity. - desc ; errno or #f - fixnum ; nread - number of hits in RVEC - fixnum ; nwrite - number of hits in WVEC - fixnum) ; nexcept - number of hits in EVEC - - -(define (vector-take-form vec form nelts) - (let ((short (make-vector nelts))) - (do ((i (- (vector-length vec) 1) (- i 1))) - ((< i 0)) - (if (vector-ref form i) - (begin - (set! nelts (- nelts 1)) - (vector-set! short nelts (vector-ref vec i))))) - short)) - - -;;; SELECT! -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;;; The side-effecting variant. To be documented. - -(define (select! read-vec write-vec exception-vec . maybe-timeout) - (receive (errno nr nw ne) - (apply select!/errno read-vec write-vec exception-vec maybe-timeout) - (if errno - (apply errno-error errno select! read-vec write-vec exception-vec - maybe-timeout) - (values nr nw ne)))) - -(define (select!/errno read-vec write-vec exception-vec . maybe-timeout) - (let ((timeout (and (pair? maybe-timeout) - (if (pair? (cdr maybe-timeout)) - (apply error "Too many arguments" - select!/copyback/errno - read-vec write-vec exception-vec - maybe-timeout) - (real->exact-integer (check-arg real? - (car maybe-timeout) - select!/copyback/errno))))) - - (vec-ok? (lambda (v) - (vector-every? (lambda (elt) - (or (and (integer? elt) (>= elt 0)) - (not elt) - (fdport? elt))) - v)))) - ;; Type-check input vectors. - (check-arg vec-ok? read-vec select!/errno) - (check-arg vec-ok? write-vec select!/errno) - (check-arg vec-ok? exception-vec select!/errno) - (check-arg (lambda (x) (or (not x) (integer? x))) timeout select!/errno) - - (let ((prop-read-vec (fd-filter read-vec)) - (prop-write-vec (fd-filter write-vec)) - (prop-exception-vec (fd-filter exception-vec))) - (let lp () - (receive (errno nr nw ne) - (%select!/errno prop-read-vec prop-write-vec prop-exception-vec timeout) - (if (and errno (= errno errno/intr)) ; Retry on interrupts. - (lp) - (begin - (fd-copyback! read-vec prop-read-vec) - (fd-copyback! write-vec prop-write-vec) - (fd-copyback! exception-vec prop-exception-vec) - (values errno nr nw ne)))))))) - - -(define-foreign %select!/errno - (select_filter (vector-desc rvec) - (vector-desc wvec) - (vector-desc evec) - (desc nsecs)) ; Integer or #f for infinity. - desc ; errno or #f - fixnum ; nread - number of hits in RVEC - fixnum ; nwrite - number of hits in WVEC - fixnum) ; nexcept - number of hits in EVEC diff --git a/scsh/select1.c b/scsh/select1.c deleted file mode 100644 index 984af8e..0000000 --- a/scsh/select1.c +++ /dev/null @@ -1,223 +0,0 @@ -/* C support for scsh select call. -** Copyright (c) 1995 by Olin Shivers. -*/ - -#include "sysdep.h" - -#include -#if defined(HAVE_SYS_SELECT_H) -# include -#endif -#include - -#include -#include - -#include "cstuff.h" -#include "fdports.h" /* Accessors for Scheme I/O port internals. */ - -/* Make sure our exports match up w/the implementation: */ -#include "select1.h" - -/* the traditional sleazy max non-function. */ -#define max(a,b) (((a) > (b)) ? (a) : (b)) - -extern int errno; - -static void or2_fdset(fd_set *x, fd_set *y, int max_elt); -static int copyback_fdvec(s48_value portvec, fd_set *fdset); - -/* RVEC, WVEC, and EVEC are Scheme vectors of integer file descriptors, -** I/O ports, and #f's. NSECS is an integer timeout value, or #f for -** infinite wait. Do the select() call, returning result fd_sets in the -** passed pointers. Return 0 for OK, otherwise error is in errno. -*/ - -int do_select(s48_value rvec, s48_value wvec, - s48_value evec, s48_value nsecs, - fd_set *rset_ans, fd_set *wset_ans, fd_set *eset_ans) -{ - struct timeval timeout, *tptr; - fd_set rset_bufrdy, wset_bufrdy, eset_bufrdy; /* Buffered port hits. */ - int rbuf_rdy=0, wbuf_rdy=0, bufrdy; /* Set if we find buffered I/O hits. */ - int max_fd = -1; /* Max fdes in the sets. */ - int nelts, i; - int nfound; - - FD_ZERO(rset_ans); FD_ZERO(wset_ans); FD_ZERO(eset_ans); - FD_ZERO(&rset_bufrdy); FD_ZERO(&wset_bufrdy); FD_ZERO(&eset_bufrdy); - - /* Scan the readvec elts. */ - nelts = S48_VECTOR_LENGTH(rvec); - for(i=nelts; --i >= 0; ) { - s48_value elt = S48_VECTOR_REF(rvec,i); - int fd; - - fd = s48_extract_fixnum(elt); - FD_SET(fd, rset_ans); - - max_fd = max(max_fd, fd); - } - - /* Scan the writevec elts. */ - nelts = S48_VECTOR_LENGTH(wvec); - for(i=nelts; --i >= 0; ) { - s48_value elt = S48_VECTOR_REF(wvec,i); - int fd; - - fd = s48_extract_fixnum(elt); - FD_SET(fd, wset_ans); - - max_fd = max(max_fd, fd); - } - - /* Scan the exception-vec elts. */ - nelts = S48_VECTOR_LENGTH(evec); - for(i=nelts; --i >= 0; ) { - s48_value elt = S48_VECTOR_REF(evec,i); - int fd; - - fd = s48_extract_fixnum(elt); - FD_SET(fd, eset_ans); - - max_fd = max(max_fd, fd); - } - - bufrdy = rbuf_rdy || wbuf_rdy; - if( bufrdy ) { /* Already have some hits on buffered ports, */ - timeout.tv_sec = 0; /* so we only poll the others. */ - timeout.tv_usec = 0; - tptr = &timeout; - } - else if ( S48_FIXNUM_P(nsecs) ) { - timeout.tv_sec = s48_extract_fixnum(nsecs); /* Wait n seconds. */ - timeout.tv_usec = 0; - tptr = &timeout; - } - else tptr = NULL; /* #f => Infinite wait. */ - - /* select1() is defined in sysdep.h -- bogus compatibility macro. */ - nfound = select(max_fd+1, rset_ans, wset_ans, eset_ans, tptr); /* Do it.*/ - - /* EINTR is not an error return if we have hits on buffered ports - ** to report. - */ - if( nfound < 0 ) - if ( errno != EINTR || !bufrdy ) return -1; - else { /* EINTR, but we have hits on buffered ports to report. */ - FD_ZERO(rset_ans); /* This should never happen -- */ - FD_ZERO(wset_ans); /* EINTR on a zero-sec select() */ - FD_ZERO(eset_ans); /* -- but I'm paranoid. */ - } - - /* OR together the buffered-io ready sets and the fd ready sets. */ - if( rbuf_rdy ) or2_fdset(rset_ans, &rset_bufrdy, max_fd); - if( wbuf_rdy ) or2_fdset(wset_ans, &wset_bufrdy, max_fd); - - return 0; - } - - - -/* x = x or y */ -static void or2_fdset(fd_set *x, fd_set *y, int max_elt) -{ - int i; - for(i=max_elt+1; --i >= 0;) - if( FD_ISSET(i,y) ) FD_SET(i,x); - } - - - -/* PORTVEC is a vector of integer file descriptors and Scheme ports. -** Scan over the vector, and copy any elt whose file descriptor is in FDSET -** to the front of the vector. Return the number of elts thus copied. -*/ -static int copyback_fdvec(s48_value portvec, fd_set *fdset) -{ - int vlen = S48_VECTOR_LENGTH(portvec); - int i, j=0; - for( i = -1; ++i < vlen; ) { - s48_value elt = S48_VECTOR_REF(portvec, i); - int fd = s48_extract_fixnum((S48_FIXNUM_P(elt)) ? elt : (1 / 0)); - /* JMG *PortFd(elt));*/ - if( FD_ISSET(fd,fdset) ) { - FD_CLR(fd,fdset); /* In case luser put elt in multiple times. */ - S48_VECTOR_SET(portvec, j, elt); - j++; - } - } - return j; - } - - -/* Overwrite every inactive element in the vector with #f; -** Return count of active elements. -*/ - -static int clobber_inactives(s48_value portvec, fd_set *fdset) -{ - int count = 0; - int i = S48_VECTOR_LENGTH(portvec); - - while( --i >= 0 ) { - s48_value elt = S48_VECTOR_REF(portvec, i); - if( elt != S48_FALSE ) { - int fd = s48_extract_fixnum((S48_FIXNUM_P(elt)) ? elt : (1/0)); /* JMG *PortFd(elt));*/ - if( FD_ISSET(fd,fdset) ) { - FD_CLR(fd,fdset); /* In case luser put elt in multiple times. */ - ++count; - } - else S48_VECTOR_SET(portvec, i, S48_FALSE); /* Clobber. */ - } - } - return count; - } - - -/* These two functions are the entry points to this file. -********************************************************* -*/ - -/* Copy active elts back to the front of their vector; -** Return error indicator & number of hits for each vector. -*/ - -s48_value select_copyback(s48_value rvec, s48_value wvec, - s48_value evec, s48_value nsecs, - int *r_numrdy, int *w_numrdy, int *e_numrdy) -{ - fd_set rset, wset, eset; - - if( do_select(rvec, wvec, evec, nsecs, &rset, &wset, &eset) ) { - *r_numrdy = *w_numrdy = *e_numrdy = 0; - return s48_enter_fixnum(errno); - } - - *r_numrdy = copyback_fdvec(rvec, &rset); - *w_numrdy = copyback_fdvec(wvec, &wset); - *e_numrdy = copyback_fdvec(evec, &eset); - return S48_FALSE; - } - - -/* Overwrite non-active elements in the vectors with #f; -** return error indicator & number of hits for each vector. -*/ - -s48_value select_filter(s48_value rvec, s48_value wvec, - s48_value evec, s48_value nsecs, - int *r_numrdy, int *w_numrdy, int *e_numrdy) -{ - fd_set rset, wset, eset; - - if( do_select(rvec, wvec, evec, nsecs, &rset, &wset, &eset) ) { - *r_numrdy = *w_numrdy = *e_numrdy = 0; - return s48_enter_fixnum(errno); - } - - *r_numrdy = clobber_inactives(rvec, &rset); - *w_numrdy = clobber_inactives(wvec, &wset); - *e_numrdy = clobber_inactives(evec, &eset); - return S48_FALSE; - } diff --git a/scsh/select1.h b/scsh/select1.h deleted file mode 100644 index 1a91028..0000000 --- a/scsh/select1.h +++ /dev/null @@ -1,9 +0,0 @@ -/* Exports from select1.c. */ - -s48_value select_copyback(s48_value rvec, s48_value wvec, - s48_value evec, s48_value nsecs, - int *r_numrdy, int *w_numrdy, int *e_numrdy); - -s48_value select_filter(s48_value rvec, s48_value wvec, - s48_value evec, s48_value nsecs, - int *r_numrdy, int *w_numrdy, int *e_numrdy);