返回 导航

大数据

hangge.com

Flink SQL - DDL语句使用详解1(读取hdfs文件创建表、计算输出至控制台)

作者:hangge | 2025-04-16 09:03
    DDL 语句主要涉及到创建表、修改表和删除表、但是针对 Flink SQL 而言,在工作中主要涉及的操作就是创建表。下面我将通过案例来演示一下如何使用 Flink SQL 语句创建表。

一、FileSystem(Source) + Print(Sink)案例

1,准备工作

首先我们需要在项目的 pom.xml 中添加 Flink SQL 相关的依赖,以及 file-connectorflink-jsonhadoop 的依赖。
提示:因为这个案例中需要使用 Flink SQL 操作文件,需要用到 file-connectorhadoop 的依赖,并且文件中的数据格式是 json 格式的,还需要用到 flink-json 的依赖。
<!-- Flink clients -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.16.0</version>
</dependency>
<!-- Flink SQL -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
    <version>1.16.0</version>
</dependency>
<!-- filesystem-connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>1.16.0</version>
</dependency>
<!-- flink-json -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.16.0</version>
</dependency>
<!-- hadoop依赖 -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.2.0</version>
</dependency>

2,样例代码

(1)下面是 Scala 语言代码,具体逻辑为:
  • 创建表 file_source,使用 filesystem 连接器,表示数据来源是文件系统。
  • 创建表 print_sink,使用 print 连接器,表述输出数据到标准输出流(控制台)。
  • file_source 表读取数据,按 age 字段分组。统计每个 age 的记录数量,结果列为 cnt。查询结果插入到 print_sink 表,最终打印到控制台。
注意:在 FlinkSQL 中创建表的时候,需要注意,表名区分大小写。
import org.apache.flink.table.api._

object FileSystemPrintSQL {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val settings = EnvironmentSettings
      .newInstance()
      //指定执行模式,支持inBatchMode和inStreamingMode
      .inBatchMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    //创建输入表
    val inTableSql =
      """
        |CREATE TABLE file_source(
        |  name STRING,
        |  age INT
        |)WITH(
        |  'connector' = 'filesystem',
        |  'path' = 'hdfs://192.168.121.128:9000/student.json',
        |  'format' = 'json',
        |  'json.fail-on-missing-field' = 'false',
        |  'json.ignore-parse-errors' = 'true'
        |)
        |""".stripMargin
    tEnv.executeSql(inTableSql)

    //创建输出表
    val outTableSql =
      """
        |CREATE TABLE print_sink(
        |  age INT,
        |  cnt BIGINT
        |)WITH(
        |  'connector' = 'print'
        |)
        |""".stripMargin
    tEnv.executeSql(outTableSql)

    //业务逻辑
    val execSql =
      """
        |INSERT INTO print_sink
        |SELECT
        |  age,
        |  COUNT(*) AS cnt
        |FROM file_source
        |GROUP BY age
        |""".stripMargin
    tEnv.executeSql(execSql)
  }
}

(2)下面是实现同样功能的 Java 语言代码:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FileSystemPrintSQLJava {
    public static void main(String[] args) {
        // 创建执行环境
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                // 指定执行模式为批处理模式
                .inBatchMode()
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // 创建输入表
        String inTableSql =
                "CREATE TABLE file_source( " +
                        "  name STRING, " +
                        "  age INT " +
                        ")WITH( " +
                        "  'connector' = 'filesystem', " +
                        "  'path' = 'hdfs://192.168.121.128:9000/student.json', " +
                        "  'format' = 'json', " +
                        "  'json.fail-on-missing-field' = 'false', " +
                        "  'json.ignore-parse-errors' = 'true' " +
                        ")";
        tEnv.executeSql(inTableSql);

        // 创建输出表
        String outTableSql =
                "CREATE TABLE print_sink( " +
                        "  age INT, " +
                        "  cnt BIGINT " +
                        ")WITH( " +
                        "  'connector' = 'print' " +
                        ")";
        tEnv.executeSql(outTableSql);

        // 业务逻辑
        String execSql =
                "INSERT INTO print_sink " +
                        "SELECT " +
                        "  age, " +
                        "  COUNT(*) AS cnt " +
                        "FROM file_source " +
                        "GROUP BY age";
        tEnv.executeSql(execSql);
    }
}

3,运行测试

(1)首先我们在 HDFS 中添加一个 student.json 文件,文件内容如下:
[
  {"name":"Hangge","age":19},
  {"name":"Jack","age":16},
  {"name":"Jessic","age":19},
  {"name":"Mick","age":20},
  {"name":"Joy","age":19},
  {"name":"Say","age":20},
  {"name":"Kahu","age":19},
  {"name":"Hart","age":16}
]
{"name":"Hangge","age":19}
{"name":"Jack","age":16}
{"name":"Jessic","age":19}
{"name":"Mick","age":20}
{"name":"Joy","age":19}
{"name":"Say","age":20}
{"name":"Kahu","age":19}
{"name":"Hart","age":16}

(2)执行程序,由于我们代码中设定的是 Batch 批处理模式,则直接返回最终的结果。可以看到控制台输出内容如下:

(3)如果是 Steaming 流处理模式,会一条一条处理,这样可以看到每次计算的动态结果。我们把代码中的执行模式修改为 inStreamingMode()
//创建执行环境
val settings = EnvironmentSettings
  .newInstance()
  //指定执行模式,支持inBatchMode和inStreamingMode
  .inStreamingMode()
  .build()
val tEnv = TableEnvironment.create(settings)

(4)再次执行程序,可以看到控制台输出内容如下。此时可以看到数据的变化情况,最终结果和批处理的结果是一样的。

附:删除表

1,什么情况下需要删除表?

(1)针对本文的样例场景其实不需要删除表,因为这个表是临时的,只在当前 Flink 任务内部有效,任务执行结果后,我们定义的表就没有了,下次想要使用的话需要重启任务,这样就会自动重新创建表。
(2)如果我们能够将表的元数据信息,也就是表的 schema 信息持久化存储起来,那么删除表就有意义了

2,如何删除表?

如果在 Flink 任务中需要去删除某个表,可以使用 drop table 语句。
tEnv.executeSql("DROP TABLE print_sink")
评论

全部评论(0)

回到顶部