fixed a deadlock. make-rule-cml now behaves like fork-bombing (if

there are enough "bombs"). Added some cosmetics in make-rule.scm and
collect-channels.scm.
This commit is contained in:
jottbee 2005-02-22 07:03:02 +00:00
parent 7a6e3585c8
commit 6fe70b47e3
4 changed files with 169 additions and 134 deletions

View File

@ -11,8 +11,7 @@
(data cmd-msg-data))
(define (print-info tuid event name)
(display ">>> ") (display tuid) (display " : ")
(display event) (display " [") (display name) (display "]") (newline))
(format (current-error-port) ">>> ~a : ~a [~a]~%" tuid event name))
(define (no-modify msg) msg)
(define (always msg) #t)
@ -25,12 +24,7 @@
(cml-sync-ch/send tmp-ch (thread-uid (current-thread)))
(let cond-sink-lp ((msg (cml-sync-ch/receive in)))
(if (pred msg)
; (begin
; (print-info (thread-uid (current-thread))
; "cond-sink, forward" (symbol->string name))
(cml-sync-ch/send out (modify msg)))
; (print-info (thread-uid (current-thread))
; "cond-sink, shredder" (symbol->string name)))
(cml-sync-ch/send out (modify msg)))
(cond-sink-lp (cml-sync-ch/receive in))))
name)
(cml-sync-ch/receive tmp-ch)))
@ -44,13 +38,7 @@
(cml-sync-ch/send tmp-ch (thread-uid (current-thread)))
(let cond-tee-lp ((msg (cml-sync-ch/receive in)))
(if (pred msg)
; (begin
; (print-info (thread-uid (current-thread))
; "cond-tee, default" (symbol->string name))
(cml-sync-ch/send out (modify msg))
; (begin
; (print-info (thread-uid (current-thread))
; "cond-tee, alternate" (symbol->string name))
(cml-sync-ch/send alt msg))
(cond-tee-lp (cml-sync-ch/receive in))))
name)
@ -62,8 +50,10 @@
(let* ((id (tee from-sink to-head))
(tag-msg (lambda (msg) (make-tagged-msg id msg)))
(pred (lambda (tmsg) (eq? (tagged-msg-tag tmsg) id))))
(cond-tee pred tagged-msg-stripped from-head out to-sink 'tail-element-switch)
(cond-tee always tag-msg in to-head #f 'tail-element-insert)
(cond-tee pred tagged-msg-stripped from-head out to-sink
(string->symbol (string-append "tail-switch " (number->string id))))
(cond-tee always tag-msg in to-head #f
(string->symbol (string-append "tail-insert " (number->string id))))
id))
(define-enumerated-type collect-cmd :collect-cmd
@ -85,21 +75,14 @@
(spawn
(lambda ()
(cml-sync-ch/send id-res-ch (thread-uid (current-thread)))
(sink head-out head-in)
; (sink head-out head-in)
(let head-element-lp ((from-tail head-in)
(to-tail head-out))
(let* ((->cmd-out (lambda (msg)
; (print-info (thread-uid (current-thread))
; "head-element, ->cmd-out"
; (symbol->string name))
(cml-sync-ch/send cmd-out (modify msg))
(cons from-tail to-tail)))
(->to-tail (lambda (msg)
; (print-info (thread-uid (current-thread))
; "head-element, ->to-tail"
; (symbol->string name))
(cml-sync-ch/send to-tail (modify msg))
(cons from-tail to-tail)))
(let* ((forward-msg (lambda (ch msg async?)
(if async?
(cml-async-ch/send-async ch (modify msg))
(cml-sync-ch/send ch (modify msg)))
(cons from-tail to-tail)))
(new-tail-el (lambda (msg)
(let* ((chs (cmd-msg-data msg))
(new-from-tail (cml-sync-ch/make-channel))
@ -110,19 +93,16 @@
(id (tail-element new-to-tail new-from-tail
from-tail to-tail
link-in link-out)))
; (print-info (thread-uid (current-thread))
; "head-element, new-tail-el"
; (symbol->string name))
(cml-sync-ch/send tmp-ch id)
(cml-async-ch/send-async tmp-ch id)
(cons new-from-tail new-to-tail))))
(chs (cml-rv/select
(cml-rv/wrap (cml-sync-ch/receive-rv cmd-in)
(cml-rv/wrap (cml-async-ch/receive-async-rv cmd-in)
(lambda (msg)
(if (pred msg)
(->to-tail msg)
(forward-msg to-tail msg #f)
(new-tail-el msg))))
(cml-rv/wrap (cml-sync-ch/receive-rv from-tail)
(lambda (msg) (->cmd-out msg))))))
(cml-rv/wrap (cml-sync-ch/receive-rv from-tail)
(lambda (msg) (forward-msg cmd-out msg #t))))))
(head-element-lp (car chs) (cdr chs)))))
name)
(cml-sync-ch/receive id-res-ch)))
@ -134,8 +114,8 @@
(cmd-out collect&reply-channel-cmd-out))
(define (collect&reply/make-channel)
(let ((cmd-in (cml-sync-ch/make-channel))
(cmd-out (cml-sync-ch/make-channel))
(let ((cmd-in (cml-async-ch/make-async-channel))
(cmd-out (cml-async-ch/make-async-channel))
(head-in (cml-sync-ch/make-channel))
(head-out (cml-sync-ch/make-channel)))
(head-element no-modify cmd-in cmd-out head-in head-out 'collect&reply)
@ -144,8 +124,8 @@
(define (make-link from to)
(let* ((from-->to (cml-sync-ch/make-channel))
(from<--to (cml-sync-ch/make-channel))
(to-tmp-ch (cml-sync-ch/make-channel))
(from-tmp-ch (cml-sync-ch/make-channel))
(to-tmp-ch (cml-async-ch/make-async-channel))
(from-tmp-ch (cml-async-ch/make-async-channel))
(chs-for-to (make-cmd-msg (collect-cmd make-link)
(list from-->to from<--to to-tmp-ch)))
(chs-for-from (make-cmd-msg (collect-cmd make-link)
@ -153,9 +133,19 @@
(cond
((and (is-send&collect-channel? from)
(is-collect&reply-channel? to))
(cml-sync-ch/send (collect&reply-channel-cmd-in to) chs-for-to)
(cml-sync-ch/send (send&collect-channel-cmd-in from) chs-for-from)
(cons (cml-sync-ch/receive from-tmp-ch) (cml-sync-ch/receive to-tmp-ch)))
(collect&reply/send to chs-for-to)
(send&collect/send from chs-for-from)
(cml-rv/select
(cml-rv/wrap (cml-async-ch/receive-async-rv from-tmp-ch)
(lambda (id-from)
(cons id-from
(cml-rv/sync
(cml-async-ch/receive-async-rv to-tmp-ch)))))
(cml-rv/wrap (cml-async-ch/receive-async-rv to-tmp-ch)
(lambda (id-to)
(cons (cml-rv/sync (cml-async-ch/receive-async-rv
from-tmp-ch))
id-to)))))
(else (error "make-link: wrong type" from to)))))
(define-record-type :send&collect-channel
@ -165,33 +155,29 @@
(cmd-out send&collect-channel-cmd-out))
(define (send&collect/make-channel)
(let ((cmd-in (cml-sync-ch/make-channel))
(cmd-out (cml-sync-ch/make-channel))
(let ((cmd-in (cml-async-ch/make-async-channel))
(cmd-out (cml-async-ch/make-async-channel))
(head-in (cml-sync-ch/make-channel))
(head-out (cml-sync-ch/make-channel)))
(head-element no-modify cmd-in cmd-out head-in head-out 'send&collect)
(send&collect/really-make-channel cmd-in cmd-out)))
(define (collect&reply/receive ch)
(cml-sync-ch/receive (collect&reply-channel-cmd-out ch)))
(cml-rv/sync
(cml-async-ch/receive-async-rv (collect&reply-channel-cmd-out ch))))
(define (collect&reply/receive-rv ch)
(cml-sync-ch/receive-rv (collect&reply-channel-cmd-out ch)))
(cml-async-ch/receive-async-rv (collect&reply-channel-cmd-out ch)))
(define (collect&reply/send ch msg)
(cml-sync-ch/send (collect&reply-channel-cmd-in ch) msg))
(define (collect&reply/send-rv ch msg)
(cml-sync-ch/send-rv (collect&reply-channel-cmd-in ch) msg))
(cml-async-ch/send-async (collect&reply-channel-cmd-in ch) msg))
(define (send&collect/send ch msg)
(cml-sync-ch/send (send&collect-channel-cmd-in ch) msg))
(define (send&collect/send-rv ch msg)
(cml-sync-ch/send-rv (send&collect-channel-cmd-in ch) msg))
(cml-async-ch/send-async (send&collect-channel-cmd-in ch) msg))
(define (send&collect/receive ch)
(cml-sync-ch/receive (send&collect-channel-cmd-out ch)))
(cml-rv/sync
(cml-async-ch/receive-async-rv (send&collect-channel-cmd-out ch))))
(define (send&collect/receive-rv ch)
(cml-sync-ch/receive-rv (send&collect-channel-cmd-out ch)))
(cml-async-ch/receive-async-rv (send&collect-channel-cmd-out ch)))

View File

@ -65,19 +65,17 @@
(build-func-result rule-result-build-func))
(define (rule-make rule init-state rule-set)
;;
;; this could be rewritten in future
;;
;; check for unused threads -> dont start them
;;
(map (lambda (r)
(rule-node r (rule-set-get-listen-ch r rule-set) init-state rule-set))
(map car (rule-set-rules rule-set)))
(let* ((server (rule-set-get-listen-ch rule rule-set))
(client (send&collect/make-channel))
(recipient (make-link client server)))
(link (make-link client server))
(recipient (car link)))
(send&collect/send client (make-tagged-msg recipient (rule-cmd make)))
(tagged-msg-stripped (send&collect/receive client))))
(let ((res (tagged-msg-stripped (send&collect/receive client))))
; (send&collect/send client (make-tagged-msg recipient (rule-cmd shutdown)))
res)))
(define-enumerated-type rule-cmd :rule-cmd
is-rule-cmd?
@ -86,6 +84,7 @@
rule-cmd-index
(make link shutdown))
;;; this only works if there are no duplicates in list
(define (position< maybe-lesser maybe-greater objects)
(if (null? objects)
(error "position< has empty objects-list.")
@ -95,7 +94,8 @@
((= (tagged-msg-tag maybe-lesser) current) #t)
((= (tagged-msg-tag maybe-greater) current) #f)
((null? todo)
(error "position< maybe-lesser or maybe-greater not found."))
(error "position<: maybe-lesser or maybe-greater not found."
maybe-lesser maybe-greater))
(else (search-objects (car todo) (cdr todo)))))))
(define (rule-node/sort-msgs unsorted to-order)
@ -104,12 +104,22 @@
(position< maybe-lesser maybe-greater to-order))
unsorted (list))))
;;; (define (rule-node/prereqs-results rule connect-ch recipients)
;;; (let ((unsorted-msgs (map (lambda (recipient)
;;; (let ((tmsg (make-tagged-msg recipient
;;; (rule-cmd make))))
;;; (send&collect/send connect-ch tmsg)
;;; (send&collect/receive connect-ch)))
;;; recipients)))
;;; (rule-node/sort-msgs unsorted-msgs recipients)))
(define (rule-node/prereqs-results rule connect-ch recipients)
(let ((unsorted-msgs (map (lambda (recipient)
(let ((tmsg (make-tagged-msg recipient
(rule-cmd make))))
(send&collect/send connect-ch tmsg)
(send&collect/receive connect-ch)))
(for-each (lambda (recipient)
(let ((tmsg (make-tagged-msg recipient (rule-cmd make))))
(send&collect/send connect-ch tmsg)))
recipients)
(let ((unsorted-msgs (map (lambda (ignore)
(send&collect/receive connect-ch))
recipients)))
(rule-node/sort-msgs unsorted-msgs recipients)))
@ -135,38 +145,31 @@
(make-rule-result wants-build?-result #f))))))
(define (rule-node/make-links rule connect-ch rule-set)
(let ((listen-chs (map (lambda (r)
(cdr (assq r (rule-set-rules rule-set))))
(let ((listen-chs (map (lambda (prereq-rule)
(cdr (assoc prereq-rule (rule-set-rules rule-set))))
(rule-prereqs rule))))
(map (lambda (listen-ch)
(make-link connect-ch listen-ch))
listen-chs)))
(define (rule-node rule listen-ch init-state rule-set)
(let ((connect-ch (send&collect/make-channel)))
(let* ((connect-ch (send&collect/make-channel))
(get-rcpts (lambda ()
(map car (rule-node/make-links rule connect-ch rule-set))))
(do-answer (lambda (tmsg rcpts)
(let* ((sender (tagged-msg-tag tmsg))
(cmd (tagged-msg-stripped tmsg))
(result (rule-node/make rule listen-ch connect-ch
rcpts init-state))
(reply (make-tagged-msg sender result)))
(collect&reply/send listen-ch reply)))))
(spawn
(lambda ()
;;
;; wait for anything on the listen-ch
;; check if it is a known command
;; if so: process this command
;; otherwise it was noise
;;
;; if its the first time the make command drops in
;; initially make the connections to every prereq-listen-ch
;;
(let node-loop ((tmsg (collect&reply/receive listen-ch))
(maybe-recipients #f))
(let ((sender (tagged-msg-tag tmsg))
(cmd (tagged-msg-stripped tmsg)))
(cond
((eq? (rule-cmd-name cmd) 'make)
(if (not maybe-recipients)
(set! maybe-recipients
(rule-node/make-links rule connect-ch rule-set)))
(let ((res (rule-node/make rule listen-ch connect-ch
maybe-recipients init-state)))
(collect&reply/send listen-ch (make-tagged-msg sender res))))
((eq? (rule-cmd-name cmd) 'shutdown) (terminate-current-thread))))
(node-loop (collect&reply/receive listen-ch) maybe-recipients)))
(rcpts (get-rcpts)))
(cond
((eq? (rule-cmd-name (tagged-msg-stripped tmsg)) 'make)
(do-answer tmsg rcpts))
(else (error "rule-node: no match")))
(node-loop (collect&reply/receive listen-ch) rcpts)))
'rule-node)))

View File

@ -104,17 +104,23 @@
is-tagged-msg?
tagged-msg-tag
tagged-msg-stripped
make-cmd-msg
is-cmd-msg?
cmd-msg-cmd
cmd-msg-data
print-info
collect&reply/make-channel
send&collect/make-channel
is-collect&reply-channel?
is-send&collect-channel?
make-link
collect-cmd
collect&reply/receive
collect&reply/receive-rv
collect&reply/send
collect&reply/send-rv
; collect&reply/send-rv
send&collect/send
send&collect/send-rv
; send&collect/send-rv
send&collect/receive
send&collect/receive-rv))
@ -122,10 +128,13 @@
(open scheme-with-scsh
finite-types
srfi-9
big-util ; for breakpoints
let-opt ; for logging
threads
threads-internal
(with-prefix rendezvous cml-rv/)
(with-prefix rendezvous-channels cml-sync-ch/))
(with-prefix rendezvous-channels cml-sync-ch/)
(with-prefix rendezvous-async-channels cml-async-ch/))
(files collect-channels))
(define-interface make-rule-interface
@ -145,6 +154,8 @@
locks
with-lock
threads
threads-internal
big-util ; for breakpoints
srfi-1
srfi-9
finite-types

View File

@ -11,37 +11,56 @@
(define *k-out?* #t)
(define *l-out?* #t)
(define (is-a-out? ist) (display "setting a\n") (cons *a-out?* ist))
(define (is-b-out? pa ist) (display "setting b\n") (cons *b-out?* ist))
(define (is-c-out? pa pb ist) (display "setting c\n") (cons *c-out?* ist))
(define (is-d-out? pa pb pc ist) (display "setting d\n") (cons *d-out?* ist))
(define (is-e-out? pc pd ist) (display "setting e\n") (cons *e-out?* ist))
(define (is-f-out? pa pb pc pd pe ist) (cons *f-out?* ist))
(define (is-g-out? pa pb pc pd pe pf ist) (cons *g-out?* ist))
(define (is-h-out? pa pb pc pd pe pf pg ist) (cons *h-out?* ist))
(define (is-i-out? pa pb pc pd pe pf pg ph ist) (cons *i-out?* ist))
(define (is-j-out? pa pb pc pd pe pf pg ph pi ist) (cons *j-out?* ist))
(define (is-k-out? pa pb pc pd pe pf pg ph pi pj ist) (cons *k-out?* ist))
(define (is-l-out? pa pb pc pd pe pf pg ph pi pj pk ist) (cons *l-out?* ist))
(define (reset!)
(set! *a-out?* #t)
(set! *b-out?* #t)
(set! *c-out?* #t)
(set! *d-out?* #t)
(set! *e-out?* #t))
(define (build-a b? ist) (display "a\n") (set! *a-out?* #f) (cons *a-out?* ist))
(define (build-b b? pa ist) (display "b\n") (set! *b-out?* #f) (cons *b-out?* ist))
(define (build-c b? pa pb ist) (display "c\n") (set! *c-out?* #f) (cons *c-out?* ist))
(define (build-d b? pa pb pc ist) (display "d\n") (set! *d-out?* #f) (cons *d-out?* ist))
(define (build-e b? pc pd ist) (display "e\n") (set! *e-out?* #f) (cons *e-out?* ist))
(define (build-f b? pa pb pc pd pe ist) (display "f\n") (set! *f-out?* #f) (cons *f-out?* ist))
(define (build-g b? pa pb pc pd pe pf ist) (display "g\n") (set! *g-out?* #f) (cons *g-out?* ist))
(define (build-h b? pa pb pc pd pe pf pg ist) (display "h\n") (set! *h-out?* #f) (cons *h-out?* ist))
(define (build-i b? pa pb pc pd pe pf pg ph ist) (display "i\n") (set! *i-out?* #f) (cons *i-out?* ist))
(define (build-j b? pa pb pc pd pe pf pg ph pi ist) (display "j\n") (set! *j-out?* #f) (cons *j-out?* ist))
(define (build-k b? pa pb pc pd pe pf pg ph pi pj ist) (display "k\n") (set! *k-out?* #f) (cons *k-out?* ist))
(define (build-l b? pa pb pc pd pe pf pg ph pi pj pk ist) (display "l\n") (set! *l-out?* #f) (cons *l-out?* ist))
(define (is-a-out? ist) (display "setting a\n") (cons *a-out?* ist))
(define (is-b-out? . args) (display "setting b\n") (cons *b-out?* (last args)))
(define (is-c-out? . args) (display "setting c\n") (cons *c-out?* (last args)))
(define (is-d-out? . args) (display "setting d\n") (cons *d-out?* (last args)))
(define (is-e-out? . args) (display "setting e\n") (cons *e-out?* (last args)))
(define (is-f-out? . args) (display "setting f\n") (cons *f-out?* (last args)))
(define (is-g-out? . args) (display "setting f\n") (cons *g-out?* (last args)))
(define (is-h-out? . args) (display "setting f\n") (cons *h-out?* (last args)))
(define (is-i-out? . args) (display "setting f\n") (cons *i-out?* (last args)))
(define (is-j-out? . args) (display "setting f\n") (cons *j-out?* (last args)))
(define (is-k-out? . args) (display "setting f\n") (cons *k-out?* (last args)))
(define (is-l-out? . args) (display "setting f\n") (cons *l-out?* (last args)))
(define a (make-rule (list) is-a-out? build-a))
(define b (make-rule (list a) is-b-out? build-b))
(define c (make-rule (list a b) is-c-out? build-c))
(define d (make-rule (list a b c) is-d-out? build-d))
(define e (make-rule (list c d) is-e-out? build-e))
(define (build-a b? . args)
(display "a\n") (set! *a-out?* #f) (cons *a-out?* (last args)))
(define (build-b b? . args)
(display "b\n") (set! *b-out?* #f) (cons *b-out?* (last args)))
(define (build-c b? . args)
(display "c\n") (set! *c-out?* #f) (cons *c-out?* (last args)))
(define (build-d b? . args)
(display "d\n") (set! *d-out?* #f) (cons *d-out?* (last args)))
(define (build-e b? . args)
(display "e\n") (set! *e-out?* #f) (cons *e-out?* (last args)))
(define (build-f b? . args)
(display "f\n") (set! *f-out?* #f) (cons *f-out?* (last args)))
(define (build-g b? . args)
(display "g\n") (set! *g-out?* #f) (cons *g-out?* (last args)))
(define (build-h b? . args)
(display "h\n") (set! *h-out?* #f) (cons *h-out?* (last args)))
(define (build-i b? . args)
(display "i\n") (set! *i-out?* #f) (cons *i-out?* (last args)))
(define (build-j b? . args)
(display "j\n") (set! *j-out?* #f) (cons *j-out?* (last args)))
(define (build-k b? . args)
(display "k\n") (set! *k-out?* #f) (cons *k-out?* (last args)))
(define (build-l b? . args)
(display "l\n") (set! *l-out?* #f) (cons *l-out?* (last args)))
;(define a (make-rule (list) is-a-out? build-a))
;(define b (make-rule (list a) is-b-out? build-b))
;(define c (make-rule (list b) is-c-out? build-c))
;(define d (make-rule (list b) is-d-out? build-d))
;(define e (make-rule (list c d) is-e-out? build-e))
;(define f (make-rule (list a b c d e) is-f-out? build-f))
;(define g (make-rule (list a b c d e f) is-g-out? build-g))
;(define h (make-rule (list a b c d e f g) is-h-out? build-h))
@ -51,17 +70,33 @@
;(define l (make-rule (list a b c d e f g h i j k) is-l-out? build-l))
;(define rules (list a b c d e f g h i j k l))
(define rules (list a b c d e))
;(define rules (list a b c d e))
(define (make-rule-set rules rule-set)
(cond
((null? rules) rule-set)
(else (make-rule-set (cdr rules) (rule-set-add (car rules) rule-set)))))
(define rule-set (make-rule-set rules (make-empty-rule-set)))
(define rule-set 'unset-rule-set)
(rule-make e '() rule-set)
(rule-make d '() rule-set)
(rule-make e '() rule-set)
(rule-make c '() rule-set)
(define (make!)
(define a (make-rule (list) is-a-out? build-a))
(define b (make-rule (list a) is-b-out? build-b))
(define c (make-rule (list b) is-c-out? build-c))
(define d (make-rule (list b) is-d-out? build-d))
(define e (make-rule (list b c d) is-e-out? build-e))
(define f (make-rule (list b c d e) is-f-out? build-f))
(define g (make-rule (list b c d e f) is-g-out? build-g))
(define h (make-rule (list b c d e f g) is-h-out? build-h))
(define i (make-rule (list a b c d e f g h) is-i-out? build-i))
(define j (make-rule (list a b c d e f g h i) is-j-out? build-j))
(define k (make-rule (list a b c d e f g h i j) is-k-out? build-k))
(define l (make-rule (list a b c d e f g h i j k) is-l-out? build-l))
(define rules (list a b c d e f g h i j k l))
(reset!)
(set! rule-set (make-rule-set rules (make-empty-rule-set)))
(rule-make l '() rule-set))
;(rule-make d '() rule-set)
;(rule-make e '() rule-set)
;(rule-make c '() rule-set)