欢迎您访问程序员文章站!本站旨在为大家分享程序员计算机编程知识。
您现在的位置是: 首页

Spring Boot 项目中使用 Netty + WebSocket 实现消息推送(带校验用户是否登录功能)

程序员文章站 2022-07-03 17:06:31
...

maven 引入包:

<!--    websocket    -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.36.Final</version>
</dependency>

 netty 启动server类:

package com.minivision.user.manage.websocket;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * Starts the Netty-based WebSocket server when the Spring bean is initialised and releases
 * its event loop groups when the bean is destroyed.
 *
 * @author yangxiaodong
 * @version 1.0
 */

@Component
public class NettyServer {
    /**
     * Class logger (bound to this class so log lines are attributed correctly).
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(NettyServer.class);

    /**
     * Sub-protocol name offered during the WebSocket handshake.
     */
    private static final String WEBSOCKET_PROTOCOL = "WebSocket";

    /**
     * TCP port the server listens on; defaults to 58080 when the property is absent.
     */
    @Value("${websocket.netty.port:58080}")
    private int port;

    /**
     * URI path on which WebSocket upgrades are accepted; defaults to {@code /websocket}.
     */
    @Value("${websocket.netty.path:/websocket}")
    private String websocketPath;

    /**
     * Business handler that processes text frames after the handshake.
     */
    @Autowired
    private WebSocketHandler webSocketHandler;

    /**
     * Handler that extracts the login token from the upgrade request URI.
     */
    @Autowired
    private PermissionWebSocketHandler permissionWebSocketHandler;

    /**
     * Event loop group handed to the bootstrap as the CHILD group (per-connection I/O),
     * despite its name.
     */
    private EventLoopGroup bossGroup;

    /**
     * Event loop group handed to the bootstrap as the PARENT group (accepting connections).
     */
    private EventLoopGroup group;

    /**
     * Creates the event loop groups, builds the channel pipeline and binds the listening port,
     * blocking until the bind completes.
     *
     * <p>If the bind fails, both event loop groups are shut down before the failure propagates;
     * otherwise their threads would outlive the failed bean initialisation.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for the bind
     */
    @PostConstruct
    public void start() throws InterruptedException {
        bossGroup = new NioEventLoopGroup();
        group = new NioEventLoopGroup();
        boolean started = false;
        try {
            ServerBootstrap sb = new ServerBootstrap();
            // Maximum length of the pending-connection queue of the listening socket.
            sb.option(ChannelOption.SO_BACKLOG, 1024);
            // ServerBootstrap.group(parent, child): `group` accepts connections,
            // `bossGroup` serves the accepted channels.
            sb.group(group, bossGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(this.port)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // WebSocket starts as an HTTP upgrade, so HTTP codecs are required first.
                            ch.pipeline().addLast(new HttpServerCodec());
                            // NOTE(review): ObjectEncoder only touches outbound Serializable messages;
                            // it looks unnecessary for this pipeline - confirm before removing.
                            ch.pipeline().addLast(new ObjectEncoder());
                            // Supports writing large payloads in chunks.
                            ch.pipeline().addLast(new ChunkedWriteHandler());
                            // Aggregates HTTP message parts into a single FullHttpRequest (max 8 KiB).
                            ch.pipeline().addLast(new HttpObjectAggregator(8192));
                            // Must precede the protocol handler: it rewrites the request URI so that
                            // the path comparison performed during the handshake succeeds.
                            ch.pipeline().addLast(permissionWebSocketHandler);
                            ch.pipeline().addLast(
                                    new WebSocketServerProtocolHandler(websocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
                            ch.pipeline().addLast(webSocketHandler);
                        }
                    });
            // bind() is asynchronous; sync() waits for it and rethrows a bind failure.
            sb.bind().sync();
            started = true;
            LOGGER.info("websocket server start");
        } finally {
            if (!started) {
                // @PreDestroy is not invoked for a bean whose initialisation failed,
                // so the event loops have to be released here.
                bossGroup.shutdownGracefully();
                group.shutdownGracefully();
            }
        }
    }

