花钱的年华

江南白衣,公众号:春天的旁边

Storm笔记

| Filed under 技术

用了一段时间Storm后的笔记。发现可以记的东西不多,证明Storm挺简单的,你只要遵循一些简单的接口与原则,就能写出大规模实时消息处理的程序。

不断更新中,请尽量访问博客原文
 

为什么用Storm

没接触前把Storm想象得很强大,接触后觉得它就那样可有可无,再后来又觉得没有了全部自己做也麻烦。

集群管理:支持应用的部署,工作节点的管理(任务分配、HA、Scalable等),Metrics的收集。

数据流的传输与路由:支持多种数据在各处理节点间自由流动(是同类方案里DAG拓扑最灵活的),基于Netty的高效传输机制,支持轮询、广播、按值分组的路由。

数据高可靠性的保证:支持数据流动了多个节点后,在某个节点的处理失败,可以引发数据从源头开始重传

按Storm的官方说法,你也可以自己搭建许多消息队列和worker组成的网络来实现实时处理,但是:

乏味:你大部份开发时间花费在部署worker,部署中间队列,配置消息发送到哪里。你所关心的实时处理逻辑只占了你的代码很少的比例 。

脆弱:你要自己负责保持每个worker和队列正常工作。

伸缩时痛苦:当worker或队列的消息吞吐量太高时,你需要重新分区,重新配置其它worker,让它们发送消息到新位置。

缺点

核心代码是用Clojure写成,翻看代码非常不便。其实,它现在很多新的外部模块都用Java来写了,另外阿里同学翻写了一个JStorm

 

其他流处理方案

Spark-Streaming:总是有人问为什么不用Spark Stream,首先它是Micro-Batch风格的准实时方案,间隔一般设到500ms。另外,它的消息流拓扑好像没Storm那样可以随便乱入,有时候必须弄个DB或MQ来做中间传输。

Samza: Linkedin的产品,与Storm比,传输基于Apache Kafka,集群管理基于YARN,它只做处理的那块,但多了基于RocksDB的状态管理。但据《大数据日知录》说,受限于Kafka和YARN,它的拓扑也不够灵活。

 

自定义Spout

Storm对可靠消息传输的支持程度,很大程度上依赖于Spout的实现。

并不默认就支持高可靠性的,collector emit的时候要传输msgId,要自己处理ack(msgId)和fail(msgId)函数。而很多spout其实没有这样做,只有Kafka Spout做的比较正规。

默认的,如果三十秒,消息流经的所有下游节点没有都ack完毕,或者有一个节点报fail,则触发fail(msgId)函数。

因为ack/fail的参数只有msgId,这就要Spout想在ack/fail时对上有消息源如Kafka/JMS进行ack/fail,又或者fail时想重发消息,如果需要完整的消息体而不只是msgId才能完成时,要自己把msgId对应的消息存起来(会撑爆内存么,好在Kafka不需要)。

另外,因为每个Spout 是单线程的,会循环的调用nextTuple()的同时,调用ack()或fail()处理结果。所以nextTuple()如果没消息时不要长期阻塞,避免把ack()也阻塞了。同时,Storm自己有个机制在nextTuple总是没消息时Sleep一下避免空循环耗CPU,但参考storm-starter里的spout,还是直接内部等个50ms好了。在JStorm里,就改为了两条分开的线程。

另外,spout有时是每次被调用nextTuple()时主动去pull消息的,有时是被动接收push消息后存放在LinkedBlockingQueue里,netxtTuple()时从Queue里取消息的。Spout突然crash的话,存在Queue里的消息也会丢失,除非上游消息源有ack机制来保障。

Spout还有个Max Pending的配置,如果有太多消息没有ack,它就不会再调nextTuple()。但如果上游消息源是主动Push的,消息还是会源源不断的来,累积在queue里。

 

RichBolt vs BasicBolt

直接用BasicBolt,会在execute()后自动ack/fail Tuple,而RichBolt则需要自行调用ack/fail。

那什么时候使用RichBolt? Bolt不是在每次execute()时立刻产生新消息,需要异步的发送新消息(比如聚合一段时间的数据再发送)时,又或者想异步的ack/fail原消息时就需要。

BasicBolt的prepare()里并没有collector参数,只在每次execute()时传入collector。而RichBolt刚好相反,你可以在初始化时就把collector保存起来,用它在任意时候发送消息。

另外,如果用RichBolt的collector,还要考虑在发送消息时是否带上传入的Tuple,如果不带,则下游的处理节点出错也不会回溯到Spout重发。用BasicBolt则已默认带上。

 

Ack机制

作者是一拍脑袋想到了用20个字节来追踪每条Spout出来的消息被处理的情况,原理是XOR的时候,N XOR N=0,同一个值以任意次序XOR两次会归0,如A XOR B XOR B XOR C XOR A XOR C =0, 在发出Tuple时,就用随机产生的Tuple Id XOR一下。等接收的Bolt ack时,再XOR一下,就会归0。所以当消息以任意的顺序会流经很多节点,产生很多新Tuple,如果都被成功处理,即所有Tuple id都被以任意顺序执行了两次XOR,则这20个字节最后应该重新归0,就可判断全部ack完毕。

另外,重发是从最上游的Spout开始,如果某个bolt的操作是非幂等的,还要想想怎么自己去实现去重。
 

异常处理

