storm drpc怎么定义

storm drpc怎么定义

本篇内容介绍了“storm drpc怎么定义”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

代码:

storm drpc怎么定义

packagemain.java;importmain.java.bolt.RequestCounter;importbacktype.storm.Config;importbacktype.storm.LocalCluster;importbacktype.storm.LocalDRPC;importbacktype.storm.StormSubmitter;importbacktype.storm.drpc.LinearDRPCTopologyBuilder;importbacktype.storm.utils.DRPCClient;/***DRPCexample**@authorsjyu**/publicclassDRPCTopologyMain{publicstaticvoidmain(String[]args)throwsException{//LocalDRPCdrpc=newLocalDRPC();DRPCClientdrpc=newDRPCClient("192.168.1.240",3772);LinearDRPCTopologyBuilderbuilder=newLinearDRPCTopologyBuilder("test_func");builder.addBolt(newRequestCounter(),2);Configconf=newConfig();conf.setDebug(true);StormSubmitter.submitTopology("drpc_test",conf,builder.createRemoteTopology());//LocalClustercluster=newLocalCluster();//cluster.submitTopology("local_cluster",conf,builder.createLocalTopology(drpc));Stringstr=drpc.execute("test_func","thisisatest");//这边drpc的client和server写在一起了,不知道可不可以写在两个进程里,//想像中应该没问题,就像网络编程一样,但是行不行还有待验证。System.out.println(str);}}

packagemain.java.bolt;importbacktype.storm.topology.BasicOutputCollector;importbacktype.storm.topology.OutputFieldsDeclarer;importbacktype.storm.topology.base.BaseBasicBolt;importbacktype.storm.tuple.Fields;importbacktype.storm.tuple.Tuple;importbacktype.storm.tuple.Values;publicclassRequestCounterextendsBaseBasicBolt{//Objectid=newObject();这边好像不能定义一个变量,不然就报错,//不知道是storm的原因还是java本来就不能这样(我觉得我似乎要去学学java了--)@Overridepublicvoidexecute(Tupleinput,BasicOutputCollectorcollector){Stringstr=(String)input.getString(1);collector.emit(newValues(input.getValue(0),str));}@OverridepublicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){declarer.declare(newFields("id","result"));}}

“storm drpc怎么定义”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注恰卡编程网网站,小编将为大家输出更多高质量的实用文章!

发布于 2021-12-23 21:21:26
收藏
分享
海报
0 条评论
43
上一篇:storm使用要注意哪些点 下一篇:Storm的Acker机制是什么
目录

    0 条评论

    本站已关闭游客评论,请登录或者注册后再评论吧~

    忘记密码?

    图形验证码