解释filter函数的工作原理(FilterFlinkDataStream)
声明:本文基于Flink1.14.0版本
主要内容
1、Filter使用说明
2、FilterMapFunction说明
3、示例代码
Filter使用说明
DataStream → DataStream
说明:针对数据流中的每一条数据,应用返回boolean类型的函数,如果返回true,则保留这条数据,如果返回false,则丢弃这条数据。
使用:
1、创建实现FilterFunction接口的类,并重写filter方法
2、在DataStream上调用filter方法,传入参数为实现FilterFunction接口的类实例。
FilterMapFunction说明
接口:FilterFunction<T> 参数类型为输入类型
方法:boolean filter(T var1) 方法参数为待处理的数据,返回boolean类型
示例代码
输入字符串长度>5,则返回该字符串
package flink.datastream.operators;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FlinkFilterTest {
public static void main(String[] args) throws Exception {
//1、创建运行时环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2、加载socket数据源
DataStream<String> ds1 = env.socketTextStream("localhost",9999);
//3、在ds1上应用flatMap,将输入的每一条数据根据分隔符拆分成多条数据
DataStream<String> ds2 = ds1.filter(new FilterFunc());
//4、打印ds2
ds2.print();
//5、触发程序执行
env.execute();
}
public static class FilterFunc implements FilterFunction<String>{
@Override
public boolean filter(String s) throws Exception {
return s.length()>5;
}
}
}
运行结果
输入数据
输出结果
学习交流,互相促进,文中问题,欢迎指正!!!
,免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com