欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

开源组件系列(4):分布式消息队列(Kafka)

程序员文章站 2022-07-14 22:52:31
...

目录

(一)消息队列概述

(二)Kafka基本架构

(三)Kafka组件介绍

(四)Kafka关键技术点

(五)Kafka示例任务


(一)消息队列概述

不论是系统产生的数据日志,还是对应的数据系统,从来都不是单一的对应关系,而是多种数据日志对应多套数据系统的复杂关联。假设我们将采集的数据日志直接传输到目标数据系统中,一旦因为业务扩展而产生的新的数据系统建设需求,那么依赖关系势必变得非常混乱。

开源组件系列(4):分布式消息队列(Kafka)

传统日志采集方式存在如下几个问题:

1.数据日志与数据系统之间的耦合度太高,当数据日志或者数据系统需要扩展时,需要修改非常多的依赖关系;

2.数据日志的产生速度与数据系统的处理速度不对等,如果遇到秒杀等场景,很容易引起系统崩溃;

3.依赖关系众多导致并发很高,对于网络压力影响很大,容易成为系统瓶颈。

 

为了降低数据日志和数据系统之间的耦合性,因此设计了消息队列,成为两者之间的“中间件”。

开源组件系列(4):分布式消息队列(Kafka)

以Kafka为代表的消息队列有如下几方面的有点:

1.缓冲数据压力:当数据日志短时间内增加较多时,消息队列能够将数据系统无法处理的部分缓冲期来,防止系统压力过大;

2.降低耦合度:消息队列支持生产者/消费者模式,支持下游订阅数据,如果需要新增数据日志或数据系统,只需要修改配置文件,不需要修改系统代码;

3.优秀的扩展性:消息队列多采用分布式架构设计,数据经过分片同时写到多个节点中,避免单节点的瓶颈问题,并在秒杀等场景时提供动态扩展能力;

4.良好的容错性:数据日志在Kafka中会持久化到磁盘上,并通过分布式的多副本策略来避免数据丢失。

 

(二)Kafka基本架构

 

Kafka作为一个集群中间件,需要运行在一台或者多台服务器上,Kafka通过Topic对存储的流数据进行分类,每条记录中包含一个Key,一个Value和一个Timestamp。在运行时,Producer将数据写入到Broker中,由Broker负责构建分布式的消息存储系统了,将消息划分为多个Topic,然后再由Consumer从Broker读取数据并进行处理。Kafka采用了push – pull的架构,即收到数据后,直接将数据push给对应的Broker,再由Consumer从Broker中将数据pull出来。

开源组件系列(4):分布式消息队列(Kafka)

 

(三)Kafka组件介绍

 

Kafka主要由Producer、Broker、Consumer及Zookeeper组成。相关组件的介绍如下:

1.Producer

由用户使用Kafka相关的SDK进行开发,Producer负责将数据发送给Broker。在Kafka中,每条数据被称为一个“消息”,由“三元组”组成。“三元组”包括:Topic、Key及Message。

(1)Topic:表示该条消息所述的Topic,是一种逻辑上的切分概念,一个Topic可以分给多个不同的Broker;

(2)Key:表示该条消息的主键,Kafka会根据每条数据的Key将消息分到不同的分区(Partition)中,默认是哈希取模的算法,用户也可以自行定义相关的分区算法;

(3)Message:表示该条消息的值,通常为字节数组,也可以使用String、JSON、Avro、Thrif、Protobuf等结构。

2.Broker

在分布式的Kafka中,出于容错的考虑,Broker一般有多个,负责接收Producer和Consumer的请求,并将消息持久化到本次磁盘。Broker以Topic为单位将消息分成不同的分区(Partition),每个分区可以有多个副本,通过数据冗余的方式来实现容错。当分区(Partition)存在多个副本时,其中会有一个Leader,对外提供读写请求,其他的都是Follower,不提供读写服务,只是同步Leader数据,并且在Leader出问题时,选出一个成为新的Leader。这种容错方式与Mysql的主备比较类似。

Broker能够保证统一Topic下的同一Partition内消息是有序的,但无法保证Partition之间全局有序。这意味着,Comsumer在消费某个Topic下的消息时,可能得到与写入顺序不同的消息序列。

开源组件系列(4):分布式消息队列(Kafka)

Brokder以追加的方式将消息写到磁盘中,并且每个分区中的消息被赋予了唯一整数标识,称之为偏移量(Offset)。Broker只提供基于Offset的读取方式,并不会维护各个Consumer当前已消费的Offset值,而是由Consumer各自维护当前读取的进度。Broker中保存的数据是有有效期的,比如7天,一旦超过了有效期,对应的数据将被释放以释放磁盘空间。只要数据在有效期内,Consumer可以重复读取而不受限制。

开源组件系列(4):分布式消息队列(Kafka)

3.Consumer

