FlinkSQL Table API和SQL(一)

FlinkSQL Table API和SQL(一),第1张

FlinkSQL Table API和SQL(一) 一、Table API和SQL概述

Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL。Table API 是用于 Scala 和 Java 语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。无论输入是连续的(流式)还是有界的(批处理),在两个接口中指定的查询都具有相同的语义,并指定相同的结果。

Table API 和 SQL 两种 API 是紧密集成的,以及 DataStream API。你可以在这些 API 之间,以及一些基于这些 API 的库之间轻松的切换。比如,你可以先用 CEP 从 DataStream 中做模式匹配,然后用 Table API 来分析匹配的结果;或者你可以用 SQL 来扫描、过滤、聚合一个批式的表,然后再跑一个 Gelly 图算法 来处理已经预处理好的数据。

以上摘自Flink 官方文档 https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/overview/

总结如下:

Flink 对批处理和流处理,提供了统一的上层 APITable API 是一套内嵌在 Java 和 Scala 语言中的查询API,它允许以非常直观的方式组合来自一些关系运算符的查询Flink 的 SQL 支持基于实现了 SQL 标准的 Apache Calcite

二、程序依赖 1、Table Planner(计划器)介绍

从1.9开始,Flink 提供了两个 Table Planner 实现来执行 Table API 和 SQL 程序:Blink Planner 和 Old Planner,Old Planner 在1.9之前就已经存在了。 Planner 的作用主要是把关系型的 *** 作翻译成可执行的、经过优化的 Flink 任务。两种 Planner 所使用的优化规则以及运行时类都不一样。 它们在支持的功能上也有些差异。

注意:对于生产环境,建议使用在1.11版本之后已经变成默认的Blink Planner。

如下是官网对于Flink相关依赖的一些介绍:

flink-table-common: 公共模块,比如自定义函数、格式等需要依赖的。flink-table-api-java: Table 和 SQL API,使用 Java 语言编写的,给纯 table 程序使用(还在早期开发阶段,不建议使用)flink-table-api-scala: Table 和 SQL API,使用 Scala 语言编写的,给纯 table 程序使用(还在早期开发阶段,不建议使用)flink-table-api-java-bridge: Table 和 SQL API 结合 DataStream/DataSet API 一起使用,给 Java 语言使用。flink-table-api-scala-bridge: Table 和 SQL API 结合 DataStream/DataSet API 一起使用,给 Scala 语言使用。
*flink-table-planner: table Planner 和运行时。这是在1.9之前 Flink 的唯一的 Planner,但是从1.11版本开始我们不推荐继续使用。flink-table-planner-blink: 新的 Blink Planner,从1.11版本开始成为默认的 Planner。flink-table-runtime-blink: 新的 Blink 运行时。flink-table-uber: 把上述模块以及 Old Planner 打包到一起,可以在大部分 Table & SQL API 场景下使用。打包到一起的 jar 文件 flink-table-*.jar 默认会直接放到 Flink 发行版的 /lib 目录下。flink-table-uber-blink: 把上述模块以及 Blink Planner 打包到一起,可以在大部分 Table & SQL API 场景下使用。打包到一起的 jar 文件 flink-table-blink-*.jar 默认会放到 Flink 发行版的 /lib 目录下。 2、程序依赖选择

取决于你使用的编程语言,选择 Java 或者 Scala API 来构建你的 Table API 和 SQL 程序:


  org.apache.flink
  flink-table-api-java-bridge_2.11
  1.12.3
  provided



  org.apache.flink
  flink-table-api-scala-bridge_2.11
  1.12.3
  provided

除此之外,如果你想在 IDE 本地运行你的程序,你需要添加下面的模块,具体用哪个取决于你使用哪个 Planner:


  org.apache.flink
  flink-table-planner_2.11
  1.12.3
  provided



  org.apache.flink
  flink-table-planner-blink_2.11
  1.12.3
  provided

内部实现上,部分 table 相关的代码是用 Scala 实现的。所以,下面的依赖也需要添加到你的程序里,不管是批式还是流式的程序:


  org.apache.flink
  flink-streaming-scala_2.11
  1.12.3
  provided

