Dubbo3源码篇6-集群容错策略

当我们进行系统设计时 , 不仅要考虑正常情况下代码逻辑应该如何走 , 还要考虑异常
情况下代码逻辑应该怎么走。当服务消费方调用服务提供方的服务出现错误时 , Dubbo 提
供了多种容错方案 , 默认模式为 Failover Cluster,也就是失败重试。下面让我们看看 Dubbo 提供的集群容错模式。

1. 容错实例的创建和调用图

image.png
image.png

2. 容错策略的解析

2.1 failover

故障转移策略。当消费者调用提供者集群中的某个服务器失败时,其会自动尝试着调用
其它服务器。而重试的次数是通过 retries 属性指定的。

org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker#doInvoke:

  1. public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
  2. List<Invoker<T>> copyInvokers = invokers;
  3. // 检测invokers列表是否为空
  4. checkInvokers(copyInvokers, invocation);
  5. // 获取RPC调用的方法名
  6. String methodName = RpcUtils.getMethodName(invocation);
  7. // 获取retries属性值
  8. int len = calculateInvokeTimes(methodName);
  9. // retry loop.
  10. RpcException le = null; // last exception.
  11. // 存放所有已经尝试调用过的invoker,这些invoker中,除了最后一个外,其它的都是不可用的
  12. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
  13. Set<String> providers = new HashSet<String>(len);
  14. for (int i = 0; i < len; i++) {
  15. //Reselect before retry to avoid a change of candidate `invokers`.
  16. //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
  17. if (i > 0) {
  18. // 检测委托对象invoker是否被销毁
  19. checkWhetherDestroyed();
  20. // 更新本地invoker列表
  21. copyInvokers = list(invocation);
  22. // check again 重新检测invokers列表是否为空
  23. checkInvokers(copyInvokers, invocation);
  24. }
  25. // 负载均衡
  26. Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
  27. // 将选择出的invoker写入到invoked集合
  28. invoked.add(invoker);
  29. RpcContext.getServiceContext().setInvokers((List) invoked);
  30. try {
  31. // 远程调用
  32. Result result = invokeWithContext(invoker, invocation);
  33. //重试过程中,将最后一次调用的异常信息以 warn 级别日志输出
  34. if (le != null && logger.isWarnEnabled()) {
  35. logger.warn("Although retry the method " + methodName
  36. + " in the service " + getInterface().getName()
  37. + " was successful by the provider " + invoker.getUrl().getAddress()
  38. + ", but there have been failed providers " + providers
  39. + " (" + providers.size() + "/" + copyInvokers.size()
  40. + ") from the registry " + directory.getUrl().getAddress()
  41. + " on the consumer " + NetUtils.getLocalHost()
  42. + " using the dubbo version " + Version.getVersion() + ". Last error is: "
  43. + le.getMessage(), le);
  44. }
  45. return result;
  46. } catch (RpcException e) {
  47. // 如果是业务性质的异常,不再重试,直接抛出
  48. if (e.isBiz()) { // biz exception.
  49. throw e;
  50. }
  51. // 其他性质的异常统一封装成RpcException
  52. le = e;
  53. } catch (Throwable e) {
  54. le = new RpcException(e.getMessage(), e);
  55. } finally {
  56. // 将提供者的地址添加到providers
  57. providers.add(invoker.getUrl().getAddress());
  58. }
  59. } // end-for
  60. // 最后抛出异常
  61. throw new RpcException(le.getCode(), "Failed to invoke the method "
  62. + methodName + " in the service " + getInterface().getName()
  63. + ". Tried " + len + " times of the providers " + providers
  64. + " (" + providers.size() + "/" + copyInvokers.size()
  65. + ") from the registry " + directory.getUrl().getAddress()
  66. + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
  67. + Version.getVersion() + ". Last error is: "
  68. + le.getMessage(), le.getCause() != null ? le.getCause() : le);
  69. }

2.2 failfast

快速失败策略。消费者端只发起一次调用,若失败则立即报错。通常用于非幂等性的写操作,比如新增记录.
org.apache.dubbo.rpc.cluster.support.FailfastClusterInvoker#doInvoke:

  1. public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
  2. checkInvokers(invokers, invocation);
  3. Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
  4. try {
  5. return invokeWithContext(invoker, invocation);
  6. } catch (Throwable e) {
  7. if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
  8. throw (RpcException) e;
  9. }
  10. throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
  11. "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
  12. + " for service " + getInterface().getName()
  13. + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
  14. + " use dubbo version " + Version.getVersion()
  15. + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
  16. e.getCause() != null ? e.getCause() : e);
  17. }
  18. }

2.3 failsafe