    /**
     * Shuts down both event loop groups, waiting for each shutdown to finish.
     *
     * @throws InterruptedException if interrupted while waiting for a group to terminate
     */
    @PreDestroy
    private void destory() throws InterruptedException {
        if (null != bossGroup) {
            bossGroup.shutdownGracefully().sync();
        }
        if (null != group) {
            group.shutdownGracefully().sync();
        }
    }
}

 用户登录验证处理器:

通过 ws 连接地址后面携带的参数,进行用户登录校验。

思路:获取到 ws 参数后,需要处理连接地址,把携带的参数去掉(还原为配置的 websocket 路径),否则握手无法完成,channel 会自动断开。

package com.minivision.user.manage.websocket;

import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.minivision.user.manage.util.UrlUtil;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;

/**
 * Inbound handler that captures the login token carried in the query string of the WebSocket
 * upgrade request and stores it as a channel attribute for later verification.
 *
 * @author yangxiaodong
 * @version 1.0
 */
@Component
@Sharable
public class PermissionWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /**
     * Class logger.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(PermissionWebSocketHandler.class);

    /**
     * Configured WebSocket path; the request URI is reset to this value once the token is read.
     */
    @Value("${websocket.netty.path:/websocket}")
    private String websocketPath;

    /**
     * Intercepts the HTTP upgrade request, records the {@code token} URL parameter on the channel
     * and strips the parameters from the URI; every message is then forwarded downstream
     * exactly once.
     *
     * <p>Because this method is overridden without delegating to {@code super}, the automatic
     * release performed by {@link SimpleChannelInboundHandler} does not apply here, so the
     * message must not be retained before being forwarded.
     *
     * @param ctx channel handler context
     * @param msg inbound message
     * @throws Exception propagated from downstream handlers
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            FullHttpRequest request = (FullHttpRequest) msg;
            String uri = request.uri();
            if (StringUtils.isNotEmpty(uri)) {
                Map<String, String> params = UrlUtil.parseUrl(uri);
                // Login token supplied by the client as a URL parameter.
                String token = params.get("token");
                AttributeKey<WebSocketPerssionVerify> key = AttributeKey.valueOf("perssion");
                WebSocketPerssionVerify verify = ctx.channel().attr(key).get();
                if (null == verify) {
                    verify = new WebSocketPerssionVerify();
                    verify.setToken(token);
                }
                ctx.channel().attr(key).setIfAbsent(verify);
                // Restore the bare path so downstream handlers see the configured WebSocket path.
                request.setUri(websocketPath);
            }
        }
        // Single hand-off; the downstream handler owns (and releases) the message from here.
        ctx.fireChannelRead(msg);
    }

    /**
     * Logs the failure and closes the channel.
     *
     * @param ctx channel handler context
     * @param cause the exception raised in the pipeline
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOGGER.error("websocket exception", cause);
        ctx.close();
    }

    /**
     * Forwards the frame unchanged. Not reached through {@link #channelRead}, which is overridden
     * and forwards messages itself; present because the superclass declares it abstract.
     *
     * @param ctx channel handler context
     * @param msg text frame
     * @throws Exception propagated from downstream handlers
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

}

 websocket处理器:

处理发送消息;ws 握手成功后,判断用户是否已登录,未登录则断开 channel

package com.minivision.user.manage.websocket;

import java.util.concurrent.TimeUnit;

import javax.annotation.Resource;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.AttributeKey;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.fastjson.JSONObject;
import com.minivision.user.client.TokenVerifyReq;
import com.minivision.user.client.TokenVerifyResp;
import com.minivision.user.client.TokenVerifyResp.ReplyStatus;
import com.minivision.user.client.api.TokenVerifyServiceFacade;
import com.minivision.user.manage.api.dto.MarkInMailMsgReadReqDTO;
import com.minivision.user.manage.inmail.model.InMailMsgResp;
import com.minivision.user.manage.inmail.model.MarkInMailMsgRead;
import com.minivision.user.manage.inmail.model.QueryUnreadInMailMsg;
import com.minivision.user.manage.inmail.model.SendInMailMsg;

/**
 * WebSocket business handler: tracks connected channels, verifies the login token once the
 * handshake completes, and dispatches incoming text frames to the in-mail message service.
 *
 * @author yangxiaodong
 * @version 1.0
 */

