暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

浅谈响应式编程

Share and Fun喜来分 2020-12-18
1165
什么是响应式编程

什么是响应式编程?网上给出各种各样的定义,包括 wiki 里面所描述的抽象概念,简单来说是通过异步数据流的方式实现编程。

如上图所示,对于响应系统来说,系统外部的请求都会转化成消息的方式形成一个消息流,系统跟外部的交互也是通过异步消息的方式进行通讯,而异步返回的结果又会形成一个新的消息流在系统内部处理。所以响应式系统的内部代码实现,更像声明式的编程方式对消息流逻辑处理进行声明式定义。

从响应式系统的角度看,业界响应式系统的四大特质也是很好阐述了响应式编程:

  • Responsive 即时响应性

只要有可能,系统就会及时地做出响应。即时响应是可用性和实用性的基石,而更加重要的是,即时响应意味着可以快速地检测到问题并且有效地对其进行处理。即时响应的系统专注于提供快速而一致的响应时间,确立可靠的反馈上限,以提供一致的服务质量。这种一致的行为转而将简化错误处理、建立最终用户的信任并促使用户与系统作进一步的互动。

  • Resilient 回弹性

系统在出现失败时依然保持即时响应性。这不仅适用于高可用的、任务关键型系统——任何不具备回弹性的系统都将会在发生失败之后丢失即时响应性。回弹性是通过复制、遏制、隔离以及委托来实现的。失败的扩散被遏制在了每个组件内部,与其他组件相互隔离,从而确保系统某部分的失败不会危及整个系统,并能独立恢复。每个组件的恢复都被委托给了另一个(外部的)组件,此外,在必要时可以通过复制来保证高可用性。因此组件的客户端不再承担组件失败的处理。

  • Elastic 弹性

系统在不断变化的工作负载之下依然保持即时响应性。响应式系统可以对输入(负载)的速率变化做出反应,比如通过增加或者减少被分配用于服务这些输入(负载)的资源。这意味着设计上并没有争用点和中央瓶颈,得以进行组件的分片或者复制,并在它们之间分布输入(负载)。通过提供相关的实时性能指标,响应式系统能支持预测式以及响应式的伸缩算法。这些系统可以在常规的硬件以及软件平台上实现成本高效的弹性。

  • Message Driven 消息驱动

响应式系统依赖异步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。这一边界还提供了将失败作为消息委托出去的手段。使用显式的消息传递,可以通过在系统中塑造并监视消息流队列,并在必要时应用回压,从而实现负载管理、弹性以及流量控制。使用位置透明的消息传递作为通信的手段,使得跨集群或者在单个主机中使用相同的结构成分和语义来管理失败成为了可能。非阻塞的通信使得接收者可以只在活动时才消耗资源,从而减少系统开销。

从图中可以看出响应式系统的价值在于提供了即时响应性、可维护性和可扩展性,表现的形式则是回弹性和弹性,而实现的手段则是消息驱动。简单来说就是异步数据流的编程方式。

我们为什么要使用响应式编程

讲到这里,我们还是回归到计算机的软件跟硬件交互上,比如敲打键盘的时候会产生很多键盘事件,使用智能手机时会产生很多触屏事件,再者是我们网上购物无一不是通过点击按钮、滚动页面等事件来操作的,或再者车辆传感器跟平台交互的都是以事件消息方式上报。实际上我们可以把这些事件消息当成消息流,从系统外传到系统内部,然后通过异步的方式响应到系统外部,如此看来响应式编程在软件跟硬件交互上是有着与生俱来的优势。

  • 异步

响应式编程是采用异步的交互方式,说到异步我们传统的系统就有很多的优化空间了。在传统的 Web 容器中,比如 Tomcat,一个请求就一个线程,先不说操作系统是有着线程数量的限制,线程多了自然也是比较占内存,更重要的是线程多了线程切换就变得很频繁,线程切换本身就是很耗资源的,很大程度的影响了程序性能。可见采用响应式编程,在性能上有很大的优化空间。大家都知道,Netty 对网络异步高性能编程已经是发挥得淋淋尽致,Netty 的 NIO Reactor 模型就是异步、事件驱动的最好实现,大家有兴趣可以深入了解一下 Netty 的 NIO Reactor 模型。

  • 消息流

上面谈到了异步对系统的好处,在这里我们也聊聊消息流。Stream 在 JDK8 版本上就开始实现了,Stream 的filter
map
大家可能已经用的滚瓜烂熟,Stream 的出现使得数据处理变得更加简单跟规范化。目前开源的各种响应式编程框架对流处理都提供了更加丰富 API,包括对线程处理的封装,也很大程度上降低了大家对线程模型的使用难度,比如针对计算密集的数据处理的数据流,我们可能简单设置一下跟 cpu 核数一样的线程数就可以了。

