Nonblocking sockets no longer raise a continuable exception when a
read or write operation would block. Instead, they are schedules with an event handler that `selects' on the pending file descriptors and dispatches the appropriate callback.
This commit is contained in:
parent
6e599c4c72
commit
9aaf306f16
|
@ -12,22 +12,15 @@
|
||||||
;;; it succeeds. Pretty lame at this point, but it works.
|
;;; it succeeds. Pretty lame at this point, but it works.
|
||||||
|
|
||||||
(define (http-cat host)
|
(define (http-cat host)
|
||||||
(with-exception-handler
|
|
||||||
(lambda (c)
|
|
||||||
;;; just return and let it retry until it succeeds
|
|
||||||
(print-condition c)
|
|
||||||
(unless (i/o-would-block-condition? c)
|
|
||||||
(raise c)))
|
|
||||||
(lambda ()
|
|
||||||
(let-values ([(op ip) (tcp-connect-nonblocking host "http")])
|
(let-values ([(op ip) (tcp-connect-nonblocking host "http")])
|
||||||
(let ([op (transcoded-port op (native-transcoder))]
|
(let ([op (transcoded-port op (native-transcoder))]
|
||||||
[ip (transcoded-port ip (native-transcoder))])
|
[ip (transcoded-port ip (native-transcoder))])
|
||||||
(display "GET /\n" op)
|
(display "GET /\n" op)
|
||||||
(display (get-string-all ip))
|
(display (get-string-all ip))
|
||||||
|
(newline)
|
||||||
(close-input-port ip)
|
(close-input-port ip)
|
||||||
(close-output-port op))))))
|
(close-output-port op))))
|
||||||
|
|
||||||
(http-cat "www.google.com")
|
(http-cat "www.google.com")
|
||||||
(newline)
|
|
||||||
;(http-cat "127.0.0.1")
|
;(http-cat "127.0.0.1")
|
||||||
|
|
||||||
|
|
|
@ -1230,8 +1230,12 @@
|
||||||
(cond
|
(cond
|
||||||
[(fx>= bytes 0) bytes]
|
[(fx>= bytes 0) bytes]
|
||||||
[(fx= bytes EAGAIN-error-code)
|
[(fx= bytes EAGAIN-error-code)
|
||||||
(raise-continuable
|
;(raise-continuable
|
||||||
(make-i/o-would-block-condition port))
|
; (make-i/o-would-block-condition port))
|
||||||
|
(call/cc
|
||||||
|
(lambda (k)
|
||||||
|
(add-io-event fd k 'r)
|
||||||
|
(process-events)))
|
||||||
(refill bv idx cnt)]
|
(refill bv idx cnt)]
|
||||||
[else (io-error 'read id bytes)])))])
|
[else (io-error 'read id bytes)])))])
|
||||||
refill)
|
refill)
|
||||||
|
@ -1265,8 +1269,12 @@
|
||||||
(cond
|
(cond
|
||||||
[(fx>= bytes 0) bytes]
|
[(fx>= bytes 0) bytes]
|
||||||
[(fx= bytes EAGAIN-error-code)
|
[(fx= bytes EAGAIN-error-code)
|
||||||
(raise-continuable
|
;(raise-continuable
|
||||||
(make-i/o-would-block-condition port))
|
; (make-i/o-would-block-condition port))
|
||||||
|
(call/cc
|
||||||
|
(lambda (k)
|
||||||
|
(add-io-event fd k 'w)
|
||||||
|
(process-events)))
|
||||||
(refill bv idx cnt)]
|
(refill bv idx cnt)]
|
||||||
[else (io-error 'write id bytes)])))])
|
[else (io-error 'write id bytes)])))])
|
||||||
refill)
|
refill)
|
||||||
|
@ -2054,6 +2062,11 @@
|
||||||
cmd input-file-buffer-size #f #t
|
cmd input-file-buffer-size #f #t
|
||||||
'process)))))
|
'process)))))
|
||||||
|
|
||||||
|
(define (set-fd-nonblocking fd who id)
|
||||||
|
(let ([rv (foreign-call "ikrt_make_fd_nonblocking" fd)])
|
||||||
|
(unless (eq? rv 0)
|
||||||
|
(io-error who id fd))))
|
||||||
|
|
||||||
(define (socket->ports socket who id block?)
|
(define (socket->ports socket who id block?)
|
||||||
(if (< socket 0)
|
(if (< socket 0)
|
||||||
(io-error who id socket)
|
(io-error who id socket)
|
||||||
|
@ -2064,9 +2077,7 @@
|
||||||
((file-close-proc id socket))
|
((file-close-proc id socket))
|
||||||
(set! closed-once? #t))))])
|
(set! closed-once? #t))))])
|
||||||
(unless block?
|
(unless block?
|
||||||
(let ([rv (foreign-call "ikrt_make_fd_nonblocking" socket)])
|
(set-fd-nonblocking socket who id))
|
||||||
(unless (eq? rv 0)
|
|
||||||
(io-error who id socket))))
|
|
||||||
(values
|
(values
|
||||||
(fh->output-port socket
|
(fh->output-port socket
|
||||||
id output-file-buffer-size #f close who)
|
id output-file-buffer-size #f close who)
|
||||||
|
@ -2091,5 +2102,93 @@
|
||||||
(define-connector tcp-connect-nonblocking "ikrt_tcp_connect" #f)
|
(define-connector tcp-connect-nonblocking "ikrt_tcp_connect" #f)
|
||||||
(define-connector udp-connect-nonblocking "ikrt_udp_connect" #f)
|
(define-connector udp-connect-nonblocking "ikrt_udp_connect" #f)
|
||||||
|
|
||||||
|
(module (add-io-event process-events)
|
||||||
|
(define-struct t (fd proc type))
|
||||||
|
;;; callbacks
|
||||||
|
(define pending '())
|
||||||
|
(define out-queue '())
|
||||||
|
(define in-queue '())
|
||||||
|
|
||||||
|
(define (process-events)
|
||||||
|
(if (null? out-queue)
|
||||||
|
(if (null? in-queue)
|
||||||
|
(if (null? pending)
|
||||||
|
(error 'process-events "no more events")
|
||||||
|
(begin
|
||||||
|
(do-select)
|
||||||
|
(process-events)))
|
||||||
|
(begin
|
||||||
|
(set! out-queue (reverse in-queue))
|
||||||
|
(set! in-queue '())
|
||||||
|
(process-events)))
|
||||||
|
(let ([proc (car out-queue)])
|
||||||
|
(set! out-queue (cdr out-queue))
|
||||||
|
(proc)
|
||||||
|
(process-events))))
|
||||||
|
|
||||||
|
(define (add-io-event fd proc event-type)
|
||||||
|
(set! pending
|
||||||
|
(cons (make-t fd proc event-type) pending)))
|
||||||
|
|
||||||
|
(define (get-max-fd)
|
||||||
|
(assert (pair? pending))
|
||||||
|
(let f ([m (t-fd (car pending))]
|
||||||
|
[ls (cdr pending)])
|
||||||
|
(cond
|
||||||
|
[(null? ls) m]
|
||||||
|
[else (f (max m (t-fd (car ls))) (cdr ls))])))
|
||||||
|
|
||||||
|
(define (do-select)
|
||||||
|
(let ([n (add1 (get-max-fd))])
|
||||||
|
(let ([rbv (make-bytevector n 0)]
|
||||||
|
[wbv (make-bytevector n 0)]
|
||||||
|
[xbv (make-bytevector n 0)])
|
||||||
|
;;; add all fds to their bytevectors depending on type
|
||||||
|
(for-each
|
||||||
|
(lambda (t)
|
||||||
|
(let ([fd (t-fd t)])
|
||||||
|
(let ([i (div fd 8)] [j (mod fd 8)])
|
||||||
|
(let ([bv (case (t-type t)
|
||||||
|
[(r) rbv]
|
||||||
|
[(w) wbv]
|
||||||
|
[(x) xbv]
|
||||||
|
[else
|
||||||
|
(error 'do-select "invalid type" t)])])
|
||||||
|
(bytevector-u8-set! bv i
|
||||||
|
(fxlogor (fxsll 1 j)
|
||||||
|
(bytevector-u8-ref bv i)))))))
|
||||||
|
pending)
|
||||||
|
;;; do select
|
||||||
|
(let ([rv (foreign-call "ikrt_select" n rbv wbv xbv)])
|
||||||
|
(when (< rv 0)
|
||||||
|
(die 'select "error selecting from fds")))
|
||||||
|
;;; go through fds again and see if they're selected
|
||||||
|
(for-each
|
||||||
|
(lambda (t)
|
||||||
|
(let ([fd (t-fd t)])
|
||||||
|
(let ([i (div fd 8)] [j (mod fd 8)])
|
||||||
|
(let ([bv (case (t-type t)
|
||||||
|
[(r) rbv]
|
||||||
|
[(w) wbv]
|
||||||
|
[(x) xbv]
|
||||||
|
[else
|
||||||
|
(error 'do-select "invalid type" t)])])
|
||||||
|
(cond
|
||||||
|
[(fxzero?
|
||||||
|
(fxlogand (fxsll 1 j)
|
||||||
|
(bytevector-u8-ref bv i)))
|
||||||
|
;;; not selected
|
||||||
|
(set! pending (cons t pending))]
|
||||||
|
[else
|
||||||
|
;;; ready
|
||||||
|
(set! in-queue (cons (t-proc t) in-queue))])))))
|
||||||
|
(let ([ls pending])
|
||||||
|
(set! pending '())
|
||||||
|
ls)))))
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
(set-fd-nonblocking 0 'init '*stdin*)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
1420
|
1421
|
||||||
|
|
|
@ -201,6 +201,18 @@ ikrt_make_fd_nonblocking(ikptr fdptr, ikpcb* pcb){
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ikptr
|
||||||
|
ikrt_select(ikptr fds, ikptr rfds, ikptr wfds, ikptr xfds, ikpcb* pcb){
|
||||||
|
int rv = select(unfix(fds),
|
||||||
|
(fd_set*)(rfds + off_bytevector_data),
|
||||||
|
(fd_set*)(wfds + off_bytevector_data),
|
||||||
|
(fd_set*)(xfds + off_bytevector_data),
|
||||||
|
NULL);
|
||||||
|
if(rv < 0){
|
||||||
|
return ikrt_io_error();
|
||||||
|
}
|
||||||
|
return fix(rv);
|
||||||
|
}
|
||||||
|
|
||||||
ikptr
|
ikptr
|
||||||
ikrt_file_ctime(ikptr filename, ikptr res){
|
ikrt_file_ctime(ikptr filename, ikptr res){
|
||||||
|
|
Loading…
Reference in New Issue