负责从Broker中拉取消息并进行处理,每个Consumer维护最后一个已读消息的Offset,并在下次请求开始时从这个Offset开始读取消息,这种机制使得Broker的吞吐效率很高。值得注意的是,Kafka允许多个Consumer构成一个Consumer Group,共同读取一个Topic中的数据。

开源组件系列(4):分布式消息队列(Kafka)

4.Zookeeper

Zookeeper负责提供分布式的协调服务,所有Broker会向Zookeeper进行注册,并汇报相关状态,使Consumer及时获取这些数据。当一个Consumer宕机后,其他Consumer会通过Zookeeper发现这一故障,并自动分摊对应的数据负载,触发容错机制。

 

(四)Kafka关键技术点

 

1.提供可控的可靠性级别:

Producer可通过两种方式向Broker发送数据:同步或异步,其中异步方式通过批处理来处理数据,大大提高了数据的写入效率。当Producer向Broker发送数据时,可通过设置该数据的应答方式,控制写性能与可靠性级别。当可靠性级别提升时,写性能会下降;反之,可靠性级别下降时,写性能会提高很多。Kafka提供三种消息应答方式:

0:无需对消息进行确认,Producer发送消息后马上返回,无需等待对方写入成功;

1:当Producer发送消息后,需要等到Leader Partition写成功后才会返回,但对应的Follower Partition不一定写成功,这种方式属于性能可靠性比较折中的一种方式,能够在比较高效的情况下,保证数据至少成功写入一个节点;

2:当Producer发送消息后,需要等到所有的Partition写成功后才返回,如果设置的消息副本数大于1,意味着被成功写入了多个节点,可靠性很高,但写性能比较低。

 

2.数据多副本:

Broker允许为每个Topic中的数据存放多个副本,以达到容错的目的。Kafka采用了强一致的数据复制策略。在数据存入时,会首先写入到Leader Partition,之后由Leader Partition将消息同步给其他副本。Broker的负载均衡实际上就是对Leader Partition的负载均衡,即保证Leader Partition在各个Broker上数据尽可能相近。

开源组件系列(4):分布式消息队列(Kafka)

 

3.高效的持久化机制:

为了应对大数据的应用场景,Broker直接将消息持计划到磁盘上而不是内存中,这就要求必须采用非常高效的数据写入和存储方式。由于顺序写入的速度要远高于随机写,因此Kafka用顺序写配合Offset的方式组织数据,能够达到很好的读写速度。

 

4.数据传输优化:

为了优化Broker与Consumer之间的网络数据传输效率,Kafka引入了比较多的优化技术,最典型的是批处理和Zero-copy。

批处理:为了降低单条消息传输带来的网络开销,Broker将多条消息组装在一起,一并发送给Consumer,并且将格式进行了统一设计,保证了数据存储和发送时的一致,避免额外转换带来的开销。

Zero-copy:一条数据在磁盘上从读取到发送需要经过四次拷贝与两次系统调用,四次拷贝顺序依次为:内核态reader buffer – 用户态应用程序buffer – 内核态socket buffer – 网卡NIC buffer,通过Zero-copy优化之后,数据只需要经过三次拷贝便可以发送出去,省去了用户态应用程序buffer的过程。

 

5.可控的消息传递语义:

在消息队列中,根据接受者可能受到的重复消息次数,消息传递语义可以分为三种:

1.at most once:发送者将消息发送给消费者后,立即返回,不关心消费者是否成功收到消息;

2.at least once:发送者将消息发送给消费者后,等待确认,如果未收到确认消息,则会重发消息;

3.exactly once:消费者会且只会收到同一条消息一次,通常有两种方式实现这种语义:两段锁协议和支持幂等操作。

 

(五)Kafka示例任务

 

Step1:下载代码

tar -xzf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0

Step2:启动服务器

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

Step3:创建一个Topic

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

> bin/kafka-topics.sh --list --zookeeper localhost:2181

test

Step4:发送一些消息

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

Step5:启动一个Consumer

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message

Step6:设置多代理集群

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties

config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1
 
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic


> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

Java Producer示例

public class Producer {
    public static String topic = "duanjt_test";//定义主题

    public static void main(String[] args) throws InterruptedException {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.23.76:9092,192.168.23.77:9092");//kafka地址,多个地址用逗号分割
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);

        try {
            while (true) {
                String msg = "Hello," + new Random().nextInt(100);
                ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, msg);
                kafkaProducer.send(record);
                System.out.println("消息发送成功:" + msg);
                Thread.sleep(500);
            }
        } finally {
            kafkaProducer.close();
        }

    }
}

Java Consumer示例

public class Consumer {
    public static void main(String[] args) {
        Properties p = new Properties();
        p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.23.76:9092");
        p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        p.put(ConsumerConfig.GROUP_ID_CONFIG, "duanjt_test");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(p);
        kafkaConsumer.subscribe(Collections.singletonList(Producer.topic));// 订阅消息

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(String.format("topic:%s,offset:%d,消息:%s", //
                        record.topic(), record.offset(), record.value()));
            }
        }
    }
}

 

相关标签: 数据仓库