Storm框架入门1 Topology构成 和同样是计算框架的Mapreduce相比,Mapreduce集群上运行的是Job,而Storm集群上运行的是Topology。但是Job在运行结束之后会自行结束,Topology却只能被手动的kill掉,否则会一直运行下去。 ...
Storm框架入门
1 Topology构成
和同样是计算框架的Mapreduce相比,Mapreduce集群上运行的是Job,而Storm集群上运行的是Topology。但是Job在运行结束之后会自行结束,Topology却只能被手动的kill掉,否则会一直运行下去。
Storm集群中有两种节点,一种是控制节点(Nimbus节点),另一种是工作节点(Supervisor节点)。所有Topology任务的提交必须在Storm客户端节点上进行(需要配置~/.storm/storm.yaml文件),由Nimbus节点分配给其他Supervisor节点进行处理。Nimbus节点首先将提交的Topology进行分片,分成一个个的Task,并将Task和Supervisor相关的信息提交到zookeeper集群上,Supervisor会去zookeeper集群上认领自己的Task,通知自己的Worker进程进行Task的处理。总体的Topology处理流程图为:
3.1 open方法
当一个Task被初始化的时候会调用此open方法。一般都会在此方法中对发送Tuple的对象SpoutOutputCollector和配置对象TopologyContext初始化。
示例如下:
1 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {2 3 _collector = collector;4 5 }
4.1 prepare方法
此方法和Spout中的open方法类似,为Bolt提供了OutputCollector,用来从Bolt中发送Tuple。Bolt中Tuple的发送可以在prepare方法中、execute方法中、cleanup等方法中进行,一般都是些在execute中。
示例如下:
1 public void prepare(Map conf, TopologyContext context, OutputCollector collector) {2 3 _collector = collector;4 5 }
有几点需要说明的地方:
(1)每个组件(Spout或者Bolt)的构造方法和declareOutputFields方法都只被调用一次。
(2)open方法、prepare方法的调用是多次的。入口函数中设定的setSpout或者setBolt里的并行度参数指的是executor的数目,是负责运行组件中的task的线程 的数目,此数目是多少,上述的两个方法就会被调用多少次,在每个executor运行的时候调用一次。相当于一个线程的构造方法。
(3)nextTuple方法、execute方法是一直被运行的,nextTuple方法不断的发射Tuple,Bolt的execute不断的接收Tuple进行处理。只有这样不断地运行,才会产 生无界的Tuple流,体现实时性。相当于线程的run方法。
(4)在提交了一个topology之后,Storm就会创建spout/bolt实例并进行序列化。之后,将序列化的component发送给所有的任务所在的机器(即Supervisor节 点),在每一个任务上反序列化component。
(5)Spout和Bolt之间、Bolt和Bolt之间的通信,是通过zeroMQ的消息队列实现的。
(6)上图没有列出ack方法和fail方法,在一个Tuple被成功处理之后,需要调用ack方法来标记成功,否则调用fail方法标记失败,重新处理这个Tuple。
6.4 Topology并行度
在Topology的执行单元里,有几个和并行度相关的概念。
(1)worker:每个worker都属于一个特定的Topology,每个Supervisor节点的worker可以有多个,每个worker使用一个单独的端口,它对Topology中的每个component运行一个或者多个executor线程来提供task的运行服务。
(2)executor:executor是产生于worker进程内部的线程,会执行同一个component的一个或者多个task。
(3)task:实际的数据处理由task完成,在Topology的生命周期中,每个组件的task数目是不会发生变化的,而executor的数目却不一定。executor数目小于等于task的数目,默认情况下,二者是相等的。
在运行一个Topology时,可以根据具体的情况来设置不同数量的worker、task、executor,而设置的位置也可以在多个地方。
(1)worker设置:
(1.1)可以通过设置yaml中的topology.workers属性
(1.2)在代码中通过Config的setNumWorkers方法设定
(2)executor设置:
通过在Topology的入口类中setBolt、setSpout方法的最后一个参数指定,不指定的话,默认为1;
(3)task设置:
(3.1) 默认情况下,和executor数目一致;
(3.2)在代码中通过TopologyBuilder的setNumTasks方法设定具体某个组件的task数目;
6.5 终止Topology
通过在Nimbus节点利用如下命令来终止一个Topology的运行:
storm kill topologyName
kill之后,可以通过UI界面查看topology状态,会首先变成KILLED状态,在清理完本地目录和zookeeper集群中的和当前Topology相关的信息之后,此Topology就会彻底消失了。
7 Topology跟踪
Topology提交后,可以在Nimbus节点的web界面查看,默认的地址是http://NimbusIp:8080。
8 Storm应用
上面给出了如何编写Storm框架任务Topology的方法,那么在哪些场景下能够使用Storm框架呢?下面介绍Storm框架的几个典型的应用场景。
(1)利用Storm框架的DRPC进行大量的函数并行调用,即实现分布式的RPC;
(2)利用Storm框架的Transaction Topology,可以进行实时性的批量更新或者查询数据库操作或者应用需要同一批内的消息以及批与批之间的消息并行处理这样的场景,此时Topology中只能有一个TrasactionalSpout;
(3)利用滑动窗口的逻辑结合Storm框架来计算得出某段时间内的售出量最多的产品、购买者最多的TopN地区等;
(4)精确的广告推送,在用户浏览产品的时候,将浏览记录实时性的搜集,发送到Bolt,由Bolt来根据用户的账户信息(如果有的话)完成产品的分类统计,产品的相关性查询等逻辑计算之后,将计算结果推送给用户;
(5)实时日志的处理,Storm可以和一个分布式存储结合起来,实时性的从多个数据源发送数据到处理逻辑Bolts,Bolts完成一些逻辑处理之后,交给分布式存储框架进行存储,此时,Spout可以是多个;
(6)实时性的监控舆论热点,比如针对某个关键词,在用户查询的时候,产生数据源Spout,结合语义分析等,由Bolt来完成查询关键词的统计分析,汇总当前的舆论热点;
(7)数据流的实时聚合操作。
原标题:Storm
关键词:
*特别声明:以上内容来自于网络收集,著作权属原作者所有,如有侵权,请联系我们:
admin#shaoqun.com
(#换成@)。