Storm开发细节是什么
Storm开发细节是什么
这篇文章主要介绍“Storm开发细节是什么”,在日常操作中,相信很多人在Storm开发细节是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Storm开发细节是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
packagetest;importjava.io.IOException;importjava.util.Map;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importstorm.copyFromClass.TestWordSpout;importcom.esotericsoftware.minlog.Log;importbacktype.storm.Config;importbacktype.storm.LocalCluster;importbacktype.storm.task.OutputCollector;importbacktype.storm.task.TopologyContext;importbacktype.storm.topology.BasicOutputCollector;importbacktype.storm.topology.OutputFieldsDeclarer;importbacktype.storm.topology.TopologyBuilder;importbacktype.storm.topology.base.BaseBasicBolt;importbacktype.storm.topology.base.BaseRichBolt;importbacktype.storm.tuple.Tuple;//测试目的,在这里我们需要测试一下当前Spout不断产生数据的过程publicclasstestWordSpoutTopology{publicstaticclassTestSimpleBoltextendsBaseBasicBolt{@Overridepublicvoidexecute(Tupleinput,BasicOutputCollectorcollector){System.out.println(input.toString());}@OverridepublicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){System.out.println("Methoddeclare");}}publicstaticvoidmain(String[]args)throwsIOException{//首先,我们必须建立一个新的TopologyBuilderTopologyBuilderbuilder=newTopologyBuilder();//其次,我们需要配置如下的组件:1Spout,2Boltbuilder.setSpout("word-emit-byThread",newTestWordSpout());//在这个Spout之中,我们约定将【word-emit-byThread】Spout组件发射的元祖进行shuffleGroupingbuilder.setBolt("word-show",newTestSimpleBolt()).shuffleGrouping("word-emit-byThread");Configconfig=newConfig();config.setDebug(false);//最后进行本地提交LocalClustercluster=newLocalCluster();cluster.submitTopology("simple",config,builder.createTopology());}}
以上,
testWordSpoutTopology
是我们运行的主类
packagestorm.copyFromClass;importbacktype.storm.Config;importbacktype.storm.topology.OutputFieldsDeclarer;importjava.util.Map;importbacktype.storm.spout.SpoutOutputCollector;importbacktype.storm.task.TopologyContext;importbacktype.storm.topology.base.BaseRichSpout;importbacktype.storm.tuple.Fields;importbacktype.storm.tuple.Values;importbacktype.storm.utils.Utils;importjava.util.HashMap;importjava.util.Random;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;//publicclassTestWordSpoutextendsBaseRichSpout{publicstaticLoggerLOG=LoggerFactory.getLogger(TestWordSpout.class);boolean_isDistributed;SpoutOutputCollector_collector;publicTestWordSpout(){this(true);}publicTestWordSpout(booleanisDistributed){_isDistributed=isDistributed;}publicvoidopen(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector){_collector=collector;}publicvoidclose(){}//发送publicvoidnextTuple(){Utils.sleep(100);finalString[]words=newString[]{"张兵","吴哥","仝志维","前辈","禅师"};finalRandomrand=newRandom();finalStringword=words[rand.nextInt(words.length)];_collector.emit(newValues(word));}//在这里,我们没有进行ACKpublicvoidack(ObjectmsgId){}//在这里,我们没有进行failpublicvoidfail(ObjectmsgId){}publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){declarer.declare(newFields("word"));}@OverridepublicMap<String,Object>getComponentConfiguration(){if(!_isDistributed){Map<String,Object>ret=newHashMap<String,Object>();ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM,1);returnret;}else{returnnull;}}}
结果:
请注意在这里,我们的Stream 默认的id为空
到此,关于“Storm开发细节是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注恰卡编程网网站,小编会继续努力为大家带来更多实用的文章!
推荐阅读
-
storm实时排序TopN怎么使用
-
Storm怎么写一个爬虫
Storm怎么写一个爬虫这篇文章主要讲解了“Storm怎么写一个爬...
-
怎么用Storm IPResolutionBolt写爬虫
怎么用StormIPResolutionBolt写爬虫本篇内容介...
-
Storm RandomURLSpout怎么使用
StormRandomURLSpout怎么使用本篇内容介绍了“S...
-
如何用Storm来写一个Crawler的工具
-
Storm的Acker机制是什么
Storm的Acker机制是什么这篇文章主要讲解了“Storm的A...
-
storm drpc怎么定义
stormdrpc怎么定义本篇内容介绍了“stormdrpc怎...
-
storm使用要注意哪些点
storm使用要注意哪些点这篇文章主要讲解了“storm使用要注意...
-
storm topology优化思路是什么
stormtopology优化思路是什么本篇内容主要讲解“sto...
-
Storm的Grouping有哪些
Storm的Grouping有哪些这篇文章主要介绍“Storm的G...