The 8 Partitioners Provided by Flink
GlobalPartitioner
This partitioner sends every record to a single downstream operator instance (subtask id = 0).
public static void global() throws Exception {
    // getEnv() is a helper that is not shown in this post; it is assumed to return a StreamExecutionEnvironment.
    StreamExecutionEnvironment env = getEnv().setMaxParallelism(8);
    DataStreamSource<String> dataStream = env.fromElements("hhh", "ggg", "fff", "ddd", "sss", "aaa", "qqq", "www");
    dataStream.flatMap(new RichFlatMapFunction<String, String>() {
        @Override
        public void flatMap(String s, Collector<String> collector) throws Exception {
            collector.collect(s + "_**");
        }
    }).setParallelism(2).global().print("global : ");

    env.execute();
}
ShufflePartitioner
Picks a downstream operator instance at random for each record.
public static void shuffle() throws Exception {
    StreamExecutionEnvironment env = getEnv();
    DataStreamSource<String> dataStream = env.fromElements("hhh", "ggg", "fff", "ddd", "sss", "aaa", "qqq", "www");
    DataStream<String> shuffled = dataStream.shuffle();
    shuffled.print("shuffle : ");
    env.execute();
}
BroadcastPartitioner
Sends every record to all downstream operator instances.
public static void broadcast() throws Exception {
    StreamExecutionEnvironment env = getEnv();
    DataStreamSource<String> dataStream = env.fromElements("hhh", "ggg", "fff", "ddd", "sss", "aaa", "qqq", "www");
    DataStream<String> broadcast = dataStream.broadcast();
    broadcast.print("broadcast : ");
    env.execute();
}
RebalancePartitioner
Sends records to the downstream tasks one after another in round-robin order.
public static void rebalance() throws Exception {
    StreamExecutionEnvironment env = getEnv().setParallelism(4);
    DataStreamSource<String> dataStream = env.fromElements("hhh", "ggg", "fff", "ddd", "sss", "aaa", "qqq", "www");
    dataStream.map(new RichMapFunction<String, String>() {
        @Override
        public String map(String s) throws Exception {
            return s + "_**";
        }
    }).setParallelism(1).rebalance().print("rebalance : ");

    env.execute();
}
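The round-robin behaviour can be modelled without Flink at all. The following is a minimal sketch, not Flink's implementation: `RoundRobinSketch` and `assign` are hypothetical names, and the starting channel is assumed to be 0 (the real partitioner may start from a different channel).

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinSketch {

    // Hypothetical helper: returns, for each record in order, the index of
    // the downstream channel it would be sent to under round-robin.
    public static List<Integer> assign(int recordCount, int numChannels) {
        List<Integer> channels = new ArrayList<>();
        int next = 0; // assumption: the cycle starts at channel 0
        for (int i = 0; i < recordCount; i++) {
            channels.add(next);
            next = (next + 1) % numChannels; // wrap around after the last channel
        }
        return channels;
    }

    public static void main(String[] args) {
        // 8 records, 4 downstream subtasks, as in the rebalance() example
        System.out.println(assign(8, 4)); // prints [0, 1, 2, 3, 0, 1, 2, 3]
    }
}
```

With 8 records and 4 downstream subtasks, each subtask receives exactly two records, which is why rebalancing is useful for evening out skewed input.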
RescalePartitioner
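Distributes records round-robin to a subset of the downstream subtasks. Which subset each upstream subtask writes to depends on the upstream and downstream parallelism; for example, with an upstream parallelism of 2 and a downstream parallelism of 4, each upstream subtask sends to two of the downstream subtasks.
public static void rescale() throws Exception {
    StreamExecutionEnvironment env = getEnv().setParallelism(4);
    DataStreamSource<String> dataStream = env.fromElements("hhh", "ggg", "fff", "ddd", "sss", "aaa", "qqq", "www");
    dataStream.map(new RichMapFunction<String, String>() {
        @Override
        public String map(String s) throws Exception {
            return s + "_**";
        }
    }).setParallelism(1).rescale().print("rescale : ");

    env.execute();
}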
ForwardPartitioner
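Sends each record to the first corresponding downstream task. The upstream and downstream operators must have the same parallelism, i.e. upstream and downstream subtasks are in a 1:1 relationship.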
public static void forward() throws Exception {
    StreamExecutionEnvironment env = getEnv().setParallelism(1);
    DataStreamSource<String> dataStream = env.fromElements("hhh", "ggg", "fff", "ddd", "sss", "aaa", "qqq", "www");
    // Use forward() here; the original listing called shuffle() by mistake.
    DataStream<String> forwarded = dataStream.forward();
    forwarded.print("forward : ");
    env.execute();
}
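⚠️ When no partitioner is specified between an upstream and a downstream operator, Flink uses ForwardPartitioner if the two operators have the same parallelism, and RebalancePartitioner otherwise. ForwardPartitioner requires the upstream and downstream parallelism to be equal; if they differ, an exception is thrown.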
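The rule in the note above can be written down as a small decision function. This is only a sketch of the rule as stated here, not Flink's code: `DefaultPartitionerSketch`, `defaultPartitioner` and `checkForward` are hypothetical names, and the exception type and message are stand-ins.

```java
public class DefaultPartitionerSketch {

    // Which partitioner applies when the user did not specify one.
    public static String defaultPartitioner(int upstreamParallelism, int downstreamParallelism) {
        return upstreamParallelism == downstreamParallelism
                ? "ForwardPartitioner"
                : "RebalancePartitioner";
    }

    // Models the constraint on an explicitly requested forward():
    // the two parallelism values must match.
    public static void checkForward(int upstreamParallelism, int downstreamParallelism) {
        if (upstreamParallelism != downstreamParallelism) {
            // Stand-in exception; the real type and message may differ.
            throw new UnsupportedOperationException(
                    "forward requires equal parallelism, got "
                            + upstreamParallelism + " and " + downstreamParallelism);
        }
    }

    public static void main(String[] args) {
        System.out.println(defaultPartitioner(4, 4)); // prints ForwardPartitioner
        System.out.println(defaultPartitioner(1, 4)); // prints RebalancePartitioner
        checkForward(1, 1); // passes silently
    }
}
```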
KeyByPartitioner
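Selects the downstream subtask from the key-group index of the record's key (in Flink's source the class is named KeyGroupStreamPartitioner).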
public static void keyBy() throws Exception {
    StreamExecutionEnvironment env = getEnv().setMaxParallelism(8);
    DataStreamSource<String> dataStream = env.fromElements("hhh", "hhh", "hhh", "hhh", "sss", "sss", "sss", "www");
    dataStream.flatMap(new RichFlatMapFunction<String, String>() {
        @Override
        public void flatMap(String s, Collector<String> collector) throws Exception {
            collector.collect(s + "_**");
        }
    }).keyBy(String::toString).print("keyBy : ");

    env.execute();
}
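To make "key-group index" concrete, here is a rough sketch of the two-step mapping: a key is first hashed into one of `maxParallelism` key groups, and the key group is then mapped onto a subtask. The class and method names are hypothetical, and the hash is a plain `hashCode()` stand-in; Flink applies its own hashing on top of `hashCode()`, so the concrete key groups it computes will differ from this sketch.

```java
public class KeyGroupSketch {

    // Step 1: key -> key group in [0, maxParallelism).
    // Assumption: plain hashCode() is used here as a stand-in hash.
    public static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Step 2: key group -> subtask index in [0, parallelism).
    // Consecutive key groups are assigned to the same subtask in ranges.
    public static int subtaskIndex(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 8; // as in setMaxParallelism(8) above
        int parallelism = 4;    // assumed downstream parallelism
        for (String key : new String[] {"hhh_**", "sss_**", "www_**"}) {
            int group = keyGroup(key, maxParallelism);
            System.out.println(key + " -> key group " + group
                    + " -> subtask " + subtaskIndex(group, parallelism, maxParallelism));
        }
    }
}
```

The property that matters is that identical keys always land in the same key group and therefore on the same subtask, which is what lets keyed state stay local to one subtask.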
CustomPartitionerWrapper
Routes each record downstream using the partition method of a user-defined Partitioner instance.
public static void custom() throws Exception {
    StreamExecutionEnvironment env = getEnv().setMaxParallelism(8);
    DataStreamSource<String> dataStream = env.fromElements("hhhh", "hhhss", "hhh", "hhh", "sss", "sss", "sss", "www");
    dataStream.flatMap(new RichFlatMapFunction<String, String>() {
        @Override
        public void flatMap(String s, Collector<String> collector) throws Exception {
            collector.collect(s + "_**");
        }
    }).partitionCustom(new CustomPartitioner(), String::toString)
      .print("custom :");

    env.execute();
}

public static class CustomPartitioner implements Partitioner<String> {
    @Override
    public int partition(String key, int numPartitions) {
        // Route by key length: keys of equal length go to the same partition.
        return key.length() % numPartitions;
    }
}
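As a worked example, the partition function above can be evaluated by hand for the keys in this job. Note that the keys reaching the partitioner already carry the `_**` suffix added by the flatMap. The sketch below reuses the same arithmetic in plain Java; `CustomPartitionSketch` is a hypothetical name, and `numPartitions = 4` is an assumption, since the post does not state the downstream parallelism.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CustomPartitionSketch {

    // Same arithmetic as CustomPartitioner.partition in the listing above.
    public static int partition(String key, int numPartitions) {
        return key.length() % numPartitions;
    }

    // Computes the target partition for each distinct key, preserving order.
    public static Map<String, Integer> assignAll(String[] keys, int numPartitions) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (String key : keys) {
            result.put(key, partition(key, numPartitions));
        }
        return result;
    }

    public static void main(String[] args) {
        String[] keys = {"hhhh_**", "hhhss_**", "hhh_**", "sss_**", "www_**"};
        int numPartitions = 4; // assumption: downstream parallelism of 4
        // Lengths are 7, 8, 6, 6, 6, so the partitions are 3, 0, 2, 2, 2.
        System.out.println(assignAll(keys, numPartitions));
        // prints {hhhh_**=3, hhhss_**=0, hhh_**=2, sss_**=2, www_**=2}
    }
}
```

Because the partition depends only on key length, the three 6-character keys all end up on the same subtask, so this particular function distributes the sample data unevenly.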