Storm开发细节是什么

Storm开发细节是什么

这篇文章主要介绍“Storm开发细节是什么”,在日常操作中,相信很多人在Storm开发细节是什么问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Storm开发细节是什么”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

packagetest;importjava.io.IOException;importjava.util.Map;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importstorm.copyFromClass.TestWordSpout;importcom.esotericsoftware.minlog.Log;importbacktype.storm.Config;importbacktype.storm.LocalCluster;importbacktype.storm.task.OutputCollector;importbacktype.storm.task.TopologyContext;importbacktype.storm.topology.BasicOutputCollector;importbacktype.storm.topology.OutputFieldsDeclarer;importbacktype.storm.topology.TopologyBuilder;importbacktype.storm.topology.base.BaseBasicBolt;importbacktype.storm.topology.base.BaseRichBolt;importbacktype.storm.tuple.Tuple;//测试目的,在这里我们需要测试一下当前Spout不断产生数据的过程publicclasstestWordSpoutTopology{publicstaticclassTestSimpleBoltextendsBaseBasicBolt{@Overridepublicvoidexecute(Tupleinput,BasicOutputCollectorcollector){System.out.println(input.toString());}@OverridepublicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){System.out.println("Methoddeclare");}}publicstaticvoidmain(String[]args)throwsIOException{//首先,我们必须建立一个新的TopologyBuilderTopologyBuilderbuilder=newTopologyBuilder();//其次,我们需要配置如下的组件:1Spout,2Boltbuilder.setSpout("word-emit-byThread",newTestWordSpout());//在这个Spout之中,我们约定将【word-emit-byThread】Spout组件发射的元祖进行shuffleGroupingbuilder.setBolt("word-show",newTestSimpleBolt()).shuffleGrouping("word-emit-byThread");Configconfig=newConfig();config.setDebug(false);//最后进行本地提交LocalClustercluster=newLocalCluster();cluster.submitTopology("simple",config,builder.createTopology());}}

以上,

testWordSpoutTopology

是我们运行的主类

packagestorm.copyFromClass;importbacktype.storm.Config;importbacktype.storm.topology.OutputFieldsDeclarer;importjava.util.Map;importbacktype.storm.spout.SpoutOutputCollector;importbacktype.storm.task.TopologyContext;importbacktype.storm.topology.base.BaseRichSpout;importbacktype.storm.tuple.Fields;importbacktype.storm.tuple.Values;importbacktype.storm.utils.Utils;importjava.util.HashMap;importjava.util.Random;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;//publicclassTestWordSpoutextendsBaseRichSpout{publicstaticLoggerLOG=LoggerFactory.getLogger(TestWordSpout.class);boolean_isDistributed;SpoutOutputCollector_collector;publicTestWordSpout(){this(true);}publicTestWordSpout(booleanisDistributed){_isDistributed=isDistributed;}publicvoidopen(Mapconf,TopologyContextcontext,SpoutOutputCollectorcollector){_collector=collector;}publicvoidclose(){}//发送publicvoidnextTuple(){Utils.sleep(100);finalString[]words=newString[]{"张兵","吴哥","仝志维","前辈","禅师"};finalRandomrand=newRandom();finalStringword=words[rand.nextInt(words.length)];_collector.emit(newValues(word));}//在这里,我们没有进行ACKpublicvoidack(ObjectmsgId){}//在这里,我们没有进行failpublicvoidfail(ObjectmsgId){}publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){declarer.declare(newFields("word"));}@OverridepublicMap<String,Object>getComponentConfiguration(){if(!_isDistributed){Map<String,Object>ret=newHashMap<String,Object>();ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM,1);returnret;}else{returnnull;}}}

结果:

请注意在这里,我们的Stream 默认的id为空

到此,关于“Storm开发细节是什么”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注恰卡编程网网站,小编会继续努力为大家带来更多实用的文章!

发布于 2021-12-23 21:21:28
收藏
分享
海报
0 条评论
59
上一篇:Storm的Acker机制是什么 下一篇:如何用Storm来写一个Crawler的工具
目录

    0 条评论

    本站已关闭游客评论,请登录或者注册后再评论吧~

    忘记密码?

    图形验证码