欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

关于高并发下kafka producer send异步发送耗时问题的分析

程序员文章站 2023-01-16 21:11:51
最近开发网关服务的过程当中,需要用到kafka转发消息与保存日志,在进行压测的过程中由于是多线程并发操作kafka producer 进行异步send,发现send耗时有时会达到几十毫秒的阻塞,很大程度上上影响了并发的性能,而在后续的测试中发现单线程发送反而比多线程发送效率高出几倍。所以就对kafk ......

最近开发网关服务的过程当中,需要用到kafka转发消息与保存日志,在进行压测的过程中由于是多线程并发操作kafka producer 进行异步send,发现send耗时有时会达到几十毫秒的阻塞,很大程度上上影响了并发的性能,而在后续的测试中发现单线程发送反而比多线程发送效率高出几倍。所以就对kafka api send 的源码进行了一下跟踪和分析,在此总结记录一下。

首先看springboot下 kafka producer 的使用

在config中进行配置,向ioc容器中注入defaultkafkaproducerfactory生产者工厂的实例

    @bean
    public producerfactory<object, object> producerfactory() {
        return new defaultkafkaproducerfactory<>(producerconfigs());
    }

创建producer

this.producer = producerfactory.createproducer();

大家都知道springboot下ioc容器管理的实例默认都是单例模式;而defaultkafkaproducerfactory本身也是一个单例工厂

    @override
    public producer<k, v> createproducer() {
        if (this.transactionidprefix != null) {
            return createtransactionalproducer();
        }
        if (this.producer == null) {
            synchronized (this) {
                if (this.producer == null) {
                    this.producer = new closesafeproducer<k, v>(createkafkaproducer());
                }
            }
        }
        return this.producer;
    }

我们创建的producer也是个单例。

接下来就是具体的发送,用过kafka的小伙伴都知道producer.send是个异步操作,会返回一个future<recordmetadata> 类型的结果。那么为什么单线程和多线程send效率会较大的差距呢,我们进入kafkaproducer内部看下producer.send的具体源码实现来找下答案

