返回 导航

大数据

hangge.com

Flink - Window窗口使用详解1(基本价绍、TimeWindow、CountWindow)

作者:hangge | 2025-03-04 08:42
    Flink 认为批处理是流处理的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而 Window 就是从流处理到批处理的一个桥梁。通常来讲,Window 是一种可以把无界数据切割为有界数据块的手段。
例如:对流中的所有元素进行计数是不可能的,因为通常流是无限的(无界的)。所以,流上的聚合需要由 Window 来划定范围,比如 “计算过去 5 分钟” 或者 “最后 100 个元素的和”。

一、基本介绍

1,窗口分类

(1)按照驱动类型划分类
  • 时间类型窗口TimeWindow):以时间点来定义窗口的开始(start time)和结束(end time),截取出来的就是某一时间段的数据。
  • 计数窗口CountWindow):基于元素的个数来截取数据,到达固定数据就触发计算并关闭窗口

(2)按照窗口分配数据的规则分类
  • Tumbling Windows:滚动窗口,表示窗口内的数据没有重叠

  • Sliding Windows:滑动窗口,表示窗口内的数据有重叠

  • Session Window:会话窗口,基于"会话"对数据分组。只能基于时间来定义。最重要的参数就是会话的超时时间,会话窗口的长度不固定,起始时间和结束时间也不确定。
  • Global Window:全局窗口,会把相同 key 的所有数据都分配到一个窗口中。这种窗口没有结束的时候,默认是不会触发计算的。需要自己定义触发器。

2,按键分区和非按键分区

(1)按键分区窗口(Keyed Windows
  • 经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。基于 KeyedStream 进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。
  • 在代码实现上,我们需要先对 DataStream 调用 .keyBy() 进行按键分区,然后再调用 .window() 定义窗口。
stream.keyBy(...)
       .window(...)

(2)非按键分区(Non-Keyed Windows)
  • 如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1
  • 在代码中,直接基于 DataStream 调用 .windowAll() 定义窗口。
stream.windowAll(...)

二、TimeWindow 的使用

1,基本介绍

(1)TimeWindow 是根据时间对数据流切分窗口,TimeWindow 可以支持滚动窗口和滑动窗口。
  • 其中 timeWindow(Time.seconds(10)) 方法表示滚动窗口的窗口大小为 10 秒,对每 10 秒内的数据进行聚合计算
  • timeWindow(Time.seconds(10),Time.seconds(5)) 方法表示滑动窗口的窗口大小为 10 秒,滑动间隔为 5 秒。就是每隔 5 秒计算前 10 秒内的数据
(2)举个例子:
  • 5 秒滚动窗口会生成连续的 5 秒区间:[0, 5)[5, 10)、[10, 15)
  • 5 秒窗口,每 2 秒滑动一次,会生成以下窗口:[0, 5)[2, 7)[4, 9)

2,滚动窗口使用样例

(1)该样例我们每隔 10 秒计算一次前 10 秒时间窗口内的数据,下面是 scala 语言代码:
package com.imooc.scala.window

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time


object TimeWindowOpScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("192.168.121.128", 9999)

    import org.apache.flink.api.scala._

    //TimeWindow之滚动窗口:每隔10秒计算一次前10秒时间窗口内的数据
    text.flatMap(_.split(" "))
      .map((_,1))
      .keyBy(_._1)
      //窗口大小
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .sum(1)
      .print()

    env.execute("TimeWindowOpScala")
  }
}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class TimeWindowOpJava {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 从socket读取数
        DataStreamSource<String> text = env.socketTextStream("192.168.121.128", 9999);

        // 处理流数据
        text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
                    throws Exception {
                String[] words = line.split(" ");
                for (String word: words) {
                    out.collect(new Tuple2<String, Integer>(word,1));
                }
            }
        })
        .keyBy(value -> value.f0)
        // 定义滚动窗口,窗口大小为10秒
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .sum(1)
        .print();

        // 执行作业
        env.execute("TimeWindowOpJava");
    }
}

(2)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(3)然后运行 Flink 程序。待程序启动后,我们在该终端中输入 hello hangge.com
 
(4)由于使用的是滚动窗口,idea 控制台可以看到输出内容如下:

3,滑动窗口使用样例 

(1)该样例我们每隔 5 秒计算一次前 10 秒时间窗口内的数据,下面是 scala 语言代码:
package com.imooc.scala.window

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time


object TimeWindowOpScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("192.168.121.128", 9999)

    import org.apache.flink.api.scala._

    //TimeWindow之滑动窗口:每隔5秒计算一次前10秒时间窗口内的数据
    text.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      //第一个参数:窗口大小,第二个参数:滑动间隔
      .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds( 5)))
      .sum(1)
      .print()

    env.execute("TimeWindowOpScala")
  }
}
  • 下面是使用 Java 语言实现同样的功能:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class TimeWindowOpJava {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 从socket读取数
        DataStreamSource<String> text = env.socketTextStream("192.168.121.128", 9999);

        // 处理流数据
        text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
                    throws Exception {
                String[] words = line.split(" ");
                for (String word: words) {
                    out.collect(new Tuple2<String, Integer>(word,1));
                }
            }
        })
        .keyBy(value -> value.f0)
        //第一个参数:窗口大小,第二个参数:滑动间隔
        .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .sum(1)
        .print();

        // 执行作业
        env.execute("TimeWindowOpJava");
    }
}

(2)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(3)然后运行 Flink 程序。待程序启动后,我们在该终端中输入 hello hangge.com
 
(4)由于使用的是滑动窗口,idea 控制台可以看到输出内容如下:

三、CountWindow 的使用

1,基本介绍

CountWindow 是根据元素个数对数据流切分窗口,CountWindow 也可以支持滚动窗口和滑动窗口。
  • 其中 countWindow(5)方法表示滚动窗口的窗口大小是 5 个元素,也就是当窗口中填满 5 个元素的时候,就会对窗口进行计算了
  • countWindow(5,1)方法表示滑动窗口的窗口大小是 5 个元素,滑动的间隔为 1 个元素,也就是说每新增 1 个元素就会对前面 5 个元素计算一次

2,滚动窗口使用样例

(1)该样例我们每隔 5 个元素计算一次前 5 个元素,下面是 scala 语言代码:
注意:由于我们在这里使用了 keyBy,会先对数据分组。如果某个分组对应的数据窗口内达到了 5 个元素,这个窗口才会被触发执行。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object CountWindowOpScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("192.168.121.128", 9999)

    import org.apache.flink.api.scala._
    
    //CountWindow之滚动窗口:每隔5个元素计算一次前5个元素
    text.flatMap(_.split(" "))
      .map((_,1))
      .keyBy(_._1)
      //指定窗口大小
      .countWindow(5)
      .sum(1)
      .print()

    env.execute("CountWindowOpScala")
  }
}
  • 下面是使用 Java 语言实现同样的功能:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class CountWindowOpJava {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("192.168.121.128", 9999);

        //CountWindow之滚动窗口:每隔5个元素计算一次前5个元素
        text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
                            throws Exception {
                        String[] words = line.split(" ");
                        for (String word: words) {
                            out.collect(new Tuple2<String, Integer>(word,1));
                        }
                    }
                }).keyBy(value -> value.f0)
                //窗口大小
                .countWindow(5)
                .sum(1)
                .print();

        env.execute("CountWindowOpJava");
    }
}

(2)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(3)然后运行 Flink 程序。待程序启动后,我们在该终端中输入如下内容:
hi hangge
hi man
hi hi hi 
hangge hangge hangge hangge
hi
hangge

(4)由于使用的是滑动窗口,idea 控制台可以看到输出内容如下:

3,滑动窗口使用样例 

(1)该样例我们每隔 1 个元素计算一次前 5 个元素,下面是 scala 语言代码:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object CountWindowOpScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("192.168.121.128", 9999)

    import org.apache.flink.api.scala._

    //CountWindow之滑动窗口:每隔1个元素计算一次前5个元素
    text.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      //第一个参数:窗口大小,第二个参数:滑动间隔
      .countWindow(5, 1)
      .sum(1).print()

    env.execute("CountWindowOpScala")
  }
}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * CountWindow的使用
 * 1:滚动窗口
 * 2:滑动窗口
 * Created by xuwei
 */
public class CountWindowOpJava {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("192.168.121.128", 9999);

        //CountWindow之滑动窗口:每隔1个元素计算一次前5个元素
        text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
                            throws Exception {
                        String[] words = line.split(" ");
                        for (String word: words) {
                            out.collect(new Tuple2<String, Integer>(word,1));
                        }
                    }
                }).keyBy(value -> value.f0)
                //第一个参数:窗口大小,第二个参数:滑动间隔
                .countWindow(5, 1)
                .sum(1)
                .print();

        env.execute("CountWindowOpJava");
    }
}

(2)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(3)然后运行 Flink 程序。待程序启动后,我们在该终端中输入如下内容:
hi hangge
hi man
hi hi hi 
hangge hangge hangge hangge
hi
hangge

(4)由于使用的是滑动窗口,idea 控制台可以看到输出内容如下:
评论

全部评论(0)

回到顶部