Flink的函数有哪些

Flink的函数有哪些

这篇文章主要介绍了Flink的函数有哪些,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

1. Map:将数据流中的数据进行一个转化,形成一个新的数据流,消费一个元素,并且产生一个元素

Flink的函数有哪些

具体代码实现

packagecom.wudl.core;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/***@versionv1.0*@ProjectNameFlinklearning*@ClassNameWordMap*@DescriptionTODOmap算子实例*@Date2020/10/2910:15*/publicclassWordMap{/***@paramargs*Map函数的用法*映射:将数据流中的数据进行一个转化,形成一个新的数据流,消费一个元素,并且产生一个元素*参数:Lambda表达式或者,newMapFunction实现类*返回值:DataStream*/publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();env.setMaxParallelism(1);env.socketTextStream("10.204.125.140",8899).map(newMapFunction<String,String>(){@OverridepublicStringmap(Strings)throwsException{String[]split=s.split(",");returnsplit[0]+"---"+split[1];}}).print();env.execute();}}

2. FlatMap:

将数据流中的整体拆分成一个一个的个体使用,消费一个元素并产生零到多个元素

packagecom.wudl.core;importorg.apache.flink.api.common.functions.FlatMapFunction;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;importjava.util.Arrays;importjava.util.List;/***@versionv1.0*@ProjectNameFlinklearning*@ClassNameTransformFlatMap*@DescriptionTODOFlatMap**FlatMap:是一种扁平的映射,将数据流中的整体拆分成为一个个的个体使用,消费后的元素产生零到多个元素****@Authorwudl*@Date2020/10/2910:46***函数FlatMap*将数据流中的整体拆分成一个一个的个体使用,消费一个元素并产生零到多个元素*参数:lambda表达式或者是FlatFunction的实现类*返回值:DataStream****/publicclassTransformFlatMap{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//DataStreamSource<List<Integer>>listDs=env.fromCollection(Arrays.asList(//Arrays.asList(1,2,3),//Arrays.asList(3,4,5),//Arrays.asList(8,9,0)//));//listDs.flatMap(newFlatMapFunction<List<Integer>,Integer>(){//@Override//publicvoidflatMap(List<Integer>list,Collector<Integer>collector)throwsException{////for(Integernumber:list){//collector.collect(number+100);//}////}//}).print();DataStreamSource<String>strDs=env.socketTextStream("10.204.125.140",8899);strDs.flatMap(newFlatMapFunction<String,String>(){@OverridepublicvoidflatMap(Strings,Collector<String>collector)throwsException{String[]split=s.split(",");collector.collect(split[0]+split[1]);}}).print();env.execute();}}

第三种:Filter 对数据流的过滤根据指定的规则将满足条件的(true) 的数据保留, 不瞒住条件的(false) 将丢弃

packagecom.wudl.core;importorg.apache.flink.api.common.functions.FilterFunction;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/***@versionv1.0*@ProjectNameFlinklearning*@ClassNameTransformFilter*@DescriptionTODO流的过滤*@Date2020/11/510:26*/publicclassTransformFilter{/***函数中Filter中过滤*过滤:根据指定的规则将满足条件的(true)的数据保留,不瞒住条件的(false)将丢弃*返回值:DataStream*/publicstaticvoidmain(String[]args)throwsException{//1.获取上下文的环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();//2.设置并行度env.setParallelism(1);//3.获取数据流DataStreamSource<String>SourceDs=env.socketTextStream("10.204.125.140",8899);//4.过滤数据流DataStream<String>filter=SourceDs.filter(newFilterFunction<String>(){@Overridepublicbooleanfilter(Stringvalue)throwsException{String[]split=value.split(",");returnsplit[1].length()>3;}});filter.print();env.execute();}}

感谢你能够认真阅读完这篇文章,希望小编分享的“Flink的函数有哪些”这篇文章对大家有帮助,同时也希望大家多多支持恰卡编程网,关注恰卡编程网行业资讯频道,更多相关知识等着你来学习!

发布于 2021-12-28 22:21:17
收藏
分享
海报
0 条评论
58
上一篇:Caffe中的损失函数怎么用 下一篇:如何使用Python实现计数器Counter
目录

    0 条评论

    本站已关闭游客评论,请登录或者注册后再评论吧~

    忘记密码?

    图形验证码