实践:项目中的 Reactor 实现

关于响应式编程,我们在 CV 项目中 SmartGateway 里面已经有了具体的实践,对于初学者来说,可能对响应式的编程方式有点不太适应,编程的思维方式的确跟传统的命令式编程有所差异的,响应式编程更似一种声明式编程风格。在 SmartGatewy 中,我们主要是采用了 Reactor 框架,目前 Spring WebFlux 的底层就是使用了 Reactor 跟 Netty 框架。介绍设计之前,我们还是先简单了解一下 Reactor 两个类:

Flux:表示一个或者多个数据的流
Mono:表示一个数据的流

为了大家更加容易理解,先展现一下简化版的核心类图:

SmartGateway 作为消息中转的应用,主要是集成了Inbound
消息入口、OutBound
消息出口和Handler
消息处理的 3 大类组件,其中Handler
可以分成Transformer
Predicate
的两个子类组件,而Route
就是把 3 大组件串起来,具体实现,我们看一下Route
里面initialize
方法代码:

public void initialize(){
    Flux<Message> messageFlux = inbound.getSources();

    for(Handler handler : handlers) {
        if(handler instanceof Transformer){
            messageFlux = messageFlux.flatMap(message -> ((Transformer)handler)
                                          .transform(Mono.just(message)));
        }else if(handler instanceof Predicate){
            messageFlux =  messageFlux.flatMap(message -> ((Predicate)handler)
                                          .test(Mono.just(message))
                                          .filter(isMatches -> isMatches)
                                          .map(isMatches -> message));
        }
    }

    messageFlux.subscribe(message -> {
        if(outbound!=null){
            outbound.send(message);
        }
    }, error -> logger.error("Handle message error", error));
}

从上面代码中可以看到,组件的逻辑是在对象初始化的时候就做了声明,initialize()
方法声明Inbound
Outbound
的数据出入口的绑定,同时编排了Handler
的子组件Transformer
Predicate
的顺序逻辑处理,同样我们可以在整个过程中监听发生 Error 的数据并做相应的处理。例子代码里面用到了filter
map
这些 Reactor 流处理基本操作,flatMap
则是 Reactor 提供的一个可以异步处理的方法。

我们再看一下消息入口Inbound
的实现例子:

public final Flux<Message> getSources(){
    if(flux==null){
        synchronized (this){
            if(flux==null){
                flux = Flux.<Message>create(sink -> listener =  new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                            sink.next(message);
                        }                   
                        @Override
                        public void onCompleted() {
                            sink.complete();
                        }
                    }).publishOn(Schedulers.parallel()).publish().autoConnect();
            }
        }
    }
    return flux;
}

Reactor 提供的创建Flux
的方式有很多种,上面例子是通过sink
方式来创建Flux
,这里是要自己编写一个监听器跟sink
做对接,把数据传给sink
。例子中的 publishOn(Schedulers.parallel()).publish()
方式则是上面提到设置线程模型的方法,它定义了下游数据处理的线程模型。Reactor 除了上面例子提到的filter
map
flatMap
等流的基础操作以及publishOn(Schedulers.parallel())
设置线程模型的强大功能,同样还提供了onBackpressureBuffer
实现了背压以及onError
错误处理这些强大的功能。Reactor 官方还提供了跟第三软件集成的 reactor-netty、reactor-rabbitmq、reactor-kafka 等一些组件,其中 reactor-netty 我们也是集成在 SmartGateway 里面做 HTTP、WebSocket 的消息入口插件。

扩展:Spring 响应式框架

Spring5 版本之后,Spring 官方已经针对响应式编程提供了一套的框架,主要是用到了 reactor 响应式框架作为底层的实现:

响应式 RESTful 服务:WebFlux
响应式数据访问组件:Spring Data Reactive Repositories(Redis、Cassandra、Mango、Couchbase、R2DBC)
响应式消息通讯组件:Reactive Spring Cloud Stream(Kafka、RabbitMQ)
响应式服务调用:WebClient

总结

响应式编程作为一种新型的编程方式,目前主流的实现技术包括了 RxJava、Akka Streams、Vert.x 和 Project Reactor,其中 Spring5 版本也开始了逐渐支持,当真正落地到项目的各种应用场景上时,还需要大家去探索跟实践。


文章转载自Share and Fun喜来分,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论