返回 导航

大数据

hangge.com

Flink - DataStream API使用详解3(数据分区:shuffle、rebalance、rescale、broadcast、自定义)

作者:hangge | 2025-02-21 09:04
    在 Flink 中,分区(Partition)算子是数据流处理中非常重要的一环。它用于控制数据流在分布式环境中的分布方式,通过对数据进行分区,可以优化数据处理的性能,并满足特定的业务需求。Flink 提供了多种分区策略,开发者可以根据业务需求选择合适的策略,下面分别进行介绍。
分区方法 操作方式
shuffle 随机对数据流进行分区,根据均匀分布随机划分元素。
rebalance round-robin 方式。使用循环分配分区元素的方法,为每个分区创建相等的负载。
rescale 根据上下游运算符的数量,对元素做一个均匀分配。
broadcast 将输出元素被广播到下一个操作(算子)的每个并行实例。
keyBy key 划分数据流,相同 key 的元素到一个分区上。
forward 将输出元素被转发到下一个操作的本地子任务。
global 将所有的数据都发送到下游 0 号分区中。
自定义分区 使用用户定义的分区程序(Partitioner)为每个元素选择目标任务。

一、Shuffle(随机分区)

1,基本介绍

(1)Shuffle 方法会将上游数据随机分发到下游算子实例的每个分区中,其分区示意图如下所示:

(2)shuffle 底层对应的是 ShufflePartitioner 这个类,这个类里面有一个 selectChannel 函数,这个函数会计算数据将会被发送给哪个分区,里面使用的是 random.nextInt,所以说是随机的。
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
   return random.nextInt(numberOfChannels);
}

2,使用样例

(1)下面是 Scala 代码样例代码:
import org.apache.flink.streaming.api.scala._

object PartitionShuffle {
  def main(args: Array[String]): Unit = {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 从自定义的集合中读取数据
    val stream = env.fromCollection(List(1, 2, 3, 4, 5))

    // 这里只是为了能够将并行度设置为 2
    val stream2 = stream
      .map(v => {
        (v % 2, v)
      }) // 偶数key为0,奇数key为1
      .keyBy(_._1) // 按奇偶进行分区
      .map(v => (v._1, v._2))
      .setParallelism(2)
    println(stream2.parallelism) // 查看并行度

    // 查看随机分区的结果
    stream2.shuffle
      .print("shuffle")
      .setParallelism(2)

    // 触发流程序执行
    env.execute("shuffle分区示例")
  }
}

(2)下面是 Java 代码样例代码,效果和上面是一样的。
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;

public class PartitionShuffleJava {

    public static void main(String[] args) throws Exception {
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从自定义的集合中读取数据
        DataStream<Integer> stream = env.fromCollection(Arrays.asList(1,2,3,4,5));

        // 这里只是为了能够将并行度设置为 2
        DataStream<Tuple2<Integer,Integer>> stream2 = stream
                .map(new MapFunction<Integer, Tuple2<Integer,Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(Integer input) throws Exception {
                        return new Tuple2<>(input%2, input);
                    }
                }) // 偶数key为0,奇数key为1
                .keyBy(t -> t.f0) // 按奇偶进行分区
                .map(new MapFunction<Tuple2<Integer,Integer>, Tuple2<Integer,Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> t)
                            throws Exception {
                        return new Tuple2<>(t.f0, t.f1);
                    }
                })
                .setParallelism(2);
        System.out.println(stream2.getParallelism());		// 查看并行度

        // 查看随机分区的结果
        stream2.shuffle()
                .print("shuffle")
                .setParallelism(2);

        // 触发流程序执行
        env.execute("shuffle分区示例");
    }
}

二、Rebalance(重平衡分区)

1,基本介绍

(1)rebalance 方法会将输出元素以轮循方式均匀地分布到下一个操作(算子)的实例中,其分区示意图如下所示:

(2)这种类型的分区有助于均匀地分布数据。它使用循环分配分区元素的方法,为每个分区创建相等的负载。这种类型的分区对于存在数据倾斜的情况下的性能优化非常有用。

