added set-port-buffering. Supports gentle buffer draining and a slightly limited bufpol/line. Code looks quite ugly, but I think that's inherent...
This commit is contained in:
parent
55b3db0c72
commit
4ca964b9ea
|
@ -81,6 +81,17 @@
|
|||
(release-port-lock channel-port)
|
||||
p))
|
||||
|
||||
(define (channel-port->unbuffered-output-fdport channel-port)
|
||||
(let ((p (make-unbuffered-output-port unbuffered-output-fdport-handler
|
||||
(make-fdport-data (channel-cell-ref(port-data channel-port)) 1))))
|
||||
(obtain-port-lock channel-port)
|
||||
(set-port-lock! p (port-lock channel-port))
|
||||
(set-port-locked?! p (port-locked? channel-port))
|
||||
(install-fdport p)
|
||||
; (periodically-force-output! p)
|
||||
(release-port-lock channel-port)
|
||||
p))
|
||||
|
||||
(define (alloc-input-fdport fd revealed)
|
||||
(make-input-port input-fdport-handler
|
||||
(make-fdport-data (make-input-fdchannel fd) revealed)
|
||||
|
@ -98,7 +109,7 @@
|
|||
|
||||
(define (make-output-fdport fd revealed)
|
||||
(let ((p (alloc-output-fdport fd revealed)))
|
||||
(periodically-force-output! p)
|
||||
;(periodically-force-output! p)
|
||||
(install-fdport p)
|
||||
p))
|
||||
|
||||
|
@ -117,35 +128,248 @@
|
|||
(close-channel (fdport-data:channel fdport*)))
|
||||
|
||||
;The handlers drop straight through to the convenient channel routines.
|
||||
(define input-fdport-handler
|
||||
(define (make-input-fdport-handler bufferproc)
|
||||
(make-port-handler
|
||||
(lambda (fdport*)
|
||||
(list 'input-fdport (fdport-data:channel fdport*)))
|
||||
close-fdport*
|
||||
bufferproc
|
||||
(lambda (fdport* owner)
|
||||
(steal-channel! (fdport-data:channel fdport*) owner))))
|
||||
|
||||
(define input-fdport-handler
|
||||
(make-input-fdport-handler
|
||||
(lambda (fdport* buffer start needed)
|
||||
(channel-read buffer start needed (fdport-data:channel fdport*)))
|
||||
(channel-read buffer start needed (fdport-data:channel fdport*)))))
|
||||
|
||||
(define (make-output-fdport-handler bufferproc)
|
||||
(make-port-handler
|
||||
(lambda (fdport*)
|
||||
(list 'output-fdport (fdport-data:channel fdport*)))
|
||||
close-fdport*
|
||||
bufferproc
|
||||
(lambda (fdport* owner)
|
||||
(steal-channel! (fdport-data:channel fdport*) owner))))
|
||||
|
||||
(define output-fdport-handler
|
||||
(make-port-handler
|
||||
(lambda (fdport*)
|
||||
(list 'output-fdport (fdport-data:channel fdport*)))
|
||||
close-fdport*
|
||||
(make-output-fdport-handler
|
||||
(lambda (fdport* buffer start count)
|
||||
(channel-write buffer start count (fdport-data:channel fdport*)))
|
||||
(lambda (fdport* owner)
|
||||
(steal-channel! (fdport-data:channel fdport*) owner))))
|
||||
(channel-write buffer start count (fdport-data:channel fdport*)))))
|
||||
|
||||
(define unbuffered-output-fdport-handler
|
||||
(let ((buffer (make-code-vector 1 0)))
|
||||
(make-output-fdport-handler
|
||||
(lambda (fdport* char)
|
||||
(code-vector-set! buffer 0 (char->ascii char))
|
||||
(channel-write buffer 0 1 (fdport-data:channel fdport*))))))
|
||||
|
||||
(define fdport-data port-data)
|
||||
; That was easy.
|
||||
|
||||
(define (set-port-buffering port policy . maybe-size)
|
||||
(warn "JMG: use of set-port-buffering"))
|
||||
|
||||
(define (06-policy policy)
|
||||
(case policy
|
||||
((0) 'bufpol/block)
|
||||
((1) 'bufpol/line)
|
||||
((2) 'bufpol/none)
|
||||
(else policy)))
|
||||
|
||||
(define (guess-output-policy port)
|
||||
(if (= 0 (port-limit port))
|
||||
'bufpol/none
|
||||
'bufpol/block))
|
||||
|
||||
|
||||
(define (set-port-buffering port policy . maybe-size)
|
||||
(let ((policy (06-policy policy)))
|
||||
(cond ((and (fdport? port) (open-input-port? port))
|
||||
(let ((size (if (pair? maybe-size) (car maybe-size) 255)))
|
||||
(set-input-port-buffering port policy size)))
|
||||
((and (fdport? port) (open-output-port? port))
|
||||
(let ((size (if (pair? maybe-size) (car maybe-size) 255)))
|
||||
(if (<= size 0) (error "size must be at least 1"))
|
||||
(set-output-port-buffering port policy size)))
|
||||
(else
|
||||
(warn "port-type not supported" port)))))
|
||||
|
||||
(define (set-output-port-buffering port policy size)
|
||||
(cond ((eq? policy 'bufpol/none)
|
||||
(install-nullbuffer port unbuffered-output-fdport-handler))
|
||||
((eq? policy 'bufpol/block)
|
||||
(let ((old-size (code-vector-length (port-buffer port)))
|
||||
(new-buffer (make-code-vector size 0)))
|
||||
(if (< size old-size)
|
||||
(begin
|
||||
(really-force-output port)
|
||||
(obtain-port-lock port)
|
||||
(set-port-index! port 0))
|
||||
(begin
|
||||
(obtain-port-lock port)
|
||||
(copy-bytes! (port-buffer port) 0 new-buffer 0 old-size)))
|
||||
(install-buffer port new-buffer size)
|
||||
(release-port-lock port)))
|
||||
((eq? policy 'bufpol/line)
|
||||
(install-nullbuffer port (make-line-output-proc size)))
|
||||
(else (warn "policy not supported " policy))))
|
||||
|
||||
(define (install-nullbuffer port handler)
|
||||
(really-force-output port)
|
||||
(obtain-port-lock port)
|
||||
(set-port-limit! port 0)
|
||||
(set-port-index! port 0)
|
||||
(set-port-buffer! port (make-code-vector 0 0))
|
||||
(set-port-handler! port handler)
|
||||
(release-port-lock port))
|
||||
|
||||
(define (install-buffer port new-buffer size)
|
||||
(if (eq? 'bufpol/none (guess-output-policy port))
|
||||
(set-port-handler! port output-fdport-handler))
|
||||
(set-port-limit! port size)
|
||||
(set-port-buffer! port new-buffer))
|
||||
|
||||
; TODO flush on stdinput is required
|
||||
;;; This port can ONLY be flushed with a newline or a close-output
|
||||
;;; flush-output won't help
|
||||
(define (make-line-output-proc size)
|
||||
(let ((proc-buffer (make-code-vector size 0))
|
||||
(proc-buffer-index 0))
|
||||
(make-port-handler
|
||||
(lambda (fdport*)
|
||||
(list 'output-fdport (fdport-data:channel fdport*)))
|
||||
(lambda (fdport*)
|
||||
(channel-write proc-buffer
|
||||
0
|
||||
proc-buffer-index
|
||||
(fdport-data:channel fdport*))
|
||||
(close-fdport* fdport*))
|
||||
(lambda (fdport* char)
|
||||
(code-vector-set! proc-buffer proc-buffer-index (char->ascii char))
|
||||
(set! proc-buffer-index (+ proc-buffer-index 1))
|
||||
(cond ((or (eq? char #\newline) (= proc-buffer-index size))
|
||||
(channel-write proc-buffer
|
||||
0
|
||||
proc-buffer-index
|
||||
(fdport-data:channel fdport*))
|
||||
(set! proc-buffer-index 0))))
|
||||
(lambda (fdport* owner)
|
||||
(steal-channel! (fdport-data:channel fdport*) owner)))))
|
||||
|
||||
|
||||
(define (set-input-port-buffering port policy size)
|
||||
(cond ((eq? policy 'bufpol/none)
|
||||
(set-input-port-buffering port 'bufpol/block 1))
|
||||
((eq? policy 'bufpol/block)
|
||||
(if (<= size 0) (error "size must be at least 1"))
|
||||
(install-input-handler port input-fdport-handler size #t))
|
||||
((eq? policy 'bufpol/line)
|
||||
(if (<= size 0) (error "size must be at least 1"))
|
||||
(install-input-handler port line-input-handler size #t))
|
||||
(else (warn "policy not supported " policy))))
|
||||
|
||||
(define (install-input-handler port new-handler size gentle?)
|
||||
(obtain-port-lock port)
|
||||
(let* ((old-limit (port-limit port))
|
||||
(old-index (port-index port))
|
||||
(old-buffer (port-buffer port))
|
||||
(old-unread (- old-limit old-index))
|
||||
(new-unread (min old-unread size))
|
||||
(throw-away (max 0 (- old-unread new-unread)))
|
||||
(new-buffer (make-code-vector size 0)))
|
||||
(if (not gentle?)
|
||||
(let ((ret (if (> throw-away 0)
|
||||
(let ((return-buffer
|
||||
(make-code-vector throw-away 0)))
|
||||
(copy-bytes! old-buffer old-index
|
||||
return-buffer 0
|
||||
throw-away) return-buffer)
|
||||
#f)))
|
||||
(copy-bytes! old-buffer (+ old-index throw-away)
|
||||
new-buffer 0
|
||||
new-unread)
|
||||
(set-port-buffer! port new-buffer)
|
||||
(set-port-index! port 0)
|
||||
(set-port-limit! port new-unread)
|
||||
(set-port-handler! port new-handler)
|
||||
(release-port-lock port)
|
||||
ret)
|
||||
(begin
|
||||
(install-drain-port-handler
|
||||
old-buffer old-index old-limit port new-handler)
|
||||
(set-port-buffer! port new-buffer)
|
||||
(set-port-index! port 0)
|
||||
(set-port-limit! port 0)
|
||||
(release-port-lock port)
|
||||
#t))))
|
||||
|
||||
(define (install-drain-port-handler
|
||||
old-buffer old-start old-limit port new-handler)
|
||||
(if (< 0 (- old-limit old-start))
|
||||
(set-port-handler! port
|
||||
(make-drain-port-handler
|
||||
old-buffer old-start old-limit port new-handler))
|
||||
(set-port-handler! port new-handler)))
|
||||
|
||||
|
||||
;;; TODO: This reference to port will prevent gc !!!
|
||||
(define (make-drain-port-handler
|
||||
very-old-buffer old-start old-limit port new-handler)
|
||||
(let ((old-buffer (make-code-vector old-limit 0)))
|
||||
(copy-bytes! very-old-buffer 0 old-buffer 0 old-limit)
|
||||
(make-input-fdport-handler
|
||||
(lambda (data buffer start needed)
|
||||
(let ((old-left (- (code-vector-length old-buffer) old-start)))
|
||||
(let ((size (cond ((or (eq? needed 'any) (eq? needed 'immediate))
|
||||
(min old-left
|
||||
(code-vector-length buffer)))
|
||||
(else (min needed old-left)))))
|
||||
(copy-bytes! old-buffer old-start buffer start size)
|
||||
(set! old-start (+ size old-start))
|
||||
|
||||
(if (= old-start (code-vector-length old-buffer)) ;buffer drained ?
|
||||
(begin
|
||||
(set-port-handler! port new-handler)
|
||||
(if (and (integer? needed) (> needed size))
|
||||
(+ size ((port-handler-buffer-proc new-handler)
|
||||
data buffer (+ start size) (- needed size)))
|
||||
size))
|
||||
size)))))))
|
||||
|
||||
(define line-input-handler
|
||||
(make-input-fdport-handler
|
||||
(lambda (fdport* buffer start needed)
|
||||
(read-until-newline fdport* buffer start needed 0))))
|
||||
|
||||
(define newline-ascii (char->ascii #\newline))
|
||||
|
||||
(define (read-until-newline fdport* buffer start needed counter)
|
||||
(let ((channel-needed (if (eq? needed 'immediate) needed 1)))
|
||||
(let ((read (channel-read buffer
|
||||
start
|
||||
channel-needed
|
||||
(fdport-data:channel fdport*))))
|
||||
(if (= 0 read)
|
||||
counter
|
||||
(begin
|
||||
(set! counter (+ counter 1))
|
||||
(if (or (and (code-vector? buffer)
|
||||
(= newline-ascii (code-vector-ref buffer start)))
|
||||
(and (string? buffer)
|
||||
(eq? #\newline (string-ref buffer start)))
|
||||
(and (number? needed) (= needed counter))
|
||||
(and (eq? needed 'any) (= (+ start 1) (buffer-length buffer))))
|
||||
counter
|
||||
(read-until-newline fdport* buffer
|
||||
(+ start 1)
|
||||
needed
|
||||
counter)))))))
|
||||
(define (buffer-length buffer)
|
||||
(if (string? buffer)
|
||||
(string-length buffer)
|
||||
(code-vector-length buffer)))
|
||||
;;; Open & Close
|
||||
;;; ------------
|
||||
|
||||
;;; replace rts/channel-port.scm begin
|
||||
(define (open-file fname flags . maybe-mode)
|
||||
(let ((fd (apply open-fdes fname flags maybe-mode))
|
||||
(access (bitwise-and flags open/access-mask)))
|
||||
|
@ -165,6 +389,7 @@
|
|||
(flags (deposit-bit-field flags open/access-mask open/write)))
|
||||
(apply open-file fname flags maybe-mode)))
|
||||
|
||||
;;; replace rts/channel-port.scm end
|
||||
|
||||
;;; All these revealed-count-hacking procs have atomicity problems.
|
||||
;;; They need to run uninterrupted.
|
||||
|
@ -284,7 +509,7 @@
|
|||
(set-fluid! $current-input-port (channel-port->input-fdport (current-input-port)))
|
||||
(set-fluid! $current-output-port (channel-port->output-fdport (current-output-port)))
|
||||
|
||||
(set-fluid! $current-error-port (channel-port->output-fdport (current-error-port)))
|
||||
(set-fluid! $current-error-port (channel-port->unbuffered-output-fdport (current-error-port)))
|
||||
(set-fluid! $current-noise-port (make-null-output-port)))
|
||||
|
||||
;;; Generic port operations
|
||||
|
@ -459,6 +684,7 @@
|
|||
(if port
|
||||
(close port)))))))
|
||||
|
||||
;;; replace rts/channel-port.scm begin
|
||||
(define call-with-input-file
|
||||
(call-with-mumble-file open-input-file close-input-port))
|
||||
|
||||
|
@ -474,3 +700,5 @@
|
|||
(call-with-output-file string
|
||||
(lambda (port)
|
||||
(let-fluid $current-output-port port thunk))))
|
||||
|
||||
;;; replace rts/channel-port.scm end
|
Loading…
Reference in New Issue