Flink - Window窗口使用详解1(基本价绍、TimeWindow、CountWindow)
作者:hangge | 2025-03-04 08:42
Flink 认为批处理是流处理的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而 Window 就是从流处理到批处理的一个桥梁。通常来讲,Window 是一种可以把无界数据切割为有界数据块的手段。


(2)非按键分区(Non-Keyed Windows)
(2)举个例子:
(2)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
(3)然后运行 Flink 程序。待程序启动后,我们在该终端中输入 hello hangge.com:
(4)由于使用的是滚动窗口,idea 控制台可以看到输出内容如下:
(2)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
(3)然后运行 Flink 程序。待程序启动后,我们在该终端中输入如下内容:
(4)由于使用的是滑动窗口,idea 控制台可以看到输出内容如下:
例如:对流中的所有元素进行计数是不可能的,因为通常流是无限的(无界的)。所以,流上的聚合需要由 Window 来划定范围,比如 “计算过去 5 分钟” 或者 “最后 100 个元素的和”。
一、基本介绍
1,窗口分类
(1)按照驱动类型划分类
- 时间类型窗口(TimeWindow):以时间点来定义窗口的开始(start time)和结束(end time),截取出来的就是某一时间段的数据。
- 计数窗口(CountWindow):基于元素的个数来截取数据,到达固定数据就触发计算并关闭窗口
(2)按照窗口分配数据的规则分类
- Tumbling Windows:滚动窗口,表示窗口内的数据没有重叠

- Sliding Windows:滑动窗口,表示窗口内的数据有重叠

- Session Window:会话窗口,基于"会话"对数据分组。只能基于时间来定义。最重要的参数就是会话的超时时间,会话窗口的长度不固定,起始时间和结束时间也不确定。
- Global Window:全局窗口,会把相同 key 的所有数据都分配到一个窗口中。这种窗口没有结束的时候,默认是不会触发计算的。需要自己定义触发器。
2,按键分区和非按键分区
(1)按键分区窗口(Keyed Windows )
- 经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。基于 KeyedStream 进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。
- 在代码实现上,我们需要先对 DataStream 调用 .keyBy() 进行按键分区,然后再调用 .window() 定义窗口。
stream.keyBy(...)
.window(...)
(2)非按键分区(Non-Keyed Windows)
- 如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。
- 在代码中,直接基于 DataStream 调用 .windowAll() 定义窗口。
stream.windowAll(...)
二、TimeWindow 的使用
1,基本介绍
(1)TimeWindow 是根据时间对数据流切分窗口,TimeWindow 可以支持滚动窗口和滑动窗口。
- 其中 timeWindow(Time.seconds(10)) 方法表示滚动窗口的窗口大小为 10 秒,对每 10 秒内的数据进行聚合计算
- timeWindow(Time.seconds(10),Time.seconds(5)) 方法表示滑动窗口的窗口大小为 10 秒,滑动间隔为 5 秒。就是每隔 5 秒计算前 10 秒内的数据
- 5 秒滚动窗口会生成连续的 5 秒区间:[0, 5)、[5, 10)、[10, 15)。
- 5 秒窗口,每 2 秒滑动一次,会生成以下窗口:[0, 5)、[2, 7)、[4, 9)。
2,滚动窗口使用样例
(1)该样例我们每隔 10 秒计算一次前 10 秒时间窗口内的数据,下面是 scala 语言代码:
package com.imooc.scala.window
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object TimeWindowOpScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("192.168.121.128", 9999)
import org.apache.flink.api.scala._
//TimeWindow之滚动窗口:每隔10秒计算一次前10秒时间窗口内的数据
text.flatMap(_.split(" "))
.map((_,1))
.keyBy(_._1)
//窗口大小
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.sum(1)
.print()
env.execute("TimeWindowOpScala")
}
}
- 下面是使用 Java 语言实现同样的功能:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class TimeWindowOpJava {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从socket读取数
DataStreamSource<String> text = env.socketTextStream("192.168.121.128", 9999);
// 处理流数据
text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
throws Exception {
String[] words = line.split(" ");
for (String word: words) {
out.collect(new Tuple2<String, Integer>(word,1));
}
}
})
.keyBy(value -> value.f0)
// 定义滚动窗口,窗口大小为10秒
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.sum(1)
.print();
// 执行作业
env.execute("TimeWindowOpJava");
}
}
nc -lk 9999
(3)然后运行 Flink 程序。待程序启动后,我们在该终端中输入 hello hangge.com:
(4)由于使用的是滚动窗口,idea 控制台可以看到输出内容如下:

