【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表

发布时间:2023-12-27 17:12:10

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引



本文介绍了通过Table API和SQl创建表的基本用法,并以具体的示例展示其使用。同时在使用Table API和SQL 创建表之前给出了通过Table API操作的基本程序结构示例。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本文需要有kafka的运行环境。

本文更详细的内容可参考文章:

17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)

本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版

一、maven依赖

本文或本专题依赖的maven,为了节省篇幅不再每个示例再单独列出,可能存在多余的maven依赖。

<properties>
		<encoding>UTF-8</encoding>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<java.version>1.8</java.version>
		<scala.version>2.12</scala.version>
		<flink.version>1.17.0</flink.version>
	</properties>

	<dependencies>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-common</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-sql-gateway</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner_2.12</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-uber</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-runtime</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc</artifactId>
			<version>3.1.0-1.17</version>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>5.1.38</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-hive_2.12</artifactId>
			<version>1.17.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hive</groupId>
			<artifactId>hive-exec</artifactId>
			<version>3.1.2</version>
		</dependency>
		<!-- flink连接器 -->
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-sql-connector-kafka</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-compress</artifactId>
			<version>1.24.0</version>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.2</version>
		</dependency>
	</dependencies>

二、示例:基本的程序结构

本示例着重展示基本的程序结构,不涉及复杂的source、transformation和sink。
为了节省篇幅,本示例包含本专题所需要的所有import,下面的示例不再引入。


import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.and;
import static org.apache.flink.table.api.Expressions.lit;
import static org.apache.flink.table.expressions.ApiExpressionUtils.unresolvedCall;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogView;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.types.Row;

import com.google.common.collect.Lists;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;


/**
 * @author alanchan
 *
 */
public class TestTableAPIDemo {
	final static List<User> userList = Arrays.asList(
			new User(1L, "alan", 18, 1698742358391L), 
			new User(2L, "alan", 19, 1698742359396L), 
			new User(3L, "alan", 20, 1698742360407L),
			new User(4L, "alanchan", 28, 1698742361409L), 
			new User(5L, "alanchan", 29, 1698742362424L)
			);

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// 0、运行环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		// 2、数据源
		DataStream<User> users = env.fromCollection(userList);

		// 3、DataStream 转 Table
		Table sourceTable = tenv.fromDataStream(users);

		// 4、查询
		Table resultQuery = sourceTable.groupBy($("name")).select($("name"), $("age").sum().as("age_sum"));

		// 5、Table 转 DataStream
		DataStream<Tuple2<Boolean, Row>> resultDS = tenv.toRetractStream(resultQuery, Row.class);

		// 6、sink
		resultDS.print();

		// 7、执行
		env.execute();
	}
	
	static void testTableEnvironment() throws Exception {
		// TableEnvironment 的主要职能包括:
		// 注册 Catlog
		// 在内部 Catlog 中注册表
		// 加载可插拔模块
		// 执行 SQL 查询
		// 注册用户自定义函数
		// DataStream 和 Table 之间的转换(在 StreamTableEnvironment 的情况下)
		// 提供更详细的配置选项

		// 设置方式一
		// inStreamingMode/inBatchMode,批处理或流式,默认流式
		// useBlinkPlanner/useOldPlanner/useAnyPlanner,执行计划,默认blink(和flink版本有关),anyplanner不需要显式设置
		EnvironmentSettings env = EnvironmentSettings.newInstance().inStreamingMode().build();
		// EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
		// EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
		// EnvironmentSettings.newInstance().useAnyPlanner().inStreamingMode().build();
		TableEnvironment tenv = TableEnvironment.create(env);

		// 设置方式二,基于 StreamExecutionEnvironment 创建 StreamTableEnvironment 来与 DataStream
		// API 进行相互转换
		// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		// 表 Table 有两种类型的表,一种是连接器表(Connector Tables) Table,一种是虚拟表(Virtual Tables)
		// VIEW。连接器表一般用来描述外部数据,例如文件、数据库表或者消息队列。虚拟表通常是 Table API 或 SQL 查询的结果,可以基于现有的连接器表
		// Table 对象来创建。

		// 1、连接器 Connector 表
		// 创建 Table 最直观的方式,就是通过连接器(Connector)连接到一个外部系统,然后定义出对应的表结构。
		// 例如我们可以连接到 Kafka 或者文件系统,将存储在这些外部系统的数据以表 Table 的形式定义出来,
		// 这样对表 Table的读写就可以通过连接器转换成对外部系统的读写。连接器表可以直接通过 SQL DDL 方式创建:

		// 2、虚拟表
		// 。。。。。。
	}

	@Data
	@NoArgsConstructor
	@AllArgsConstructor
	public static class User {
		private long id;
		private String name;
		private int age;
		private Long rowtime;
	}
	
}

