在我吐槽了无数次之后,NATS JetStream
终于结束了beta阶段正式进入RC阶段。终于官方也在最近刚刚正式回复了我正式版本在处理几个问题之后就会正式发布。那么在这个比较重要的NATS-Server
特性发布之际聊一下NATS产品本身区别和新特性的使用,还有更多的潜在的区别。
概念区分:NATS-Server / NATS Streaming Server / NATS JetStream
NATS-Server
NATS-Server
(或者叫nats
)是一个开源的、云原生的、高性能的消息传递系统,是NATS的最基础的产品。它的核心是一个发布/订阅(Pub/Sub)系统,客户端可以在不同集群中的服务间nats
进行通讯,而不需要关注具体的消息在哪个服务上。换而言之,客户端可以在任意一个集群的服务端上发布消息,同时在任意集群客户端上尝试读取消息。在官方与其他同类消息队列产品功能对比中,我们也可以管窥一下产品的功能列表。nats
支持多流多服务进行pub/sub
,负载均衡,保障消息最多/最少一次送达,多租户和用户认证等功能。虽然看上去优点很多,但是nats
不是一个应用很广的消息队列的重要原因是,它缺少了一些对消息队列而言很最重要的产品特性,比如持久化支持,比如消息确保一次送达。这意味着当你的消息发送出去之后,你的消息是在处理过程中可能丢失的,甚至是可能送达不到的。
NATS Streaming Server
NATS Streaming Server
(或者叫stan
)是用于尝试解决上面提到的nats
的已存在问题的。stan
添加了持久化功能和消息送达策略支持。stan
中自带了nats
服务端,但是在使用过程中,nats
和stan
不能进行混用。在官方文档中,是这么描述stan
和nats
之间的关系的:
NATS客户端和NATS Streaming Server客户端之间不能相互交换数据。也就是说,如果一个NATS Streaming Server客户端在foo上发布消息,在同一主题上订阅的NATS客户端将不会收到消息。NATS Streaming Server消息是由protobuf组成的NATS消息。NATS Streaming Server要向生产者发送ACK,并接收消费者的ACK。如果与NATS客户端自由交换消息,就会引起问题。
stan
的具体架构如下图:
但是stan
虽然提供了持久化和消息传递策略支持,但是在架构设计上却出现了问题,导致在最开始设计时遗留了很多问题,比如当你确定stan
集群是固定的不能无限制水平扩容(#999),比如不支持多租户功能(#1122),比如客户端无法主动拉取消息只能被推送等等
NATS JetStream
NATS JetStream
(或者叫JetStream
)是NATS基于Raft
算法实现的最新的架构设计尝试解决上述问题的新方案。在区别于原有的stan
功能上,提供了新的持久化功能和消息送达策略,同时支持水平扩容。同时,新的JetStream
也为大消息做了一些优化,不再将这特性功能作为nats
的客户端存在而是嵌入NATS Server
中作为其中的一个功能存在。也就是说,如果在对这几项技术进行选择时,JetStream
应该是最应该被选择的方案。更多详细情况具体可以查看官方的指导文档。
NATS JetStream使用
理论介绍过了,接下来说说实际使用的事情。现在JetStream
还是RC阶段,
编译和启动客户端
下载nats-server
源码,解压之后执行:
cd nats-server-master
go build -o nats-server -ldflags="-s -w -buildid=" .
./nats-server -js
这样就可以启动一个支持JetStream
功能的服务端了。
[54738] 2021/03/02 18:27:02.605197 [INF] Starting nats-server
[54738] 2021/03/02 18:27:02.605236 [INF] Version: 2.2.0-RC.2
[54738] 2021/03/02 18:27:02.605238 [INF] Git: [not set]
[54738] 2021/03/02 18:27:02.605239 [INF] Name: NAFWRGQTR2CHMBIKNPE6R3ZTW2BWV2FWPAZREMHI24IYVM6FVHMVIYLQ
[54738] 2021/03/02 18:27:02.605240 [INF] ID: NAFWRGQTR2CHMBIKNPE6R3ZTW2BWV2FWPAZREMHI24IYVM6FVHMVIYLQ
[54738] 2021/03/02 18:27:02.605658 [INF] Starting JetStream
[54738] 2021/03/02 18:27:02.606062 [WRN] _ ___ _____ ___ _____ ___ ___ _ __ __
[54738] 2021/03/02 18:27:02.606076 [WRN] _ | | __|_ _/ __|_ _| _ \ __| /_\ | \/ |
[54738] 2021/03/02 18:27:02.606077 [WRN] | || | _| | | \__ \ | | | / _| / _ \| |\/| |
[54738] 2021/03/02 18:27:02.606078 [WRN] \__/|___| |_| |___/ |_| |_|_\___/_/ \_\_| |_|
[54738] 2021/03/02 18:27:02.606079 [WRN]
[54738] 2021/03/02 18:27:02.606080 [WRN] _ _
[54738] 2021/03/02 18:27:02.606081 [WRN] | |__ ___| |_ __ _
[54738] 2021/03/02 18:27:02.606082 [WRN] | '_ \/ -_) _/ _` |
[54738] 2021/03/02 18:27:02.606083 [WRN] |_.__/\___|\__\__,_|
[54738] 2021/03/02 18:27:02.606084 [WRN]
[54738] 2021/03/02 18:27:02.606084 [WRN] JetStream is a Beta feature
[54738] 2021/03/02 18:27:02.606085 [WRN] https://github.com/nats-io/jetstream
[54738] 2021/03/02 18:27:02.606092 [INF]
[54738] 2021/03/02 18:27:02.606093 [INF] ----------- JETSTREAM -----------
[54738] 2021/03/02 18:27:02.606095 [INF] Max Memory: 12.00 GB
[54738] 2021/03/02 18:27:02.606096 [INF] Max Storage: 35.79 GB
[54738] 2021/03/02 18:27:02.606098 [INF] Store Directory: "/var/folders/5s/8rczg1gs4wb59y9s22nc3f_r0000gn/T/nats/jetstream"
[54738] 2021/03/02 18:27:02.606099 [INF] ---------------------------------
[54738] 2021/03/02 18:27:02.606399 [INF] Listening for client connections on 0.0.0.0:4222
[54738] 2021/03/02 18:27:02.606512 [INF] Server is ready
编写JetStream DEMO
接下来我们看一下如何使用JetStream
进行消息发布/订阅功能:
// 连接到nats的服务器
conn, err := nats.Connect("nats://127.0.0.1:4222")
if err != nil {
log.Panic(err)
}
defer conn.Close()
// 初始化JetStream功能
js, err := conn.JetStream()
if err != nil {
log.Panic(err)
}
// 判断Stream是否存在,如果不存在,那么需要创建这个Stream,否则会导致pub/sub失败
stream, err := js.StreamInfo(streamName)
if err != nil {
log.Println(err) // 如果不存在,这里会有报错
}
if stream == nil {
log.Printf("creating stream %q and subject %q", streamName, subject)
_, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{subject},
MaxAge: 3 * 24 * time.Hour,
})
if err != nil {
log.Panicln(err)
}
}
// 订阅消息
sub, err := js.Subscribe(subject, cbHandle, nats.AckAll(), nats.DeliverNew())
if err != nil {
log.Panic(err)
return
}
defer sub.Unsubscribe()
// 发送消息
js.Publish(subject, []byte("Hello World! "+time.Now().Format(time.RFC3339)))
time.Sleep(5 * time.Second)
log.Println("Exiting...")
在这个例子中,有个值得注意的功能需要额外强调一下,在Subscribe
消息时,我们在这里特别声明了nats.DeliverNew()
这个选项。如果不声明,则默认为nats.DeliverAll()
;除了这两个参数,还有一个nats.DeliverLast()
参数,这分别对应了3种开始订阅时的方式:默认方式nats.DeliverAll()
是会读取有效生命周期内的所有消息,甚至包含已被处理的消息;nats.DeliverLast()
是会包含消息队列中的最后一条消息,即使被处理过的消息;nats.DeliverNew()
则只处理订阅之后的新消息。