scsh-0.6/scheme/big/pipe.scm

414 lines
13 KiB
Scheme
Raw Normal View History

2003-05-01 06:21:33 -04:00
; Copyright (c) 1993-1999 by Richard Kelsey and Jonathan Rees. See file COPYING.
; Pipes.
;
; This would be easy except that we have to deal with threads (and who else
; would be using pipes?).
;
; Pipes either have a fixed set of buffers which are continually recycled or
; create new buffers as needed. Having a fixed buffer supply keeps readers
; and writers more-or-less synchronised, while creating buffers as needed allows
; the writers to get arbitrarily far ahead of the readers.
;
; A fixed-buffer pipe has two buffers. At any point one is acting as the
; input buffer and the other as the output buffer. When the input buffer is
; empty the two are swapped.
;
; The complexity of the code below comes from having to deal with the two
; blocking situations:
; - a read is done when all buffers are empty
; - a write is done when the output buffer is full and the input buffer
; is non-empty (and we aren't allowed to make more buffers)
;
; If a read occurs when all buffers are empty we swap a zero-length buffer in
; for the output-buffer and block on a condition variable. The zero-length
; buffer guarantees that the reading thread will be woken when the next write
; occurs. When a write occurs with a zero-length buffer we swap in the real
; buffer, do the write, and then set the input condition variable.
;
; When a write occurs with the write buffer full for a pipe without a fixed
; set of buffers the full buffer is added to a queue.
;
; For a pipe with only two buffers, if a write occurs with the write buffer
; full and the read buffer non-empty, we set the read-limit to be one shorter
; than its real value and block on a condition variable. The bogus read-limit
; means that the writing thread will be woken when a read empties the buffer
; and not have to wait until the following read. When a read reaches the
; read-limit we check to see if there are waiting output threads. If so, we
; write one more character and then wake the sleepers.
;
; If this were a little more integrated with the threads package pipes
; could use queues instead of making new condition variables all the time.
(define-record-type pipe-data :pipe-data
(make-pipe-data lock in-condvar out-condvar queue out-buffer)
pipe-data?
(lock pipe-lock) ; a lock for keeping this pipe single threaded
(in-condvar ; waiting for a non-empty buffer
pipe-in-condvar
set-pipe-in-condvar!)
(out-condvar ; waiting for an empty buffer
pipe-out-condvar
set-pipe-out-condvar!)
(queue ; queue of full buffers, or #f for a pipe with a fixed
pipe-buffer-queue); buffer set
(out-buffer ; stashed output buffer
pipe-out-buffer
set-pipe-out-buffer!))
(define (lock pipe)
(obtain-lock (pipe-lock (port-data pipe))))
(define (unlock pipe)
(release-lock (pipe-lock (port-data pipe))))
; Swap the buffers and initialize the various buffer pointers.
(define (swap-buffers! port)
(let ((temp (port-in-buffer port)))
(set-port-in-buffer! port (port-out-buffer port))
(set-port-out-buffer! port temp)
(set-port-in-limit! port (port-out-index port))
(set-port-in-index! port 0)
(set-port-out-index! port 0)))
; Get a full buffer from the queue.
(define (use-buffer-from-queue port)
(let ((queue (pipe-buffer-queue (port-data port))))
(if (and queue (not (queue-empty? queue)))
(let ((buffer (dequeue! queue)))
(set-port-in-buffer! port buffer)
(set-port-in-index! port 0)
(set-port-in-limit! port (code-vector-length buffer))
#t)
#f)))
;----------------------------------------------------------------
; Input buffers
; Get a non-empty input buffer, if possible. We have five options:
; 1. The current buffer isn't empty.
; 2. The current buffer looks empty but really isn't; the limit was
; decremented by a writer who wants an empty buffer.
; 3. There is a queue and it has a full buffer.
; 4. The output buffer isn't empty.
; 5. The output port is still open and may produce characters in the future.
; If there are no characters and the output port is closed we lose.
(define (get-in-buffer port)
(cond ((> (port-in-limit port)
(port-in-index port))
#t)
((pipe-out-condvar (port-data port))
(set-port-in-limit! port (+ 1 (port-in-limit port)))
#t)
((use-buffer-from-queue port)
#t)
((< 0 (port-out-index port))
(swap-buffers! port)
#t)
((open-output-port? port)
(wait-for-input port)
(get-in-buffer port))
(else
#f)))
; Wait on the input condition variable. If there isn't one, we make a
; new condition variable and swap in a zero-length write buffer to get the
; condition variable set as soon as a write occurs.
(define (wait-for-input port)
(let* ((data (port-data port))
(cv (if (pipe-in-condvar data)
(pipe-in-condvar data)
(let ((cv (make-condvar)))
(set-pipe-out-buffer! (port-data port) (port-out-buffer port))
(set-port-out-buffer! port (make-code-vector 0 0))
(set-pipe-in-condvar! data cv)
cv))))
(release-lock (pipe-lock data))
(condvar-ref cv)
(obtain-lock (pipe-lock data))))
; Wake any threads waiting for input if there are characters available.
(define (wake-any-input-waiters port)
(let ((data (port-data port)))
(let ((cv (pipe-in-condvar data)))
(if (and cv
(or (< 0 (port-out-index port))
(not (open-output-port? port))))
(begin
(set-pipe-in-condvar! data #f)
(condvar-set! cv (unspecific)))))))
;----------------------------------------------------------------
; Output buffers
; Get a non-full output buffer, if possible. We have five options:
; 1. The current buffer has room.
; 2. The current buffer looks full but really isn't; it is a zero-length
; buffer swapped in by a reader who wants characters.
; 3. There is a queue for full buffers.
; 4. The input buffer is empty.
; 5. The input port is still open and may empty its buffer later on.
; If there are no empty buffers and the input port is closed we lose.
(define (get-out-buffer port)
(let ((len (code-vector-length (port-out-buffer port))))
(cond ((< (port-out-index port) len)
#t)
((= 0 len)
(set-port-out-buffer! port (pipe-out-buffer (port-data port)))
#t)
((pipe-buffer-queue (port-data port))
(make-new-out-buffer port)
#t)
((= (port-in-index port) (port-in-limit port))
(swap-buffers! port)
#t)
((open-input-port? port)
(wait-for-output port)
(get-out-buffer port))
(else
#f))))
; Make a new output buffer and put the full one on the queue.
(define (make-new-out-buffer port)
(let* ((old (port-out-buffer port))
(new (make-code-vector (code-vector-length old) 0)))
(enqueue! (pipe-buffer-queue (port-data port)) old)
(set-port-out-buffer! port new)
(set-port-out-index! port 0)))
; Same as above, on a different condition variable and with a different
; wakeup method.
(define (wait-for-output port)
(let* ((data (port-data port))
(cv (if (pipe-out-condvar data)
(pipe-out-condvar data)
(let ((cv (make-condvar)))
(set-port-in-limit! port (- (port-in-limit port) 1))
(set-pipe-out-condvar! data cv)
cv))))
(release-lock (pipe-lock data))
(condvar-ref cv)
(obtain-lock (pipe-lock data))))
(define (wake-any-output-waiters port)
(let ((data (port-data port)))
(let ((cv (pipe-out-condvar data)))
(if (and cv
(or (= (port-in-limit port) (port-in-index port))
(not (open-output-port? port))))
(begin
(set-pipe-out-condvar! data #f)
(condvar-set! cv (unspecific)))))))
; Used by PEEK-CHAR to reset the wakeup limit.
(define (do-not-disturb-output-waiters port)
(if (pipe-out-condvar (port-data port))
(set-port-in-limit! port (- (port-in-limit port) 1))))
; Close both ports and wake up any sleepers.
(define (close-pipe port close-input?)
(lock port)
(if close-input?
(make-input-port-closed! port))
(make-output-port-closed! port)
(wake-any-input-waiters port)
(wake-any-output-waiters port)
(unlock port))
;----------------------------------------------------------------
; The actual handler
(define pipe-handler
(make-port-handler
;; discloser
(lambda (port)
(list 'pipe))
;; input port methods --------------------------
;; close-input-port
(lambda (port)
(close-pipe port #t))
;; The next three methods are called when the input buffer is empty
;; read-char
(lambda (port)
(lock port)
(cond ((get-in-buffer port)
(let ((c (read-char port)))
(wake-any-output-waiters port)
(unlock port)
c))
(else
(unlock port)
(eof-object))))
;; peek-char
(lambda (port)
(lock port)
(cond ((get-in-buffer port)
(let ((c (peek-char port)))
(do-not-disturb-output-waiters port)
(unlock port)
c))
(else
(unlock port)
(eof-object))))
;; char-ready?
(lambda (port)
(> (port-out-index port) 0))
;; read-block - the buffer has fewer than COUNT characters
(lambda (thing start count port)
(lock port)
(let loop ((start start) (count count))
(let* ((index (port-in-index port))
(have (min (- (port-in-limit port) index)
count)))
(cond ((> have 0)
(copy! (port-in-buffer port) index thing start have)
(set-port-in-index! port (+ index have))))
(wake-any-output-waiters port)
(cond ((= have count)
(unlock port))
((get-in-buffer port)
(loop (+ start have) (- count have)))
(else
(unlock port)
(eof-object))))))
;; output port methods -------------------------
;; close-output-port
(lambda (port)
(close-pipe port #f))
;; write-char got a full buffer
(lambda (char port)
(lock port)
(cond ((get-out-buffer port)
(write-char char port)
(wake-any-input-waiters port)
(unlock port))
(else
(unlock port)
(error "writing to a broken pipe"))))
;; write-block couldn't fit COUNT characters into the buffer
(lambda (thing start count port)
(lock port)
(let loop ((start start) (count count))
(cond ((get-out-buffer port)
(let* ((buffer (port-out-buffer port))
(index (port-out-index port))
(have (min (- (code-vector-length buffer) index)
count)))
(cond ((> have 0)
(copy! thing start buffer index have)
(set-port-out-index! port (+ index have))))
(wake-any-input-waiters port)
(if (= have count)
(unlock port)
(loop (+ start have) (- count have)))))
(else
(unlock port)
(error "writing to a broken pipe")))))
;; force-output
(lambda (port)
(values))))
(define pipe-buffer-size 1024)
; Takes an optional size to use for the buffers. A size of #f indicates
; that buffers should be made as needed (we really need omega).
(define (make-pipe . maybe-buffer-size)
(call-with-values
(lambda ()
(parse-make-pipe-args maybe-buffer-size))
(lambda (size queue)
(make-port pipe-handler
(bitwise-ior open-input-port-status
open-output-port-status)
(make-pipe-data (make-lock) ; the lock
#f ; input condition variable
#f ; output condition variable
queue ; full buffer queue
#f) ; stashed output buffer
(make-code-vector size 0) ; input buffer
0 ; input index
0 ; input limit
(make-code-vector size 0) ; output buffer
0)))) ; output limit
(define (parse-make-pipe-args maybe-buffer-size)
(if (null? maybe-buffer-size)
(values pipe-buffer-size #f)
(let ((size (car maybe-buffer-size)))
(cond ((not size)
(values pipe-buffer-size (make-queue)))
((and (integer? size)
(exact? size)
(< 0 size))
(values size #f))
(else
(call-error "invalid pipe buffer size" make-pipe size))))))
; These should probably be moved to I/O
(define (open-input-port? port)
(= (bitwise-and open-input-port-status
(port-status port))
open-input-port-status))
(define (open-output-port? port)
(= (bitwise-and open-output-port-status
(port-status port))
open-output-port-status))
; Won't do string->string copies.
(define (copy! from i to j count)
(if (code-vector? from)
(if (code-vector? to)
(copy-bytes! from i to j count)
(copy-bytes->chars! from i to j count))
(copy-chars->bytes! from i to j count)))
; Copied from more-port.scm.
(define (copy-bytes! from i to j count)
(let ((limit (+ count i)))
(do ((i i (+ i 1))
(j j (+ j 1)))
((= i limit))
(code-vector-set! to j (code-vector-ref from i)))))
(define (copy-chars->bytes! from i to j count)
(let ((limit (+ count i)))
(do ((i i (+ i 1))
(j j (+ j 1)))
((= i limit))
(code-vector-set! to j (char->ascii (string-ref from i))))))
(define (copy-bytes->chars! from i to j count)
(let ((limit (+ count i)))
(do ((i i (+ i 1))
(j j (+ j 1)))
((= i limit))
(string-set! to j (ascii->char (code-vector-ref from i))))))