怎么使用Storm

怎么使用Storm

这篇文章主要介绍“怎么使用Storm”,在日常操作中,相信很多人在怎么使用Storm问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”怎么使用Storm”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

项目Pom(Storm jar没有提交到Maven中央仓库,需要在项目中加入下面的仓库地址):

怎么使用Storm

<repositories><repository><id>central</id><name>MavenRepositorySwitchboard</name><layout>default</layout><url>http://maven.oschina.net/content/groups/public/</url><snapshots><enabled>false</enabled></snapshots></repository><repository><id>clojars</id><url>https://clojars.org/repo/</url><snapshots><enabled>false</enabled></snapshots><releases><enabled>true</enabled></releases></repository></repositories><dependencies><dependency><groupId>org.yaml</groupId><artifactId>snakeyaml</artifactId><version>1.13</version></dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.3.3</version></dependency><dependency><groupId>org.clojure</groupId><artifactId>clojure</artifactId><version>1.5.1</version></dependency><dependency><groupId>storm</groupId><artifactId>storm</artifactId><version>0.9.0.1</version></dependency><dependency><groupId>storm</groupId><artifactId>libthrift7</artifactId><version>0.7.0</version></dependency></dependencies>

下面是一个Storm的HelloWord的例子,代码有删减,熟悉Storm的读者自然能把代码组织成一个完整的例子。

publicstaticvoidmain(String[]args){Configconf=newConfig();conf.put(Config.STORM_LOCAL_DIR,"/Volumes/Study/data/storm");conf.put(Config.STORM_CLUSTER_MODE,"local");//conf.put("storm.local.mode.zmq","false");conf.put("storm.zookeeper.root","/storm");conf.put("storm.zookeeper.session.timeout",50000);conf.put("storm.zookeeper.servers","nowledgedata-n15");conf.put("storm.zookeeper.port",2181);//conf.setDebug(true);//conf.setNumWorkers(2);TopologyBuilderbuilder=newTopologyBuilder();builder.setSpout("words",newTestWordSpout(),2);builder.setBolt("exclaim2",newDefaultStringBolt(),5).shuffleGrouping("words");LocalClustercluster=newLocalCluster();cluster.submitTopology("test",conf,builder.createTopology());}

  • Config.STORM_LOCAL_DIR是配置一个本地路径,Storm会在这个路径写入一些配置信息和临时数据。

  • Config.STORM_CLUSTER_MODE是运行模式,local和distributed两个选项,即本地模式和分布式模式。本地模式在运行时时多线程模拟的,开发测试用;分布式模式在分布式集群下是多进程的,真正的分布式。

  • Storm的Spout和Blot高可用是通过ZooKeeper协调的,storm.zookeeper.root是一个ZooKeeper地址,并且有对应的端口号

  • Debug是测试模式,有更详细的日志信息。

TestWordSpout是一个Storm自带的例子,用来随机的产生<code>new String[] {"nathan", "mike", "jackson", "golda", "bertels"};</code>列表中的字符串,用来提供数据源。

其中DefaultStringBolt的源码:

OutputCollectorcollector;publicvoidprepare(Mapconf,TopologyContextcontext,OutputCollectorcollector){this.collector=collector;}publicvoidexecute(Tupletuple){log.info("revamessage:"+tuple.getString(0));collector.emit(tuple,newValues(tuple.getString(0)+"!!!"));collector.ack(tuple);}

运行日志:

10658 [Thread-29-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 10658 [Thread-31-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 10758 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike 10758 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan 10859 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan 10859 [Thread-29-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels 10961 [Thread-31-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 10961 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 11061 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan 11062 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan 11162 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels 11163 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson

数据由一个Storm叫做喷嘴(Spout,也相当一个水龙头,能产生数据的来源端)产生,然后传递给后端一连串的的Blot,最终被转换和消费。而Spout和Blot都是并行的,并行度都可以自己设置(本地运行是靠多线程模拟的)。如:

builder.setSpout("words",newTestWordSpout(),2);builder.setBolt("exclaim2",newDefaultStringBolt(),5)

喷嘴TestWordSpout的并行度是2,DefaultStringBolt的并行度是5.

从日志可以看出,数据经过喷嘴到达预先定于的一个Blot,打印了日志。我测试代码设置的并行度是5,日志中统计,确实是5个线程:

  1. Thread-29-exclaim2

  2. Thread-31-exclaim2

  3. Thread-26-exclaim2

  4. Thread-33-exclaim2

  5. Thread-35-exclaim2

借用OSC网友的话说,Hadoop就是商场里自动升降式的电梯,用户需要排队等待,选按楼层,然后到达;而Storm就像是自动扶梯,扶梯预先设置好运行后,来人就立即运走,目的地是明确的。

Storm按我的理解,Storm和Hadoop是完全不同的,设计上也没有半点拟合的部分。Storm更像是我之前介绍过的Spring Integration,是一个数据流系统。它能把数据按照预设定的流程,把数据做各种转换,传递,分解,合并,最后数据到达后端存储。只不过Storm是可以分布式,而且分布式的能力也是可以自己设置。

Storm的这种特性很适合大数据类的ETL系统开发。

到此,关于“怎么使用Storm”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注恰卡编程网网站,小编会继续努力为大家带来更多实用的文章!

发布于 2021-12-23 21:21:15
收藏
分享
海报
0 条评论
55
上一篇:linux如何查看目录 下一篇:Storm的Grouping有哪些
目录

    0 条评论

    本站已关闭游客评论,请登录或者注册后再评论吧~

    忘记密码?

    图形验证码