@Component
@Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    /**
     * Class logger.
     */
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketHandler.class);

    /**
     * Redis access; holds, per user, the set of channel ids that user is connected on.
     */
    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Service that sends, marks and queries in-mail messages.
     */
    @Autowired
    private SendInMailMsgService sendInMailMsgService;

    /**
     * Remote facade used to verify login tokens.
     */
    @Reference
    private TokenVerifyServiceFacade tokenVerifyServiceFacade;

    /**
     * Registers a newly active channel in the global channel group.
     *
     * @param ctx channel handler context
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        LOGGER.info("connect to client");
        NettyConfig.getChannelGroup().add(ctx.channel());
    }

    /**
     * Removes a disconnected channel from the global channel group and from the user's
     * channel-id set in Redis.
     *
     * @param ctx channel handler context
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        LOGGER.info("disconnect to client");
        NettyConfig.getChannelGroup().remove(ctx.channel());
        removeChannelId(ctx);
    }

    /**
     * Removes this channel's id from the Redis set of the user bound to the channel; does nothing
     * when no user id has been recorded on the channel yet.
     *
     * @param ctx channel handler context
     */
    private void removeChannelId(ChannelHandlerContext ctx) {
        String userId = getUserId(ctx);
        if (StringUtils.isNotEmpty(userId)) {
            redisTemplate.opsForSet().remove(WebSocketConstant.BOSS_MSG_CHANNELID + userId, ctx.channel().id());
        }
    }

    /**
     * Logs the failure, unregisters the channel id and closes the channel.
     *
     * @param ctx channel handler context
     * @param cause the exception raised in the pipeline
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOGGER.error("websocket exception", cause);
        removeChannelId(ctx);
        ctx.close();
    }

    /**
     * Reacts to pipeline user events: authenticates the user once the WebSocket handshake has
     * completed, and closes channels reported as fully idle.
     *
     * @param ctx channel handler context
     * @param evt the user event
     * @throws Exception propagated from the token verification or message query
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
            authenticateAndPushUnread(ctx);
            return;
        }
        // NOTE(review): idle events are emitted by an IdleStateHandler; confirm one is installed
        // in the pipeline, otherwise this branch never runs.
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.ALL_IDLE) {
                Channel channel = ctx.channel();
                removeChannelId(ctx);
                // Close the unused channel to free its resources.
                channel.close();
            }
        }
    }

    /**
     * Verifies the token captured during the upgrade request. On success the channel id is
     * registered under the user in Redis (with a one-day expiry) and the user's unread messages
     * are pushed; on any failure the client is told it is not logged in and the channel is closed.
     *
     * @param ctx channel handler context
     */
    private void authenticateAndPushUnread(ChannelHandlerContext ctx) {
        AttributeKey<WebSocketPerssionVerify> key = AttributeKey.valueOf("perssion");
        WebSocketPerssionVerify verify = ctx.channel().attr(key).get();
        if (null == verify || StringUtils.isEmpty(verify.getToken())) {
            sendUserNotLoginMsg(ctx);
            return;
        }
        TokenVerifyReq tokenVerifyReq = new TokenVerifyReq();
        tokenVerifyReq.setToken(verify.getToken());
        TokenVerifyResp resp = tokenVerifyServiceFacade.verifyToken(tokenVerifyReq);
        // A missing response or status is treated as "not logged in" rather than raising an NPE.
        if (null == resp || !ReplyStatus.OK.equals(resp.getStatus())) {
            sendUserNotLoginMsg(ctx);
            return;
        }
        String userId = resp.getUserId();
        String redisKey = WebSocketConstant.BOSS_MSG_CHANNELID + userId;
        redisTemplate.opsForSet().add(redisKey, ctx.channel().id());
        redisTemplate.expire(redisKey, 1, TimeUnit.DAYS);
        // The attribute already references this object, so updating it in place is sufficient.
        verify.setUserId(userId);
        // Push the messages the user has not read yet.
        QueryUnreadInMailMsg query = new QueryUnreadInMailMsg();
        query.setUserId(userId);
        queryUnreadInMailMsg(query);
    }

    /**
     * Sends a close frame carrying the "user not logged in" status and closes the channel once
     * the frame has been written.
     *
     * @param ctx channel handler context
     */
    public void sendUserNotLoginMsg(ChannelHandlerContext ctx) {
        removeChannelId(ctx);
        ChannelFuture future = ctx.writeAndFlush(new CloseWebSocketFrame(InMailMsgResp.USER_NOT_LOGIN, InMailMsgResp.USER_NOT_LOGIN_MSG));
        future.addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * Reads the user id recorded on the channel.
     *
     * @param ctx channel handler context
     * @return the user id, or {@code null} when the channel carries no verification attribute
     */
    private String getUserId(ChannelHandlerContext ctx) {
        AttributeKey<WebSocketPerssionVerify> key = AttributeKey.valueOf("perssion");
        WebSocketPerssionVerify verify = ctx.channel().attr(key).get();
        if (null != verify) {
            return verify.getUserId();
        }
        return null;
    }

    /**
     * Parses a text frame as JSON and dispatches on its {@code actionType}: send a message or
     * mark messages as read. Frames with an empty body or another action type are ignored.
     *
     * @param ctx channel handler context
     * @param msg the received text frame
     * @throws Exception if the payload cannot be parsed or the service call fails
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("server receiver client msg:{}", msg.text());
        }
        String text = msg.text();
        if (StringUtils.isEmpty(text)) {
            return;
        }
        JSONObject payload = JSONObject.parseObject(text);
        if (null == payload) {
            // Nothing to dispatch when the parser yields no object.
            return;
        }
        String actionType = payload.getString("actionType");
        if (WebSocketConstant.INMAIL_ACTION_TYPE_SEND.equals(actionType)) {
            sendMsg(JSONObject.parseObject(text, SendInMailMsg.class));
        } else if (WebSocketConstant.INMAIL_ACTION_TYPE_MARK.equals(actionType)) {
            markMsg(JSONObject.parseObject(text, MarkInMailMsgRead.class));
        }
    }

    /**
     * Delegates message sending to the in-mail message service.
     *
     * @param sendInMailMsg the message to send
     */
    private void sendMsg(SendInMailMsg sendInMailMsg) {
        sendInMailMsgService.sendMsg(sendInMailMsg);
    }

    /**
     * Delegates marking messages as read to the in-mail message service.
     *
     * @param markInMailMsgRead ids of the messages to mark and the owning user
     */
    private void markMsg(MarkInMailMsgRead markInMailMsgRead) {
        sendInMailMsgService.markMsg(markInMailMsgRead);
    }

    /**
     * Delegates the unread-message query (and push) to the in-mail message service.
     *
     * @param queryUnreadInMailMsg query parameters
     */
    private void queryUnreadInMailMsg(QueryUnreadInMailMsg queryUnreadInMailMsg) {
        sendInMailMsgService.queryUnreadInMailMsg(queryUnreadInMailMsg);
    }
}

 websocket权限校验实体类:

