First stab at implementing SELECT/SELECT!:

- extend the VM interrupts to distinguish between read and write
  events
- add new ADD-PENDING-CHANNEL instruction to the VM
- add WAIT-FOR-CHANNELS procedure to the run-time system
- implement SELECT and SELECT! on top of that in newports.scm

This runs some basic tests, but in general should be considered
largely untested.

Moreover, SELECT/SELECT! never detect any exceptional conditions---the
returned vectors are always empty.  This is because the VM doesn't
really track those, and it's unclear whether it would be worth the
effort.
This commit is contained in:
sperber 2002-08-20 14:03:01 +00:00
parent ac343ba970
commit 33c14d7901
22 changed files with 469 additions and 617 deletions

View File

@ -225,7 +225,6 @@ scsh/network1o: scsh/network1.h
scsh/flock1.o: scsh/flock1.h
scsh/fdports1.o scsh/fdports.o: scsh/fdports1.h
#scsh/select1.o scsh/select.o: scsh/select1.h
scsh/rx/regexp1.o: c/scheme48.h
@ -436,9 +435,6 @@ clean: clean-cig clean-scsh
clean-cig:
-rm -f cig/*.o $(CIG) $(CIG).image $(LIBCIG)
clean-scm2c:
rm -f #scsh/select.c
distclean: clean
rm -f Makefile config.log config.status c/sysdep.h config.cache \
scsh/machine \
@ -841,7 +837,6 @@ SCHEME =scsh/awk.scm \
# Explicitly giving the .o/.c dependency also makes it go.
############################################################
cig/libcig.c: cig/libcig.scm
#scsh/select.c: scsh/select.scm
scsh/scsh: scsh/scsh-tramp.c
$(CC) -o $@ $(CPPFLAGS) $(CFLAGS) \

View File

@ -1,5 +1,7 @@
enum event_enum { KEYBOARD_INTERRUPT_EVENT, IO_COMPLETION_EVENT, ALARM_EVENT,
OS_SIGNAL_EVENT, ERROR_EVENT, NO_EVENT };
enum event_enum { KEYBOARD_INTERRUPT_EVENT,
IO_READ_COMPLETION_EVENT, IO_WRITE_COMPLETION_EVENT,
ALARM_EVENT,
OS_SIGNAL_EVENT, ERROR_EVENT, NO_EVENT };
extern bool s48_add_pending_fd(int fd, bool is_input);
extern bool s48_remove_fd(int fd);

View File

@ -287,7 +287,7 @@ s48_stop_alarm_interrupts(void)
* (queue-ready-ports)
* (set! *poll-time* (+ *time* *poll-interval*))))
* (cond ((not (queue-empty? ready-ports))
* (values (enum event-type i/o-completion)
* (values (enum event-type i/o-{read/write}-completion)
* (dequeue! ready-ports)))
* ((>= *current_time* *alarm-time*)
* (set! *alarm-time* max-integer)
@ -302,9 +302,20 @@ s48_stop_alarm_interrupts(void)
* (values (enum event-type no-event) #f))))))
*/
static bool there_are_ready_ports(void);
static int next_ready_port(void);
static int queue_ready_ports(bool wait, long seconds, long ticks);
#define FD_QUIESCENT 0 /* idle */
#define FD_READY 1 /* I/O ready to be performed */
#define FD_PENDING 2 /* waiting */
typedef struct fd_struct {
int fd, /* file descriptor */
status; /* one of the FD_* constants */
bool is_input; /* iff input */
struct fd_struct *next; /* next on same queue */
} fd_struct;
static bool there_are_ready_ports(void);
static fd_struct *next_ready_fd_struct(void);
static int queue_ready_ports(bool wait, long seconds, long ticks);
int
s48_get_next_event(long *ready_fd, long *status)
@ -314,6 +325,8 @@ s48_get_next_event(long *ready_fd, long *status)
*/
int io_poll_status;
fd_struct *f;
/*
fprintf(stderr, "[poll at %d (waiting for %d)]\n", s48_current_time, alarm_time);
*/
@ -334,10 +347,14 @@ s48_get_next_event(long *ready_fd, long *status)
}
}
if (there_are_ready_ports()) {
*ready_fd = next_ready_port();
f = next_ready_fd_struct();
*ready_fd = f->fd;
*status = 0; /* chars read or written */
/* fprintf(stderr, "[i/o completion]\n"); */
return (IO_COMPLETION_EVENT);
if (f->is_input)
return (IO_READ_COMPLETION_EVENT);
else
return (IO_WRITE_COMPLETION_EVENT);
}
if (alarm_time != -1 && s48_current_time >= alarm_time) {
alarm_time = -1;
@ -364,17 +381,6 @@ s48_get_next_event(long *ready_fd, long *status)
* the pending ports and move any that are ready onto the other queue and
* signal an event.
*/
#define FD_QUIESCENT 0 /* idle */
#define FD_READY 1 /* I/O ready to be performed */
#define FD_PENDING 2 /* waiting */
typedef struct fd_struct {
int fd, /* file descriptor */
status; /* one of the FD_* constants */
bool is_input; /* iff input */
struct fd_struct *next; /* next on same queue */
} fd_struct;
/*
* A queue of fd_structs is empty iff the first field is NULL. In
@ -459,14 +465,14 @@ there_are_ready_ports(void)
}
static int
next_ready_port(void)
static fd_struct *
next_ready_fd_struct(void)
{
fd_struct *p;
p = rmque(&ready.first, &ready);
p->status = FD_QUIESCENT;
return (p->fd);
return (p);
}

View File

@ -111,6 +111,16 @@ bool ps_check_fd(long fd_as_long, bool is_read, long *status)
return FALSE; } }
}
/*
* Return TRUE if successful, and FALSE otherwise.
*/
bool
ps_add_pending_fd(long fd_as_long, bool is_input)
{
return s48_add_pending_fd((int) fd_as_long, is_input);
}
long
ps_read_fd(long fd_as_long, char *buffer, long max, bool waitp,
bool *eofp, bool *pending, long *status)
@ -201,7 +211,7 @@ long
ps_abort_fd_op(long fd_as_long)
{
int fd = (int)fd_as_long;
fprintf(stderr, "aborting %d\n", fd);
if (!s48_remove_fd(fd))
fprintf(stderr, "Error: ps_abort_fd_op, no pending operation on fd %d\n",
fd);

View File

@ -36,9 +36,23 @@ static s48_value s48_socket(s48_value server_p),
s48_value input_p),
s48_get_host_name(void);
s48_value s48_add_pending_channel (s48_value channel)
{
int socket_fd;
S48_CHECK_CHANNEL(channel);
socket_fd = S48_UNSAFE_EXTRACT_FIXNUM(S48_UNSAFE_CHANNEL_OS_INDEX(channel));
if (! s48_add_pending_fd(socket_fd, 1)) // 1 for: yes, is input
s48_raise_out_of_memory_error();
return S48_UNSPECIFIC;
}
/*
* Install all exported functions in Scheme48.
*/
void
s48_init_socket(void)
{
@ -50,6 +64,7 @@ s48_init_socket(void)
S48_EXPORT_FUNCTION(s48_connect);
S48_EXPORT_FUNCTION(s48_close_socket_half);
S48_EXPORT_FUNCTION(s48_get_host_name);
S48_EXPORT_FUNCTION(s48_add_pending_channel);
}
/*

View File

@ -54,6 +54,7 @@
(define-interface primitives-interface
(export add-finalizer!
add-pending-channel
call-external-value
checked-record-ref
checked-record-set!
@ -544,6 +545,7 @@
output-channel+closer->port ;big/socket.scm
; call WAIT-FOR-CHANNEL with interrupts disabled
wait-for-channels
wait-for-channel ;big/socket.scm
port->channel ;posix
@ -584,6 +586,7 @@
block
make-ready
set-thread-cell! clear-thread-cell!
spawn-on-scheduler spawn-on-root
wait
upcall propogate-upcall
@ -592,6 +595,7 @@
terminate-thread!
wake-some-threads
register-dozer
all-threads ; for command-levels

View File

@ -101,7 +101,7 @@
channels low-channels
architecture code-vectors wind
define-record-types
queues threads threads-internal locks
queues threads threads-internal locks cells
exceptions interrupts
ascii ports util
session-data

View File

@ -2,33 +2,54 @@
; Channel interrupt stuff.
; Install an interrupt handler that queues up the results of completed I/O
; Install an interrupt handler that cells up the results of completed I/O
; operations and spawn a thread to cope with them. This is written so as
; to avoid having state in top-level variables, because their values are
; saved in dumped images.
(define (initialize-channel-i/o!)
(session-data-set! channel-wait-queues-slot '())
(session-data-set! channel-wait-cells-slot '())
(session-data-set! channel-wait-count-slot 0)
(set-interrupt-handler! (enum interrupt i/o-completion)
i/o-completion-handler))
(set-interrupt-handler! (enum interrupt i/o-read-completion)
(make-i/o-completion-handler
(lambda (cell channel)
(let ((old (cell-ref cell)))
(cell-set! cell
(cons (cons channel (car old))
(cdr old)))))))
(set-interrupt-handler! (enum interrupt i/o-write-completion)
(make-i/o-completion-handler
(make-i/o-completion-handler
(lambda (cell channel)
(let ((old (cell-ref cell)))
(cell-set! cell
(cons (car old)
(cons channel (cdr old))))))))))
; The warning message is printed using DEBUG-MESSAGE because to try to make
; sure it appears in spite of whatever problem's the I/O system is having.
;
; Called with interrupts disabled.
(define (i/o-completion-handler channel status enabled-interrupts)
(let ((queue (fetch-channel-wait-queue! channel)))
(if queue
(begin
(decrement-channel-wait-count!)
(make-ready (maybe-dequeue-thread! queue) status))
(debug-message "Warning: dropping ignored channel i/o result {Channel "
(channel-os-index channel)
" "
(channel-id channel)
"}"))))
(define (make-i/o-completion-handler update-ready-cell)
;; Called with interrupts disabled.
(lambda (channel status enabled-interrupts)
(call-with-values
(lambda () (fetch-channel-wait-cell! channel))
(lambda (thread-cell maybe-ready-cell)
(cond
((and thread-cell (cell-ref thread-cell))
=> (lambda (thread)
(decrement-channel-wait-count!)
(make-ready thread status)))
(else
(debug-message "Warning: dropping ignored channel i/o result {Channel "
(channel-os-index channel)
" "
(channel-id channel)
"}")))
(cond
((and maybe-ready-cell
(cell-ref maybe-ready-cell))
(update-ready-cell maybe-ready-cell channel)))))))
; Exported procedure
@ -44,46 +65,114 @@
; terminated.
(define (wait-for-channel channel)
(let ((queue (fetch-channel-wait-queue! channel)))
(if queue
(begin
(add-channel-wait-queue! channel queue)
(warn "channel has two pending operations" channel)
(terminate-current-thread))
(let ((queue (make-queue)))
(increment-channel-wait-count!)
(enqueue-thread! queue (current-thread))
(add-channel-wait-queue! channel queue)
(dynamic-wind nothing
block
(lambda ()
(disable-interrupts!)
(let ((new-queue (fetch-channel-wait-queue! channel)))
(cond ((eq? queue new-queue)
(channel-abort channel)
(wait-for-channel channel))
(new-queue
(add-channel-wait-queue! channel new-queue)))
(enable-interrupts!))))))))
(call-with-values
(lambda () (fetch-channel-wait-cell! channel))
(lambda (thread-cell maybe-ready-cell)
(if (and thread-cell (cell-ref thread-cell))
(begin
(add-channel-wait-cell! channel thread-cell #f)
(warn "channel has two pending operations" channel)
(terminate-current-thread))
(let ((cell (make-cell (current-thread))))
(increment-channel-wait-count!)
(set-thread-cell! (current-thread) cell)
(add-channel-wait-cell! channel cell #f)
(dynamic-wind nothing
block
(lambda ()
(disable-interrupts!)
(if (cell-ref cell)
;; we're being terminated
(begin
(fetch-channel-wait-cell! channel)
(channel-abort channel)
(wait-for-channel channel)))
(enable-interrupts!))))))))
(define (nothing) #f)
(define (channel-check-waiter channel)
(if (channel-has-waiter? channel)
(begin
(warn "channel has two pending operations" channel)
(terminate-current-thread))))
(define (wait-for-channels read-channels write-channels timeout)
;; check if we're borked from the outset
(for-each channel-has-waiter? read-channels)
(for-each channel-has-waiter? write-channels)
(let ((thread-cell (make-cell (current-thread)))
(ready-channels-cell (make-cell (cons '() '())))
(ready-read-channels #f)
(ready-write-channels #f))
(if (or (not timeout)
(register-dozer thread-cell timeout))
(begin
;; register us with every channel we're waiting for
(set-thread-cell! (current-thread) thread-cell)
(let ((signup (lambda (channel)
(add-channel-wait-cell! channel
thread-cell ready-channels-cell)
(increment-channel-wait-count!))))
(for-each signup read-channels)
(for-each signup write-channels))
;; block
(dynamic-wind
nothing
(lambda ()
(block)
(disable-interrupts!)
(let ((pair (cell-ref ready-channels-cell)))
(set! ready-read-channels (car pair))
(set! ready-write-channels (cdr pair)))
(cell-set! ready-channels-cell #f)
(enable-interrupts!)
(values ready-read-channels ready-write-channels))
;; clean up
(lambda ()
(let ((aborting? (and (cell-ref thread-cell) #t)))
(disable-interrupts!)
;; this ain't so great ...
(let ((make-cleanup
(lambda (ready-channels)
(lambda (channel)
(if (memq channel ready-channels)
(begin
(fetch-channel-wait-cell! channel)
(if (not aborting?)
(decrement-channel-wait-count!)
(begin
(channel-abort channel)
(wait-for-channel channel)))))))))
(for-each (make-cleanup ready-read-channels) read-channels)
(for-each (make-cleanup ready-write-channels) write-channels))
(enable-interrupts!)))))
;; the timeout was zero or less
(enable-interrupts!))))
; Abort any pending operation on by OWNER on CHANNEL.
; Called with interrupts disabled.
(define (steal-channel! channel owner)
(let ((queue (fetch-channel-wait-queue! channel)))
(if queue
(let ((thread (maybe-dequeue-thread! queue)))
(cond ((eq? thread owner)
(decrement-channel-wait-count!)
(channel-abort channel))
(else
(warn "channel in use by other than port owner"
channel thread owner)
(enqueue-thread! queue thread)
#f)))
#f)))
(call-with-values
(lambda () (fetch-channel-wait-cell! channel))
(lambda (thread-cell maybe-ready-cell)
(cond
((cell-ref thread-cell)
=> (lambda (thread)
(cond ((eq? thread owner)
(clear-thread-cell! thread)
(decrement-channel-wait-count!)
(channel-abort channel))
(else
(warn "channel in use by other than port owner"
channel thread owner)
#f))))
(else #f)))))
; Have CHANNEL-READ and CHANNEL-WRITE wait if a pending-channel-i/o
; exception occurs.
@ -110,7 +199,7 @@
; Two session slots
; - the number of threads waiting for I/O completion events
; - an alist mapping channels to queues for waiting threads
; - an alist mapping channels to cells for waiting threads
(define channel-wait-count-slot (make-session-data-slot! 0))
@ -123,40 +212,47 @@
(define (decrement-channel-wait-count!)
(session-data-set! channel-wait-count-slot (- (channel-wait-count) 1)))
(define channel-wait-queues-slot (make-session-data-slot! '()))
(define channel-wait-cells-slot (make-session-data-slot! '()))
; Adding a queue and channel - the caller has already determined there is no
; existing queue for this channel.
; Adding a cell and channel - the caller has already determined there is no
; existing cell for this channel.
(define (add-channel-wait-queue! channel queue)
(session-data-set! channel-wait-queues-slot
(cons (cons channel queue)
(session-data-ref channel-wait-queues-slot))))
(define (add-channel-wait-cell! channel cell maybe-ready-channels-cell)
(session-data-set! channel-wait-cells-slot
(cons (cons channel (cons cell maybe-ready-channels-cell))
(session-data-ref channel-wait-cells-slot))))
; This is just deleting from an a-list.
(define (fetch-channel-wait-queue! channel)
(let* ((queues (session-data-ref channel-wait-queues-slot))
(queue (cond ((null? queues)
#f)
((eq? channel (caar queues))
(session-data-set! channel-wait-queues-slot
(cdr queues))
(cdar queues))
(else
(let loop ((queues (cdr queues)) (prev queues))
(cond ((null? queues)
#f)
((eq? channel (caar queues))
(set-cdr! prev (cdr queues))
(cdar queues))
(else
(loop (cdr queues) queues))))))))
(if (or (not queue)
(thread-queue-empty? queue))
#f
queue)))
(define (fetch-channel-wait-cell! channel)
(let* ((cells (session-data-ref channel-wait-cells-slot))
(cell+ready-channels-cell
(cond ((null? cells)
#f)
((eq? channel (caar cells))
(session-data-set! channel-wait-cells-slot
(cdr cells))
(cdar cells))
(else
(let loop ((cells (cdr cells)) (prev cells))
(cond ((null? cells)
#f)
((eq? channel (caar cells))
(set-cdr! prev (cdr cells))
(cdar cells))
(else
(loop (cdr cells) cells))))))))
(cond
(cell+ready-channels-cell
=> (lambda (pair)
(let ((thread-cell (car pair))
(ready-cell (cdr pair)))
(values thread-cell ready-cell))))
(else
(values #f #f)))))
(define (channel-has-waiter? channel)
(and (assq channel
(session-data-ref channel-wait-cells-slot))
#t))

View File

@ -10,14 +10,26 @@
(let ((cell (make-cell (current-thread))))
(disable-interrupts!)
(set-thread-cell! (current-thread) cell)
(set! *dozers*
(insert (cons (+ (real-time) n)
cell)
*dozers*
(lambda (frob1 frob2)
(< (car frob1) (car frob2)))))
(insert-dozer! cell n)
(block))))))
(define (register-dozer cell user-n)
(let ((n (coerce-to-nonnegative-integer user-n)))
(cond ((not n)
(call-error "wrong type argument" sleep user-n))
((< 0 n)
(insert-dozer! cell n)
#t)
(else #f))))
(define (insert-dozer! cell n)
(set! *dozers*
(insert (cons (+ (real-time) n)
cell)
*dozers*
(lambda (frob1 frob2)
(< (car frob1) (car frob2))))))
(define (coerce-to-nonnegative-integer n)
(if (real? n)
(let* ((n (round n))

View File

@ -183,6 +183,7 @@
(close-channel 1)
(channel-maybe-read 5)
(channel-maybe-write 4)
(add-pending-channel 2)
(channel-ready? 1)
(channel-abort 1) ; stop channel operation
(open-channels-list) ; return a list of the open channels
@ -240,7 +241,8 @@
(alarm ; order matters - higher priority first
keyboard
post-gc ; handler is passed a list of finalizers
i/o-completion ; handler is passed channel and status
i/o-read-completion ; handler is passed channel and status
i/o-write-completion ; handler is passed channel and status
os-signal
))

View File

@ -405,6 +405,7 @@
channel-read-block
channel-write-block
channel-abort
add-pending-channel
))
(define external-call-interface

View File

@ -45,7 +45,7 @@
; For alarm interrupts the interrupted template is passed to the handler
; for use by code profilers.
; For gc interrupts we push the list of things to be finalized.
; For i/o-completion we push the channel and its status.
; For i/o-{read,write}-completion we push the channel and its status.
(define (push-interrupt-args pending-interrupt)
(cond ((eq? pending-interrupt (enum interrupt alarm))
@ -58,10 +58,11 @@
(set! *finalize-these* null)
(push (enter-fixnum *enabled-interrupts*))
2)
((eq? pending-interrupt (enum interrupt i/o-completion))
((or (eq? pending-interrupt (enum interrupt i/o-read-completion))
(eq? pending-interrupt (enum interrupt i/o-write-completion)))
(let ((channel (dequeue-channel!)))
(if (not (channel-queue-empty?))
(note-interrupt! (enum interrupt i/o-completion)))
(note-interrupt! pending-interrupt))
(push channel)
(push (channel-os-status channel))
(push (enter-fixnum *enabled-interrupts*))
@ -232,9 +233,12 @@
(interrupt-bit (enum interrupt alarm)))
((eq? event (enum events keyboard-interrupt-event))
(interrupt-bit (enum interrupt keyboard)))
((eq? event (enum events io-completion-event))
((eq? event (enum events io-read-completion-event))
(enqueue-channel! channel status)
(interrupt-bit (enum interrupt i/o-completion)))
(interrupt-bit (enum interrupt i/o-read-completion)))
((eq? event (enum events io-write-completion-event))
(enqueue-channel! channel status)
(interrupt-bit (enum interrupt i/o-write-completion)))
((eq? event (enum events os-signal-event))
(interrupt-bit (enum interrupt os-signal)))
((eq? event (enum events no-event))

View File

@ -190,6 +190,11 @@
(lambda (buffer start count channel key)
(do-it buffer start count #f channel key))))
(define-primitive add-pending-channel (channel-> boolean->)
(lambda (channel input?)
(add-pending-channel (extract-channel channel) input?))
return-boolean)
(define-primitive channel-abort (channel->)
(lambda (channel)
(goto return (vm-channel-abort channel))))

View File

@ -64,7 +64,7 @@
; PENDING? - true if the operation cannot complete immediately
; STATUS - from an enumeration defined as part of Pre-Scheme
;
; Pending i/o operations produce i/o-completion events when they're done.
; Pending i/o operations produce i/o-{read,write}-completion events when they're done.
(define channel-read-block
(external "ps_read_fd"
@ -77,6 +77,11 @@
(define channel-abort
(external "ps_abort_fd_op" (=> (integer) integer)))
; Checking a channel for data
(define add-pending-channel
(external "ps_add_pending_fd" (=> (integer boolean) boolean)))
;----------------------------------------------------------------
; Asynchronous external events
@ -84,7 +89,8 @@
(define-external-enumeration events
(keyboard-interrupt-event ; user interrupt
io-completion-event ; a pending i/o operation completed
io-read-completion-event ; a pending read operation completed
io-write-completion-event ; a pending write operation completed
alarm-event ; scheduled interrupt
os-signal-event ; some OS signal of no interest to the VM occured
error-event ; OS error occurred
@ -101,9 +107,10 @@
(define pending-event?
(external "pending_eventp" (=> () boolean)))
; Returns the next event. The second return value is the FD for i/o-completion
; events and the third is the status for i/o-completion and error events.
; (currently this is always zero for i/o-completions).
; Returns the next event. The second return value is the FD for
; i/o-{read,write}-completion events and the third is the status for
; i/o-{read,write}-completion and error events. (currently this is
; always zero for i/o-{read,write}-completions).
(define get-next-event
(external "s48_get_next_event" (=> () integer integer integer)))

View File

@ -130,7 +130,8 @@
(define-enumeration events
(keyboard-interrupt-event
io-completion-event
io-read-completion-event
io-write-completion-event
alarm-event
os-signal-event
error-event

View File

@ -210,7 +210,7 @@
res))))
;----------------------------------------------------------------
; Handling i/o-completion interrupts
; Handling i/o-{read,write}-completion interrupts
; Currently, because the GC may move buffers, strings, etc. around, the OS
; must buffer the data while waiting for i/o to complete.
;

View File

@ -730,4 +730,179 @@
;;; replace rts/channel-port.scm end
;;; select
;;; -----
(define (port/fdes->port port/fd)
(if (port? port/fd)
port/fd
(fdes->inport port/fd))) ; ####
(define (port/fdes-ready? port/fd)
(let ((port (port/fdes->port port/fd)))
((port-handler-ready? (port-handler port)) port)))
(define (any-ready port/fds)
(let loop ((port/fds port/fds))
(if (null? port/fds)
'()
(let ((port/fd (car port/fds)))
(if (port/fdes-ready? port/fd)
;; one is ready, get them all
(let loop ((rest (cdr port/fds))
(ready (list port/fd)))
(cond
((null? rest) (reverse ready))
((port/fdes-ready? (car rest))
(loop (cdr rest) (cons (car rest) ready)))
(else
(loop (cdr rest) ready))))
(loop (cdr port/fds)))))))
(define (port/fdes-check-unlocked port/fd)
(if (port-locked? (port/fdes->port port/fd))
(begin
((structure-ref interrupts enable-interrupts!))
(error "SELECT on port with pending operation"
port/fd))))
(define (port/fdes->channel port/fd)
(fdport-data:channel
(fdport-data
(port/fdes->port port/fd))))
;; this is way too epic and probably should just be split up once the
;; dust has settled ---Mike
(define (make-select !?)
(lambda (read-vec write-vec exception-vec . maybe-timeout)
(let ((read-list (vector->list read-vec))
(write-list (vector->list write-vec)))
((structure-ref interrupts disable-interrupts!))
(for-each port/fdes-check-unlocked read-list)
(for-each port/fdes-check-unlocked write-list)
(let ((any-read (any-ready read-list))
(any-write (any-ready write-list)))
(if (or (pair? any-read) (pair? any-write))
(begin
((structure-ref interrupts enable-interrupts!))
(if !? ; we're SELECT!
(let ((n-read-ready
(let ((length (vector-length read-vec)))
(let loop ((i 0) (n 0))
(cond
((= i length) n)
((memq (vector-ref read-vec i) any-read)
(loop (+ 1 i) (+ 1 n)))
(else
(vector-set! read-vec i #f)
(loop (+ 1 i) n))))))
(n-write-ready
(let ((length (vector-length write-vec)))
(let loop ((i 0) (n 0))
(cond
((= i length) n)
((memq (vector-ref write-vec i) any-write)
(loop (+ 1 i) (+ 1 n)))
(else
(vector-set! write-vec i #f)
(loop (+ 1 i) n)))))))
;; zero out EXCEPTION-VEC
(let ((length (vector-length exception-vec)))
(let loop ((i 0))
(if (< i length)
(begin
(vector-set! exception-vec i #f)
(loop (+ 1 i))))))
(values n-read-ready n-write-ready 0))
;; we're vanilla SELECT
(values (list->vector any-read)
(list->vector any-write)
(make-vector 0))))
;; we need to block
(let ((read-channels (map port/fdes->channel read-list))
(write-channels (map port/fdes->channel write-list)))
(for-each (lambda (channel)
(add-pending-channel channel #t))
read-channels)
(for-each (lambda (channel)
(add-pending-channel channel #f))
write-channels)
(call-with-values
(lambda ()
(apply wait-for-channels read-channels write-channels maybe-timeout))
;; re-enables interrupts
(lambda (ready-read-channels ready-write-channels)
;; too many free variables ...
(if !? ; we're SELECT!
(let ((n-read-ready
(let loop ((read-channels read-channels)
(n-ready 0)
(index 0))
(if (null? read-channels)
n-ready
(if (memq (car read-channels) ready-read-channels)
(loop (cdr read-channels)
(+ 1 n-ready)
(+ 1 index))
(begin
(vector-set! read-vec index #f)
(loop (cdr read-channels)
n-ready
(+ 1 index)))))))
(n-write-ready
(let loop ((write-channels write-channels)
(n-ready 0)
(index 0))
(if (null? write-channels)
n-ready
(if (memq (car write-channels) ready-write-channels)
(loop (cdr write-channels)
(+ 1 n-ready)
(+ 1 index))
(begin
(vector-set! write-vec index #f)
(loop (cdr write-channels)
n-ready
(+ 1 index))))))))
;; zero out EXCEPTION-VEC
(let ((length (vector-length exception-vec)))
(let loop ((i 0))
(if (< i length)
(begin
(vector-set! exception-vec i #f)
(loop (+ 1 i))))))
(values n-read-ready n-write-ready 0))
;; we're vanilla SELECT
(let ((ready-read-port/fds '())
(ready-write-port/fds '()))
(for-each (lambda (port/fd channel)
(if (memq channel ready-read-channels)
(set! ready-read-port/fds
(cons port/fd ready-read-port/fds))))
read-list read-channels)
(for-each (lambda (port/fd channel)
(if (memq channel ready-write-channels)
(set! ready-write-port/fds
(cons port/fd ready-write-port/fds))))
write-list write-channels)
(values (list->vector (reverse ready-read-port/fds))
(list->vector (reverse ready-write-port/fds))
(make-vector 0))))))))))))
(define select (make-select #f))
(define select! (make-select #t))

View File

@ -222,6 +222,9 @@
read-string!
read-string/partial
read-string!/partial
select select!
(write-string (proc (:string &opt :value :exact-integer :exact-integer) :unspecific))
write-string/partial)))

View File

@ -1,61 +0,0 @@
/* This is an Scheme48/C interface file,
** automatically generated by a hacked version of cig 3.0.
step 4
*/
#include <stdio.h>
#include <stdlib.h> /* For malloc. */
#include "libcig.h"
/* Make sure foreign-function stubs interface to the C funs correctly: */
#include "select1.h"
s48_value df_select_copyback(s48_value g1, s48_value g2, s48_value g3, s48_value g4, s48_value mv_vec)
{
extern s48_value select_copyback(s48_value , s48_value , s48_value , s48_value , int *, int *, int *);
s48_value ret1 = S48_FALSE;
S48_DECLARE_GC_PROTECT(2);
s48_value r1;
int r2 = 0;
int r3 = 0;
int r4 = 0;
S48_GC_PROTECT_2(mv_vec,ret1);
r1 = select_copyback(g1, g2, g3, g4, &r2, &r3, &r4);
ret1 = r1;
S48_VECTOR_SET(mv_vec,0,s48_enter_fixnum(r2));
S48_VECTOR_SET(mv_vec,1,s48_enter_fixnum(r3));
S48_VECTOR_SET(mv_vec,2,s48_enter_fixnum(r4));
S48_GC_UNPROTECT();
return ret1;
}
s48_value df_select_filter(s48_value g1, s48_value g2, s48_value g3, s48_value g4, s48_value mv_vec)
{
extern s48_value select_filter(s48_value , s48_value , s48_value , s48_value , int *, int *, int *);
s48_value ret1 = S48_FALSE;
S48_DECLARE_GC_PROTECT(2);
s48_value r1;
int r2 = 0;
int r3 = 0;
int r4 = 0;
S48_GC_PROTECT_2(mv_vec,ret1);
r1 = select_filter(g1, g2, g3, g4, &r2, &r3, &r4);
ret1 = r1;
S48_VECTOR_SET(mv_vec,0,s48_enter_fixnum(r2));
S48_VECTOR_SET(mv_vec,1,s48_enter_fixnum(r3));
S48_VECTOR_SET(mv_vec,2,s48_enter_fixnum(r4));
S48_GC_UNPROTECT();
return ret1;
}
void s48_init_select(void)
{
S48_EXPORT_FUNCTION(df_select_copyback);
S48_EXPORT_FUNCTION(df_select_filter);
}

View File

@ -1,193 +0,0 @@
;;; select(2) syscall for scsh. -*- Scheme -*-
;;; Copyright (c) 1995 by Olin Shivers.
(foreign-init-name "select")
(foreign-source
"/* Make sure foreign-function stubs interface to the C funs correctly: */"
"#include \"select1.h\""
"" "")
;;; TIMEOUT is 0 for immediate, >0 for timeout, #f for infinite;
;;; default is #f.
;;; The sets are vectors of file descriptors & fd ports.
;;; You get three new vectors back.
; The following routines copy ports to fd's, and copy fd's back to fd's and
; ports, so that select can take numbers and ports, simultaneously.
; This is a C procedure in scheme. So sue me. At least it's tail-recursive
(define (fd-filter filter-me)
(let* ((len (vector-length filter-me))
(vector-to-return (make-vector len)))
(let loop ((count (- len 1)))
(if (>= count 0)
(let ((ref (vector-ref filter-me count)))
(if (integer? ref)
(vector-set! vector-to-return count ref)
(vector-set! vector-to-return count (port->fdes ref)))
(loop (- count 1)))))
vector-to-return))
; ! means side-effect, the next one is more functional.
(define (fd-copyback! orig form)
(let loop ((count (- (vector-length orig) 1)))
(if (>= count 0)
(begin
(if (not (vector-ref form count))
(vector-set! orig count #f)
(vector-set! form count (vector-ref orig count)))
(loop (- count 1)))))
orig)
(define (fd-copyback orig form)
(let* ((len (vector-length orig))
(vector-to-return (make-vector len #f)))
(let loop ((count (- len 1)))
(if (>= count 0)
(begin
(if (vector-ref form count)
(vector-set! vector-to-return count (vector-ref orig count)))
(loop (- count 1)))))
vector-to-return))
(define (select read-vec write-vec exception-vec . maybe-timeout)
(let ((rv (copy-vector read-vec))
(wv (copy-vector write-vec))
(ev (copy-vector exception-vec)))
(receive (nr nw ne) (apply select! rv wv ev maybe-timeout)
(values (vector-take-form read-vec rv nr)
(vector-take-form write-vec wv nw)
(vector-take-form exception-vec ev ne)))))
(define (select!/copyback read-vec write-vec exception-vec . maybe-timeout)
(receive (errno nr nw ne)
(apply select!/copyback/errno read-vec write-vec exception-vec
maybe-timeout)
(if errno
(apply errno-error errno select!/copyback
read-vec write-vec exception-vec maybe-timeout)
(values nr nw ne))))
(define (select!/copyback/errno read-vec write-vec
exception-vec . maybe-timeout)
(let ((timeout (and (pair? maybe-timeout)
(if (pair? (cdr maybe-timeout))
(apply error "Too many arguments"
select!/copyback/errno
read-vec write-vec exception-vec
maybe-timeout)
(real->exact-integer (check-arg real?
(car maybe-timeout)
select!/copyback/errno)))))
(vec-ok? (lambda (v)
(vector-every? (lambda (elt)
(or (and (integer? elt) (>= elt 0))
(fdport? elt)))
v))))
;; Type-check input vectors.
(check-arg vec-ok? read-vec select!/copyback/errno)
(check-arg vec-ok? write-vec select!/copyback/errno)
(check-arg vec-ok? exception-vec select!/copyback/errno)
(check-arg (lambda (x) (or (not x) (integer? x))) timeout
select!/copyback/errno)
(let ((prop-read-vec (fd-filter read-vec))
(prop-write-vec (fd-filter write-vec))
(prop-exception-vec (fd-filter exception-vec)))
(let lp ()
(receive (errno nr nw ne)
(%select/copyback/errno prop-read-vec prop-write-vec prop-exception-vec timeout)
(if (and errno (= errno errno/intr)) ; Retry on interrupts.
(lp)
(values errno
(fd-copyback read-vec nr)
(fd-copyback write-vec nw)
(fd-copyback exception-vec ne))))))))
(define-foreign %select/copyback/errno
(select_copyback (vector-desc rvec)
(vector-desc wvec)
(vector-desc evec)
(desc nsecs)) ; Integer or #f for infinity.
desc ; errno or #f
fixnum ; nread - number of hits in RVEC
fixnum ; nwrite - number of hits in WVEC
fixnum) ; nexcept - number of hits in EVEC
(define (vector-take-form vec form nelts)
(let ((short (make-vector nelts)))
(do ((i (- (vector-length vec) 1) (- i 1)))
((< i 0))
(if (vector-ref form i)
(begin
(set! nelts (- nelts 1))
(vector-set! short nelts (vector-ref vec i)))))
short))
;;; SELECT!
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;;; The side-effecting variant. To be documented.
(define (select! read-vec write-vec exception-vec . maybe-timeout)
(receive (errno nr nw ne)
(apply select!/errno read-vec write-vec exception-vec maybe-timeout)
(if errno
(apply errno-error errno select! read-vec write-vec exception-vec
maybe-timeout)
(values nr nw ne))))
(define (select!/errno read-vec write-vec exception-vec . maybe-timeout)
(let ((timeout (and (pair? maybe-timeout)
(if (pair? (cdr maybe-timeout))
(apply error "Too many arguments"
select!/copyback/errno
read-vec write-vec exception-vec
maybe-timeout)
(real->exact-integer (check-arg real?
(car maybe-timeout)
select!/copyback/errno)))))
(vec-ok? (lambda (v)
(vector-every? (lambda (elt)
(or (and (integer? elt) (>= elt 0))
(not elt)
(fdport? elt)))
v))))
;; Type-check input vectors.
(check-arg vec-ok? read-vec select!/errno)
(check-arg vec-ok? write-vec select!/errno)
(check-arg vec-ok? exception-vec select!/errno)
(check-arg (lambda (x) (or (not x) (integer? x))) timeout select!/errno)
(let ((prop-read-vec (fd-filter read-vec))
(prop-write-vec (fd-filter write-vec))
(prop-exception-vec (fd-filter exception-vec)))
(let lp ()
(receive (errno nr nw ne)
(%select!/errno prop-read-vec prop-write-vec prop-exception-vec timeout)
(if (and errno (= errno errno/intr)) ; Retry on interrupts.
(lp)
(begin
(fd-copyback! read-vec prop-read-vec)
(fd-copyback! write-vec prop-write-vec)
(fd-copyback! exception-vec prop-exception-vec)
(values errno nr nw ne))))))))
(define-foreign %select!/errno
(select_filter (vector-desc rvec)
(vector-desc wvec)
(vector-desc evec)
(desc nsecs)) ; Integer or #f for infinity.
desc ; errno or #f
fixnum ; nread - number of hits in RVEC
fixnum ; nwrite - number of hits in WVEC
fixnum) ; nexcept - number of hits in EVEC

View File

@ -1,223 +0,0 @@
/* C support for scsh select call.
** Copyright (c) 1995 by Olin Shivers.
*/
#include "sysdep.h"
#include <sys/types.h>
#if defined(HAVE_SYS_SELECT_H)
# include <sys/select.h>
#endif
#include <sys/time.h>
#include <errno.h>
#include <stdio.h>
#include "cstuff.h"
#include "fdports.h" /* Accessors for Scheme I/O port internals. */
/* Make sure our exports match up w/the implementation: */
#include "select1.h"
/* the traditional sleazy max non-function. */
#define max(a,b) (((a) > (b)) ? (a) : (b))
extern int errno;
static void or2_fdset(fd_set *x, fd_set *y, int max_elt);
static int copyback_fdvec(s48_value portvec, fd_set *fdset);
/* RVEC, WVEC, and EVEC are Scheme vectors of integer file descriptors,
** I/O ports, and #f's. NSECS is an integer timeout value, or #f for
** infinite wait. Do the select() call, returning result fd_sets in the
** passed pointers. Return 0 for OK, otherwise error is in errno.
*/
int do_select(s48_value rvec, s48_value wvec,
s48_value evec, s48_value nsecs,
fd_set *rset_ans, fd_set *wset_ans, fd_set *eset_ans)
{
struct timeval timeout, *tptr;
fd_set rset_bufrdy, wset_bufrdy, eset_bufrdy; /* Buffered port hits. */
int rbuf_rdy=0, wbuf_rdy=0, bufrdy; /* Set if we find buffered I/O hits. */
int max_fd = -1; /* Max fdes in the sets. */
int nelts, i;
int nfound;
FD_ZERO(rset_ans); FD_ZERO(wset_ans); FD_ZERO(eset_ans);
FD_ZERO(&rset_bufrdy); FD_ZERO(&wset_bufrdy); FD_ZERO(&eset_bufrdy);
/* Scan the readvec elts. */
nelts = S48_VECTOR_LENGTH(rvec);
for(i=nelts; --i >= 0; ) {
s48_value elt = S48_VECTOR_REF(rvec,i);
int fd;
fd = s48_extract_fixnum(elt);
FD_SET(fd, rset_ans);
max_fd = max(max_fd, fd);
}
/* Scan the writevec elts. */
nelts = S48_VECTOR_LENGTH(wvec);
for(i=nelts; --i >= 0; ) {
s48_value elt = S48_VECTOR_REF(wvec,i);
int fd;
fd = s48_extract_fixnum(elt);
FD_SET(fd, wset_ans);
max_fd = max(max_fd, fd);
}
/* Scan the exception-vec elts. */
nelts = S48_VECTOR_LENGTH(evec);
for(i=nelts; --i >= 0; ) {
s48_value elt = S48_VECTOR_REF(evec,i);
int fd;
fd = s48_extract_fixnum(elt);
FD_SET(fd, eset_ans);
max_fd = max(max_fd, fd);
}
bufrdy = rbuf_rdy || wbuf_rdy;
if( bufrdy ) { /* Already have some hits on buffered ports, */
timeout.tv_sec = 0; /* so we only poll the others. */
timeout.tv_usec = 0;
tptr = &timeout;
}
else if ( S48_FIXNUM_P(nsecs) ) {
timeout.tv_sec = s48_extract_fixnum(nsecs); /* Wait n seconds. */
timeout.tv_usec = 0;
tptr = &timeout;
}
else tptr = NULL; /* #f => Infinite wait. */
/* select1() is defined in sysdep.h -- bogus compatibility macro. */
nfound = select(max_fd+1, rset_ans, wset_ans, eset_ans, tptr); /* Do it.*/
/* EINTR is not an error return if we have hits on buffered ports
** to report.
*/
if( nfound < 0 )
if ( errno != EINTR || !bufrdy ) return -1;
else { /* EINTR, but we have hits on buffered ports to report. */
FD_ZERO(rset_ans); /* This should never happen -- */
FD_ZERO(wset_ans); /* EINTR on a zero-sec select() */
FD_ZERO(eset_ans); /* -- but I'm paranoid. */
}
/* OR together the buffered-io ready sets and the fd ready sets. */
if( rbuf_rdy ) or2_fdset(rset_ans, &rset_bufrdy, max_fd);
if( wbuf_rdy ) or2_fdset(wset_ans, &wset_bufrdy, max_fd);
return 0;
}
/* x = x or y */
static void or2_fdset(fd_set *x, fd_set *y, int max_elt)
{
int i;
for(i=max_elt+1; --i >= 0;)
if( FD_ISSET(i,y) ) FD_SET(i,x);
}
/* PORTVEC is a vector of integer file descriptors and Scheme ports.
** Scan over the vector, and copy any elt whose file descriptor is in FDSET
** to the front of the vector. Return the number of elts thus copied.
*/
static int copyback_fdvec(s48_value portvec, fd_set *fdset)
{
int vlen = S48_VECTOR_LENGTH(portvec);
int i, j=0;
for( i = -1; ++i < vlen; ) {
s48_value elt = S48_VECTOR_REF(portvec, i);
int fd = s48_extract_fixnum((S48_FIXNUM_P(elt)) ? elt : (1 / 0));
/* JMG *PortFd(elt));*/
if( FD_ISSET(fd,fdset) ) {
FD_CLR(fd,fdset); /* In case luser put elt in multiple times. */
S48_VECTOR_SET(portvec, j, elt);
j++;
}
}
return j;
}
/* Overwrite every inactive element in the vector with #f;
** Return count of active elements.
*/
static int clobber_inactives(s48_value portvec, fd_set *fdset)
{
int count = 0;
int i = S48_VECTOR_LENGTH(portvec);
while( --i >= 0 ) {
s48_value elt = S48_VECTOR_REF(portvec, i);
if( elt != S48_FALSE ) {
int fd = s48_extract_fixnum((S48_FIXNUM_P(elt)) ? elt : (1/0)); /* JMG *PortFd(elt));*/
if( FD_ISSET(fd,fdset) ) {
FD_CLR(fd,fdset); /* In case luser put elt in multiple times. */
++count;
}
else S48_VECTOR_SET(portvec, i, S48_FALSE); /* Clobber. */
}
}
return count;
}
/* These two functions are the entry points to this file.
*********************************************************
*/
/* Copy active elts back to the front of their vector;
** Return error indicator & number of hits for each vector.
*/
s48_value select_copyback(s48_value rvec, s48_value wvec,
s48_value evec, s48_value nsecs,
int *r_numrdy, int *w_numrdy, int *e_numrdy)
{
fd_set rset, wset, eset;
if( do_select(rvec, wvec, evec, nsecs, &rset, &wset, &eset) ) {
*r_numrdy = *w_numrdy = *e_numrdy = 0;
return s48_enter_fixnum(errno);
}
*r_numrdy = copyback_fdvec(rvec, &rset);
*w_numrdy = copyback_fdvec(wvec, &wset);
*e_numrdy = copyback_fdvec(evec, &eset);
return S48_FALSE;
}
/* Overwrite non-active elements in the vectors with #f;
** return error indicator & number of hits for each vector.
*/
s48_value select_filter(s48_value rvec, s48_value wvec,
s48_value evec, s48_value nsecs,
int *r_numrdy, int *w_numrdy, int *e_numrdy)
{
fd_set rset, wset, eset;
if( do_select(rvec, wvec, evec, nsecs, &rset, &wset, &eset) ) {
*r_numrdy = *w_numrdy = *e_numrdy = 0;
return s48_enter_fixnum(errno);
}
*r_numrdy = clobber_inactives(rvec, &rset);
*w_numrdy = clobber_inactives(wvec, &wset);
*e_numrdy = clobber_inactives(evec, &eset);
return S48_FALSE;
}

View File

@ -1,9 +0,0 @@
/* Exports from select1.c. */
s48_value select_copyback(s48_value rvec, s48_value wvec,
s48_value evec, s48_value nsecs,
int *r_numrdy, int *w_numrdy, int *e_numrdy);
s48_value select_filter(s48_value rvec, s48_value wvec,
s48_value evec, s48_value nsecs,
int *r_numrdy, int *w_numrdy, int *e_numrdy);