暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Axon系列之Command Handling

铁花盆的小世界 2021-06-21
1991

Axon在很大程度上基于领域驱动设计(DDD)和命令查询职责分离(CQRS)的原则。在消息驱动的微服务环境中,微服务之间的通信高效、可靠、易于管理和监视是非常重要的。消息路由不需要任何手动配置,添加新的微服务应该很容易。使用Axon框架,应用程序开发人员可以隐藏通信的内部内容。Axon服务器在分布式环境中提供了同样的体验。它是一个易于使用、易于管理的平台,可以处理所有事件、命令和查询。

本文参考Axon官网文档写成,详见 https://docs.axoniq.io/reference-guide/


一、Event Sourced Aggregate的实现

Aggregate类

Aggregate包含状态和更改该状态的方法。默认情况下,Axon会将Aggregate配置为Event Sourced
方式,下面以GiftCard为例实现一个Aggregate.

    import org.axonframework.commandhandling.CommandHandler;
    import org.axonframework.eventsourcing.EventSourcingHandler;
    import org.axonframework.modelling.command.AggregateIdentifier;


    import static org.axonframework.modelling.command.AggregateLifecycle.apply;


    @Aggregate
    public class GiftCard {


    @AggregateIdentifier // 1.
    private String id;


    @CommandHandler // 2.
    public GiftCard(IssueCardCommand cmd) {
    // 3.
    apply(new CardIssuedEvent(cmd.getCardId(), cmd.getAmount()));
    }


    @EventSourcingHandler // 4.
    public void on(CardIssuedEvent evt) {
    id = evt.getCardId();
    }


    // 5.
    protected GiftCard() {
    }
    // omitted command handlers and event sourcing handlers
    }

    实例代码中的关键部分按编号说明如下:

    1. @AggregateIdentifier,注解该Aggregate的标识ID,必须提供,否则Axon无法分配Command;

    2. @CommandHandler,注解该Aggregate的Command处理方法,可以是构造方法也可以是普通方法,入参是需要可以处理的Command类别;

    3. AggregateLifecycle#apply(Object...),用于发布一个Event,注意,该Event只在本Aggregate内可见;

    4. @EventSourcingHandler,注解该Aggregate的Event处理方法,有个前提条件,那就是该Aggregate为Event Sourced
      方式。该方法中可更改Aggregate状态。需要注意,在Aggregate的首个Event处理中应设置ID,只有这样后续的Event才能正确分配。该方法可private不影响Axon调用;

    5. 必须提供一个无参的构造方法,Axon调用它来创建Aggregate对象。


    Command 类

    见如下代码为例:

      import org.axonframework.modelling.command.TargetAggregateIdentifier;


      public class IssueCardCommand {


      @TargetAggregateIdentifier
      private final String cardId;
      private final Integer amount;


      public IssueCardCommand(String cardId, Integer amount) {
      this.cardId = cardId;
      this.amount = amount;
      }
      // omitted getters, equals/hashCode, toString functions
      }


      public class RedeemCardCommand {


      @TargetAggregateIdentifier
      private final String cardId;
      private final String transactionId;
      private final Integer amount;


      public RedeemCardCommand(String cardId, String transactionId, Integer amount) {
      this.cardId = cardId;
      this.transactionId = transactionId;
      this.amount = amount;
      }
      // omitted getters, equals/hashCode, toString functions
      }

      需要注意的是必须提供 @TargetAggregateIdentifier,用来标识AggreagteID,否则Axon无法知道该命令将分配给哪个Aggregate对象实例。


      CommandHandler和EventHandler的注意事项

      CommandHandler应该检查Aggregate是否处于正确的状态。如果是,则发布Event;如果不是,则可以忽略该命令,或者根据需要抛出异常。

      CommandHandler中不应该更新Aggregate的状态,更新Aggregate的状态应该在EventHandler中去做。

      可以在EventSourcingHandler方法中apply() 新Event,这使得EntityB可以apply event来响应EntityA所做的事情。当对Sourcing Aggregate重放历史Event时,Axon将忽略apply()调用。请注意,在从EventSourcingHandler发布Event的场景中,内部apply()调用的Event仅在所有Entity收到第一个Event后发布到Entity。如果需要根据apply内部Event后Entity的状态发布更多Event,可使用apply().andThenApply().

      Aggregate的EventSoucingHandler只能处理它自己apply的event,如果要处理来自其他Aggregate实例的event,可以考虑使用Sagas或EventHandler.


      Aggregate Lifecycle控制

      Axon提供的AggregateLifecycle类有几个静态方法可以管理生命周期。

      1. apply(event) or apply(event, metadata):发布event到EventBus.

      2. createNew(class, callable):用于在Command处理中初始化一个Aggregate实例。

      3. isLive():如果一个Aggregate完成了重放历史Event以重新创建其状态,则该Aggregate被视为“live”;如果Aggregate处于EventSourcing过程中,则AggregateLifecycle.isLive()调用将返回false。使用isLive()方法,可以控制只处理新生成的Event。

      4. markDeleted():标识该Aggregate为deleted状态,从此之后该Aggregate不再允许处理Command,该方法只允许在EventSourcingHandler中调用。


      Aggregate 存储

      Aggregate需要通过Repository进行存储,Axon提供两个Repository实现:

      1. GenericJpaRepository:适用于State-Stored Aggregate;

      2. EventSourcingRepository:适用于Event Sourced Aggregate;

      Repository接口提供两个方法用于创建和导入aggregate.

      1. load(identifier) or load(identifier, version)

      2. newInstance(factoryMethod)

      Repository不会提供delete方法,要delete一个aggregate,应该采用AggregateLifecycle.markDeleted()

      在定义Aggregate的时候,可以指定repository,见如下代码所示:

        @Bean
        public Repository<MyAggregate> repositoryForMyAggregate(EventStore eventStore) {
        return EventSourcingRepository.builder(GiftCard.class).eventStore(eventStore).build();
        }
        ...
        @Aggregate(repository = "repositoryForMyAggregate")
        public class MyAggregate {...}



        二、多Entity的Aggregate

        为了实现复杂的业务逻辑,一个Aggregate通常会由Aggregate Root 和其它一些Entity组成。我们需要考虑如何在Aggregate中创建Entity,以及Entity如何处理消息。

        首先,同一个Aggregate的Entity之间是可以相互访问状态数据的,但是Aggregate不应该对外暴露状态。

        看看下面的代码实例:

          import org.axonframework.modelling.command.AggregateIdentifier;
          import org.axonframework.modelling.command.AggregateMember;
          import org.axonframework.modelling.command.EntityId;


          @Aggregate
          public class GiftCard {


          @AggregateIdentifier
          private String id;


          @AggregateMember // 1.
          private List<GiftCardTransaction> transactions = new ArrayList<>();


          private int remainingValue;

          @CommandHandler
          public void handle(RedeemCardCommand cmd) {
          // Some decision making logic
          apply(new CardRedeemedEvent(id, cmd.getTransactionId(), cmd.getAmount()));
          }


          @EventSourcingHandler
          public void on(CardRedeemedEvent evt) {
          // 4.
          transactions.add(new GiftCardTransaction(evt.getTransactionId(), evt.getAmount()));
          }


          // omitted constructors, command and event sourcing handlers
          }


          public class GiftCardTransaction {


          @EntityId // 2.
          private String transactionId;


          private int transactionValue;
          private boolean reimbursed = false;


          public GiftCardTransaction(String transactionId, int transactionValue) {
          this.transactionId = transactionId;
          this.transactionValue = transactionValue;
          }


          public String getTransactionId() {
          return transactionId;
          }

          @CommandHandler // 3.
          public void handle(ReimburseCardCommand cmd) {
          if (reimbursed) {
          throw new IllegalStateException("Transaction already reimbursed");
          }
          apply(new CardReimbursedEvent(cmd.getCardId(), transactionId, transactionValue));
          }

          @EventSourcingHandler
          public void on(CardReimbursedEvent event) {
          // 5.
          if (transactionId.equals(event.getTransactionId())) {
          reimbursed = true;
          }
          }


          // omitted command handlers, event sourcing handlers and equals/hashCode
          }

          上面的代码中,GiftCard是AggregateRoot,GiftCardTransaction是Entity,它们都是简单对象。代码按编号说明如下:

          1. 在AggregateRoot中引入Entity,需要加上@AggregateMember,这个成员可以是List(Iterable), 或者Map,也可以是Single Object.

          2. 如果@AggregateMember标注的成员是List or Map,则该Entity中的@EntityId是必需的。@EntityId作为Entity的标识ID,用来路由Command or Event 到正确的Entity。在Command or Event中应该包含一个同名的属性,如果不同名,则需在注解中指明routeKey,如 @EntityId(routingKey = "customRoutingProperty")

          3. @CommandHandler可以放在AggregateRoot中,也可以放在Entity中。如果一个Command只和某个Entity相关,则应该优先考虑放在该Entity中。

          4. 普通Entity的创建一般发生在AggregateRoot的EventSourcingHandler中,与AggregateRoot不通,普通的Entity不可以通过@CommandHandler注解的构造方法去创建。

          5. 对于EventSourced类型的Aggregate,Axon首先在AggregateRoot中找EventSourcinghandler进行处理,然后向下传播到用@AggregateMember标注的Entity,在Entity中找EventSourcingHandler进行处理。Entity的EventSourcingHandler在处理时,应先验证EntityId是否正确,然后再进行处理。

          对于上面的第5点,还有一种方法,通过设置@AggregateMember的eventForwardingMode为ForwardMatchingInstances,可以让Axon只传播Event到EntityId匹配的Entity,这时就不需要再在Entity的EventSourcingHandler中验证EntityId了,代码如下:

            import org.axonframework.modelling.command.AggregateIdentifier;
            import org.axonframework.modelling.command.AggregateMember;
            import org.axonframework.modelling.command.ForwardMatchingInstances;


            @Aggregate
            public class GiftCard {


            @AggregateIdentifier
            private String id;
            @AggregateMember(eventForwardingMode = ForwardMatchingInstances.class)
            private List<GiftCardTransaction> transactions = new ArrayList<>();


            // omitted constructors, command and event sourcing handlers
            }


            三、外部的CommandHandler

            CommandHandler除了可以放在Aggregate内部,也可以放在外面一个单例类中,然后在它的CommandHandler处理方法中分配Command到Aggregate的处理方法,见如下代码:

              import org.axonframework.commandhandling.CommandHandler;
              import org.axonframework.modelling.command.Repository;


              public class GiftCardCommandHandler {


              // 1.
              private final Repository<GiftCard> giftCardRepository;


              @CommandHandler
              public void handle(RedeemCardCommand cmd) {
              giftCardRepository.load(cmd.getCardId()) // 2.
              .execute(giftCard -> giftCard.handle(cmd)); // 3.
              }


              // omitted constructor
              }


              四、State-Stored Aggregate

              Aggregate除了Event-Sourced方式外,还可以采用State-Stored方式,这意味着Repository保存的不是历史事件,而是最新状态。与Event-Sourced方式不同,在State-Stored Aggregate的CommandHandler中可以更新状态数据。见如下代码示例。

                import org.axonframework.commandhandling.CommandHandler;
                import org.axonframework.eventhandling.EventHandler;
                import org.axonframework.modelling.command.AggregateIdentifier;
                import org.axonframework.modelling.command.AggregateMember;


                import javax.persistence.CascadeType;
                import javax.persistence.Entity;
                import javax.persistence.FetchType;
                import javax.persistence.Id;
                import javax.persistence.JoinColumn;
                import javax.persistence.OneToMany;


                @Entity // 1.
                public class GiftCard {


                @Id // 2.
                @AggregateIdentifier
                private String id;


                // 3.
                @OneToMany(fetch = FetchType.EAGER, cascade = CascadeType.ALL)
                @JoinColumn(name = "giftCardId")
                @AggregateMember
                private List<GiftCardTransaction> transactions = new ArrayList<>();


                private int remainingValue;


                @CommandHandler // 4.
                public GiftCard(IssueCardCommand cmd) {
                if (cmd.getAmount() <= 0) {
                throw new IllegalArgumentException("amount <= 0");
                }
                id = cmd.getCardId();
                remainingValue = cmd.getAmount();


                // 5.
                apply(new CardIssuedEvent(cmd.getCardId(), cmd.getAmount()));
                }


                @CommandHandler
                public void handle(RedeemCardCommand cmd) {
                // 6.
                if (cmd.getAmount() <= 0) {
                throw new IllegalArgumentException("amount <= 0");
                }
                if (cmd.getAmount() > remainingValue) {
                throw new IllegalStateException("amount > remaining value");
                }
                if (transactions.stream().map(GiftCardTransaction::getTransactionId).anyMatch(cmd.getTransactionId()::equals)) {
                throw new IllegalStateException("TransactionId must be unique");
                }


                // 7.
                remainingValue -= cmd.getAmount();
                transactions.add(new GiftCardTransaction(id, cmd.getTransactionId(), cmd.getAmount()));


                apply(new CardRedeemedEvent(id, cmd.getTransactionId(), cmd.getAmount()));
                }


                @EventHandler // 8.
                protected void on(CardReimbursedEvent event) {
                this.remainingValue += event.getAmount();
                }


                protected GiftCard() { } // 9.
                }


                从上面的代码编号7可以看出,在CommandHandler中可以更新状态数据;而代码编号8处使用的是@EventHandler而不是@EventSourcingHandler,该方法会处理Aggregate发布的Event。

                五、Command的调度

                Axon调度Command有两种接口,一是CommandBus,一是Command Gateway。

                Command Bus

                Axon框架会保证,CommandBus分派的Command会送往一个正确的CommandHandler处理,如果Axon找不到一个正确的CommandHandler,将会抛出NoHandlerForCommandException异常。

                CommandBus有两种分派Command的方式,见如下代码示例。

                  private CommandBus commandBus;


                  public void dispatchCommands() {
                  String cardId = UUID.randomUUID().toString();


                  // 1.
                  commandBus.dispatch(GenericCommandMessage.asCommandMessage(new IssueCardCommand(cardId, 100, "shopId")));


                  // 2.
                  commandBus.dispatch(
                  GenericCommandMessage.asCommandMessage(new IssueCardCommand(cardId, 100, "shopId")),
                  (CommandCallback<IssueCardCommand, String>) (cmdMsg, cmdResultMsg) -> {
                  // 3.
                  if (cmdResultMsg.isExceptional()) {
                  Throwable throwable = cmdResultMsg.exceptionResult();
                  } else {
                  String commandResult = cmdResultMsg.getPayload();
                  }
                  }
                  );
                  }

                  上述代码按编号说明如下:

                  1. 分派的Command对象,需要采用GenericCommandMessage.asCommandMessage(…)包一下才可以分派;当对Command结果不感兴趣时,使用dispatch(CommandMessage)

                  2. 当对Command结果感兴趣时,采用dispatch(CommandMessage, CommandCallback),在CommandCallback的回调方法中可以获得CommandResultMessage进行处理。

                  3. CommandResultMessage提供了一个isExceptional()来判断CommandHandler处理过程中是否发生异常,如果发生了异常,可以用exceptionResult()来获取异常信息;否则的话可以用getPayload()来获取Command结果。需要注意的是,CommandCallback的执行线程与dispatch()的分派执行线程可能不是一个线程,如果线程一致需要用FutureCallback.


                  Command Gateway

                  CommandGateway是一种更方便的分派Command的方式,它是CommandBus的抽象接口,它底层还是把Command分派到CommandBus.

                  CommandGateway提供了两个方法来分派Command,send() 和 sendAndWait()。

                  CommandGateway.send(Object) 是非阻塞方式,它立刻返回一个CompletableFuture对象,这允许在返回Command结果后进行异步的后续操作。如下代码所示。

                    private CommandGateway commandGateway; // 1.


                    public void sendCommand() {
                    String cardId = UUID.randomUUID().toString(); // 2.


                    // 3.
                    CompletableFuture<String> futureResult = commandGateway.send(new IssueCardCommand(cardId, 100, "shopId"));
                    }

                    而CommandGateway#sendAndWait(Object)采用阻塞模式,分派线程将会阻塞,直到Command执行完成返回结果,或者超时,如下代码所示。

                      private CommandGateway commandGateway;


                      public void sendCommandAndWaitOnResult() {
                      IssueCardCommand commandPayload = new IssueCardCommand(UUID.randomUUID().toString(), 100, "shopId");
                      // 1.
                      String result = commandGateway.sendAndWait(commandPayload);


                      // 2.
                      result = commandGateway.sendAndWait(commandPayload, 1000, TimeUnit.MILLISECONDS);
                      }


                      Command Dispatching Results

                      分派结果依赖分派过程以及CommandHandler处理过程,比如CommandHandler抛了异常,则这个异常就会是Dispatching Result.

                      Axon不建议CommandHandler方法返回有业务含义的结果,如果有这个需求,那么应该用Query而不是Command,所以大多数情况下成功的Dispatching Result是null.

                      六、Aggregate创建另一个Aggregate

                      通常情况下,Aggregate中存在一个用@CommandHandler注解的构造方法,应用可以通过在Aggregate外部分派一个相关的Command来引发该Aggregate的创建。

                      但是,也可以在一个Aggregate的CommandHandler中,通过AggregateLifecycle#createNew()来创建另一个Aggregate,见如下代码:

                        public class AggregateB {
                        ...
                        @CommandHandler
                        public void AggregateB(SomeAggregateBCommand command) {
                        AggregateLifecycle.createNew(AggregateA.class, () -> new AggregateA(/* provide the id for AggregateA */));
                        }


                        }

                        注意,AggregateLifecycle#createNew()不可以在EventSourcingHandler中调用,否则会抛异常UnsupportedOperationException。


                        END




                        文章转载自铁花盆的小世界,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                        评论