暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Hello , Spring Cloud Stream

产教Code 2019-11-15
338

今天给各位带来一个新的 Spring 组件 , Spring Cloud Stream

用官方文档解释 Spring Cloud Stream is a framework for building message-driven microservice applications. Spring Cloud Stream builds upon Spring Boot to create standalone, production-grade Spring applications and uses Spring Integration to provide connectivity to message brokers
,  译文Spring Cloud Stream是用于构建消息驱动的微服务应用程序的框架。Spring Cloud Stream基于Spring Boot来创建独立的生产级Spring应用程序,并使用Spring Integration提供与消息代理的连接。
,

通俗解释 , Spring Cloud Stream
Spring Integration (以前文章提过不再赘述)
做了封装 , 提供了多家消息中间件的内置配置 , 隐藏了具体执行代码的中间件厂商  。几乎所有的消息中间件分为 发送消息 、接收消息 , 只是各自厂商执行的代码不同 , 通过 Spring 的配置可切换不同的中间件厂商 ,从而只用关注 Spring Cloud Stream
层面

需求环境要求安装 kafka | rabbitMQ
任选其一或都可安装 , 安装过程不再讲解贴出链接参考 , 下面直接建立项目演示 , 项目中用 kafka
演示 , gradle
搭建(用 maven 的不影响)

kafka: https://blog.csdn.net/no_can_no_bb_/article/details/85232790

rabbitMQ: https://blog.csdn.net/newbie_907486852/article/details/79788471

  1. 创建 Spring Boot 项目 , 依赖如下

        implementation 'org.springframework.cloud:spring-cloud-stream'
       testImplementation 'org.springframework.cloud:spring-cloud-stream-test-support'
       compile 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
       compile 'org.springframework.boot:spring-boot-starter-web'

既然是 Spring Cloud Stream 用 Kafka 的消息渠道来做消息的发送那就是整合 Kafka , 下面是整合 Kafka 的配置文件 , 演示需要 , 消息发送和接收就在同一个项目上运行 , 安装 Kafka 的可以开启消费者窗口查看自己演示结果

spring:
application:
  name: test-stream
cloud:
  stream:
    default-binder: kafka #绑定默认的消息中间件
    bindings:
      input:
        destination: test #绑定消息的输入通道的主题(订阅等 , 不同的消息中间件叫法不同 , 同下)
      output:
        destination: test #绑定消息的输出通道的主题(...)
    kafka:
      binder:
        brokers: localhost:9092  #zookeeper 服务器地址
        replication-factor: 1    #主题的副本数量 , 不能超过 zookeeper 的服务器数量
        auto-create-topics: true #是否开启自动创建主题
server:
port: 8085 #tomcat 端口

创建一个类来表示消息的发送者

package com.ys.teststresm;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
* @author 云生何处不见风
* @version 1.0
* @date 2019/11/15 10:45
* @singature 自古风云出我辈 , 一入江湖代码催
* @description todo
*/
@Component
public class TestStream
{
   /**
    * Spring Cloud Stream 内置的 Sink 、Source 、Processor 三个消息通道(输入或输出或兼具)
    */
   @Autowired
   private Source source;


   void send(String str){
       source.output().send(MessageBuilder.withPayload(str).build());
  }
}

好了 , 创建了发送者我们再来建立一个 Controller , 更方便的看到消息的发送

package com.ys.teststresm;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* @author 云生何处不见风
* @version 1.0
* @date 2019/11/15 10:53
* @singature 自古风云出我辈 , 一入江湖代码催
* @description todo
*/
@RestController
public class TestController
{
   /**
    * 自动注入创建的 消息发送者 类
    */
   @Autowired
   private TestStream testStream;

   /**
   * 访问发送消息
   * {s} 为访问路径携带的变量
   */
   @RequestMapping("/t/{s}")
   public String test(@PathVariable String s)
  {
       testStream.send(s);
       return "ok";
  }

   /**
    * 注解 @StreamListener(Processor.OUTPUT) 为 Spring Cloud Stream 的消息监听器 , 参数为要监听的输入通道或输出通道
    * @param str 传递的消息
    */
   @StreamListener(Processor.OUTPUT)
   private void monitor(String str){
       System.out.println("监听到消息:" + str);
  }
}

附上启动类

package com.ys.teststresm;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;

/**
* @author 云生何处不见风
*/
@SpringBootApplication
@EnableBinding(Processor.class)
public class TestStreamApplication {
   public static void main(String[] args) {
       SpringApplication.run(TestStreamApplication.class, args);
  }
}

项目启动之前必须启动 Zookeeper 和 Kafka 服务 , Kafka 为消息队列运行在 Zookeeper 环境

项目启动好后 , 访问 localhost:8085/t/hello

看到访问成功 , 接着看下控制台输出

Spring Cloud Stream
也已经监听到输出通道中的消息

至此Spring Cloud Stream
整合kafka
已经完毕 , 以上只是最简单方便的一种整合方式 , 有兴趣的同学可以查看官网 , Spring Cloud Stream
可以整合大部分消息中间件 , Spring Cloud Stream
封装了 Spring Boot 和 Spring integration
从而屏蔽中间件只关注于某个层面(Spring Cloud Stream 定义的规范)
就可进行系统间的轻量级通信(相对), 从而可以看出 Spring integration
支持的协议、中间件等Spring Cloud Stream
大都会都会支持

有没有同学觉得 Spring Cloud Stream
有什么用 , 这个组件定义了规范来规定消息如何发送接收 , 具体的代码实现交由中间件厂商实现 , 再两个系统之间互相通信而消息中间件不同的情况下如何通信呢 ? Spring Cloud Stream
解决了这个问题(包括不限于) , 消息的发送和接收的规范都用我定义好的 , 至于发送和接受的中间件交给配置文件中的中间件厂商信息(个人理解 , 有误私信)

有兴趣的同学可以结合官网或上方的简单示例进一步的开发 , 今天的技能点同学们觉得怎么样? 欢迎留言讨论 ,下期再见 (有疑问或觉得文章有问题的同学可以产教融合班 QQ 群中回复 , 或私信我 QQ:930565758)


文章转载自产教Code,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论