失败安全策略。当消费者调用提供者出现异常时,直接忽略本次消费操作。该策略通常用于执行相对不太重要的服务。org.apache.dubbo.rpc.cluster.support.FailsafeClusterInvoker#doInvoke:

  1. public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
  2. try {
  3. checkInvokers(invokers, invocation);
  4. Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
  5. return invokeWithContext(invoker, invocation);
  6. } catch (Throwable e) {
  7. logger.error("Failsafe ignore exception: " + e.getMessage(), e);
  8. return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
  9. }
  10. }

2.4 failback

失败自动恢复策略。消费者调用提供者失败后, Dubbo 会记录下该失败请求,然后会定时发起重试请求,而定时任务执行的次数仍是通过配置文件中的 retries 指定的。 该策略通常用于实时性要求不太高的服务.

org.apache.dubbo.rpc.cluster.support.FailbackClusterInvoker#doInvoke:

  1. protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
  2. Invoker<T> invoker = null;
  3. try {
  4. checkInvokers(invokers, invocation);
  5. invoker = select(loadbalance, invocation, invokers, null);
  6. return invokeWithContext(invoker, invocation);
  7. } catch (Throwable e) {
  8. logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
  9. + e.getMessage() + ", ", e);
  10. addFailed(loadbalance, invocation, invokers, invoker);
  11. return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
  12. }
  13. }
  14. private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
  15. if (failTimer == null) {
  16. synchronized (this) {
  17. if (failTimer == null) {
  18. failTimer = new HashedWheelTimer(
  19. new NamedThreadFactory("failback-cluster-timer", true),
  20. 1,
  21. TimeUnit.SECONDS, 32, failbackTasks);
  22. }
  23. }
  24. }
  25. RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
  26. try {
  27. failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
  28. } catch (Throwable e) {
  29. logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
  30. }
  31. }

2.5 forking

并行策略。消费者对于同一服务并行调用多个提供者服务器,只要一个成功即调用结束并返回结果。通常用于实时性要求较高的读操作,但其会浪费较多服务器资源:
org.apache.dubbo.rpc.cluster.support.ForkingClusterInvoker#doInvoke:

  1. public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
  2. try {
  3. checkInvokers(invokers, invocation);
  4. // 存放的是挑选出的用于进行并行运行的invoker
  5. final List<Invoker<T>> selected;
  6. // 获取forks属性值
  7. final int forks = getUrl().getParameter(FORKS_KEY, DEFAULT_FORKS);
  8. // 获取timeout属性值,远程调用超时时限
  9. final int timeout = getUrl().getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
  10. if (forks <= 0 || forks >= invokers.size()) {
  11. selected = invokers;
  12. } else { // 处理forks取值在(0, invokers.size())范围的情况
  13. selected = new ArrayList<>(forks);
  14. while (selected.size() < forks) {
  15. // 负载均衡选择一个invoker
  16. Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
  17. if (!selected.contains(invoker)) {
  18. //Avoid add the same invoker several times.
  19. selected.add(invoker);
  20. }
  21. }
  22. }
  23. RpcContext.getServiceContext().setInvokers((List) selected);
  24. // 计数器,记录并行运行异常的invoker数量
  25. final AtomicInteger count = new AtomicInteger();
  26. // 队列:存放并行运行结果
  27. final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
  28. // 并行运行
  29. for (final Invoker<T> invoker : selected) {
  30. // 使用线程池中的线程执行,这是并行执行的过程
  31. executor.execute(() -> {
  32. try {
  33. // 远程调用
  34. Result result = invokeWithContext(invoker, invocation);
  35. // 将当前invoker执行结果写入到队列
  36. ref.offer(result);
  37. } catch (Throwable e) {
  38. // 若invoker执行过程中出现异常,则计数器加一
  39. int value = count.incrementAndGet();
  40. if (value >= selected.size()) {
  41. // 代码走到这里说明,没有任何一个并行远程调用是成功的。
  42. // 为了能够唤醒后面的poll(),这里就将异常信息写入到ref队列
  43. ref.offer(e);
  44. }
  45. }
  46. });
  47. } // end-for
  48. try {
  49. // poll()是一个阻塞方法,等待ref中具有一个元素。
  50. // 只要ref中被写入了一个元素,阻塞马上被唤醒。或一直等待到timeout超时
  51. // 注意,该poll()方法的执行与前面的并行远程调用的执行也是并行的
  52. Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
  53. if (ret instanceof Throwable) {
  54. Throwable e = (Throwable) ret;
  55. throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
  56. }
  57. return (Result) ret;
  58. } catch (InterruptedException e) {
  59. throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
  60. }
  61. } finally {
  62. // clear attachments which is binding to current thread.
  63. RpcContext.getClientAttachment().clearAttachments();
  64. }
  65. }