(3)rebalance 底层对应的是 RebalancePartitioner 这个类。这个类里面有一个 setupselectChannel 函数,setup 函数会根据分区数初始化一个随机值 nextChannelToSendTo ,然后 selectChannel 函数会使用 nextChannelToSendTo1 和分区数取模,把计算的值再赋给 nextChannelToSendTo ,后面以此类推,其实就可以实现向下游算子实例的多个分区循环发送数据了,这样每个分区获取到的数据基本一致。
public void setup(int numberOfChannels) {
   super.setup(numberOfChannels);

   nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
}

public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
   nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
   return nextChannelToSendTo;
}

2,使用样例

(1)下面是 Scala 代码样例代码:
import org.apache.flink.streaming.api.scala._

object RebalancePartitioner {
  def main(args: Array[String]): Unit = {
    // 创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 从自定义的集合中读取数据
    val stream = env.fromCollection(List(1, 2, 3, 4, 5, 6))

    // 直接打印数据
    stream.rebalance
      .print("rebalance").setParallelism(2)

    env.execute("rebalance分区示例")
  }
}

(2)下面是 Java 代码样例代码,效果和上面是一样的。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;

public class PartitionRebalanceJava {

    public static void main(String[] args) throws Exception {
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从自定义的集合中读取数据
        DataStream<Integer> stream = env.fromCollection(Arrays.asList(1,2,3,4,5,6));

        // 直接打印数据
        stream.rebalance()
                .print("rebalance").setParallelism(2);

        // 触发流程序执行
        env.execute("rebalance分区示例");
    }
}

三、Rescale(重分区)

1,基本介绍

(1)rescale 方法会将输出元素以轮循方式均匀地分布到下一个操作(算子)的实例子集,其分区示意图如下所示:

(2)在这种分区方法中,Flink 循环将元素划分为下游操作的子集。上游操作向其发送元素的下游操作子集取决于上游和下游操作的并行度。
  • 例如,如果上游操作的并行度为 2,而下游操作的并行度为 4,那么一个上游操作将把元素分配给两个下游操作,而另一个上游操作将分配给另外两个下游操作。
  • 另一方面,如果下游操作的并行度为 2,而上游操作的并行度为 4,那么两个上游操作将分配给一个下游操作,而另外两个上游操作将分配给另一个下游操作。
  • 在上下游算子的并行度不是彼此的倍数的情况下,一个或几个下游操作与上游操作的输入数量不同。

(3)rescale 底层对应的是 RescalePartitioner 这个类。这个类里面有一个 selectChannel 函数,这里面的 numberOfChannels 是分区数量,其实也可以认为是我们所说的算子的并行度,因为一个分区是由一个线程负责处理的,它们两个是一一对应的。
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
   if (++nextChannelToSendTo >= numberOfChannels) {
      nextChannelToSendTo = 0;
   }
   return nextChannelToSendTo;
}

2,使用样例

(1)下面是 Scala 代码样例代码:
import org.apache.flink.streaming.api.scala._

object PartitionRescale {
  def main(args: Array[String]): Unit = {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 从自定义的集合中读取数据
    val stream = env.fromCollection(List(1,2,3,4,5,6,7,8))
    stream.print("before rescale")

    // 直接打印数据
    stream.rescale
      .print("rescale").setParallelism(2)

    // 触发流程序执行
    env.execute("rescale分区示例")
  }
}

(2)下面是 Java 代码样例代码,效果和上面是一样的。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;

public class PartitionRescaleJava {

    public static void main(String[] args) throws Exception {
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从自定义的集合中读取数据
        DataStream<Integer> stream = env.fromCollection(Arrays.asList(1,2,3,4,5,6,7,8));
        stream.print("before rescale");

        // 直接打印数据
        stream.rescale()
                .print("rescale").setParallelism(2);

        // 触发流程序执行
        env.execute("rescale分区示例");
    }
}

四、Broadcast(广播分区)

1,基本介绍

(1)broadcast 方法会将输出元素被广播到下一个操作(算子)的每个并行实例,适合用于大数据集 Join 小数据集的场景,可以提高性能。其分区示意图如下所示:

