
Kafka Common Commands and Producer/Consumer Code Implementation


List all topics on the current server
cd /usr/local/kafka/bin
./kafka-topics.sh --list --zookeeper minimaster:2181

Create a topic
./kafka-topics.sh --create --zookeeper minimaster:2181 --replication-factor 1 --partitions 1 --topic test2

Delete a topic
./kafka-topics.sh --delete --zookeeper minimaster:2181 --topic test2
This requires delete.topic.enable=true in server.properties; otherwise the topic is only marked for deletion (alternatively, restart the broker).
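
For reference, a minimal sketch of that setting, assuming the install directory used above (so the file would be /usr/local/kafka/config/server.properties):

# enable real topic deletion instead of mark-for-delete only
delete.topic.enable=true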

Send messages via the shell
./kafka-console-producer.sh --broker-list minimaster:9092 --topic test

Consume messages via the shell
./kafka-console-consumer.sh --zookeeper minimaster:2181 --from-beginning --topic test

Check consumer offsets
./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper minimaster:2181 --group testGroup

Describe a topic
./kafka-topics.sh --topic test --describe --zookeeper minimaster:2181

Change the number of partitions
./kafka-topics.sh --zookeeper minimaster:2181 --alter --partitions 15 --topic utopic
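Note that Kafka can only increase a topic's partition count, never decrease it. To confirm the change, re-run the describe command from above against the altered topic:
./kafka-topics.sh --topic utopic --describe --zookeeper minimaster:2181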

Code Implementation in IDEA
Kafka producer

package myRPC.qf.itcast.spark

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

/**
  * Implements a Producer that:
  * 1. sends data to a specified topic on the Kafka cluster
  * 2. can plug in a custom partitioner
  */
object KafkaProducer {
  def main(args: Array[String]): Unit = {
    //the topic that the produced data is written to
    val topic = "test2"

    //create the producer configuration
    val props: Properties = new Properties()
    //serializer class for the message payload
    props.put("serializer.class","kafka.serializer.StringEncoder")
    //list of Kafka brokers
    props.put("metadata.broker.list","minimaster:9092,miniSlave1:9092,miniSlave2:9092")
    //whether the producer requires an acknowledgement from the server: 0, 1, or -1
    //0: the producer does not wait for an ack from the broker
    //1: the leader sends an ack once it has received the message
    //-1: an ack is sent only after all followers have replicated the message
    props.put("request.required.acks","1")
    //partitioner to use (a custom one could be wired in here; see the sketch after this listing)
    props.put("partitioner.class","kafka.producer.DefaultPartitioner")
    //    props.put("partitioner.class","com")
    //create a producer instance
    val producer: Producer[String, String] = new Producer(new ProducerConfig(props))
    //generate some test data
    for(i <- 1 to 10){
      val msg = s"$i: Producer send data"
      producer.send(new KeyedMessage[String,String](topic,msg))
    }
    //release the producer's resources before exiting
    producer.close()
  }
}
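
The header comment above promises a custom partitioner, but the listing only wires in kafka.producer.DefaultPartitioner. Against the same old kafka.producer API, a minimal sketch might look like the following (the class name MyPartitioner is made up for illustration; the framework instantiates the class reflectively and passes in a VerifiableProperties):

package myRPC.qf.itcast.spark

import kafka.producer.Partitioner
import kafka.utils.VerifiableProperties

class MyPartitioner(props: VerifiableProperties = null) extends Partitioner {
  override def partition(key: Any, numPartitions: Int): Int = {
    if (key == null) 0                            //no key: fall back to partition 0
    else math.abs(key.hashCode % numPartitions)   //stable key-based routing
  }
}

It would then be registered via props.put("partitioner.class","myRPC.qf.itcast.spark.MyPartitioner"). Note that with this API the partitioner only sees the key of a KeyedMessage, so messages sent without a key (as in the loop above) do not go through it.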

Kafka consumer

package myRPC.qf.itcast.spark

import java.util.Properties

import kafka.consumer._
import kafka.message.MessageAndMetadata

import java.util.concurrent.{ExecutorService, Executors}
import scala.collection.mutable

class KafkaConsumerTest(val consumer: String,val stream: KafkaStream[Array[Byte],Array[Byte]]) extends Runnable{
  override def run() = {
    val it: ConsumerIterator[Array[Byte],Array[Byte]] = stream.iterator()
    while(it.hasNext()){
      val data: MessageAndMetadata[Array[Byte], Array[Byte]] = it.next()
      val topic: String = data.topic
      val partition: Int = data.partition
      val offset: Long = data.offset
      val msg: String = new String(data.message())
      println(s"Consumer: $consumer,Topic: $topic,Partition: $partition,Offset: $offset,msg: $msg")
    }
  }
}
object KafkaConsumerTest{
  def main(args: Array[String]): Unit = {
    val topic = "test2"

    //map each topic to the number of streams (consumer threads) it should be read with
    val topics = new mutable.HashMap[String,Int]()
    topics.put(topic,2)

    //consumer configuration
    val props = new Properties()
    //consumer group id
    props.put("group.id","group1")
    //ZooKeeper ensemble; note that there must be no spaces after the commas in the value
    props.put("zookeeper.connect","minimaster:2181,miniSlave1:2181,miniSlave2:2181")
    //where to start if ZooKeeper has no offset for this group, or the stored offset is out of range ("smallest" = earliest)
    props.put("auto.offset.reset","smallest")
    //wrap the settings in a ConsumerConfig
    val config: ConsumerConfig = new ConsumerConfig(props)
    //create the Consumer; its threads block and wait while there is no data
    val consumer: ConsumerConnector = Consumer.create(config)
    //get the message streams for all requested topics
    val streams: collection.Map[String, List[KafkaStream[Array[Byte], Array[Byte]]]] =
      consumer.createMessageStreams(topics)
    //get the streams for our topic (empty list if the topic is missing)
    val streamList: List[KafkaStream[Array[Byte], Array[Byte]]] = streams.getOrElse(topic, Nil)
    //create a fixed-size thread pool to run one consumer per stream
    val pool: ExecutorService = Executors.newFixedThreadPool(3)
    //hand each stream to its own consumer thread
    for(i <- streamList.indices){
      pool.execute(new KafkaConsumerTest(s"Consumer: $i", streamList(i)))
    }
  }
}
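
As written, the consumer runs until the process is killed. A small optional addition, sketched here assuming the consumer and pool values from the main method above, would shut it down cleanly on JVM exit:

    sys.addShutdownHook {
      consumer.shutdown()   //closes the ZooKeeper connection and ends the stream iterators
      pool.shutdown()       //stops accepting new tasks and lets the worker threads finish
    }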

In IDEA, first run KafkaProducer.scala to start the producer and check its output.

Then run KafkaConsumerTest.scala to start the consumer and check its output.

You can also verify the results on the Linux side with the console consumer shown earlier.

After that, each additional run of the producer appends the corresponding messages again, and they show up in the consumer output on Linux.