
Kafka Deployment and Usage (Single Node / Cluster)

程序员文章站 2022-05-11 19:20:43

Since Kafka 0.9, consumer group and offset information is no longer stored in ZooKeeper; it is kept on the broker servers instead.

There are three ways to deploy Kafka: single node with a single broker, single node with multiple brokers, and a true cluster (multiple nodes, multiple brokers). Production environments use the third option and deploy Kafka as a cluster.
Kafka depends heavily on ZooKeeper (ZK): topic metadata, cluster membership, and (in older versions) consumer offsets are stored in ZK, so ZK must be installed before Kafka can be used. If you believe you have run Kafka without installing ZK, that is because the Kafka distribution bundles a ZK instance; it is normally not the one used in production.
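For reference, the minimal single-node ZooKeeper configuration shipped with Kafka (config/zookeeper.properties) looks like this; verify against your own distribution, and change the paths as needed:

```properties
# the directory where ZooKeeper stores its snapshot data
dataDir=/tmp/zookeeper
# the port clients (i.e., Kafka brokers) connect to
clientPort=2181
# disable the per-IP connection limit, suitable for non-production use
maxClientCnxns=0
```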

Part 1: Kafka Single-Node Deployment

A single-node Kafka deployment comes in two flavors: single broker and multiple brokers. Since Kafka runs on one node, a single-node ZK installation is sufficient.

 

Troubleshooting note: if, after installing and starting ZooKeeper, jps does not show a QuorumPeerMain process, the ZooKeeper server did not start successfully; check its startup logs.

1. Single-Node, Single-Broker Deployment and Usage

Deployment architecture

http://kafka.apache.org/quickstart

Configure Kafka

Reference (official site): http://kafka.apache.org/quickstart

> bin/zookeeper-server-start.sh config/zookeeper.properties   # start the bundled ZooKeeper

nohup ./bin/zookeeper-server-start.sh ./config/zookeeper.properties >> /tmp/kafka-logs/zookeeper.out 2>&1 &   # run in the background, appending output to a log file

1) Download Kafka and extract it

  • Download:

  • Extract:

Kafka directory layout

  • /bin: executable scripts for operating Kafka, including the Windows variants

  • /config: configuration files

  • /libs: dependency libraries

  • /logs: log output; Kafka splits its server-side logs into five types: server, request, state, log-cleaner, controller

Configure ZooKeeper

See the ZooKeeper documentation.

Configure Kafka

1) In the root of the Kafka installation, edit config/server.properties

The three most important Kafka settings are, in order: broker.id, log.dirs, and zookeeper.connect. The server-side parameters in config/server.properties are explained below:

# unique, stable id of this broker within the cluster
broker.id=0
# threads for handling network requests and disk I/O
num.network.threads=2
num.io.threads=8
# socket buffer sizes and maximum request size
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
# comma-separated list of directories where partition data is stored
log.dirs=/tmp/kafka-logs
# default partition count for newly created topics
num.partitions=2
# retain log segments for 7 days
log.retention.hours=168

log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=false
# ZooKeeper connection string
zookeeper.connect=localhost:2181

zookeeper.connection.timeout.ms=1000000
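Before starting the broker, it is worth confirming that the three critical settings are actually present. Below is a minimal sketch (check_kafka_conf is a made-up helper name; it is demonstrated on a throwaway file standing in for config/server.properties):

```shell
#!/bin/sh
# check_kafka_conf FILE: verify the three critical settings are present
check_kafka_conf() {
    for key in broker.id log.dirs zookeeper.connect; do
        # match "key=value" lines; commented-out entries do not count
        grep -q "^${key}=" "$1" || { echo "missing: ${key}"; return 1; }
    done
    echo "config ok"
}

# demonstrate on a throwaway file (stand-in for config/server.properties)
cat > /tmp/server.sample.properties <<'EOF'
broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
EOF
check_kafka_conf /tmp/server.sample.properties
```

In practice, point it at your real config/server.properties before running kafka-server-start.sh.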

2) Start Kafka

  • Start: from the Kafka directory, run bin/kafka-server-start.sh config/server.properties &

nohup ./bin/kafka-server-start.sh ./config/server.properties > /tmp/kafka-logs/kafka.out 2>&1 &
  • Check ports 2181 and 9092

netstat -tunlp|egrep "(2181|9092)"
tcp        0      0 :::2181                     :::*                        LISTEN      19787/java          
tcp        0      0 :::9092                     :::*                        LISTEN      28094/java 

A single port can also be checked with lsof -i:2181

netstat -tunlp | grep 2181   # check the ZooKeeper port

Notes:

The Kafka process (PID 28094) is listening on port 9092.

QuorumPeerMain is the ZooKeeper instance (PID 19787), listening on port 2181.
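The same check can be scripted. Below is a minimal sketch using bash's built-in /dev/tcp (nc or lsof would work equally well); it only tests whether something accepts connections on the expected ports:

```shell
#!/bin/bash
# port_open HOST PORT: succeed if something is listening there
port_open() {
    # bash opens a TCP connection via the /dev/tcp pseudo-device;
    # the subshell closes the connection again on exit
    (exec 3<>"/dev/tcp/$1/$2") 2>/dev/null
}

# the ports a single-node, single-broker setup should expose
for p in 2181 9092; do
    if port_open 127.0.0.1 "$p"; then
        echo "port $p: listening"
    else
        echo "port $p: NOT listening"
    fi
done
```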

[2019-04-12 01:25:26,175] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2019-04-12 01:25:26,176] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2019-04-12 01:25:26,176] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2019-04-12 01:25:26,200] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2019-04-12 01:25:26,211] INFO [SocketServer brokerId=0] Started data-plane processors for 1 acceptors (kafka.network.SocketServer)
[2019-04-12 01:25:26,214] INFO Kafka version: 2.2.0 (org.apache.kafka.common.utils.AppInfoParser)
[2019-04-12 01:25:26,214] INFO Kafka commitId: 05fcfde8f69b0349 (org.apache.kafka.common.utils.AppInfoParser)
[2019-04-12 01:25:26,215] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

Configure environment variables in /etc/profile:

# Kafka Environment Variable
export KAFKA_HOME=/opt/applications/kafka
export PATH=$PATH:$KAFKA_HOME/bin

3) Single-machine connectivity test

Create a topic

Method 1:
Run the following Linux commands:

Older Kafka versions (the topic tool connects to ZooKeeper; the --zookeeper flag was deprecated in 2.2 and removed in 3.0):
bin/kafka-topics.sh --create --topic test --replication-factor 1 --partitions 1 --zookeeper localhost:2181

[wls81@master kafka]$ bin/kafka-topics.sh --create --topic test --replication-factor 1 --partitions 1 --zookeeper localhost:2181
Created topic test.

Kafka 2.2 and later (the topic tool connects to the brokers directly):

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

--topic sets the topic name.
--partitions sets the number of partitions; choose it based on broker count and data volume. As a rule of thumb, about two partitions per broker works well.
--replication-factor sets the number of replicas for each partition; setting it to at least 2 is recommended.
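The old and new CLI styles differ only in how the tool locates the cluster. As an illustration, a small helper (build_create_topic is a hypothetical name) can assemble the right command line for either style as a dry run, printing it instead of executing it:

```shell
#!/bin/sh
# build_create_topic TOPIC PARTITIONS REPL CONNECT MODE
# MODE is "zk" for old clusters (--zookeeper) or "broker" for 2.2+ (--bootstrap-server)
build_create_topic() {
    topic=$1; parts=$2; repl=$3; connect=$4; mode=$5
    if [ "$mode" = "zk" ]; then
        server_opt="--zookeeper $connect"
    else
        server_opt="--bootstrap-server $connect"
    fi
    echo "bin/kafka-topics.sh --create $server_opt --replication-factor $repl --partitions $parts --topic $topic"
}

# dry run: print the two command variants instead of executing them
build_create_topic test 1 1 localhost:2181 zk
build_create_topic test 1 1 localhost:9092 broker
```

Pipe the output to sh (from the Kafka directory, with the cluster running) to actually create the topic.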

Method 2:
Enable automatic topic creation (auto.create.topics.enable=true) and have a program produce directly to the topic; if the topic does not exist, it is created with the default settings.

List topics

Older Kafka versions:

bin/kafka-topics.sh --list --zookeeper localhost:2181

[wls81@master kafka]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
test

Kafka 2.2 and later:

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Open two shell sessions (for example, two Xshell windows): one for the producer to send messages, one for the consumer to receive them.

  • Run the producer and type a few characters; each line entered is sent to the topic as a message.

bin/kafka-console-producer.sh --broker-list 192.168.1.15:9092 --topic test

Note: in newer Kafka versions the producer flag is --bootstrap-server instead of --broker-list; either way, the producer always connects to a broker, not to ZooKeeper.

  • Run the consumer to see the messages just sent.

Before Kafka 0.9 (the consumer connected through ZooKeeper):

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Kafka 0.9 and later (the consumer connects to the brokers):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
  • Note:

The producer is given the broker socket (192.168.1.15:9092): its messages go to Kafka, that is, to a broker.

The old consumer was given the ZooKeeper socket (192.168.1.15:2181): it discovered brokers through ZooKeeper, which coordinated the metadata.

Summary: with --from-beginning the consumer reads the topic from the earliest offset, so both old and new data are consumed; without it, only messages produced after the consumer starts are consumed.

So far this is only a single broker; next we set up a multi-broker cluster.

Part 2: Building a Multi-Broker Pseudo-Cluster

We have only started a single broker so far; now we start a cluster of three brokers, all on the same machine.

(1) Provide a configuration file for each broker

First, the contents of config/server0.properties:

broker.id=0
listeners=PLAINTEXT://:9092
port=9092
host.name=192.168.1.15
num.network.threads=4
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=5
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=192.168.1.15:2181
zookeeper.connection.timeout.ms=6000
queued.max.requests =500
log.cleanup.policy = delete

server1.properties differs only in broker.id=1, listeners=PLAINTEXT://:9093, port=9093, and log.dirs=/tmp/kafka-logs1.

server2.properties:

broker.id=2
listeners=PLAINTEXT://:9094
port=9094
host.name=192.168.1.15
num.network.threads=4
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs2
num.partitions=5
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=192.168.1.15:2181
zookeeper.connection.timeout.ms=6000
queued.max.requests =500
log.cleanup.policy = delete
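Since the per-broker files differ only in broker.id, the listener port, and log.dirs, they can be generated from a loop instead of edited by hand. A minimal sketch (it writes a trimmed-down config to /tmp/kafka-conf so as not to touch your real config directory; in practice, merge these settings into the full template above):

```shell
#!/bin/sh
# generate server0/1/2.properties for a 3-broker pseudo-cluster
OUT=/tmp/kafka-conf
mkdir -p "$OUT"

for id in 0 1 2; do
    port=$((9092 + id))
    # data dirs: /tmp/kafka-logs, /tmp/kafka-logs1, /tmp/kafka-logs2
    logdir=/tmp/kafka-logs$( [ "$id" -eq 0 ] || echo "$id" )
    cat > "$OUT/server$id.properties" <<EOF
broker.id=$id
listeners=PLAINTEXT://:$port
log.dirs=$logdir
zookeeper.connect=192.168.1.15:2181
EOF
done

ls "$OUT"
```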

(2) Start all the brokers

Commands:

nohup ./bin/kafka-server-start.sh config/server0.properties > /tmp/kafka-logs/kafka0.out 2>&1 &   # start broker0

nohup ./bin/kafka-server-start.sh config/server1.properties > /tmp/kafka-logs/kafka1.out 2>&1 &   # start broker1

nohup ./bin/kafka-server-start.sh config/server2.properties > /tmp/kafka-logs/kafka2.out 2>&1 &   # start broker2
A redirection such as > /dev/null 2>&1

sends both standard output and standard error to /dev/null, i.e., discards all output.
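The difference between > file 2>&1 and > /dev/null 2>&1 is easy to demonstrate with a command that writes to both streams (a self-contained sketch, unrelated to Kafka itself):

```shell
#!/bin/sh
# a command that writes one line to stdout and one to stderr
noisy() { echo "to stdout"; echo "to stderr" >&2; }

noisy > /tmp/both.log 2>&1      # capture both streams in a file
noisy > /dev/null 2>&1          # discard both streams entirely

cat /tmp/both.log
```

The 2>&1 must come after the file redirection: it means "send stderr wherever stdout currently points".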

Check ports 2181, 9092, 9093, and 9094

(a single port can be checked with lsof -i:2181)

netstat -tunlp|egrep "(2181|9092|9093|9094)"
tcp        0      0 :::9093                     :::*                        LISTEN      29725/java          
tcp        0      0 :::2181                     :::*                        LISTEN      19787/java          
tcp        0      0 :::9094                     :::*                        LISTEN      29800/java          
tcp        0      0 :::9092                     :::*                        LISTEN      29572/java  

One ZooKeeper instance listens on port 2181, and the three Kafka brokers listen on ports 9092, 9093, and 9094 respectively.

(3) Create a topic

Older versions (via ZooKeeper):

bin/kafka-topics.sh --create --topic topic_1 --partitions 1 --replication-factor 3 --zookeeper localhost:2181

Check that the topic was created:

bin/kafka-topics.sh --list --zookeeper localhost:2181
test
topic_1
topic_2
topic_3

Delete a topic:
./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topic_1

Newer versions (via the brokers; this is what we use here):
Create a topic:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic topic_new


bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic topic_new

[wls81@master kafka]$ bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic topic_new

Topic:topic_new PartitionCount:1 ReplicationFactor:3 Configs:cleanup.policy=delete,segment.bytes=1073741824

Topic: topic_new Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0

 

Explanation of the output: the first line gives a summary of all the partitions, and each following line describes one partition. Since this topic has only one partition, there is only one such line.

"Leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected share of the partitions.

"Replicas" is the list of nodes that replicate this partition's log, regardless of whether they are the leader or even currently alive.

"Isr" is the set of "in-sync" replicas: the subset of the replica list that is currently alive and caught up with the leader.

Note that in this example, node 2 is the leader for the topic's only partition.
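This leader/ISR relationship can also be checked mechanically. Below is a sketch (leader_in_isr is a made-up helper name) that parses one partition line of the --describe output and reports whether the leader is currently in the ISR, which is exactly the condition that changes in the failover test later:

```shell
#!/bin/sh
# given one "kafka-topics.sh --describe" partition line, report leader health
leader_in_isr() {
    echo "$1" | awk '{
        # pick out the values following the "Leader:" and "Isr:" labels
        for (i = 1; i <= NF; i++) {
            if ($i == "Leader:") leader = $(i+1)
            if ($i == "Isr:")    isr    = $(i+1)
        }
        # Isr is a comma-separated list such as "2,1,0"
        n = split(isr, a, ",")
        for (j = 1; j <= n; j++)
            if (a[j] == leader) { print "leader " leader " is in ISR"; exit }
        print "leader " leader " is NOT in ISR"
    }'
}

leader_in_isr "Topic: topic_new Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0"
```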

(4) Send messages:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_new

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_new
...
my test message 1
my test message 2
^C

Consume the messages:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic topic_new

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic topic_new
...
my test message 1
my test message 2
^C

Note that the topic is now replicated across the three brokers, so this already behaves like a small distributed system.


(5) Test fault tolerance
Broker 2 is the leader, so let's kill it:

[wls81@master logs]$ ps aux | grep server2.properties

wls81 6910 2.1 2.5 8201816 416932 pts/2 Sl 00:42 0:15 /

...

[wls81@master logs]$ kill -9 6910

Leadership has failed over to one of the followers, node 1, and node 2 is no longer in the in-sync replica set:

[wls81@master kafka]$ bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic topic_new

Topic:topic_new PartitionCount:1 ReplicationFactor:3 Configs:cleanup.policy=delete,segment.bytes=1073741824

Topic: topic_new Partition: 0 Leader: 1 Replicas: 2,1,0 Isr: 1,0

But the messages are still available for consumption, even though the leader that originally wrote them is down.

Producing messages:

[wls81@master kafka]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic_new

>dsaf

>test

>Kill leader2,and then operation

>

Consuming messages (the WARN lines in the middle are caused by killing leader broker 2):

[wls81@master kafka]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic topic_new

dsaf

test

[2019-04-13 00:55:11,758] WARN [Consumer clientId=consumer-1, groupId=console-consumer-1287] Connection to node 2 (/192.168.1.15:9094) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

(the same WARN message repeats several more times while the consumer retries node 2)

Kill leader2,and then operation

 

Importing/exporting data with Kafka Connect

Kafka Connect is a tool shipped with Kafka for importing data into and exporting data out of Kafka. It is an extensible framework that runs connectors, which implement the custom logic for interacting with external systems. In this quickstart we run Kafka Connect with simple connectors that import data from a file into a Kafka topic and export data from a Kafka topic back out to a file.

Create some test data:

[wls81@master kafka]$ echo -e "foo\nbar" > test.txt

[wls81@master kafka]$ pwd

/opt/applications/kafka

We will start two connectors running in standalone mode, which means they run in a single, local, dedicated process.

We pass three configuration files as parameters. The first is always the configuration for the Kafka Connect process itself, containing common settings such as the brokers to connect to and the serialization format for the data. Each remaining file specifies a connector to create.

These files include a unique connector name, the connector class to instantiate, and any other configuration the connector requires.
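For reference, the two sample connector files shipped with Kafka look roughly like this (check the copies in your own distribution's config/ directory):

```properties
# config/connect-file-source.properties: read lines from test.txt into topic connect-test
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

# config/connect-file-sink.properties: write messages from connect-test to test.sink.txt
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
```

Note the asymmetry: a source writes to a single topic, while a sink can subscribe to a list of topics.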

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

Output:

[2019-04-14 23:31:33,339] INFO Kafka commitId: 05fcfde8f69b0349 (org.apache.kafka.common.utils.AppInfoParser:110)

[2019-04-14 23:31:33,341] INFO Created connector local-file-sink (org.apache.kafka.connect.cli.ConnectStandalone:110)

[2019-04-14 23:31:33,342] INFO [Consumer clientId=consumer-1, groupId=connect-local-file-sink] Subscribed to topic(s): connect-test (org.apache.kafka.clients.consumer.KafkaConsumer:936)

[2019-04-14 23:31:33,342] INFO WorkerSinkTask{id=local-file-sink-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:301)

[2019-04-14 23:31:33,346] INFO Cluster ID: zHJkKjVvQd6TaV1K97QBYQ (org.apache.kafka.clients.Metadata:365)

[2019-04-14 23:31:33,347] INFO [Consumer clientId=consumer-1, groupId=connect-local-file-sink] Discovered group coordinator 192.168.1.15:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:675)

[2019-04-14 23:31:33,348] INFO [Consumer clientId=consumer-1, groupId=connect-local-file-sink] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:459)

[2019-04-14 23:31:33,348] INFO [Consumer clientId=consumer-1, groupId=connect-local-file-sink] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)

[2019-04-14 23:31:33,352] INFO [Consumer clientId=consumer-1, groupId=connect-local-file-sink] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:491)

[2019-04-14 23:31:33,356] INFO [Consumer clientId=consumer-1, groupId=connect-local-file-sink] Successfully joined group with generation 5 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:455)

[2019-04-14 23:31:33,357] INFO [Consumer clientId=consumer-1, groupId=connect-local-file-sink] Setting newly assigned partitions: connect-test-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:290)

[2019-04-14 23:31:33,364] INFO [Consumer clientId=consumer-1, groupId=connect-local-file-sink] Resetting offset for partition connect-test-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:584)

[2019-04-14 23:31:43,310] INFO WorkerSourceTask{id=local-file-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:398)

[2019-04-14 23:31:43,310] INFO WorkerSourceTask{id=local-file-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:415)

[2019-04-14 23:31:43,317] INFO WorkerSourceTask{id=local-file-source-0} Finished commitOffsets successfully in 7 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:497)

[2019-04-14 23:31:43,341] INFO WorkerSinkTask{id=local-file-sink-0} Committing offsets asynchronously using sequence number 1: {connect-test-0=OffsetAndMetadata{offset=2, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:344)

These sample configuration files, included with Kafka, use the default local cluster configuration started earlier and create two connectors: the first is a source connector that reads lines from an input file and produces each one to a Kafka topic; the second is a sink connector that reads messages from a Kafka topic and writes each one as a line in an output file.

During startup you will see a number of log messages, including some indicating that the connectors are being instantiated. Once the Kafka Connect process has started, the source connector should begin reading lines from test.txt and producing them to the topic connect-test, and the sink connector should begin reading messages from connect-test and writing them to the file test.sink.txt.

We can verify the data has flowed through the entire pipeline by examining the contents of the output file:

 

[wls81@master kafka]$ more test.sink.txt
foo
bar
[wls81@master kafka]$ pwd
/opt/applications/kafka

Note that the data is stored in the Kafka topic connect-test, so we can also run a console consumer to see the data in the topic:

[wls81@master kafka]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

{"schema":{"type":"string","optional":false},"payload":"foo"}

{"schema":{"type":"string","optional":false},"payload":"bar"}

The connectors keep processing data, so we can append lines to the file and watch them move through the pipeline:

[wls81@master kafka]$ echo "Another line2" >> test.txt

[wls81@master kafka]$ echo "Another line3" >> test.txt


[wls81@master kafka]$ tail -f test.sink.txt
foo
bar
fdasd

dfa
test
taet

Another line
Another line2
Another line3