(2)broadcast 底层对应的是 BroadcastPartitioner 这个类。看这个类中的 selectChannel 函数代码的注释,提示广播分区不支持选择 Channel,因为会输出数据到下游的每个 Channel 中,就是发送到下游算子实例的每个分区中。
/**
 * Note: Broadcast mode could be handled directly for all the output channels
 * in record writer, so it is no need to select channels via this method.
 */
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
   throw new UnsupportedOperationException("Broadcast partitioner does not support select channels.");
}

2,使用样例

(1)下面是 Scala 代码样例代码:
import org.apache.flink.streaming.api.scala._

object PartitionBroadcast {
  def main(args: Array[String]): Unit = {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 从自定义的集合中读取数据
    val stream = env.fromCollection(List(1,2,3,4,5))

    // 直接打印数据
    stream.broadcast
      .print("broadcast").setParallelism(2)

    // 触发流程序执行
    env.execute("broadcast分区示例")
  }
}

(2)下面是 Java 代码样例代码,效果和上面是一样的。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;

public class PartitionBroadcastJava {

    public static void main(String[] args) throws Exception {
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从自定义的集合中读取数据
        DataStream<Integer> stream = env.fromCollection(Arrays.asList(1,2,3,4,5));

        // 直接打印数据
        stream.broadcast()
                .print("broadcast").setParallelism(2);

        // 触发流程序执行
        env.execute("broadcast分区示例");
    }
}

五、Forward(转发分区)

1,基本介绍

(1)forward 方法会将输出元素被转发到下一个操作(算子)的本地子任务,其分区示意图如下所示:

(2)在上下游的算子没有指定分区器的情况下,如果上下游的算子并行度一致,则使用 ForwardPartitioner,否则使用 RebalancePartitioner。对于 ForwardPartitioner,必须保证上下游算子并行度一致,即上有算子与下游算子是 11 的关系,否则会抛出异常。
(3)forward 方法使用 ForwardPartitioner 分区程序来设置 DataStream 的分区,仅将元素转发到本地运行的下游操作(算子)。

2,使用样例

(1)下面是 Scala 代码样例代码:
import org.apache.flink.streaming.api.scala._

object PartitionForward {
  def main(args: Array[String]): Unit = {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 从自定义的集合中读取数据
    val stream = env.fromCollection(List(1,2,3,4,5))

    // 直接打印数据
    stream.map(v=>{v * v})
      .setParallelism(2)
      .forward
      .print("forward")
      .setParallelism(2)

    // 触发流程序执行
    env.execute("forward分区示例")
  }
}

(2)下面是 Java 代码样例代码,效果和上面是一样的。
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;

public class PartitionForwardJava {

    public static void main(String[] args) throws Exception {
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从自定义的集合中读取数据
        DataStream<Integer> stream = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5));

        // 直接打印数据
        stream.map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer input) throws Exception {
                        return input * input;
                    }
                })
                .setParallelism(2)
                .forward()
                .print("forward")
                .setParallelism(2);

        // 触发流程序执行
        env.execute("forward分区示例");
    }
}

六、KeyBy(按键分区)

1,基本介绍

(1)keyBy 方法根据 key 的分组索引选择目标通道,将输出元素发送到相对应的下游分区。该方法的分区示意图如下所示:

(2)该方法使用 KeyGroupStreamPartitioner 分区程序来设置 DataStream 的分区。

2,使用样例

(1)下面是 Scala 代码样例代码:
import org.apache.flink.streaming.api.scala._

object PartitionKeyBy {

  def main(args: Array[String]): Unit = {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 从自定义的集合中读取数据
    val stream = env.fromCollection(List(1,2,3,4,5,6))

    // 先转换为(k,v)对,再执行keyBy,然后打印数据
    val stream2 = stream.map(v => {(v%3,v)})
    stream2.setParallelism(2)
      .keyBy(_._1)
      .print("key")

    // 触发流程序执行
    env.execute("keyBy分区示例")
  }
}

(2)下面是 Java 代码样例代码,效果和上面是一样的。
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;

public class PartitionKeyByJava {

