暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Nats 入门

Golang学习杂记 2021-07-11
1085

NATS
是一个开源的分布式消息队列系统,基于消息发布订阅机制。

使用NATS我们的程序可以在不同的环境,不同的语言之间进行通信。客户端一般使用一个URL连接到NATS服务端,然后订阅消息或者发布消息。

基于主题的消息机制

NATS的消息基于subject,发送的消息和订阅的消息都有一个subject标记,简单来说,subject就是一串字符串,发布者和订阅者可以使用这些名称来查找对方

NATS 服务端保留了几个特殊的字符,规范的subject只包括字母数字
.
,并且区分大小写不能包含空格。

主题层次

使用.
我们可以创建一个具有层次的主题,如下:

time.us
time.us.east
time.us.east.atlanta
time.eu.east
time.eu.warsaw

通配符

NATS提供了两种通配符,订阅者可以使用通配符同时订阅多个主题

  • *
    :匹配单个符号
    • 同一个subject中可以出现多次
    • time.*.east
      可以匹配time.us.east
      也可以匹配time.eu.east
      ,但是不能匹配time.us.xxx.east
  • >
    :可以匹配多个符号,但是只能出现在主题的最后部分
    • 同一个subject中只能出现一次
    • time.us.>
      可以匹配time.us.east
      ,也可以匹配time.us.east.atlanta

发布订阅模式

NATS实现了一对多通信的发布订阅消息分发模型,publisher在一个主题上发布一条信息之后,所有的subscriber都可以接收到对应的消息。

请求回复模式

Request-Reply
是现代分发系统中的常见模式。请求发送之后,应用或者在超时时间里等待消息的到来,或者异步接收响应。

NATS支持多个响应,其中第一个响应被使用,后面的消息系统会有效的忽略掉,这能够让拥有多个响应器的复杂系统减少响应时延和抖动。

队列组

NATS提供了内置的负载均衡--分布式队列。使用队列订阅者可以在一组订阅者之间平衡的消耗消息,提供应用程序容错和扩展工作负载处理。

创建一个队列订阅的时候,订阅者注册一个队列名称,所有有相同队列名称的订阅者组成了一个队列组,当注册的主题消息发布的时候,队列组中的仅有一个成员会接收到消息。

使用

服务端安装

NATS server 安装十分简单,我们有多种方式安装,下面提供几种方式:

二进制文件

Release[1] 界面下载对应系统下的zip文件,解压之后运行即可。

Docker安装

使用下面的命令可以运行一个NATS server

docker pull nats:latest
docker run -p 4222:4222 -it nats:latest

客户端

首先我们需要下载对应的包,NATS客户端支持多门语言,比如Java,Ruby,Rust等等,这里以Go为例子

go get github.com/nats-io/nats.go/

安装之后,我们可以连接到服务端,需要服务端已经开启

// 默认情况下,我们连接到`nats://localhost:4222`即可
// 我们也可以指定地址  nats.Connect("nats://demo.nats.io:4222")
// 或者 nats.Connect("demo.nats.io")
nc, err := nats.Connect(nats.DefaultURL)

if err != nil {
    log.Fatal(err)
}
defer nc.Close()

我们也可以连接到一个cluster
里面

servers := []string{"nats://127.0.0.1:1222""nats://127.0.0.1:1223""nats://127.0.0.1:1224"}

nc, err := nats.Connect(strings.Join(servers, ","))

然后我们就可以通过server进行消息的发布和订阅了,为了能够更加容易的看到结果,我们可以下载对应的 tools[2]

发布一个消息

打开另一个终端,使用go-nats-examples中提供的nats-sub
订阅一个subject,使用下面的代码发布一个消息


err = nc.Publish("demo.test", []byte("Hello nats !"))
if err != nil {
  log.Fatalln("Publish message err: ", err)
}
log.Println("Publish message success")

同步订阅消息

使用nats-pub
发布一个消息,下面的代码可以接收到对应的消息

// 订阅主题
sub, err := nc.SubscribeSync("demo.test")
if err != nil {
  log.Fatal(err)
}

