欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Dubbo性能调优参数及原理

程序员文章站 2022-07-14 15:14:01
...

Dubbo调用模型
Dubbo性能调优参数及原理
常用性能调优参数
Dubbo性能调优参数及原理
源码及原理分析

threads

FixedThreadPool.java

public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue() :
(queues < 0 ? new LinkedBlockingQueue() :
new LinkedBlockingQueue(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}

LimitedThreadPool.java

public Executor getExecutor(URL url) {
String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue() :
(queues < 0 ? new LinkedBlockingQueue() :
new LinkedBlockingQueue(queues)),
new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
其中,Constants.DEFAULT_QUEUES = 200。threads 参数配置的是业务处理线程池的最大(或核心)线程数。

iothreads

NettyServer.java

@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory(“NettyServerBoss”, true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory(“NettyServerWorker”, true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);

final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() {
        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addLast("decoder", adapter.getDecoder());
        pipeline.addLast("encoder", adapter.getEncoder());
        pipeline.addLast("handler", nettyHandler);
        return pipeline;
    }
});
// bind
channel = bootstrap.bind(getBindAddress());

}

queues

分别在 FixedThreadPool.java、LimitedThreadPool.java 和 CachedThreadPool.java 中使用,代码详情见 3.2章节。 由代码可见,默认值为 0,表示使用同步阻塞队列;如果 queues 设置为小于 0 的值,则使用容量为 Integer.MAX_VALUE 的阻塞链表队列;如果为其他值,则使用指定大小的阻塞链表队列。

connections

DubboProtocol.java

private ExchangeClient[] getClients(URL url){
//是否共享连接
boolean service_share_connect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
//如果connections不配置,则共享连接,否则每服务每连接
if (connections == 0){
service_share_connect = true;
connections = 1;
}

ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
    if (service_share_connect){
        clients[i] = getSharedClient(url);
    } else {
        clients[i] = initClient(url);
    }
}
return clients;

}
DubboInvoker.java

@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);

ExchangeClient currentClient;
if (clients.length == 1) {
    currentClient = clients[0];
} else {
    currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
    boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
    boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
    int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
    if (isOneway) {
        boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
        currentClient.send(inv, isSent);
        RpcContext.getContext().setFuture(null);
        return new RpcResult();
    } else if (isAsync) {
        ResponseFuture future = currentClient.request(inv, timeout) ;
        RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
        return new RpcResult();
    } else {
        RpcContext.getContext().setFuture(null);
        return (Result) currentClient.request(inv, timeout).get();
    }
} catch (TimeoutException e) {
    throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
    throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}

}
以上可见,默认值为0,表示针对每个 Provider,所有客户端共享一个长连接;否则,建立指定数量的长连接。在调用时,如果有多个长连接,则使用轮询方式获得一个长连接。

actives

ActiveLimitFilter.java

public Result invoke(Invoker