如果希望上游的Spout重发消息,则在BasicBolt中抛出FailedException 或在RichBolt中直接fail掉Tuple。
其他情况下,execute()方法不应该抛出任何异常,或者你是故意抛出异常使得Topology停转。

 

状态管理

状态数据分两种,一种是本地历史数据,不如使用路由规则,使相同用户的数据总是路由到同一个Bolt。 一种是全局的数据。Storm完全不管数据的持久化(Trident那块没用到不算), 《Storm 并非完全适合所有实时应用》 就是吐槽Storm的状态数据管理的。

不像Linkedin的Samza,Bolt如果需要历史数据,一般自己在内存里管理数据(Crash掉或节点的变化导致路由变化就没了哈),或者在本地起一个Redis/Memcached(不能与Bolt一起管理,路由变化的数据迁移,性能也会削弱)

对于全局数据,同样需要Cassandra之类高可扩展的NOSQL来帮忙,但此时延时会更厉害,性能瓶颈也很可能压到了Cassandra上。

 

定时任务

定时聚合数据之类的需求,除了自己在bolt里开定时器,还可以用如下设置,所有Bolt都定时收到一条Tick消息:

conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 300);

如下函数用于判断是Tick还是正常业务消息:

protected static boolean isTickTuple(Tuple tuple) {
return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}

 

拓扑的定义

除了使用Java代码,还可以使用Yaml来动态定义拓扑,见 https://github.com/ptgoetz/flux

并发度的定义及基于命令行的动态扩容见官方文档,另对于worker进程数的建议是Use one worker per topology per machine。

 

序列化

Tuple除了传基本类型与数组,AraayList,HashMap外,也可以传一下Java对象的。Storm使用Kyro作为序列化框架,据测比Hessian什么的都要快和小。但一定注册这些额外的Java对象的类型,否则就会使用Java默认的序列化。

参看官方文档,有两种方式注册类型,一个是storm.yaml文件,一个是Config类的registerSerialization方法。如无特殊需求,直接注册需要序列化的类就可以了,不需要自己实现一个Serializer,Kryo默认的按fields序列化的Serializer已足够。

Spout和Bolt的构造函数只会在submit Topology时调一次,然后序列化起来,直接发给工作节点,工作节点里实例化时不会被调用里,所以复杂的成员变量记得都定义成transient,在open(),prepare()里初始化及连接数据库等资源。

另外,需要实现close()函数清理资源,但该函数不承诺在worker进程被杀时保证被调用。

 

fields grouping的算法

按名称提取fields的值,叠加其hash值,再按当前的可选Task数量取模。所以,动态扩展Task数量,或某Task失效被重建的话,都可能让原来的分配完全乱掉。

 

与其他开源技术的集成

比如External目录里的一堆,storm-contrib 里也有一堆,目前支持Jdbc,Redis,HBase,HDFS,Hive,甚至还有Esper,目标都是通过配置(比如SQL及Input/Output fields),而非代码,或尽量少的代码,实现交互。有时也可以不一定要直接用它们,当成Example Code来看就好了。

另外,与传统的Java应用思路相比,Bolt/Spout与资源连接时,比较难实现共享连接池的概念,连接池一般都是每个Bolt/Spout实例自用的,要正确处理其连接数量。

 

HA的实现

如果Worker进程失效,Supervisor进程会检查 Worker的心跳信息,重新创建。
Supervisor进程,需要用Daemon程序如monit来启动,失效时自动重新启动。
如果整个机器节点失效,Nimbus会在其他节点上重新创建。

Nimbus进程,需要用Daemon程序如monit来启动,失效时自动重新启动。
如果Nimbus进程所在的机器直接倒了,需要在其他机器上重新启动,Storm目前没有内建支持,需要自己写脚本实现。

因为Supervisor和Nimbus在进程内都不保存状态,状态都保存在本地文件和ZooKeeper,因此进程可以随便杀。
即使Nimbus进程不在了,也只是不能部署新任务,有节点失效时不能重新分配而已,不影响已有的线程。
同样,如果Supervisor进程失效,不影响已存在的Worker进程。

Zookeeper本身已经是按至少三台部署的HA架构了。

 

运维管理

Storm UI也是用Clojure写的,比较难改,好在它提供了Restful API,可以与其他系统集成,或基于API重写一个UI。

Metrics的采样率是1/20(topology.stats.sample.rate=0.05),即Storm随机从20个事件里取出一个事件来进行统计,命中的话,counter 直接+20。

在旧版本的Storm使用旧版的ZooKeeper要启动数据清理的脚本,在新版上只要修改ZooKeeper的配置文件zoo.cfg, 默认是24小时清理一次 autopurge.purgeInterval=24。

日志的配置在logback/cluster.xml文件里,Storm的日志,天然的需要Logstash + ElasticSearch的集中式日志方案。

storm.local.dir 要自己建,而且不支持~/ 代表用户根目录。

storm.yaml的默认值在 https://github.com/apache/storm/blob/master/conf/defaults.yaml

 

Tunning

1. 内部传输机制的各种配置,见文档

2. 屏蔽ack机制,当可靠传输并不是最重要时。可以把Acker数量设为0,可以让Spout不要发出msgId,或者bolt发送消息时不传之前的Tuple。

 

资料

文章持续修订,转载请保留原链接: http://calvin1978.blogcn.com/articles/stormnotes.html

有关的...

发表评论

您的电子邮箱不会被公开。

您可以使用这些 HTML 标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>