Flink - Table API快速入门教程1(安装配置,文件的读写样例)
作者:hangge | 2025-02-28 08:41
Flink 针对标准的流处理和批处理提供了两种关系型 API,Table API 和 SQL。Table API 允许用户以一种很直观的方式进行 select 、filter 和 join 操作。Flink SQL 基于 Apache Calcite 实现标准 SQL。针对批处理和流处理可以提供相同的处理语义和结果。本文首先通过样例演示 Flink Table API 的使用。
(2)由于部分 table 相关的代码是用 Scala 实现的,所以,下面这个依赖也是必须的。
(2)下面是使用 Java 语言实现同样的功能:
(2)运行程序,可以看到控制台输出内容如下:

1,安装配置
(1)如果我们想要使用 Table API 的话,需要在 pom.xml 添加如下的依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.16.0</version>
</dependency>
(2)由于部分 table 相关的代码是用 Scala 实现的,所以,下面这个依赖也是必须的。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.16.0</version>
</dependency>
2,样例代码
(1)下面我们使用 scala 语言编写一个 Table API 使用样例,从文本文件中读取表格数据,然后筛选后将结果输出至本地文件中。
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.api.scala._
object TableAPIAndSQLOpScala {
def main(args: Array[String]): Unit = {
//获取TableEnvironment
val sSettings = EnvironmentSettings.newInstance.inStreamingMode().build()
val sTableEnv = TableEnvironment.create(sSettings)
//创建输入表
/**
* connector.type:指定connector的类型
* connector.path:指定文件或者目录地址
* format.type:文件数据格式化类型,现在只支持csv格式
* 注意:SQL语句如果出现了换行,行的末尾可以添加空格或者\n都可以,最后一行不用添
*/
sTableEnv.executeSql("" +
"create table myTable(\n" +
"id int,\n" +
"name string\n" +
") with (\n" +
"'connector.type' = 'filesystem',\n" +
"'connector.path' = 'D:/temp/input.txt',\n" +
"'format.type' = 'csv'\n" +
")")
//使用Table API实现数据查询和过滤等操作
/*import org.apache.flink.table.api._
val result = sTableEnv.from("myTable")
.select($"id",$"name")
.filter($"id" > 1)*/
//使用SQL实现数据查询和过滤等操作
val result = sTableEnv.sqlQuery("select id,name from myTable where id > 1")
//输出结果到控制台
result.execute.print()
//创建输出表
sTableEnv.executeSql("" +
"create table newTable(\n" +
"id int,\n" +
"name string\n" +
") with (\n" +
"'connector.type' = 'filesystem',\n" +
"'connector.path' = 'D:/temp/output',\n" +
"'format.type' = 'csv'\n" +
")")
//输出结果到表newTable中
result.executeInsert("newTable")
}
}
- 注意:针对 SQL 建表语句的写法还有一种比较清晰的写法。
sTableEnv.executeSql(
"""
|create table myTable(
|id int,
|name string
|) with (
|'connector.type' = 'filesystem',
|'connector.path' = 'D:/temp/input.txt',
|'format.type' = 'csv'
|)
|""".stripMargin)
(2)下面是使用 Java 语言实现同样的功能:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
public class TableAPIAndSQLOpJava {
public static void main(String[] args) {
//获取TableEnvironment
EnvironmentSettings sSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment sTableEnv = TableEnvironment.create(sSettings);
//创建输入表
sTableEnv.executeSql("create table myTable(\n" +
"id int,\n" +
"name string\n" +
") with (\n" +
"'connector.type' = 'filesystem',\n" +
"'connector.path' = 'D:/temp/input.txt',\n" +
"'format.type' = 'csv'\n" +
")");
//使用Table API实现数据查询和过滤等操作
/*Table result = sTableEnv.from("myTable")
.select($("id"), $("name"))
.filter($("id").isGreater(1));*/
//使用SQL实现数据查询和过滤等操作
Table result = sTableEnv.sqlQuery("select id,name from myTable where id > 1");
//输出结果到控制台
result.execute().print();
//创建输出表
sTableEnv.executeSql("" +
"create table newTable(\n" +
"id int,\n" +
"name string\n" +
") with (\n" +
"'connector.type' = 'filesystem',\n" +
"'connector.path' = 'D:/temp/output',\n" +
"'format.type' = 'csv'\n" +
")");
//输出结果到表newTable中
result.executeInsert("newTable");
}
}
3,运行测试
(1)首先我们在 D:/temp 目录下创建一个 input.txt 文件,文件内容如下:
1,hangge 2,小刘 3,老王
(2)运行程序,可以看到控制台输出内容如下:

(3)同时可以看到 D:\temp\output 目录下生成了相关的输出文件(16 个文件是因为我的 CPU 是 16 核)。当然,由于输出的内容只有两条数据,因此只有 11、16 文件里面各有一条,其它文件都是空。

(4)打开其中的 11 号文件,可以看到里面内容如下:

全部评论(0)