package com.minivision.user.manage.websocket;

/**
 * Per-channel authentication state for a WebSocket connection: the login token supplied by the
 * client and the user id associated with that connection.
 *
 * @author yangxiaodong
 * @version 1.0
 */

public class WebSocketPerssionVerify {

    /** Login token supplied by the client. */
    private String token;

    /** Id of the user bound to the connection. */
    private String userId;

    /**
     * Returns the login token.
     *
     * @return the token, or {@code null} if none has been set
     */
    public String getToken() {
        return this.token;
    }

    /**
     * Stores the login token.
     *
     * @param token the token to set
     */
    public void setToken(final String token) {
        this.token = token;
    }

    /**
     * Returns the user id.
     *
     * @return the user id, or {@code null} if none has been set
     */
    public String getUserId() {
        return this.userId;
    }

    /**
     * Stores the user id.
     *
     * @param userId the user id to set
     */
    public void setUserId(final String userId) {
        this.userId = userId;
    }

}

 发送消息逻辑代码:

package com.minivision.user.manage.websocket;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;

import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import com.alibaba.dubbo.config.annotation.Reference;
import com.alibaba.fastjson.JSONObject;
import com.minivision.boss.common.util.ModelMapperUtil;
import com.minivision.constants.DigitConst;
import com.minivision.user.manage.api.InMailMessageServiceFacade;
import com.minivision.user.manage.api.dto.AddInMailMsgReqDTO;
import com.minivision.user.manage.api.dto.AddInMailMsgReqDTO.AddInMailMsg;
import com.minivision.user.manage.api.dto.MarkInMailMsgReadReqDTO;
import com.minivision.user.manage.api.dto.QueryUnReadInMailMsgReqDTO;
import com.minivision.user.manage.api.dto.QueryUnReadInMailMsgRespDTO;
import com.minivision.user.manage.api.dto.QueryUnReadInMailMsgRespDTO.InMailMsgInfo;
import com.minivision.user.manage.inmail.model.InMailMsgBody;
import com.minivision.user.manage.inmail.model.MarkInMailMsgRead;
import com.minivision.user.manage.inmail.model.QueryUnreadInMailMsg;
import com.minivision.user.manage.inmail.model.SendInMailMsg;
import com.minivision.user.manage.inmail.model.InMailMsgData;

