欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

alipay-bolt 1.5.3 - NettyServer

程序员文章站 2022-07-05 09:30:58
1、优先使用netty的 EpollServercom.alipay.remoting.rpc.RpcServer#doInit@Overrideprotected void doInit() { if (this.addressParser == null) { this.addressParser = new RpcAddressParser(); } if (this.switches().isOn(GlobalSwitch.SERVE.....

1、优先使用netty的 EpollServer

 

com.alipay.remoting.rpc.RpcServer#doInit


@Override
protected void doInit() {
    if (this.addressParser == null) {
        this.addressParser = new RpcAddressParser();
    }
    if (this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
        this.connectionEventHandler = new RpcConnectionEventHandler(switches());
        this.connectionManager = new DefaultConnectionManager(new RandomSelectStrategy());
        this.connectionEventHandler.setConnectionManager(this.connectionManager);
        this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
    } else {
        this.connectionEventHandler = new ConnectionEventHandler(switches());
        this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
    }
    initRpcRemoting(); // 初始化协议管理器等。。
    this.bootstrap = new ServerBootstrap();
    this.bootstrap.group(bossGroup, workerGroup)
        .channel(NettyEventLoopUtil.getServerSocketChannelClass()) // 选择server
        .option(ChannelOption.SO_BACKLOG, ConfigManager.tcp_so_backlog())
        .option(ChannelOption.SO_REUSEADDR, ConfigManager.tcp_so_reuseaddr())
        .childOption(ChannelOption.TCP_NODELAY, ConfigManager.tcp_nodelay())
        .childOption(ChannelOption.SO_KEEPALIVE, ConfigManager.tcp_so_keepalive());

    // set write buffer water mark
    initWriteBufferWaterMark();

    // init byte buf allocator
    if (ConfigManager.netty_buffer_pooled()) {
        this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    } else {
        this.bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT)
            .childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
    }

    // enable trigger mode for epoll if need
    NettyEventLoopUtil.enableTriggeredMode(bootstrap); // 设置epoll触发方式

    final boolean idleSwitch = ConfigManager.tcp_idle_switch();
    final int idleTime = ConfigManager.tcp_server_idle();
    final ChannelHandler serverIdleHandler = new ServerIdleHandler();
    final RpcHandler rpcHandler = new RpcHandler(true, this.userProcessors);  // rpc处理器
    this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

        @Override
        protected void initChannel(SocketChannel channel) {
            ChannelPipeline pipeline = channel.pipeline();
            pipeline.addLast("decoder", codec.newDecoder()); // 协议解析器
            pipeline.addLast("encoder", codec.newEncoder());
            if (idleSwitch) {
                pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTime,
                    TimeUnit.MILLISECONDS));
                pipeline.addLast("serverIdleHandler", serverIdleHandler);
            }
            pipeline.addLast("connectionEventHandler", connectionEventHandler);
            pipeline.addLast("handler", rpcHandler); // rpc处理器
            createConnection(channel);
        }

        /**
         * create connection operation<br>
         * <ul>
         * <li>If flag manageConnection be true, use {@link DefaultConnectionManager} to add a new connection, meanwhile bind it with the channel.</li>
         * <li>If flag manageConnection be false, just create a new connection and bind it with the channel.</li>
         * </ul>
         */
        private void createConnection(SocketChannel channel) {
            Url url = addressParser.parse(RemotingUtil.parseRemoteAddress(channel));
            if (switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
                connectionManager.add(new Connection(channel, url), url.getUniqueKey());
            } else {
                new Connection(channel, url);
            }
            channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
        }
    });
}

com.alipay.remoting.util.NettyEventLoopUtil

public class NettyEventLoopUtil {

    /** check whether epoll enabled, and it would not be changed during runtime. */
    private static boolean epollEnabled = ConfigManager.netty_epoll() && Epoll.isAvailable();

    /**
     * Create the right event loop according to current platform and system property, fallback to NIO when epoll not enabled.
     *
     * @param nThreads
     * @param threadFactory
     * @return an EventLoopGroup suitable for the current platform
     */
    public static EventLoopGroup newEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
        return epollEnabled ? new EpollEventLoopGroup(nThreads, threadFactory)
            : new NioEventLoopGroup(nThreads, threadFactory);
    }

    /**
     * @return a SocketChannel class suitable for the given EventLoopGroup implementation
     */
    public static Class<? extends SocketChannel> getClientSocketChannelClass() {
        return epollEnabled ? EpollSocketChannel.class : NioSocketChannel.class;
    }

    /**
     * @return a ServerSocketChannel class suitable for the given EventLoopGroup implementation
     */
    public static Class<? extends ServerSocketChannel> getServerSocketChannelClass() {
        return epollEnabled ? EpollServerSocketChannel.class : NioServerSocketChannel.class;
    }

    /**
     * Use {@link EpollMode#LEVEL_TRIGGERED} for server bootstrap if level trigger enabled by system properties,
     *   otherwise use {@link EpollMode#EDGE_TRIGGERED}.
     * @param serverBootstrap server bootstrap
     */
    public static void enableTriggeredMode(ServerBootstrap serverBootstrap) {
        if (epollEnabled) {
            if (ConfigManager.netty_epoll_lt_enabled()) {
                serverBootstrap.childOption(EpollChannelOption.EPOLL_MODE,
                    EpollMode.LEVEL_TRIGGERED);
            } else {
                serverBootstrap
                    .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED);
            }
        }
    }
}

 

本文地址:https://blog.csdn.net/zhouzhiande/article/details/111973591

相关标签: rpc Java netty