    public static void main(String[] args) throws Exception {
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从自定义的集合中读取数据
        DataStream<Integer> stream = env.fromCollection(Arrays.asList(1,2,3,4,5,6));

        // 直接打印数据
        DataStream<Tuple2<Integer, Integer>> stream2 = stream.map(
                new MapFunction<Integer, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(Integer input) throws Exception {
                        return new Tuple2<>(input % 3, input);
                    }
                }
        ).setParallelism(2);
        
        stream2.keyBy(t -> t.f0)
                .print("key");

        // 触发流程序执行
        env.execute("keyBy分区示例");
    }
}

七、Global(全局分区)

1,基本介绍

(1)global 方法将所有数据发送到下游的一个并行子任务中。该方法的分区示意图如下所示:

(2)该方法使用 GlobalPartitioner 分区程序来设置 DataStream 的分区,以便将输出值都转到下一个处理操作符(算子)的第一个实例。使用此设置时要小心,因为它可能会在应用程序中造成严重的性能瓶颈。

2,使用样例

(1)下面是 Scala 代码样例代码:
import org.apache.flink.streaming.api.scala._

object PartitionGlobal {
  def main(args: Array[String]): Unit = {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 从自定义的集合中读取数据
    val stream = env.fromCollection(List(1,2,3,4,5))

    // 直接打印数据
    stream.print()

    // 使用 GLobalPartitioner 之后打印数据
    stream.global
      .print("global")

    // 触发流程序执行
    env.execute("global分区示例")
  }
}

(2)下面是 Java 代码样例代码,效果和上面是一样的。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;

public class PartitionGlobalJava {

    public static void main(String[] args) throws Exception {
        // 设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从自定义的集合中读取数据
        DataStream<Integer> stream = env.fromCollection(Arrays.asList(1,2,3,4,5));

        // 直接打印数据
        stream.print();

        // 使用 GLobalPartitioner 之后打印数据
        stream.global()
                .print("global");

        // 触发流程序执行
        env.execute("global分区示例");
    }
}

八、Custom Partitioning(自定义分区)

1,基本介绍

(1)自定义分区策略的 APICustomPartitionerWrapper。该策略允许开发者自定义规则将上游算子元素发送到下游指定的算子实例中。
(2)使用该分区策略首先我们需要新建一个自定义分区器,然后使用这个自定义分区器进行分区。

2,新建自定义分区器

(1)这里我们创建一个按照数字的奇偶性进行分区的分区器。下面是 Scala 代码样例代码:
import org.apache.flink.api.common.functions.Partitioner

class MyPartitionerScala extends Partitioner[Int]{
  override def partition(key: Int, numPartitions: Int): Int = {
    println("分区总数:"+numPartitions)
    if(key % 2 == 0){//偶数分到0号分区
      0
    }else{//奇数分到1号分区
      1
    }
  }
}

(2)下面是 Java 代码样例代码,效果和上面是一样的。
import org.apache.flink.api.common.functions.Partitioner;

public class MyPartitionerJava implements Partitioner<Integer> {
    @Override
    public int partition(Integer key, int numPartitions) {
        System.out.println("分区总数:" + numPartitions);
        if (key % 2 == 0) { // 偶数分到0号分区
            return 0;
        } else { // 奇数分到1号分区
            return 1;
        }
    }
}

3,使用自定义分区器

(1)接下来我们使用前面创建的自定义分区器。下面是 Scala 代码样例代码:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object PartitionCustom {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //注意:默认情况下Fink任务中算子的并行度会读取当前机器的CPU个数
    val text = env.fromCollection(Array(1,2,3,4,5))

    text.map(num => num)
      .partitionCustom(new MyPartitionerScala, num => num)
      .print("custom")

    env.execute("PartitionCustom");
  }
}

(2)下面是 Java 代码样例代码,效果和上面是一样的。
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;

public class PartitionCustomJava {
    public static void main(String[] args) throws Exception {
        // 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 注意:默认情况下 Flink 任务中算子的并行度会读取当前机器的 CPU 个数
        DataStream<Integer> text = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5));

        // 自定义分区
        text.map(num -> num)
                .partitionCustom(new MyPartitionerJava(), num -> num)
                .print("custom");

        // 执行任务
        env.execute("PartitionCustom");
    }
}
评论

全部评论(0)

回到顶部