Flink - 流处理、批处理快速入门教程(含Java、Scala样例)
作者:hangge | 2025-02-14 08:44
Flink 代码原生支持直接在 IDEA 中运行,便于本地调试。同时,Flink 支持 Java 语言和 Scala 语言,本文使用这两种语言来演示 Flink 的使用。
一、准备工作
1,创建项目
如果想要使用 Scala 语言编写程序的话,开发工具要安装 Scala 插件并进行相关配置以实现对 Scala 的支持,具体可以参考我之前写的文章:
注意:Scala 版本要与我们下面的依赖匹配,比如我们这里就要使用 2.12 版本的 Scala。否则运行 flink 程序时会报错。
2,添加依赖
我们创建一个 Maven 项目,然后在 pom.xml 文件中,添加 Flink 实时计算相关的依赖。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.16.0</version>
</dependency>
二、流处理样例
1,需求说明
- 通过 socket 实时产生一些单词
- 使用 flink 实时接收数据
- 对指定时间窗口内(例如:2 秒)的数据进行 WordCount 聚合统计
- 并且把时间窗口内计算的结果打印出来。
2,样例代码
(1)下面是使用 Scala 语言的代码样例:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object SocketWindowWordCountScala {
def main(args: Array[String]): Unit = {
//获取运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//连接socket获取输入数据
val text = env.socketTextStream("192.168.121.128", 9999)
//处理数据
//注意:必须要添加这一行隐式转换的代码,否则下面的flatMap方法会报错
import org.apache.flink.api.scala._
val wordCount = text.flatMap(_.split(" "))//将每一行数据根据空格切分单词
.map((_,1))//每一个单词转换为tuple2的形式(单词,1)
.keyBy(tup=>tup._1)//官方推荐使用keyselector选择器选择数据
.window(TumblingProcessingTimeWindows.of(Time.seconds(2))) // 使用窗口方法
.sum(1)// 使用sum或者reduce都可以
//使用一个线程执行打印操作
wordCount.print().setParallelism(1)
//执行程序
env.execute("SocketWindowWordCountScala")
}
}
(2)下面是 Java 版的样例代码,看起来确实比 Scala 复杂许多:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.util.Collector;
public class SocketWindowWordCountJava {
public static void main(String[] args) throws Exception{
//获取运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//连接socket获取输入数据
DataStreamSource<String> text = env.socketTextStream("192.168.121.128", 9999);
//处理数据
SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = text
.flatMap(new FlatMapFunction<String, String>() {
public void flatMap(String line, Collector<String> out) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
out.collect(word);
}
}
}).map(new MapFunction<String, Tuple2<String, Integer>>() {
public Tuple2<String, Integer> map(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
public String getKey(Tuple2<String, Integer> tup) throws Exception {
return tup.f0;
}
})
.window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
.sum(1);
//使用一个线程执行打印操作
wordCount.print().setParallelism(1);
//执行程序
env.execute("SocketWindowWordCountJava");
}
}
3,运行测试
(1)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(2)然后运行 Flink 程序。待程序启动后,我们在该终端中输入一些文本数据:

(3)idea 控制台可以看到 Flink 程序将会实时处理输入的文本数据并输出结果:

三、批处理样例
1,需求说明
统计 HDFS 上指定文件中单词出现的总次数
2,准备工作
由于是要读取 HDFS 上的文件,需要将 hadoop-client 的依赖添加到项目中,否则会提示不支持 hdfs 这种文件系统。
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.0</version>
</dependency>
3,样例代码
(1)下面是 Scala 语言的样例代码:
import org.apache.flink.api.scala.ExecutionEnvironment
object BatchWordCountScala {
def main(args: Array[String]): Unit = {
//获取执行环境
val env = ExecutionEnvironment.getExecutionEnvironment
val inputPath = "hdfs://192.168.121.128:9000/hello.txt"
val outPath = "hdfs://192.168.121.128:9000/out"
//读取文件中的数据
val text = env.readTextFile(inputPath)
//处理数据
import org.apache.flink.api.scala._
val wordCount = text.flatMap(_.split(" "))
.map((_, 1))
.groupBy(0)
.sum(1)
.setParallelism(1)
//将结果数据保存到文件中
wordCount.writeAsCsv(outPath,"\n"," ")
//执行程序
env.execute("BatchWordCountScala")
}
}
(2)下面是 Java 语言的样例代码,功能是一样的,就是代码会更复杂些。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class BatchWordCountJava {
public static void main(String[] args) throws Exception{
//获取执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
String inputPath = "hdfs://192.168.121.128:9000/hello.txt";
String outPath = "hdfs://192.168.121.128:9000/out";
//读取文件中的数据
DataSource<String> text = env.readTextFile(inputPath);
//处理数据
DataSet<Tuple2<String, Integer>> wordCount = text
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
throws Exception {
String[] words = line.split(" ");
for (String word : words) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}).groupBy(0)
.sum(1)
.setParallelism(1);
//将结果数据保存到文件中
wordCount.writeAsCsv(outPath,"\n"," ");
//执行程序
env.execute("BatchWordCountJava");
}
}
4,运行测试
(1)首先我们在 HDFS 上准备一个测试文件 hello.txt,内容如下:
hello hangge.com welcome to hangge.com hello world bye world
(2)执行 Flink 程序,成功之后到 hdfs 上查看结果,可以看到统计结果已成功生成。
hdfs dfs -cat /out
全部评论(0)