+ print '%s.notify() got the message "%s"' %(self.number, message)
+ def __str__(self):
+ return "SimpleListener_%s" % self.number
+ def testSubscribe():
+ publisher = Publisher()
+ topic1 = 'politics'
+ topic2 = ('history','middle age')
+ topic3 = ('politics','UN')
+ topic4 = ('politics','NATO')
+ topic5 = ('politics','NATO','US')
+ lisnr1 = SimpleListener(1)
+ lisnr2 = SimpleListener(2)
+ def func(message, a=1):
+ print 'Func received message "%s"' % message
+ lisnr3 = func
+ lisnr4 = lambda x: 'Lambda received message "%s"' % x
+ assert not publisher.isSubscribed(lisnr1)
+ assert not publisher.isSubscribed(lisnr2)
+ assert not publisher.isSubscribed(lisnr3)
+ assert not publisher.isSubscribed(lisnr4)
+ publisher.subscribe(lisnr1, topic1)
+ assert publisher.getAssociatedTopics(lisnr1) == [(topic1,)]
+ publisher.subscribe(lisnr1, topic2)
+ publisher.subscribe(lisnr1, topic1) # do it again, should be no-op
+ assert publisher.getAssociatedTopics(lisnr1) == [(topic1,),topic2]
+ publisher.subscribe(lisnr2.notify, topic3)
+ assert publisher.getAssociatedTopics(lisnr2.notify) == [topic3]
+ assert publisher.getAssociatedTopics(lisnr1) == [(topic1,),topic2]
+ publisher.subscribe(lisnr3, topic5)
+ assert publisher.getAssociatedTopics(lisnr3) == [topic5]
+ assert publisher.getAssociatedTopics(lisnr2.notify) == [topic3]
+ assert publisher.getAssociatedTopics(lisnr1) == [(topic1,),topic2]
+ publisher.subscribe(lisnr4)
+ print "Publisher tree: ", publisher
+ assert publisher.isSubscribed(lisnr1)
+ assert publisher.isSubscribed(lisnr1, topic1)
+ assert publisher.isSubscribed(lisnr1, topic2)
+ assert publisher.isSubscribed(lisnr2.notify)
+ assert publisher.isSubscribed(lisnr3, topic5)
+ assert publisher.isSubscribed(lisnr4, ALL_TOPICS)
+ expectTopicTree = 'all: <lambda> (politics: SimpleListener_1 (UN: SimpleListener_2.notify ) (NATO: (US: func ))) (history: (middle age: SimpleListener_1 ))'
+ print "Publisher tree: ", publisher
+ assert str(publisher) == expectTopicTree
+ publisher.unsubscribe(lisnr1, 'booboo') # should do nothing
+ assert publisher.getAssociatedTopics(lisnr1) == [(topic1,),topic2]
+ assert publisher.getAssociatedTopics(lisnr2.notify) == [topic3]
+ assert publisher.getAssociatedTopics(lisnr3) == [topic5]
+ publisher.unsubscribe(lisnr1, topic1)
+ assert publisher.getAssociatedTopics(lisnr1) == [topic2]
+ assert publisher.getAssociatedTopics(lisnr2.notify) == [topic3]
+ assert publisher.getAssociatedTopics(lisnr3) == [topic5]
+ publisher.unsubscribe(lisnr1, topic2)
+ publisher.unsubscribe(lisnr1, topic2)
+ publisher.unsubscribe(lisnr2.notify, topic3)
+ publisher.unsubscribe(lisnr3, topic5)
+ assert publisher.getAssociatedTopics(lisnr1) == []
+ assert publisher.getAssociatedTopics(lisnr2.notify) == []
+ assert publisher.getAssociatedTopics(lisnr3) == []
+ publisher.unsubscribe(lisnr4)
+ expectTopicTree = 'all: (politics: (UN: ) (NATO: (US: ))) (history: (middle age: ))'
+ print "Publisher tree: ", publisher
+ assert str(publisher) == expectTopicTree
+ assert publisher.getDeliveryCount() == 0
+ assert publisher.getMessageCount() == 0
+ publisher.unsubAll()
+ assert str(publisher) == 'all: '
+ done('testSubscribe')
+ testSubscribe()
+ #------------------------
+ def testUnsubAll():
+ publisher = Publisher()
+ topic1 = 'politics'
+ topic2 = ('history','middle age')
+ topic3 = ('politics','UN')
+ topic4 = ('politics','NATO')
+ topic5 = ('politics','NATO','US')
+ lisnr1 = SimpleListener(1)
+ lisnr2 = SimpleListener(2)
+ def func(message, a=1):
+ print 'Func received message "%s"' % message
+ lisnr3 = func
+ lisnr4 = lambda x: 'Lambda received message "%s"' % x
+ publisher.subscribe(lisnr1, topic1)
+ publisher.subscribe(lisnr1, topic2)
+ publisher.subscribe(lisnr2.notify, topic3)
+ publisher.subscribe(lisnr3, topic2)
+ publisher.subscribe(lisnr3, topic5)
+ publisher.subscribe(lisnr4)
+ expectTopicTree = 'all: <lambda> (politics: SimpleListener_1 (UN: SimpleListener_2.notify ) (NATO: (US: func ))) (history: (middle age: SimpleListener_1 func ))'
+ print "Publisher tree: ", publisher
+ assert str(publisher) == expectTopicTree
+ publisher.unsubAll(topic1)
+ assert publisher.getAssociatedTopics(lisnr1) == [topic2]
+ assert not publisher.isSubscribed(lisnr1, topic1)
+ publisher.unsubAll(topic2)
+ print publisher
+ assert publisher.getAssociatedTopics(lisnr1) == []
+ assert publisher.getAssociatedTopics(lisnr3) == [topic5]
+ assert not publisher.isSubscribed(lisnr1)
+ assert publisher.isSubscribed(lisnr3, topic5)
+ #print "Publisher tree: ", publisher
+ expectTopicTree = 'all: <lambda> (politics: (UN: SimpleListener_2.notify ) (NATO: (US: func ))) (history: (middle age: ))'
+ assert str(publisher) == expectTopicTree
+ publisher.unsubAll(ALL_TOPICS)
+ #print "Publisher tree: ", publisher
+ expectTopicTree = 'all: (politics: (UN: SimpleListener_2.notify ) (NATO: (US: func ))) (history: (middle age: ))'
+ assert str(publisher) == expectTopicTree
+ publisher.unsubAll()
+ done('testUnsubAll')
+ testUnsubAll()
+ #------------------------
+ def testSend():
+ publisher = Publisher()
+ called = []
+ class TestListener:
+ def __init__(self, num):
+ self.number = num
+ def __call__(self, b):
+ called.append( 'TL%scb' % self.number )
+ def notify(self, b):
+ called.append( 'TL%sm' % self.number )
+ def funcListener(b):
+ called.append('func')
+ lisnr1 = TestListener(1)
+ lisnr2 = TestListener(2)
+ lisnr3 = funcListener
+ lisnr4 = lambda x: called.append('lambda')
+ topic1 = 'politics'
+ topic2 = 'history'
+ topic3 = ('politics','UN')
+ topic4 = ('politics','NATO','US')
+ topic5 = ('politics','NATO')
+ publisher.subscribe(lisnr1, topic1)
+ publisher.subscribe(lisnr2, topic2)
+ publisher.subscribe(lisnr2.notify, topic2)
+ publisher.subscribe(lisnr3, topic4)
+ publisher.subscribe(lisnr4)
+ print publisher
+ # setup ok, now test send/receipt
+ publisher.sendMessage(topic1)
+ assert called == ['lambda','TL1cb']
+ called = []
+ publisher.sendMessage(topic2)
+ assert called == ['lambda','TL2cb','TL2m']
+ called = []
+ publisher.sendMessage(topic3)
+ assert called == ['lambda','TL1cb']
+ called = []
+ publisher.sendMessage(topic4)
+ assert called == ['lambda','TL1cb','func']
+ called = []
+ publisher.sendMessage(topic5)
+ assert called == ['lambda','TL1cb']
+ assert publisher.getDeliveryCount() == 12
+ assert publisher.getMessageCount() == 5
+ # test weak referencing works:
+ _NodeCallback.notified = 0
+ del lisnr2
+ called = []
+ publisher.sendMessage(topic2)
+ assert called == ['lambda']
+ assert _NodeCallback.notified == 2
+ done('testSend')
+ testSend()
+ assert _NodeCallback.notified == 5
+ def testDead():
+ # verify if weak references work as expected
+ print '------ Starting testDead ----------'
+ node = _TopicTreeNode('t1', None)
+ lisnr1 = SimpleListener(1)
+ lisnr2 = SimpleListener(2)
+ lisnr3 = SimpleListener(3)
+ lisnr4 = SimpleListener(4)
+ node.addCallable(lisnr1)
+ node.addCallable(lisnr2)
+ node.addCallable(lisnr3)
+ node.addCallable(lisnr4)
+ print 'Deleting listeners first'
+ _NodeCallback.notified = 0
+ del lisnr1
+ del lisnr2
+ assert _NodeCallback.notified == 2
+ print 'Deleting node first'
+ _NodeCallback.notified = 0
+ del node
+ del lisnr3
+ del lisnr4
+ assert _NodeCallback.notified == 0
+ lisnr1 = SimpleListener(1)
+ lisnr2 = SimpleListener(2)
+ lisnr3 = SimpleListener(3)
+ lisnr4 = SimpleListener(4)
+ # try same with root of tree
+ node = _TopicTreeRoot()
+ node.addTopic(('',), lisnr1)
+ node.addTopic(('',), lisnr2)
+ node.addTopic(('',), lisnr3)
+ node.addTopic(('',), lisnr4)
+ # add objects that will die immediately to see if cleanup occurs
+ # this must be done visually as it is a low-level detail
+ _NodeCallback.notified = 0
+ _TopicTreeRoot.callbackDeadLimit = 3
+ node.addTopic(('',), SimpleListener(5))
+ node.addTopic(('',), SimpleListener(6))
+ node.addTopic(('',), SimpleListener(7))
+ print node.numListeners()
+ assert node.numListeners() == (4, 3)
+ node.addTopic(('',), SimpleListener(8))
+ assert node.numListeners() == (4, 0)
+ assert _NodeCallback.notified == 4
+ print 'Deleting listeners first'
+ _NodeCallback.notified = 0
+ del lisnr1
+ del lisnr2
+ assert _NodeCallback.notified == 2
+ print 'Deleting node first'
+ _NodeCallback.notified = 0
+ del node
+ del lisnr3
+ del lisnr4
+ assert _NodeCallback.notified == 0
+ done('testDead')
+ testDead()
+ print 'Exiting tests'