暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Reactor 编程之旅

领创集团Advance Group 2021-12-29
1392
序言:lamda与FunctionalInterface
lamda表达式,其本质是⼀段函数,确切的说是⼀段闭包。我们当然可以通过定义普通的函数来代替lamda,但是lamda形式的函数使得函数的定义更加灵活,同时可以让你的代码看起来很简洁。在java8以前,如果想使⽤lamda表达式(确切的说是像使⽤lamda表达式那样取调⽤⼀个函数),只能使⽤接口+匿名类的方式:
    定义⼀个接⼝
    interface TestInterface {
    void method();
    }
    static void main(String[] args) {
    定义接⼝的匿名实现
    TestInterface testInterface = new TestInterface() {
    @Override
    public void method() {
    System.out.println("hello world");
    }
    };
    调⽤接⼝
    testInterface.method();
    }
    从Java8开始,引入了lamda表达式和functioninterface。所谓functioninterface,是指当接口中有且只有⼀个需要实现的函数时,可以给此接口加上@FunctionalInterface注解,同时此接口可以采⽤lamda的形式进⾏实现。
    使⽤lamda表达式:
      / 定义⼀个接⼝
      @FunctionalInterface //注意这⾥添加了注解
      interface TestInterface {
      void method();
      }
      static void main(String[] args) {
      // 定义接⼝的匿名实现
      TestInterface testInterface = () -> {
      System.out.println("hello world");
      };
      // 调⽤接⼝
      testInterface.method();
      }
      @FunctionalInterface 注解严格来说更像⼀种标识,⽤来标识当前接⼝符合FunctionalInterface规范,因此如果接⼝本身已经符合了 FunctionalInterface规范,是可以不用添加 @FunctionalInterface 注解的。因此上⾯示例中的 @FunctionalInterface 注解是可以省略的。
      java8在引⼊FunctionalInterface的同时,还将⼀些⾼频使⽤的场景进⾏了封装,这些接⼝都位于 java.util.function 路径下,例如:
      public interface Consumer<T> :只有⼀个参数的函数
      public interface BiConsumer<T, U> :两个参数的函数
      public interface Function<T, R> :⼀个⼊参,⼀个返回值的函数
      public interface Supplier<T> :只有⼀个返回值的函数
      其他更多的示例可以直接查看源码,这些基本能覆盖80%以上的场景。
      同时,⼀些Java以前的接⼝,例如 Runnable Callable ,也都添加了 @FunctionalInterface 注解,因此在使⽤这些接⼝时,可以直接使⽤lamda表达式去编写他们的实现。
      Reactive Programming、Reactive Streams和Reactor
      Reactive Programming,中⽂称反应式编程,是⼀种⾼性能应⽤的编程⽅式。其最早是由微软提出并引⼊到 .NET 平台中,随后 ES6 也引⼊了类似的技术( Promise编程)。在 Java 平台上,较早采⽤反应式编程技术的是 Netflflix 公司开源的 RxJava 框架。现在⼤家⽐较熟知的Hystrix 就是以 RxJava 为基础开发的。
      和Reactive Programming对应的就是Imperative Programming(指令式编程),我们常⽤的Java、Python 等语⾔写代码的常⻅编程⻛格即为指令式编程,此⻛格的特点是代码执⾏顺序和编写顺序基本⼀致。
      Reactive Programming也可以称为Observable模式(可以类⽐观察者模式),ImperativeProgramming可以称为Iterable模式,对应的前者即为推模式,后者为拉模式,推的是事件,拉的是指令。
      在Java 平台上,Netflflix、TypeSafe、Pivatol共同制定了⼀个被称为 Reactive Streams 的项⽬规范,⽤于制定反应式编程相关的规范以及接⼝,此规范主要的接⼝有这三个:
      • Publisher

      • Subscriber

      • Subcription

      其中,Subcriber 主要包含了 onNext、onError、onCompleted 这三个⽅法。对于 ReactiveStreams,⼤家只需要理解其思想就可以。
      在Spring 5中,作为在背后⽀持其反应式编程的框架 Reactor,onNext对应Reactor的 doOnNext() ⽅法;onError对应Reactor的 onErrorContinue()onErrorResume()onErrorReturn() ⽅法;onCompleted对应Reactor的 doOnSuccess() ⽅法。

      ⼀句话,Reactive Programming是⼀种编程⽅式;Reactive Streams是针对ReactiveProgramming制定的编程规范;Reactor是实现了Reactive Streams的编程框架(有具体的组件包),这些为spring webflflux提供了组件基础。

      Thread per Connection 和 Reactor in SingleThread
      在操作系统中,进程(process)是线程(thread)的集合,线程是进程中的最⼩执⾏单位。在处理并发时,我们有两种编程思路,⼀个是多线程⽅式,⼀个是协程⽅式(python⾥的概念,这⾥我想不到更好的名词了)。
      • Thread per Connection:即⼀请求⼀线程,为每个请求分配⼀个线程,此线程负责请求的执⾏,当请求执⾏结束时,线程也随之结束。在没有nio之前,这是传统的java⽹络编程和servlet等所采⽤的即为线程模型。此⽅案的优点即实现简单,缺点则是⽅案的伸缩性受到线程数的限制。

      • Reactor in Single Thread:即为将需要执⾏的多个任务放置到⼀个队列中,并通过事件驱动的⽅式(通常做法),将每个任务交由某个线程去执⾏。在此⽅式下,即使只有⼀个线程,也可以实现⼀种伪多线程(参考python的协程),此⽅式的实现⽅式之⼀即为IO多路复⽤。此⽅案的优点是不受线程数的限制,且适合于CPU资源紧张的应⽤上。基于nio的mina、netty等框架,就是使⽤的此⽅式;缺点是受限于使⽤场景,仅适合于IO密集的应⽤,不太适合CPU密集的应⽤。

      spring mvc和spring webflflux
      先说springmvc,springmvc刚诞⽣时,servlet ⼤⾏其道(现在也是),所以springmvc是完全基于servlet的。servlet的设计思路即为内部启动了⼀个线程池,池内会有⼀定数量的空闲线程,当有http请求进⼊时,servlet会从线程池中获取⼀空闲线程负责http请求的执⾏。因此springmvc的并发数极容易受线程池容量的限制,当请求数超过线程池的容量时,会直接导致请求⽆法被处理。因此在⾼并发场景下,如果使⽤springmvc,只能通过增加线程池上限(但线程数量⼜受cpu核⼼数和操作系统允许的open fifiles数量限制)和扩展物理机器的⽅式来增加整个系统的吞吐量。其实对于servlet来将,其⾃身是典型的io密集型⽽⾮cpu密集型,是不需要启动很多线程的,在这种背景下spring webflflux应运⽽⽣。
      Spring 5中引⼊了⼀个基于 Netty ⽽不是 Servlet 的⾼性能的 Web 框架,Netty是⼀款基于NIO(Nonblocking I/O,⾮阻塞IO)开发的⽹络通信框架,对⽐于BIO(Blocking I/O,阻塞IO),他的并发性能得到了很⼤提⾼。spring webflflux将Netty和Reactor集成在⼀起⽤于处理web请求(所以说spring webflflux的诞⽣离不开nio和Reactive Programming),能充分发挥两者的优势,从⽽极⼤增加框架的吞吐量(业务⾃身的耗时不在讨论之列)。我们先来看以下两者的编程区别,当我们使⽤springmvc时,通常的编码是这样的:
        @RestController
        @RequestMapping("/test")
        public class TestController {
        @RequestMapping("/hello")
        public String demo(){
        return "hello world";
        }


        }        
        当我们使⽤spring webflflux时,通常的编码是这样的:
          @RestController
          @RequestMapping("/test")
          public class TestController {
          @RequestMapping(value = "/foobar")
          public Mono<String> demo() {
          return Mono.just("hello world");
          }


          }
          两者的变化即为由之前的直接返回⽬标对象改为返回⼀个Mono/Flux对象(Mono和Flux的介绍在后⾯)。
          这⾥可以看到,当我们使⽤Reactor编程时,⼤部分情况下并不需要直接接触Publisher、Subscriber、Subcription等接⼝,因为这些接⼝已经被spring webflflux封装了,我们只需要构建我们的Mono/Flux对象即可,spring webflflux会⾃动把Mono/Flux对象所包含的任务推送到任务队列中并被Reactor和Netty处理。
          此外,当我们使⽤feign进⾏服务调⽤时,在springmvc框架下和spring webflflux框架下,所使⽤的feign是不同的。在springmvc框架下,通常使⽤ spring-cloud-st
          arter-openfeign (内部实现是feign+ribbon)来进⾏http服务调⽤;在spring webflflux下,⽬前还没有正式的框架可以使⽤,不过有⼀个官⽅还为正式发布的可以使⽤ feign-reactor-spring-cloud-starter ,不过⽬前此项⽬还在孵化器中。
          Reactor使用
          具体的说是Mono和Flux的使⽤,先放⼀段Reactor中关于Mono和Flux的介绍:
          • Mono

          A Reactive Streams Publisher with basic rx operators that completes successfullyby emitting an element, or with an error. The recommended way to learn about theMono API and discover new operators is through the reference documentation,rather than through this javadoc (as opposed to learning more about individualoperators). See the "which operator do I need?" appendix The rx operators willoffer aliases for input Mono type to preserve the "at most one" property of theresulting Mono. For instance flflatMap returns a Mono, while there is a flflatMapManyalias with possibly more than 1 emission. Mono should be used for Publisher thatjust completes without any value. It is intended to be used in implementations andreturn types, input parameters should keep using raw Publisher as much aspossible. Note that using state in the java.util.function / lambdas used within Monooperators should be avoided, as these may be shared between severalSubscribers.
          • Flux

          A Reactive Streams Publisher with rx operators that emits 0 to N elements, andthen completes (successfully or with an error). The recommended way to learnabout the Flux API and discover new operators is through the referencedocumentation, rather than through this javadoc (as opposed to learning moreabout individual operators). See the "which operator do I need?" appendix . It isintended to be used in implementations and return types. Input parameters shouldkeep using raw Publisher as much as possible. If it is known that the underlyingPublisher will emit 0 or 1 element, Mono should be used instead. Note that usingstate in the java.util.function / lambdas used within Flux operators should beavoided, as these may be shared between several Subscribers.subscribe(CoreSubscriber) is an internal extension to subscribe(Subscriber) usedinternally for Context passing. User provided Subscriber may be passed to this"subscribe" extension but will loose the available per-subscribeHooks.onLastOperator.

          通俗来说,Mono就是Reactor中⼀个基础的任务单元,或者说是⼀段lamda表达式指令。所以我们查看Mono的各种⽅法时,⼊参基本都是 FunctionalInterface ,这这段指令内,我们可以接收若⼲参数,同时返回若⼲结果值;甚⾄可以返回另⼀个Mono对象。

          Flux是Mono的集合,Mono是Reactor中的基础任务单元,Flux就是若⼲个基础单元的集合。因此Flux更适合处理并发任务(作为类⽐,Mono是串⾏的)。
          下面介绍Mono的⼀些创建⽅式。
          从对象创建:
            Mono.just("hello  world")//从字符串创建⼀个Mono对象,当任务被执⾏时,会返回此字符串
            .subscribe();
            Mono.just(123)//从int创建⼀个Mono对象
            .subscribe();
            Mono.empty()//创建⼀个不返回任何值的Mono对象,函数原型为`public static <T> Mono<T> empty()`,
            //因此返回值可以被当作任何类型
            .subscribe();
            RunnableFuture等对象创建,基本都是使用Mono.fromXxxx()方法创建
              Mono.fromRunnable(()->{
              //从Runnable创建,这⾥使⽤了lamda表达式,当Mono被执⾏时,此lamda包裹的代码段会被执⾏
              System.out.println("hello world");
              });
              Mono.fromCallable(() -> {//从Callable创建,和Runnable的区别是带返回值
                      return "Hello  world"; 
              })
              .subscribe()
              创建⼀个Mono,且这个Mono会创建另⼀个Mono,此场景使⽤还是⽐较多的
                Mono.defer(()->{
                return Mono.just("hello world");//注意这⾥返回了⼀个Mono,⽽⾮其他类型
                }).
                subscribe();
                在上⼀个Mono处理完后,如果需要处理另⼀个Mono,可以使⽤Mono.then()⽅法进⾏连接:public final <V> Mono<V> then(Mono<V> other)
                  Mono.empty()
                      .then(Mono.fromRunnable(() -> {
                  System.out.println("hello world");
                  }))
                  .subscribe();
                  接收参数(消费者)
                    Mono.just("hello world")
                            .doOnNext((e) -> {//e的类型为Mono.just()返回的类型
                                System.out.println(e);//打印hello  world
                    })
                    .doOnNext((e) -> {//这⾥调⽤了两次doOnNext,因为变量会在doOnNext间传递
                    System.out.println(e);//打印hello world
                              }) 
                    .subscribe();
                    变量转换(变量变形,就是编程中的map操作)
                      Mono.just("hello world")
                      .map((e) -> {
                      System.out.println(e);//打印hello world
                      return "simon's dream";//这⾥返回了⼀个新的变量
                      })
                      .doOnNext((e) -> {
                      System.out.println(e);//打印simon's dream
                      }) .subscribe();
                      Mono.just("hello world")
                      .flatMap((e) -> {
                      //这⾥使⽤了flatMap,flatMap和map的区别是,
                      //map是直接转换,flatMap是返回⼀个包裹了新值的Mono
                      System.out.println(e);//打印hello world
                      return Mono.just("simon's dream");
                      })
                      .doOnNext((e) -> {
                      System.out.println(e);//打印simon's dream
                      }) .subscribe();

                      缓存。当调⽤Mono.cache()⽅法后,当前产⽣的值会被缓存下来,当有新的订阅者加⼊后,Mono任务会从调⽤了cache()⽅法的位置开始执⾏,⽽⾮从头开始
                        Mono a1 = Mono.defer(() -> {
                        System.out.println("start");
                        return Mono.just("a"); });
                        a1.subscribe((e) -> {
                        System.out.println("s1:" + e); });
                        a1.subscribe((e) -> {
                        System.out.println("s2:" + e); });
                        System.out.println("==============================");
                        a1 = a1.cache() ;
                        a1.subscribe((e) -> {
                        System.out.println("s21:" + e); });
                        a1.subscribe((e) -> {
                        System.out.println("s22:" + e); });
                        输出:
                        start < - 输出start
                        s1:a
                        start < - 输出start
                        s2:a
                        ==============================
                        start < -输出start
                        s21:a
                        s22:a
                        可以看到,当调⽤了cache后,新的subscribe会从cache处开始执⾏
                        异常处理
                          Mono.just("hello world")
                          .map((e) -> {
                          throw new RuntimeException("这是⼿动扔出的异常");
                          }).doOnError(e -> {
                          //doOnError会拦截到异常,但不会捕获异常,这⾥可以打印异常的信息,同时异常会被继续向上抛
                          e.printStackTrace();
                          }).onErrorResume(e -> {
                          //onErrorResume会捕获异常,同时产⽣⼀个新的Mono继续后续其他操作
                          e.printStackTrace();
                          return Mono.just("simon's dream");
                          }).doOnNext(e -> {
                          System.out.println(e);//打印simon's dream
                          }).doOnSuccess(e -> {
                          System.out.println(e);//打印simon's dream
                          }) ;
                          repeat,如果循环执⾏某个Mono,可以使⽤repeatWhen(),repeatWhen()会根据条件判断是否循环执⾏
                            int count = 0;
                            Mono.fromRunnable(() -> {
                            System.out.println("hello world");//会被打印3次
                            })
                            .repeatWhen(Repeat.onlyIf((e) -> {
                            count++;
                            return count < 3;
                            }))
                            .subscribe() ;
                            retry,如果需要在出现error时⾃动重试,可使⽤retryWhen()
                              int count = 0;
                              Mono.fromRunnable(() -> {
                              System.out.println("hello world");//会打印三次
                              throw new RuntimeException("⼿动扔出的异常");
                              })
                              .retryWhen(Retry.onlyIf((e) -> {
                              count++;
                              return count < 3;
                              }))
                              .onErrorResume((e) -> {
                              System.out.println(e.getMessage());//打印⼀次
                              return Mono.empty();
                              })
                              .subscribe() ;
                              fifilter 过滤,对上⼀步产⽣的值进⾏判断,并返回true/false。当返回true时,后续的的doOnNext可以正常执⾏;当返回false时,后续的doOnNext会被忽略,其他不需要接收参数的任务不受影响,例如then等
                                Mono.just(10)
                                .filter((e) -> {
                                return e > 5;
                                })
                                .doOnNext((e) -> {
                                System.out.println("after filter1:" + e);//正常输出
                                })
                                .filter((e) -> {
                                return e < 5;
                                })
                                .doOnNext((e) -> {
                                System.out.println("after filter2:" + e);//没有输出
                                })
                                .then(Mono.fromRunnable(() -> {
                                System.out.println("then");//正常输出
                                }))
                                                .subscribe();
                                doOnSuccess、doOnError、doFinally
                                当所有mono正常执⾏完毕,没有出现任何异常时,doOnSuccess会被执⾏;当有任何异常且异常未被捕获时,doOnError会被执⾏(注意doOnError⾃身并不会捕获异常);doFinally在任何条件下都会执⾏
                                  Mono.just("hello world")
                                  .doOnSuccess((e) -> {
                                  System.out.println(e);//打印hello world
                                  })
                                  .doFinally((e) -> {
                                  System.out.println("doFinally");//打印doFinally
                                  })
                                  .subscribe();
                                  Mono.just("hello world")
                                  .doOnNext((e) -> {
                                  System.out.println(e);//打印hello world
                                  throw new RuntimeException("⼿动扔出的异常");
                                  })
                                  .doOnSuccess((e) -> {//因为出现了Exception,因此doOnSuccess并不会被执⾏
                                  System.out.println("doOnSuccess:" + e);
                                  })
                                  .doOnError((e) -> {
                                  System.out.println(e.getMessage());//打印异常信息
                                  })
                                  .doFinally((e) -> {
                                  System.out.println("doFinally");//打印doFinally
                                  })
                                  .subscribe();
                                  zip 并发。我们可以使⽤zip⽅法来同时执⾏多个Mono,并把他们的结果汇总在⼀起。注意以下多个Mono实际是并发执⾏的
                                    Mono<Integer> mono1 = Mono.just(1);
                                    Mono<Integer> mono2 = Mono.just(2);
                                    Mono<Integer> mono3 = Mono.just(3);
                                    Mono.zip(mono1, mono2, mono3)
                                    .doOnNext(e -> {
                                    System.out.println("结果:" + e);
                                    //输出[1,2,3],注意e是⼀个Tripple类型
                                    })
                                    .subscribe()
                                    也可以⽤这种写法
                                      Mono<Integer> mono1 = Mono.just(1);
                                      Mono<Integer> mono2 = Mono.just(2);
                                      Mono<Integer> mono3 = Mono.just(3);
                                      Mono.zip((e) -> {
                                      return e;
                                      }, mono1, mono2, mono3)
                                      .doOnNext(e -> {
                                      System.out.println("结果:" + Arrays.asList(e));
                                      //输出[1,2,3],注意e是⼀个Object[]类型
                                      })
                                      .subscribe() ;
                                      Mono的⽤法基本就是这些了,实际使⽤时就是这些⽤法的排列组合。
                                      可以看到,Reactive Programming和Imperative Programming编程思路还是相差很⼤的,Reactive就是各种代码段的组合,然后变量会在各个代码段之间流动;ImperativeProgramming则是定义各个⽅法,然后各个⽅法之间互相调⽤。因此在使⽤Reactor时,需要考虑清除业务的处理流程,然后将整个业务流程拆分成⼀段⼀段的过程,然后⽤Mono去实现。
                                      再来说说Flux,⾸先先看⼀下Mono和Flux的解释:
                                      • Mono

                                      • FLux

                                      可以看到,Mono每次是对单个任务进⾏操作,Flux是对多个任务同时进⾏操作,因此Flux更适合多元素处理的场景。
                                      这⾥先⽤前⾯的fifilter来演示⼀下Flux是运作的
                                        Flux.just(1,2,3,4,5,6,7,8,9)//这⾥可以理解为同时产⽣了多个Mono,且每个Mono包裹了⼀个元素,
                                        //每个Mono会被分别交给后续的任务进⾏处理
                                        .filter((e) -> { //每个元素都会被此过滤器过滤,其中通过的元素会流动到后续其他任务
                                        return e > 5;
                                        })
                                        .collectList()//收集所有元素并产⽣⼀个List
                                        .doOnNext((list) -> {//这⾥的⼊参是⼀个List
                                        System.out.println("after filter1:" + list);//输出 after filter1:[6, 7, 8, 9]
                                        })
                                        .then(Mono.fromRunnable(() -> {
                                        System.out.println("then");//输出then
                                        }))
                                        .subscribe() ;x.fromIterable(list) .reduce(set, (initial, e) -> {//将每个元素过滤并添加到Set中 if (e > 5) { initial.add(e); } return initial; }) .doOnNext(e -> {//e是Set类型 System.out.println("结果:" + e);//输出:[6, 7, 8, 9] }) .subscribe() ;
                                        Flux的另⼀个特点是能对结果集执⾏reduce操作
                                          List<Integer>  list  =  Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
                                          Set set = new HashSet();
                                          Flux.fromIterable(list)
                                          .reduce(set, (initial, e) -> {//将每个元素过滤并添加到Set中
                                          if (e > 5) {
                                          initial.add(e);
                                          }
                                          return initial;
                                          })
                                          .doOnNext(e -> {//e是Set类型
                                          System.out.println("结果:" + e);//输出:[6, 7, 8, 9]
                                          })
                                          .subscribe() ;
                                          我们在了解了这些后,就可以在spring webflflux编写Reactor⻛格的程序了。

                                          总结
                                          本⽂开始先介绍了lamda与FunctionalInterface,因为在Reactor中⼤量⽤到了它们;然后介绍了Reactive Programming、Reactive Streams和Reactor的区别与联系,只有理解了这些才能理解Reactor编程的思路;之后对⽐了Thread per Connection 和 Reactor in Single Thread及spring mvc和spring webflflux,⽤来对⽐当下⽐较流⾏的处理http请求的⽅式;在最后列举了Reactor中Mono和Flux的常⻅⽤法。
                                          写在最后
                                          Reactor在编程复杂度和代码可读性上,相⽐传统的⽅式更加的复杂,因此对编写者的要求较⾼,⽽且当Reactor程序出现问题时排查问题也较为困难。但Reactor在⾼并发场景下优势更加明显,其编写的程序吞吐量相对传统⻛格程序有质的提升。因此我们需要仔细评估可能带来的技术⻛险和对程序性能的提升,最终确定是否需要在项⽬中使⽤。
                                          关于领创集团
                                          (Advance Intelligence Group)
                                          领创集团成立于2016年,致力于通过科技创新的本地化应用,改造和重塑金融和零售行业,以多元化的业务布局打造一个服务于消费者、企业和商户的生态圈。集团旗下包含企业业务和消费者业务两大板块,企业业务包含ADVANCE.AI和Ginee,分别为银行、金融、金融科技、零售和电商行业客户提供基于AI技术的数字身份验证、风险管理产品和全渠道电商服务解决方案;消费者业务Atome Financial包括亚洲最大的先享后付平台Atome和数字信贷服务。今年9月,领创集团宣布完成超4亿美元D轮融资,融资完成后领创集团估值已超20亿美元,成为新加坡最大的独立科技创业公司之一。
                                          往期回顾
                                          BREAK AWAY
                                          技术创想 | Python 事件循环及协程原理

                                          技术创想 | GitOps 系列之 Flux 实践篇


                                             技术创想 | 一个空格引发的DuplicateException





                                          文章转载自领创集团Advance Group,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                          评论