Flink 1.11.1: Making the Table SQL Kafka Connector Support Upsert Writes


Flink version: 1.11.1

Goal

When using Flink Table SQL, make the Kafka connector sink accept upsert (update/delete) changes so that an updating query can be written to Kafka.

Test Code

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// Stream environment backed by the Blink planner in streaming mode
val senv = StreamExecutionEnvironment.getExecutionEnvironment
val fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tblEnv = StreamTableEnvironment.create(senv, fsSettings)

val createTable =
  """
    |CREATE TABLE ck1 (
    |    id VARCHAR,
    |    create_date TIMESTAMP,
    |    write_date TIMESTAMP,
    |    code VARCHAR
    |)
    |WITH (
    |    'connector' = 'kafka',
    |    'topic' = 'action.log',
    |    'scan.startup.mode' = 'latest-offset',
    |    'properties.bootstrap.servers' = '192.168.10.16:9092',
    |    'properties.group.id' = 'testGroup',
    |    'format' = 'json'
    |)
  """.stripMargin
 

val createTable1 =
  """
    |CREATE TABLE ck2 (
    |    id VARCHAR,
    |    create_date TIMESTAMP,
    |    write_date TIMESTAMP,
    |    code VARCHAR
    |)
    |WITH (
    |    'connector' = 'kafka',
    |    'topic' = 'action.log.log',
    |    'scan.startup.mode' = 'latest-offset',
    |    'properties.bootstrap.servers' = '192.168.10.16:9092',
    |    'properties.group.id' = 'testGroup1',
    |    'format' = 'json'
    |)
  """.stripMargin
  
tblEnv.executeSql(createTable)
tblEnv.executeSql(createTable1)


val nt_order_detail =
  """
    |CREATE TABLE ck_interval(
    |    id VARCHAR PRIMARY KEY NOT ENFORCED,
    |    create_date TIMESTAMP,
    |    code VARCHAR
    |)
    |WITH (
    |    'connector' = 'kafka',
    |    'topic' = 'test_ck.public.ck_interval',
    |    'scan.startup.mode' = 'latest-offset',
    |    'properties.bootstrap.servers' = '192.168.10.16:9092',
    |    'properties.group.id' = 'testGroup',
    |    'format' = 'csv'
    |)
  """.stripMargin

tblEnv.executeSql(nt_order_detail)


val sqlQuery =
  """
    |insert into ck_interval
    |SELECT
    |id,create_date,code from(
    |SELECT p.id,p.create_date
    |,b.code from
    |(SELECT id,create_date,write_date,code from ck1) AS p
    |LEFT JOIN
    |(SELECT id,create_date,write_date,code from ck2) AS b
    |  ON b.id = p.id
    |  )a
    |""".stripMargin

tblEnv.executeSql(sqlQuery)

Flink's Built-in Kafka Connector

When executing the upsert write, Flink's built-in Kafka connector fails with the following error:

org.apache.flink.table.api.TableException: 
Table sink 'default_catalog.default_database.ck_interval' doesn't support consuming update and delete 
changes which is produced by node Join(joinType=[LeftOuterJoin], where=[(id0 = id)], select=[id, create_date, id0, code], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
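The planner raises this error because it compares the changelog produced by the query (here, a left join emitting update and delete rows) with the ChangelogMode the sink claims to consume. In Flink 1.11 the stock Kafka sink defers to its encoding format, and plain formats such as json are insert-only. A paraphrased sketch of the relevant method in KafkaDynamicSinkBase (from memory, not verbatim source):

@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
	// Plain json/csv encoding formats report insert-only, so any
	// update/delete change from the query is rejected by the planner.
	return this.encodingFormat.getChangelogMode();
}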


Defining a New Kafka Connector Sink that Supports Upsert

Copy each of the following files to create the new classes:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSinkBase.java
  --> flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/NTKafkaDynamicUpsertSinkBase.java

flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryBase.java
  --> flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/NTKafkaDynamicUpsertTableFactoryBase.java

flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
  --> flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/NTKafkaDynamicUpsertSink.java

flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
  --> flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/NTKafkaDynamicUpsertTableFactory.java

Modifications:

NTKafkaDynamicUpsertTableFactory.java

public static final String IDENTIFIER = "nt_kafka";

This defines the new connector name used in the DDL's 'connector' option.
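The identifier is what the planner matches against the 'connector' option; it is returned from factoryIdentifier(). A minimal sketch, assuming the copied factory keeps the structure of the stock KafkaDynamicTableFactory:

@Override
public String factoryIdentifier() {
	// Matched against 'connector' = 'nt_kafka' in the DDL's WITH clause.
	return IDENTIFIER;
}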

NTKafkaDynamicUpsertSinkBase.java

	@Override
	public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
		// UPSERT mode
		ChangelogMode.Builder builder = ChangelogMode.newBuilder();
		for (RowKind kind : requestedMode.getContainedKinds()) {
			if (kind != RowKind.UPDATE_BEFORE) {
				builder.addContainedKind(kind);
			}
		}
		return builder.build();
	}

This puts the Kafka sink in upsert mode: the sink declares that it accepts INSERT, UPDATE_AFTER, and DELETE rows and filters out UPDATE_BEFORE, so the planner no longer rejects the updating join result.

The other two files need no functional changes; only the class names and imports have to be updated to reference the new copies, as sketched below.
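A rough sketch of what the reference adjustment amounts to (class names are the copies created above; bodies are otherwise unchanged):

public class NTKafkaDynamicUpsertSink extends NTKafkaDynamicUpsertSinkBase {
	// Identical to KafkaDynamicSink, except that every reference to
	// KafkaDynamicSinkBase is replaced with NTKafkaDynamicUpsertSinkBase.
}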

Build and Package

Before building, edit

flink-connectors/flink-connector-kafka/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory

and add:

org.apache.flink.streaming.connectors.kafka.table.NTKafkaDynamicUpsertTableFactory
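After this edit, the services file should list both the stock factory and the new one, e.g. (assuming the existing entry is left in place):

org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
org.apache.flink.streaming.connectors.kafka.table.NTKafkaDynamicUpsertTableFactory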

Then build:


> mvn package -Dmaven.test.skip=true -Dcheckstyle.skip=true

Replace the JAR and Test

1. Replace the JAR under Flink's lib directory:

flink-connector-kafka_2.11-1.11.1.jar
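For example (paths are illustrative; adjust to your build output and Flink installation):

cp flink-connectors/flink-connector-kafka/target/flink-connector-kafka_2.11-1.11.1.jar $FLINK_HOME/lib/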

2. Test with the new connector:

val nt_order_detail =
  """
    |CREATE TABLE ck_interval(
    |    id VARCHAR PRIMARY KEY NOT ENFORCED,
    |    create_date TIMESTAMP,
    |    code VARCHAR
    |)
    |WITH (
    |    'connector' = 'nt_kafka',
    |    'topic' = 'test_ck.public.ck_interval',
    |    'scan.startup.mode' = 'latest-offset',
    |    'properties.bootstrap.servers' = '192.168.10.16:9092',
    |    'properties.group.id' = 'testGroup',
    |    'format' = 'csv'
    |)
  """.stripMargin

3. The job submits successfully (screenshot omitted).

4. Test data (screenshot omitted).
