Replaced implementation of surveillance thread: We have now only one
thread which checks every second for sessions to be deleted.
This commit is contained in:
parent
3de0a9c480
commit
b93e275415
|
@ -41,6 +41,7 @@
|
||||||
(define (surflet-handler surflet-path . maybe-options)
|
(define (surflet-handler surflet-path . maybe-options)
|
||||||
(let-optionals maybe-options ((options (make-default-surflet-options)))
|
(let-optionals maybe-options ((options (make-default-surflet-options)))
|
||||||
(set-thread-fluid! *options* options)
|
(set-thread-fluid! *options* options)
|
||||||
|
(spawn surveillance-thread)
|
||||||
(lambda (path req)
|
(lambda (path req)
|
||||||
(if (pair? path) ; need at least one element
|
(if (pair? path) ; need at least one element
|
||||||
(let ((request-method (request-method req))
|
(let ((request-method (request-method req))
|
||||||
|
@ -73,12 +74,10 @@
|
||||||
((string=? (file-name-extension path-string) ".scm")
|
((string=? (file-name-extension path-string) ".scm")
|
||||||
(obtain-lock *session-table-lock*)
|
(obtain-lock *session-table-lock*)
|
||||||
;; no access to session table until new session-id is saved
|
;; no access to session table until new session-id is saved
|
||||||
(let ((session-id (generate-new-table-id *session-table*))
|
(let ((session-id (generate-new-table-id *session-table*)))
|
||||||
(memo (make-default-memo)))
|
|
||||||
(table-set! *session-table* session-id
|
(table-set! *session-table* session-id
|
||||||
(make-session path-string ; used to make
|
(make-session path-string ; used to make
|
||||||
; redirections to origin
|
; redirections to origin
|
||||||
memo
|
|
||||||
(make-integer-table) ; continuation table
|
(make-integer-table) ; continuation table
|
||||||
(make-lock) ; continuation table lock
|
(make-lock) ; continuation table lock
|
||||||
(make-thread-safe-counter) ; continuation counter
|
(make-thread-safe-counter) ; continuation counter
|
||||||
|
@ -92,10 +91,10 @@
|
||||||
(delete-session! session-id)
|
(delete-session! session-id)
|
||||||
(bad-gateway-error-response s-req path-string condition))
|
(bad-gateway-error-response s-req path-string condition))
|
||||||
(let ((surflet (get-surflet-rt-structure path-string surflet-path)))
|
(let ((surflet (get-surflet-rt-structure path-string surflet-path)))
|
||||||
(fork-thread
|
(timeout-queue-register-session!
|
||||||
(session-surveillance session-id
|
session-id
|
||||||
(+ (time) (options-session-lifetime))
|
(+ (time) (options-session-lifetime)))
|
||||||
memo))
|
|
||||||
(reset
|
(reset
|
||||||
(with-fatal-error-handler
|
(with-fatal-error-handler
|
||||||
;; Catch conditions that occur while running the surflet.
|
;; Catch conditions that occur while running the surflet.
|
||||||
|
@ -117,40 +116,71 @@
|
||||||
))
|
))
|
||||||
|
|
||||||
|
|
||||||
;;; SESSION-SURVEILLANCE
|
;;; SESSION-SURVEILLANCE
|
||||||
;; Returns surveillance procedure to be fork-threaded, that kills a
|
(define (timeout-queue-register-session! session-id timeout)
|
||||||
;; session after TIME-TO-DIE (seconds) has expired. MEMO contains
|
(set! *timeout-queue* (queue-add *timeout-queue* session-id timeout)))
|
||||||
;; current status of session.
|
|
||||||
(define (session-surveillance session-id time-to-die memo)
|
(define (timeout-queue-remove-session! session-id)
|
||||||
(lambda ()
|
(set! *timeout-queue* (queue-delete *timeout-queue* session-id)))
|
||||||
(let loop ((time-to-die time-to-die)
|
|
||||||
(memo memo))
|
(define (timeout-queue-adjust-session-timeout! session-id new-timeout)
|
||||||
(debug "session-surveillance[~s]: going to sleep until ~a"
|
(set! *timeout-queue* (queue-delete *timeout-queue* session-id))
|
||||||
session-id (format-date "~c" (date time-to-die)))
|
(set! *timeout-queue*
|
||||||
(let ((seconds-to-sleep (- time-to-die (time))))
|
(queue-add *timeout-queue* session-id new-timeout)))
|
||||||
(if (positive? seconds-to-sleep)
|
|
||||||
(sleep (* 1000 seconds-to-sleep))))
|
(define *timeout-queue*)
|
||||||
;; check state of the world
|
|
||||||
(case (memo:message memo)
|
(define (surveillance-thread)
|
||||||
((killed) ; too late
|
(set! *timeout-queue* (empty-queue))
|
||||||
(debug "session-surveillance[~s]: session already killed, dieing"
|
(let lp ()
|
||||||
session-id)
|
(with-lock *session-table-lock*
|
||||||
)
|
(lambda ()
|
||||||
((adjust-timeout) ; new timeout
|
(let ((now (time)))
|
||||||
(debug "session-surveillance[~s]: adjusting timeout" session-id)
|
(let lp2 ()
|
||||||
(loop (memo:value memo)
|
(if (not (queue-empty? *timeout-queue*))
|
||||||
(memo:new-memo memo)))
|
(let ((session-id.time (queue-head *timeout-queue*)))
|
||||||
((kill) ; kill session
|
(if (<= (cdr session-id.time) now)
|
||||||
(debug "session-surveillance[~s]: killing"
|
(let ((session-id (car session-id.time)))
|
||||||
session-id)
|
(debug "session-surveillance[~s]: killing"
|
||||||
(obtain-lock *session-table-lock*)
|
session-id)
|
||||||
(table-set! *session-table* session-id #f)
|
(table-set! *session-table* session-id #f)
|
||||||
(release-lock *session-table-lock*))
|
(set! *timeout-queue*
|
||||||
(else
|
(queue-delete *timeout-queue* session-id))
|
||||||
(format (current-error-port)
|
(lp2)))))))))
|
||||||
"session-surveillance[~s]: unknown message ~s; dieing"
|
(sleep 1000)
|
||||||
session-id (memo:message memo)))))))
|
(lp)))
|
||||||
|
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
;; actually an alist with a head operation on the second argument.
|
||||||
|
(define (empty-queue)
|
||||||
|
'())
|
||||||
|
|
||||||
|
(define queue-empty? null?)
|
||||||
|
|
||||||
|
(define (queue-add q key value)
|
||||||
|
(alist-cons key value q))
|
||||||
|
|
||||||
|
(define (queue-delete key q)
|
||||||
|
(alist-delete! q key))
|
||||||
|
|
||||||
|
(define (queue-head q)
|
||||||
|
(let lp ((maybe-min (car q)) (q q))
|
||||||
|
(if (null? q)
|
||||||
|
maybe-min
|
||||||
|
(if (< (cdar q) (cdr maybe-min))
|
||||||
|
(lp (car q) (cdr q))
|
||||||
|
(lp maybe-min (cdr q))))))
|
||||||
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||||
|
|
||||||
|
;; doesn't belong to here...
|
||||||
|
(define (with-lock lock thunk)
|
||||||
|
(dynamic-wind
|
||||||
|
(lambda ()
|
||||||
|
(obtain-lock lock))
|
||||||
|
thunk
|
||||||
|
(lambda ()
|
||||||
|
(release-lock lock))))
|
||||||
|
|
||||||
;;; RESUME-URL
|
;;; RESUME-URL
|
||||||
;; Resumes a suspended URL and returns a (HTTP-)RESPONSE. PATH-STRING
|
;; Resumes a suspended URL and returns a (HTTP-)RESPONSE. PATH-STRING
|
||||||
;; is the virtual path, SURFLET-PATH a string pointing to the real
|
;; is the virtual path, SURFLET-PATH a string pointing to the real
|
||||||
|
@ -319,7 +349,7 @@
|
||||||
(let ((session (table-ref *session-table* session-id)))
|
(let ((session (table-ref *session-table* session-id)))
|
||||||
(if session
|
(if session
|
||||||
(begin
|
(begin
|
||||||
(memo-killed! (session-memo session))
|
(timeout-queue-remove-session! session-id)
|
||||||
(table-set! *session-table* session-id #f))))
|
(table-set! *session-table* session-id #f))))
|
||||||
;; else: somebody was faster than we
|
;; else: somebody was faster than we
|
||||||
(release-lock *session-table-lock*))
|
(release-lock *session-table-lock*))
|
||||||
|
@ -332,22 +362,14 @@
|
||||||
(:optional maybe-time-to-live (options-session-lifetime))))
|
(:optional maybe-time-to-live (options-session-lifetime))))
|
||||||
|
|
||||||
(define (really-session-adjust-timeout! session-id time-to-live)
|
(define (really-session-adjust-timeout! session-id time-to-live)
|
||||||
(obtain-lock *session-table-lock*)
|
(with-lock *session-table-lock*
|
||||||
(let ((session (table-ref *session-table* session-id))
|
(lambda ()
|
||||||
(new-memo (make-default-memo)))
|
(let ((session (table-ref *session-table* session-id)))
|
||||||
(if session
|
(if session
|
||||||
(let ((memo (session-memo session)))
|
(timeout-queue-adjust-session-timeout!
|
||||||
;; Do it this way: new values and then new message
|
session-id
|
||||||
(set-memo:value memo
|
(+ (time) time-to-live))
|
||||||
(+ (time) time-to-live))
|
(error "There is no session with this ID" session-id))))))
|
||||||
(set-memo:new-memo memo new-memo)
|
|
||||||
;; I don't think we need locking here. Do you agree?
|
|
||||||
(set-session-memo! session new-memo)
|
|
||||||
(set-memo:message memo 'adjust-timeout)
|
|
||||||
(release-lock *session-table-lock*))
|
|
||||||
(begin
|
|
||||||
(release-lock *session-table-lock*)
|
|
||||||
(error "There is no session with this ID" session-id)))))
|
|
||||||
|
|
||||||
;;; ADJUST-TIMEOUT!
|
;;; ADJUST-TIMEOUT!
|
||||||
;; Resets time-to-die of current session. The argument must be
|
;; Resets time-to-die of current session. The argument must be
|
||||||
|
@ -369,7 +391,7 @@
|
||||||
;; notify session killing
|
;; notify session killing
|
||||||
(table-walk
|
(table-walk
|
||||||
(lambda (session-id session)
|
(lambda (session-id session)
|
||||||
(memo-killed! (session-memo session)))
|
(timeout-queue-remove-session! session-id))
|
||||||
*session-table*)
|
*session-table*)
|
||||||
(set! *session-table* (make-integer-table))
|
(set! *session-table* (make-integer-table))
|
||||||
(release-lock *session-table*)))
|
(release-lock *session-table*)))
|
||||||
|
@ -617,34 +639,17 @@
|
||||||
|
|
||||||
;;; SESSION: session-table entry for every new request on a surflet page
|
;;; SESSION: session-table entry for every new request on a surflet page
|
||||||
(define-record-type session :session
|
(define-record-type session :session
|
||||||
(make-session surflet-name memo
|
(make-session surflet-name
|
||||||
continuation-table continuation-table-lock
|
continuation-table continuation-table-lock
|
||||||
continuation-counter
|
continuation-counter
|
||||||
session-data)
|
session-data)
|
||||||
session?
|
session?
|
||||||
(surflet-name session-surflet-name)
|
(surflet-name session-surflet-name)
|
||||||
(memo session-memo set-session-memo!)
|
|
||||||
(continuation-table session-continuation-table)
|
(continuation-table session-continuation-table)
|
||||||
(continuation-table-lock session-continuation-table-lock)
|
(continuation-table-lock session-continuation-table-lock)
|
||||||
(continuation-counter session-continuation-counter)
|
(continuation-counter session-continuation-counter)
|
||||||
(session-data session-session-data set-session-session-data!))
|
(session-data session-session-data set-session-session-data!))
|
||||||
|
|
||||||
;;; MEMO: Information for session surveiller about session status
|
|
||||||
(define-record-type memo :memo
|
|
||||||
(make-memo message value new-memo)
|
|
||||||
memo?
|
|
||||||
(message memo:message set-memo:message) ;kill, killed, adjust-timeout
|
|
||||||
(value memo:value set-memo:value)
|
|
||||||
(new-memo memo:new-memo set-memo:new-memo))
|
|
||||||
|
|
||||||
(define (make-default-memo)
|
|
||||||
(make-memo 'kill #f #f))
|
|
||||||
|
|
||||||
;; caller must do locking stuff
|
|
||||||
(define (memo-killed! memo)
|
|
||||||
(set-memo:message memo 'killed))
|
|
||||||
|
|
||||||
|
|
||||||
;;; INSTANCE: Every request corresponds to an instance.
|
;;; INSTANCE: Every request corresponds to an instance.
|
||||||
(define-record-type instance :instance
|
(define-record-type instance :instance
|
||||||
(make-instance session-id)
|
(make-instance session-id)
|
||||||
|
@ -652,7 +657,6 @@
|
||||||
(session-id really-instance-session-id
|
(session-id really-instance-session-id
|
||||||
set-instance-session-id!))
|
set-instance-session-id!))
|
||||||
|
|
||||||
|
|
||||||
;;; OPTIONS: options for the surflet-handler
|
;;; OPTIONS: options for the surflet-handler
|
||||||
(define-record-type surflet-options :suflet-options
|
(define-record-type surflet-options :suflet-options
|
||||||
(make-surflet-options cache-surflets? session-lifetime)
|
(make-surflet-options cache-surflets? session-lifetime)
|
||||||
|
|
Loading…
Reference in New Issue