From 9aaf306f162bb8e2222b2c597af66d8dff41e2aa Mon Sep 17 00:00:00 2001 From: Abdulaziz Ghuloum Date: Sun, 23 Mar 2008 02:14:00 -0400 Subject: [PATCH] Nonblocking sockets no longer raise a continuable exception when a read or write operation would block. Instead, they are schedules with an event handler that `selects' on the pending file descriptors and dispatches the appropriate callback. --- lab/tcp-connect-nonblocking-example.ss | 23 ++--- scheme/ikarus.io.ss | 113 +++++++++++++++++++++++-- scheme/last-revision | 2 +- src/ikarus-io.c | 12 +++ 4 files changed, 127 insertions(+), 23 deletions(-) diff --git a/lab/tcp-connect-nonblocking-example.ss b/lab/tcp-connect-nonblocking-example.ss index 5f5407b..3a2d47f 100755 --- a/lab/tcp-connect-nonblocking-example.ss +++ b/lab/tcp-connect-nonblocking-example.ss @@ -12,22 +12,15 @@ ;;; it succeeds. Pretty lame at this point, but it works. (define (http-cat host) - (with-exception-handler - (lambda (c) - ;;; just return and let it retry until it succeeds - (print-condition c) - (unless (i/o-would-block-condition? c) - (raise c))) - (lambda () - (let-values ([(op ip) (tcp-connect-nonblocking host "http")]) - (let ([op (transcoded-port op (native-transcoder))] - [ip (transcoded-port ip (native-transcoder))]) - (display "GET /\n" op) - (display (get-string-all ip)) - (close-input-port ip) - (close-output-port op)))))) + (let-values ([(op ip) (tcp-connect-nonblocking host "http")]) + (let ([op (transcoded-port op (native-transcoder))] + [ip (transcoded-port ip (native-transcoder))]) + (display "GET /\n" op) + (display (get-string-all ip)) + (newline) + (close-input-port ip) + (close-output-port op)))) (http-cat "www.google.com") -(newline) ;(http-cat "127.0.0.1") diff --git a/scheme/ikarus.io.ss b/scheme/ikarus.io.ss index aff458c..576e505 100644 --- a/scheme/ikarus.io.ss +++ b/scheme/ikarus.io.ss @@ -1230,8 +1230,12 @@ (cond [(fx>= bytes 0) bytes] [(fx= bytes EAGAIN-error-code) - (raise-continuable - (make-i/o-would-block-condition port)) + ;(raise-continuable + ; (make-i/o-would-block-condition port)) + (call/cc + (lambda (k) + (add-io-event fd k 'r) + (process-events))) (refill bv idx cnt)] [else (io-error 'read id bytes)])))]) refill) @@ -1265,8 +1269,12 @@ (cond [(fx>= bytes 0) bytes] [(fx= bytes EAGAIN-error-code) - (raise-continuable - (make-i/o-would-block-condition port)) + ;(raise-continuable + ; (make-i/o-would-block-condition port)) + (call/cc + (lambda (k) + (add-io-event fd k 'w) + (process-events))) (refill bv idx cnt)] [else (io-error 'write id bytes)])))]) refill) @@ -2054,6 +2062,11 @@ cmd input-file-buffer-size #f #t 'process))))) + (define (set-fd-nonblocking fd who id) + (let ([rv (foreign-call "ikrt_make_fd_nonblocking" fd)]) + (unless (eq? rv 0) + (io-error who id fd)))) + (define (socket->ports socket who id block?) (if (< socket 0) (io-error who id socket) @@ -2064,9 +2077,7 @@ ((file-close-proc id socket)) (set! closed-once? #t))))]) (unless block? - (let ([rv (foreign-call "ikrt_make_fd_nonblocking" socket)]) - (unless (eq? rv 0) - (io-error who id socket)))) + (set-fd-nonblocking socket who id)) (values (fh->output-port socket id output-file-buffer-size #f close who) @@ -2091,5 +2102,93 @@ (define-connector tcp-connect-nonblocking "ikrt_tcp_connect" #f) (define-connector udp-connect-nonblocking "ikrt_udp_connect" #f) + (module (add-io-event process-events) + (define-struct t (fd proc type)) + ;;; callbacks + (define pending '()) + (define out-queue '()) + (define in-queue '()) + + (define (process-events) + (if (null? out-queue) + (if (null? in-queue) + (if (null? pending) + (error 'process-events "no more events") + (begin + (do-select) + (process-events))) + (begin + (set! out-queue (reverse in-queue)) + (set! in-queue '()) + (process-events))) + (let ([proc (car out-queue)]) + (set! out-queue (cdr out-queue)) + (proc) + (process-events)))) + + (define (add-io-event fd proc event-type) + (set! pending + (cons (make-t fd proc event-type) pending))) + + (define (get-max-fd) + (assert (pair? pending)) + (let f ([m (t-fd (car pending))] + [ls (cdr pending)]) + (cond + [(null? ls) m] + [else (f (max m (t-fd (car ls))) (cdr ls))]))) + + (define (do-select) + (let ([n (add1 (get-max-fd))]) + (let ([rbv (make-bytevector n 0)] + [wbv (make-bytevector n 0)] + [xbv (make-bytevector n 0)]) + ;;; add all fds to their bytevectors depending on type + (for-each + (lambda (t) + (let ([fd (t-fd t)]) + (let ([i (div fd 8)] [j (mod fd 8)]) + (let ([bv (case (t-type t) + [(r) rbv] + [(w) wbv] + [(x) xbv] + [else + (error 'do-select "invalid type" t)])]) + (bytevector-u8-set! bv i + (fxlogor (fxsll 1 j) + (bytevector-u8-ref bv i))))))) + pending) + ;;; do select + (let ([rv (foreign-call "ikrt_select" n rbv wbv xbv)]) + (when (< rv 0) + (die 'select "error selecting from fds"))) + ;;; go through fds again and see if they're selected + (for-each + (lambda (t) + (let ([fd (t-fd t)]) + (let ([i (div fd 8)] [j (mod fd 8)]) + (let ([bv (case (t-type t) + [(r) rbv] + [(w) wbv] + [(x) xbv] + [else + (error 'do-select "invalid type" t)])]) + (cond + [(fxzero? + (fxlogand (fxsll 1 j) + (bytevector-u8-ref bv i))) + ;;; not selected + (set! pending (cons t pending))] + [else + ;;; ready + (set! in-queue (cons (t-proc t) in-queue))]))))) + (let ([ls pending]) + (set! pending '()) + ls))))) + ) + + + + (set-fd-nonblocking 0 'init '*stdin*) ) diff --git a/scheme/last-revision b/scheme/last-revision index aa10053..4e6ee1e 100644 --- a/scheme/last-revision +++ b/scheme/last-revision @@ -1 +1 @@ -1420 +1421 diff --git a/src/ikarus-io.c b/src/ikarus-io.c index 0448d5f..da95832 100644 --- a/src/ikarus-io.c +++ b/src/ikarus-io.c @@ -201,6 +201,18 @@ ikrt_make_fd_nonblocking(ikptr fdptr, ikpcb* pcb){ return 0; } +ikptr +ikrt_select(ikptr fds, ikptr rfds, ikptr wfds, ikptr xfds, ikpcb* pcb){ + int rv = select(unfix(fds), + (fd_set*)(rfds + off_bytevector_data), + (fd_set*)(wfds + off_bytevector_data), + (fd_set*)(xfds + off_bytevector_data), + NULL); + if(rv < 0){ + return ikrt_io_error(); + } + return fix(rv); +} ikptr ikrt_file_ctime(ikptr filename, ikptr res){