+The Publisher and Message classes are the two that clients interact
+with.
+
+This module is compatible with Python 2.1.
+
+Author: Robb Shecter
+"""
+
+#---------------------------------------------------------------------------
+
+class Publisher:
+ """
+ The publish/subscribe server. This class is a Singleton.
+ """
+ def __init__(self):
+ self.topicDict = {}
+ self.functionDict = {}
+ self.subscribeAllList = []
+ self.messageCount = 0
+ self.deliveryCount = 0
+
+
+ #
+ # Public API
+ #
+
+ def subscribe(self, topic, listener):
+ """
+ Add the given subscription to the list. This will
+ add an entry recording the fact that the listener wants
+ to get messages for (at least) the given topic. This
+ method may be called multiple times for one listener,
+ registering it with many topics. It can also be invoked
+ many times for a particular topic, each time with a
+ different listener.
+
+ listener: expected to be either a method or function that
+ takes zero or one parameters. (Not counting 'self' in the
+ case of methods. If it accepts a parameter, it will be given
+ a reference to a Message object.
+
+ topic: will be converted to a tuple if it isn't one.
+ It's a pattern matches any topic that it's a sublist
+ of. For example, this pattern:
+
+ ('sports',)
+
+ would match these:
+
+ ('sports',)
+ ('sports', 'baseball')
+ ('sports', 'baseball', 'highscores')
+
+ but not these:
+
+ ()
+ ('news')
+ (12345)
+ """
+ if not callable(listener):
+ raise TypeError('The P/S listener, '+`listener`+', is not callable.')
+ aTopic = Topic(topic)
+
+ # Determine now (at registration time) how many parameters
+ # the listener expects, and get a reference to a function which
+ # calls it correctly at message-send time.
+ callableVersion = self.__makeCallable(listener)
+
+ # Add this tuple to a list which is in a dict keyed by
+ # the topic's first element.
+ self.__addTopicToCorrectList(aTopic, listener, callableVersion)
+
+ # Add to a dict in order to speed-up unsubscribing.
+ self.__addFunctionLookup(listener, aTopic)
+
+
+ def unsubscribe(self, listener):
+ """
+ Remove the given listener from the registry,
+ for all topics that it's associated with.
+ """
+ if not callable(listener):
+ raise TypeError('The P/S listener, '+`listener`+', is not callable.')
+ topicList = self.getAssociatedTopics(listener)
+ for aTopic in topicList:
+ subscriberList = self.__getTopicList(aTopic)
+ listToKeep = []
+ for subscriber in subscriberList:
+ if subscriber[0] != listener:
+ listToKeep.append(subscriber)
+ self.__setTopicList(aTopic, listToKeep)
+ self.__delFunctionLookup(listener)
+
+
+ def getAssociatedTopics(self, listener):
+ """
+ Return a list of topics the given listener is
+ registered with.
+ """
+ return self.functionDict.get(listener, [])
+
+
+ def sendMessage(self, topic, data=None):
+ """
+ Relay a message to registered listeners.
+ """
+ aTopic = Topic(topic)
+ message = Message(aTopic.items, data)
+ topicList = self.__getTopicList(aTopic)
+
+ # Send to the matching topics
+ for subscriber in topicList:
+ if subscriber[1].matches(aTopic):
+ subscriber[2](message)
+
+ # Send to any listeners registered for ALL
+ for subscriber in self.subscribeAllList:
+ subscriber[2](message)
+
+
+ #
+ # Private methods
+ #
+
+ def __makeCallable(self, function):
+ """
+ Return a function that is what the server
+ will actually call.
+
+ This is a time optimization: this removes a test
+ for the number of parameters from the inner loop
+ of sendMessage().
+ """
+ parameters = self.__parameterCount(function)
+ if parameters == 0:
+ # Return a function that calls the listener
+ # with no arguments.
+ return lambda m, f=function: f()
+ elif parameters == 1:
+ # Return a function that calls the listener
+ # with one argument (which will be the message).
+ return lambda m, f=function: f(m)
+ else:
+ raise TypeError('The publish/subscribe listener, '+`function`+', has wrong parameter count')
+
+
+ def __parameterCount(self, callableObject):
+ """
+ Return the effective number of parameters required
+ by the callable object. In other words, the 'self'
+ parameter of methods is not counted.
+ """
+ try:
+ # Try to handle this like a method
+ return callableObject.im_func.func_code.co_argcount - 1
+ except AttributeError:
+ pass
+
+ try:
+ # Try to handle this like a function
+ return callableObject.func_code.co_argcount
+ except AttributeError:
+ raise 'Cannot determine if this is a method or function: '+str(callableObject)
+
+ def __addFunctionLookup(self, aFunction, aTopic):
+ try:
+ aList = self.functionDict[aFunction]
+ except KeyError:
+ aList = []
+ self.functionDict[aFunction] = aList
+ aList.append(aTopic)
+
+
+ def __delFunctionLookup(self, aFunction):
+ try:
+ del self.functionDict[aFunction]
+ except KeyError:
+ print 'Warning: listener not found. Logic error in PublishSubscribe?', aFunction
+
+
+ def __addTopicToCorrectList(self, topic, listener, callableVersion):
+ if len(topic.items) == 0:
+ self.subscribeAllList.append((listener, topic, callableVersion))
+ else:
+ self.__getTopicList(topic).append((listener, topic, callableVersion))
+
+
+ def __getTopicList(self, aTopic):
+ """
+ Return the correct sublist of subscribers based on the
+ given topic.
+ """
+ try:
+ elementZero = aTopic.items[0]
+ except IndexError:
+ return self.subscribeAllList
+
+ try:
+ subList = self.topicDict[elementZero]
+ except KeyError:
+ subList = []
+ self.topicDict[elementZero] = subList
+ return subList
+
+
+ def __setTopicList(self, aTopic, aSubscriberList):
+ try:
+ self.topicDict[aTopic.items[0]] = aSubscriberList
+ except IndexError:
+ self.subscribeAllList = aSubscriberList
+
+
+ def __call__(self):
+ return self
+
+
+# Create an instance with the same name as the class, effectively
+# hiding the class object so it can't be instantiated any more. From
+# this point forward any calls to Publisher() will invoke the __call__
+# of this instance which just returns itself.
+#
+# The only flaw with this approach is that you can't derive a new
+# class from Publisher without jumping through hoops. If this ever
+# becomes an issue then a new Singleton implementation will need to be
+# employed.
+Publisher = Publisher()
+
+
+#---------------------------------------------------------------------------
+
+class Message:
+ """
+ A simple container object for the two components of
+ a message; the topic and the data.
+ """
+ def __init__(self, topic, data):
+ self.topic = topic
+ self.data = data
+
+ def __str__(self):
+ return '[Topic: '+`self.topic`+', Data: '+`self.data`+']'
+
+
+#---------------------------------------------------------------------------
+
+class Topic:
+ """
+ A class that represents a publish/subscribe topic.
+ Currently, it's only used internally in the framework; the
+ API expects and returns plain old tuples.
+
+ It currently exists mostly as a place to keep the matches()
+ function. This function, though, could also correctly be
+ seen as an attribute of the P/S server. Getting rid of this
+ class would also mean one fewer object instantiation per
+ message send.
+ """
+
+ listType = type([])
+ tupleType = type(())
+
+ def __init__(self, items):
+ # Make sure we have a tuple.
+ if type(items) == self.__class__.listType:
+ items = tuple(items)
+ elif type(items) != self.__class__.tupleType:
+ items = (items,)
+ self.items = items
+ self.length = len(items)
+
+
+ def matches(self, aTopic):
+ """
+ Consider myself to be a topic pattern,
+ and return True if I match the given specific
+ topic. For example,
+ a = ('sports')
+ b = ('sports','baseball')
+ a.matches(b) --> 1
+ b.matches(a) --> 0
+ """
+ # The question this method answers is equivalent to;
+ # is my list a sublist of aTopic's? So, my algorithm
+ # is: 1) make a copy of the aTopic list which is
+ # truncated to the pattern's length. 2) Test for
+ # equality.
+ #
+ # This algorithm may be somewhat memory-intensive,
+ # because it creates a temporary list on each
+ # call to match. A possible to-do would be to
+ # re-write this with a hand-coded loop.
+ return (self.items == aTopic.items[:self.length])
+
+
+ def __repr__(self):
+ import string
+ return '<Topic>' + string.join(map(repr, self.items), ', ') + '</Topic>'
+
+
+ def __eq__(self, aTopic):
+ """
+ Return True if I equal the given topic. We're considered
+ equal if our tuples are equal.
+ """
+ if type(self) != type(aTopic):
+ return 0
+ else:
+ return self.items == aTopic.items
+
+
+ def __ne__(self, aTopic):
+ """
+ Return False if I equal the given topic.
+ """
+ return not self == aTopic
+
+
+#---------------------------------------------------------------------------
+
+
+#
+# Code for a simple command-line test
+#
+if __name__ == '__main__':
+
+ class SimpleListener:
+ def __init__(self, number):
+ self.number = number
+ def notify(self, message):
+ print '#'+str(self.number)+' got the message:', message
+
+ # Build a list of ten listeners.
+ lList = []
+ for x in range(10):
+ lList.append(SimpleListener(x))
+
+ server = Publisher()
+
+ # Everyone's interested in politics...
+ for x in lList:
+ Publisher().subscribe(topic='politics', listener=x.notify) # also tests singleton
+
+ # But only the first four are interested in trivia.
+ for x in lList[:4]:
+ server.subscribe(topic='trivia', listener=x.notify)
+
+ # This one subscribes to everything.
+ everythingListener = SimpleListener(999)
+ server.subscribe(topic=(), listener=everythingListener.notify)
+
+ # Now send out two messages, testing topic matching.
+ server.sendMessage(topic='trivia', data='What is the capitol of Oregon?')
+ server.sendMessage(topic=('politics','germany'), data='The Greens have picked up another seat in the Bundestag.')
+
+#---------------------------------------------------------------------------