欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Dubbo超时

程序员文章站 2024-01-13 19:35:10
...
1、概念

 1)服务提供者超时是指远程调用服务的方法执行的超时时间.
 2)服务调用者超时是指服务调用者调用远程方法的执行超时时间.

2、超时设置

  使用dubbo进行远程调用的过程中,需要设置远程调用的超时间.超时时间分别可以在服务的提供者配置中设置,也可以在服务调用者配置中设置,超时时间的单位是毫秒.
  1)全局超时配置

<dubbo:consumer timeout="5000" /><dubbo:provider timeout="5000" />

  2)接口及指定方法超时配置

<dubbo:reference interface="com.foo.BarService" timeout="2000">
    <dubbo:method name="sayHello" timeout="3000" />
</dubbo:reference>
或
<dubbo:provider interface="com.foo.BarService" timeout="2000">
    <dubbo:method name="sayHello" timeout="3000" />
</dubbo:provider>
3、超时覆盖机制

  1、方法级配置别优于接口级别,即小Scope优先
  2、Consumer端配置 优于 Provider配置 优于 全局配置,最后是Dubbo Hard Code的配置值
  dubbo的机制是如果服务的调用者配置了超时时间,会覆盖服务的提供者设置的超时时间.请注意,如果服务的调用者覆盖了服务提供者的远程方法调用超时时间,那么对于服务的提供者就会变得不可控,即服务的调用者控制了服务提供者方法执行的超时时间,这对于一次远程调用是非常不合理的,所以dubbo非常不建议在服务的调用者配置中配置服务的超时时间.

4、超时重试

  dubbo在调用服务不成功时,默认是会重试两次的。这样在服务端的处理时间超过了设定的超时时间时,就会有重复请求,比如在发邮件时,可能就会发出多份重复邮件,执行注册请求时,就会插入多条重复的注册数据,那么怎么解决超时问题呢?如下
  1)对于核心的服务中心,去除dubbo超时重试机制,并重新评估设置超时时间。
  2)业务处理代码必须放在服务端,客户端只做参数验证和服务调用,不涉及业务流程处理
  全局配置实例

<!-- 延迟到Spring初始化完成后,再暴露服务,服务调用超时设置为6秒,超时不重试-->    
<dubbo:provider delay="-1" timeout="6000" retries="0"/>
5、Dubbo调用源码分析

Dubbo协议超时实现使用了Future模式,主要涉及类DubboInvoker,ResponseFuture, DefaultFuture。
在DubboInvoker中会判断是同步异步还是不需要返回值,doInvoke()方法如下

@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;
    final String methodName = RpcUtils.getMethodName(invocation);
    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
    inv.setAttachment(Constants.VERSION_KEY, version);

    ExchangeClient currentClient;
    if (clients.length == 1) {
        currentClient = clients[0];
    } else {
        currentClient = clients[index.getAndIncrement() % clients.length];
    }
    try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
            ResponseFuture future = currentClient.request(inv, timeout) ;
            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
            return new RpcResult();
        } else {
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

1.如果不需要返回值,直接使用send方法,发送出去,设置当期和线程绑定RpcContext的future为null
2.如果需要异步通信,使用request方法构建一个ResponseFuture,然后设置到和线程绑定的RpcContext中
3.如果需要同步通信,使用request方法构建一个ResponseFuture,阻塞等待请求完成
ResponseFuture.get()在请求还未处理完或未到超时前一直是wait状态;响应达到后,设置请求状态,并进行notify唤醒。get()方法如下

public Object get(int timeout) throws RemotingException {
  if (timeout <= 0) {
      timeout = Constants.DEFAULT_TIMEOUT;
  }
  if (! isDone()) {
      long start = System.currentTimeMillis();
      lock.lock();//加锁
      try {
          while (! isDone()) {
              done.await(timeout, TimeUnit.MILLISECONDS); //等待timeout
              if (isDone() || System.currentTimeMillis() - start > timeout) {
                  break;
              }
          }
      } catch (InterruptedException e) {
          throw new RuntimeException(e);
      } finally {
          lock.unlock();
      }
      if (! isDone()) {// 客户端超时仍然没有得到服务端返回,抛出异常
          throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
      }
  }
  return returnFromResponse();
}

Client端的处理最终转化成ChannelHandler接口实现上,HeaderExchangeHandler的received()接口如下

public void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            // handle request.
            Request request = (Request) message;
            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {
                if (request.isTwoWay()) {
                //服务端处理请求
                    Response response = handleRequest(exchangeChannel, request);
                    channel.send(response);
                } else {
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {
        //这里就是作为消费者的dubbo客户端在接收到响应后,触发通知对应等待线程的起点
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);
                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

public static void handleResponse(Channel channel, Response response) throws RemotingException {
    if (response != null && !response.isHeartbeat()) {
        DefaultFuture.received(channel, response);
    }
}

received()方法又会调用DefaultFuture的received()方法,如下

public static void received(Channel channel, Response response) {
    try {
        DefaultFuture future = FUTURES.remove(response.getId());
        if (future != null) {
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at " 
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) 
                        + ", response " + response 
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress() 
                            + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}

  客户端调用远程服务时,本地会生成一个DefaultFuture,调用DefaultFuture.get()获取远程服务返回的结构,此方法获取锁,调用await方法,此时当前线程进入等待队列,此线程会有两种结果过:要么超时,抛出TimeOutException;如果被唤醒,则返回rpc的结果,如果超时,服务端TimeoutFilter会根据服务端timeout检测到操作超时,打出warn日志。
  而Dubbo生成对象ResponseFuture时,在一个全局map里通过put(ID,Future)将该次调用的唯一ID存放起来,然后传递给服务端,再服务端又回传回来,通过该ID,DefaultFuture.FUTURES可以拿到具体的那个DefaultFuture对象,即阻塞请求线程的那个对象。调用它的doReceived方法,就可唤醒阻塞的线程,拿到返回结果

private void doReceived(Response res) {
   lock.lock();
   try {
       response = res;
       if (done != null) {
           done.signal();
       }
   } finally {
       lock.unlock();
   }
   if (callback != null) {
       invokeCallback(callback);
   }
}
7、案例1:客户端超时时间>服务端超时时间

服务端配置:

<dubbo:service ref="xxxService" interface="com.xxx.XxxService"  timeout="5000"  />

客户端配置:

<dubbo:reference id="xxxService" interface="com.xxx.XxxService" timeout="10000"  />

客户端调用远程服务时,本地会生成一个DefaultFuture,调用DefaultFuture.get()获取远程服务返回的结构,此方法获取锁,调用await方法,此时当前线程进入等待队列,此线程会有两种结果过:要么超时,抛出TimeOutException;如果被唤醒,则返回rpc的结果在服务端timeout时间为5s,如果实际的数据操作耗时7s,服务端TimeoutFilter会根据服务端timeout检测到操作超时,打出warn日志。在第7s,客户端接收到数据包,客户端timeout设置为10s>7s,DefaultFuture被唤醒,仍然可以接收到Rpc返回值。
  如果超出了10s还没有返回值,抛出TimeoutException。此时这个DefaultFuture超时了,有一个线程RemotingInvocationTimeoutScan,清理所有超时的DefaultFuture,创建一个timeoutResponse,DefaultFuture.received这样的response就会抛出TimeoutException。
  综上所诉:当客户端timeout值>服务端timeout值,会出现超时日志,但是仍然可以获取到结果。客户端timeout超时抛出异常时,对应超时的Future会自动清理。

引用:https://blog.csdn.net/peerless_hero/article/details/68922880
   https://blog.csdn.net/qq418517226/article/details/51906357