配色: 字号:
storm入门教程
2017-03-31 | 阅:  转:  |  分享 
  
storm入门教程第一章storm概述?1.1实时流计算互联网从诞生的第一时间起,对世界的最大的改变就是让信息能够实时交互,从而大大加速
了各个环节的效率。正因为大家对信息实时响应、实时交互的需求,软件行业除了个人操作系统之外,数据库(更精确的说是关系型数据库)应该是
软件行业发展最快、收益最为丰厚的产品了。记得十年前,很多银行别说实时转账,连实时查询都做不到,但是数据库和高速网络改变了这个情况。
随着互联网的更进一步发展,从Portal信息浏览型到Search信息搜索型到SNS关系交互传递型,以及电子商务、互联网旅游生活产
品等将生活中的流通环节在线化。对效率的要求让大家对于实时性的要求进一步提升,而信息的交互和沟通正在从点对点往信息链甚至信息网的方向
发展,这样必然带来数据在各个维度的交叉关联,数据爆炸已不可避免。因此流式处理加NoSQL产品应运而生,分别解决实时框架和数据大规模
存储计算的问题。早在7、8年前诸如UC伯克利、斯坦福等大学就开始了对流式数据处理的研究,但是由于更多的关注于金融行业的业务场景或
者互联网流量监控的业务场景,以及当时互联网数据场景的限制,造成了研究多是基于对传统数据库处理的流式化,对流式框架本身的研究偏少。目
前这样的研究逐渐没有了声音,工业界更多的精力转向了实时数据库。2010年Yahoo!对S4的开源,2011年twitter对St
orm的开源,改变了这个情况。以前互联网的开发人员在做一个实时应用的时候,除了要关注应用逻辑计算处理本身,还要为了数据的实时流转、
交互、分布大伤脑筋。但是现在情况却大为不同,以Storm为例,开发人员可以快速的搭建一套健壮、易用的实时流处理框架,配合SQL产品
或者NoSQL产品或者MapReduce计算平台,就可以低成本的做出很多以前很难想象的实时产品:比如一淘数据部的量子恒道品牌旗下的
多个产品就是构建在实时流处理平台上的。本教程是一本对storm的基础介绍手册,但是我们也希望它不仅仅是一本storm的使用手册,
我们会在其中加入更多我们在实际数据生产过程的经验和应用的架构,最后的目的是帮助所有愿意使用实时流处理框架的技术同仁,同时也默默的改
变这个世界。1.2Storm特点Storm是一个开源的分布式实时计算系统,可以简单、可靠的处理大量的数据流。Storm有很多使用
场景:如实时分析,在线机器学习,持续计算,分布式RPC,ETL等等。Storm支持水平扩展,具有高容错性,保证每个消息都会得到处理
,而且处理速度很快(在一个小集群中,每个结点每秒可以处理数以百万计的消息)。Storm的部署和运维都很便捷,而且更为重要的是可以使
用任意编程语言来开发应用。Storm有如下特点:编程模型简单在大数据处理方面相信大家对hadoop已经耳熟能详,基于Goog
leMap/Reduce来实现的Hadoop为开发者提供了map、reduce原语,使并行批处理程序变得非常地简单和优美。同样,
Storm也为大数据的实时计算提供了一些简单优美的原语,这大大降低了开发并行实时处理的任务的复杂性,帮助你快速、高效的开发应用。
可扩展在Storm集群中真正运行topology的主要有三个实体:工作进程、线程和任务。Storm集群中的每台机器上都可以运行多
个工作进程,每个工作进程又可创建多个线程,每个线程可以执行多个任务,任务是真正进行数据处理的实体,我们开发的spout、bolt就
是作为一个或者多个任务的方式执行的。因此,计算任务在多个线程、进程和服务器之间并行进行,支持灵活的水平扩展。高可靠性Stor
m可以保证spout发出的每条消息都能被“完全处理”,这也是直接区别于其他实时系统的地方,如S4。请注意,spout发出的消息后
续可能会触发产生成千上万条消息,可以形象的理解为一棵消息树,其中spout发出的消息为树根,Storm会跟踪这棵消息树的处理情况,
只有当这棵消息树中的所有消息都被处理了,Storm才会认为spout发出的这个消息已经被“完全处理”。如果这棵消息树中的任何一个消
息处理失败了,或者整棵消息树在限定的时间内没有“完全处理”,那么spout发出的消息就会重发。考虑到尽可能减少对内存的消耗,St
orm并不会跟踪消息树中的每个消息,而是采用了一些特殊的策略,它把消息树当作一个整体来跟踪,对消息树中所有消息的唯一id进行异或计
算,通过是否为零来判定spout发出的消息是否被“完全处理”,这极大的节约了内存和简化了判定逻辑,后面会对这种机制进行详细介绍。
这种模式,每发送一个消息,都会同步发送一个ack/fail,对于网络的带宽会有一定的消耗,如果对于可靠性要求不高,可通过使用不同的
emit接口关闭该模式。上面所说的,Storm保证了每个消息至少被处理一次,但是对于有些计算场合,会严格要求每个消息只被处理一次
,幸而Storm的0.7.0引入了事务性拓扑,解决了这个问题,后面会有详述。高容错性如果在消息处理过程中出了一些异常,Stor
m会重新安排这个出问题的处理单元。Storm保证一个处理单元永远运行(除非你显式杀掉这个处理单元)。当然,如果处理单元中存储了中
间状态,那么当处理单元重新被Storm启动的时候,需要应用自己处理中间状态的恢复。支持多种编程语言除了用java实现spout
和bolt,你还可以使用任何你熟悉的编程语言来完成这项工作,这一切得益于Storm所谓的多语言协议。多语言协议是Storm内部的一
种特殊协议,允许spout或者bolt使用标准输入和标准输出来进行消息传递,传递的消息为单行文本或者是json编码的多行。Sto
rm支持多语言编程主要是通过ShellBolt,ShellSpout和ShellProcess这些类来实现的,这些类都实现了IB
olt和ISpout接口,以及让shell通过java的ProcessBuilder类来执行脚本或者程序的协议。可以看到,采
用这种方式,每个tuple在处理的时候都需要进行json的编解码,因此在吞吐量上会有较大影响。支持本地模式Storm有一种“本
地模式”,也就是在进程中模拟一个Storm集群的所有功能,以本地模式运行topology跟在集群上运行topology类似,这对于
我们开发和测试来说非常有用。高效用ZeroMQ作为底层消息队列,保证消息能快速被处理第二章Storm术语介绍及构建Top
ology2.1Storm基本概念在运行一个Storm任务之前,需要了解一些概念:TopologiesStreamsSp
outsBoltsStreamgroupingsReliabilityTasksWorkersConfigurati
onStorm集群和Hadoop集群表面上看很类似。但是Hadoop上运行的是MapReducejobs,而在Storm上运行
的是拓扑(topology),这两者之间是非常不一样的。一个关键的区别是:一个MapReducejob最终会结束,而一个to
pology永远会运行(除非你手动kill掉)。在Storm的集群里面有两种节点:控制节点(masternode)和工作节点
(workernode)。控制节点上面运行一个叫Nimbus后台程序,它的作用类似Hadoop里面的JobTracker。Nim
bus负责在集群里面分发代码,分配计算任务给机器,并且监控状态。每一个工作节点上面运行一个叫做Supervisor的节点。Su
pervisor会监听分配给它那台机器的工作,根据需要启动/关闭工作进程。每一个工作进程执行一个topology的一个子集;一个运
行的topology由运行在很多机器上的很多工作进程组成。Nimbus和Supervisor之间的所有协调工作都是通过Zook
eeper集群完成。另外,Nimbus进程和Supervisor进程都是快速失败(fail-fast)和无状态的。所有的状态要么在
zookeeper里面,要么在本地磁盘上。这也就意味着你可以用kill-9来杀死Nimbus和Supervisor进程,然后
再重启它们,就好像什么都没有发生过。这个设计使得Storm异常的稳定。一个topology是spouts和bolts组成的图,
通过streamgroupings将图中的spouts和bolts连接起来,如下图:一个topology会一直运行直到你手动
kill掉,Storm自动重新分配执行失败的任务,并且Storm可以保证你不会有数据丢失(如果开启了高可靠性的话)。如果一些机器
意外停机它上面的所有任务会被转移到其他机器上。运行一个topology很简单。首先,把你所有的代码以及所依赖的jar打进一个ja
r包。然后运行类似下面的这个命令:stormjarall-my-code.jarbacktype.storm.MyTopo
logyarg1arg2复制代码这个命令会运行主类:backtype.strom.MyTopology,参数是arg1,
arg2。这个类的main函数定义这个topology并且把它提交给Nimbus。stormjar负责连接到Nimbus并且上
传jar包。Topology的定义是一个Thrift结构,并且Nimbus就是一个Thrift服务,你可以提交由任何语言创建的
topology。上面的方面是用JVM-based语言提交的最简单的方法。消息流stream是storm里的关键抽象。一个消息流
是一个没有边界的tuple序列,而这些tuple序列会以一种分布式的方式并行地创建和处理。通过对stream中tuple序列中每
个字段命名来定义stream。在默认的情况下,tuple的字段类型可以是:integer,long,short,byte,str
ing,double,float,boolean和bytearray。你也可以自定义类型(只要实现相应的序列化器)。每个消息流
在定义的时候会被分配给一个id,因为单向消息流使用的相当普遍,OutputFieldsDeclarer定义了一些方法让你可以定义
一个stream而不用指定这个id。在这种情况下这个stream会分配个值为‘default’默认的id。Storm提供的最基
本的处理stream的原语是spout和bolt。你可以实现spout和bolt提供的接口来处理你的业务逻辑。消息源spout是
Storm里面一个topology里面的消息生产者。一般来说消息源会从一个外部源读取数据并且向topology里面发出消息:tup
le。Spout可以是可靠的也可以是不可靠的。如果这个tuple没有被storm成功处理,可靠的消息源spouts可以重新发射一个
tuple,但是不可靠的消息源spouts一旦发出一个tuple就不能重发了。消息源可以发射多条消息流stream。使用Out
putFieldsDeclarer.declareStream来定义多个stream,然后使用SpoutOutputCollect
or来发射指定的stream。Spout类里面最重要的方法是nextTuple。要么发射一个新的tuple到topology里面
或者简单的返回如果已经没有新的tuple。要注意的是nextTuple方法不能阻塞,因为storm在同一个线程上面调用所有消息源s
pout的方法。另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处
理的时候调用ack,否则调用fail。storm只对可靠的spout调用ack和fail。所有的消息处理逻辑被封装在bolts里
面。Bolts可以做很多事情:过滤,聚合,查询数据库等等。Bolts可以简单的做消息流的传递。复杂的消息流处理往往需要很多步骤,
从而也就需要经过很多bolts。比如算出一堆图片里面被转发最多的图片就至少需要两步:第一步算出每个图片的转发数量。第二步找出转发最
多的前10个图片。(如果要把这个过程做得更具有扩展性那么可能需要更多的步骤)。Bolts可以发射多条消息流,使用OutputF
ieldsDeclarer.declareStream定义stream,使用OutputCollector.emit来选择要发射的
stream。Bolts的主要方法是execute,它以一个tuple作为输入,bolts使用OutputCollector来
发射tuple,bolts必须要为它处理的每一个tuple调用OutputCollector的ack方法,以通知Storm这个tu
ple被处理完成了,从而通知这个tuple的发射者spouts。一般的流程是:bolts处理一个输入tuple,发射0个或者
多个tuple,然后调用ack通知storm自己已经处理过这个tuple了。storm提供了一个IBasicBolt会自动调用a
ck。定义一个topology的其中一步是定义每个bolt接收什么样的流作为输入。streamgrouping就是用来定义一个
stream应该如果分配数据给bolts上面的多个tasks。Storm里面有7种类型的streamgroupingShu
ffleGrouping:随机分组,随机派发stream里面的tuple,保证每个bolt接收到的tuple数目大致相同。
FieldsGrouping:按字段分组,比如按userid来分组,具有同样userid的tuple会被分到相同的Bolt
s里的一个task,而不同的userid则会被分配到不同的bolts里的task。AllGrouping:广播发送,对于每一
个tuple,所有的bolts都会收到。GlobalGrouping:全局分组,这个tuple被分配到storm中的一个bo
lt的其中一个task。再具体一点就是分配给id值最低的那个task。NonGrouping:不分组,这个分组的意思是说st
ream不关心到底谁会收到它的tuple。目前这种分组和Shufflegrouping是一样的效果,有一点不同的是storm会
把这个bolt放到这个bolt的订阅者同一个线程里面去执行。DirectGrouping:直接分组,这是一种比较特别的分
组方法,用这种分组意味着消息的发送者指定由消息接收者的哪个task处理这个消息。只有被声明为DirectStream的消息流可
以声明这种分组方法。而且这种消息tuple必须使用emitDirect方法来发射。消息处理者可以通过TopologyContext
来获取处理它的消息的task的id(OutputCollector.emit方法也会返回task的id)。Localors
hufflegrouping:如果目标bolt有一个或者多个task在同一个工作进程中,tuple将会被随机发生给这些tasks
。否则,和普通的ShuffleGrouping行为一致。Storm保证每个tuple会被topology完整的执行。Storm
会追踪由每个spouttuple所产生的tuple树(一个bolt处理一个tuple之后可能会发射别的tuple从而形成树状结构
),并且跟踪这棵tuple树什么时候成功处理完。每个topology都有一个消息超时的设置,如果storm在这个超时的时间内检测不
到某个tuple树到底有没有执行成功,那么topology会把这个tuple标记为执行失败,并且过一会儿重新发射这个tuple。
为了利用Storm的可靠性特性,在你发出一个新的tuple以及你完成处理一个tuple的时候你必须要通知storm。这一切是由O
utputCollector来完成的。通过emit方法来通知一个新的tuple产生了,通过ack方法通知一个tuple处理完成了。
Storm的可靠性我们在Storm入门教程4会深入介绍。每一个spout和bolt会被当作很多task在整个集群里执行。每一个
executor对应到一个线程,在这个线程上运行多个task,而streamgrouping则是定义怎么从一堆task发射tup
le到另外一堆task。你可以调用TopologyBuilder类的setSpout和setBolt来设置并行度(也就是有多少个t
ask)。一个topology可能会在一个或者多个worker(工作进程)里面执行,每个worker是一个物理JVM并且执行整个
topology的一部分。比如,对于并行度是300的topology来说,如果我们使用50个工作进程来执行,那么每个工作进程会处理
其中的6个tasks。Storm会尽量均匀的工作分配给所有的worker。Storm里面有一堆参数可以配置来调整Nimbus,
Supervisor以及正在运行的topology的行为,一些配置是系统级别的,一些配置是topology级别的。default.
yaml里面有所有的默认配置。你可以通过定义个storm.yaml在你的classpath里来覆盖这些默认配置。并且你也可以在代码
里面设置一些topology相关的配置信息(使用StormSubmitter)。2.2构建Topology我们将设计一个top
ology,来实现对一个句子里面的单词出现的频率进行统计。这是一个简单的例子,目的是让大家对于topology快速上手,有一个初步
的理解。在开始开发Storm项目的第一步,就是要设计topology。确定好你的数据处理逻辑,我们今天将的这个简单的例子,top
ology也非常简单。整个topology如下:?整个topology分为三个部分:KestrelSpout:数据源,负责发
送sentenceSplitsentence:负责将sentence切分Wordcount:负责对单词的频率进行累加这个to
pology从kestrelqueue读取句子,并把句子划分成单词,然后汇总每个单词出现的次数,一个tuple负责读取句子,每一
个tuple分别对应计算每一个单词出现的次数,大概样子如下所示:?1)构建maven环境:为了开发stormtopolo
gy,你需要把storm相关的jar包添加到classpath里面去:要么手动添加所有相关的jar包,要么使用maven来管
理所有的依赖。storm的jar包发布在Clojars(一个maven库),如果你使用maven的话,把下面的配置添加在你项目的
pom.xml里面。clojars.orghttp://clojars.
org/repo%3C/url%3Ehttp://clojars.org/repo
pendency>stormstorm
0.5.3test2)定义
topology:TopologyBuilderbuilder=newTopologyBuilder();builde
r.setSpout(1,newKestrelSpout(“kestrel.backtype.com”,22133,”sen
tence_queue”,newStringScheme()));builder.setBolt(2,newSplitS
entence(),10).shuffleGrouping(1);builder.setBolt(3,newWordCo
unt(),20).fieldsGrouping(2,newFields(“word”));这种topology的spo
ut从句子队列中读取句子,在kestrel.backtype.com位于一个Kestrel的服务器端口22133。Spout用s
etSpout方法插入一个独特的id到topology。Topology中的每个节点必须给予一个id,id是由其他bolts用于
订阅该节点的输出流。KestrelSpout在topology中id为1。setBolt是用于在Topology中插入bolt
s。在topology中定义的第一个bolts是切割句子的bolts。这个bolts将句子流转成成单词流。让我们看看Sp
litSentence实施:publicclassSplitSentenceimplementsIBasicBolt{
publicvoidprepare(Mapconf,TopologyContextcontext){}public
voidexecute(Tupletuple,BasicOutputCollectorcollector){Stri
ngsentence=tuple.getString(0);for(Stringword:sentence.split
(“”)){collector.emit(newValues(word));}}publicvoidcleanu
p(){}publicvoiddeclareOutputFields(OutputFieldsDeclarerdecl
arer){declarer.declare(newFields(“word”));}关键的方法是execute方法。
正如你可以看到,它将句子拆分成单词,并发出每个单词作为一个新的元组。另一个重要的方法是declareOutputFields,
其中宣布bolts输出元组的架构。在这里宣布,它发出一个域为word的元组setBolt的最后一个参数是你想为bolts的并行
量。SplitSentencebolts是10个并发,这将导致在storm集群中有十个线程并行执行。你所要做的的是增加bo
lts的并行量在遇到topology的瓶颈时。setBolt方法返回一个对象,用来定义bolts的输入。例如,SplitSen
tence螺栓订阅组件“1”使用随机分组的输出流。“1”是指已经定义KestrelSpout。我将解释在某一时刻的随机分组的一
部分。到目前为止,最要紧的是,SplitSentencebolts会消耗KestrelSpout发出的每一个元组。下面在让我
们看看wordcount的实现:publicclassWordCountimplementsIBasicBolt{p
rivateMap_counts=newHashMap>();publicvoidprepare(Mapconf,TopologyContextcontext){}p
ublicvoidexecute(Tupletuple,BasicOutputCollectorcollector){
Stringword=tuple.getString(0);intcount;if(_counts.contains
Key(word)){count=_counts.get(word);}else{count=0;}cou
nt++;_counts.put(word,count);collector.emit(newValues(word,c
ount));}publicvoidcleanup(){}publicvoiddeclareOutputFiel
ds(OutputFieldsDeclarerdeclarer){declarer.declare(newFields(“
word”,“count”));}}SplitSentence对于句子里面的每个单词发射一个新的tuple,WordCo
unt在内存里面维护一个单词->次数的mapping,WordCount每收到一个单词,它就更新内存里面的统计状态。stor
m的运行有两种模式:本地模式和分布式模式.1)本地模式:storm用一个进程里面的线程来模拟所有的spout和bolt.
本地模式对开发和测试来说比较有用。你运行storm-starter里面的topology的时候它们就是以本地模式运行的,你可以
看到topology里面的每一个组件在发射什么消息。2)分布式模式:storm由一堆机器组成。当你提交topology给ma
ster的时候,你同时也把topology的代码提交了。master负责分发你的代码并且负责给你的topolgoy分配工作进程。
如果一个工作进程挂掉了,master节点会把认为重新分配到其它节点。3)下面是以本地模式运行的代码:Configconf
=newConfig();conf.setDebug(true);conf.setNumWorkers(2);Loca
lClustercluster=newLocalCluster();cluster.submitTopology(“te
st”,conf,builder.createTopology());Utils.sleep(10000);cluster
.killTopology(“test”);cluster.shutdown();首先,这个代码定义通过定义一个LocalC
luster对象来定义一个进程内的集群。提交topology给这个虚拟的集群和提交topology给分布式集群是一样的。通过调用s
ubmitTopology方法来提交topology,它接受三个参数:要运行的topology的名字,一个配置对象以及要运行的t
opology本身。topology的名字是用来唯一区别一个topology的,这样你然后可以用这个名字来杀死这个topolog
y的。前面已经说过了,你必须显式的杀掉一个topology,否则它会一直运行。Conf对象可以配置很多东西,下面两个是最常
见的:TOPOLOGY_WORKERS(setNumWorkers)定义你希望集群分配多少个工作进程给你来执行这个topolo
gy.topology里面的每个组件会被需要线程来执行。每个组件到底用多少个线程是通过setBolt和setSpout来指定的。
这些线程都运行在工作进程里面.每一个工作进程包含一些节点的一些工作线程。比如,如果你指定300个线程,60个进程,那么每个工
作进程里面要执行6个线程,而这6个线程可能属于不同的组件(Spout,Bolt)。你可以通过调整每个组件的并行度以及这些线程所
在的进程数量来调整topology的性能。TOPOLOGY_DEBUG(setDebug),当它被设置成true的话,sto
rm会记录下每个组件所发射的每条消息。这在本地环境调试topology很有用,但是在线上这么做的话会影响性能的。结论:本章从s
torm的基本对象的定义,到广泛的介绍了storm的开发环境,从一个简单的例子讲解了topology的构建和定义。希望大家可以从本
章的内容对storm有一个基本的理解和概念,并且已经可以构建一个简单的topology!!第三章Storm安装部署步骤本文以Tw
itterStorm官方Wiki为基础,详细描述如何快速搭建一个Storm集群,其中,项目实践中遇到的问题及经验总结,在相应章节
以“注意事项”的形式给出。3.1Storm集群组件Storm集群中包含两类节点:主控节点(MasterNode)和工作节点(W
orkNode)。其分别对应的角色如下:1.主控节点(MasterNode)上运行一个被称为Nimbus的后台程序,它负责在
Storm集群内分发代码,分配任务给工作机器,并且负责监控集群运行状态。Nimbus的作用类似于Hadoop中JobTracker
的角色。2.每个工作节点(WorkNode)上运行一个被称为Supervisor的后台程序。Supervisor负责监听从Ni
mbus分配给它执行的任务,据此启动或停止执行任务的工作进程。每一个工作进程执行一个Topology的子集;一个运行中的Topol
ogy由分布在不同工作节点上的多个工作进程组成。?Storm集群组件Nimbus和Supervisor节点之间所有的协调工作是通过
Zookeeper集群来实现的。此外,Nimbus和Supervisor进程都是快速失败(fail-fast)和无状态(state
less)的;Storm集群所有的状态要么在Zookeeper集群中,要么存储在本地磁盘上。这意味着你可以用kill-9来杀死N
imbus和Supervisor进程,它们在重启后可以继续工作。这个设计使得Storm集群拥有不可思议的稳定性。3.2安装Stor
m集群这一章节将详细描述如何搭建一个Storm集群。下面是接下来需要依次完成的安装步骤:1.搭建Zookeeper集群;2.安
装Storm依赖库;3.下载并解压Storm发布版本;4.修改storm.yaml配置文件;5.启动Storm各个后台进程。
3.2.1搭建Zookeeper集群Storm使用Zookeeper协调集群,由于Zookeeper并不用于消息传递,所以Stor
m给Zookeeper带来的压力相当低。大多数情况下,单个节点的Zookeeper集群足够胜任,不过为了确保故障恢复或者部署大规模
Storm集群,可能需要更大规模节点的Zookeeper集群(对于Zookeeper集群的话,官方推荐的最小节点数为3个)。在Zo
okeeper集群的每台机器上完成以下安装部署步骤:1.下载安装JavaJDK,官方下载链接为http://www.oracl
e.com/technetwork/java/index.htmlhttp://java.sun.com/javase/downl
oads/index.jsp,JDK版本为JDK6或以上。2.根据Zookeeper集群的负载情况,合理设置Java堆大小,尽
可能避免发生swap,导致Zookeeper性能下降。保守起见,4GB内存的机器可以为Zookeeper分配3GB最大堆空间。3.
下载后解压安装Zookeeper包,官方下载链接为http://zookeeper.apache.org/releases.ht
ml?spm=0.0.0.0.SMAyaXhttp://hadoop.apache.org/zookeeper/releases.
html。4.根据Zookeeper集群节点情况,在conf目录下创建Zookeeper配置文件zoo.cfg:tickTime
=2000?dataDir=/var/zookeeper/?clientPort=2181?initLimit=5?syncLim
it=2?server.1=zoo1:2888:3888?server.2=zoo2:2888:3888?server.3=zoo
3:2888:3888复制代码其中,dataDir指定Zookeeper的数据文件目录;其中server.id=host:port
:port,id是为每个Zookeeper节点的编号,保存在dataDir目录下的myid文件中,zoo1~zoo3表示各个Zoo
keeper节点的hostname,第一个port是用于连接leader的端口,第二个port是用于leader选举的端口。5.
在dataDir目录下创建myid文件,文件中只包含一行,且内容为该节点对应的server.id中的id编号。6.启动Zooke
eper服务:java-cpzookeeper.jar:lib/log4j-1.2.15.jar:conf\org.apa
che.zookeeper.server.quorum.QuorumPeerMainzoo.cfg复制代码或者bin/zkSer
ver.shstart复制代码7.通过Zookeeper客户端测试服务是否可用:java-cpzookeeper.jar:
src/java/lib/log4j-1.2.15.jar:conf:src/java/lib/jline-0.9.94.jar
\org.apache.zookeeper.ZooKeeperMain-server127.0.0.1:2181复制代码或者
bin/zkCli.sh-server127.0.0.1:2181复制代码注意事项:由于Zookeeper是快速失败(fail
-fast)的,且遇到任何错误情况,进程均会退出,因此,最好能通过监控程序将Zookeeper管理起来,保证Zookeeper退出
后能被自动重启。详情参考http://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin
.html这里。Zookeeper运行过程中会在dataDir目录下生成很多日志和快照文件,而Zookeeper运行进程并不负责定
期清理合并这些文件,导致占用大量磁盘空间,因此,需要通过cron等方式定期清除没用的日志和快照文件。详情参考http://zook
eeper.apache.org/doc/r3.3.3/zookeeperAdmin.html这里。具体命令格式如下:java-
cpzookeeper.jar:log4j.jar:conforg.apache.zookeeper.server.Purge
TxnLog-n3.2.2安装Storm依赖库接下来,需要在Nimbus
和Supervisor机器上安装Storm的依赖库,具体如下:1.http://www.zeromq.org/area:downl
oadZeroMQ2.1.7?–请勿使用2.1.10版本,因为该版本的一些严重bug会导致Storm集群运行时出现奇怪的问题。
少数用户在2.1.7版本会遇到”IllegalArgumentException”的异常,此时降为2.1.4版本可修复这一问题。2
.?http://github.com/nathanmarz/jzmqJZMQ3.Java64.Python2.6.65.
unzip以上依赖库的版本是经过Storm测试的,Storm并不能保证在其他版本的Java或Python库下可运行。3.2.2.
1安装ZMQ2.1.7下载后编译安装ZMQ:wgethttp://download.zeromq.org/zeromq-2.
1.7.tar.gztar-xzfzeromq-2.1.7.tar.gzcdzeromq-2.1.7./configurem
akesudomakeinstall复制代码注意事项:如果安装过程报错uuid找不到,则通过如下的包安装uuid库:如果安装过
程报错uuid找不到,则通过如下的包安装uuid库:sudoyuminstalle2fsprogslsudoyumins
talle2fsprogs-devel复制代码3.2.2.2安装JZMQ下载后编译安装JZMQ:gitclonehttps
://github.com/nathanmarz/jzmq.gitcdjzmq./autogen.sh./configurema
kesudomakeinstall复制代码为了保证JZMQ正常工作,可能需要完成以下配置:正确设置JAVA_HOME环境变量
安装Java开发包升级autoconf如果你是MacOSX,参考http://blog.pmorelli.com/getting
-zeromq-and-jzmq-running-on-mac-os-x这里注意事项:如果运行./configure命令出现问
题,参考http://stackoverflow.com/questions/3522248/how-do-i-compile-j
zmq-for-zeromq-on-osx这里。3.2.2.3安装Java61.下载并安装JDK6,参考http://ww
w.oracle.com/technetwork/java/javase/index-137561.html这里;2.配置JAV
A_HOME环境变量;3.运行java、javac命令,测试java正常安装。3.2.2.4安装Python2.6.61.下
载Python2.6.6:wget[url]http://www.python.org/ftp/python/2.6.6/Pyt
hon-2.6.6.tar.bz2[/url]复制代码2.编译安装Python2.6.6:tar–jxvfPython-2.
6.6.tar.bz2cdPython-2.6.6./configuremakemakeinstall复制代码3.测试Pyt
hon2.6.6:python-VPython2.6.6复制代码3.2.2.5安装unzip1.如果使用RedHat系列L
inux系统,执行以下命令安装unzip:yuminstallunzip复制代码2.如果使用Debian系列Linux系统,
执行以下命令安装unzip:apt-getinstallunzip复制代码3.2.3下载并解压Storm发布版本下一步,需要
在Nimbus和Supervisor机器上安装Storm发行版本。1.下载Storm发行版本,推荐使用Storm0.8.1:wg
ethttps://github.com/downloads/nathanmarz/storm/storm-0.8.1.zip复
制代码2.解压到安装目录下:unzipstorm-0.8.1.zip复制代码3.2.4修改storm.yaml配置文件Sto
rm发行版本解压目录下有一个conf/storm.yaml文件,用于配置Storm。默认配置在这里可以查看。conf/storm.
yaml中的配置选项将覆盖defaults.yaml中的默认配置。以下配置选项是必须在conf/storm.yaml中进行配置的:
1)storm.zookeeper.servers:?Storm集群使用的Zookeeper集群地址,其格式如下:storm.z
ookeeper.servers:-“111.222.333.444″-“555.666.777.888″复制代码如果Zook
eeper集群使用的不是默认端口,那么还需要storm.zookeeper.port选项。2)?storm.local.dir:
Nimbus和Supervisor进程用于存储少量状态,如jars、confs等的本地磁盘目录,需要提前创建该目录并给以足够的访问
权限。然后在storm.yaml中配置该目录,如:storm.local.dir:"/home/admin/storm/work
dir"复制代码3)?java.library.path:Storm使用的本地库(ZMQ和JZMQ)加载路径,默认为”/usr/
local/lib:/opt/local/lib:/usr/lib”,一般来说ZMQ和JZMQ默认安装在/usr/local/li
b下,因此不需要配置即可。4)?nimbus.host:Storm集群Nimbus机器地址,各个Supervisor工作节点需
要知道哪个机器是Nimbus,以便下载Topologies的jars、confs等文件,如:nimbus.host:"111.2
22.333.444"复制代码5)?supervisor.slots.ports:对于每个Supervisor工作节点,需要配置
该工作节点可以运行的worker数量。每个worker占用一个单独的端口用于接收消息,该配置选项即用于定义哪些端口是可被worke
r使用的。默认情况下,每个节点上可运行4个workers,分别在6700、6701、6702和6703端口,如:superviso
r.slots.ports:-6700-6701-6702-6703复制代码3.2.5启动Storm各个后台进程
最后一步,启动Storm的所有后台进程。和Zookeeper一样,Storm也是快速失败(fail-fast)的系统,这样Stor
m才能在任意时刻被停止,并且当进程重启后被正确地恢复执行。这也是为什么Storm不在进程内保存状态的原因,即使Nimbus或Sup
ervisors被重启,运行中的Topologies不会受到影响。以下是启动Storm各个后台进程的方式:Nimbus:在Sto
rm主控节点上运行”bin/stormnimbus>/dev/null2>&1&”启动Nimbus后台程序,并放到后台执行
;Supervisor:在Storm各个工作节点上运行”bin/stormsupervisor>/dev/null2>&1
&”启动Supervisor后台程序,并放到后台执行;UI:在Storm主控节点上运行”bin/stormui>/dev/
null2>&1&”启动UI后台程序,并放到后台执行,启动后可以通过http://{nimbushost}:8080观察集群
的worker资源使用情况、Topologies的运行状态等信息。注意事项:启动Storm后台进程时,需要对conf/storm.
yaml配置文件中设置的storm.local.dir目录具有写权限。Storm后台进程被启动后,将在Storm安装部署目录下的l
ogs/子目录下生成各个进程的日志文件。经测试,StormUI必须和StormNimbus部署在同一台机器上,否则UI无法正常
工作,因为UI进程会检查本机是否存在Nimbus链接。为了方便使用,可以将bin/storm加入到系统环境变量中。至此,Storm
集群已经部署、配置完毕,可以向集群提交拓扑运行了。3.3向集群提交任务1.启动StormTopology:stormjar
allmycode.jarorg.me.MyTopologyarg1arg2arg3复制代码其中,allmycode.j
ar是包含Topology实现代码的jar包,org.me.MyTopology的main方法是Topology的入口,arg1、
arg2和arg3为org.me.MyTopology执行时需要传入的参数。2.停止StormTopology:stormk
ill{toponame}其中,{toponame}为Topology提交到Storm集群时指定的Topology任务名称。3.
4参考资料1.?https://github.com/nathanmarz/storm/wiki/Tutorialhttps:/
/github.com/nathanmarz/storm/wiki/Tutorialhttps://github.com/nath
anmarz/storm/wiki/Setting-up-a-Storm-clusterhttps://github.com/na
thanmarz/st...-up-a-Storm-cluster3.http://blog.linezing.com/?p=
1892第四章torm消息的可靠处理4.1简介storm可以确保spout发送出来的每个消息都会被完整的处理。本章将会描述st
orm体系是如何达到这个目标的,并将会详述开发者应该如何使用storm的这些机制来实现数据的可靠处理。4.2理解消息被完整处理一
个消息(tuple)从spout发送出来,可能会导致成百上千的消息基于此消息被创建。我们来思考一下流式的“单词统计”的例子:sto
rm任务从数据源(Kestrelqueue)每次读取一个完整的英文句子;将这个句子分解为独立的单词,最后,实时的输出每个单词以及
它出现过的次数。本例中,每个从spout发送出来的消息(每个英文句子)都会触发很多的消息被创建,那些从句子中分隔出来的单词就是被创
建出来的新消息。这些消息构成一个树状结构,我们称之为“tupletree”,看起来如图1所示:图1示例tupletree在什
么条件下,Storm才会认为一个从spout发送出来的消息被完整处理呢?答案就是下面的条件同时被满足:tupletree不再生长
树中的任何消息被标识为“已处理”如果在指定的时间内,一个消息衍生出来的tupletree未被完全处理成功,则认为此消息未被完整处
理。这个超时值可以通过任务级参数http://nathanmarz.github.com/storm/doc/backtype/s
torm/Config.htmlConfig.TOPOLOGY_MESSAGE_TIMEOUT_SECS?进行配置,默认超时值为3
0秒。4.3消息的生命周期如果消息被完整处理或者未被完整处理,Storm会如何进行接下来的操作呢?为了弄清这个问题,我们来研究一
下从spout发出来的消息的生命周期。这里列出了spout应该实现的接口:?首先,Storm使用spout实例的nextTupl
e()方法从spout请求一个消息(tuple)。收到请求以后,spout使用open方法中提供的SpoutOutputColl
ector向它的输出流发送一个或多个消息。每发送一个消息,Spout会给这个消息提供一个messageID,它将会被用来标识这个
消息。假设我们从kestrel队列中读取消息,Spout会将kestrel队列为这个消息设置的ID作为此消息的messageI
D。向SpoutOutputCollector中发送消息格式如下:?接来下,这些消息会被发送到后续业务处理的bolts,并且S
torm会跟踪由此消息产生出来的新消息。当检测到一个消息衍生出来的tupletree被完整处理后,Storm会调用Spout中的
ack方法,并将此消息的messageID作为参数传入。同理,如果某消息处理超时,则此消息对应的Spout的fail方法会被调用,
调用时此消息的messageID会被作为参数传入。注意:一个消息只会由发送它的那个spout任务来调用ack或fail。如果系统中
某个spout由多个任务运行,消息也只会由创建它的spout任务来应答(ack或fail),决不会由其他的spout任务来应答。我
们继续使用从kestrel队列中读取消息的例子来阐述高可靠性下spout需要做些什么(假设这个spout的名字是KestrelSp
out)。我们先简述一下kestrel消息队列:当KestrelSpout从kestrel队列中读取一个消息,表示它“打开”了队列
中某个消息。这意味着,此消息并未从队列中真正的删除,而是将此消息设置为“pending”状态,它等待来自客户端的应答,被应答以后,
此消息才会被真正的从队列中删除。处于“pending”状态的消息不会被其他的客户端看到。另外,如果一个客户端意外的断开连接,则由此
客户端“打开”的所有消息都会被重新加入到队列中。当消息被“打开”的时候,kestrel队列同时会为这个消息提供一个唯一的标识。Ke
strelSpout就是使用这个唯一的标识作为这个tuple的messageID的。稍后当ack或fail被调用的时候,Kestr
elSpout会把ack或者fail连同messageID一起发送给kestrel队列,kestrel会将消息从队列中真正删除或者
将它重新放回队列中。4.4靠相关的API为了使用Storm提供的可靠处理特性,我们需要做两件事情:无论何时在tupletree
中创建了一个新的节点,我们需要明确的通知Storm;当处理完一个单独的消息时,我们需要告诉Storm这棵tupletree的变
化状态。通过上面的两步,storm就可以检测到一个tupletree何时被完全处理了,并且会调用相关的ack或fail方法。St
orm提供了简单明了的方法来完成上述两步。为tupletree中指定的节点增加一个新的节点,我们称之为锚定(anchoring)
。锚定是在我们发送消息的同时进行的。为了更容易说明问题,我们使用下面代码作为例子。本示例的bolt将包含整句话的消息分解为一系列的
子消息,每个子消息包含一个单词。?每个消息都通过这种方式被锚定:把输入消息作为emit方法的第一个参数。因为word消息被锚定在了
输入消息上,这个输入消息是spout发送过来的tupletree的根节点,如果任意一个word消息处理失败,派生这个tuple
tree那个spout消息将会被重新发送。与此相反,我们来看看使用下面的方式emit消息时,Storm会如何处理:?如果以这种方
式发送消息,将会导致这个消息不会被锚定。如果此tupletree中的消息处理失败,派生此tupletree的根消息不会被重新发
送。根据任务的容错级别,有时候很适合发送一个非锚定的消息。一个输出消息可以被锚定在一个或者多个输入消息上,这在做join或聚合的时
候是很有用的。一个被多重锚定的消息处理失败,会导致与之关联的多个spout消息被重新发送。多重锚定通过在emit方法中指定多个输入
消息来实现:多重锚定会将被锚定的消息加到多棵tupletree上。注意:多重绑定可能会破坏传统的树形结构,从而构成一个DAGs(
有向无环图),如图2所示:?图2多重锚定构成的钻石型结构Storm的实现可以像处理树那样来处理DAGs。锚定表明了如何将一个消息
加入到指定的tupletree中,高可靠处理API的接下来部分将向您描述当处理完tupletree中一个单独的消息时我们该做些
什么。这些是通过OutputCollector的ack和fail方法来实现的。回头看一下例子SplitSentence,可以发现
当所有的word消息被发送完成后,输入的表示句子的消息会被应答(acked)。每个被处理的消息必须表明成功或失败(acked或者
failed)。Storm是使用内存来跟踪每个消息的处理情况的,如果被处理的消息没有应答的话,迟早内存会被耗尽!很多bolt遵循特
定的处理流程:读取一个消息、发送它派生出来的子消息、在execute结尾处应答此消息。一般的过滤器(filter)或者是简单的处
理功能都是这类的应用。Storm有一个BasicBolt接口封装了上述的流程。示例SplitSentence可以使用BasicBo
lt来重写:?使用这种方式,代码比之前稍微简单了一些,但是实现的功能是一样的。发送到BasicOutputCollector的消息
会被自动的锚定到输入消息,并且,当execute执行完毕的时候,会自动的应答输入消息。很多情况下,一个消息需要延迟应答,例如聚合或
者是join。只有根据一组输入消息得到一个结果之后,才会应答之前所有的输入消息。并且聚合和join大部分时候对输出消息都是多重锚定
。然而,这些特性不是IBasicBolt所能处理的。4.5高效的实现tupletreeStorm系统中有一组叫做“acker
”的特殊的任务,它们负责跟踪DAG(有向无环图)中的每个消息。每当发现一个DAG被完全处理,它就向创建这个根消息的spout任务发
送一个信号。拓扑中acker任务的并行度可以通过配置参数http://nathanmarz.github.com/storm/do
c/backtype/storm/Config.htmlConfig.TOPOLOGY_ACKERS来设置。默认的acker任务并
行度为1,当系统中有大量的消息时,应该适当提高acker任务的并发度。为了理解Storm可靠性处理机制,我们从研究一个消息的生命周
期和tupletree的管理入手。当一个消息被创建的时候(无论是在spout还是bolt中),系统都为该消息分配一个64bit的
随机值作为id。这些随机的id是acker用来跟踪由spout消息派生出来的tupletree的。每个消息都知道它所在的tupl
etree对应的根消息的id。每当bolt新生成一个消息,对应tupletree中的根消息的messageId就拷贝到这个消息
中。当这个消息被应答的时候,它就把关于tupletree变化的信息发送给跟踪这棵树的acker。例如,他会告诉acker:本消息
已经处理完毕,但是我派生出了一些新的消息,帮忙跟踪一下吧。举个例子,假设消息D和E是由消息C派生出来的,这里演示了消息C被应答时,
tupletree是如何变化的。?因为在C被从树中移除的同时D和E会被加入到tupletree中,因此tupletree不会
被过早的认为已完全处理。关于Storm如何跟踪tupletree,我们再深入的探讨一下。前面说过系统中可以有任意个数的acker
,那么,每当一个消息被创建或应答的时候,它怎么知道应该通知哪个acker呢?系统使用一种哈希算法来根据spout消息的messag
eId确定由哪个acker跟踪此消息派生出来的tupletree。因为每个消息都知道与之对应的根消息的messageId,因此它
知道应该与哪个acker通信。当spout发送一个消息的时候,它就通知对应的acker一个新的根消息产生了,这时acker就会创建
一个新的tupletree。当acker发现这棵树被完全处理之后,他就会通知对应的spout任务。tuple是如何被跟踪的呢?系
统中有成千上万的消息,如果为每个spout发送的消息都构建一棵树的话,很快内存就会耗尽。所以,必须采用不同的策略来跟踪每个消息。由
于使用了新的跟踪算法,Storm只需要固定的内存(大约20字节)就可以跟踪一棵树。这个算法是storm正确运行的核心,也是stor
m最大的突破。acker任务保存了spout消息id到一对值的映射。第一个值就是spout的任务id,通过这个id,acker就知
道消息处理完成时该通知哪个spout任务。第二个值是一个64bit的数字,我们称之为“ackval”,它是树中所有消息的随机i
d的异或结果。ackval表示了整棵树的的状态,无论这棵树多大,只需要这个固定大小的数字就可以跟踪整棵树。当消息被创建和被应答的
时候都会有相同的消息id发送过来做异或。每当acker发现一棵树的ackval值为0的时候,它就知道这棵树已经被完全处理了。因为
消息的随机ID是一个64bit的值,因此ackval在树处理完之前被置为0的概率非常小。假设你每秒钟发送一万个消息,从概率上说,
至少需要50,000,000年才会有机会发生一次错误。即使如此,也只有在这个消息确实处理失败的情况下才会有数据的丢失!4.6选择
合适的可靠性级别Acker任务是轻量级的,所以在拓扑中并不需要太多的acker存在。可以通过StormUI来观察acker任务的
吞吐量,如果看上去吞吐量不够的话,说明需要添加额外的acker。如果你并不要求每个消息必须被处理(你允许在处理过程中丢失一些信息)
,那么可以关闭消息的可靠处理机制,从而可以获取较好的性能。关闭消息的可靠处理机制意味着系统中的消息数会减半(每个消息不需要应答了)
。另外,关闭消息的可靠处理可以减少消息的大小(不需要每个tuple记录它的根id了),从而节省带宽。有三种方法可以关系消息的可靠处
理机制:将参数Config.TOPOLOGY_ACKERS设置为0,通过此方法,当Spout发送一个消息的时候,它的ack方法将立
刻被调用;第二个方法是Spout发送一个消息时,不指定此消息的messageID。当需要关闭特定消息可靠性的时候,可以使用此方法;
最后,如果你不在意某个消息派生出来的子孙消息的可靠性,则此消息派生出来的子消息在发送时不要做锚定,即在emit方法中不指定输入消息
。因为这些子孙消息没有被锚定在任何tupletree中,因此他们的失败不会引起任何spout重新发送消息。4.7集群的各级容错
到现在为止,大家已经理解了Storm的可靠性机制,并且知道了如何选择不同的可靠性级别来满足需求。接下来我们研究一下Storm如何保
证在各种情况下确保数据不丢失。4.7.1任务级失败因为bolt任务crash引起的消息未被应答。此时,acker中所有与此bol
t任务关联的消息都会因为超时而失败,对应spout的fail方法将被调用。acker任务失败。如果acker任务本身失败了,它在失
败之前持有的所有消息都将会因为超时而失败。Spout的fail方法将被调用。Spout任务失败。这种情况下,Spout任务对接的外
部设备(如MQ)负责消息的完整性。例如当客户端异常的情况下,kestrel队列会将处于pending状态的所有的消息重新放回到队列
中。4.7.2??任务槽(slot)故障worker失败。每个worker中包含数个bolt(或spout)任务。supervi
sor负责监控这些任务,当worker失败后,supervisor会尝试在本机重启它。supervisor失败。superviso
r是无状态的,因此supervisor的失败不会影响当前正在运行的任务,只要及时的将它重新启动即可。supervisor不是自举的
,需要外部监控来及时重启。nimbus失败。nimbus是无状态的,因此nimbus的失败不会影响当前正在运行的任务(nimbus
失败时,无法提交新的任务),只要及时的将它重新启动即可。nimbus不是自举的,需要外部监控来及时重启。4.7.3.??集群节点(
机器)故障storm集群中的节点故障。此时nimbus会将此机器上所有正在运行的任务转移到其他可用的机器上运行。zookeeper
集群中的节点故障。zookeeper保证少于半数的机器宕机仍可正常运行,及时修复故障机器即可。4.8小结本章介绍了storm集群
如何实现数据的可靠处理。借助于创新性的tupletree跟踪技术,storm高效的通过数据的应答机制来保证数据不丢失。storm
集群中除nimbus外,没有单点存在,任何节点都可以出故障而保证数据不会丢失。nimbus被设计为无状态的,只要可以及时重启,就不
会影响正在运行的任务。第五章一致性事务Storm是一个分布式的流处理系统,利用anchor和ack机制保证所有tuple都被成
功处理。如果tuple出错,则可以被重传,但是如何保证出错的tuple只被处理一次呢?Storm提供了一套事务性组件Transac
tionTopology,用来解决这个问题。TransactionalTopology目前已经不再维护,由Trident来实
现事务性topology,但是原理相同。5.1一致性事务的设计Storm如何实现即对tuple并行处理,又保证事务性。本节从简单
的事务性实现方法入手,逐步引出TransactionalTopology的原理。保证tuple只被处理一次,最简单的方法就是将
tuple流变成强顺序的,并且每次只处理一个tuple。从1开始,给每个tuple都顺序加上一个id。在处理tuple的时候,将处
理成功的tupleid和计算结果存在数据库中。下一个tuple到来的时候,将其id与数据库中的id做比较。如果相同,则说明这个t
uple已经被成功处理过了,忽略它;如果不同,根据强顺序性,说明这个tuple没有被处理过,将它的id及计算结果更新到数据库中。
以统计消息总数为例。每来一个tuple,如果数据库中存储的id与当前tupleid不同,则数据库中的消息总数加1,同时更新数据
库中的当前tupleid值。如图:?但是这种机制使得系统一次只能处理一个tuple,无法实现分布式计算。为了实现分布式,我
们可以每次处理一批tuple,称为一个batch。一个batch中的tuple可以被并行处理。我们要保证一个batch只被处理一
次,机制和上一节类似。只不过数据库中存储的是batchid。batch的中间计算结果先存在局部变量中,当一个batch中的所有t
uple都被处理完之后,判断batchid,如果跟数据库中的id不同,则将中间计算结果更新到数据库中。如何确保一个batch里
面的所有tuple都被处理完了呢?可以利用Storm提供的CoordinateBolt。如图:?但是强顺序batch流也有局限
,每次只能处理一个batch,batch之间无法并行。要想实现真正的分布式事务处理,可以使用storm提供的Transaction
alTopology。在此之前,我们先详细介绍一下CoordinateBolt的原理。CoordinateBolt具体原理如下
:真正执行计算的bolt外面封装了一个CoordinateBolt。真正执行任务的bolt我们称为realbolt。每个Co
ordinateBolt记录两个值:有哪些task给我发送了tuple(根据topology的grouping信息);我要给哪些t
uple发送信息(同样根据groping信息)Realbolt发出一个tuple后,其外层的CoordinateBolt会记录
下这个tuple发送给哪个task了。等所有的tuple都发送完了之后,CoordinateBolt通过另外一个特殊的strea
m以emitDirect的方式告诉所有它发送过tuple的task,它发送了多少tuple给这个task。下游task会将这个数字
和自己已经接收到的tuple数量做对比,如果相等,则说明处理完了所有的tuple。下游CoordinateBolt会重复上面的步
骤,通知其下游。整个过程如图所示:?CoordinateBolt主要用于两个场景:DRPCTransactionalT
opologyCoordinatedBolt对于业务是有侵入的,要使用CoordinatedBolt提供的功能,你必须要保证你的
每个bolt发送的每个tuple的第一个field是request-id。所谓的“我已经处理完我的上游”的意思是说当前这个bol
t对于当前这个request-id所需要做的工作做完了。这个request-id在DRPC里面代表一个DRPC请求;在Transa
ctionalTopology里面代表一个batch。Storm提供的TransactionalTopology将batch
计算分为process和commit两个阶段。Process阶段可以同时处理多个batch,不用保证顺序性;commit阶段保证b
atch的强顺序性,并且一次只能处理一个batch,第1个batch成功提交之前,第2个batch不能被提交。还是以统计消息总数
为例,以下代码来自storm-starter里面的TransactionalGlobalCount。MemoryTransact
ionalSpoutspout=newMemoryTransactionalSpout(DATA,newFields(“
word“),PARTITION_TAKE_PER_BATCH);TransactionalTopologyBuilderbu
ilder=newTransactionalTopologyBuilder(“global-count“,“spout“,
spout,3);builder.setBolt(“partial-count“,newBatchCount(),5).
noneGrouping(“spout“);builder.setBolt(“sum“,newUpdateGlobalCoun
t()).globalGrouping(“partial-count“);TransactionalTopologyBuilde
r共接收四个参数。这个TransactionalTopology的id。Id用来在Zookeeper中保存当前topology
的进度,如果这个topology重启,可以继续之前的进度执行。Spout在这个topology中的id一个Transactio
nalSpout。一个TrasactionalTopology中只能有一个TrasactionalSpout.在本例中是一个Me
moryTransactionalSpout,从一个内存变量(DATA)中读取数据。TransactionalSpout的并行度
(可选)。下面是BatchCount的定义:publicstaticclassBatchCountextendsBa
seBatchBolt{Object_id;BatchOutputCollector_collector;int_c
ount=0;@Overridepublicvoidprepare(Mapconf,TopologyContext
context,BatchOutputCollectorcollector,Objectid){_collector
=collector;_id=id;}@Overridepublicvoidexecute(Tupletup
le){_count++;}@OverridepublicvoidfinishBatch(){_collecto
r.emit(newValues(_id,_count));}@OverridepublicvoiddeclareO
utputFields(OutputFieldsDeclarerdeclarer){declarer.declare(new
Fields(“id“,“count“));}}BatchCount的prepare方法的最后一个参数是batchid
,在TransactionalTolpoloyg里面这id是一个TransactionAttempt对象。Transactio
nalTopology里发送的tuple都必须以TransactionAttempt作为第一个field,storm根据这个fi
eld来判断tuple属于哪一个batch。TransactionAttempt包含两个值:一个transactionid,一
个attemptid。transactionid的作用就是我们上面介绍的对于每个batch中的tuple是唯一的,而且不管这个
batchreplay多少次都是一样的。attemptid是对于每个batch唯一的一个id,但是对于同一个batch,它r
eplay之后的attemptid跟replay之前就不一样了,我们可以把attemptid理解成replay-times,
storm利用这个id来区别一个batch发射的tuple的不同版本。execute方法会为batch里面的每个tuple执行
一次,你应该把这个batch里面的计算状态保持在一个本地变量里面。对于这个例子来说,它在execute方法里面递增tuple的个
数。最后,当这个bolt接收到某个batch的所有的tuple之后,finishBatch方法会被调用。这个例子里面的Bat
chCount类会在这个时候发射它的局部数量到它的输出流里面去。下面是UpdateGlobalCount类的定义:public
staticclassUpdateGlobalCountextendsBaseTransactionalBoltimp
lementsICommitter{TransactionAttempt_attempt;BatchOutputColl
ector_collector;int_sum=0;@Overridepublicvoidprepare(Map
conf,TopologyContextcontext,BatchOutputCollectorcollector,T
ransactionAttemptattempt){_collector=collector;_attempt=a
ttempt;}@Overridepublicvoidexecute(Tupletuple){_sum+=tupl
e.getInteger(1);}@OverridepublicvoidfinishBatch(){Valueva
l=DATABASE.get(GLOBAL_COUNT_KEY);Valuenewval;if(val==null
||!val.txid.equals(_attempt.getTransactionId())){newval=new
Value();newval.txid=_attempt.getTransactionId();if(val==null)
{newval.count=_sum;}else{newval.count=_sum+val.count;
}DATABASE.put(GLOBAL_COUNT_KEY,newval);}else{newval=val;
}_collector.emit(newValues(_attempt,newval.count));}@Overri
depublicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer)
{declarer.declare(newFields(“id“,“sum“));}}UpdateGlobalCou
nt实现了ICommitter接口,所以storm只会在commit阶段执行finishBatch方法。而execute方法可以在
任何阶段完成。在UpdateGlobalCount的finishBatch方法中,将当前的transactionid与数据库中
存储的id做比较。如果相同,则忽略这个batch;如果不同,则把这个batch的计算结果加到总结果中,并更新数据库。Transa
ctionalTopolgy运行示意图如下:?下面总结一下TransactionalTopology的一些特性Trans
actionalTopology将事务性机制都封装好了,其内部使用CoordinateBolt来保证一个batch中的tuple
被处理完。TransactionalSpout只能有一个,它将所有tuple分为一个一个的batch,而且保证同一个batch的
transactionid始终一样。BatchBolt处理batch在一起的tuples。对于每一个tuple调用execut
e方法,而在整个batch处理完成的时候调用finishBatch方法。如果BatchBolt被标记成Committer,则只能
在commit阶段调用finishBolt方法。一个batch的commit阶段由storm保证只在前一个batch成功提交之后才
会执行。并且它会重试直到topology里面的所有bolt在commit完成提交。TransactionalTopology隐
藏了anchor/ack框架,它提供一个不同的机制来fail一个batch,从而使得这个batch被replay。5.2Trid
ent介绍Trident是Storm之上的高级抽象,提供了joins,grouping,aggregations,fuction
s和filters等接口。如果你使用过Pig或Cascading,对这些接口就不会陌生。Trident将stream中的tupl
es分成batches进行处理,API封装了对这些batches的处理过程,保证tuple只被处理一次。处理batches中间结果
存储在TridentState对象中。Trident事务性原理这里不详细介绍,有兴趣的读者请自行查阅资料。参考:http://x
umingming.sinaapp.com/736/twitter-storm-transactional-topolgoy/ht
tp://xumingming.sinaapp.com/73...sactional-topolgoy/http://xumi
ngming.sinaapp.com/811/twitter-storm-code-analysis-coordinated-bo
lt/http://xumingming.sinaapp.com/81...s-coordinated-bolt/https:
//github.com/nathanmarz/storm/wiki/Trident-tutorialhttps://github
.com/nathanmarz/storm/wiki/Trident-tutorialhttp://jishu.zol.com.c
n/2848.htmlhttp://jishu.zol.com.cn/2848.html5.2.1TwitterStorm:
TransactionalTopolgoy简介概述Storm通过保证每个tuple至少被处理一次来提供http://xuming
ming.sinaapp.com/127/twitter-storm%e5%a6%82%e4%bd%95%e4%bf%9d%e8%
af%81%e6%b6%88%e6%81%af%e4%b8%8d%e4%b8%a2%e5%a4%b1/可靠的数据处理。关于这一点最
常被问到的问题就是“既然tuple可能会被重写发射(replay),那么我们怎么在storm上面做统计个数之类的事情呢?stor
m有可能会重复计数吧?”Storm0.7.0引入了TransactionalTopology,它可以保证每个tuple”被且
仅被处理一次”,这样你就可以实现一种非常准确,非常可扩展,并且高度容错方式来实现计数类应用。跟http://xumingming
.sinaapp.com/756/twitter-storm-drpc/DistributedRPC类似,transactio
naltopology其实不能算是storm的一个特性,它其实是用storm的底层原语spout,bolt,topology
,stream等等抽象出来的一个特性。这篇文章解释了事务性topology是怎样的一种抽象,怎样使用它的api,同时也讨论了有关
它实现的一些细节。概念让我们一步步地建立transactionaltopology的抽象。我们先提出一种最简单的抽象方式,然后
一步步的完善改进,最后介绍storm代码里面所使用的抽象方式。第一个设计:最简单的抽象方法事务性topology背后的核心概念是
要在处理数据的提供一个强顺序性。这种强顺序性最简单的表现、同时也是我们第一个设计就是:我们每次只处理一个tuple,除非这个tu
ple处理成功,否则我们不去处理下一个tuple。每一个tuple都跟一个transactionid相关联。如果这个tuple处
理失败了,然后需要重写发射,那么它会被重新发射—并且附着同样的transactionid。这里说的trasactionid
其实就是一个数字,来一个tuple,它就递增一个。所以第一个tuple的transactionid是1,第二个tuple的t
ransactionid是2,等等等等。tuple的强顺序性使得我们即使在tuple重发的时候也能够实现“一次而且只有一次”的语
义。让我们看个例子:比如你想统一个stream里面tuple的总数。那么为了保证统计数字的准确性,你在数据库里面不但要保存tup
le的个数,还要保存这个数字所对应的最新的transactionid。当你的代码要到数据库里面去更新这个数字的时候,你要判断
只有当新的transactionid跟数据库里面保存的transactionid不一样的时候才去更新。考虑两种情况:数据库里面
的transactionid跟当前的transactionid不一样:由于我们transaction的强顺序性,我们知道当前
的tuple肯定没有统计在数据库里面。所以我们可以安全地递增这个数字,并且更新这个transactionid.数据库里面的tra
nsactionid一样:那么我们知道当前tuple已经统计在数据库里面了,那么可以忽略这个更新。这个tuple肯定之前在更新
了数据库之后,反馈给storm的时候失败了(ack超时之类的)。这个逻辑以及事务的强顺序性保证数据库里面的个数(count)即使在
tuple被重发的时候也是准确的。这个主意(保存count+transaction-id)是Kafka的开发者在http://
incubator.apache.org/kafka/design.html这个设计文档里面提出来的。更进一步来说,这个topol
ogy可以在一个事务里面更新很多不同的状态,并且可以到达”一次而且只有一次的逻辑”。如果有任何失败,那么已经成功的更新你再去更新它
会忽略,失败的更新你去再次更新它则会接受。比如,如果你在处理一个url流,你可以更新每个url的转发次数,同时更新每个domai
n下url的转发次数。这个简单设计有一个很大的问题,那就是你需要等待一个tuple完全处理成功之后才能去处理下一个tuple。这
个性能是非常差的。这个需要大量的数据库调用(只要每个tuple一个数据库调用),而且这个设计也没有利用到storm的并行计算能力
,所以它的可扩展能力是非常差的。第二个设计与每次只处理一个tuple的简单方案相比,一个更好的方案是每个transaction
里面处理一批tuple。所以如果你在做一个计数应用,那么你每次更新到总数里面的是这一整个batch的tuple数量。如果这个ba
tch失败了,那么你重新replay这整个batch。相应地,我们不是给每个tuple一个transactionid而是给整个
batch一个transactionid,batch与batch之间的处理是强顺序性的,而batch内部是可以并行的。下面这个
是设计图:所以如果你每个batch处理1000个tuple的话,那么你的应用将会少1000倍的数据库调用。同时它利用了storm
的并行计算能力(每个batch内部可以并行)虽然这个设计比第一个设计好多了,它仍然不是一个完美的方案。topology里面的wo
rker会花费大量的时间等待计算的其它部分完成。比如看下面的这个计算。在bolt1完成它的处理之后,它需要等待剩下的bolt
去处理当前batch,直到发射下一个batch。第三个设计(storm采用的设计)一个我们需要意识到的比较重要的问题是,为了实现
transactional的特性,在处理一批tuples的时候,不是所有的工作都需要强顺序性的。比如,当做一个全局计数应用的时候,
整个计算可以分为两个部分。计算这个batch的局部数量。把这个batch的局部数量更新到数据库里面去。其中第二步在多个batch
之前需要保证强的顺序性,但是第一步并不许要,所以我们可以把第一步并行化。所以当第一个batch在更新它的个数进入数据库的时候,
第2到10个batch可以开始计算它们的局部数量了。Storm通过把一个batch的计算分成两个阶段来实现上面所说的原理:proc
essing阶段:这个阶段很多batch可以并行计算。commit阶段:这个阶段各个batch之间需要有强顺序性的保证。所以第
二个batch必须要在第一个batch成功提交之后才能提交。这两个阶段合起来称为一个transaction。许多batch可以在p
rocessing阶段的任何时刻并行计算,但是只有一个batch可以处在commit阶段。如果一个batch在processing
或者commit阶段有任何错误,那么整个transaction需要被replay。设计细节当使用TransactionalTo
pologies的时候,storm为你做下面这些事情:1)管理状态:Storm把所有实现TransactionalTopo
logies所必须的状态保存在zookeeper里面。这包括当前transactionid以及定义每个batch的一些元数据。
2)协调事务:Storm帮你管理所有事情,以帮你决定在任何一个时间点是该proccessing还是该committing。3
)错误检测:Storm利用acking框架来高效地检测什么时候一个batch被成功处理了,被成功提交了,或者失败了。Storm
然后会相应地replay对应的batch。你不需要自己手动做任何acking或者anchoring—storm帮你搞定所有事情
。4)内置的批处理API:Storm在普通bolt之上包装了一层API来提供对tuple的批处理支持。Storm管理所有的协调
工作,包括决定什么时候一个bolt接收到一个特定transaction的所有tuple。Storm同时也会自动清理每个transa
ction所产生的中间数据。5)最后,需要注意的一点是TransactionalTopologies需要一个可以完全重发(re
play)一个特定batch的消息的队列系统(MessageQueue)。https://github.com/robey/ke
strelKestrel之类的技术做不到这一点。而http://incubator.apache.org/kafka/index.
htmlApache的Kafka对于这个需求来说是正合适的。https://github.com/nathanmarz/storm
-contribstorm-contrib里面的https://github.com/nathanmarz/storm-contr
ib/tree/master/storm-kafkastorm-kafka实现了这个。一个基本的例子你可以通过使用http://n
athanmarz.github.com/storm/doc-0.7.0/backtype/storm/transactional
/TransactionalTopologyBuilder.htmlTransactionalTopologyBuilder来创建
transactionaltopology.下面就是一个transactionaltopology的定义,它的作用是计算输
入流里面的tuple的个数。这段代码来自storm-starter里面的https://github.com/nathanmarz
/storm-starter/blob/master/src/jvm/storm/starter/TransactionalGlo
balCount.javaTransactionalGlobalCount。http://xumingming.sinaapp.c
om/736/twitter-storm-transactional-topolgoy/帮助12345678MemoryTrans
actionalSpoutspout=newMemoryTransactionalSpout(?DATA,newFie
lds("word"),PARTITION_TAKE_PER_BATCH);TransactionalTopologyBuild
erbuilder=newTransactionalTopologyBuilder(?"global-count","s
pout",spout,3);builder.setBolt("partial-count",newBatchCount(
),5)?.shuffleGrouping("spout");builder.setBolt("sum",newUpdate
GlobalCount())?.globalGrouping("partial-count");TransactionalTopo
logyBuilder接受如下的参数这个transactiontopology的idspout在整个topology里面的id。
一个transactionalspout。一个可选的这个transactionalspout的并行度。topology的id是
用来在zookeeper里面保存这个topology的当前进度的,所以如果你重启这个topology,它可以接着前面的进度继续执
行。一个transactiontopology里面有一个唯一的TransactionalSpout,这个spout是通过Tra
nsactionalTopologyBuilder的构造函数来制定的。在这个例子里面,MemoryTransactionalSpo
ut被用来从一个内存变量里面读取数据(DATA)。第二个参数制定数据的fields,第三个参数指定每个batch的最大tuple
数量。关于如何自定义TransactionalSpout我们会在后面介绍。现在说说bolts。这个topology并行地计算tu
ple的总数量。第一个bolt:BatchBolt,随机地把输入tuple分给各个task,然后各个task各自统计局部数量。第二
个bolt:UpdateBlobalCount,用全局grouping来从汇总这个batch的总的数量。然后再把总的数量更新到数
据库里面去。下面是BatchCount的定义:http://xumingming.sinaapp.com/736/twitter-
storm-transactional-topolgoy/帮助0102030405060708091011121314151617
1819202122232425262728publicstaticclassBatchCountextendsBase
BatchBolt{?Object_id;?BatchOutputCollector_collector;??int_co
unt=0;??@Override?publicvoidprepare(Mapconf,TopologyContext
context,?BatchOutputCollectorcollector,Objectid){?_collector
=collector;?_id=id;?}??@Override?publicvoidexecute(Tupletu
ple){?_count++;?}??@Override?publicvoidfinishBatch(){?_collec
tor.emit(newValues(_id,_count));?}??@Override?publicvoiddecla
reOutputFields(OutputFieldsDeclarerdeclarer){?declarer.declare(
newFields("id","count"));?}}storm会为每个batch创建这个一个BatchCount对象。而这
些BatchCount是运行在BatchBoltExecutor里面的。而https://github.com/nathanmar
z/storm/blob/master/src/jvm/backtype/storm/coordination/BatchBolt
Executor.javaBatchBoltExecutor负责创建以及清理这个对象的实例。这个对象的prepare方法接收如下参
数:包含stormconfig信息的map。TopologyContextOutputCollector这个batch的id。而
在TransactionalTopologies里面,这个id则是一个http://nathanmarz.github.com
/storm/doc-0.7.0/backtype/storm/transactional/TransactionAttempt.
htmlTransactionAttempt对象。这个batchbolt的抽象在DRPC里面也可以用,只是id的类型不一样而已
。BatchBolt其实真的接收一个id类型的参数—它是一个java模板类,所以如果你只是想在transactioinalt
opology里面使用这个BatchBolt,你可以这样定义:http://xumingming.sinaapp.com/736/
twitter-storm-transactional-topolgoy/帮助123publicabstractclassB
aseTransactionalBolt?extendsBaseBatchBolt{}
在transactiontopology里面发射的所有的tuple都必须以TransactionAttempt作为第一个fiel
d,然后storm可以根据这个field来判断哪些tuple属于一个batch。所以你在发射tuple的时候需要满足这个条件。T
ransactionAttempt包含两个值:一个transactionid,一个attemptid。transaction
id的作用就是我们上面介绍的对于每个batch是唯一的,而且不管这个batchreplay多少次都是一样的。attemptid
是对于每个batch唯一的一个id,但是对于统一个batch,它replay之后的attemptid跟replay之前就不一样
了,我们可以把attemptid理解成replay-times,storm利用这个id来区别一个batch发射的tuple的
不同版本。transactionid对于每个batch加一,所以第一个batch的transactionid是”1″,第二
个batch是”2″,以此类推。execute方法会为batch里面的每个tuple执行一次,你应该把这个batch里面的状态保持
在一个本地变量里面。对于这个例子来说,它在execute方法里面递增tuple的个数。最后,当这个bolt接收到某个batch
的所有的tuple之后,finishBatch方法会被调用。这个例子里面的BatchCount类会在这个时候发射它的局部数量到它
的输出流里面去。下面是UpdateGlobalCount类的定义。http://xumingming.sinaapp.com/73
6/twitter-storm-transactional-topolgoy/帮助010203040506070809101112
13141516171819202122232425262728293031323334353637383940414243444
54647publicstaticclassUpdateGlobalCount?extendsBaseTransactio
nalBolt?implementsICommitter{?TransactionAttempt_attempt;?Batc
hOutputCollector_collector;??int_sum=0;??@Override?publicvoi
dprepare(Mapconf,?TopologyContextcontext,?BatchOutputCollector
collector,?TransactionAttemptattempt){?_collector=collector;
?_attempt=attempt;?}??@Override?publicvoidexecute(Tupletuple
){?_sum+=tuple.getInteger(1);?}??@Override?publicvoidfinishBat
ch(){?Valueval=DATABASE.get(GLOBAL_COUNT_KEY);?Valuenewval;?
if(val==null||?!val.txid.equals(_attempt.getTransactionId()))
{?newval=newValue();?newval.txid=_attempt.getTransactionId()
;?if(val==null){?newval.count=_sum;?}else{?newval.count=_s
um+val.count;?}?DATABASE.put(GLOBAL_COUNT_KEY,newval);?}else
{?newval=val;?}?_collector.emit(newValues(_attempt,newval.cou
nt));?}??@Override?publicvoiddeclareOutputFields(OutputFieldsDe
clarerdeclarer){?declarer.declare(newFields("id","sum"));?}}U
pdateGlobalCount是TransactionalTopologies相关的类,所以它继承自BaseTransact
ionalBolt。在execute方法里面,?UpdateGlobalCount累积这个batch的计数,比较有趣的是fini
shBatch方法。首先,注意这个bolt实现了ICommitter接口。这告诉storm要在这个事务的commit阶段调用fi
nishBatch方法。所以对于finishBatch的调用会保证强顺序性(顺序就是transactionid的升序),而相对
来说execute方法在任何时候都可以执行,processing或者commit阶段都可以。另外一种把bolt标识为commite
r的方法是调用TransactionalTopologyBuilder的setCommiterBolt来添加Bolt(而不是set
Bolt)。UpdateGlobalCount里面finishBatch方法的逻辑是首先从数据库中获取当前的值,并且把数据库里面的
transactionid与当前这个batch的transactionid进行比较。如果他们一样,那么忽略这个batch。否
则把这个batch的结果加到总结果里面去,并且更新数据库。关于transactionaltopology的更深入的例子可以卡看s
torm-starter里面的https://github.com/nathanmarz/storm-starter/blob/m
aster/src/jvm/storm/starter/TransactionalWords.javaTransactionalW
ords类,这个类里面会在一个事务里面更新多个数据库。TransactionalTopologyAPI这一节介绍Transa
ctiontopologyAPIBolts在一个transactionaltopology里面最多有三种类型的bolt:ht
tp://nathanmarz.github.com/storm/doc-0.7.0/backtype/storm/topolog
y/base/BaseBasicBolt.htmlBasicBolt:这个bolt不跟batch的tuple打交道,它只基于单个
tuple的输入来发射新的tuple。http://nathanmarz.github.com/storm/doc-0.7.0/b
acktype/storm/topology/base/BaseBatchBolt.htmlBatchBolt:这个bolt处理
batch在一起的tuples。对于每一个tuple调用execute方法。而在整个batch处理完成的时候调用finishBat
ch方法被标记成Committer的BatchBolt:和普通的BatchBolt的唯一的区别是finishBatch这个方法被
调用的时机。作为committer的BatchBolt的finishBatch方法在commit阶段调用。一个batch的comm
it阶段由storm保证只在前一个batch成功提交之后才会执行。并且它会重试直到topology里面的所有bolt在commit
完成提交。有两个方法可以让一个普通BatchBolt变成committer:1)实现http://nathanmarz.git
hub.com/storm/doc-0.7.0/backtype/storm/transactional/ICommitter.h
tmlICommitter接口2)通过TransactionalTopologyBuilder的setCommitterBolt方法把BatchBolt添加到topology里面去。Processingphasevs.commitphaseinbolts为了搞清除processing阶段与commit阶段的区别,让我们看个例子:在这个topology里面只有用红线标出来的是committers。在processing阶段,boltA会处理从spout发射出来的整个batch。并且发射tuple给boltB和boltC。BoltB是一个committer,所以它会处理所有的tuple,但是不会调用finishBatch方法。BoltC同样也不会调用finishBatch方法,它的原因是:它不知道它有没有从BoltB接收到所有的tuple。(因为BoltB还在等着事务提交)最后BoltD会接收到BoltC在调用execute方法的时候发射的所有的tuple。当batch提交的时候,BoltB上的finishBatch被调用。BoltC现在可以判断它接收到了所有的tuple,所以可以调用finishBatch了。最后BoltD接收到了它的所有的tuple所以就调用finishBatch了。要注意的是,虽然BoltD是一个committer,它在接收到整个batch的tuple之后不需要等待第二个commit信号。因为它是在commit阶段接收到的整个batch,它会调用finishBatch来完成整个事务。Acking注意,你不需要显式地去做任何的acking或者anchoring。storm在背后都做掉了。(storm对transactionaltopolgies里面的acking机制进行了高度的优化)Failingatransaction在使用普通bolt的时候,你可以通过调用OutputCollector的fail方法来fail这个tuple所在的tuple树。由于TransactionalTopologies把acking框架从用户的视野里面隐藏掉了,它提供一个不同的机制来fail一个batch(从而使得这个batch被replay)。只要抛出一个http://nathanmarz.github.com/storm/doc-0.7.0/backtype/storm/topology/FailedException.htmlFailedException就可以了。跟普通的异常不一样,这个异常只会导致当前的batch被replay,而不会使整个进程crash掉。TransactionalspoutTransactionalSpout接口跟普通的Spout接口完全不一样。一个TransactionalSpout的实现一个batch一个batch的tuple,而且必须保证同一个batch的transactionid始终一样。在transactionaltopology中运行的时候,transactionalspout看起来是这样的一个结构:在图的左边的coordinator是一个普通的storm的spout—它一直为事务的batch发射tuple。Emitter则像一个普通的stormbolt,它负责为每个batch实际发射tuple。emitter以allgrouping的方式订阅coordinator的”batchemit”流。由于TransactionalSpout发射的tuple可能需要会被replay,因此需要具有幂等性(否则多次replay同一个tuple会使得最后的结果不对),为了实现幂等性,需要保存TransactionalSpout的少量的状态,这个状态是保存在ZooKeeper里面的。关于如何实现一个TransactionalSpout的细节可以参见http://nathanmarz.github.com/storm/doc-0.7.0/backtype/storm/transactional/ITransactionalSpout.htmlJavadoc。PartitionedTransactionalSpout一种常见的TransactionalSpout是那种从多个queuebroker夺取数据然后再发射的tuple。比如https://github.com/nathanmarz/storm-contrib/blob/master/storm-kafka/src/jvm/storm/kafka/TransactionalKafkaSpout.javaTransactionalKafkaSpout是这样工作的。IPartitionedTransactionalSpout把这些管理每个分区的状态以保证可以replay的幂等性的工作都自动化掉了。更多可以参考http://nathanmarz.github.com/storm/doc-0.7.0/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.htmlJavadoc配置TransactionalTopologies有两个重要的配置:Zookeeper:默认情况下,transactionaltopology会把状态信息保存在主zookeeper里面(协调集群的那个)。你可以通过这两个配置来指定其它的zookeeper:”transactional.zookeeper.servers”和“transactional.zookeeper.port“。同时活跃的batch数量:你必须设置同时处理的batch数量。你可以通过”topology.max.spout.pending”来指定,如果你不指定,默认是1。实现TransactionalTopologies的实现是非常优雅的。管理提交协议,检测失败并且串行提交看起来很复杂,但是使用storm的原语来进行抽象是非常简单的。transactionaltopology里面的spout是一个子topology,它由一个spout和一个bolt组成。spout是协调者,它只包含一个task。bolt是发射者bolt以allgrouping的方式订阅协调者的输出。元数据的序列化用的是kryo。协调者使用acking框架来决定什么时候一个batch被成功执行完成,然后去决定一个batch什么时候被成功提交。状态信息被以RotatingTransactionalState的形式保存在zookeeper里面了。commitingbolts以allgrouping的方式订阅协调者的commit流。CoordinatedBolt被用来检测一个bolt是否收到了一个特定batch的所有tuple。这一点上面跟DRPC里面是一样的。对于commitingbolt来说,他会一直等待,知道从coordinator的commit流里面接收到一个tuple之后,它才会调用finishBatch方法。所以在没有从coordinator的commit流接收到一个tuple之前,committingbolt不可能调用finishBolt方法。
献花(0)
+1
(本文系关平藏书首藏)