
Axon在很大程度上基于领域驱动设计(DDD)和命令查询职责分离(CQRS)的原则。在消息驱动的微服务环境中,微服务之间的通信高效、可靠、易于管理和监视是非常重要的。消息路由不需要任何手动配置,添加新的微服务应该很容易。使用Axon框架,应用程序开发人员可以隐藏通信的内部内容。Axon服务器在分布式环境中提供了同样的体验。它是一个易于使用、易于管理的平台,可以处理所有事件、命令和查询。
本文参考Axon官网文档写成,详见 https://docs.axoniq.io/reference-guide/

一、Event Sourced Aggregate的实现
Aggregate类
Aggregate包含状态和更改该状态的方法。默认情况下,Axon会将Aggregate配置为Event Sourced
方式,下面以GiftCard为例实现一个Aggregate.
import org.axonframework.commandhandling.CommandHandler;import org.axonframework.eventsourcing.EventSourcingHandler;import org.axonframework.modelling.command.AggregateIdentifier;import static org.axonframework.modelling.command.AggregateLifecycle.apply;@Aggregatepublic class GiftCard {@AggregateIdentifier // 1.private String id;@CommandHandler // 2.public GiftCard(IssueCardCommand cmd) {// 3.apply(new CardIssuedEvent(cmd.getCardId(), cmd.getAmount()));}@EventSourcingHandler // 4.public void on(CardIssuedEvent evt) {id = evt.getCardId();}// 5.protected GiftCard() {}// omitted command handlers and event sourcing handlers}
实例代码中的关键部分按编号说明如下:
@AggregateIdentifier,注解该Aggregate的标识ID,必须提供,否则Axon无法分配Command;
@CommandHandler,注解该Aggregate的Command处理方法,可以是构造方法也可以是普通方法,入参是需要可以处理的Command类别;
AggregateLifecycle#apply(Object...),用于发布一个Event,注意,该Event只在本Aggregate内可见;
@EventSourcingHandler,注解该Aggregate的Event处理方法,有个前提条件,那就是该Aggregate为
Event Sourced
方式。该方法中可更改Aggregate状态。需要注意,在Aggregate的首个Event处理中应设置ID,只有这样后续的Event才能正确分配。该方法可private不影响Axon调用;必须提供一个无参的构造方法,Axon调用它来创建Aggregate对象。
Command 类
见如下代码为例:
import org.axonframework.modelling.command.TargetAggregateIdentifier;public class IssueCardCommand {@TargetAggregateIdentifierprivate final String cardId;private final Integer amount;public IssueCardCommand(String cardId, Integer amount) {this.cardId = cardId;this.amount = amount;}// omitted getters, equals/hashCode, toString functions}public class RedeemCardCommand {@TargetAggregateIdentifierprivate final String cardId;private final String transactionId;private final Integer amount;public RedeemCardCommand(String cardId, String transactionId, Integer amount) {this.cardId = cardId;this.transactionId = transactionId;this.amount = amount;}// omitted getters, equals/hashCode, toString functions}
需要注意的是必须提供 @TargetAggregateIdentifier,用来标识AggreagteID,否则Axon无法知道该命令将分配给哪个Aggregate对象实例。
CommandHandler和EventHandler的注意事项
CommandHandler应该检查Aggregate是否处于正确的状态。如果是,则发布Event;如果不是,则可以忽略该命令,或者根据需要抛出异常。
CommandHandler中不应该更新Aggregate的状态,更新Aggregate的状态应该在EventHandler中去做。
可以在EventSourcingHandler方法中apply() 新Event,这使得EntityB可以apply event来响应EntityA所做的事情。当对Sourcing Aggregate重放历史Event时,Axon将忽略apply()调用。请注意,在从EventSourcingHandler发布Event的场景中,内部apply()调用的Event仅在所有Entity收到第一个Event后发布到Entity。如果需要根据apply内部Event后Entity的状态发布更多Event,可使用apply().andThenApply().
Aggregate的EventSoucingHandler只能处理它自己apply的event,如果要处理来自其他Aggregate实例的event,可以考虑使用Sagas或EventHandler.
Aggregate Lifecycle控制
Axon提供的AggregateLifecycle类有几个静态方法可以管理生命周期。
apply(event) or apply(event, metadata):发布event到EventBus.
createNew(class, callable):用于在Command处理中初始化一个Aggregate实例。
isLive():如果一个Aggregate完成了重放历史Event以重新创建其状态,则该Aggregate被视为“live”;如果Aggregate处于EventSourcing过程中,则AggregateLifecycle.isLive()调用将返回false。使用isLive()方法,可以控制只处理新生成的Event。
markDeleted():标识该Aggregate为deleted状态,从此之后该Aggregate不再允许处理Command,该方法只允许在EventSourcingHandler中调用。
Aggregate 存储
Aggregate需要通过Repository进行存储,Axon提供两个Repository实现:
GenericJpaRepository:适用于State-Stored Aggregate;
EventSourcingRepository:适用于Event Sourced Aggregate;
Repository接口提供两个方法用于创建和导入aggregate.
load(identifier) or load(identifier, version)
newInstance(factoryMethod)
Repository不会提供delete方法,要delete一个aggregate,应该采用AggregateLifecycle.markDeleted()
在定义Aggregate的时候,可以指定repository,见如下代码所示:
@Beanpublic Repository<MyAggregate> repositoryForMyAggregate(EventStore eventStore) {return EventSourcingRepository.builder(GiftCard.class).eventStore(eventStore).build();}...@Aggregate(repository = "repositoryForMyAggregate")public class MyAggregate {...}

二、多Entity的Aggregate
为了实现复杂的业务逻辑,一个Aggregate通常会由Aggregate Root 和其它一些Entity组成。我们需要考虑如何在Aggregate中创建Entity,以及Entity如何处理消息。
首先,同一个Aggregate的Entity之间是可以相互访问状态数据的,但是Aggregate不应该对外暴露状态。
看看下面的代码实例:
import org.axonframework.modelling.command.AggregateIdentifier;import org.axonframework.modelling.command.AggregateMember;import org.axonframework.modelling.command.EntityId;@Aggregatepublic class GiftCard {@AggregateIdentifierprivate String id;@AggregateMember // 1.private List<GiftCardTransaction> transactions = new ArrayList<>();private int remainingValue;@CommandHandlerpublic void handle(RedeemCardCommand cmd) {// Some decision making logicapply(new CardRedeemedEvent(id, cmd.getTransactionId(), cmd.getAmount()));}@EventSourcingHandlerpublic void on(CardRedeemedEvent evt) {// 4.transactions.add(new GiftCardTransaction(evt.getTransactionId(), evt.getAmount()));}// omitted constructors, command and event sourcing handlers}public class GiftCardTransaction {@EntityId // 2.private String transactionId;private int transactionValue;private boolean reimbursed = false;public GiftCardTransaction(String transactionId, int transactionValue) {this.transactionId = transactionId;this.transactionValue = transactionValue;}public String getTransactionId() {return transactionId;}@CommandHandler // 3.public void handle(ReimburseCardCommand cmd) {if (reimbursed) {throw new IllegalStateException("Transaction already reimbursed");}apply(new CardReimbursedEvent(cmd.getCardId(), transactionId, transactionValue));}@EventSourcingHandlerpublic void on(CardReimbursedEvent event) {// 5.if (transactionId.equals(event.getTransactionId())) {reimbursed = true;}}// omitted command handlers, event sourcing handlers and equals/hashCode}
上面的代码中,GiftCard是AggregateRoot,GiftCardTransaction是Entity,它们都是简单对象。代码按编号说明如下:
在AggregateRoot中引入Entity,需要加上@AggregateMember,这个成员可以是List(Iterable), 或者Map,也可以是Single Object.
如果@AggregateMember标注的成员是List or Map,则该Entity中的@EntityId是必需的。@EntityId作为Entity的标识ID,用来路由Command or Event 到正确的Entity。在Command or Event中应该包含一个同名的属性,如果不同名,则需在注解中指明routeKey,如
@EntityId(routingKey = "customRoutingProperty")
。@CommandHandler可以放在AggregateRoot中,也可以放在Entity中。如果一个Command只和某个Entity相关,则应该优先考虑放在该Entity中。
普通Entity的创建一般发生在AggregateRoot的EventSourcingHandler中,与AggregateRoot不通,普通的Entity不可以通过@CommandHandler注解的构造方法去创建。
对于EventSourced类型的Aggregate,Axon首先在AggregateRoot中找EventSourcinghandler进行处理,然后向下传播到用@AggregateMember标注的Entity,在Entity中找EventSourcingHandler进行处理。Entity的EventSourcingHandler在处理时,应先验证EntityId是否正确,然后再进行处理。
对于上面的第5点,还有一种方法,通过设置@AggregateMember的eventForwardingMode为ForwardMatchingInstances,可以让Axon只传播Event到EntityId匹配的Entity,这时就不需要再在Entity的EventSourcingHandler中验证EntityId了,代码如下:
import org.axonframework.modelling.command.AggregateIdentifier;import org.axonframework.modelling.command.AggregateMember;import org.axonframework.modelling.command.ForwardMatchingInstances;@Aggregatepublic class GiftCard {@AggregateIdentifierprivate String id;@AggregateMember(eventForwardingMode = ForwardMatchingInstances.class)private List<GiftCardTransaction> transactions = new ArrayList<>();// omitted constructors, command and event sourcing handlers}

三、外部的CommandHandler
CommandHandler除了可以放在Aggregate内部,也可以放在外面一个单例类中,然后在它的CommandHandler处理方法中分配Command到Aggregate的处理方法,见如下代码:
import org.axonframework.commandhandling.CommandHandler;import org.axonframework.modelling.command.Repository;public class GiftCardCommandHandler {// 1.private final Repository<GiftCard> giftCardRepository;@CommandHandlerpublic void handle(RedeemCardCommand cmd) {giftCardRepository.load(cmd.getCardId()) // 2..execute(giftCard -> giftCard.handle(cmd)); // 3.}// omitted constructor}

四、State-Stored Aggregate
Aggregate除了Event-Sourced方式外,还可以采用State-Stored方式,这意味着Repository保存的不是历史事件,而是最新状态。与Event-Sourced方式不同,在State-Stored Aggregate的CommandHandler中可以更新状态数据。见如下代码示例。
import org.axonframework.commandhandling.CommandHandler;import org.axonframework.eventhandling.EventHandler;import org.axonframework.modelling.command.AggregateIdentifier;import org.axonframework.modelling.command.AggregateMember;import javax.persistence.CascadeType;import javax.persistence.Entity;import javax.persistence.FetchType;import javax.persistence.Id;import javax.persistence.JoinColumn;import javax.persistence.OneToMany;@Entity // 1.public class GiftCard {@Id // 2.@AggregateIdentifierprivate String id;// 3.@OneToMany(fetch = FetchType.EAGER, cascade = CascadeType.ALL)@JoinColumn(name = "giftCardId")@AggregateMemberprivate List<GiftCardTransaction> transactions = new ArrayList<>();private int remainingValue;@CommandHandler // 4.public GiftCard(IssueCardCommand cmd) {if (cmd.getAmount() <= 0) {throw new IllegalArgumentException("amount <= 0");}id = cmd.getCardId();remainingValue = cmd.getAmount();// 5.apply(new CardIssuedEvent(cmd.getCardId(), cmd.getAmount()));}@CommandHandlerpublic void handle(RedeemCardCommand cmd) {// 6.if (cmd.getAmount() <= 0) {throw new IllegalArgumentException("amount <= 0");}if (cmd.getAmount() > remainingValue) {throw new IllegalStateException("amount > remaining value");}if (transactions.stream().map(GiftCardTransaction::getTransactionId).anyMatch(cmd.getTransactionId()::equals)) {throw new IllegalStateException("TransactionId must be unique");}// 7.remainingValue -= cmd.getAmount();transactions.add(new GiftCardTransaction(id, cmd.getTransactionId(), cmd.getAmount()));apply(new CardRedeemedEvent(id, cmd.getTransactionId(), cmd.getAmount()));}@EventHandler // 8.protected void on(CardReimbursedEvent event) {this.remainingValue += event.getAmount();}protected GiftCard() { } // 9.}
从上面的代码编号7可以看出,在CommandHandler中可以更新状态数据;而代码编号8处使用的是@EventHandler而不是@EventSourcingHandler,该方法会处理Aggregate发布的Event。

五、Command的调度
Axon调度Command有两种接口,一是CommandBus,一是Command Gateway。
Command Bus
Axon框架会保证,CommandBus分派的Command会送往一个正确的CommandHandler处理,如果Axon找不到一个正确的CommandHandler,将会抛出NoHandlerForCommandException异常。
CommandBus有两种分派Command的方式,见如下代码示例。
private CommandBus commandBus;public void dispatchCommands() {String cardId = UUID.randomUUID().toString();// 1.commandBus.dispatch(GenericCommandMessage.asCommandMessage(new IssueCardCommand(cardId, 100, "shopId")));// 2.commandBus.dispatch(GenericCommandMessage.asCommandMessage(new IssueCardCommand(cardId, 100, "shopId")),(CommandCallback<IssueCardCommand, String>) (cmdMsg, cmdResultMsg) -> {// 3.if (cmdResultMsg.isExceptional()) {Throwable throwable = cmdResultMsg.exceptionResult();} else {String commandResult = cmdResultMsg.getPayload();}});}
上述代码按编号说明如下:
分派的Command对象,需要采用GenericCommandMessage.asCommandMessage(…)包一下才可以分派;当对Command结果不感兴趣时,使用dispatch(CommandMessage)
当对Command结果感兴趣时,采用dispatch(CommandMessage, CommandCallback),在CommandCallback的回调方法中可以获得CommandResultMessage进行处理。
CommandResultMessage提供了一个isExceptional()来判断CommandHandler处理过程中是否发生异常,如果发生了异常,可以用exceptionResult()来获取异常信息;否则的话可以用getPayload()来获取Command结果。需要注意的是,CommandCallback的执行线程与dispatch()的分派执行线程可能不是一个线程,如果线程一致需要用FutureCallback.
Command Gateway
CommandGateway是一种更方便的分派Command的方式,它是CommandBus的抽象接口,它底层还是把Command分派到CommandBus.
CommandGateway提供了两个方法来分派Command,send() 和 sendAndWait()。
CommandGateway.send(Object) 是非阻塞方式,它立刻返回一个CompletableFuture对象,这允许在返回Command结果后进行异步的后续操作。如下代码所示。
private CommandGateway commandGateway; // 1.public void sendCommand() {String cardId = UUID.randomUUID().toString(); // 2.// 3.CompletableFuture<String> futureResult = commandGateway.send(new IssueCardCommand(cardId, 100, "shopId"));}
而CommandGateway#sendAndWait(Object)采用阻塞模式,分派线程将会阻塞,直到Command执行完成返回结果,或者超时,如下代码所示。
private CommandGateway commandGateway;public void sendCommandAndWaitOnResult() {IssueCardCommand commandPayload = new IssueCardCommand(UUID.randomUUID().toString(), 100, "shopId");// 1.String result = commandGateway.sendAndWait(commandPayload);// 2.result = commandGateway.sendAndWait(commandPayload, 1000, TimeUnit.MILLISECONDS);}
Command Dispatching Results
分派结果依赖分派过程以及CommandHandler处理过程,比如CommandHandler抛了异常,则这个异常就会是Dispatching Result.
Axon不建议CommandHandler方法返回有业务含义的结果,如果有这个需求,那么应该用Query而不是Command,所以大多数情况下成功的Dispatching Result是null.

六、Aggregate创建另一个Aggregate
通常情况下,Aggregate中存在一个用@CommandHandler注解的构造方法,应用可以通过在Aggregate外部分派一个相关的Command来引发该Aggregate的创建。
但是,也可以在一个Aggregate的CommandHandler中,通过AggregateLifecycle#createNew()来创建另一个Aggregate,见如下代码:
public class AggregateB {...@CommandHandlerpublic void AggregateB(SomeAggregateBCommand command) {AggregateLifecycle.createNew(AggregateA.class, () -> new AggregateA(/* provide the id for AggregateA */));}}
注意,AggregateLifecycle#createNew()不可以在EventSourcingHandler中调用,否则会抛异常UnsupportedOperationException。

END






