返回 导航

大数据

hangge.com

Flink - 流处理、批处理快速入门教程(含Java、Scala样例)

作者:hangge | 2025-02-14 08:44
    Flink 代码原生支持直接在 IDEA 中运行,便于本地调试。同时,Flink 支持 Java 语言和 Scala 语言,本文使用这两种语言来演示 Flink 的使用。

一、准备工作

1,创建项目

    如果想要使用 Scala 语言编写程序的话,开发工具要安装 Scala 插件并进行相关配置以实现对 Scala 的支持,具体可以参考我之前写的文章:
注意Scala 版本要与我们下面的依赖匹配,比如我们这里就要使用 2.12 版本的 Scala。否则运行 flink 程序时会报错。

2,添加依赖

我们创建一个 Maven 项目,然后在 pom.xml 文件中,添加 Flink 实时计算相关的依赖。
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.16.0</version>
</dependency>

二、流处理样例

1,需求说明 

  • 通过 socket 实时产生一些单词
  • 使用 flink 实时接收数据
  • 对指定时间窗口内(例如:2 秒)的数据进行 WordCount 聚合统计
  • 并且把时间窗口内计算的结果打印出来。

2,样例代码

(1)下面是使用 Scala 语言的代码样例:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCountScala {
  def main(args: Array[String]): Unit = {
    //获取运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //连接socket获取输入数据
    val text = env.socketTextStream("192.168.121.128", 9999)

    //处理数据
    //注意:必须要添加这一行隐式转换的代码,否则下面的flatMap方法会报错
    import org.apache.flink.api.scala._
    val wordCount = text.flatMap(_.split(" "))//将每一行数据根据空格切分单词
      .map((_,1))//每一个单词转换为tuple2的形式(单词,1)
      .keyBy(tup=>tup._1)//官方推荐使用keyselector选择器选择数据
      .window(TumblingProcessingTimeWindows.of(Time.seconds(2))) // 使用窗口方法
      .sum(1)// 使用sum或者reduce都可以

    //使用一个线程执行打印操作
    wordCount.print().setParallelism(1)

    //执行程序
    env.execute("SocketWindowWordCountScala")
  }
}

(2)下面是 Java 版的样例代码,看起来确实比 Scala 复杂许多:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.util.Collector;

public class SocketWindowWordCountJava {
    public static void main(String[] args) throws Exception{
        //获取运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //连接socket获取输入数据
        DataStreamSource<String> text = env.socketTextStream("192.168.121.128", 9999);

        //处理数据
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = text
                .flatMap(new FlatMapFunction<String, String>() {
                    public void flatMap(String line, Collector<String> out) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(word);
                        }
                    }
                }).map(new MapFunction<String, Tuple2<String, Integer>>() {
                    public Tuple2<String, Integer> map(String word) throws Exception {
                        return new Tuple2<String, Integer>(word, 1);
                    }
                }).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    public String getKey(Tuple2<String, Integer> tup) throws Exception {
                        return tup.f0;
                    }
                })
                .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
                .sum(1);

        //使用一个线程执行打印操作
        wordCount.print().setParallelism(1);

        //执行程序
        env.execute("SocketWindowWordCountJava");
    }
}

3,运行测试

(1)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(2)然后运行 Flink 程序。待程序启动后,我们在该终端中输入一些文本数据:

(3)idea 控制台可以看到 Flink 程序将会实时处理输入的文本数据并输出结果:

三、批处理样例

1,需求说明

统计 HDFS 上指定文件中单词出现的总次数

2,准备工作

由于是要读取 HDFS 上的文件,需要将 hadoop-client 的依赖添加到项目中,否则会提示不支持 hdfs 这种文件系统。
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.2.0</version>
</dependency>

3,样例代码

(1)下面是 Scala 语言的样例代码:
import org.apache.flink.api.scala.ExecutionEnvironment

object BatchWordCountScala {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    val inputPath = "hdfs://192.168.121.128:9000/hello.txt"
    val outPath = "hdfs://192.168.121.128:9000/out"

    //读取文件中的数据
    val text = env.readTextFile(inputPath)

    //处理数据
    import org.apache.flink.api.scala._
    val wordCount = text.flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(0)
      .sum(1)
      .setParallelism(1)

    //将结果数据保存到文件中
    wordCount.writeAsCsv(outPath,"\n"," ")
    //执行程序
    env.execute("BatchWordCountScala")
  }
}

(2)下面是 Java 语言的样例代码,功能是一样的,就是代码会更复杂些。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCountJava {
    public static void main(String[] args) throws Exception{
        //获取执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        String inputPath = "hdfs://192.168.121.128:9000/hello.txt";
        String outPath = "hdfs://192.168.121.128:9000/out";

        //读取文件中的数据
        DataSource<String> text = env.readTextFile(inputPath);

        //处理数据
        DataSet<Tuple2<String, Integer>> wordCount = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out)
                            throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(new Tuple2<String, Integer>(word, 1));
                        }
                    }
                }).groupBy(0)
                .sum(1)
                .setParallelism(1);

        //将结果数据保存到文件中
        wordCount.writeAsCsv(outPath,"\n"," ");
        //执行程序
        env.execute("BatchWordCountJava");
    }
}

4,运行测试

(1)首先我们在 HDFS 上准备一个测试文件 hello.txt,内容如下:
hello hangge.com
welcome to hangge.com
hello world
bye world

(2)执行 Flink 程序,成功之后到 hdfs 上查看结果,可以看到统计结果已成功生成。
hdfs dfs -cat /out
评论

全部评论(0)

回到顶部