valconf = newSparkConf().setAppName("Test").setMaster("local[*]") valsc = newSparkContext(conf) /* Action 算子*/ //集合函数 valrdd1 = sc.parallelize(List(2,1,3,6,5),2) valrdd1_1= rdd1.reduce(_+_) println(rdd1_1) //以数组的形式返回数据集的所有元素 println(rdd1.collect().toBuffer) //返回RDD的元素个数 println(rdd1.count()) //取出对应数量的值 默认降序, 若输入0 会返回一个空数组 println(rdd1.top(3).toBuffer) //顺序取出对应数量的值 println(rdd1.take(3).toBuffer) //顺序取出对应数量的值 默认生序 println(rdd1.takeOrdered(3).toBuffer) //获取第一个值 等价于 take(1) println(rdd1.first()) //将处理过后的数据写成文件(存储在HDFS或本地文件系统) //rdd1.saveAsTextFile("dir/file1") //统计key的个数并生成map k是key名 v是key的个数 valrdd2 = sc.parallelize(List(("key1",2),("key2",1),("key3",3),("key4",6),("key5",5)),2) valrdd2_1: collection.Map[String, Long] = rdd2.countByKey() println(rdd2_1) //遍历数据 rdd1.foreach(x => println(x))
/*其他算子*/ //统计value的个数 但是会将集合中的一个元素看做是一个vluae valvalue: collection.Map[(String, Int), Long] = rdd2.countByValue println(value) //filterByRange:对RDD中的元素进行过滤,返回指定范围内的数据 valrdd3 = sc.parallelize(List(("e",5),("c",3),("d",4),("c",2),("a",1))) valrdd3_1: RDD[(String, Int)] = rdd3.filterByRange("c","e")//包括开始和结束的 println(rdd3_1.collect.toList) //flatMapValues对参数进行扁平化操作,是value的值 valrdd3_2= sc.parallelize(List(("a","1 2"),("b","3 4"))) println( rdd3_2.flatMapValues(_.split(" ")).collect.toList) //foreachPartition 循环的是分区数据 // foreachPartiton一般应用于数据的持久化,存入数据库,可以进行分区的数据存储 valrdd4 = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3) rdd4.foreachPartition(x => println(x.reduce(_+_))) //keyBy 以传入的函数返回值作为key ,RDD中的元素为value 新的元组 valrdd5 = sc.parallelize(List("dog","cat","pig","wolf","bee"),3) valrdd5_1: RDD[(Int, String)] = rdd5.keyBy(_.length) println(rdd5_1.collect.toList) //keys获取所有的key values 获取所有的values println(rdd5_1.keys.collect.toList) println(rdd5_1.values.collect.toList) //collectAsMap 将需要的二元组转换成Map valmap: collection.Map[String, Int] = rdd2.collectAsMap() println(map) |