3、扩展依赖

如果你想实现自定义格式来解析 Kafka 数据,或者自定义函数,下面的依赖就足够了,编译出来的 jar 文件可以直接给 SQL Client 使用:


  org.apache.flink
  flink-table-common
  1.12.3
  provided

4、两种计划器(Planner)的主要区别

Blink 将批处理作业视作流处理的一种特例。严格来说,Table 和 DataSet 之间不支持相互转换,并且批处理作业也不会转换成 DataSet 程序而是转换成 DataStream 程序,流处理作业也一样。Blink 计划器不支持 BatchTableSource,而是使用有界的 StreamTableSource 来替代。旧计划器和 Blink 计划器中 FilterableTableSource 的实现是不兼容的。旧计划器会将 Plannerexpression 下推至 FilterableTableSource,而 Blink 计划器则是将 expression 下推。基于字符串的键值配置选项仅在 Blink 计划器中使用。PlannerConfig 在两种计划器中的实现(CalciteConfig)是不同的。Blink 计划器会将多sink(multiple-sinks)优化成一张有向无环图(DAG),TableEnvironment 和 StreamTableEnvironment 都支持该特性。旧计划器总是将每个sink都优化成一个新的有向无环图,且所有图相互独立。旧计划器目前不支持 catalog 统计数据,而 Blink 支持。 三、Table API 和 SQL 程序的结构 1、基本程序结构

// 创建表的执行环境
StreamTableEnvironment tableEnv = ... 

// 创建一张表,用于读取数据
tableEnv.connect(...).createTemporaryTable("inputTable");

// 注册一张表,用于把计算结果输出
tableEnv.connect(...).createTemporaryTable("outputTable");

// 通过 Table API 查询算子,得到一张结果表
Table result = tableEnv.from("inputTable").select(...);

// 通过SQL查询语句,得到一张结果表
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...");

// 将结果表写入输出表中
result.insertInto("outputTable");
2、创建 TableEnvironment

TableEnvironment 是 Table API 和 SQL 的核心概念。它负责:

在内部的 catalog 中注册 Table注册外部的 catalog加载可插拔模块执行 SQL 查询注册自定义函数 (scalar、table 或 aggregation)将 DataStream 或 DataSet 转换成 Table持有对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

Table 总是与特定的 TableEnvironment 绑定。不能在同一条查询中使用不同 TableEnvironment 中的表,例如,对它们进行 join 或 union *** 作。

TableEnvironment 可以通过静态方法 BatchTableEnvironment.create() 或者 StreamTableEnvironment.create() 在 StreamExecutionEnvironment 或者 ExecutionEnvironment 中创建,TableConfig 是可选项。TableConfig可用于配置TableEnvironment或定制的查询优化和转换过程(参见 查询优化)。

请确保选择与你的编程语言匹配的特定的计划器BatchTableEnvironment/StreamTableEnvironment。

如果两种计划器的 jar 包都在 classpath 中(默认行为),你应该明确地设置要在当前程序中使用的计划器。如 “3、 Table API批处理和流处理” 中介绍使用。

3、 两种方式(Planner)创建 TableEnvironment

如第二节第4点所说,新版Planner-Blink 将批处理作业视作流处理的一种特例

public static void main(String[] args) {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 创建表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 1.1 基于老版本planner的流处理
        EnvironmentSettings oldStreamSettings = EnvironmentSettings.newInstance()
                .useOldPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment oldStreamTableEnv = StreamTableEnvironment.create(env, oldStreamSettings);

        // 1.2 基于老版本planner的批处理
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

        BatchTableEnvironment oldBatchTableEnv = BatchTableEnvironment.create(batchEnv);

        // 2.1 基于新版Blink的流处理
        EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();

        StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings);

        // 2.2 基于新版Blink的批处理
        EnvironmentSettings blinkBatchSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();

        StreamTableEnvironment blinkBatchTableEnv = StreamTableEnvironment.create(env, blinkBatchSettings);
    }