private future<recordmetadata> dosend(producerrecord<k, v> record, callback callback) {
        topicpartition tp = null;
        try {
            //保证主题的元数据可用
            clusterandwaittime clusterandwaittime = waitonmetadata(record.topic(), record.partition(), maxblocktimems);
            long remainingwaitms = math.max(0, maxblocktimems - clusterandwaittime.waitedonmetadatams);
            cluster cluster = clusterandwaittime.cluster;
            byte[] serializedkey;
            try {
                //序列化key
                serializedkey = keyserializer.serialize(record.topic(), record.headers(), record.key());
            } catch (classcastexception cce) {
                throw new serializationexception("can't convert key of class " + record.key().getclass().getname() +
                        " to class " + producerconfig.getclass(producerconfig.key_serializer_class_config).getname() +
                        " specified in key.serializer", cce);
            }
            byte[] serializedvalue;
            try {
                //序列化value
                serializedvalue = valueserializer.serialize(record.topic(), record.headers(), record.value());
            } catch (classcastexception cce) {
                throw new serializationexception("can't convert value of class " + record.value().getclass().getname() +
                        " to class " + producerconfig.getclass(producerconfig.value_serializer_class_config).getname() +
                        " specified in value.serializer", cce);
            }
            //计算出具体的partition 
            int partition = partition(record, serializedkey, serializedvalue, cluster);
            tp = new topicpartition(record.topic(), partition);

            setreadonly(record.headers());
            header[] headers = record.headers().toarray();

            int serializedsize = abstractrecords.estimatesizeinbytesupperbound(apiversions.maxusableproducemagic(),
                    compressiontype, serializedkey, serializedvalue, headers);
            ensurevalidrecordsize(serializedsize);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // producer callback will make sure to call both 'callback' and interceptor callback
            callback interceptcallback = new interceptorcallback<>(callback, this.interceptors, tp);

            if (transactionmanager != null && transactionmanager.istransactional())
                transactionmanager.maybeaddpartitiontotransaction(tp);
            //向队列容器中添加数据
            recordaccumulator.recordappendresult result = accumulator.append(tp, timestamp, serializedkey,
                    serializedvalue, headers, interceptcallback, remainingwaitms);
            if (result.batchisfull || result.newbatchcreated) {
                log.trace("waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
            // handling exceptions and record the errors;
            // for api exceptions return them in the future,
            // for other exceptions throw directly
        } catch (apiexception e) {
            log.debug("exception occurred during message send:", e);
            if (callback != null)
                callback.oncompletion(null, e);
            this.errors.record();
            this.interceptors.onsenderror(record, tp, e);
            return new futurefailure(e);
        } catch (interruptedexception e) {
            this.errors.record();
            this.interceptors.onsenderror(record, tp, e);
            throw new interruptexception(e);
        } catch (bufferexhaustedexception e) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            this.interceptors.onsenderror(record, tp, e);
            throw e;
        } catch (kafkaexception e) {
            this.errors.record();
            this.interceptors.onsenderror(record, tp, e);
            throw e;
        } catch (exception e) {
            // we notify interceptor about all exceptions, since onsend is called before anything else in this method
            this.interceptors.onsenderror(record, tp, e);
            throw e;
        }
    }

这里除了前面做的一些序列化操作和判断,最关键的就是向队列容器中执行添加数据操作

recordaccumulator.recordappendresult result = accumulator.append(tp, timestamp, serializedkey,
                    serializedvalue, headers, interceptcallback, remainingwaitms);

accumulator是recordaccumulator这个类的一个实例,recordaccumulator类是一个队列容器类;它的内部维护了一个concurrentmap,每一个topicpartition都对应一个专属的消息队列。

private final concurrentmap<topicpartition, deque<producerbatch>> batches;

我们进入accumulator.append内部看下具体的实现

public recordappendresult append(topicpartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     header[] headers,
                                     callback callback,
                                     long maxtimetoblock) throws interruptedexception {
        // we keep track of the number of appending thread to make sure we do not miss batches in
        // abortincompletebatches().
        appendsinprogress.incrementandget();
        bytebuffer buffer = null;
        if (headers == null) headers = record.empty_headers;
        try {
            //根据topicpartition拿到对应的批处理队列 
            deque<producerbatch> dq = getorcreatedeque(tp);
            //同步队列,保证线程安全
            synchronized (dq) {
                if (closed)
                    throw new illegalstateexception("cannot send after the producer is closed.");
                //把序列化后的数据放入队列,并返回结果
                recordappendresult appendresult = tryappend(timestamp, key, value, headers, callback, dq);
                if (appendresult != null)
                    return appendresult;
            }

            // we don't have an in-progress record batch try to allocate a new batch
            byte maxusablemagic = apiversions.maxusableproducemagic();
            int size = math.max(this.batchsize, abstractrecords.estimatesizeinbytesupperbound(maxusablemagic, compression, key, value, headers));
            log.trace("allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            buffer = free.allocate(size, maxtimetoblock);
            synchronized (dq) {
                // need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new illegalstateexception("cannot send after the producer is closed.");

                recordappendresult appendresult = tryappend(timestamp, key, value, headers, callback, dq);
                if (appendresult != null) {
                    // somebody else found us a batch, return the one we waited for! hopefully this doesn't happen often...
                    return appendresult;
                }

                memoryrecordsbuilder recordsbuilder = recordsbuilder(buffer, maxusablemagic);
                producerbatch batch = new producerbatch(tp, recordsbuilder, time.milliseconds());
                futurerecordmetadata future = utils.notnull(batch.tryappend(timestamp, key, value, headers, callback, time.milliseconds()));

                dq.addlast(batch);
                incomplete.add(batch);

                // don't deallocate this buffer in the finally block as it's being used in the record batch
                buffer = null;

                return new recordappendresult(future, dq.size() > 1 || batch.isfull(), true);
            }
        } finally {
            if (buffer != null)
                free.deallocate(buffer);
            appendsinprogress.decrementandget();
        }
    }
在getorcreatedeque中我们根据topicpartition从concurrentmap获取对应队列,没有的话就初始化一个。
    private deque<producerbatch> getorcreatedeque(topicpartition tp) {
        deque<producerbatch> d = this.batches.get(tp);
        if (d != null)
            return d;
        d = new arraydeque<>();
        deque<producerbatch> previous = this.batches.putifabsent(tp, d);
        if (previous == null)
            return d;
        else
            return previous;
    }

更关键的是为了保证并发时的线程安全,执行 recordappendresult appendresult = tryappend(timestamp, key, value, headers, callback, dq)时,deque<producerbatch>必然需要同步处理。 

synchronized (dq) {
                if (closed)
                    throw new illegalstateexception("cannot send after the producer is closed.");
                recordappendresult appendresult = tryappend(timestamp, key, value, headers, callback, dq);
                if (appendresult != null)
                    return appendresult;
            }

在这里我们可以看出,多线程高并发情况下,dq会存在比较大的资源竞争,虽然是基于内存的操作,每个线程持有锁的时间极短,但相比单线程情况,高并发情况下线程开辟较多,锁竞争和cpu上下文切换都比较频繁,会造成一定的性能损耗,产生阻塞耗时。

分析到这里你就会发现,其实kafkaproducer这个异步发送是建立在生产者和消费者模式上的,send的真正操作并不是直接异步发送,而是把数据放在一个中间队列中。那么既然有生产者在往内存队列中放入数据,那么必然会有一个专有的线程负责把这些数据真正发送出去。我们通过监控jvm线程信息可以看到,kafkaproducer创建后确实会启动一个守护线程用于消息的发送。

关于高并发下kafka producer send异步发送耗时问题的分析

 

 

 

 

 

 

 

 

 

ok,我们再回到 kafkaproducer中,会看到里面有这样两个对象,sender就是kafka发送数据的后台线程

    private final sender sender;
    private final thread iothread;

在kafkaproducer的构造函数中会启动sender线程

            this.sender = new sender(logcontext,
                    client,
                    this.metadata,
                    this.accumulator,
                    maxinflightrequests == 1,
                    config.getint(producerconfig.max_request_size_config),
                    acks,
                    retries,
                    metricsregistry.sendermetrics,
                    time.system,
                    this.requesttimeoutms,
                    config.getlong(producerconfig.retry_backoff_ms_config),
                    this.transactionmanager,
                    apiversions);
            string iothreadname = network_thread_prefix + " | " + clientid;
            this.iothread = new kafkathread(iothreadname, this.sender, true);
            this.iothread.start();

进入sender内部可以看到这个线程的作用就是一直轮询发送数据。

    public void run() {
        log.debug("starting kafka producer i/o thread.");

        // main loop, runs until close is called
        while (running) {
            try {
                run(time.milliseconds());
            } catch (exception e) {
                log.error("uncaught error in kafka producer i/o thread: ", e);
            }
        }

        log.debug("beginning shutdown of kafka producer i/o thread, sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
        // wait until these are completed.
        while (!forceclose && (this.accumulator.hasundrained() || this.client.inflightrequestcount() > 0)) {
            try {
                run(time.milliseconds());
            } catch (exception e) {
                log.error("uncaught error in kafka producer i/o thread: ", e);
            }
        }
        if (forceclose) {
            // we need to fail all the incomplete batches and wake up the threads waiting on
            // the futures.
            log.debug("aborting incomplete batches due to forced shutdown");
            this.accumulator.abortincompletebatches();
        }
        try {
            this.client.close();
        } catch (exception e) {
            log.error("failed to close network client", e);
        }

        log.debug("shutdown of kafka producer i/o thread has completed.");
    }

    /**
     * run a single iteration of sending
     *
     * @param now the current posix time in milliseconds
     */
    void run(long now) {
        if (transactionmanager != null) {
            try {
                if (transactionmanager.shouldresetproducerstateafterresolvingsequences())
                    // check if the previous run expired batches which requires a reset of the producer state.
                    transactionmanager.resetproducerid();

                if (!transactionmanager.istransactional()) {
                    // this is an idempotent producer, so make sure we have a producer id
                    maybewaitforproducerid();
                } else if (transactionmanager.hasunresolvedsequences() && !transactionmanager.hasfatalerror()) {
                    transactionmanager.transitiontofatalerror(new kafkaexception("the client hasn't received acknowledgment for " +
                            "some previously sent messages and can no longer retry them. it isn't safe to continue."));
                } else if (transactionmanager.hasinflighttransactionalrequest() || maybesendtransactionalrequest(now)) {
                    // as long as there are outstanding transactional requests, we simply wait for them to return
                    client.poll(retrybackoffms, now);
                    return;
                }

                // do not continue sending if the transaction manager is in a failed state or if there
                // is no producer id (for the idempotent case).
                if (transactionmanager.hasfatalerror() || !transactionmanager.hasproducerid()) {
                    runtimeexception lasterror = transactionmanager.lasterror();
                    if (lasterror != null)
                        maybeabortbatches(lasterror);
                    client.poll(retrybackoffms, now);
                    return;
                } else if (transactionmanager.hasabortableerror()) {
                    accumulator.abortundrainedbatches(transactionmanager.lasterror());
                }
            } catch (authenticationexception e) {
                // this is already logged as error, but propagated here to perform any clean ups.
                log.trace("authentication exception while processing transactional request: {}", e);
                transactionmanager.authenticationfailed(e);
            }
        }

        long polltimeout = sendproducerdata(now);
        client.poll(polltimeout, now);
    }

通过上面的分析我们可以看出producer.send操作本身其实是个基于内存的存储操作,耗时几乎可以忽略不计,但由于高并发情况下,线程同步会有一定的性能损耗,当然这个损耗在一般的应用场景下几乎是可以忽略不计的,但如果是数据量比较大,高并发的场景下会比较明显。

针对上面的问题分析,这里说下我个人的一些总结:

1、首先避免多线程操作producer发送数据,你可以采用生产者消费者模式把producer.send从你的多线程操作中解耦出来,维护一个你要发送的消息队列,单独开辟一个线程操作;

2、可能有的小伙伴会问,那么多创建几个producer的实例或者维护一个producer池可以吗,我原本也是这个想法,只是在测试中发现效果也不是很理想,我估计是由于创建producer实例过多,导致线程数量也跟着增加,本身的业务线程再加上kafka的线程,线程上下文切换比较频繁,cpu资源压力比较大,效率也不如单线程操作;

3、这个问题其实真是针对api操作来讲的,send操作并不是真正的数据发送,真正的数据发送由守护线程进行;按照kafka本身的设计思想,如果操作本身就成为了你性能的瓶颈,你应该考虑的是集群部署,负载均衡;

4、无锁才是真正的高性能;