flink深度解析(Flink操练二十之并行度使用讲解)
package one;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @program: Flink_learn
* @description: 并行度的设置
* 针对每个算子设置的并行度的优先级高于全局并行度
* 本程序需要两个任务插槽
* @author: Mr.逗
* @create: 2021-09-14 15:40
**/
public class Example3 {
public static void main(String[] args) {
// 获取流处理的运行时环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行任务的数量为1
// 需要1个任务插槽
env.setParallelism(1);
//读取数据源
// 并行度设置为1
DataStreamSource<String> stream = env.fromElements("hello world", "hello world").setParallelism(1);
// map操作
// 这里使用的flatMap方法
// map: 针对流中的每一个元素,输出一个元素
// flatMap:针对流中的每一个元素,输出0个,1个或者多个元素
// 并行度设置为2
SingleOutputStreamOperator<WordWithCount> mappedStream = stream
// 输入泛型:String; 输出泛型:WordWithCount
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String v, Collector<WordWithCount> out) throws Exception {
String[] words = v.split(" ");
for (String w : words) {
// 使用collect方法向下游发送数据
out.collect(new WordWithCount(w, 1L));
}
}
}).setParallelism(2);
//分组shuffle
// 第一个泛型:流中元素的泛型
// 第二个泛型:key的泛型
KeyedStream<WordWithCount, String> keyedStream = mappedStream.keyBy(new KeySelector<WordWithCount, String>() {
@Override
public String getKey(WordWithCount v) throws Exception {
return v.word;
}
});
// reduce操作
// reduce会维护一个累加器
// 第一条数据到来,作为累加器输出
// 第二条数据到来,和累加器进行聚合操作,然后输出累加器
// 累加器和流中元素的类型是一样的
SingleOutputStreamOperator<WordWithCount> reduce = keyedStream.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount v1, WordWithCount v2) throws Exception {
return new WordWithCount(v1.word, v1.count v2.count);
}
});
//输出
reduce.print();
String name = Example3.class.getName();
try {
env.execute(name);
}catch (Exception e)
{
e.printStackTrace();
}
}
// POJO类
// 1. 必须是公有类
// 2. 所有字段必须是public
// 3. 必须有空构造器
// 模拟了case class
public static class WordWithCount {
public String word;
public Long count;
public WordWithCount() {
}
public WordWithCount(String word, Long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WordWithCount{"
"word='" word '\''
", count=" count
'}';
}
}
}
今天就来说说Flink并行度的设置与使用,下面一起来看详细讲解吧!
flink深度解析
1、代码实现逻辑
package one;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @program: Flink_learn
* @description: 并行度的设置
* 针对每个算子设置的并行度的优先级高于全局并行度
* 本程序需要两个任务插槽
* @author: Mr.逗
* @create: 2021-09-14 15:40
**/
public class Example3 {
public static void main(String[] args) {
// 获取流处理的运行时环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行任务的数量为1
// 需要1个任务插槽
env.setParallelism(1);
//读取数据源
// 并行度设置为1
DataStreamSource<String> stream = env.fromElements("hello world", "hello world").setParallelism(1);
// map操作
// 这里使用的flatMap方法
// map: 针对流中的每一个元素,输出一个元素
// flatMap:针对流中的每一个元素,输出0个,1个或者多个元素
// 并行度设置为2
SingleOutputStreamOperator<WordWithCount> mappedStream = stream
// 输入泛型:String; 输出泛型:WordWithCount
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String v, Collector<WordWithCount> out) throws Exception {
String[] words = v.split(" ");
for (String w : words) {
// 使用collect方法向下游发送数据
out.collect(new WordWithCount(w, 1L));
}
}
}).setParallelism(2);
//分组shuffle
// 第一个泛型:流中元素的泛型
// 第二个泛型:key的泛型
KeyedStream<WordWithCount, String> keyedStream = mappedStream.keyBy(new KeySelector<WordWithCount, String>() {
@Override
public String getKey(WordWithCount v) throws Exception {
return v.word;
}
});
// reduce操作
// reduce会维护一个累加器
// 第一条数据到来,作为累加器输出
// 第二条数据到来,和累加器进行聚合操作,然后输出累加器
// 累加器和流中元素的类型是一样的
SingleOutputStreamOperator<WordWithCount> reduce = keyedStream.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount v1, WordWithCount v2) throws Exception {
return new WordWithCount(v1.word, v1.count v2.count);
}
});
//输出
reduce.print();
String name = Example3.class.getName();
try {
env.execute(name);
}catch (Exception e)
{
e.printStackTrace();
}
}
// POJO类
// 1. 必须是公有类
// 2. 所有字段必须是public
// 3. 必须有空构造器
// 模拟了case class
public static class WordWithCount {
public String word;
public Long count;
public WordWithCount() {
}
public WordWithCount(String word, Long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WordWithCount{"
"word='" word '\''
", count=" count
'}';
}
}
}
WordWithCount{word='hello', count=1}
WordWithCount{word='world', count=1}
WordWithCount{word='hello', count=2}
WordWithCount{word='world', count=2}
免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com