四、Table表 1、概述

TableEnvironment 维护着一个由标识符(identifier)创建的表 catalog 的映射。标识符由三个部分组成:catalog 名称、数据库名称以及对象名称。如果 catalog 或者数据库没有指明,就会使用当前默认值。Table 可以是虚拟的(视图 VIEWS)也可以是常规的(表 TABLES)。视图 VIEWS可以从已经存在的Table中创建,一般是 Table API 或者 SQL 的查询结果。表TABLES描述的是外部数据,例如文件、数据库表或者消息队列,也可以直接从DataStream转换而来 2、临时表(Temporary Table)和永久表(Permanent Table)区别

表可以是临时的,并与单个 Flink 会话(session)的生命周期相关,也可以是永久的,并且在多个 Flink 会话和群集(cluster)中可见。

永久表需要 catalog(例如 Hive metastore)以维护表的元数据。一旦永久表被创建,它将对任何连接到 catalog 的 Flink 会话可见且持续存在,直至被明确删除。

另一方面,临时表通常保存于内存中并且仅在创建它们的 Flink 会话持续期间存在。这些表对于其它会话是不可见的。它们不与任何 catalog 或者数据库绑定但可以在一个命名空间(namespace)中创建。即使它们对应的数据库被删除,临时表也不会被删除。

3、屏蔽(Shadowing)

可以使用与已存在的永久表相同的标识符去注册临时表。临时表会屏蔽永久表,并且只要临时表存在,永久表就无法访问。所有使用该标识符的查询都将作用于临时表。

这可能对实验(experimentation)有用。它允许先对一个临时表进行完全相同的查询,例如只有一个子集的数据,或者数据是不确定的。一旦验证了查询的正确性,就可以对实际的生产表进行查询。

4、创建表Table 4.1 虚拟表

在 SQL 的术语中,Table API 的对象对应于视图(虚拟表)。它封装了一个逻辑查询计划。它可以通过以下方法在 catalog 中创建:

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// table is the result of a simple projection query 
Table projTable = tableEnv.from("X").select(...);

// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);

注意: 从传统数据库系统的角度来看,Table 对象与 VIEW 视图非常像。也就是,定义了 Table 的查询是没有被优化的, 而且会被内嵌到另一个引用了这个注册了的 Table的查询中。如果多个查询都引用了同一个注册了的Table,那么它会被内嵌每个查询中并被执行多次, 也就是说注册了的Table的结果不会被共享(注:Blink 计划器的TableEnvironment会优化成只执行一次)。

4.2 Connector Tables

另外一个方式去创建 TABLE 是通过 connector 声明。Connector 描述了存储表数据的外部系统。存储系统例如 Apache Kafka 或者常规的文件系统都可以通过这种方式来声明。

tableEnv
  .connect(...)    	  //    定义表的数据来源,和外部系统建立连接
  .withFormat(...)    //    定义数据格式化方法
  .withSchema(...)    //    定义表结构
  .createTemporaryTable("MyTable");    //    创建临时表
4.3 创建表示例代码

如下代码是使用connector声明,连接外部文件系统来创建表。因为连接文件系统,读取csv文件,需要引入flink-csv相关以依赖:


  org.apache.flink
  flink-csv
  1.12.3

public static void main(String[] args) throws Exception {
        // 1、创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2、设置并行度
        env.setParallelism(1);

        // 3、创建表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 4、表的创建:连接外部文件系统,读取数据
        // 4.1读取文件
        String path = "src/main/resources/sensor.txt";
        tableEnv.connect(new FileSystem().path(path))       //定义到文件系统的连接
                .withFormat(new Csv())                      //定义以csv格式进行数据格式化
                .withSchema(new Schema()                    //定义表结构
                        .field("id", DataTypes.STRING())
                        .field("timestamp", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE()))
                .createTemporaryTable("inputTable");   //创建临时表
                
        Table inputTable = tableEnv.from("inputTable");
        
        //打印inputTable表结构
        inputTable.printSchema();

		//以追加流的形式打印inputTable
        tableEnv.toAppendStream(inputTable, Row.class).print();

        env.execute();
    }
