Icestorm是一个高效的用于ICE应用的发布/订阅服务,IceStorm有几个比较重要的概念: 2 消息:IceStorm的消息和普通的消息队列中间件中描述的消息有点区别,IceStorm 的消息是强类型的,由对某个Slice 操作的调用代表:操作名标识消息的类型,操作参数定义消息内容。要发布消息,可以按普通的方式调用某个IceStorm 代理上的操作。与此类似,订阅者会像收到常规的向上调用(upcall)一样收到消息。所以IceStorm 的消息递送使用的是“推”模式 2 主题:应用要通过订阅某个主题(topic)来表明自己有兴趣接收某些消息。IceStorm 服务器能够支持任意数量的主题,这些主题是动态创建的,通过唯一的名字来区分。每个主题都可以有多个发布者和订阅者。 2 持久模式:IceStorm 拥有一个数据库,里面维护的是关于其主题和链接的信息。但是,通过IceStorm 发送的消息不会被持久地存储,而是会在递送给主题目前的订阅者集之后,马上被丢弃。如果在把消息递送给某个订阅者的过程中发生错误, IceStorm 不会为该订阅者进行消息排队。 2 订阅者出错:因为IceStorm 消息是采用单向语义递送的, IceStorm 只能检测到连接或超时错误。如果在把消息递送给订阅者的过程中, IceStorm 遇到这样的错误,该订阅者就会立刻被解除与该消息对应的主题的订阅。当然用户在使用过程中也可以通过设定QOS参数来改善这个问题,比如重试次数(retryCount),但是对于ObjectNotExistException 或者NotRegisteredException之类的硬错误,Ice运行时不会重试,而是仍然直接解除订阅关系。
service.py -
-
-
-
- import os
- import Ice
- import IceStorm
- import sys
-
- Ice.loadSlice('Hello.ice')
-
- import Demo
-
-
- ic=Ice.initialize()
-
-
- adapter=ic.stringToProxy('DemoIceStorm/TopicManager:default -p 20000')
-
-
- manager=IceStorm.TopicManagerPrx.checkedCast(adapter)
-
-
- topicName = "msg"
-
-
- allmsg=manager.retrieveAll()
-
-
- if not topicName in allmsg:
- topic=manager.create(topicName)
- else:
- topic=manager.retrieve(topicName)
-
-
- publisher = topic.getPublisher()
-
-
- msg = Demo.HelloPrx.uncheckedCast(publisher)
- msg.sayHello("hello %s" %(' '.join(sys.argv)))
-
- if ic:
- ic.destroy()
client.py -
-
-
-
- import os
- import Ice
- import IceStorm
- import sys
-
- Ice.loadSlice('Hello.ice')
-
- import Demo
-
-
- class HelloI(Demo.Hello):
- def sayHello(self, msg, current=None):
- print "Hello World!",msg
-
- def shutdown(self, current=None):
- current.adapter.getCommunicator().shutdown()
-
-
-
-
- ic=Ice.initialize()
-
-
- adapter0=ic.stringToProxy('DemoIceStorm/TopicManager:default -p 20000')
-
-
- manager=IceStorm.TopicManagerPrx.checkedCast(adapter0)
-
-
- topicName = "msg"
-
-
- allmsg=manager.retrieveAll()
-
-
- if not topicName in allmsg:
- topic=manager.create(topicName)
- else:
- topic=manager.retrieve(topicName)
-
-
- adapter = ic.createObjectAdapterWithEndpoints("SimperHello","default -p 20002")
-
- subId = Ice.Identity()
- subId.name = Ice.generateUUID()
-
- subscriber = adapter.add(HelloI(), subId)
-
- qos = {'reliability':'ordered'}
-
-
-
- topic.subscribeAndGetPublisher(qos, subscriber)
-
- adapter.activate()
- ic.waitForShutdown()
|