欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Flink集成Hive之Hive Catalog与Hive Dialect--以Flink1.12为例

程序员文章站 2022-07-14 12:30:32
...

什么是Hive Catalog
我们知道,Hive使用Hive Metastore(HMS)存储元数据信息,使用关系型数据库来持久化存储这些信息。所以,Flink集成Hive需要打通Hive的metastore,去管理Flink的元数据,这就是Hive Catalog的功能。

Hive Catalog的主要作用是使用Hive MetaStore去管理Flink的元数据。Hive Catalog可以将元数据进行持久化,这样后续的操作就可以反复使用这些表的元数据,而不用每次使用时都要重新注册。如果不去持久化catalog,那么在每个session中取处理数据,都要去重复地创建元数据对象,这样是非常耗时的。

如何使用Hive Catalog
HiveCatalog是开箱即用的,所以,一旦配置好Flink与Hive集成,就可以使用HiveCatalog。比如,我们通过FlinkSQL 的DDL语句创建一张kafka的数据源表,立刻就能查看该表的元数据信息。

HiveCatalog可以处理两种类型的表:一种是Hive兼容的表,另一种是普通表(generic table)。其中Hive兼容表是以兼容Hive的方式来存储的,所以,对于Hive兼容表而言,我们既可以使用Flink去操作该表,又可以使用Hive去操作该表。

普通表是对Flink而言的,当使用HiveCatalog创建一张普通表,仅仅是使用Hive MetaStore将其元数据进行了持久化,所以可以通过Hive查看这些表的元数据信息(通过DESCRIBE FORMATTED命令),但是不能通过Hive去处理这些表,因为语法不兼容。

对于是否是普通表,Flink使用is_generic属性进行标识。默认情况下,创建的表是普通表,即is_generic=true,如果要创建Hive兼容表,需要在建表属性中指定is_generic=false。

尖叫提示:

由于依赖Hive Metastore,所以必须开启Hive MetaStore服务

代码中使用Hive Catalog
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

    String name            = "myhive";
    String defaultDatabase = "default";
    String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";

    HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
    tableEnv.registerCatalog("myhive", hive);
    // 使用注册的catalog
    tableEnv.useCatalog("myhive");

Flink SQLCli中使用Hive Catalog
在FlinkSQL Cli中使用Hive Catalog很简单,只需要配置一下sql-cli-defaults.yaml文件即可。配置内容如下:

catalogs:

  • name: myhive
    type: hive
    default-database: default
    hive-conf-dir: /opt/modules/apache-hive-2.3.4-bin/conf
    图片

在FlinkSQL Cli中创建一张kafka表,该表默认为普通表,即is_generic=true

CREATE TABLE user_behavior (
user_id BIGINT, – 用户id
item_id BIGINT, – 商品id
cat_id BIGINT, – 品类id
action STRING, – 用户行为
province INT, – 用户所在的省份
ts BIGINT, – 用户行为发生的时间戳
proctime AS PROCTIME(), – 通过计算列产生一个处理时间列
eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts, ‘yyyy-MM-dd HH:mm:ss’)), – 事件时间
WATERMARK FOR eventTime AS eventTime - INTERVAL ‘5’ SECOND – 定义watermark
) WITH (
‘connector’ = ‘kafka’, – 使用 kafka connector
‘topic’ = ‘user_behavior’, – kafka主题
‘scan.startup.mode’ = ‘earliest-offset’, – 偏移量
‘properties.group.id’ = ‘group1’, – 消费者组
‘properties.bootstrap.servers’ = ‘kms-2:9092,kms-3:9092,kms-4:9092’,
‘format’ = ‘json’, – 数据源格式为json
‘json.fail-on-missing-field’ = ‘true’,
‘json.ignore-parse-errors’ = ‘false’
);
我们可以在Hive客户端中查看该表的元数据信息

hive (default)> desc formatted user_behavior;
Table Parameters:

is_generic true

从上面的元数据信息可以看出,is_generic=true,说明该表是一张普通表,如果在Hive中去查看该表,则会报错。

上面创建的表是普通表,该表不能使用Hive去查询。那么,该如何创建一张Hive兼容表呢?我们只需要在建表的属性中显示指定is_generic=false即可,具体如下:

CREATE TABLE hive_compatible_tbl (
user_id BIGINT, – 用户id
item_id BIGINT, – 商品id
cat_id BIGINT, – 品类id
action STRING, – 用户行为
province INT, – 用户所在的省份
ts BIGINT – 用户行为发生的时间戳
) WITH (
‘connector’ = ‘kafka’, – 使用 kafka connector
‘topic’ = ‘user_behavior’, – kafka主题
‘scan.startup.mode’ = ‘earliest-offset’, – 偏移量
‘properties.group.id’ = ‘group1’, – 消费者组
‘properties.bootstrap.servers’ = ‘kms-2:9092,kms-3:9092,kms-4:9092’,
‘format’ = ‘json’, – 数据源格式为json
‘json.fail-on-missing-field’ = ‘true’,
‘json.ignore-parse-errors’ = ‘false’,
‘is_generic’ = ‘false’
);
当我们在Hive中查看该表的元数据信息时,可以看出:is_generic =false

hive (default)> desc formatted hive_compatible_tbl

相关标签: 笔记