
Spring Boot integration with spring-kafka: sending and receiving messages (example code)


Preface

Our new project is built on Spring Boot, and data created in it has to be synchronized back into the legacy system. Part of that sync code already existed and used Kafka. Frankly, for plain data synchronization I don't think Kafka is the right MQ choice: the data volume is small, and running Kafka means running a cluster plus ZooKeeper, which is overkill for simple sync work.

But I'm just the hired help, so when the boss says Kafka, Kafka it is. At first I found spring-integration-kafka, which is described as a rewrite on top of spring-kafka, but the official documentation left me thoroughly confused and I never got it working. The docs aren't great, possibly because the project is still young, and it has quite a few problems. I gave up on it and used spring-kafka directly.

Implementation

The pom.xml is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>org.linuxsogood.sync</groupId>
 <artifactId>linuxsogood-sync</artifactId>
 <version>1.0.0-SNAPSHOT</version>

 <parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>1.4.0.RELEASE</version>
 </parent>

 <properties>
  <java.version>1.8</java.version>
  <!-- Dependency versions -->
  <mybatis.version>3.3.1</mybatis.version>
  <mybatis.spring.version>1.2.4</mybatis.spring.version>
  <mapper.version>3.3.6</mapper.version>
  <pagehelper.version>4.1.1</pagehelper.version>
 </properties>

 <dependencies>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-jdbc</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-aop</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-freemarker</artifactId>
  </dependency>
  <!-- The abandoned spring-integration-kafka attempt:
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-integration</artifactId>
   <scope>compile</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-kafka</artifactId>
   <version>2.0.1.RELEASE</version>
   <scope>compile</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-core</artifactId>
   <version>4.3.1.RELEASE</version>
   <scope>compile</scope>
  </dependency>
  -->
  <dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>1.1.0.RELEASE</version>
  </dependency>
  <!--<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka-test</artifactId>
   <version>1.1.0.RELEASE</version>
  </dependency>-->
  <dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
   <version>4.12</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.assertj</groupId>
   <artifactId>assertj-core</artifactId>
   <version>3.5.2</version>
  </dependency>
  <dependency>
   <groupId>org.hamcrest</groupId>
   <artifactId>hamcrest-all</artifactId>
   <version>1.3</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.mockito</groupId>
   <artifactId>mockito-all</artifactId>
   <version>1.9.5</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework</groupId>
   <artifactId>spring-test</artifactId>
   <version>4.2.3.RELEASE</version>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
  </dependency>
  <dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
  </dependency>
  <dependency>
   <groupId>com.microsoft.sqlserver</groupId>
   <artifactId>sqljdbc4</artifactId>
   <version>4.0.0</version>
  </dependency>
  <dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>druid</artifactId>
   <version>1.0.11</version>
  </dependency>

  <!-- MyBatis -->
  <dependency>
   <groupId>org.mybatis</groupId>
   <artifactId>mybatis</artifactId>
   <version>${mybatis.version}</version>
  </dependency>
  <dependency>
   <groupId>org.mybatis</groupId>
   <artifactId>mybatis-spring</artifactId>
   <version>${mybatis.spring.version}</version>
  </dependency>
  <!--<dependency>
   <groupId>org.mybatis.spring.boot</groupId>
   <artifactId>mybatis-spring-boot-starter</artifactId>
   <version>1.1.1</version>
  </dependency>-->
  <!-- MyBatis Generator -->
  <dependency>
   <groupId>org.mybatis.generator</groupId>
   <artifactId>mybatis-generator-core</artifactId>
   <version>1.3.2</version>
   <scope>compile</scope>
   <optional>true</optional>
  </dependency>
  <!-- Pagination plugin -->
  <dependency>
   <groupId>com.github.pagehelper</groupId>
   <artifactId>pagehelper</artifactId>
   <version>${pagehelper.version}</version>
  </dependency>
  <!-- Common Mapper -->
  <dependency>
   <groupId>tk.mybatis</groupId>
   <artifactId>mapper</artifactId>
   <version>${mapper.version}</version>
  </dependency>
  <dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>fastjson</artifactId>
   <version>1.2.17</version>
  </dependency>
 </dependencies>
 <repositories>
  <repository>
   <id>repo.spring.io.milestone</id>
   <name>Spring Framework Maven Milestone Repository</name>
   <url>https://repo.spring.io/libs-milestone</url>
  </repository>
 </repositories>
 <build>
  <finalName>mybatis_generator</finalName>
  <plugins>
   <plugin>
    <groupId>org.mybatis.generator</groupId>
    <artifactId>mybatis-generator-maven-plugin</artifactId>
    <version>1.3.2</version>
    <configuration>
     <verbose>true</verbose>
     <overwrite>true</overwrite>
    </configuration>
   </plugin>
   <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <configuration>
     <mainClass>org.linuxsogood.sync.Starter</mainClass>
    </configuration>
   </plugin>
  </plugins>
 </build>
</project>

The ORM layer uses MyBatis, together with the common Mapper and the PageHelper pagination plugin.
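
For context, the listener shown later calls selectByExample, insert and updateByPrimaryKey on a StoreMapper. As a rough sketch only (the real interface is generated by mybatis-generator and may declare more methods), that mapper looks something like this:

import java.util.List;
import org.linuxsogood.sync.model.Store;
import org.linuxsogood.sync.model.StoreExample;

// Sketch only: the actual StoreMapper is generated code; this just mirrors
// the three methods the Listener below relies on.
public interface StoreMapper {
 List<Store> selectByExample(StoreExample example);
 int insert(Store record);
 int updateByPrimaryKey(Store record);
}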

Kafka consumer configuration

import org.linuxsogood.sync.listener.Listener;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

 @Value("${kafka.broker.address}")
 private String brokerAddress;

 // Container factory backing @KafkaListener methods: three concurrent
 // consumers, each poll waiting up to 3 seconds.
 @Bean
 KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  factory.setConcurrency(3);
  factory.getContainerProperties().setPollTimeout(3000);
  return factory;
 }

 @Bean
 public ConsumerFactory<String, String> consumerFactory() {
  return new DefaultKafkaConsumerFactory<>(consumerConfigs());
 }

 @Bean
 public Map<String, Object> consumerConfigs() {
  Map<String, Object> propsMap = new HashMap<>();
  propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
  propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
  propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
  propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "firehome-group");
  propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  return propsMap;
 }

 @Bean
 public Listener listener() {
  return new Listener();
 }
}
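
Note that the @Value placeholder above expects a kafka.broker.address property in the Spring Boot configuration. A minimal application.properties entry might look like the following; the host and port are placeholders of my own, not values from the original project:

# Hypothetical broker address; point this at your own Kafka bootstrap server(s).
kafka.broker.address=127.0.0.1:9092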

Producer configuration

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

 @Value("${kafka.broker.address}")
 private String brokerAddress;

 @Bean
 public ProducerFactory<String, String> producerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfigs());
 }

 @Bean
 public Map<String, Object> producerConfigs() {
  // Producer tuning: no retries, 16 KB batches, 1 ms linger, 32 MB send buffer.
  Map<String, Object> props = new HashMap<>();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
  props.put(ProducerConfig.RETRIES_CONFIG, 0);
  props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
  props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  return props;
 }

 @Bean
 public KafkaTemplate<String, String> kafkaTemplate() {
  return new KafkaTemplate<String, String>(producerFactory());
 }
}
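
The sending side is not shown in the original code, but with the KafkaTemplate bean above it is a one-liner. Here is a minimal sketch; the MessageSender class name is my own invention, while the topic matches the one the listener below subscribes to:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Hypothetical sender component, not part of the original project.
@Component
public class MessageSender {

 @Autowired
 private KafkaTemplate<String, String> kafkaTemplate;

 public void send(String payload) {
  // Publish the (JSON) payload to the topic the Listener consumes from.
  kafkaTemplate.send("linuxsogood-topic", payload);
 }
}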

The listener is where the business logic lives: it decides what to do with each message pulled from Kafka. If you want Kafka's broadcast mode, the listeners must belong to different groups, i.e. the group value in each @KafkaListener annotation must differ. If several listeners share the same group, only one of them will handle any given message and the others never see it; that is Kafka's distributed message-processing mode.

Listeners in the same group process the received messages jointly, with messages divided among them by Kafka's assignment algorithm. Listeners in different groups that subscribe to the same topic each receive every message, which gives you broadcast mode, as the sketch below illustrates.
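
As a concrete illustration (class and group names are made up, and this follows the article's own use of the group attribute on @KafkaListener): two listeners on the same topic but in different groups each receive every message, while giving them the same group would route each message to only one of them. The project's actual listener follows after this sketch.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

// Illustrative only: different groups on the same topic => broadcast.
public class BroadcastListeners {

 @KafkaListener(topics = "linuxsogood-topic", group = "group-a")
 public void listenA(ConsumerRecord<?, ?> record) {
  // Every message on linuxsogood-topic arrives here...
 }

 @KafkaListener(topics = "linuxsogood-topic", group = "group-b")
 public void listenB(ConsumerRecord<?, ?> record) {
  // ...and also here, because the group differs.
 }
}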

import com.alibaba.fastjson.JSON;
import org.linuxsogood.qilian.enums.CupMessageType;
import org.linuxsogood.qilian.kafka.MessageWrapper;
import org.linuxsogood.qilian.model.store.Store;
import org.linuxsogood.sync.mapper.StoreMapper;
import org.linuxsogood.sync.model.StoreExample;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.List;
import java.util.Optional;

public class Listener {

 private static final Logger LOGGER = LoggerFactory.getLogger(Listener.class);

 @Autowired
 private StoreMapper storeMapper;

 /**
  * Listen for Kafka messages; when one arrives, consume it and sync the data into the new system's database.
  * @param record the received message record
  */
 @KafkaListener(topics = "linuxsogood-topic", group = "sync-group")
 public void listen(ConsumerRecord<?, ?> record) {
  Optional<?> kafkaMessage = Optional.ofNullable(record.value());
  if (kafkaMessage.isPresent()) {
   Object message = kafkaMessage.get();
   try {
    MessageWrapper messageWrapper = JSON.parseObject(message.toString(), MessageWrapper.class);
    CupMessageType type = messageWrapper.getType();
    // Dispatch on the message type: different types are written to different tables.
    if (CupMessageType.STORE == type) {
     proceedStore(messageWrapper);
    }
   } catch (Exception e) {
    LOGGER.error("Failed to save the received message to the database, message: {}", message.toString(), e);
   }
  }
 }

 /**
  * The message is a store: process it and persist it to the database.
  * @param messageWrapper the message received from Kafka
  */
 private void proceedStore(MessageWrapper messageWrapper) {
  Object data = messageWrapper.getData();
  Store cupStore = JSON.parseObject(data.toString(), Store.class);
  StoreExample storeExample = new StoreExample();
  String storeName = StringUtils.isBlank(cupStore.getStoreOldName()) ? cupStore.getStoreName() : cupStore.getStoreOldName();
  storeExample.createCriteria().andStoreNameEqualTo(storeName);
  List<org.linuxsogood.sync.model.Store> stores = storeMapper.selectByExample(storeExample);
  org.linuxsogood.sync.model.Store convertStore = new org.linuxsogood.sync.model.Store();
  org.linuxsogood.sync.model.Store store = convertStore.convert(cupStore);
  // Insert when no existing record matches the store name, otherwise update by primary key.
  if (stores.size() == 0) {
   storeMapper.insert(store);
  } else {
   store.setStoreId(stores.get(0).getStoreId());
   storeMapper.updateByPrimaryKey(store);
  }
 }

}

Summary

That's all for this article. I hope it helps with your study or work. If you have any questions, feel free to leave a comment. Thank you for your support.