/**
 * Sends in-mail messages over WebSocket channels, persists delivery records through the remote
 * facade, and pushes unread messages to a user's connected channels.
 *
 * @author yangxiaodong
 * @version 1.0
 */
@Service
public class SendInMailMsgService {

    /**
     * Redis access; used to look up the channel ids registered for a user.
     */
    @SuppressWarnings("rawtypes")
    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * Remote facade that stores, queries and marks in-mail messages.
     */
    @Reference
    private InMailMessageServiceFacade inMailMessageServiceFacade;

    /**
     * Pre-send hook: prefixes every message id in the content list with the in-mail prefix.
     *
     * @param sendMessage the message about to be sent
     */
    private void beforeSend(SendInMailMsg sendMessage) {
        List<InMailMsgBody> contentList = sendMessage.getContentList();
        if (CollectionUtils.isNotEmpty(contentList)) {
            contentList.forEach(item -> item.setMsgId(WebSocketConstant.INMAIL_MSG_PREFIX + item.getMsgId()));
        }
    }

    /**
     * Persists one record per content item of the message, addressed to the given receivers.
     * Nothing is stored when the content list is empty.
     *
     * <p>The receivers are passed explicitly rather than read from {@code sendMessage} so that
     * callers running asynchronously record the receiver they actually delivered to.
     *
     * @param sendMessage the message whose content is recorded
     * @param receivers user ids recorded as the receivers
     */
    private void saveMsgRecord(SendInMailMsg sendMessage, List<String> receivers) {
        List<InMailMsgBody> contentList = sendMessage.getContentList();
        if (CollectionUtils.isEmpty(contentList)) {
            return;
        }
        List<AddInMailMsg> msgList = new ArrayList<>(contentList.size());
        for (InMailMsgBody item : contentList) {
            AddInMailMsg addInMailMsg = ModelMapperUtil.strictMap(item, AddInMailMsg.class);
            addInMailMsg.setMsgType(sendMessage.getSendType());
            addInMailMsg.setSenderId(sendMessage.getSenderId());
            addInMailMsg.setSendTime(sendMessage.getSendTime());
            addInMailMsg.setReceivers(receivers);
            msgList.add(addInMailMsg);
        }
        AddInMailMsgReqDTO req = new AddInMailMsgReqDTO();
        req.setMsgList(msgList);
        inMailMessageServiceFacade.addInMailMsg(req);
    }

    /**
     * Sends a message according to its send type: one-to-one and one-to-many go to the listed
     * receivers, group messages go to every connected channel. Other send types are ignored.
     *
     * @param sendInMailMsg the message to send
     */
    public void sendMsg(SendInMailMsg sendInMailMsg) {
        beforeSend(sendInMailMsg);
        String sendType = sendInMailMsg.getSendType();
        if (WebSocketConstant.SEND_TYPE_ONE_TO_ONE.equals(sendType)
                || WebSocketConstant.SEND_TYPE_ONE_TO_MANY.equals(sendType)) {
            send(sendInMailMsg);
        } else if (WebSocketConstant.SEND_TYPE_GROUP.equals(sendType)) {
            sendToAll(sendInMailMsg);
        }
    }