三、示例:通过Table API和SQL创建表

本示例是通过Table API 和 SQL 两种方式创建一张kafka表,其中表结构涉及使用kafka的元数据,即kafka的事件时间、分区编号和kafka数据offset。
同时,提供验证方法,即输入数据和预期输出数据。

/**
 * @author alanchan
 *
 */
public class TestTableAPIDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// testCreateTableByAPI();
		testCreateTableBySQL();
	}
	
	static void testCreateTableByAPI() throws Exception {
//		EnvironmentSettings env = EnvironmentSettings.newInstance().inStreamingMode().build();
//		TableEnvironment tenv = TableEnvironment.create(env);
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
       
        //API创建表
        Schema schema = Schema.newBuilder()
                .columnByMetadata("event_time", DataTypes.TIME(3), "timestamp")
                .columnByMetadata("partition", DataTypes.BIGINT(), true)
                .columnByMetadata("offset", DataTypes.BIGINT(), true)
                .column("user_id", DataTypes.BIGINT())
                .column("item_id", DataTypes.BIGINT())
                .column("behavior", DataTypes.STRING())
                .build();
        
        TableDescriptor kafkaDescriptor = TableDescriptor.forConnector("kafka")
                .comment("kafka source table")
                .schema(schema)
                .option(KafkaConnectorOptions.TOPIC, Lists.newArrayList("user_behavior"))
                .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, "192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092")
                .option(KafkaConnectorOptions.PROPS_GROUP_ID, "testGroup")
                .option("scan.startup.mode", "earliest-offset")
                .format("csv")
                .build();
        
        tenv.createTemporaryTable("Alan_KafkaTable", kafkaDescriptor);
        
        //查询
        String sql = "select * from Alan_KafkaTable ";
        Table resultQuery = tenv.sqlQuery(sql);

        DataStream<Tuple2<Boolean, Row>> resultDS =  tenv.toRetractStream(resultQuery, Row.class);
		
        // 6、sink
		resultDS.print();

		// 7、执行
		env.execute();
		//kafka中输入测试数据
//		1,1001,login
//		1,2001,p_read
		
		//程序运行控制台输入如下
//		11> (true,+I[16:32:19.923, 0, 0, 1, 1001, login])
//		11> (true,+I[16:32:32.258, 0, 1, 1, 2001, p_read])
	}

	static void testCreateTableBySQL() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        // SQL 创建输入表
       String sourceSql = "CREATE TABLE Alan_KafkaTable (\r\n" + 
       		"  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',\r\n" + 
       		"  `partition` BIGINT METADATA VIRTUAL,\r\n" + 
       		"  `offset` BIGINT METADATA VIRTUAL,\r\n" + 
       		"  `user_id` BIGINT,\r\n" + 
       		"  `item_id` BIGINT,\r\n" + 
       		"  `behavior` STRING\r\n" + 
       		") WITH (\r\n" + 
       		"  'connector' = 'kafka',\r\n" + 
       		"  'topic' = 'user_behavior',\r\n" + 
       		"  'properties.bootstrap.servers' = '192.168.10.41:9092,192.168.10.42:9092,192.168.10.43:9092',\r\n" + 
       		"  'properties.group.id' = 'testGroup',\r\n" + 
       		"  'scan.startup.mode' = 'earliest-offset',\r\n" + 
       		"  'format' = 'csv'\r\n" + 
       		");";
       tenv.executeSql(sourceSql);

		//查询
		String sql = "select * from Alan_KafkaTable ";
		Table resultQuery = tenv.sqlQuery(sql);

		DataStream<Tuple2<Boolean, Row>> resultDS =  tenv.toRetractStream(resultQuery, Row.class);
		
		// 6、sink
		resultDS.print();

		// 7、执行
		env.execute();
		//kafka中输入测试数据
	//		1,1001,login
	//		1,2001,p_read
		
		//程序运行控制台输入如下
	//		11> (true,+I[16:32:19.923, 0, 0, 1, 1001, login])
	//		11> (true,+I[16:32:32.258, 0, 1, 1, 2001, p_read])
	}
	
}

以上,本文介绍了通过Table API和SQl创建表的基本用法,并以具体的示例展示其使用。同时在使用Table API和SQL 创建表之前给出了通过Table API操作的基本程序结构示例。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文更详细的内容可参考文章:

17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)

本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版

文章来源:https://blog.csdn.net/chenwewi520feng/article/details/135059784
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。