RabbitMQ
pub/sub模式
发送方发送到交换机, 交换机再路由到不同的队列
Durability
: Durable服务重启后队列存在;Transient服务重启后队列消失- 交换机类型:
fanout
: 一个消息进入,多个消息分到不同消息(需要注意,不负责存储,如果没人接受也会消失)direct
: 路由模式,fanout 升级版,可以根据属性进入到不同的队列topic
: 模糊匹配属性进入到不同的队列; 例如:*.user.*
可以匹配到hello.user.1
,user.#
可以匹配到用户后面所有的user.hello.1
, 而user.*
是无法匹配到的。
fanount
创建队列
创建交换机
交换机绑定队列
direct
创建交换机
绑定队列
user 的消息发送到q1, room 的消息发送到q2
测试消息
队列接受消息
Golang
安装依赖包
$ go get -u github.com/streadway/amqp
简单模式
package main
import (
"fmt"
"github.com/streadway/amqp"
"time"
)
// 普通的发送与接受
func main() {
// 建立amqp连接
conn, err := amqp.Dial("amqp://admin:admin@127.0.0.1:5672")
if err != nil {
panic(any(err))
}
// 建立amqp通道
ch, err := conn.Channel()
if err != nil {
panic(any(err))
}
// 创建队列(只有不存在才会创建
declare, err := ch.QueueDeclare(
"gq1",
true, // 重启后队列还存在
false, // 不自动删除
false, // 是否加排他锁
false, // 是否不等待结果
nil)
if err != nil {
panic(any(err))
}
// 接受消息
go consume(conn, declare.Name)
// 发送消息
for i := 0; i < 3000; i++ {
err := ch.Publish(
"", // 交换机名称
declare.Name,
false,
false,
amqp.Publishing{
Body: []byte(fmt.Sprintf("go send message: %d", i)),
},
)
if err != nil {
fmt.Println(err.Error())
}
time.Sleep(200 * time.Millisecond)
}
}
func consume(conn *amqp.Connection, queueName string) {
ch, err := conn.Channel()
if err != nil {
panic(any(err))
}
deliveries, err := ch.Consume(queueName, "go-consumer", true, false, false, false, nil)
if err != nil {
return
}
// 打印消息体
for msg := range deliveries {
fmt.Printf("%s\n", msg.Body)
}
}
pub/sub模式
package main
import (
"fmt"
"github.com/streadway/amqp"
"time"
)
const exchangeName = "go_exchange1"
// 普通的发送与接受
func main() {
// 建立amqp连接
conn, err := amqp.Dial("amqp://admin:admin@127.0.0.1:5672")
if err != nil {
panic(any(err))
}
// 建立amqp通道
ch, err := conn.Channel()
if err != nil {
panic(any(err))
}
// 创建队列(只有不存在才会创建
err = ch.ExchangeDeclare(
exchangeName,
"fanout",
true,
false,
false,
false,
nil,
)
if err != nil {
panic(any(err))
}
// 接受消息
go subscribe(conn, exchangeName)
go subscribe(conn, exchangeName)
// 发送消息
for i := 0; i < 3000; i++ {
err := ch.Publish(
exchangeName, // 交换机名称
"", // 绑定队列的key
false,
false,
amqp.Publishing{
Body: []byte(fmt.Sprintf("go send message: %d", i)),
},
)
if err != nil {
fmt.Println(err.Error())
}
time.Sleep(200 * time.Millisecond)
}
}
// 交换机分发消息
func subscribe(conn *amqp.Connection, exchangeName string) {
// 如果路由绑定的队列不存在创建队列
ch, err := conn.Channel()
if err != nil {
panic(any(err))
}
q, _ := ch.QueueDeclare(
"geq1",
true, // 重启后队列还存在
false, // 不自动删除
false, // 是否加排他锁
false, // 是否不等待结果
nil)
// 交换机绑定队列
err = ch.QueueBind(q.Name, "", exchangeName, false, nil)
if err != nil {
panic(any(err))
}
// 接受消息
consume(ch, q.Name)
}
func consume(ch *amqp.Channel, queueName string) {
deliveries, err := ch.Consume(queueName, "go-consumer", true, false, false, false, nil)
if err != nil {
return
}
// 打印消息体
for msg := range deliveries {
fmt.Printf("%s\n", msg.Body)
}
}
RocketMQ
安装
定义配置文件./config/broker.conf
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# 所属集群名字
brokerClusterName=DefaultCluster
# broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
# 在 broker-b.properties 使用: broker-b
brokerName=broker-a
# 0 表示 Master,> 0 表示 Slave
brokerId=0
# nameServer地址,分号分割
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
# 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
brokerIP1=docker.for.mac.localhost
# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
# 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
autoCreateTopicEnable=true
# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
# Broker 对外服务的监听端口
listenPort=10911
# 删除文件时间点,默认凌晨4点
deleteWhen=04
# 文件保留时间,默认48小时
fileReservedTime=120
# commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
# storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
# commitLog 存储路径
# storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
# 消费队列存储
# storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
# 消息索引存储路径
# storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
# checkpoint 文件存储路径
# storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
# abort 文件存储路径
# abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
# 限制的消息大小
maxMessageSize=65536
# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000
# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER
# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
# 发消息线程池数量
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128
docker-composer.yaml
version: '3.5'
services:
rmqnamesrv:
image: foxiswho/rocketmq:server
container_name: rmqnamesrv
restart: always
ports:
- 9876:9876
volumes:
- ./logs:/opt/logs
- ./store:/opt/store
networks:
my-network:
aliases:
- rmqnamesrv
rmqbroker:
image: foxiswho/rocketmq:broker
container_name: rmqbroker
restart: always
ports:
- 10909:10909
- 10911:10911
volumes:
- ./logs:/opt/logs
- ./store:/opt/store
- ./config/broker.conf:/etc/rocketmq/broker.conf
environment:
NAMESRV_ADDR: "rmqnamesrv:9876"
JAVA_OPTS: " -Duser.home=/opt"
JAVA_OPT_EXT: "-server -Xms256m -Xmx256m -Xmn256m"
command: mqbroker -c /etc/rocketmq/broker.conf
depends_on:
- rmqnamesrv
networks:
my-network:
aliases:
- rmqbroker
rmqconsole:
image: styletang/rocketmq-console-ng
container_name: rmqconsole
restart: always
ports:
- 8080:8080
environment:
JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
depends_on:
- rmqnamesrv
networks:
my-network:
aliases:
- rmqconsole
networks:
my-network:
name: my-network
driver: bridge
创建topic
发送消息
消费消息
Golang
安装依赖包
$ go get github.com/apache/rocketmq-client-go/v2
初始化
func main() {
p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"127.0.0.1:9876"}))
if err != nil {
panic(any(err))
}
if err := p.Start(); err != nil {
panic(any(err))
}
//SendSync(p)
Read()
}
发送消息
// SendSync 同步发送
func SendSync(p rocketmq.Producer, messageBody any) {
message := primitive.NewMessage("hello", []byte("hello GO"))
res, err := p.SendSync(context.Background(), message)
if err != nil {
panic(any(err))
}
// 打印结果
fmt.Printf("%+v", res)
}
读取消息
// Read 读取消息
func Read() {
// 使用服务端用数据后自动推送给客户端模式
c, err := rocketmq.NewPushConsumer(
consumer.WithNameServer([]string{"127.0.0.1:9876"}),
consumer.WithGroupName("shop"), // 同一服务的group设置为一样的. 保证分布式服务不会消费同一消息
)
if err != nil {
panic(any(err))
}
// 订阅指定 topic
if err := c.Subscribe("hello", consumer.MessageSelector{}, func(ctx context.Context, ext ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
for i := range ext {
fmt.Printf("获取值: %v\n", ext[i])
}
// 读到处理完后返回成功, 保证不会读取重复消息, 使用ConsumeRetryLater 果断时候后可继续消费
return consumer.ConsumeSuccess, nil
}); err != nil {
fmt.Println("读取消息失败")
}
// 启动
_ = c.Start()
// 携程消费, 防止主进程小时
time.Sleep(time.Hour)
_ = c.Shutdown()
}
延迟消息
延时消息只需要为 primitive.Message
定义延时等级即可实现延时功能, 不同等级延时时间不同
等级 | 延时时常 |
---|---|
1 | 1s |
2 | 5s |
3 | 10s |
4 | 30s |
5 | 1m |
6 | 2m |
7 | 3m |
8 | 4m |
9 | 5m |
10 | 6m |
11 | 7m |
12 | 8m |
13 | 9m |
14 | 10m |
15 | 20m |
16 | 30m |
17 | 1h |
18 | 2h |
// SendSync 同步发送
func SendSync(q rocketmq.Producer, messageBody any) {
message := primitive.NewMessage("hello", []byte("hello GO"))
message.WithDelayTimeLevel(3) // 延时等级3: 30s后发送
res, err := q.SendSync(context.Background(), message)
if err != nil {
panic(any(err))
}
// 打印结果
fmt.Printf("%+v", res)
}
事物消息
使用事物消息需要实现 primitive.TransactionListener 接口
type TransactionListener interface {
// When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
ExecuteLocalTransaction(*Message) LocalTransactionState
// When no response to prepare(half) message. broker will send check message to check the transaction status, and this
// method will be invoked to get local transaction status.
CheckLocalTransaction(*MessageExt) LocalTransactionState
}
接口实现
type MyTransactionListener struct {
// 需要实现下面函数
//ExecuteLocalTransaction(message *primitive.Message) primitive.LocalTransactionState
//CheckLocalTransaction(*primitive.MessageExt) primitive.LocalTransactionState
}
// ExecuteLocalTransaction 发送成功后调用
func (l *MyTransactionListener) ExecuteLocalTransaction(message *primitive.Message) primitive.LocalTransactionState {
fmt.Println("开始执行本地事物")
time.Sleep(2 * time.Second)
fmt.Printf("执行本地事物, 返回状态: %d\n", primitive.UnknowState)
// 返回提交状态 CommitMessageState: 执行成功提交; RollbackMessageState: 执行失败回滚消息;
return primitive.UnknowState
}
// CheckLocalTransaction 检查没有响应消息的状态(如果未返回 或者 UnknowState:调用此方法
func (l *MyTransactionListener) CheckLocalTransaction(*primitive.MessageExt) primitive.LocalTransactionState {
fmt.Println("检查本地状态")
time.Sleep(5 * time.Second)
return primitive.UnknowState
}
发送消息
// Transaction 事物消息
func Transaction() {
p, err := rocketmq.NewTransactionProducer(
&MyTransactionListener{},
producer.WithNameServer([]string{"127.0.0.1:9876"}),
producer.WithRetry(1),
)
if err != nil {
// 示例化失败
panic(any(err))
}
err = p.Start()
if err != nil {
fmt.Printf("start producer error: %s\n", err.Error())
}
// 发送事物消息
message := primitive.NewMessage("hello", []byte("事物消息"))
res, err := p.SendMessageInTransaction(context.Background(), message)
if err != nil {
// 发送失败
fmt.Println("发送失败")
panic(any(err))
}
fmt.Printf("发送结果: %s\n", res.String())
time.Sleep(10 * time.Hour)
if err := p.Shutdown(); err != nil {
// 结束失败
panic(any(err))
}
}