Flink - 流处理、批处理快速入门教程(含Java、Scala样例)
作者:hangge | 2025-02-14 08:44
Flink 代码原生支持直接在 IDEA 中运行,便于本地调试。同时,Flink 支持 Java 语言和 Scala 语言,本文使用这两种语言来演示 Flink 的使用。
一、准备工作
1,创建项目
如果想要使用 Scala 语言编写程序的话,开发工具要安装 Scala 插件并进行相关配置以实现对 Scala 的支持,具体可以参考我之前写的文章:
注意:Scala 版本要与我们下面的依赖匹配,比如我们这里就要使用 2.12 版本的 Scala。否则运行 flink 程序时会报错。
2,添加依赖
我们创建一个 Maven 项目,然后在 pom.xml 文件中,添加 Flink 实时计算相关的依赖。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>1.16.0</version> </dependency>
二、流处理样例
1,需求说明
- 通过 socket 实时产生一些单词
- 使用 flink 实时接收数据
- 对指定时间窗口内(例如:2 秒)的数据进行 WordCount 聚合统计
- 并且把时间窗口内计算的结果打印出来。
2,样例代码
(1)下面是使用 Scala 语言的代码样例:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows import org.apache.flink.streaming.api.windowing.time.Time object SocketWindowWordCountScala { def main(args: Array[String]): Unit = { //获取运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //连接socket获取输入数据 val text = env.socketTextStream("192.168.121.128", 9999) //处理数据 //注意:必须要添加这一行隐式转换的代码,否则下面的flatMap方法会报错 import org.apache.flink.api.scala._ val wordCount = text.flatMap(_.split(" "))//将每一行数据根据空格切分单词 .map((_,1))//每一个单词转换为tuple2的形式(单词,1) .keyBy(tup=>tup._1)//官方推荐使用keyselector选择器选择数据 .window(TumblingProcessingTimeWindows.of(Time.seconds(2))) // 使用窗口方法 .sum(1)// 使用sum或者reduce都可以 //使用一个线程执行打印操作 wordCount.print().setParallelism(1) //执行程序 env.execute("SocketWindowWordCountScala") } }
(2)下面是 Java 版的样例代码,看起来确实比 Scala 复杂许多:
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.util.Collector; public class SocketWindowWordCountJava { public static void main(String[] args) throws Exception{ //获取运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //连接socket获取输入数据 DataStreamSource<String> text = env.socketTextStream("192.168.121.128", 9999); //处理数据 SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = text .flatMap(new FlatMapFunction<String, String>() { public void flatMap(String line, Collector<String> out) throws Exception { String[] words = line.split(" "); for (String word : words) { out.collect(word); } } }).map(new MapFunction<String, Tuple2<String, Integer>>() { public Tuple2<String, Integer> map(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() { public String getKey(Tuple2<String, Integer> tup) throws Exception { return tup.f0; } }) .window(TumblingProcessingTimeWindows.of(Time.seconds(2))) .sum(1); //使用一个线程执行打印操作 wordCount.print().setParallelism(1); //执行程序 env.execute("SocketWindowWordCountJava"); } }
3,运行测试
(1)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(2)然后运行 Flink 程序。待程序启动后,我们在该终端中输入一些文本数据:

(3)idea 控制台可以看到 Flink 程序将会实时处理输入的文本数据并输出结果:

三、批处理样例
1,需求说明
统计 HDFS 上指定文件中单词出现的总次数
2,准备工作
由于是要读取 HDFS 上的文件,需要将 hadoop-client 的依赖添加到项目中,否则会提示不支持 hdfs 这种文件系统。
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.2.0</version> </dependency>
3,样例代码
(1)下面是 Scala 语言的样例代码:
import org.apache.flink.api.scala.ExecutionEnvironment object BatchWordCountScala { def main(args: Array[String]): Unit = { //获取执行环境 val env = ExecutionEnvironment.getExecutionEnvironment val inputPath = "hdfs://192.168.121.128:9000/hello.txt" val outPath = "hdfs://192.168.121.128:9000/out" //读取文件中的数据 val text = env.readTextFile(inputPath) //处理数据 import org.apache.flink.api.scala._ val wordCount = text.flatMap(_.split(" ")) .map((_, 1)) .groupBy(0) .sum(1) .setParallelism(1) //将结果数据保存到文件中 wordCount.writeAsCsv(outPath,"\n"," ") //执行程序 env.execute("BatchWordCountScala") } }
(2)下面是 Java 语言的样例代码,功能是一样的,就是代码会更复杂些。
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; public class BatchWordCountJava { public static void main(String[] args) throws Exception{ //获取执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); String inputPath = "hdfs://192.168.121.128:9000/hello.txt"; String outPath = "hdfs://192.168.121.128:9000/out"; //读取文件中的数据 DataSource<String> text = env.readTextFile(inputPath); //处理数据 DataSet<Tuple2<String, Integer>> wordCount = text .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception { String[] words = line.split(" "); for (String word : words) { out.collect(new Tuple2<String, Integer>(word, 1)); } } }).groupBy(0) .sum(1) .setParallelism(1); //将结果数据保存到文件中 wordCount.writeAsCsv(outPath,"\n"," "); //执行程序 env.execute("BatchWordCountJava"); } }
4,运行测试
(1)首先我们在 HDFS 上准备一个测试文件 hello.txt,内容如下:
hello hangge.com welcome to hangge.com hello world bye world
(2)执行 Flink 程序,成功之后到 hdfs 上查看结果,可以看到统计结果已成功生成。
hdfs dfs -cat /out

全部评论(0)