暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Apache Dubbo2.7.8 事件通知机制实现回调的bug

老吕架构 2021-05-16
732

背景

dubbo的事件通知官方例子是通过xml配置实现的(

https://dubbo.apache.org/zh/docs/v2.7/user/examples/events-notify/

那么通过注解的方式能不能实现呢?周末实践了一把,结果还真出事了。

实验过程

1、ISeataStorageService 是要远程调用的接口,里面有 selectCount,selectAll,insert,delete等方法

    public interface ISeataStorageService {
    List<SeataStoragePo> selectAll(SeataStoragePo po);
    int selectCount();
    int insert(SeataStoragePo po);
    int delete(SeataStoragePo po);
    int update(SeataStoragePo po);
    }


    2、INotify用来定义事件通知触发的方法(方法的参数定义规则按照dubbo规定来)

      public interface INotify{
      void oninvoke(SeataStoragePo po);
      //第一个参数为方法返回值,后面的参数是方法入参,没有入参的可以不写
      void onreturn(List<SeataStoragePo> list,SeataStoragePo po);
      //第一个参数为异常,后面的参数是方法入参
      void onthrow(Throwable ex,SeataStoragePo po);
      }

      3、NotifyImpl通知实现类

        @Component("notify")
        public class NotifyImpl implements INotify {
        @Override
        public void oninvoke(SeataStoragePo po){
        System.out.println("oninvoke");
        }
        @Override
        public void onreturn(List<SeataStoragePo> list,SeataStoragePo po){
        System.out.println("onreturn");
        list.stream().forEach((storagePo)->{
        System.out.println(storagePo.getId());
        });
        }
        @Override
        public void onthrow(Throwable ex,SeataStoragePo po){
        System.out.println("onthrow:"+ex.getMessage());
        }
        }

        4、消费者端使用注解配置事件通知代码

          @DubboReference(methods ={
          @Method(name = "selectCount",async = true),
          @Method(name = "selectAll",async = true
          ,oninvoke = "notify.oninvoke"
          ,onreturn ="notify.onreturn"
          ,onthrow ="notify.onthrow"
          ),
          @Method(name = "insert",async = true),
          @Method(name = "delete",async = true)
          })
          private ISeataStorageService seataStorageService;

          5、高兴的试一把

            java.lang.IllegalStateException: service:com.dhy.common.itf.ISeataStorageService has a onthrow callback config , but no such method found. url:dubbo://169.254.68.118:21880/com.dhy.common.itf.ISeataStorageService?anyhost=true&application=consumer-service&check=false&delete.async=true&delete.return=true&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&init=false&insert.async=true&insert.return=true&interface=com.dhy.common.itf.ISeataStorageService&metadata-type=remote&methods=selectAll,update,insert,selectCount,delete&pid=70856&qos.enable=false&reference.filter=myConsumerFilter&register.ip=192.168.3.3&release=2.7.8&remote.application=provider-service&retries=0&selectAll.async=true&selectAll.return=true&selectCount.async=true&selectCount.return=true&service.filter=myProviderFilter&side=consumer&sticky=false&timeout=30000&timestamp=1621155227043
            at org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter.fireThrowCallback(FutureFilter.java:156) ~[dubbo-2.7.8.jar:2.7.8]
            at org.apache.dubbo.rpc.protocol.dubbo.filter.FutureFilter.onError(FutureFilter.java:65) ~[dubbo-2.7.8.jar:2.7.8]
            at org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:97) ~[dubbo-2.7.8.jar:2.7.8]
            at org.apache.dubbo.rpc.filter.ConsumerContextFilter.invoke(ConsumerContextFilter.java:69) ~[dubbo-2.7.8.jar:2.7.8]

            经过多次修改尝试发现都会报 onXXX callback config , but no such method found 错误,反复核对确认配置无误,只能跟踪源码看下到底哪个步骤出了问题

            问题追踪

            1、错误爆出的位置源码

            2、看来是配置信息还是不太正确,往回追踪吧,dubbo的配置信息无论是通过xml还是注解还是api,最终都会回到 ReferenceConfig和ServiceConfig,我今天的情况就需要在ReferenceConfig里打断点,看看初始化过程中那个onthrowMethod配置信息到底时从哪里读取的,为什么会没有。

            在ReferenceConfig.init 方法里发现了这么一段代码用来解析存储事件异步通知相关的方法配置信息的。

              //用来存储方法级别异步相关的配置信息
              Map<String, AsyncMethodInfo> attributes = null;
              if (CollectionUtils.isNotEmpty(getMethods())) {
              //存在针对方法的配置信息
              attributes = new HashMap<>();
              //依次解析每个方法的配置情况,并存储起来
              for (MethodConfig methodConfig : getMethods()) {
              AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
              //重试次数相关的配置
              String retryKey = methodConfig.getName() + ".retry";
              if (map.containsKey(retryKey)) {
              String retryValue = map.remove(retryKey);
              if ("false".equals(retryValue)) {
              map.put(methodConfig.getName() + ".retries", "0");
              }
              }
              //==========重点来了:异步回调相关的配置信息================
              //AsyncMethodInfo的结构在下面,可以看到里面有 onXXXMethod相关的信息的,那么很有可能是这里初始化时没有获取到正确的信息填充,导致后面的报错
              AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
              if (asyncMethodInfo != null) {
              // consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
              //存储起来,供后面使用
              attributes.put(methodConfig.getName(), asyncMethodInfo);
              }
              }
              }

              AsyncMethodInfo 的结构如下:

                public class AsyncMethodInfo {
                // callback instance when async-call is invoked
                private Object oninvokeInstance;


                // callback method when async-call is invoked
                private Method oninvokeMethod;


                // callback instance when async-call is returned
                private Object onreturnInstance;


                // callback method when async-call is returned
                private Method onreturnMethod;


                // callback instance when async-call has exception thrown
                private Object onthrowInstance;


                // callback method when async-call has exception thrown
                private Method onthrowMethod;
                }

                再看下

                AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig); 方法实现,找到了 onXxxxInstance和onXxxxMethod的来源

                  protected static AsyncMethodInfo convertMethodConfig2AsyncInfo(MethodConfig methodConfig) {
                  if (methodConfig == null || (methodConfig.getOninvoke() == null && methodConfig.getOnreturn() == null && methodConfig.getOnthrow() == null)) {
                  return null;
                  }


                  //check config conflict
                  if (Boolean.FALSE.equals(methodConfig.isReturn()) && (methodConfig.getOnreturn() != null || methodConfig.getOnthrow() != null)) {
                  throw new IllegalStateException("method config error : return attribute must be set true when onreturn or onthrow has been set.");
                  }


                  AsyncMethodInfo asyncMethodInfo = new AsyncMethodInfo();


                  //==============看到没有,onXxxxInstance的信息来源于 methodConfig中的onXxxxx属性 ===============
                  asyncMethodInfo.setOninvokeInstance(methodConfig.getOninvoke());
                  asyncMethodInfo.setOnreturnInstance(methodConfig.getOnreturn());
                  asyncMethodInfo.setOnthrowInstance(methodConfig.getOnthrow());


                  try {

                  //==============看到没有,onXxxxMethod的信息来源于 methodConfig中的onXxxxxMethod属性 ===============
                  String oninvokeMethod = methodConfig.getOninvokeMethod();
                  if (StringUtils.isNotEmpty(oninvokeMethod)) {
                  asyncMethodInfo.setOninvokeMethod(getMethodByName(methodConfig.getOninvoke().getClass(), oninvokeMethod));
                  }


                  String onreturnMethod = methodConfig.getOnreturnMethod();
                  if (StringUtils.isNotEmpty(onreturnMethod)) {
                  asyncMethodInfo.setOnreturnMethod(getMethodByName(methodConfig.getOnreturn().getClass(), onreturnMethod));
                  }


                  String onthrowMethod = methodConfig.getOnthrowMethod();
                  if (StringUtils.isNotEmpty(onthrowMethod)) {
                  asyncMethodInfo.setOnthrowMethod(getMethodByName(methodConfig.getOnthrow().getClass(), onthrowMethod));
                  }
                  } catch (Exception e) {
                  throw new IllegalStateException(e.getMessage(), e);
                  }


                  return asyncMethodInfo;
                  }

                  3、看来确实时少配置信息了,按照源码的意思是 实例和方法分开到两个属性里配置了, 那我就配置下 onXxxxMethod属性吧,接着你会发现它并不支持

                  org.apache.dubbo.config.annotation.Method  注解里也没有相关的配置项

                  解决方案

                  通过源码分析,基本确定是 dubbo 的问题了,找了下相关的dubbo  bug列表,真的发现了相关的bug报告

                  https://github.com/apache/dubbo/issues/6833  ,  这个报告和我遇到的问题是一样的,那么是不是已经在新版本中修正了呢?发现7754补丁确实解决了  https://github.com/apache/dubbo/pull/7754  ,不过是22小时之前合并的代码,那不就是昨天(2021-5-15)吗,最近一个发布的版本是 2.7.11 (2021-5-5日发布,里面没有7754补丁) ,所以可以确定最后一个版本2.7.11里仍然存在这个问题。只能等 2.7.12版本了。

                  顺便也看下7754补丁是怎么解决这个问题的吧,如下所示,它是在 MethodConfig类初始化方法里做了改进。

                  总结

                  1、bug总是会有的,关键是你能确定它是dubbo的bug,要想确定是dubbo的bug,从dubbo源码级别定位问题是必不可少的。

                  2、dubbo的官方示例代码多是使用xml来配置的,我觉得项目中使用注解的配置方式会越来越多。

                  3、实践是检验代码的唯一标准。


                  文章转载自老吕架构,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                  评论