你的位置:首页 > 数据库

[数据库]Storm 运行例子


1.建立Java工程

使用 IntelliJ IDEA 新建 Java 工程，将 Storm 安装目录下 lib 中的 jar 包拷贝到工程中，并添加为工程的依赖库（lib）。

 

2.拷贝wordcount代码

下载 Storm 源码包（apache-storm-0.9.4-src），解压后找到以下目录：

apache-storm-0.9.4-src\apache-storm-0.9.4\examples\storm-starter\src\jvm\storm\starter

将其中 WordCountTopology.java 的内容拷贝到工程中；

修改句子切分的处理方式：原例子通过 ShellBolt 调用 Python 脚本切分句子，这里改为用 Java 实现的 SplitSentence Bolt（见下方代码）；

 1 import backtype.storm.Config; 2 import backtype.storm.LocalCluster; 3 import backtype.storm.StormSubmitter; 4 import backtype.storm.task.ShellBolt; 5 import backtype.storm.topology.BasicOutputCollector; 6 import backtype.storm.topology.IRichBolt; 7 import backtype.storm.topology.OutputFieldsDeclarer; 8 import backtype.storm.topology.TopologyBuilder; 9 import backtype.storm.topology.base.BaseBasicBolt;10 import backtype.storm.tuple.Fields;11 import backtype.storm.tuple.Tuple;12 import backtype.storm.tuple.Values;13 import com.bigdata.storm.spout.*;14 15 import java.util.HashMap;16 import java.util.Map;17 /**18  * Created by Edward on 2016/8/17.19 */20 public class MyTest {21 22   public static class SplitSentence extends BaseBasicBolt{23 24     @Override25     public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {26       String word = tuple.getString(0);27       String str[] = word.split(" ");28 29       System.out.println("Split Sentence:" + tuple.getSourceStreamId());30       for(int i=0; i<str.length; i++)31       {32         basicOutputCollector.emit(new Values(str[i]));33       }34     }35 36     @Override37     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {38       outputFieldsDeclarer.declare(new Fields("word"));39     }40   }41 42   public static class WordCount extends BaseBasicBolt {43     Map<String, Integer> counts = new HashMap<String, Integer>();44 45     @Override46     public void execute(Tuple tuple, BasicOutputCollector collector) {47       String word = tuple.getString(0);48       Integer count = counts.get(word);49       if (count == null)50         count = 0;51       count++;52       counts.put(word, count);53       System.out.println("Word Count:" + tuple.getSourceStreamId());54       collector.emit(new Values(word, count));55 56     }57 58     @Override59     public void declareOutputFields(OutputFieldsDeclarer declarer) {60       declarer.declare(new Fields("word", "count"));61     }62  
 }63 64   public static void main(String[] args) throws Exception {65 66     TopologyBuilder builder = new TopologyBuilder();67 68     builder.setSpout("spout", new RandomSentenceSpout(), 5);69 70     builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");71     builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));72 73     Config conf = new Config();74     conf.setDebug(true);75 76 77     if (args != null && args.length > 0) {78       conf.setNumWorkers(3);79 80       StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());81     }82     else {83       conf.setMaxTaskParallelism(3);84 85       LocalCluster cluster = new LocalCluster();86       cluster.submitTopology("word-count", conf, builder.createTopology());87 88       Thread.sleep(50000);89 90       cluster.shutdown();91     }92   }93 }

3.拷贝随机生成spout代码

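在 apache-storm-0.9.4-src\apache-storm-0.9.4\examples\storm-starter\src\jvm\storm\starter\spout 目录下找到 RandomSentenceSpout.java，

将其拷贝到工程中。注意：上面的 MyTest 通过 import com.bigdata.storm.spout.* 引用该类，因此需把它放在 com.bigdata.storm.spout 包下，并在文件开头加上对应的 package 声明。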

 1 import backtype.storm.spout.SpoutOutputCollector; 2 import backtype.storm.task.TopologyContext; 3 import backtype.storm.topology.OutputFieldsDeclarer; 4 import backtype.storm.topology.base.BaseRichSpout; 5 import backtype.storm.tuple.Fields; 6 import backtype.storm.tuple.Values; 7 import backtype.storm.utils.Utils; 8  9 import java.util.Map;10 import java.util.Random;11 12 public class RandomSentenceSpout extends BaseRichSpout {13   SpoutOutputCollector _collector;14   Random _rand;15 16   @Override17   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {18     _collector = collector;19     _rand = new Random();20   }21 22   @Override23   public void nextTuple() {24     Utils.sleep(10000);25     String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",26         "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };27     String sentence = sentences[_rand.nextInt(sentences.length)];28     Object id = new Object();29     System.out.println(id);30     //id message ID 用来保证可靠性的,如果失败fail 会返回 message id 信息31     _collector.emit(new Values(sentence), id);32   }33 34   @Override35   public void ack(Object id) {36     System.out.println("storm spout ack id = "+id);37   }38 39   @Override40   public void fail(Object id) {41   }42 43   @Override44   public void declareOutputFields(OutputFieldsDeclarer declarer) {45     declarer.declare(new Fields("word"));46   }47 48 }

4.本地运行

在 IDEA 中直接运行 MyTest 的 main 方法，在控制台观察运行过程。不带参数运行时，程序以 LocalCluster 本地模式运行约 50 秒，然后自动关闭。

 

5.集群运行

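将程序打包成 jar（不要把 Storm 自身的 jar 打进去，集群已提供），上传到集群后用 storm jar 命令提交，例如：storm jar xxx.jar MyTest wordcount。带参数运行时，main 方法以第一个参数作为拓扑名称，通过 StormSubmitter 提交到集群（若 MyTest 在某个包下，需写主类全名）。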