2.6 broadcast

广播策略。广播调用所有提供者,逐个调用,任意一台报错则报错。通常用于通知所有提供者更新缓存或日志等本地资源信息。
org.apache.dubbo.rpc.cluster.support.BroadcastClusterInvoker#doInvoke

2.7 available

首个可用策略。从所有 invoker 中查找,选择第一个可用的 invoker。
org.apache.dubbo.rpc.cluster.support.AvailableClusterInvoker#doInvoke

2.8 mergeable

合并策略。将多个 group 的 invoker 的执行结果进行合并

org.apache.dubbo.rpc.cluster.support.MergeableClusterInvoker#doInvoke

2.9 zone-aware

当有多个注册中心可供订阅时,该容错机制提供了一种策略,用于决定如何在它们之间分配
流量:

  1. 标记为“preferred=true”的注册表具有最高优先级。
  2. 检查当前请求所属的区域,首先选择具有相同区域的注册表。
  3. 根据每个注册表的权重均衡所有注册表之间的流量。
  4. 挑选任何有空的人。

3. 基于扩展接口自定义集群容错策略

Dubbo 本身提供了丰富的集群容错策略,但是如果你有定制化需求 ,可以根据 Dubbo 提供的扩展接口 Cluster 进行定制 。为了自定义扩展实现, 首先需要实现 Cluster 接口 :

  1. public class MyCluster implements Cluster{
  2. @Override
  3. public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
  4. return new MyClusterinvoker(directory) ;
  5. }
  6. }

在上面的代码中, MyCluster 实现了 Cluster 的 join 接口 。然后,需要集成 AbstractC lusterinvoker 类创建自己的 Clusterlnvoker 类 :

  1. public class MyCluster l nvoker<T> extends AbstractClusterlnroker<T> {
  2. public MyClusterinvoker(Di rectory<T> directory) {
  3. super(directory) ;
  4. }
  5. @Override
  6. protected Result doinvoke (工nvocation invocation, List<Invoker<T> invokers ,
  7. Loac!Balance loadbalance )
  8. throws RpcException {
  9. checklnvokers (invoker s , invocation) ;
  10. RpcContext . getContext () . setinvokers ( (List) invokers ) ;
  11. RpcExcepton exception = null ;
  12. Result result = null ;
  13. //做 自己的集群容错策略
  14. return result ;
  15. }
  16. }

通过上面的代码可知, dolnvoke 方法需要重写 ,在该方法内用户就可以实现自己的集群容错策略。
然后,在 org.apache .dubbo.rpc.cluster.Cluster 目录下创建文件 , 并在文件里添 加
myCluster=org.apache.dubbo.demo.cluster.MyCluster

最后,使用下面的方法将集群容错模式切换为 myCluster:

  1. <dubbo :reference id= "greetingService"
  2. interface"com.books.dubbo.demo.api.GreetingService " group"dubbo"
  3. cluster"myCluster">

参考文章

Dubbo3.0源码注释github地址
dubbo源码系列\
dubbo源码分析专栏


文章标签:

原文连接:https://juejin.cn/post/7118926702394212359

相关推荐

10 Dubbo 配置实战

# 8 快速入门 dubbo

7 什么是dubbo

Dubbo3源码篇10-注册中心(Zookeeper和CuratorZookeeperClient)

Dubbo3源码篇9-深度解析过滤器FilterChain及过滤器功能

Dubbo3源码篇6-集群容错策略

Dubbo源码篇4-Directort目录与Router路由服务源码分析

dubbo(九):timeout超时机制解析

抓到Dubbo异步调用的小BUG,再送你一个贡献开源代码的机会

分布式服务之Dubbo

Dubbo3源码篇2-服务订阅(本地和远程引用)源码分析

Dubbo3源码篇1-服务发现(本地和远程暴露)源码分析

Dubbo3内核篇-SPI机制(IOC和AOP)、自适应和自动激活机制源码分析

Dubbo3 官方文档贡献者征集令

动态线程池框架 DynamicTp v1.0.6版本发布。还在为Dubbo线程池耗尽烦恼吗?还在为Mq消费积压烦恼吗?

HIPPO-4J 1.3.0 正式发布:支持 Dubbo、RibbitMQ、RocketMQ 框架线程池

我是一个Dubbo数据包...

面试常问的dubbo的spi机制到底是什么?

Dubbo3 源码系列 Dubbo“纠葛”(入门篇)

从原理到操作,让你在 Apache APISIX 中代理 Dubbo3 服务更便捷