
QueryHandler处理incoming query消息,一般它就是从view models中读取数据,而view models是通过event处理来更新的,所以QueryHandler一般不会去产生event or command.

一、Handling Queries
Axon一般是在Aggregate外部的Component类中来处理query message,如下面代码所示:
@Componentpublic class MyQueryHandler {@QueryHandlerpublic MyResult handle(QueryA query) {}@QueryHandlerpublic MyResult handle(QueryB query) {}@QueryHandlerpublic MyResult handle(QueryC query) {}}
QueryHandler类也可以继承,如果有两个QueryHandler类有继承关系,则先在子类中寻找符合query类型的处理方法,找不到的话再去父类找。

二、Dispatching Queries
Axon支持采用QueryBus或者QueryGateway来分派Query message,而QueryGateway的底层还是基于QueryBus。
Axon支持三种方式来分派Query,点对点模式(Point-to-Point);散集模式(Scatter-Gather),订阅模式(Subscription)。
点对点模式(Point-to-Point)
这种模式指Query只会被一个QueryHandler处理并返回结果,如何没有找到合适的QueryHandler,则会抛NoHandlerForQueryException异常;如果有多个相关QueryHandler被定义,则根据规则选择其一来处理Query. 代码如下所示。
@QueryHandlerpublic List<String> query(String criteria) {// return the query result based on given criteria}
// create a query messageGenericQueryMessage<String, List<String>> query =new GenericQueryMessage<>("criteria", ResponseTypes.multipleInstancesOf(String.class));// send a query message and print query responsequeryBus.query(query).thenAccept(System.out::println);
散集模式(Scatter-Gather)
这种模式指Query请求会被多个相匹配的QueryHandler,返回结果为一个stream,这个stream包含每个QueryHandler的成功处理结果,如果没有找到一个QueryHandler或者所有的QueryHandler都发生异常,则stream为空。见代码如下。
@QueryHandler(queryName = "query")public List<String> query1(String criteria) {// return the query result based on given criteria}@QueryHandler(queryName = "query")public List<String> query2(String criteria) {// return the query result based on given criteria}
// create a query messageGenericQueryMessage<String, List<String>> query =new GenericQueryMessage<>("criteria", "query", ResponseTypes.multipleInstancesOf(String.class));// send a query message and print query responsequeryBus.scatterGather(query, 10, TimeUnit.SECONDS).map(Message::getPayload).flatMap(Collection::stream).forEach(System.out::println);
订阅模式(Subscription)
这种模式的Query也只会被一个QueryHandler处理,而且这种模式允许客户端先query到view model的当前状态数据,然后当view model的状态更新时,客户端也会收到状态更新数据。为了实现这种功能,程序在更新view model时需要用QueryUpdateEmitter。
实现这种模式的代码分三个地方,第一是定义QueryHandler.
@QueryHandlerpublic List<CardSummary> handle(FetchCardSummariesQuery query) {log.trace("handling {}", query);TypedQuery<CardSummary> jpaQuery = entityManager.createNamedQuery("CardSummary.fetch", CardSummary.class);jpaQuery.setParameter("idStartsWith", query.getFilter().getIdStartsWith());jpaQuery.setFirstResult(query.getOffset());jpaQuery.setMaxResults(query.getLimit());return log.exit(jpaQuery.getResultList());}
然后在EventHandler中试图更新view model时,同时使用QueryUpdateEmitter。
@EventHandlerpublic void on(RedeemedEvt evt) {// 1.CardSummary summary = entityManager.find(CardSummary.class, event.getId());summary.setRemainingValue(summary.getRemainingValue() - event.getAmount());// 2.queryUpdateEmitter.emit(FetchCardSummariesQuery.class,query -> event.getId().startsWith(query.getFilter().getIdStartsWith()),summary);}
当QueryHandler和QueryUpdateEmitter都实现好了之后,可以用queryGateway.subscriptionQuery(...)发出一个query,来获取当前状态和之后更新的状态数据。
// 1.commandGateway.sendAndWait(new IssueCmd("gc1", amount));// 2.FetchCardSummariesQuery fetchCardSummariesQuery =new FetchCardSummariesQuery(offset, limit, filter);// 3.SubscriptionQueryResult<List<CardSummary>, CardSummary> fetchQueryResult = queryGateway.subscriptionQuery(fetchCardSummariesQuery,ResponseTypes.multipleInstancesOf(CardSummary.class),ResponseTypes.instanceOf(CardSummary.class));// 4.fetchQueryResult.handle(cs -> cs.forEach(System.out::println), System.out::println);// 5.commandGateway.sendAndWait(new RedeemCmd("gc1", amount));
大功告成,在第4步的fetchQueryResult.handle()中,初始状态和更新后的状态都会被打印出来。

END






