FlinkSQL-基于jdbc的自定义Catalog 本文将介绍如何在FlinkSQL中使用基于jdbc的自定义Catalog。
什么是Catalog? Catalog是一个用于管理数据库和表的元数据存储系统。在FlinkSQL中,Catalog用于描述Flink集群中的数据库和表。FlinkSQL支持多种类型的Catalog,如默认基于内存的GenericInMemoryCatalog,基于Hive的HiveCatalog、基于JDBC的MySqlCatalog和PostgresCatalog等。
GenericInMemoryCatalog:基于内存,所有数据库和表信息都存储在内存中,重启后即消失 HiveCatalog:数据存储在hive的元数据中,需要部署hive服务才能用 MySqlCatalog和PostgresCatalog: 基于jdbc,只能读取表结构,对应的jdbc数据库中有哪些表就只能用哪些表,无法创建。 自定义Catalog 在FlinkSQL中,除了使用默认的Catalog外,我们还可以通过实现自定义的Catalog来管理数据源和表。自定义Catalog可以更灵活地管理数据库和表,自定义持久化逻辑,满足不同场景下的需求。本文将介绍如何使用基于jdbc的自定义Catalog。
实现 首先,我们需要实现一个继承自JdbcCatalog的自定义Catalog。JdbcCatalog是Flink SQL中内置的一个基于JDBC的Catalog实现,我们可以通过继承该类,重写其中的方法来实现自定义的Catalog。
自定义Catalog需要继承JdbcCatalog类,并重写其中的方法来实现自定义的逻辑。在打开和关闭Catalog时,我们可以实现自定义的初始化和资源释放逻辑。在listTables和getTable方法中,我们需要实现自定义的表列表查询和表查询逻辑。除此之外,还可以重写其他方法来满足不同的需求。
设计表结构 基于mysql,如果要使用其他数据库相应调整ddl即可 数据库(Database):
CREATE TABLE `metadata_database` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键id', `create_person` varchar(100) DEFAULT '' COMMENT '创建人', `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_person` varchar(100) DEFAULT '' COMMENT '最后更新人', `update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', `dr` tinyint(2) DEFAULT '0' COMMENT '逻辑删除标记 0 未删除 1 已删除', `database_name` varchar(100) DEFAULT NULL COMMENT '数据库名', `comment` varchar(200) DEFAULT NULL COMMENT '备注信息', PRIMARY KEY (`id`) USING BTREE, UNIQUE KEY `uni_database_name` (`database_name`, `type`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 ROW_FORMAT = DYNAMIC COMMENT ='元数据_数据库 数据表(Table):
CREATE TABLE `metadata_table` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键id', `create_person` varchar(100) DEFAULT '' COMMENT '创建人', `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_person` varchar(100) DEFAULT '' COMMENT '最后更新人', `update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', `dr` tinyint(2) DEFAULT '0' COMMENT '逻辑删除标记 0 未删除 1 已删除', `database_name` varchar(100) DEFAULT NULL COMMENT '数据库名', `database_id` bigint(20) DEFAULT NULL COMMENT '数据库id', `table_name` varchar(100) NOT NULL, `comment` varchar(200) DEFAULT NULL COMMENT '备注', PRIMARY KEY (`id`) USING BTREE, UNIQUE KEY `idx_table_name` (`table_name`, `database_name`, `type`), KEY `idx_database_id` (`database_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 ROW_FORMAT = DYNAMIC COMMENT ='元数据_数据表'; 字段信息(Column)
...