; Copyright (c) 1993-1999 by Richard Kelsey and Jonathan Rees. See file COPYING. ; Pipes. ; ; This would be easy except that we have to deal with threads (and who else ; would be using pipes?). ; ; Pipes either have a fixed set of buffers which are continually recycled or ; create new buffers as needed. Having a fixed buffer supply keeps readers ; and writers more-or-less synchronised, while creating buffers as needed allows ; the writers to get arbitrarily far ahead of the readers. ; ; A fixed-buffer pipe has two buffers. At any point one is acting as the ; input buffer and the other as the output buffer. When the input buffer is ; empty the two are swapped. ; ; The complexity of the code below comes from having to deal with the two ; blocking situations: ; - a read is done when all buffers are empty ; - a write is done when the output buffer is full and the input buffer ; is non-empty (and we aren't allowed to make more buffers) ; ; If a read occurs when all buffers are empty we swap a zero-length buffer in ; for the output-buffer and block on a condition variable. The zero-length ; buffer guarantees that the reading thread will be woken when the next write ; occurs. When a write occurs with a zero-length buffer we swap in the real ; buffer, do the write, and then set the input condition variable. ; ; When a write occurs with the write buffer full for a pipe without a fixed ; set of buffers the full buffer is added to a queue. ; ; For a pipe with only two buffers, if a write occurs with the write buffer ; full and the read buffer non-empty, we set the read-limit to be one shorter ; than its real value and block on a condition variable. The bogus read-limit ; means that the writing thread will be woken when a read empties the buffer ; and not have to wait until the following read. When a read reaches the ; read-limit we check to see if there are waiting output threads. If so, we ; write one more character and then wake the sleepers. ; ; If this were a little more integrated with the threads package pipes ; could use queues instead of making new condition variables all the time. (define-record-type pipe-data :pipe-data (make-pipe-data lock in-condvar out-condvar queue out-buffer) pipe-data? (lock pipe-lock) ; a lock for keeping this pipe single threaded (in-condvar ; waiting for a non-empty buffer pipe-in-condvar set-pipe-in-condvar!) (out-condvar ; waiting for an empty buffer pipe-out-condvar set-pipe-out-condvar!) (queue ; queue of full buffers, or #f for a pipe with a fixed pipe-buffer-queue); buffer set (out-buffer ; stashed output buffer pipe-out-buffer set-pipe-out-buffer!)) (define (lock pipe) (obtain-lock (pipe-lock (port-data pipe)))) (define (unlock pipe) (release-lock (pipe-lock (port-data pipe)))) ; Swap the buffers and initialize the various buffer pointers. (define (swap-buffers! port) (let ((temp (port-in-buffer port))) (set-port-in-buffer! port (port-out-buffer port)) (set-port-out-buffer! port temp) (set-port-in-limit! port (port-out-index port)) (set-port-in-index! port 0) (set-port-out-index! port 0))) ; Get a full buffer from the queue. (define (use-buffer-from-queue port) (let ((queue (pipe-buffer-queue (port-data port)))) (if (and queue (not (queue-empty? queue))) (let ((buffer (dequeue! queue))) (set-port-in-buffer! port buffer) (set-port-in-index! port 0) (set-port-in-limit! port (code-vector-length buffer)) #t) #f))) ;---------------------------------------------------------------- ; Input buffers ; Get a non-empty input buffer, if possible. We have five options: ; 1. The current buffer isn't empty. ; 2. The current buffer looks empty but really isn't; the limit was ; decremented by a writer who wants an empty buffer. ; 3. There is a queue and it has a full buffer. ; 4. The output buffer isn't empty. ; 5. The output port is still open and may produce characters in the future. ; If there are no characters and the output port is closed we lose. (define (get-in-buffer port) (cond ((> (port-in-limit port) (port-in-index port)) #t) ((pipe-out-condvar (port-data port)) (set-port-in-limit! port (+ 1 (port-in-limit port))) #t) ((use-buffer-from-queue port) #t) ((< 0 (port-out-index port)) (swap-buffers! port) #t) ((open-output-port? port) (wait-for-input port) (get-in-buffer port)) (else #f))) ; Wait on the input condition variable. If there isn't one, we make a ; new condition variable and swap in a zero-length write buffer to get the ; condition variable set as soon as a write occurs. (define (wait-for-input port) (let* ((data (port-data port)) (cv (if (pipe-in-condvar data) (pipe-in-condvar data) (let ((cv (make-condvar))) (set-pipe-out-buffer! (port-data port) (port-out-buffer port)) (set-port-out-buffer! port (make-code-vector 0 0)) (set-pipe-in-condvar! data cv) cv)))) (release-lock (pipe-lock data)) (condvar-ref cv) (obtain-lock (pipe-lock data)))) ; Wake any threads waiting for input if there are characters available. (define (wake-any-input-waiters port) (let ((data (port-data port))) (let ((cv (pipe-in-condvar data))) (if (and cv (or (< 0 (port-out-index port)) (not (open-output-port? port)))) (begin (set-pipe-in-condvar! data #f) (condvar-set! cv (unspecific))))))) ;---------------------------------------------------------------- ; Output buffers ; Get a non-full output buffer, if possible. We have five options: ; 1. The current buffer has room. ; 2. The current buffer looks full but really isn't; it is a zero-length ; buffer swapped in by a reader who wants characters. ; 3. There is a queue for full buffers. ; 4. The input buffer is empty. ; 5. The input port is still open and may empty its buffer later on. ; If there are no empty buffers and the input port is closed we lose. (define (get-out-buffer port) (let ((len (code-vector-length (port-out-buffer port)))) (cond ((< (port-out-index port) len) #t) ((= 0 len) (set-port-out-buffer! port (pipe-out-buffer (port-data port))) #t) ((pipe-buffer-queue (port-data port)) (make-new-out-buffer port) #t) ((= (port-in-index port) (port-in-limit port)) (swap-buffers! port) #t) ((open-input-port? port) (wait-for-output port) (get-out-buffer port)) (else #f)))) ; Make a new output buffer and put the full one on the queue. (define (make-new-out-buffer port) (let* ((old (port-out-buffer port)) (new (make-code-vector (code-vector-length old) 0))) (enqueue! (pipe-buffer-queue (port-data port)) old) (set-port-out-buffer! port new) (set-port-out-index! port 0))) ; Same as above, on a different condition variable and with a different ; wakeup method. (define (wait-for-output port) (let* ((data (port-data port)) (cv (if (pipe-out-condvar data) (pipe-out-condvar data) (let ((cv (make-condvar))) (set-port-in-limit! port (- (port-in-limit port) 1)) (set-pipe-out-condvar! data cv) cv)))) (release-lock (pipe-lock data)) (condvar-ref cv) (obtain-lock (pipe-lock data)))) (define (wake-any-output-waiters port) (let ((data (port-data port))) (let ((cv (pipe-out-condvar data))) (if (and cv (or (= (port-in-limit port) (port-in-index port)) (not (open-output-port? port)))) (begin (set-pipe-out-condvar! data #f) (condvar-set! cv (unspecific))))))) ; Used by PEEK-CHAR to reset the wakeup limit. (define (do-not-disturb-output-waiters port) (if (pipe-out-condvar (port-data port)) (set-port-in-limit! port (- (port-in-limit port) 1)))) ; Close both ports and wake up any sleepers. (define (close-pipe port close-input?) (lock port) (if close-input? (make-input-port-closed! port)) (make-output-port-closed! port) (wake-any-input-waiters port) (wake-any-output-waiters port) (unlock port)) ;---------------------------------------------------------------- ; The actual handler (define pipe-handler (make-port-handler ;; discloser (lambda (port) (list 'pipe)) ;; input port methods -------------------------- ;; close-input-port (lambda (port) (close-pipe port #t)) ;; The next three methods are called when the input buffer is empty ;; read-char (lambda (port) (lock port) (cond ((get-in-buffer port) (let ((c (read-char port))) (wake-any-output-waiters port) (unlock port) c)) (else (unlock port) (eof-object)))) ;; peek-char (lambda (port) (lock port) (cond ((get-in-buffer port) (let ((c (peek-char port))) (do-not-disturb-output-waiters port) (unlock port) c)) (else (unlock port) (eof-object)))) ;; char-ready? (lambda (port) (> (port-out-index port) 0)) ;; read-block - the buffer has fewer than COUNT characters (lambda (thing start count port) (lock port) (let loop ((start start) (count count)) (let* ((index (port-in-index port)) (have (min (- (port-in-limit port) index) count))) (cond ((> have 0) (copy! (port-in-buffer port) index thing start have) (set-port-in-index! port (+ index have)))) (wake-any-output-waiters port) (cond ((= have count) (unlock port)) ((get-in-buffer port) (loop (+ start have) (- count have))) (else (unlock port) (eof-object)))))) ;; output port methods ------------------------- ;; close-output-port (lambda (port) (close-pipe port #f)) ;; write-char got a full buffer (lambda (char port) (lock port) (cond ((get-out-buffer port) (write-char char port) (wake-any-input-waiters port) (unlock port)) (else (unlock port) (error "writing to a broken pipe")))) ;; write-block couldn't fit COUNT characters into the buffer (lambda (thing start count port) (lock port) (let loop ((start start) (count count)) (cond ((get-out-buffer port) (let* ((buffer (port-out-buffer port)) (index (port-out-index port)) (have (min (- (code-vector-length buffer) index) count))) (cond ((> have 0) (copy! thing start buffer index have) (set-port-out-index! port (+ index have)))) (wake-any-input-waiters port) (if (= have count) (unlock port) (loop (+ start have) (- count have))))) (else (unlock port) (error "writing to a broken pipe"))))) ;; force-output (lambda (port) (values)))) (define pipe-buffer-size 1024) ; Takes an optional size to use for the buffers. A size of #f indicates ; that buffers should be made as needed (we really need omega). (define (make-pipe . maybe-buffer-size) (call-with-values (lambda () (parse-make-pipe-args maybe-buffer-size)) (lambda (size queue) (make-port pipe-handler (bitwise-ior open-input-port-status open-output-port-status) (make-pipe-data (make-lock) ; the lock #f ; input condition variable #f ; output condition variable queue ; full buffer queue #f) ; stashed output buffer (make-code-vector size 0) ; input buffer 0 ; input index 0 ; input limit (make-code-vector size 0) ; output buffer 0)))) ; output limit (define (parse-make-pipe-args maybe-buffer-size) (if (null? maybe-buffer-size) (values pipe-buffer-size #f) (let ((size (car maybe-buffer-size))) (cond ((not size) (values pipe-buffer-size (make-queue))) ((and (integer? size) (exact? size) (< 0 size)) (values size #f)) (else (call-error "invalid pipe buffer size" make-pipe size)))))) ; These should probably be moved to I/O (define (open-input-port? port) (= (bitwise-and open-input-port-status (port-status port)) open-input-port-status)) (define (open-output-port? port) (= (bitwise-and open-output-port-status (port-status port)) open-output-port-status)) ; Won't do string->string copies. (define (copy! from i to j count) (if (code-vector? from) (if (code-vector? to) (copy-bytes! from i to j count) (copy-bytes->chars! from i to j count)) (copy-chars->bytes! from i to j count))) ; Copied from more-port.scm. (define (copy-bytes! from i to j count) (let ((limit (+ count i))) (do ((i i (+ i 1)) (j j (+ j 1))) ((= i limit)) (code-vector-set! to j (code-vector-ref from i))))) (define (copy-chars->bytes! from i to j count) (let ((limit (+ count i))) (do ((i i (+ i 1)) (j j (+ j 1))) ((= i limit)) (code-vector-set! to j (char->ascii (string-ref from i)))))) (define (copy-bytes->chars! from i to j count) (let ((limit (+ count i))) (do ((i i (+ i 1)) (j j (+ j 1))) ((= i limit)) (string-set! to j (ascii->char (code-vector-ref from i))))))