Thread::Queue::Multiplex (aka TQM) - thread-safe publish/subscribe queue
Thread-Queue-Multiplex-0.90.tar.gz use Thread::Queue::Multiplex;
#
# create new queue, limiting the max pending requests
# to 20
#
my $tqm = Thread::Queue::Multiplex->new(MaxPending => 20);
#
# register as a subscriber
#
$tqm->subscribe();
#
# unregister as a subscriber
#
$tqm->unsubscribe();
#
# wait for $count subscribers to register
#
$tqm->wait_for_subscribers($count, $timeout);
#
# get the list of current subscribers ID's
#
my @subids = $tqm->get_subscribers();
#
# change the max pending limit
#
$tqm->set_max_pending($limit);
#
# enqueue elements, returning a unique queue ID
# (used in the client)
#
my $id = $tqm->publish("foo", "bar");
#
# publish elements, and wait for a response
# (used in the client)
#
my $resp = $tqm->publish_and_wait("foo", "bar");
#
# publish elements, and wait for a response
# until $timeout secs (used in the client)
#
my $resp = $tqm->publish_and_wait_until($timeout, "foo", "bar");
#
# publish elements at head of queue, returning a
# unique queue ID (used in the client)
#
my $id = $tqm->publish_urgent("foo", "bar");
#
# publish elements at head of queue and wait for response
#
my $resp = $tqm->publish_urgent_and_wait("foo", "bar");
#
# publish elements at head of queue and wait for
# response until $timeout secs
#
my $resp = $tqm->publish_urgent_and_wait_until($timeout, "foo", "bar");
#
# publish elements for simplex operation (no response)
# returning the queue object
#
$tqm->publish_simplex("foo", "bar");
$tqm->publish_simplex_urgent("foo", "bar");
#
#########################################################
#
# subscribers use the existing TQD dequeue() methods
#
#######################################################
#
# modified versions of the TQD base enqueue methods
# to support directed messaging to a single subscriber
# or group of subscribers
#
#######################################################
#
# enqueue elements to a specific subscriber, returning
# a unique queue ID (used in the client)
#
my $id = $tqm->enqueue($subID, "foo", "bar");
#
# enqueue elements to 2 subscribers, and wait for a response
# (used in the client)
#
my $resp = $tqm->enqueue_and_wait([ $subID1, $subID2 ], "foo", "bar");
#
# enqueue elements, and wait for a response
# until $timeout secs (used in the client)
#
my $resp = $tqm->enqueue_and_wait_until($subID, $timeout, "foo", "bar");
#
# SEE Thread::Queue::Duplex for the various publisher enqueue()
# and wait() methods,
# and the subscriber dequeue() methods
#
A subclass of Thread::Queue::Duplex aka TQD which implements a "publish and subscribe" communications model for threads. Subscribers register with the queue, which registers the TID of the subscriber's thread as a subscriber ID. As the publisher publishes messages to the queue, each subscriber receives a copy of the message. If the publication is not simplex, the publisher expects all subscribers to read and respond to the message; otherwise, the publisher simply continues its processing. Thread::Queue::Multiplex provides publish()
method counterparts for all the Thread::Queue::Duplex enqueue()
methods, e.g., publish_simplex(), publish_urgent(), publish_and_wait(), publish_and_wait_until()
, etc.
Subscribers receive and reply to messages using the existing TQD dequeue() and respond() methods. In addition, modified versions of the enqueue() methods are provided to publishers to permit directing a message to a single subscriber, or subset of subscribers, by specifying the scalar subscriber ID (for single subscriber messages), or an arrayref of unique subscriber ID's (for multi-subscriber messages).
Thread::Queue::Multiplex
subclass overrides some of the internal behavior of Thread::Queue::Duplex by
->self()->tid()
when the subscriber subscribe()
s) mapped to a threads::shared array to hold ID's of messages published to the subscriber[message ID, flags, refcount, @params]
, where flags
indicates the urgent and/or simplex status of the request, and refcount
indicates the number of subscribers assigned to the request. A special refcount
value of -1 indicates that only the first subscriber to retrieve/process the request should respond (to mimic the behavior of Thread::Queueu::Duplex), which is specified by the publisher using any of the enqueue
methods with a subscriber ID of -1. $msg_map = {
$msgid => {
$subID1 => $subID1_response,
$subID2 => $subID2_response,
etc.
}
}
A normal processing sequence for Thread::Queue::Multiplex might be:
#
# Thread A (the client):
#
...marshal parameters for a coroutine...
my $id = $tqm->publish('function_name', \@paramlist);
my $results = $tqm->dequeue_response($id);
while (($subID, $subresult) = each %$results) {
...process $results...
}
#
# Thread B (a subscriber):
#
while (1) {
my $call = $tqm->dequeue;
my ($id, $func, @params) = @$call;
$tqm->respond($id, $self->$func(@params));
}
MaxPending
value is a non-zero value, the number of pending requests will be limited to $limit
, and any further attempt to queue a request will block until the pending count drops below $limit
. This limit may be applied or modified later via the set_max_pending()
method (see below).@request
to all subscribers.enqueue
requests. Also, for subscribers, the number may include requests which have been cancelled, but not yet processed/discarded by the subscriber.The following TQD methods are overloaded by TQM to support directed requests by adding either a single scalar subscriber ID, or an arrayref of multiple subscriber IDs, as the first parameter:
If any subscriber thread dies, then the publisher may hang on any of the blocking publish calls, or the wait()/dequeue_response(). A future update may support occasional scans and forced unsubscribe() for dead threads.
Thread::Queue::Duplex, Thread::Queue::Queueable, threads, threads::shared, Thread::Queue
Dean Arnold, Presicient Corp. darnold@presicient.com
Copyright(C) 2006, Presicient Corp., USA
Permission is granted to use this software under the same terms as Perl itself. Refer to the Perl Artistic License for details.