4.4 扩展表标识符

表总是通过三元标识符注册,包括 catalog 名、数据库名和表名。

用户可以指定一个 catalog 和数据库作为 “当前catalog” 和”当前数据库”。有了这些,那么刚刚提到的三元标识符的前两个部分就可以被省略了。如果前两部分的标识符没有指定, 那么会使用当前的 catalog 和当前数据库。用户也可以通过 Table API 或 SQL 切换当前的 catalog 和当前的数据库。

标识符遵循 SQL 标准,因此使用时需要用反引号(`)进行转义。

TableEnvironment tEnv = ...;
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");

Table table = ...;

// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'custom_database' 
tableEnv.createTemporaryView("exampleView", table);

// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'other_database' 
tableEnv.createTemporaryView("other_database.exampleView", table);

// register the view named 'example.View' in the catalog named 'custom_catalog'
// in the database named 'custom_database' 
tableEnv.createTemporaryView("`example.View`", table);

// register the view named 'exampleView' in the catalog named 'other_catalog'
// in the database named 'other_database' 
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);
5、表的查询

Table API 是关于 Scala 和 Java 的集成语言式查询 API。与 SQL 相反,Table API 的查询不是由字符串指定,而是在宿主语言中逐步构建。Table API 是基于 Table 类的,该类表示一个表(流或批处理),并提供使用关系 *** 作的方法。这些方法返回一个新的 Table 对象,该对象表示对输入 Table 进行关系 *** 作的结果。 一些关系 *** 作由多个方法调用组成,例如 table.groupBy(…).select(),其中 groupBy(…) 指定 table 的分组,而 select(…) 在 table 分组上的投影。有些关系型转换 *** 作,可以由多个方法调用组成,构成链式调用结构

接4.3代码片段,以下代码示例展示了如何利用Table API进行带条件的查询和聚合查询:

 //查询id为sensor_6的报警数据
 Table resultTable = inputTable.select($("id"), $("temp"))
                .filter($("id").isEqual("sensor_6"));
                
 // 聚合统计:按id分组,对id进行count,对温度值temp求平均值
 Table aggTable = inputTable.groupBy($("id"))
                .select($("id"), $("id").count().as("count"), $("temp").avg().as("avgTemp"));
                
 //打印输出
 //resultTable使用追加流方式输出
 tableEnv.toAppendStream(resultTable, Row.class).print("resultTable");
 //aggTable利用撤回流方式输出
 tableEnv.toRetractStream(aggTable, Row.class).print("aggTable");
五、SQL

Flink SQL 是基于实现了SQL标准的 Apache Calcite 的。SQL 查询由常规字符串指定。

接4.3代码片段,以下代码展示了如何指定查询并将结果作为 Table 对象返回:

//使用SQL查询id为sensor_6的报警数据
Table sqlResultTable = tableEnv.sqlQuery("select id , temp from inputTable where id='sensor_6'");

//使用SQL进行聚合统计
Table sqlAggTable = tableEnv.sqlQuery("select id,count(id) as cnt , avg(temp) as avgTemp  from inputTable group by id");

如下的示例展示了如何指定一个查询,将查询的结果插入到已注册的表中。

        String filePath = "src/main/resources/outputTable.txt";
        //创建输出到外部文件系统的outputTable表
        String ddl =
                "create table outputTable (n" +
                        " id STRING,n" +
                        " temp DOUBLEn" +
                        ") WITH (n" +
                        " 'connector.type' = 'filesystem',n" +
                        " 'connector.path' = '" + filePath + "',n" +
                        " 'format.type' = 'csv'n" +
                        ")";
     //执行建表语句
     tableEnv.executeSql(ddl);


    // 将表inputTable的查询结果输出到outputTable
     tableEnv.executeSql(
                "INSERT INTO outputTable " +
                "SELECt id , temp FROM inputTable WHERe id='sensor_1' ");

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zaji/5709395.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-17
下一篇2022-12-17

发表评论

登录后才能评论

评论列表(0条)

    保存