Thread::Queue::Multiplex (aka TQM) - thread-safe publish/subscribe queue



        use Thread::Queue::Multiplex;
        #       create new queue, limiting the max pending requests
        #       to 20
        my $tqm = Thread::Queue::Multiplex->new(MaxPending => 20);
        #       register as a subscriber
        $tqm->subscribe();
        #       unregister as a subscriber
        $tqm->unsubscribe();
        #       wait for $count subscribers to register
        $tqm->wait_for_subscribers($count, $timeout);
        #       get the list of current subscriber IDs
        my @subids = $tqm->get_subscribers();
        #       change the max pending limit
        $tqm->set_max_pending($limit);
        #       publish elements, returning a unique queue ID
        #       (used in the client)
        my $id = $tqm->publish("foo", "bar");
        #       publish elements, and wait for a response
        #       (used in the client)
        my $resp = $tqm->publish_and_wait("foo", "bar");
        #       publish elements, and wait for a response
        #       until $timeout secs (used in the client)
        my $resp = $tqm->publish_and_wait_until($timeout, "foo", "bar");
        #       publish elements at head of queue, returning a
        #       unique queue ID (used in the client)
        my $id = $tqm->publish_urgent("foo", "bar");
        #       publish elements at head of queue and wait for response
        my $resp = $tqm->publish_urgent_and_wait("foo", "bar");
        #       publish elements at head of queue and wait for
        #       response until $timeout secs
        my $resp = $tqm->publish_urgent_and_wait_until($timeout, "foo", "bar");
        #       publish elements for simplex operation (no response),
        #       returning the queue object
        $tqm->publish_simplex("foo", "bar");
        #       publish simplex elements at head of queue
        $tqm->publish_simplex_urgent("foo", "bar");
        #       subscribers use the existing TQD dequeue() methods;
        #       publishers get modified versions of the TQD base enqueue
        #       methods to support directed messaging to a single
        #       subscriber or a group of subscribers
        #       enqueue elements to a specific subscriber, returning
        #       a unique queue ID (used in the client)
        my $id = $tqm->enqueue($subID, "foo", "bar");
        #       enqueue elements to 2 subscribers, and wait for a response
        #       (used in the client)
        my $resp = $tqm->enqueue_and_wait([ $subID1, $subID2 ], "foo", "bar");
        #       enqueue elements to a specific subscriber, and wait for
        #       a response until $timeout secs (used in the client)
        my $resp = $tqm->enqueue_and_wait_until($subID, $timeout, "foo", "bar");
        #       SEE Thread::Queue::Duplex for the various publisher enqueue()
        #       and wait() methods,
        #       and the subscriber dequeue() methods


A subclass of Thread::Queue::Duplex (aka TQD) that implements a "publish and subscribe" communications model for threads. Subscribers register with the queue, which records the TID of the subscriber's thread as its subscriber ID. As the publisher publishes messages to the queue, each subscriber receives a copy of the message. Unless the publication is simplex, the publisher expects all subscribers to read and respond to the message; for a simplex publication, the publisher simply continues its processing. Thread::Queue::Multiplex provides publish() counterparts for all the Thread::Queue::Duplex enqueue() methods, e.g., publish_simplex(), publish_urgent(), publish_and_wait(), and publish_and_wait_until().

Subscribers receive and reply to messages using the existing TQD dequeue() and respond() methods. In addition, modified versions of the enqueue() methods let publishers direct a message to a single subscriber, or to a subset of subscribers, by specifying either a scalar subscriber ID (for single-subscriber messages) or an arrayref of unique subscriber IDs (for multi-subscriber messages).
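
The directed form can be wrapped in a small helper. The following is a minimal sketch, not part of TQM: send_directed() is a hypothetical name, and the only queue method it relies on is the enqueue() override described above. Because the arrayref form expects unique subscriber IDs, the helper drops duplicates before enqueueing.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of TQM): send a directed request through
# the documented enqueue() override. $tqm is a Thread::Queue::Multiplex
# object; $targets is a scalar subscriber ID or an arrayref of IDs.
sub send_directed {
    my ($tqm, $targets, @request) = @_;

    # A single scalar ID is passed through unchanged.
    return $tqm->enqueue($targets, @request)
        unless ref($targets) eq 'ARRAY';

    # The arrayref form expects unique IDs, so drop duplicates
    # while preserving the caller's order.
    my %seen;
    my @unique = grep { !$seen{$_}++ } @$targets;
    die "no subscriber IDs given\n" unless @unique;

    return $tqm->enqueue(\@unique, @request);
}
```

A publisher would call it as send_directed($tqm, $subID, "foo", "bar") or send_directed($tqm, [ $subID1, $subID2 ], "foo", "bar").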

Thread::Queue::Multiplex overrides some of the internal behavior of Thread::Queue::Duplex.

A normal processing sequence for Thread::Queue::Multiplex might be:

        #       Thread A (the client):
                ...marshal parameters for a coroutine...
                my $id = $tqm->publish('function_name', \@paramlist);
                my $results = $tqm->dequeue_response($id);
                while (my ($subID, $subresult) = each %$results) {
                        ...process $subresult...
                }
        #       Thread B (a subscriber):
                while (1) {
                        my $call = $tqm->dequeue;
                        my ($id, $func, @params) = @$call;
                        $tqm->respond($id, $self->$func(@params));
                }
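
The subscriber side of this sequence can be sketched as a reusable loop. This is an illustration under stated assumptions rather than TQM code: serve() and the 'shutdown' request name are hypothetical, the dequeued element is assumed to have the [ $id, $func, @params ] layout used above, and simplex requests are assumed to arrive without a request ID. The loop answers every identified request, even an unknown one, so that a waiting publisher is not left blocked.

```perl
use strict;
use warnings;

# Hypothetical subscriber loop (a sketch, not TQM code). $tqm is a
# Thread::Queue::Multiplex object; %$handlers maps request names to
# code refs. The 'shutdown' request name is an invented convention.
sub serve {
    my ($tqm, $handlers) = @_;
    $tqm->subscribe();
    my $served = 0;

    while (1) {
        # Assumed element layout: [ $id, $func, @params ]
        my ($id, $func, @params) = @{ $tqm->dequeue };

        my $done    = $func eq 'shutdown';
        my $handler = $handlers->{$func};
        my @result  = $done    ? ('ok')
                    : $handler ? $handler->(@params)
                    :            ();

        # Respond only when a request ID is present; simplex requests
        # are assumed to carry no ID.
        $tqm->respond($id, @result) if defined $id;
        $served++;
        last if $done;
    }

    $tqm->unsubscribe();
    return $served;    # number of requests handled, including shutdown
}
```

A subscriber thread would run something like serve($tqm, { add => sub { $_[0] + $_[1] } }) until a publisher sends the shutdown request.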


$tqm = Thread::Queue::Multiplex->new([MaxPending => $limit])

Constructor. Creates a new empty queue. If the MaxPending value is a non-zero value, the number of pending requests will be limited to $limit, and any further attempt to queue a request will block until the pending count drops below $limit. This limit may be applied or modified later via the set_max_pending() method (see below).

subscribe() aka listen()

Subscribe to the queue. The listen() alias is provided for compatibility with TQD apps.

unsubscribe() aka ignore()

Unsubscribe from the queue. The ignore() alias is provided for compatibility with TQD apps.

@subIDs = $tqm->get_subscribers()

Returns the current list of subscriber IDs.

$msgID = $tqm->publish(@request)

Enqueues @request to all subscribers and returns immediately with the request ID.

$results = $tqm->publish_and_wait(@request)

Same as publish, except that it waits for and returns the response hash, rather than returning immediately with the request ID.

$results = $tqm->publish_and_wait_until($timeout, @request)

Same as publish, except that it waits up to $timeout seconds for all subscribers to respond, and returns the response hash, rather than returning immediately with the request ID. If some, but not all, subscribers respond within the timeout, the responses are discarded.
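
A caller usually wants to tell a timeout apart from a full set of replies. The sketch below is hypothetical (collect_replies() is not a TQM method) and rests on two assumptions: that the response hash is a hashref keyed by subscriber ID, as in the processing sequence shown earlier, and that the method returns undef when the timeout expires.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of TQM): publish with a timeout and
# return the replies as [ $subID, $reply ] pairs ordered by subscriber ID.
# ASSUMPTION: publish_and_wait_until() returns undef on timeout and a
# hashref keyed by subscriber ID otherwise.
sub collect_replies {
    my ($tqm, $timeout, @request) = @_;

    my $results = $tqm->publish_and_wait_until($timeout, @request);
    return unless defined $results;    # timed out: partial replies are gone

    # Subscriber IDs are thread IDs, so sort them numerically.
    return [ map { [ $_, $results->{$_} ] }
             sort { $a <=> $b } keys %$results ];
}
```

Called in scalar context, my $pairs = collect_replies($tqm, 10, "foo", "bar") yields undef on timeout and an arrayref of pairs otherwise.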

$msgID = $tqm->publish_urgent(@request)

Same as publish, but adds the element to head of queue, rather than tail.

$results = $tqm->publish_urgent_and_wait(@request)

Same as publish_and_wait, but adds the element to head of queue, rather than tail.

$results = $tqm->publish_urgent_and_wait_until($timeout, @request)

Same as publish_and_wait_until, but adds the element to head of queue, rather than tail.

$tqm->publish_simplex(@request)

Same as publish, but does not allocate an identifier or expect a response. Returns the queue object rather than a request ID.

$tqm->publish_simplex_urgent(@request)

Same as publish_simplex, but adds the element to head of queue, rather than tail.

$count = $tqm->pending()

Returns the number of items still in the queue. Note that, for subscribers, the returned value is the number of requests published to the individual subscriber, which may be less than the total number of pending requests, due to directed enqueue requests. Also, for subscribers, the number may include requests which have been cancelled, but not yet processed/discarded by the subscriber.
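
One use of pending() is to poll until the queue has drained. The following is a minimal sketch with a hypothetical helper name, wait_for_drain(); it relies only on the pending() method described above. For subscribers, keep in mind that the count may include cancelled requests that have not yet been discarded, so a zero count can take longer to appear than expected.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of TQM): poll pending() up to $max_polls
# times, sleeping $interval seconds between polls. Returns 1 once the
# count reaches zero, or 0 if it never does within the allowed polls.
sub wait_for_drain {
    my ($tqm, $max_polls, $interval) = @_;

    for (1 .. $max_polls) {
        return 1 if $tqm->pending() == 0;
        # Four-argument select() sleeps for a fractional number of seconds.
        select(undef, undef, undef, $interval);
    }
    return 0;
}
```

For example, wait_for_drain($tqm, 50, 0.1) gives the queue roughly five seconds to empty.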

The following TQD methods are overridden by TQM to support directed requests. Each takes either a single scalar subscriber ID, or an arrayref of multiple subscriber IDs, as its first parameter:

$msgID = $tqm->enqueue( $subID, @request)

$msgID = $tqm->enqueue( [ @subIDs ], @request)

$msgID = $tqm->enqueue_simplex( $subID, @request)

$msgID = $tqm->enqueue_simplex( [ @subIDs ], @request)

$msgID = $tqm->enqueue_urgent( $subID, @request)

$msgID = $tqm->enqueue_urgent( [ @subIDs ], @request)

$msgID = $tqm->enqueue_simplex_urgent( $subID, @request)

$msgID = $tqm->enqueue_simplex_urgent( [ @subIDs ], @request)

$result = $tqm->enqueue_and_wait( $subID, @request)

$result = $tqm->enqueue_and_wait( [ @subIDs ], @request)

$result = $tqm->enqueue_urgent_and_wait( $subID, @request)

$result = $tqm->enqueue_urgent_and_wait( [ @subIDs ], @request)

$result = $tqm->enqueue_and_wait_until( $subID, $timeout, @request)

$result = $tqm->enqueue_and_wait_until( [ @subIDs ], $timeout, @request)

$result = $tqm->enqueue_urgent_and_wait_until( $subID, $timeout, @request)

$result = $tqm->enqueue_urgent_and_wait_until( [ @subIDs ], $timeout, @request)


If any subscriber thread dies, the publisher may hang on any of the blocking publish calls, or on wait()/dequeue_response(). A future update may support occasional scans and a forced unsubscribe() for dead threads.
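
Until then, a publisher can at least detect the condition before making a blocking call. Since subscriber IDs are thread IDs, comparing them with the TIDs of running threads reveals subscribers whose threads are gone. The sketch below is hypothetical (dead_subscribers() is not part of TQM) and only reports such subscribers; this document does not describe a way for the publisher to remove them.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of TQM): given the subscriber IDs reported
# by the queue and the TIDs of threads known to be alive, return the
# subscriber IDs whose threads are no longer running.
sub dead_subscribers {
    my ($subscriber_ids, $live_tids) = @_;

    my %alive = map { $_ => 1 } @$live_tids;
    return grep { !$alive{$_} } @$subscriber_ids;
}
```

A publisher might call it as dead_subscribers([ $tqm->get_subscribers() ], [ map { $_->tid } threads->list(threads::running) ]) and fall back to one of the timed publish methods when the result is non-empty. Note that threads->list() reports only non-detached threads, so this check assumes that subscribers run in joinable threads other than the main thread.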


Thread::Queue::Duplex, Thread::Queue::Queueable, threads, threads::shared, Thread::Queue


Dean Arnold, Presicient Corp.

Copyright(C) 2006, Presicient Corp., USA

Permission is granted to use this software under the same terms as Perl itself. Refer to the Perl Artistic License for details.