; Copyright (c) 1993-1999 by Richard Kelsey and Jonathan Rees. See file COPYING.

; Pipes.
;
; This would be easy except that we have to deal with threads (and who else
; would be using pipes?).
;
; Pipes either have a fixed set of buffers which are continually recycled or
; create new buffers as needed.  Having a fixed buffer supply keeps readers
; and writers more-or-less synchronised, while creating buffers as needed allows
; the writers to get arbitrarily far ahead of the readers.
;
; A fixed-buffer pipe has two buffers.  At any point one is acting as the
; input buffer and the other as the output buffer.  When the input buffer is
; empty the two are swapped.
;
; The complexity of the code below comes from having to deal with the two
; blocking situations:
;  - a read is done when all buffers are empty
;  - a write is done when the output buffer is full and the input buffer
;    is non-empty (and we aren't allowed to make more buffers)
; 
; If a read occurs when all buffers are empty we swap a zero-length buffer in
; for the output-buffer and block on a condition variable.  The zero-length
; buffer guarantees that the reading thread will be woken when the next write
; occurs.  When a write occurs with a zero-length buffer we swap in the real
; buffer, do the write, and then set the input condition variable.
;
; When a write occurs with the write buffer full for a pipe without a fixed
; set of buffers the full buffer is added to a queue.
;
; For a pipe with only two buffers, if a write occurs with the write buffer
; full and the read buffer non-empty, we set the read-limit to be one shorter
; than its real value and block on a condition variable.  The bogus read-limit
; means that the writing thread will be woken when a read empties the buffer
; and not have to wait until the following read.  When a read reaches the
; read-limit we check to see if there are waiting output threads.  If so, we
; write one more character and then wake the sleepers.
;
; If this were a little more integrated with the threads package pipes
; could use queues instead of making new condition variables all the time.

; Per-pipe state.  MAKE-PIPE stores one of these as the port's data, and
; the procedures below fetch it with PORT-DATA.  The lock and the queue are
; set once by the constructor (they have no modifiers); the two condition
; variable slots and the stashed output buffer are updated as threads block
; and wake.
(define-record-type pipe-data :pipe-data
  (make-pipe-data lock in-condvar out-condvar queue out-buffer)
  pipe-data?
  (lock pipe-lock)   ; a lock for keeping this pipe single threaded
  (in-condvar        ; waiting for a non-empty buffer; #f when no reader
   pipe-in-condvar   ;   is blocked
   set-pipe-in-condvar!)
  (out-condvar       ; waiting for an empty buffer; #f when no writer
   pipe-out-condvar  ;   is blocked
   set-pipe-out-condvar!)
  (queue             ; queue of full buffers, or #f for a pipe with a fixed
   pipe-buffer-queue);   buffer set
  (out-buffer        ; stashed output buffer: the real output buffer, saved
   pipe-out-buffer   ;   here while a zero-length one is installed in the port
   set-pipe-out-buffer!))
    
(define (lock pipe)
  ; The pipe record is stored as the port's data; take its lock.
  (let ((data (port-data pipe)))
    (obtain-lock (pipe-lock data))))

(define (unlock pipe)
  ; Counterpart of LOCK: give back the lock held in the port's pipe record.
  (let ((data (port-data pipe)))
    (release-lock (pipe-lock data))))

; Swap the buffers and initialize the various buffer pointers.

(define (swap-buffers! port)
  ; Read everything we need up front, then install the exchanged state:
  ; the old output buffer becomes the input buffer, readable from 0 up to
  ; the number of characters that had been written into it, and the old
  ; input buffer becomes an empty output buffer.
  (let ((old-in (port-in-buffer port))
        (old-out (port-out-buffer port))
        (written (port-out-index port)))
    (set-port-in-buffer! port old-out)
    (set-port-out-buffer! port old-in)
    (set-port-in-limit! port written)
    (set-port-in-index! port 0)
    (set-port-out-index! port 0)))

; Get a full buffer from the queue.

(define (use-buffer-from-queue port)
  ; Returns #t if a queued buffer was installed as the input buffer, and
  ; #f if this pipe has no queue or the queue is currently empty.
  (let ((pending (pipe-buffer-queue (port-data port))))
    (cond ((not pending) #f)
          ((queue-empty? pending) #f)
          (else
           (let ((full (dequeue! pending)))
             (set-port-in-buffer! port full)
             (set-port-in-index! port 0)
             ; The whole of the dequeued buffer is made readable.
             (set-port-in-limit! port (code-vector-length full))
             #t)))))

;----------------------------------------------------------------
; Input buffers

; Get a non-empty input buffer, if possible.  We have five options:
;  1. The current buffer isn't empty.
;  2. The current buffer looks empty but really isn't; the limit was
;     decremented by a writer who wants an empty buffer.
;  3. There is a queue and it has a full buffer.
;  4. The output buffer isn't empty.
;  5. The output port is still open and may produce characters in the future.
; If there are no characters and the output port is closed we lose.

(define (get-in-buffer port)
  ; Returns #t once the input buffer has something to read, or #f when
  ; nothing is available and the output side has been closed.  The
  ; clauses are tried in the order of the numbered options listed above.
  (let retry ()
    (let ((limit (port-in-limit port)))
      (cond ((< (port-in-index port) limit)
             #t)
            ; A writer is blocked, so undo the one-character shortening of
            ; the limit done by WAIT-FOR-OUTPUT.
            ((pipe-out-condvar (port-data port))
             (set-port-in-limit! port (+ limit 1))
             #t)
            ((use-buffer-from-queue port)
             #t)
            ((> (port-out-index port) 0)
             (swap-buffers! port)
             #t)
            ; Nothing yet, but a writer may still show up: block, then
            ; look again from the top.
            ((open-output-port? port)
             (wait-for-input port)
             (retry))
            (else
             #f)))))

; Wait on the input condition variable.  If there isn't one, we make a
; new condition variable and swap in a zero-length write buffer to get the
; condition variable set as soon as a write occurs.

(define (wait-for-input port)
  (let* ((data (port-data port))
         (mutex (pipe-lock data))
         (waiting-on
          (or (pipe-in-condvar data)
              ; First waiter: stash the real output buffer, install a
              ; zero-length one in its place, and publish a new condition
              ; variable.
              (let ((fresh (make-condvar)))
                (set-pipe-out-buffer! data (port-out-buffer port))
                (set-port-out-buffer! port (make-code-vector 0 0))
                (set-pipe-in-condvar! data fresh)
                fresh))))
    ; Give up the pipe lock while blocked and take it again afterwards.
    (release-lock mutex)
    (condvar-ref waiting-on)
    (obtain-lock mutex)))

; Wake any threads waiting for input if there are characters available.

(define (wake-any-input-waiters port)
  ; Readers are released when the output buffer holds at least one
  ; character or when the output port is no longer open.  The condition
  ; variable slot is cleared before the variable is set.
  (let* ((data (port-data port))
         (sleepers (pipe-in-condvar data)))
    (if (and sleepers
             (or (> (port-out-index port) 0)
                 (not (open-output-port? port))))
        (begin
          (set-pipe-in-condvar! data #f)
          (condvar-set! sleepers (unspecific))))))

;----------------------------------------------------------------
; Output buffers

; Get a non-full output buffer, if possible.  We have five options:
;  1. The current buffer has room.
;  2. The current buffer looks full but really isn't; it is a zero-length
;     buffer swapped in by a reader who wants characters.
;  3. There is a queue for full buffers.
;  4. The input buffer is empty.
;  5. The input port is still open and may empty its buffer later on.
; If there are no empty buffers and the input port is closed we lose.

(define (get-out-buffer port)
  ; Returns #t once the output buffer can accept a character, or #f when
  ; it cannot and the input side has been closed.  The clauses are tried
  ; in the order of the numbered options listed above.
  (let retry ()
    (let ((capacity (code-vector-length (port-out-buffer port))))
      (cond ((> capacity (port-out-index port))
             #t)
            ; A zero-length buffer is the marker left by WAIT-FOR-INPUT;
            ; put the stashed real buffer back.
            ((zero? capacity)
             (set-port-out-buffer! port (pipe-out-buffer (port-data port)))
             #t)
            ((pipe-buffer-queue (port-data port))
             (make-new-out-buffer port)
             #t)
            ((= (port-in-limit port) (port-in-index port))
             (swap-buffers! port)
             #t)
            ; No room yet, but a reader may still drain the input buffer:
            ; block, then look again from the top.
            ((open-input-port? port)
             (wait-for-output port)
             (retry))
            (else
             #f)))))

; Make a new output buffer and put the full one on the queue.

(define (make-new-out-buffer port)
  ; The replacement buffer has the same length as the one being queued.
  (let* ((full (port-out-buffer port))
         (fresh (make-code-vector (code-vector-length full) 0))
         (pending (pipe-buffer-queue (port-data port))))
    (enqueue! pending full)
    (set-port-out-buffer! port fresh)
    (set-port-out-index! port 0)))

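; Same as WAIT-FOR-INPUT, but on the output condition variable and with a
; different wakeup method: instead of swapping in a zero-length buffer, the
; first waiter makes the input limit one shorter than its real value.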

(define (wait-for-output port)
  (let* ((data (port-data port))
         (mutex (pipe-lock data))
         (waiting-on
          (or (pipe-out-condvar data)
              ; First waiter: shorten the input limit by one and publish a
              ; new condition variable.
              (let ((fresh (make-condvar)))
                (set-port-in-limit! port (- (port-in-limit port) 1))
                (set-pipe-out-condvar! data fresh)
                fresh))))
    ; Give up the pipe lock while blocked and take it again afterwards.
    (release-lock mutex)
    (condvar-ref waiting-on)
    (obtain-lock mutex)))

(define (wake-any-output-waiters port)
  ; Wake any threads blocked in WAIT-FOR-OUTPUT if waiting longer cannot
  ; help them: either the input buffer has been read up to its limit, or
  ; the input port has been closed, in which case nobody will ever empty
  ; the buffer.
  ;
  ; The closed-port test must look at the INPUT port.  A woken writer goes
  ; back through GET-OUT-BUFFER, which only gives up when the input port is
  ; closed.  Testing the output port here would wake a writer that then
  ; calls WAIT-FOR-OUTPUT again and shortens the input limit a second time.
  (let ((data (port-data port)))
    (let ((cv (pipe-out-condvar data)))
      (if (and cv
	       (or (= (port-in-limit port) (port-in-index port))
		   (not (open-input-port? port))))
	  (begin
	    ; Clear the slot first so the next blocked writer makes a new
	    ; condition variable.
            (set-pipe-out-condvar! data #f)
	    (condvar-set! cv (unspecific)))))))

; Used by PEEK-CHAR to reset the wakeup limit.

(define (do-not-disturb-output-waiters port)
  ; Only touch the limit while a writer's condition variable is installed.
  (cond ((pipe-out-condvar (port-data port))
         (let ((limit (port-in-limit port)))
           (set-port-in-limit! port (- limit 1))))))

; Close the output port, and the input port as well if CLOSE-INPUT? is true,
; then wake up any sleepers.

(define (close-pipe port close-input?)
  ; Everything happens while holding the pipe's lock.
  (let ((mutex (pipe-lock (port-data port))))
    (obtain-lock mutex)
    (cond (close-input?
           (make-input-port-closed! port)))
    ; The output side is closed unconditionally.
    (make-output-port-closed! port)
    (wake-any-input-waiters port)
    (wake-any-output-waiters port)
    (release-lock mutex)))

;----------------------------------------------------------------
; The actual handler

; The port handler shared by every pipe.  Apart from the discloser,
; char-ready? and force-output, each method takes the pipe's lock on entry
; and releases it on every exit path.

(define pipe-handler
  (make-port-handler

   ;; discloser
   ;; Returns the one-element list (pipe).
   (lambda (port)
     (list 'pipe))

   ;; input port methods --------------------------

   ;; close-input-port
   ;; Closes the input side and, because CLOSE-PIPE always does so, the
   ;; output side too.
   (lambda (port)
     (close-pipe port #t))

   ;; The next three methods are called when the input buffer is empty
   ;; read-char
   ;; Returns the next character, or the EOF object if GET-IN-BUFFER
   ;; reports that no more input can arrive.
   (lambda (port)
     (lock port)
     (cond ((get-in-buffer port)
	    ;; GET-IN-BUFFER succeeded, so this READ-CHAR is served from
	    ;; the input buffer.
	    (let ((c (read-char port)))
	      ;; That read may have emptied the buffer a writer is waiting for.
	      (wake-any-output-waiters port)
	      (unlock port)
	      c))
	   (else
            (unlock port)
	    (eof-object))))

   ;; peek-char
   ;; Like read-char, but since nothing is consumed the input limit is
   ;; shortened again if a writer is still waiting.
   (lambda (port)
     (lock port)
     (cond ((get-in-buffer port)
	    (let ((c (peek-char port)))
	      (do-not-disturb-output-waiters port)
	      (unlock port)
	      c))
	   (else
            (unlock port)
	    (eof-object))))

   ;; char-ready?
   ;; True when the output buffer holds at least one character.
   ;; NOTE(review): this does not take the pipe lock and answers #f for a
   ;; pipe whose output side is closed and empty, where a read would
   ;; return EOF at once -- confirm this is intended.
   (lambda (port)
     (> (port-out-index port) 0))

   ;; read-block - the buffer has fewer than COUNT characters
   ;; Copies characters into THING starting at START until COUNT have been
   ;; delivered.  Returns the value of (unlock port) on success and the EOF
   ;; object if the pipe runs dry first, even when some characters were
   ;; already copied.
   ;; NOTE(review): WAKE-ANY-OUTPUT-WAITERS is called here before
   ;; GET-IN-BUFFER has had a chance to restore a limit shortened by
   ;; WAIT-FOR-OUTPUT.  If a writer is blocked, reading up to the shortened
   ;; limit looks like an empty buffer, the writer is woken, and the last
   ;; character of the input buffer appears to be dropped by the following
   ;; buffer swap -- confirm and fix with a test.
   (lambda (thing start count port)
     (lock port)
     (let loop ((start start) (count count))
       (let* ((index (port-in-index port))
	      (have (min (- (port-in-limit port) index)
			 count)))
	 (cond ((> have 0)
		(copy! (port-in-buffer port) index thing start have)
		(set-port-in-index! port (+ index have))))
	 (wake-any-output-waiters port)
	 (cond ((= have count)
		(unlock port))
	       ((get-in-buffer port)
                (loop (+ start have) (- count have)))
	       (else
		(unlock port)
		(eof-object))))))

   ;; output port methods -------------------------

   ;; close-output-port
   ;; Closes only the output side; the input side stays open.
   (lambda (port)
     (close-pipe port #f))

   ;; write-char got a full buffer
   ;; Signals an error if GET-OUT-BUFFER reports that no room can ever
   ;; become available.
   (lambda (char port)
     (lock port)
     (cond ((get-out-buffer port)
	    (write-char char port)
	    ;; A reader may be blocked waiting for exactly this character.
	    (wake-any-input-waiters port)
	    (unlock port))
	   (else
	    ;; Release the lock before signalling the error.
            (unlock port)
	    (error "writing to a broken pipe"))))

   ;; write-block couldn't fit COUNT characters into the buffer
   ;; Copies from THING starting at START, one bufferful at a time, until
   ;; COUNT characters have been written.  Signals an error if room cannot
   ;; be obtained part-way through.
   (lambda (thing start count port)
     (lock port)
     (let loop ((start start) (count count))
       (cond ((get-out-buffer port)
	      (let* ((buffer (port-out-buffer port))
		     (index (port-out-index port))
		     (have (min (- (code-vector-length buffer) index)
				count)))
                (cond ((> have 0)
		       (copy! thing start buffer index have)
		       (set-port-out-index! port (+ index have))))
		(wake-any-input-waiters port)
		(if (= have count)
		    (unlock port)
		    (loop (+ start have) (- count have)))))
	     (else
	      (unlock port)
	      (error "writing to a broken pipe")))))

   ;; force-output
   ;; Does nothing and returns zero values.
   (lambda (port)
     (values))))

; Default length of each buffer code-vector, used when MAKE-PIPE is called
; with no size or with a size of #f.
(define pipe-buffer-size 1024)

; Takes an optional size to use for the buffers.  A size of #f indicates
; that buffers should be made as needed (we really need omega).

(define (make-pipe . maybe-buffer-size)
  ; PARSE-MAKE-PIPE-ARGS yields the buffer size and either a queue for
  ; full buffers or #f.  The new port is open for both input and output
  ; and starts with two empty buffers of that size.
  (call-with-values
   (lambda ()
     (parse-make-pipe-args maybe-buffer-size))
   (lambda (size queue)
     (let* ((status (bitwise-ior open-input-port-status
                                 open-output-port-status))
            (data (make-pipe-data (make-lock) ; the lock
                                  #f          ; no reader waiting yet
                                  #f          ; no writer waiting yet
                                  queue       ; full buffer queue, or #f
                                  #f))        ; nothing stashed yet
            (in-buffer (make-code-vector size 0))
            (out-buffer (make-code-vector size 0)))
       (make-port pipe-handler
                  status
                  data
                  in-buffer
                  0            ; input index
                  0            ; input limit
                  out-buffer
                  ; Labelled "output limit" in the original; this file
                  ; only uses PORT-OUT-INDEX -- confirm against MAKE-PORT.
                  0)))))
  
(define (parse-make-pipe-args maybe-buffer-size)
  ; Returns two values: the buffer size and the full-buffer queue (#f for
  ; a pipe with a fixed buffer set).  Only the first optional argument is
  ; looked at.
  (cond ((null? maybe-buffer-size)
         ; No argument: default size, fixed buffers.
         (values pipe-buffer-size #f))
        ((not (car maybe-buffer-size))
         ; Explicit #f: default size, buffers made as needed.
         (values pipe-buffer-size (make-queue)))
        (else
         (let ((size (car maybe-buffer-size)))
           (if (and (integer? size)
                    (exact? size)
                    (> size 0))
               (values size #f)
               (call-error "invalid pipe buffer size" make-pipe size))))))

; These should probably be moved to I/O

(define (open-input-port? port)
  ; True when every bit of OPEN-INPUT-PORT-STATUS is set in the port's status.
  (let ((masked (bitwise-and open-input-port-status
                             (port-status port))))
    (= masked open-input-port-status)))

(define (open-output-port? port)
  ; True when every bit of OPEN-OUTPUT-PORT-STATUS is set in the port's status.
  (let ((masked (bitwise-and open-output-port-status
                             (port-status port))))
    (= masked open-output-port-status)))

; Won't do string->string copies.

(define (copy! from i to j count)
  ; Dispatch on which of FROM and TO are code-vectors.  A FROM that is not
  ; a code-vector is handed to COPY-CHARS->BYTES! without looking at TO.
  (cond ((not (code-vector? from))
         (copy-chars->bytes! from i to j count))
        ((code-vector? to)
         (copy-bytes! from i to j count))
        (else
         (copy-bytes->chars! from i to j count))))

; Copied from more-port.scm.

(define (copy-bytes! from i to j count)
  ; Copy COUNT elements from code-vector FROM, starting at I, into
  ; code-vector TO, starting at J, walking both in ascending order.
  (let ((stop (+ count i)))
    (let loop ((src i) (dst j))
      (if (not (= src stop))
          (begin
            (code-vector-set! to dst (code-vector-ref from src))
            (loop (+ src 1) (+ dst 1)))))))

(define (copy-chars->bytes! from i to j count)
  ; Copy COUNT characters from string FROM, starting at I, into code-vector
  ; TO, starting at J, converting each with CHAR->ASCII.
  (let ((stop (+ count i)))
    (let loop ((src i) (dst j))
      (if (not (= src stop))
          (begin
            (code-vector-set! to dst (char->ascii (string-ref from src)))
            (loop (+ src 1) (+ dst 1)))))))

(define (copy-bytes->chars! from i to j count)
  ; Copy COUNT elements from code-vector FROM, starting at I, into string
  ; TO, starting at J, converting each with ASCII->CHAR.
  (let ((stop (+ count i)))
    (let loop ((src i) (dst j))
      (if (not (= src stop))
          (begin
            (string-set! to dst (ascii->char (code-vector-ref from src)))
            (loop (+ src 1) (+ dst 1)))))))