暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Axon系列之Query Handling

铁花盆的小世界 2021-06-21
778

QueryHandler处理incoming query消息,一般它就是从view models中读取数据,而view models是通过event处理来更新的,所以QueryHandler一般不会去产生event or command.


一、Handling Queries

Axon一般是在Aggregate外部的Component类中来处理query message,如下面代码所示:

    @Component
    public class MyQueryHandler {

    @QueryHandler
    public MyResult handle(QueryA query) {
    }


    @QueryHandler
    public MyResult handle(QueryB query) {
    }


    @QueryHandler
    public MyResult handle(QueryC query) {
    }
    }

    QueryHandler类也可以继承,如果有两个QueryHandler类有继承关系,则先在子类中寻找符合query类型的处理方法,找不到的话再去父类找。


    二、Dispatching Queries

    Axon支持采用QueryBus或者QueryGateway来分派Query message,而QueryGateway的底层还是基于QueryBus。

    Axon支持三种方式来分派Query,点对点模式(Point-to-Point);散集模式(Scatter-Gather),订阅模式(Subscription)。

    点对点模式(Point-to-Point)

    这种模式指Query只会被一个QueryHandler处理并返回结果,如何没有找到合适的QueryHandler,则会抛NoHandlerForQueryException异常;如果有多个相关QueryHandler被定义,则根据规则选择其一来处理Query. 代码如下所示。

      @QueryHandler 
      public List<String> query(String criteria) {
      // return the query result based on given criteria
      }

        // create a query message
        GenericQueryMessage<String, List<String>> query =
        new GenericQueryMessage<>("criteria", ResponseTypes.multipleInstancesOf(String.class));


        // send a query message and print query response
        queryBus.query(query).thenAccept(System.out::println);

        散集模式(Scatter-Gather)

        这种模式指Query请求会被多个相匹配的QueryHandler,返回结果为一个stream,这个stream包含每个QueryHandler的成功处理结果,如果没有找到一个QueryHandler或者所有的QueryHandler都发生异常,则stream为空。见代码如下。

          @QueryHandler(queryName = "query")
          public List<String> query1(String criteria) {
          // return the query result based on given criteria
          }


          @QueryHandler(queryName = "query")
          public List<String> query2(String criteria) {
          // return the query result based on given criteria
          }

            // create a query message
            GenericQueryMessage<String, List<String>> query =
            new GenericQueryMessage<>("criteria", "query", ResponseTypes.multipleInstancesOf(String.class));
            // send a query message and print query response
            queryBus.scatterGather(query, 10, TimeUnit.SECONDS)
            .map(Message::getPayload)
            .flatMap(Collection::stream)
            .forEach(System.out::println);

            订阅模式(Subscription)

            这种模式的Query也只会被一个QueryHandler处理,而且这种模式允许客户端先query到view model的当前状态数据,然后当view model的状态更新时,客户端也会收到状态更新数据。为了实现这种功能,程序在更新view model时需要用QueryUpdateEmitter。

            实现这种模式的代码分三个地方,第一是定义QueryHandler.

              @QueryHandler
              public List<CardSummary> handle(FetchCardSummariesQuery query) {
              log.trace("handling {}", query);
              TypedQuery<CardSummary> jpaQuery = entityManager.createNamedQuery("CardSummary.fetch", CardSummary.class);
              jpaQuery.setParameter("idStartsWith", query.getFilter().getIdStartsWith());
              jpaQuery.setFirstResult(query.getOffset());
              jpaQuery.setMaxResults(query.getLimit());
              return log.exit(jpaQuery.getResultList());
              }

              然后在EventHandler中试图更新view model时,同时使用QueryUpdateEmitter。

                @EventHandler
                public void on(RedeemedEvt evt) {
                // 1.
                CardSummary summary = entityManager.find(CardSummary.class, event.getId());
                summary.setRemainingValue(summary.getRemainingValue() - event.getAmount());
                // 2.
                queryUpdateEmitter.emit(FetchCardSummariesQuery.class,
                query -> event.getId().startsWith(query.getFilter().getIdStartsWith()),
                summary);
                }

                当QueryHandler和QueryUpdateEmitter都实现好了之后,可以用queryGateway.subscriptionQuery(...)发出一个query,来获取当前状态和之后更新的状态数据。

                  // 1.
                  commandGateway.sendAndWait(new IssueCmd("gc1", amount));
                  // 2.
                  FetchCardSummariesQuery fetchCardSummariesQuery =
                  new FetchCardSummariesQuery(offset, limit, filter);
                  // 3.
                  SubscriptionQueryResult<List<CardSummary>, CardSummary> fetchQueryResult = queryGateway.subscriptionQuery(
                  fetchCardSummariesQuery,
                  ResponseTypes.multipleInstancesOf(CardSummary.class),
                  ResponseTypes.instanceOf(CardSummary.class));
                  // 4.
                  fetchQueryResult.handle(cs -> cs.forEach(System.out::println), System.out::println);
                  // 5.
                  commandGateway.sendAndWait(new RedeemCmd("gc1", amount));

                  大功告成,在第4步的fetchQueryResult.handle()中,初始状态和更新后的状态都会被打印出来。

                  END



                  文章转载自铁花盆的小世界,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                  评论