    /**
     * Queries the user's unread in-mail messages and, when the query succeeds, pushes them to
     * the user's connected channels.
     *
     * @param queryUnreadInMailMsg query parameters carrying the user id
     */
    public void queryUnreadInMailMsg(QueryUnreadInMailMsg queryUnreadInMailMsg) {
        ChannelGroup channelGroup = NettyConfig.getChannelGroup();
        if (null == channelGroup) {
            return;
        }
        QueryUnReadInMailMsgReqDTO req = new QueryUnReadInMailMsgReqDTO();
        req.setUserId(queryUnreadInMailMsg.getUserId());
        QueryUnReadInMailMsgRespDTO resp = inMailMessageServiceFacade.queryUnReadInMailMsg(req);
        // Guard against a missing response from the remote call.
        if (null != resp && resp.getSuccess()) {
            sendUnreadInMailMsg(channelGroup, resp, queryUnreadInMailMsg);
        }
    }

    /**
     * Writes the unread-message payload to every live channel registered for the user.
     *
     * @param channelGroup group holding the connected channels
     * @param resp query response carrying the unread count and messages
     * @param queryUnreadInMailMsg query parameters carrying the user id
     */
    @SuppressWarnings("unchecked")
    private void sendUnreadInMailMsg(ChannelGroup channelGroup, QueryUnReadInMailMsgRespDTO resp, QueryUnreadInMailMsg queryUnreadInMailMsg) {
        Set<ChannelId> members = redisTemplate.opsForSet().members(getChannelId(queryUnreadInMailMsg.getUserId()));
        if (CollectionUtils.isEmpty(members)) {
            return;
        }
        InMailMsgData unReadInMailMsg = new InMailMsgData();
        unReadInMailMsg.setCount(resp.getCount());
        unReadInMailMsg.setMsgList(resp.getMsgList());
        String jsonString = JSONObject.toJSONString(unReadInMailMsg);
        for (ChannelId channelId : members) {
            // Ids stored in Redis may refer to channels that are not (or no longer) in this group.
            Channel channel = channelGroup.find(channelId);
            if (null != channel) {
                channel.writeAndFlush(new TextWebSocketFrame(jsonString));
            }
        }
    }

    /**
     * Marks the given messages as read for the given user through the remote facade.
     *
     * @param markInMailMsgRead message ids and owning user
     */
    public void markMsg(MarkInMailMsgRead markInMailMsgRead) {
        MarkInMailMsgReadReqDTO req = new MarkInMailMsgReadReqDTO();
        req.setMsgIds(markInMailMsgRead.getMsgIds());
        req.setUserId(markInMailMsgRead.getUserId());
        inMailMessageServiceFacade.markInMailMsgRead(req);
    }

    /**
     * Sends the message to every channel registered for each listed receiver.
     *
     * @param sendMessage the message to send
     */
    @SuppressWarnings("unchecked")
    private void send(SendInMailMsg sendMessage) {
        ChannelGroup channelGroup = NettyConfig.getChannelGroup();
        if (null == channelGroup) {
            return;
        }
        List<String> receivers = sendMessage.getReceivers();
        if (CollectionUtils.isEmpty(receivers)) {
            return;
        }
        for (String receiver : receivers) {
            Set<ChannelId> members = redisTemplate.opsForSet().members(getChannelId(receiver));
            if (CollectionUtils.isEmpty(members)) {
                continue;
            }
            for (ChannelId channelId : members) {
                if (null != channelId) {
                    sendToUser(channelId, channelGroup, sendMessage);
                }
            }
        }
    }