// 等待消息,并且设置超时时间
msg, err := sub.NextMsg(10 * time.Second)
if err != nil {
  log.Fatal(err)
}

log.Printf("Reply: %s", msg.Data)

异步订阅消息

// 用来等待消息到来
wg := sync.WaitGroup{}
wg.Add(1)

// 异步订阅
if _, err := nc.Subscribe("demo.test"func(m *nats.Msg) {
    wg.Done()
}); err != nil {
    log.Fatal(err)
}

wg.Wait()

取消订阅

如果我们对一个主题不在感兴趣,那么我们可以取消订阅

sub, err := nc.SubscribeSync("demo.test")
if err != nil {
    log.Fatal(err)
}
// 同步订阅取消
if err := sub.Unsubscribe(); err != nil {
    log.Fatal(err)
}

sub, err = nc.Subscribe("demo.test"func(_ *nats.Msg) {})
if err != nil {
    log.Fatal(err)
}
// 异步订阅取消
if err := sub.Unsubscribe(); err != nil {
    log.Fatal(err)
}

我们也可以在接收到特定数量的消息之后取消订阅

n := 10 
if err := sub.AutoUnsubscribe(n); err != nil {
    log.Fatal(err)
}

回复消息

接收到消息之后,我们还可以对该消息进行回复


timeAsBytes := []byte(time.Now().String())

// 将时间作为响应
msg.Respond(timeAsBytes)

队列组消息

使用队列接收消息,多个subscriber不会接收到同样的消息,使用方式和subscribe
类似

queue, err := nc.QueueSubscribeSync("demo.test""queue")
if err != nil {
  log.Fatal(err)
}
msg, err := queue.NextMsg(time.Second * 10)
if err != nil {
  log.Fatal(err)
}
log.Printf("Msg: %s", msg.Data)

响应式发布

我们发布消息的时候,有时候需要用户接收到之后进行回复

nc.PublishRequest("demo.test.request""demo.test.response", []byte("PublishRequest"))

// 然后订阅者接收到消息之后,进行Response
sub, err := nc.SubscribeSync("demo.test.request")
if err != nil {
  log.Fatalln(err)
}

msg, err := sub.NextMsg(time.Second)

msg.Respond([]byte("response"))

基于nats搭建一个聊天室

聊天室中,我们需要接收一个房间中所有用户的消息,nats的发布订阅功能正好满足我们的需求,下面是一个简单的聊天程序

package main

import (
 "bufio"
 "fmt"
 "log"
 "os"

 "github.com/nats-io/nats.go"
)


func main() {
 nc, err := nats.Connect(nats.DefaultURL)
 if err != nil {
  log.Fatalln(err)
 }
 defer nc.Close()
 reader := bufio.NewReader(os.Stdin)
 fmt.Print("Input your username: ")
 username, err := reader.ReadString('\n')
 if err != nil {
  log.Fatalln(err)
 }

  // 发布用户加入房间
 err = nc.Publish("chatroom.join", []byte(fmt.Sprintf("welcome, %s", username)))
 if err != nil {
  log.Fatalln(err)
 }
 nc.Flush()

  // 订阅用户加入房间
 nc.Subscribe("chatroom.join"func(msg *nats.Msg) {
  fmt.Printf("%s", msg.Data)
 })

  // 订阅用户发送消息
 nc.Subscribe("chatroom.msg"func(msg *nats.Msg) {
  fmt.Printf("recv: %s", msg.Data)
 })

 for {
    // 读取用户发送的消息
  msg, err := reader.ReadBytes('\n')
  if err != nil {
   log.Fatalln(err)
  }

    // 发送消息
  nc.Publish("chatroom.msg", msg)
  nc.Flush()
 }
}

上面的代码虽然能够实现一个简单的聊天功能,但是还有很多不足之处,比如自己也会收到同样的消息,比如提示输入内容,不过基本的聊天还是可以的~

参考资料

[1]

Release: https://github.com/nats-io/nats-server/releases

[2]

tools: https://github.com/nats-io/go-nats-examples/releases


文章转载自Golang学习杂记,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论