3,滑动窗口使用样例
(1)该样例我们每隔 5 秒计算一次前 10 秒时间窗口内的数据,下面是 scala 语言代码:
(2)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
(3)然后运行 Flink 程序。待程序启动后,我们在该终端中输入 hello hangge.com:
(4)由于使用的是滑动窗口,idea 控制台可以看到输出内容如下:
(2)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
(3)然后运行 Flink 程序。待程序启动后,我们在该终端中输入如下内容:
(4)由于使用的是滑动窗口,idea 控制台可以看到输出内容如下:
package com.imooc.scala.window
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object TimeWindowOpScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("192.168.121.128", 9999)
import org.apache.flink.api.scala._
//TimeWindow之滑动窗口:每隔5秒计算一次前10秒时间窗口内的数据
text.flatMap(_.split(" "))
.map((_, 1))
.keyBy(_._1)
//第一个参数:窗口大小,第二个参数:滑动间隔
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds( 5)))
.sum(1)
.print()
env.execute("TimeWindowOpScala")
}
}
- 下面是使用 Java 语言实现同样的功能:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class TimeWindowOpJava {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从socket读取数
DataStreamSource<String> text = env.socketTextStream("192.168.121.128", 9999);
// 处理流数据
text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
throws Exception {
String[] words = line.split(" ");
for (String word: words) {
out.collect(new Tuple2<String, Integer>(word,1));
}
}
})
.keyBy(value -> value.f0)
//第一个参数:窗口大小,第二个参数:滑动间隔
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.sum(1)
.print();
// 执行作业
env.execute("TimeWindowOpJava");
}
}
nc -lk 9999
(3)然后运行 Flink 程序。待程序启动后,我们在该终端中输入 hello hangge.com:
(4)由于使用的是滑动窗口,idea 控制台可以看到输出内容如下:

三、CountWindow 的使用
1,基本介绍
CountWindow 是根据元素个数对数据流切分窗口,CountWindow 也可以支持滚动窗口和滑动窗口。
- 其中 countWindow(5)方法表示滚动窗口的窗口大小是 5 个元素,也就是当窗口中填满 5 个元素的时候,就会对窗口进行计算了
- countWindow(5,1)方法表示滑动窗口的窗口大小是 5 个元素,滑动的间隔为 1 个元素,也就是说每新增 1 个元素就会对前面 5 个元素计算一次
2,滚动窗口使用样例
(1)该样例我们每隔 5 个元素计算一次前 5 个元素,下面是 scala 语言代码:注意:由于我们在这里使用了 keyBy,会先对数据分组。如果某个分组对应的数据窗口内达到了 5 个元素,这个窗口才会被触发执行。
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object CountWindowOpScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("192.168.121.128", 9999)
import org.apache.flink.api.scala._
//CountWindow之滚动窗口:每隔5个元素计算一次前5个元素
text.flatMap(_.split(" "))
.map((_,1))
.keyBy(_._1)
//指定窗口大小
.countWindow(5)
.sum(1)
.print()
env.execute("CountWindowOpScala")
}
}
- 下面是使用 Java 语言实现同样的功能:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class CountWindowOpJava {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("192.168.121.128", 9999);
//CountWindow之滚动窗口:每隔5个元素计算一次前5个元素
text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
throws Exception {
String[] words = line.split(" ");
for (String word: words) {
out.collect(new Tuple2<String, Integer>(word,1));
}
}
}).keyBy(value -> value.f0)
//窗口大小
.countWindow(5)
.sum(1)
.print();
env.execute("CountWindowOpJava");
}
}
nc -lk 9999
(3)然后运行 Flink 程序。待程序启动后,我们在该终端中输入如下内容:
hi hangge hi man hi hi hi hangge hangge hangge hangge hi hangge
(4)由于使用的是滑动窗口,idea 控制台可以看到输出内容如下:

3,滑动窗口使用样例
(1)该样例我们每隔 1 个元素计算一次前 5 个元素,下面是 scala 语言代码:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object CountWindowOpScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("192.168.121.128", 9999)
import org.apache.flink.api.scala._
//CountWindow之滑动窗口:每隔1个元素计算一次前5个元素
text.flatMap(_.split(" "))
.map((_, 1))
.keyBy(_._1)
//第一个参数:窗口大小,第二个参数:滑动间隔
.countWindow(5, 1)
.sum(1).print()
env.execute("CountWindowOpScala")
}
}
- 下面是使用 Java 语言实现同样的功能:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* CountWindow的使用
* 1:滚动窗口
* 2:滑动窗口
* Created by xuwei
*/
public class CountWindowOpJava {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> text = env.socketTextStream("192.168.121.128", 9999);
//CountWindow之滑动窗口:每隔1个元素计算一次前5个元素
text.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
throws Exception {
String[] words = line.split(" ");
for (String word: words) {
out.collect(new Tuple2<String, Integer>(word,1));
}
}
}).keyBy(value -> value.f0)
//第一个参数:窗口大小,第二个参数:滑动间隔
.countWindow(5, 1)
.sum(1)
.print();
env.execute("CountWindowOpJava");
}
}
nc -lk 9999
(3)然后运行 Flink 程序。待程序启动后,我们在该终端中输入如下内容:
hi hangge hi man hi hi hi hangge hangge hangge hangge hi hangge
(4)由于使用的是滑动窗口,idea 控制台可以看到输出内容如下:

全部评论(0)