
Flink: Writing to Multiple Kafka Topics


I can think of two approaches for writing from Flink to multiple Kafka topics.

Approach 1: split the stream

Use the split() method, although it has since been officially deprecated; the recommended replacement is side output, which is much the same idea in that both produce an additional stream. You will probably get a clearer picture of side outputs when studying timeout events in window operators (that is where I first came across them). A minimal side-output sketch follows the addSink() list below.

The drawback of this approach is that every additional topic means producing one more stream and adding one more addSink(), which requires changing the job logic each time. And what if there are 100 topics?

addSink()
addSink()
addSink()
addSink()
addSink()
...
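
For reference, here is a minimal sketch of the side-output variant with just two topics. It assumes a String DataStream named stream, a configured Properties object named properties, and hypothetical topic names "even" and "odd"; the odd/even routing matches the serialization example in the second approach below:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// The anonymous subclass keeps the element type information for the tag
final OutputTag<String> oddTag = new OutputTag<String>("odd-events") {};

SingleOutputStreamOperator<String> evenStream = stream
        .process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                // Odd-numbered events go to the side output, the rest stay on the main stream
                if (Integer.parseInt(value.replaceAll("[^\\d]+", "")) % 2 > 0) {
                    ctx.output(oddTag, value);
                } else {
                    out.collect(value);
                }
            }
        });

// One addSink() per topic: exactly the part that does not scale
evenStream.addSink(new FlinkKafkaProducer<String>("even", new SimpleStringSchema(), properties));
evenStream.getSideOutput(oddTag)
        .addSink(new FlinkKafkaProducer<String>("odd", new SimpleStringSchema(), properties));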

Approach 2: override the serialization schema

The details are explained in the code comments; the key logic is in the serialize() and getTargetTopic() methods.

package kafka.connection;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;


public class KafkaSerializationTest<T> implements KafkaSerializationSchema<T>, KafkaContextAware<T> {
    private final FlinkKafkaPartitioner<T> partitioner;
    private final SerializationSchema<T> serializationSchema;
    private String topic;            // default topic
    private boolean writeTimestamp;
    private int[] partitions;        // available partitions of the target topic, set by Flink

    /**
     * @param topic               default target topic
     * @param partitioner         partitioning rule; see FlinkFixedPartitioner for a reference
     *                            implementation (a custom partitioner sketch follows the class)
     * @param writeTimestamp      whether to write the event timestamp into the Kafka record
     * @param serializationSchema how each element is serialized to bytes
     */
    public KafkaSerializationTest(String topic, FlinkKafkaPartitioner<T> partitioner, boolean writeTimestamp, SerializationSchema<T> serializationSchema) {
        this.partitioner = partitioner;
        this.serializationSchema = serializationSchema;
        this.topic = topic;
        this.writeTimestamp = writeTimestamp;
    }

    @Override
    public void open(InitializationContext context) throws Exception {
        this.serializationSchema.open(context);
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) {
        byte[] serialized = this.serializationSchema.serialize(element);
        /*
         * Decide which topic to write to, based on the event itself:
         * extract a key or other marker from the event, or carry the
         * topic name directly in the event. The decision lives in
         * getTargetTopic() so that Flink sees the same routing.
         */
        String targetTopic = getTargetTopic(element);

        Integer partition;
        if (this.partitioner != null) {
            // Pick the partition to write to; `partitions` is assigned in setPartitions()
            partition = this.partitioner.partition(element, null, serialized, targetTopic, this.partitions);
        } else {
            partition = null;
        }

        Long timestampToWrite;
        if (this.writeTimestamp) {
            timestampToWrite = timestamp;
        } else {
            timestampToWrite = null;
        }
        return new ProducerRecord<>(targetTopic, partition, timestampToWrite, null, serialized);
    }
    // Tell Flink which topic this element targets. Flink uses the returned topic
    // to look up the available partitions, which it hands back via setPartitions().
    // Routing rule here: odd-numbered events go to the "odd" topic, the rest to the default.
    @Override
    public String getTargetTopic(T element) {
        // NOTE: throws NumberFormatException if the event contains no digits
        if (Integer.parseInt(element.toString().replaceAll("[^\\d]+", "")) % 2 > 0) {
            return "odd";
        }
        return this.topic;
    }

    // Receive the available partitions of the target topic
    @Override
    public void setPartitions(int[] partitions) {
        this.partitions = partitions;
    }

    public void setWriteTimestamp(boolean writeTimestamp) {
        this.writeTimestamp = writeTimestamp;
    }

}
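
The partitioner argument accepts any FlinkKafkaPartitioner. As a reference point, here is a minimal custom partitioner sketch; the class name HashPartitioner and the hashing rule are my own illustration, not part of the Flink API. FlinkFixedPartitioner, by contrast, pins each Flink subtask to a single partition:

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

public class HashPartitioner<T> extends FlinkKafkaPartitioner<T> {
    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // `partitions` holds the available partition ids of targetTopic;
        // spread records across them by the hash of the serialized value
        int hash = java.util.Arrays.hashCode(value);
        return partitions[(hash & Integer.MAX_VALUE) % partitions.length];
    }
}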
/**
 * Usage:
 * new FlinkKafkaProducer<String>(
 *         topic_produce,
 *         new KafkaSerializationTest<>(topic_produce, new FlinkFixedPartitioner<>(), true, new SerializationSchemaWrapperTest()),
 *         properties,
 *         FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
 */
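
For completeness, a slightly fuller wiring sketch, assuming it sits in the same package as KafkaSerializationTest. The broker address, the topic name "even", and the sample elements are assumptions for illustration, and SimpleStringSchema stands in for the SerializationSchemaWrapperTest above:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

public class MultiTopicJob {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        // EXACTLY_ONCE needs a transaction timeout no larger than the broker's
        // transaction.max.timeout.ms (15 minutes by default)
        properties.setProperty("transaction.timeout.ms", "600000");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream = env.fromElements("event-1", "event-2", "event-3");

        stream.addSink(new FlinkKafkaProducer<String>(
                "even",   // default topic; odd events are rerouted by getTargetTopic()
                new KafkaSerializationTest<String>("even", new FlinkFixedPartitioner<String>(), true,
                        new SimpleStringSchema()),
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE));

        env.execute("flink-multi-topic-sink");
    }
}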

If you spot a problem or know a better approach, please leave a comment; we can stand on each other's shoulders.