如何将数据按指定格式存入zookeeper

如何将数据按指定格式存入zookeeper

这篇文章主要讲解了“如何将数据按指定格式存入zookeeper”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“如何将数据按指定格式存入zookeeper”吧!

环境:

scala版本:2.11.8

zookeeper版本:3.4.5-cdh6.7.0

packagecom.ruozedata.zkimportjava.util.concurrent.TimeUnitimportorg.apache.curator.framework.CuratorFrameworkFactoryimportorg.apache.curator.framework.recipes.locks.InterProcessMuteximportorg.apache.curator.retry.ExponentialBackoffRetryimportorg.slf4j.LoggerFactoryimportscala.collection.JavaConversions._importscala.collection.mutable/***Createdbyganweion2018/08/21*要求:*1通过storeOffsets方法把数据存入zookeeper中。*存储格式:*/consumers/G322/offsets/ruoze_offset_topic/partition/0*/consumers/G322/offsets/ruoze_offset_topic/partition/1*/consumers/G322/offsets/ruoze_offset_topic/partition/2*2通过obtainOffsets方法把存入的数据读取出来*输出格式:*topic:ruoze_offset_topicpartition:0offset:7*topic:ruoze_offset_topicpartition:1offset:3*topic:ruoze_offset_topicpartition:2offset:5*/objectZkConnectApp{valLOG=LoggerFactory.getLogger(ZkConnectApp.getClass)valclient={valclient=CuratorFrameworkFactory.builder.connectString("172.16.100.31:2181").retryPolicy(newExponentialBackoffRetry(1000,3)).namespace("consumers").build()client.start()client}deflock(path:String)(body:=>Unit){vallock=newInterProcessMutex(client,path)lock.acquire()try{body}finally{lock.release()}}deftryDo(path:String)(body:=>Unit):Boolean={vallock=newInterProcessMutex(client,path)if(!lock.acquire(10,TimeUnit.SECONDS)){LOG.info(s"不能获得锁{$path},已经有任务在运行,本次任务退出")returnfalse}try{LOG.info("获准运行")bodytrue}finally{lock.release()LOG.info(s"释放锁{$path}")}}//zookeeper创建路径defensurePathExists(path:String):Unit={if(client.checkExists().forPath(path)==null){client.create().creatingParentsIfNeeded().forPath(path)}}/***OffsetRange类定义(偏移量对象)*用于存储偏移量*/caseclassOffsetRange(valtopic:String,//主题valpartition:Int,//分区valfromOffset:Long,//起始偏移量valutilOffset:Long//终止偏移量)/***zookeeper存储offset的方法*写入格式:*/consumers/G322/offsets/ruoze_offset_topic/partition/0*/consumers/G322/offsets/ruoze_offset_topic/partition/1*/consumers/G322/offsets/ruoze_offset_topic/partition/2*@paramOffsetsRanges*@paramgroupName*/defstoreOffsets(OffsetsRanges:Array[OffsetRange],groupName:String)={valoffsetRootPath=s"/"+groupNameif(client.checkExists().forPath(offsetRootPath)==null){client.create().creatingParentsIfNeeded().forPath(offsetRootPath)}for(els<-OffsetsRanges){valdata=String.valueOf(els.utilOffset).getBytesvalpath=s"$offsetRootPath/offsets/${els.topic}/partition/${els.partition}"//创建路径ensurePathExists(path)//写入数据client.setData().forPath(path,data)}}/***TopicAndPartition类定义(偏移量key对象)*用于提取偏移量*/caseclassTopicAndPartition(topic:String,//主题partition:Int//分区)/***zookeeper提取offset的方法*@paramtopic*@paramgroupName*@return*/defobtainOffsets(topic:String,groupName:String):Map[TopicAndPartition,Long]={//定义一个空的HashMapvalmaps=mutable.HashMap[TopicAndPartition,Long]()//offset的路径valoffsetRootPath=s"/"+groupName+"/offsets/"+topic+"/partition"//判断路径是否存在valstat=client.checkExists().forPath(s"$offsetRootPath")if(stat==null){println(stat)//路径不存在就将路径打印在控制台,检查路径}else{//获取offsetRootPath路径下一级的所有子目录//我们这里是获取的所有分区valchildren=client.getChildren.forPath(s"$offsetRootPath")//遍历所有的分区for(lines<-children){//获取分区的数据valdata=newString(client.getData().forPath(s"$offsetRootPath/"+lines)).toLong//将topicpartition和数据赋值给mapsmaps(TopicAndPartition(topic,lines.toInt))=data}}//按partition排序后返回map对象maps.toList.sortBy(_._1.partition).toMap}defmain(args:Array[String]){//定义初始化数据valoff1=OffsetRange("ruoze_offset_topic",0,0,7)valoff2=OffsetRange("ruoze_offset_topic",1,0,3)valoff3=OffsetRange("ruoze_offset_topic",2,0,5)valarr=Array(off1,off2,off3)//获取到namespace//println(client.getNamespace)//创建路径//valoffsetRootPath="/G322"//if(client.checkExists().forPath(offsetRootPath)==null){//client.create().creatingParentsIfNeeded().forPath(offsetRootPath)//}//存储值storeOffsets(arr,"G322")//获取值/***输出格式:*topic:ruoze_offset_topicpartition:0offset:7*topic:ruoze_offset_topicpartition:1offset:3*topic:ruoze_offset_topicpartition:2offset:5*/valresult=obtainOffsets("ruoze_offset_topic","G322")for(map<-result){println("topic:"+map._1.topic+"\t"+"partition:"+map._1.partition+"\t"+"offset:"+map._2)}}}

感谢各位的阅读,以上就是“如何将数据按指定格式存入zookeeper”的内容了,经过本文的学习后,相信大家对如何将数据按指定格式存入zookeeper这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是恰卡编程网,小编将为大家推送更多相关知识点的文章,欢迎关注!

发布于 2021-12-22 21:57:12
分享
海报
43
上一篇:JDBC怎么实现验证登录 下一篇:常见浏览器兼容问题有哪些
目录

    忘记密码?

    图形验证码