How to implement Spark assignments
Many readers find the material in "How to implement Spark assignments" hard to follow, so this article lays it out step by step. The content is detailed, the steps are clear, and it should serve as a useful reference; hopefully you will get something out of it. Let's take a look.
Send the data from sample.log to Kafka and process it with Spark Streaming so that each record takes the form

commandid|houseid|gathertime|srcip|destip|srcport|destport|domainname|proxytype|proxyip|proxytype|title|content|url|logid

then send the result to another Kafka topic.

Requirements:
1. sample.log => read the file and send its lines to a Kafka topic.
2. Consume that topic (with the 0.10 integration we manage offsets ourselves) and change the record format.
3. Send the processed records to another Kafka topic.

Analysis:
1. Use the Redis utility class from the course to manage offsets.
2. Read the log data and send it to topic1.
3. Consume the topic, change the field separator to a pipe, and send the result to topic2.
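To make the required format change concrete, here is a minimal sketch of turning one comma-separated record into the pipe-separated form. The object name and all field values are made up for illustration; they are not taken from the real sample.log.

object FormatSketch {
  def main(args: Array[String]): Unit = {
    // A made-up 15-field record in the comma-separated input format
    val input = "cmd001,house01,20200101120000,1.1.1.1,2.2.2.2,8080,80,example.com,http,3.3.3.3,http,title,content,http://example.com/a,log001"
    val fields = input.split(",")
    // Keep only well-formed records and swap the separator for a pipe
    val output = if (fields.length == 15) fields.mkString("|") else ""
    println(output) // cmd001|house01|...|log001
  }
}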
1. OffsetsWithRedisUtils
package home.one

import java.util

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

import scala.collection.mutable

object OffsetsWithRedisUtils {
  // Redis connection parameters
  private val redisHost = "linux123"
  private val redisPort = 6379

  // Connection pool configuration
  private val config = new JedisPoolConfig
  // Maximum number of idle connections
  config.setMaxIdle(5)
  // Maximum number of connections
  config.setMaxTotal(10)

  private val pool = new JedisPool(config, redisHost, redisPort, 10000)

  private def getRedisConnection: Jedis = pool.getResource

  private val topicPrefix = "kafka:topic"

  // Key layout: kafka:topic:TopicName:groupid
  private def getKey(topic: String, groupid: String) = s"$topicPrefix:$topic:$groupid"

  // Read the offsets stored under each key
  def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
    val jedis: Jedis = getRedisConnection
    val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map { topic =>
      val key = getKey(topic, groupId)

      import scala.collection.JavaConverters._
      // Convert the Java map returned by Redis to a Scala map; the hash layout is {partition -> offset}
      jedis.hgetAll(key)
        .asScala
        .map { case (partition, offset) =>
          new TopicPartition(topic, partition.toInt) -> offset.toLong
        }
    }
    // Return the connection to the pool
    jedis.close()

    offsets.flatten.toMap
  }

  // Save offsets to Redis
  def saveOffsetsToRedis(offsets: Array[OffsetRange], groupId: String): Unit = {
    // Get a connection
    val jedis: Jedis = getRedisConnection

    // Organize the data as (topic, (partition, untilOffset))
    offsets.map { range =>
      (range.topic, (range.partition.toString, range.untilOffset.toString))
    }
      .groupBy(_._1)
      .foreach { case (topic, buffer) =>
        val key: String = getKey(topic, groupId)

        import scala.collection.JavaConverters._
        // Convert the Scala map back to a Java map before writing it to Redis
        val maps: util.Map[String, String] = buffer.map(_._2).toMap.asJava

        // Save the data
        jedis.hmset(key, maps)
      }

    jedis.close()
  }
}
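A quick way to sanity-check this utility is a round trip against Redis: write a couple of offset ranges, then read them back. This is a minimal sketch, assuming the Redis instance at linux123:6379 configured above is reachable; the object name, topic, group id, and offset values are made up.

package home.one

import org.apache.spark.streaming.kafka010.OffsetRange

object OffsetsRoundTripCheck {
  def main(args: Array[String]): Unit = {
    // Hypothetical offset ranges for two partitions of mytopic1
    val ranges = Array(
      OffsetRange("mytopic1", 0, 0L, 100L),
      OffsetRange("mytopic1", 1, 0L, 50L)
    )
    // Writes the untilOffsets into the Redis hash kafka:topic:mytopic1:mygroup1
    OffsetsWithRedisUtils.saveOffsetsToRedis(ranges, "mygroup1")
    // Reads them back; expected output: mytopic1-0 -> 100, mytopic1-1 -> 50
    OffsetsWithRedisUtils.getOffsetsFromRedis(Array("mytopic1"), "mygroup1")
      .foreach { case (tp, offset) => println(s"$tp -> $offset") }
  }
}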
2. KafkaProducer
package home.one

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object KafkaProducer {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setAppName(this.getClass.getCanonicalName.init)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Read the sample.log file
    val lines: RDD[String] = sc.textFile("data/sample.log")

    // Kafka producer parameters
    val prop = new Properties()
    // Kafka broker address
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
    // Key and value serializers
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

    // Send the lines to mytopic1
    lines.foreachPartition { iter =>
      // Create one KafkaProducer per partition
      val producer = new KafkaProducer[String, String](prop)

      iter.foreach { line =>
        // Wrap the line in a record
        val record = new ProducerRecord[String, String]("mytopic1", line)
        // Send it
        producer.send(record)
      }
      producer.close()
    }
  }
}
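To check that the lines actually landed in mytopic1, a plain Kafka consumer can poll a few records. This is a minimal sketch, not part of the original assignment; it assumes the broker at linux121:9092 used above and kafka-clients 2.0 or later, and the object name and group id are made up.

package home.one

import java.time.Duration
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

import scala.collection.JavaConverters._

object Topic1Check {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "checkgroup")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
    // Start from the beginning of the topic so existing records are visible
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("mytopic1"))

    // Poll once and print whatever came back
    consumer.poll(Duration.ofSeconds(5)).asScala.foreach(record => println(record.value()))
    consumer.close()
  }
}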
3. HomeOne
package home.one

import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HomeOne {
  val log = Logger.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setAppName(this.getClass.getCanonicalName)
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Topic to consume
    val topics: Array[String] = Array("mytopic1")
    val groupid = "mygroup1"

    // Kafka consumer parameters
    val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupid)

    // Read the starting offsets from Redis
    val fromOffsets = OffsetsWithRedisUtils.getOffsetsFromRedis(topics, groupid)

    // Create the DStream
    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      // Read from Kafka, starting at the offsets stored in Redis
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
    )

    // Transform the data and send it to the second topic
    dstream.foreachRDD { rdd =>
      if (!rdd.isEmpty) {
        // Capture the offsets of this batch
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        // Process the data and send it to topic2
        rdd.foreachPartition(process)

        // Save the offsets to Redis
        OffsetsWithRedisUtils.saveOffsetsToRedis(offsetRanges, groupid)
      }
    }

    // Start the job
    ssc.start()
    // Keep it running
    ssc.awaitTermination()
  }

  // Send the processed data to topic2
  def process(iter: Iterator[ConsumerRecord[String, String]]) = {
    iter.map(line => parse(line.value))
      .filter(!_.isEmpty)
      .foreach(line => sendMsg2Topic(line, "mytopic2"))
  }

  // Send one message with a Kafka producer (note: a new producer is created per message)
  def sendMsg2Topic(msg: String, topic: String): Unit = {
    val producer = new KafkaProducer[String, String](getKafkaProducerParameters())
    val record = new ProducerRecord[String, String](topic, msg)
    producer.send(record)
  }

  // Change the record format: comma-separated fields become pipe-separated
  def parse(text: String): String = {
    try {
      val arr = text.replace("<<<!>>>", "").split(",")
      if (arr.length != 15) return ""
      arr.mkString("|")
    } catch {
      case e: Exception =>
        log.error("Failed to parse record!", e)
        ""
    }
  }

  // Kafka consumer configuration
  def getKafkaConsumerParameters(groupid: String): Map[String, Object] = {
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> groupid,
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )
  }

  // Kafka producer configuration
  def getKafkaProducerParameters(): Properties = {
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop
  }
}
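One detail worth noting in the code above: sendMsg2Topic builds a new KafkaProducer for every record and never closes it. A common alternative, shown here as a sketch rather than as part of the original solution, is to create one producer per partition inside the processing method and close it when the partition is done. processWithSharedProducer is a made-up name, and the method is meant to sit inside HomeOne next to process.

  // Alternative to process: one producer per partition instead of one per record
  def processWithSharedProducer(iter: Iterator[ConsumerRecord[String, String]]): Unit = {
    val producer = new KafkaProducer[String, String](getKafkaProducerParameters())
    iter.map(record => parse(record.value))
      .filter(_.nonEmpty)
      .foreach(line => producer.send(new ProducerRecord[String, String]("mytopic2", line)))
    // close() flushes any buffered records before releasing the producer's resources
    producer.close()
  }

With this version, the foreachRDD block would call rdd.foreachPartition(processWithSharedProducer) instead of rdd.foreachPartition(process).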
Exercise 2
/*
Suppose the airport data is:
1,"SFO"
2,"ORD"
3,"DFW"

and the routes between airports, with their distances, are:
1,2,1800
2,3,800
3,1,1400

Use GraphX to:
- list all vertices
- list all edges
- list all triplets
- count the vertices
- count the edges
- find how many routes are longer than 1000, and which ones
- sort all routes by distance in descending order and print the result
*/
Code:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

object TwoHome {
  def main(args: Array[String]): Unit = {
    // Initialization
    val conf = new SparkConf()
      .setAppName(this.getClass.getCanonicalName.init)
      .setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")

    // Raw data
    val vertexArray: Array[(Long, String)] = Array((1L, "SFO"), (2L, "ORD"), (3L, "DFW"))
    val edgeArray: Array[Edge[Int]] = Array(Edge(1L, 2L, 1800), Edge(2L, 3L, 800), Edge(3L, 1L, 1400))

    // Build the vertex RDD and edge RDD
    val vertexRDD: RDD[(VertexId, String)] = sc.makeRDD(vertexArray)
    val edgeRDD: RDD[Edge[Int]] = sc.makeRDD(edgeArray)

    // Build the graph
    val graph: Graph[String, Int] = Graph(vertexRDD, edgeRDD)

    // All vertices
    println("All vertices:")
    graph.vertices.foreach(println)

    // All edges
    println("All edges:")
    graph.edges.foreach(println)

    // All triplets
    println("All triplets:")
    graph.triplets.foreach(println)

    // Vertex count
    val vertexCnt = graph.vertices.count()
    println(s"Vertex count: $vertexCnt")

    // Edge count
    val edgeCnt = graph.edges.count()
    println(s"Edge count: $edgeCnt")

    // Routes longer than 1000
    println("Routes with distance greater than 1000:")
    graph.edges.filter(_.attr > 1000).foreach(println)

    // All routes sorted by distance in descending order
    println("All routes sorted by distance (descending):")
    graph.edges.sortBy(-_.attr).collect().foreach(println)
  }
}
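The exercise states the airport and route data as plain text lines, while the solution above hard-codes them as arrays. If the data actually sat in files, a sketch like the following would build the same RDDs; TwoHomeFromFiles and the file paths data/airports.txt and data/routes.txt are made up for illustration.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

object TwoHomeFromFiles {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TwoHomeFromFiles").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")

    // airports.txt is assumed to hold lines like 1,"SFO"; routes.txt lines like 1,2,1800
    val vertexRDD: RDD[(VertexId, String)] = sc.textFile("data/airports.txt").map { line =>
      val Array(id, name) = line.split(",")
      (id.toLong, name.replace("\"", ""))
    }
    val edgeRDD: RDD[Edge[Int]] = sc.textFile("data/routes.txt").map { line =>
      val Array(src, dst, dist) = line.split(",")
      Edge(src.toLong, dst.toLong, dist.toInt)
    }

    // The resulting graph can be queried exactly as in TwoHome above
    val graph: Graph[String, Int] = Graph(vertexRDD, edgeRDD)
    println(s"Vertex count: ${graph.vertices.count()}, edge count: ${graph.edges.count()}")
  }
}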
That is all there is to "How to implement Spark assignments". Hopefully you now have a reasonable grasp of the material and found it useful. For more related content, follow the industry news channel of 恰卡编程网.