欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Redis的发布订阅与keyspaces-notification实现定时通知

程序员文章站 2022-06-28 17:07:53
Redis的发布订阅与MQ消息队列是一样的,客户端需要首先订阅消息,此事件是阻塞的,发布者发布消息后,订阅了此消息的所有订阅者都会收到进行消费。1.自定义发布订阅#同时订阅了三个频道,一旦其中一个接收到消息就会进行数据的展示subscribe java php c++#发布消息publish java springboot#订阅所有通配的频道psubscribe java* php*2.内部事件发布订阅2.1 业务场景分析业务场景举例:订单超时未支付,实现自动关闭,如何实现?使用...

Redis的发布订阅与MQ消息队列是一样的,客户端需要首先订阅消息,此事件是阻塞的,发布者发布消息后,订阅了此消息的所有订阅者都会收到进行消费。

1.自定义发布订阅

#同时订阅了三个频道,一旦其中一个接收到消息就会进行数据的展示
subscribe java php c++
#发布消息
publish java springboot
#订阅所有通配的频道
psubscribe java* php*

2.内部事件发布订阅

2.1 业务场景分析

业务场景举例:

订单超时未支付,实现自动关闭,如何实现?

  1. 使用Quartz任务调度框架,定时调度方式来检查,如果使用5分钟为单位轮询一次,这样的话假如订单60分钟超时,那么我们可能到(60+5)分钟-1秒才检测到。Quartz更适合周期性的任务,所以在这里显然不适用。
  2. Timer Java原生的定时器,以秒为单位进行检测,可以少量使用,如果数据量大,性能是瓶颈。
  3. Quartz+Timer形式来作:5分钟做一次大循环,在5分钟内查出超时的订单,然后创建多线程通过timer完成订单定时检查,实现比较复杂。

所以如果能够主动的通知的形式来做过期提醒就好了。
恰好redis给我们提供了这样的一个监听机制,当键过期了给我们一个通知。Redis 2.8.X版本后,推出了Keyspace Notifications特性,类似数据库的trigger触发器。
keyspace notifications是基于sub/pub发布订阅机制的,可以接收对数据库中影响key操作的所有事件:比如del、set、expire(过期事件)。。。

2.2 接收的事件类型

有两种:keyspace,keyevent
keyspace: 是key触发的事件的具体操作
keyevent: 是事件影响的键名

#pub这个动作是系统自动发布的
127.0.0.1:6379>del mykey
#数据库0会发布以下两个信息,__这是两个英文状态下的下划线
publish __keyspace@0__:mykey del
publish __keyevent@0__:del mykey
2.3 开启系统通知

redis.conf配置文件里

# 这都是配置项
#  K     Keyspace events, published with __keyspace@<db>__ prefix.
#  E     Keyevent events, published with __keyevent@<db>__ prefix.
#  g     Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
#  $     String commands
#  l     List commands
#  s     Set commands
#  h     Hash commands
#  z     Sorted set commands
#  x     Expired events (events generated every time a key expires)
#  e     Evicted events (events generated when a key is evicted for maxmemory)
#  A     Alias for g$lshzxe, so that the "AKE" string means all the events.

开启事件通知

#开关在redis.conf配置里
notify-keyspace-events ""  #默认是空字符串,是关闭状态
notify-keyspace-events "KEx"  #配置文件里只配置了space和event的expried事件,就只自动发布这个事件。
2.4 订阅key的编写

通知事件的发生是由Redis自动触发的,不需要手动pub,只需要进行sub订阅即可

订阅语法

#主动订阅key的过期通知
subscribe __keyspace@0__:keyname   #具体的key对应的影响
subscribe __keyevent@0__:expired    #具体的事件影响的key
#如果我们有很多数据库,可以通配订阅
psubscribe __key*@*__:* 

3 springboot集成

POM依赖

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
    </dependencies>

yaml配置

spring:
  redis:
	 host: 127.0.0.1
 	 port: 6379

第一种方式:

service

@Service
public class MessageReceiver {
    //接收消息的方法
    public void receiveMessage(String message){
        //message接收到的过期key
        System.out.println("Redis 监听:"+message);
    }
}

配置类configuration

@Configuration
public class RedisMessageConfig {

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter){
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //订阅触发的通道
        container.addMessageListener(listenerAdapter,
                new PatternTopic("__keyevent@0__:expired"));
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(MessageReceiver receiver){
        return new MessageListenerAdapter(receiver,"receiveMessage");
    }
}

第二种方式:继承MessageListener
service
继承KeyspaceEventMessageListener可以订阅到所有的事件,set,del,expire

@Service
public class RedisNormalCmd extends KeyspaceEventMessageListener {

	public RedisNormalCmd(RedisMessageListenerContainer listenerContainer) {
		super(listenerContainer);
	}

	@Override
	protected void doHandleMessage(Message message) {
		System.out.println(message);
	}
}

继承KeyExpirationEventMessageListener只能订阅到键过期的事件

@Service
public class RedisTask  extends KeyExpirationEventMessageListener {

	private Logger logger = LoggerFactory.getLogger(RedisTask.class);

	public RedisTask(RedisMessageListenerContainer listenerContainer) {
		super(listenerContainer);
	}

	@Override
	public void onMessage(Message message, byte[] pattern) {
		String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
		//过期的key
		String key = new String(message.getBody(),StandardCharsets.UTF_8);
		logger.info("redis key 过期:pattern={},channel={},key={}",new String(pattern),channel,key);

	}

}

配置类configuration

@Configuration
public class RedisTimerConfiguration {
	private Logger logger = LoggerFactory.getLogger(RedisTimerConfiguration.class);
	@Autowired
	private RedisConnectionFactory redisConnectionFactory;
	@Bean
	public RedisMessageListenerContainer redisMessageListenerContainer() {
		RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
		redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
		return redisMessageListenerContainer;
	}
}

本文地址:https://blog.csdn.net/Hi_alan/article/details/109531036

相关标签: redis