    /**
     * Writes the message to one channel and, once the write has succeeded, stores a single
     * delivery record addressed to the user bound to that channel.
     *
     * @param channelId id of the target channel
     * @param channelGroup group holding the connected channels
     * @param sendMessage the message to send
     */
    private void sendToUser(ChannelId channelId, ChannelGroup channelGroup, SendInMailMsg sendMessage) {
        if (null == channelId) {
            return;
        }
        Channel channel = channelGroup.find(channelId);
        if (null == channel) {
            return;
        }
        AttributeKey<WebSocketPerssionVerify> key = AttributeKey.valueOf("perssion");
        WebSocketPerssionVerify verify = channel.attr(key).get();
        if (null == verify) {
            // No verified user is bound to this channel, so there is nobody to deliver to or record.
            return;
        }
        // Captured per delivery: the listener runs later, after other deliveries may have started.
        List<String> receiver = Arrays.asList(verify.getUserId());

        InMailMsgData inMailMsgData = new InMailMsgData();
        inMailMsgData.setCount(DigitConst.ONE);
        List<InMailMsgInfo> inMailMsgInfoList = ModelMapperUtil.strictMapList(sendMessage.getContentList(), InMailMsgInfo.class);
        inMailMsgData.setMsgList(inMailMsgInfoList);
        ChannelFuture channelFuture = channel
                .writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(inMailMsgData)));
        // Record the delivery only after the write succeeded, and only once.
        channelFuture.addListener(future -> {
            if (future.isSuccess()) {
                saveMsgRecord(sendMessage, receiver);
            }
        });
    }

    /**
     * Broadcasts the message's content string to every connected channel and stores the record.
     *
     * @param sendMessage the message to broadcast
     */
    private void sendToAll(SendInMailMsg sendMessage) {
        ChannelGroup channelGroup = NettyConfig.getChannelGroup();
        if (null != channelGroup) {
            channelGroup.writeAndFlush(new TextWebSocketFrame(sendMessage.getContentString()));
            saveMsgRecord(sendMessage, sendMessage.getReceivers());
        }
    }

    /**
     * Builds the Redis key under which a user's channel ids are stored.
     *
     * @param userId the user id
     * @return the Redis key for that user
     */
    private String getChannelId(String userId) {
        return WebSocketConstant.BOSS_MSG_CHANNELID + userId;
    }

}

 全局的 channelGroup,用来存放 channel,发送消息

package com.minivision.user.manage.websocket;

import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * Holder of the process-wide {@link ChannelGroup} that tracks connected WebSocket channels.
 *
 * @author yangxiaodong
 * @version 1.0
 */

public final class NettyConfig {
    /**
     * The single shared channel group; declared {@code final} so it can never be reassigned.
     */
    private static final ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Not instantiable: this class only exposes static state.
     */
    private NettyConfig() {

    }

    /**
     * Returns the shared channel group.
     *
     * @return the channel group; never {@code null}
     */
    public static ChannelGroup getChannelGroup() {
        return CHANNEL_GROUP;
    }
}

 常量类:

package com.minivision.user.manage.websocket;

/**
 * String constants shared by the WebSocket in-mail messaging classes.
 *
 * @author yangxiaodong
 * @version 1.0
 */
public class WebSocketConstant {

    // ---- send types -----------------------------------------------------------------------

    /** Send type 1: one-to-one. */
    public static final String SEND_TYPE_ONE_TO_ONE = "1";

    /** Send type 2: one-to-many. */
    public static final String SEND_TYPE_ONE_TO_MANY = "2";

    /** Send type 3: group (broadcast). */
    public static final String SEND_TYPE_GROUP = "3";

    // ---- Redis keys and id prefixes -------------------------------------------------------

    /** Redis key prefix for WebSocket user ids. */
    public static final String BOSS_SOCKET_USER_ID_PREFIX = "websocket:userid:";

    /** Redis key prefix for the per-user set of channel ids. */
    public static final String BOSS_MSG_CHANNELID = "msg:channel:";

    /** Prefix prepended to in-mail message ids. */
    public static final String INMAIL_MSG_PREFIX = "msg_";

    // ---- exec methods ---------------------------------------------------------------------

    /** Exec method 1: register. */
    public static final String EXEC_METHOD_LOGIN = "1";

    /** Exec method 2: send message. */
    public static final String EXEC_METHOD_SEND_MSG = "2";

    /** Exec method 3: log out. */
    public static final String EXEC_METHOD_LOGOUT = "3";

    // ---- in-mail action types -------------------------------------------------------------

    /** Action type 1: log in. */
    public static final String INMAIL_ACTION_TYPE_LOGIN = "1";

    /** Action type 2: send message. */
    public static final String INMAIL_ACTION_TYPE_SEND = "2";

    /** Action type 3: mark messages as read. */
    public static final String INMAIL_ACTION_TYPE_MARK = "3";

    /** Action type 4: query unread in-mail messages. */
    public static final String INMAIL_ACTION_TYPE_QUERY_UNREAD_MSG = "4";

    /**
     * Not instantiable: constants holder.
     */
    private WebSocketConstant() {

    }
}