欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

redis用list做消息队列的实现示例

程序员文章站 2023-01-04 17:38:58
目录生产消息服务消费消息服务,定时任务日志测试leftpush消息入队,rightpop对应,消息出队。rightpop(redisconstant.mq_list, 0l, timeunit.sec...

leftpush消息入队,rightpop对应,消息出队。

rightpop(redisconstant.mq_list, 0l, timeunit.seconds)阻塞出队,0表示永久阻塞

生产消息服务

@service
public class redisservice {
    @autowired
    private redistemplate<string, string> redistemplate;


    public object publish() {
        orderdto dto = new orderdto();
        dto.setid(1);
        dto.setcreatetime(new date());
        dto.setmoney("12.34");
        dto.setorderno("orderno1");
        string s = json.tojsonstring(dto);

        listoperations<string, string> listoperations = redistemplate.opsforlist();
        //leftpush和rightpop对应,左边入队,右边出队
        listoperations.leftpush(redisconstant.mq_list, s);

        //因为出队是阻塞读取的,所以上一步入队后,数据立刻就被驱走了,下一步size=0
        long size = listoperations.size(redisconstant.mq_list);
        list<string> list = new arraylist<>();
        if (size != null && size > 0) {
             list = listoperations.range(redisconstant.mq_list, 0, size - 1);
        }
        return list;

    }
}

测试

@restcontroller
@requestmapping("redislist")
public class redislistcontroller {

    @autowired
    private redisservice redisservice;

    @getmapping("publish")
    public object publish() {
        return redisservice.publish();
    }
}

消费消息服务,定时任务

@component
public class redisconsumetask {
    @autowired
    private redisservice redisservice;

    @tasklock(redisconstant.consume_redis_list)
    @scheduled(cron = "0/10 * * * * ?")
    public void consumemqlist() {
        redisservice.consumemqlist();
    }
}

@service
@slf4j
public class redisservice {

    @autowired
    private redistemplate<string, string> redistemplate;

    public void consumemqlist() {
        listoperations<string, string> listoperations = redistemplate.opsforlist();
        //0时间,表示阻塞永久
        //待机一小时后,再次发消息,消费不了了,阻塞有问题啊。还得轮寻啊
        //string s = listoperations.rightpop(redisconstant.mq_list, 0l, timeunit.seconds);
        string s = listoperations.rightpop(redisconstant.mq_list);
        if (s == null) {
            return;
        }

        log.info("{} = {}", redisconstant.mq_list, s);

        orderdto dto = json.parseobject(s, orderdto.class);
        log.info("dto = {}", dto);
    }
}

日志

@component
@aspect
public class tasklockaop {

    @autowired
    private redislockregistry redislockregistry;

    @around("execution(@tasklock * * (..))")
    public object taskaround(proceedingjoinpoint pjp) throws throwable {

        tasklock taskannotation = ((methodsignature)pjp.getsignature()).getmethod().getannotation(tasklock.class);

        string lockkey = taskannotation.value();
        lock lock = redislockregistry.obtain(lockkey);
        try {
            lock.trylock(30l, timeunit.seconds);
            system.out.println("任务开始, " + lockkey + ", " + new date());

            return pjp.proceed();

        } finally {
            lock.unlock();
            system.out.println("任务结束, " + lockkey + ", " + new date());
        }
    }
}

测试

http://localhost:9040/redislist/publish

["{“createtime”:1574394538430,“id”:1,“money”:“12.34”,“orderno”:“orderno1”}"]

redis用list做消息队列的实现示例

下面一直阻塞,任务开始了,不收到消息,永远不会结束。
阻塞有问题,改用轮询了。

先启动发送消息服务,发送消息。后启动消费消息服务,可以消费消息。这一点,比发布订阅要稳定。

关联项目https://github.com/mingwulipo/cloud-demo.git

到此这篇关于redis用list做消息队列的实现示例的文章就介绍到这了,更多相关redis list消息队列内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!