replaced tail-element consisting of three threads (tee, cond-tee,
sink) by only one thread; used only async channels
This commit is contained in:
		
							parent
							
								
									8ef87159b0
								
							
						
					
					
						commit
						9a25d38343
					
				| 
						 | 
				
			
			@ -17,44 +17,72 @@
 | 
			
		|||
(define (always msg) #t)
 | 
			
		||||
(define (never msg) #f)
 | 
			
		||||
 | 
			
		||||
(define (cond-sink pred modify in out name)
 | 
			
		||||
  (let ((tmp-ch (cml-sync-ch/make-channel)))
 | 
			
		||||
    (spawn 
 | 
			
		||||
      (lambda () 
 | 
			
		||||
	(cml-sync-ch/send tmp-ch (thread-uid (current-thread)))
 | 
			
		||||
	(let cond-sink-lp ((msg (cml-sync-ch/receive in)))
 | 
			
		||||
	  (if (pred msg) 
 | 
			
		||||
	      (cml-sync-ch/send out (modify msg)))
 | 
			
		||||
	  (cond-sink-lp (cml-sync-ch/receive in))))
 | 
			
		||||
      name)
 | 
			
		||||
    (cml-sync-ch/receive tmp-ch)))
 | 
			
		||||
 | 
			
		||||
(define (sink in out) (cond-sink never no-modify in out 'sink))
 | 
			
		||||
 | 
			
		||||
(define (cond-tee pred modify in out alt name)
 | 
			
		||||
  (let ((tmp-ch (cml-sync-ch/make-channel)))
 | 
			
		||||
    (spawn 
 | 
			
		||||
      (lambda ()
 | 
			
		||||
	(cml-sync-ch/send tmp-ch (thread-uid (current-thread)))
 | 
			
		||||
	(let cond-tee-lp ((msg (cml-sync-ch/receive in)))
 | 
			
		||||
	  (if (pred msg) 
 | 
			
		||||
		(cml-sync-ch/send out (modify msg))
 | 
			
		||||
		(cml-sync-ch/send alt msg))
 | 
			
		||||
	  (cond-tee-lp (cml-sync-ch/receive in))))
 | 
			
		||||
      name)
 | 
			
		||||
    (cml-sync-ch/receive tmp-ch)))
 | 
			
		||||
 | 
			
		||||
(define (tee in out) (cond-tee always no-modify in out #f 'tee))
 | 
			
		||||
;;; (define (cond-sink pred modify in out name)
 | 
			
		||||
;;;   (let ((tmp-ch (cml-sync-ch/make-channel)))
 | 
			
		||||
;;;     (spawn 
 | 
			
		||||
;;;       (lambda () 
 | 
			
		||||
;;; 	(cml-sync-ch/send tmp-ch (thread-uid (current-thread)))
 | 
			
		||||
;;; 	(let cond-sink-lp ((msg (cml-sync-ch/receive in)))
 | 
			
		||||
;;; 	  (if (pred msg) 
 | 
			
		||||
;;; 	      (cml-sync-ch/send out (modify msg)))
 | 
			
		||||
;;; 	  (cond-sink-lp (cml-sync-ch/receive in))))
 | 
			
		||||
;;;       name)
 | 
			
		||||
;;;     (cml-sync-ch/receive tmp-ch)))
 | 
			
		||||
;;; 
 | 
			
		||||
;;; (define (sink in out) (cond-sink never no-modify in out 'sink))
 | 
			
		||||
;;; 
 | 
			
		||||
;;; (define (cond-tee pred modify in out alt name)
 | 
			
		||||
;;;   (let ((tmp-ch (cml-sync-ch/make-channel)))
 | 
			
		||||
;;;     (spawn 
 | 
			
		||||
;;;       (lambda ()
 | 
			
		||||
;;; 	(cml-sync-ch/send tmp-ch (thread-uid (current-thread)))
 | 
			
		||||
;;; 	(let cond-tee-lp ((msg (cml-sync-ch/receive in)))
 | 
			
		||||
;;; 	  (if (pred msg) 
 | 
			
		||||
;;; 		(cml-sync-ch/send out (modify msg))
 | 
			
		||||
;;; 		(cml-sync-ch/send alt msg))
 | 
			
		||||
;;; 	  (cond-tee-lp (cml-sync-ch/receive in))))
 | 
			
		||||
;;;       name)
 | 
			
		||||
;;;     (cml-sync-ch/receive tmp-ch)))
 | 
			
		||||
;;; 
 | 
			
		||||
;;; (define (tee in out) (cond-tee always no-modify in out #f 'tee))
 | 
			
		||||
;;; 
 | 
			
		||||
;;; (define (tail-element from-head to-head from-sink to-sink in out)
 | 
			
		||||
;;;   (let* ((id (tee from-sink to-head))
 | 
			
		||||
;;; 	 (tag-msg (lambda (msg) (make-tagged-msg id msg)))
 | 
			
		||||
;;; 	 (pred (lambda (tmsg) (eq? (tagged-msg-tag tmsg) id))))
 | 
			
		||||
;;;     (cond-tee pred tagged-msg-stripped from-head out to-sink 
 | 
			
		||||
;;; 	      (string->symbol (string-append "tail-switch " (number->string id))))
 | 
			
		||||
;;;     (cond-tee always tag-msg in to-head #f 
 | 
			
		||||
;;; 	      (string->symbol (string-append "tail-insert " (number->string id))))
 | 
			
		||||
;;;     id))
 | 
			
		||||
 | 
			
		||||
(define (tail-element from-head to-head from-sink to-sink in out)
 | 
			
		||||
  (let* ((id (tee from-sink to-head))
 | 
			
		||||
  (let ((id-res-ch (cml-sync-ch/make-channel)))
 | 
			
		||||
    (spawn 
 | 
			
		||||
      (lambda () 
 | 
			
		||||
	(let* ((id (thread-uid (current-thread)))
 | 
			
		||||
	       (tag-msg (lambda (msg) (make-tagged-msg id msg)))
 | 
			
		||||
	       (pred (lambda (tmsg) (eq? (tagged-msg-tag tmsg) id))))
 | 
			
		||||
    (cond-tee pred tagged-msg-stripped from-head out to-sink 
 | 
			
		||||
	      (string->symbol (string-append "tail-switch " (number->string id))))
 | 
			
		||||
    (cond-tee always tag-msg in to-head #f 
 | 
			
		||||
	      (string->symbol (string-append "tail-insert " (number->string id))))
 | 
			
		||||
    id))
 | 
			
		||||
	  (cml-sync-ch/send id-res-ch id)
 | 
			
		||||
	  (let ((insert-msg (lambda (msg)
 | 
			
		||||
			      (cml-async-ch/send-async to-head (tag-msg msg))))
 | 
			
		||||
		(insert-rv (cml-async-ch/receive-async-rv in))
 | 
			
		||||
		(forward-msg (lambda (msg)
 | 
			
		||||
			       (cml-async-ch/send-async to-head msg)))
 | 
			
		||||
		(forward-rv (cml-async-ch/receive-async-rv from-sink))
 | 
			
		||||
		(deliver-msg (lambda (msg)
 | 
			
		||||
			       (if (pred msg)
 | 
			
		||||
				   (let ((stripped-msg (tagged-msg-stripped msg)))
 | 
			
		||||
				     (cml-async-ch/send-async out stripped-msg))
 | 
			
		||||
				   (cml-async-ch/send-async to-sink msg))))
 | 
			
		||||
		(deliver-rv (cml-async-ch/receive-async-rv from-head)))
 | 
			
		||||
	    (let receive+send-lp ()
 | 
			
		||||
	      (cml-rv/select
 | 
			
		||||
	       (cml-rv/wrap insert-rv insert-msg)
 | 
			
		||||
	       (cml-rv/wrap forward-rv forward-msg)
 | 
			
		||||
	       (cml-rv/wrap deliver-rv deliver-msg))
 | 
			
		||||
	      (receive+send-lp))))))
 | 
			
		||||
    (cml-sync-ch/receive id-res-ch)))
 | 
			
		||||
 | 
			
		||||
(define-enumerated-type collect-cmd :collect-cmd
 | 
			
		||||
  is-collect-cmd?
 | 
			
		||||
| 
						 | 
				
			
			@ -78,15 +106,15 @@
 | 
			
		|||
;	(sink head-out head-in)
 | 
			
		||||
	(let head-element-lp ((from-tail head-in)
 | 
			
		||||
			      (to-tail head-out))
 | 
			
		||||
	  (let* ((forward-msg (lambda (ch msg async?)
 | 
			
		||||
				(if async?
 | 
			
		||||
	  (let* ((forward-msg (lambda (ch msg)
 | 
			
		||||
				(cml-async-ch/send-async ch (modify msg))
 | 
			
		||||
				    (cml-sync-ch/send ch (modify msg)))
 | 
			
		||||
				(cons from-tail to-tail)))
 | 
			
		||||
		 (new-tail-el (lambda (msg)
 | 
			
		||||
				(let* ((chs (cmd-msg-data msg))
 | 
			
		||||
				       (new-from-tail (cml-sync-ch/make-channel))
 | 
			
		||||
				       (new-to-tail (cml-sync-ch/make-channel))
 | 
			
		||||
				       (new-from-tail 
 | 
			
		||||
					(cml-async-ch/make-async-channel))
 | 
			
		||||
				       (new-to-tail 
 | 
			
		||||
					(cml-async-ch/make-async-channel))
 | 
			
		||||
				       (link-in (list-ref chs 0))
 | 
			
		||||
				       (link-out (list-ref chs 1))
 | 
			
		||||
				       (tmp-ch (list-ref chs 2))
 | 
			
		||||
| 
						 | 
				
			
			@ -99,10 +127,10 @@
 | 
			
		|||
		       (cml-rv/wrap (cml-async-ch/receive-async-rv cmd-in)
 | 
			
		||||
				    (lambda (msg) 
 | 
			
		||||
				      (if (pred msg)
 | 
			
		||||
					  (forward-msg to-tail msg #f)
 | 
			
		||||
					  (forward-msg to-tail msg)
 | 
			
		||||
					  (new-tail-el msg))))
 | 
			
		||||
		       (cml-rv/wrap (cml-sync-ch/receive-rv from-tail)
 | 
			
		||||
				    (lambda (msg) (forward-msg cmd-out msg #t))))))
 | 
			
		||||
		       (cml-rv/wrap (cml-async-ch/receive-async-rv from-tail)
 | 
			
		||||
				    (lambda (msg) (forward-msg cmd-out msg))))))
 | 
			
		||||
	    (head-element-lp (car chs) (cdr chs)))))
 | 
			
		||||
      name)
 | 
			
		||||
    (cml-sync-ch/receive id-res-ch)))
 | 
			
		||||
| 
						 | 
				
			
			@ -116,14 +144,14 @@
 | 
			
		|||
(define (collect&reply/make-channel)
 | 
			
		||||
  (let ((cmd-in (cml-async-ch/make-async-channel))
 | 
			
		||||
	(cmd-out (cml-async-ch/make-async-channel))
 | 
			
		||||
	(head-in (cml-sync-ch/make-channel))
 | 
			
		||||
	(head-out (cml-sync-ch/make-channel)))
 | 
			
		||||
	(head-in (cml-async-ch/make-async-channel))
 | 
			
		||||
	(head-out (cml-async-ch/make-async-channel)))
 | 
			
		||||
    (head-element no-modify cmd-in cmd-out head-in head-out 'collect&reply)
 | 
			
		||||
    (collect&reply/really-make-channel cmd-in cmd-out)))
 | 
			
		||||
 | 
			
		||||
(define (make-link from to)
 | 
			
		||||
  (let* ((from-->to (cml-sync-ch/make-channel))
 | 
			
		||||
	 (from<--to (cml-sync-ch/make-channel))
 | 
			
		||||
  (let* ((from-->to (cml-async-ch/make-async-channel))
 | 
			
		||||
	 (from<--to (cml-async-ch/make-async-channel))
 | 
			
		||||
	 (to-tmp-ch (cml-async-ch/make-async-channel))
 | 
			
		||||
	 (from-tmp-ch (cml-async-ch/make-async-channel))
 | 
			
		||||
	 (chs-for-to (make-cmd-msg (collect-cmd make-link)
 | 
			
		||||
| 
						 | 
				
			
			@ -157,8 +185,8 @@
 | 
			
		|||
(define (send&collect/make-channel)
 | 
			
		||||
  (let ((cmd-in (cml-async-ch/make-async-channel))
 | 
			
		||||
	(cmd-out (cml-async-ch/make-async-channel))
 | 
			
		||||
	(head-in (cml-sync-ch/make-channel))
 | 
			
		||||
	(head-out (cml-sync-ch/make-channel)))
 | 
			
		||||
	(head-in (cml-async-ch/make-async-channel))
 | 
			
		||||
	(head-out (cml-async-ch/make-async-channel)))
 | 
			
		||||
    (head-element no-modify cmd-in cmd-out head-in head-out 'send&collect)
 | 
			
		||||
    (send&collect/really-make-channel cmd-in cmd-out)))
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue