RabbitMQ

pub/sub模式

发送方发送到交换机, 交换机再路由到不同的队列

  • Durability: Durable服务重启后队列存在;Transient服务重启后队列消失
  • 交换机类型:
    • fanout: 一个消息进入,多个消息分到不同消息(需要注意,不负责存储,如果没人接受也会消失)
    • direct: 路由模式,fanout 升级版,可以根据属性进入到不同的队列
    • topic: 模糊匹配属性进入到不同的队列; 例如: *.user.* 可以匹配到 hello.user.1, user.# 可以匹配到用户后面所有的 user.hello.1, 而 user.*是无法匹配到的。

fanount

创建队列

image-20220807141904805

创建交换机

image-20220807142008185

交换机绑定队列

image-20220807142120902

direct

创建交换机

image-20220807142409819

绑定队列

user 的消息发送到q1, room 的消息发送到q2

image-20220807142609460

测试消息

image-20220807142652370

队列接受消息image-20220807142714824

Golang

安装依赖包

$ go get -u github.com/streadway/amqp

简单模式

package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "time"
)

// 普通的发送与接受
func main() {
    // 建立amqp连接
    conn, err := amqp.Dial("amqp://admin:admin@127.0.0.1:5672")
    if err != nil {
        panic(any(err))
    }

    // 建立amqp通道
    ch, err := conn.Channel()
    if err != nil {
        panic(any(err))
    }

    // 创建队列(只有不存在才会创建
    declare, err := ch.QueueDeclare(
        "gq1",
        true,  // 重启后队列还存在
        false, // 不自动删除
        false, // 是否加排他锁
        false, // 是否不等待结果
        nil)
    if err != nil {
        panic(any(err))
    }
    // 接受消息
    go consume(conn, declare.Name)

    // 发送消息
    for i := 0; i < 3000; i++ {
        err := ch.Publish(
            "", // 交换机名称
            declare.Name,
            false,
            false,
            amqp.Publishing{
                Body: []byte(fmt.Sprintf("go send message: %d", i)),
            },
        )
        if err != nil {
            fmt.Println(err.Error())
        }
        time.Sleep(200 * time.Millisecond)
    }
}

func consume(conn *amqp.Connection, queueName string) {
    ch, err := conn.Channel()
    if err != nil {
        panic(any(err))
    }

    deliveries, err := ch.Consume(queueName, "go-consumer", true, false, false, false, nil)
    if err != nil {
        return
    }

    // 打印消息体
    for msg := range deliveries {
        fmt.Printf("%s\n", msg.Body)
    }
}

pub/sub模式

package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "time"
)

const exchangeName = "go_exchange1"

// 普通的发送与接受
func main() {
    // 建立amqp连接
    conn, err := amqp.Dial("amqp://admin:admin@127.0.0.1:5672")
    if err != nil {
        panic(any(err))
    }

    // 建立amqp通道
    ch, err := conn.Channel()
    if err != nil {
        panic(any(err))
    }

    // 创建队列(只有不存在才会创建
    err = ch.ExchangeDeclare(
        exchangeName,
        "fanout",
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        panic(any(err))
    }
    // 接受消息
    go subscribe(conn, exchangeName)
    go subscribe(conn, exchangeName)

    // 发送消息
    for i := 0; i < 3000; i++ {
        err := ch.Publish(
            exchangeName, // 交换机名称
            "",           // 绑定队列的key
            false,
            false,
            amqp.Publishing{
                Body: []byte(fmt.Sprintf("go send message: %d", i)),
            },
        )
        if err != nil {
            fmt.Println(err.Error())
        }
        time.Sleep(200 * time.Millisecond)
    }
}

// 交换机分发消息
func subscribe(conn *amqp.Connection, exchangeName string) {
    // 如果路由绑定的队列不存在创建队列
    ch, err := conn.Channel()
    if err != nil {
        panic(any(err))
    }
    q, _ := ch.QueueDeclare(
        "geq1",
        true,  // 重启后队列还存在
        false, // 不自动删除
        false, // 是否加排他锁
        false, // 是否不等待结果
        nil)

    // 交换机绑定队列
    err = ch.QueueBind(q.Name, "", exchangeName, false, nil)
    if err != nil {
        panic(any(err))
    }
    // 接受消息
    consume(ch, q.Name)
}

func consume(ch *amqp.Channel, queueName string) {

    deliveries, err := ch.Consume(queueName, "go-consumer", true, false, false, false, nil)
    if err != nil {
        return
    }

    // 打印消息体
    for msg := range deliveries {
        fmt.Printf("%s\n", msg.Body)
    }
}

RocketMQ

安装

定义配置文件./config/broker.conf

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.


# 所属集群名字
brokerClusterName=DefaultCluster

# broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
# 在 broker-b.properties 使用: broker-b
brokerName=broker-a

# 0 表示 Master,> 0 表示 Slave
brokerId=0

# nameServer地址,分号分割
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876

# 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
brokerIP1=docker.for.mac.localhost

# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4

# 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
autoCreateTopicEnable=true

# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true

# Broker 对外服务的监听端口
listenPort=10911

# 删除文件时间点,默认凌晨4点
deleteWhen=04

# 文件保留时间,默认48小时
fileReservedTime=120

# commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824

# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000

# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
# storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
# commitLog 存储路径
# storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
# 消费队列存储
# storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
# 消息索引存储路径
# storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
# checkpoint 文件存储路径
# storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
# abort 文件存储路径
# abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
# 限制的消息大小
maxMessageSize=65536

# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000

# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER

# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH

# 发消息线程池数量
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128

docker-composer.yaml

version: '3.5'
services:
  rmqnamesrv:
    image: foxiswho/rocketmq:server
    container_name: rmqnamesrv
    restart: always
    ports:
      - 9876:9876
    volumes:
      - ./logs:/opt/logs
      - ./store:/opt/store
    networks:
        my-network:
          aliases:
            - rmqnamesrv

  rmqbroker:
    image: foxiswho/rocketmq:broker
    container_name: rmqbroker
    restart: always
    ports:
      - 10909:10909
      - 10911:10911
    volumes:
      - ./logs:/opt/logs
      - ./store:/opt/store
      - ./config/broker.conf:/etc/rocketmq/broker.conf
    environment:
        NAMESRV_ADDR: "rmqnamesrv:9876"
        JAVA_OPTS: " -Duser.home=/opt"
        JAVA_OPT_EXT: "-server -Xms256m -Xmx256m -Xmn256m"
    command: mqbroker -c /etc/rocketmq/broker.conf
    depends_on:
      - rmqnamesrv
    networks:
      my-network:
        aliases:
          - rmqbroker

  rmqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    restart: always
    ports:
      - 8080:8080
    environment:
        JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - rmqnamesrv
    networks:
      my-network:
        aliases:
          - rmqconsole

networks:
  my-network:
    name: my-network
    driver: bridge

创建topic

image-20220716130905307

发送消息

image-20220716131041679

image-20220716131100112

消费消息

image-20220716131222057

image-20220716131338867

Golang

文档地址

安装依赖包

$ go get github.com/apache/rocketmq-client-go/v2

初始化

func main() {
    p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"127.0.0.1:9876"}))
    if err != nil {
        panic(any(err))
    }
    if err := p.Start(); err != nil {
        panic(any(err))
    }

    //SendSync(p)
    Read()
}

发送消息

// SendSync 同步发送
func SendSync(p rocketmq.Producer, messageBody any) {
    message := primitive.NewMessage("hello", []byte("hello GO"))
    res, err := p.SendSync(context.Background(), message)

    if err != nil {
        panic(any(err))
    }
    // 打印结果
    fmt.Printf("%+v", res)
}

读取消息

// Read 读取消息
func Read() {
    // 使用服务端用数据后自动推送给客户端模式
    c, err := rocketmq.NewPushConsumer(
        consumer.WithNameServer([]string{"127.0.0.1:9876"}),
        consumer.WithGroupName("shop"), // 同一服务的group设置为一样的. 保证分布式服务不会消费同一消息
    )
    if err != nil {
        panic(any(err))
    }

    // 订阅指定 topic
    if err := c.Subscribe("hello", consumer.MessageSelector{}, func(ctx context.Context, ext ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
        for i := range ext {
            fmt.Printf("获取值: %v\n", ext[i])
        }
        // 读到处理完后返回成功, 保证不会读取重复消息, 使用ConsumeRetryLater 果断时候后可继续消费
        return consumer.ConsumeSuccess, nil
    }); err != nil {
        fmt.Println("读取消息失败")
    }
  // 启动
    _ = c.Start()
    // 携程消费, 防止主进程小时
    time.Sleep(time.Hour)
    _ = c.Shutdown()
}

延迟消息

延时消息只需要为 primitive.Message 定义延时等级即可实现延时功能, 不同等级延时时间不同

等级延时时常
11s
25s
310s
430s
51m
62m
73m
84m
95m
106m
117m
128m
139m
1410m
1520m
1630m
171h
182h
// SendSync 同步发送
func SendSync(q rocketmq.Producer, messageBody any) {
    message := primitive.NewMessage("hello", []byte("hello GO"))
    message.WithDelayTimeLevel(3) // 延时等级3: 30s后发送
    res, err := q.SendSync(context.Background(), message)

    if err != nil {
        panic(any(err))
    }
    // 打印结果
    fmt.Printf("%+v", res)
}

事物消息

使用事物消息需要实现 primitive.TransactionListener 接口

type TransactionListener interface {
    //  When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
    ExecuteLocalTransaction(*Message) LocalTransactionState

    // When no response to prepare(half) message. broker will send check message to check the transaction status, and this
    // method will be invoked to get local transaction status.
    CheckLocalTransaction(*MessageExt) LocalTransactionState
}
接口实现
type MyTransactionListener struct {
    // 需要实现下面函数
    //ExecuteLocalTransaction(message *primitive.Message) primitive.LocalTransactionState
    //CheckLocalTransaction(*primitive.MessageExt) primitive.LocalTransactionState
}

// ExecuteLocalTransaction 发送成功后调用
func (l *MyTransactionListener) ExecuteLocalTransaction(message *primitive.Message) primitive.LocalTransactionState {
    fmt.Println("开始执行本地事物")
    time.Sleep(2 * time.Second)
    fmt.Printf("执行本地事物, 返回状态: %d\n", primitive.UnknowState)
    // 返回提交状态 CommitMessageState: 执行成功提交; RollbackMessageState: 执行失败回滚消息;
    return primitive.UnknowState
}

// CheckLocalTransaction 检查没有响应消息的状态(如果未返回 或者 UnknowState:调用此方法
func (l *MyTransactionListener) CheckLocalTransaction(*primitive.MessageExt) primitive.LocalTransactionState {
    fmt.Println("检查本地状态")
    time.Sleep(5 * time.Second)
    return primitive.UnknowState
}
发送消息
// Transaction 事物消息
func Transaction() {
    p, err := rocketmq.NewTransactionProducer(
        &MyTransactionListener{},
        producer.WithNameServer([]string{"127.0.0.1:9876"}),
        producer.WithRetry(1),
    )
    if err != nil {
        // 示例化失败
        panic(any(err))
    }
    err = p.Start()
    if err != nil {
        fmt.Printf("start producer error: %s\n", err.Error())
    }

    // 发送事物消息
    message := primitive.NewMessage("hello", []byte("事物消息"))
    res, err := p.SendMessageInTransaction(context.Background(), message)
    if err != nil {
        // 发送失败
        fmt.Println("发送失败")
        panic(any(err))
    }

    fmt.Printf("发送结果: %s\n", res.String())

    time.Sleep(10 * time.Hour)

    if err := p.Shutdown(); err != nil {
        // 结束失败
        panic(any(err))
    }
}
Last modification:July 23rd, 2024 at 02:41 pm