Subject: Re: Adding/Removing Streams from a Broadcast Stream
From: Erik Naggum <erik@naggum.no>
Date: 1999/06/30
Newsgroups: comp.lang.lisp
Message-ID: <3139716696746976@naggum.no>

* bbtech@rmc1.crocker.com (Blackboard Technology Group)
| I have a set of clients can connect/disconnect to a producer of streaming
| output data.  A broadcast stream is the natural mechanism for this, but
| doesn't have an add or remove stream function.  Making a new broadcast
| stream every time a client comes/goes is an option (by adding/deleting
| from the BROADCAST-STREAM-STREAMS of the current stream and using a
| synonym-stream for indirection), but I was wondering if there is a better
| approach...

  there are two issues involved in this decision: (1) what happens when
  there's an error on the stream (such as being closed by the other side)?
  and (2) does a stream get deleted from the broadcast list when closed?

  first, BROADCAST-STREAM-STREAMS must be SETF'able:

(defsetf broadcast-stream-streams (stream) (new-streams)
  `(setf (slot-value ,stream 'indir-list) ,new-streams))

  (another option is to ascertain that the list BROADCAST-STREAM-STREAMS
  returns is the actual list of streams, always has at least one element,
  and PUSH and DROP on the REST of that list, instead.)

(defun broadcast-stream-subscribe (broadcast-stream stream)
  "Subscribe STREAM to BROADCAST-STREAM."
  (unless (member stream (broadcast-stream-streams broadcast-stream))
    (push stream (broadcast-stream-streams broadcast-stream))
    (push broadcast-stream
          (getf (stream-property-list stream) 'subscriptions))))

(defun broadcast-stream-unsubscribe (broadcast-stream stream)
  "Unsubscribe STREAM from BROADCAST-STREAM."
  (when (member stream (broadcast-stream-streams broadcast-stream))
    (drop stream (broadcast-stream-streams broadcast-stream))
    (drop broadcast-stream
          (getf (stream-property-list stream) 'subscriptions))))

(defun broadcast-stream-unsubscribe-all (stream)
  "Unsubscribe STREAM from all the BROADCAST streams it subscribes to."
  (dolist (broadcast-stream
           (copy-list (getf (stream-property-list stream) 'subscriptions)))
    (broadcast-stream-unsubscribe broadcast-stream stream)))

  this cleans up after dead sockets and discards input.  it is normally
  called on output to the socket, but if that is insufficient, call it
  periodically, like once an hour.

(defun broadcast-stream-sanitize (broadcast-stream)
  "Remove stray input and dead streams from broadcast streams."
  (dolist (stream (copy-list (broadcast-stream-streams broadcast-stream)))
    (when (input-stream-p stream)
      (clear-input stream)
      (when (eq stream (read-char-no-hang stream nil stream))
        (broadcast-stream-unsubscribe-all stream)
        (ignore-errors (close stream :abort t))))))

  using stream classes, let's get out of some serious trouble:

(defclass sanitizing-broadcast-stream (broadcast-stream)
  ())

(defmethod stream-force-output ((stream sanitizing-broadcast-stream))
  (broadcast-stream-sanitize stream)
  (call-next-method))

  I find it useful to force output of lines.

(defmethod stream-terpri ((stream sanitizing-broadcast-stream))
  (call-next-method)
  (stream-force-output stream))

  this does not address the error-handling for socket errors, which I have
  taken care of by modifying the underlying filesystem-level read and write
  operations not to signal errors, but call an error handler that may tell
  it to ignore the error selectively on error type, stream in question, etc.

  oh, DROP is the reverse of PUSH:

(defmacro drop (object place &rest keys &key key test test-not
                &environment environment)
  "Drop a particular OBJECT from list in PLACE.
  (Intended as counterpart to PUSH.)"
  (declare (ignore key test test-not))
  (multiple-value-bind (vars vals store-vars writer reader)
      (get-setf-expansion place environment)
    (let ((evaled-value (gensym))
          (store-var (first store-vars)))
      (if (cdr store-vars)
          `(let* ((,evaled-value ,object)
                  ,@(mapcar #'list vars vals))
             (multiple-value-bind ,store-vars ,reader
               (setq ,store-var (delete ,evaled-value ,store-var
                                        :count 1 ,@keys))
               ,writer))
          `(let* ((,evaled-value ,object)
                  ,@(mapcar #'list vars vals)
                  (,store-var (delete ,evaled-value ,reader
                                      :count 1 ,@keys)))
             ,writer)))))

#:Erik
-- 
  @1999-07-22T00:37:33Z -- pi billion seconds since the turn of the century
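
  a minimal sketch of the alternative mentioned in the parenthetical remark
  (modifying only the REST of a list that always has at least one element),
  using a plain list instead of a broadcast stream's internal list.
  *TARGETS*, SUBSCRIBE-TARGET and UNSUBSCRIBE-TARGET are hypothetical names
  chosen for illustration, and :HEAD stands in for the one stream that is
  never removed.  whether a given implementation's BROADCAST-STREAM-STREAMS
  returns its actual list is an assumption that would have to be checked.

```lisp
;; hypothetical stand-in for the list a broadcast stream keeps internally;
;; :HEAD plays the role of the stream that is never removed.
(defvar *targets* (list :head))

(defun subscribe-target (target)
  "Add TARGET behind the head cons unless it is already there."
  (unless (member target (rest *targets*))
    (push target (rest *targets*)))
  *targets*)

(defun unsubscribe-target (target)
  "Remove one occurrence of TARGET from behind the head cons."
  (setf (rest *targets*)
        (delete target (rest *targets*) :count 1))
  *targets*)
```

  since only the REST is ever modified, the head cons keeps its identity,
  so whoever holds the original list sees subscriptions come and go
  without any SETF of the accessor itself.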