RabbitMQ

本文最后更新于:2023年12月5日 晚上

阿里云消息队列:https://www.aliyun.com/product/ons

RabbitMQ 基于 erlang 语言开发,具有高并发优点、支持分布式具有消息确认机制、消息持久化机制,消息可靠性和集群可靠性高,简单易用、运行稳定、跨平台、多语言开源

Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念,当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Connection:publisher/consumer 和 broker 之间的 TCP 连接。
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)。
Queue:消息最终被送到这里等待 consumer 取走,先进先出,可以持久化到磁盘节点服务器
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。

RabbitMQ 中的生产者消费者示例

生产者发送消息到 broker server(RabbitMQ),在 Broker 内部,用户创建 Exchange/Queue,通过 Binding 规则将两者联系在一起,Exchange 分发消息,根据类型/binding 的不同分发策略有区别,消息最后来到 Queue 中,等待消费者取走

RabbitMQ 单机部署

快速部署示例:https://www.rabbitmq.com/install-debian.html#apt-bintray-quick-start

  1. 主机名解析

    [root@mq1 ~]$hostname
    mq1.ljk.cn
    [root@mq1 ~]$hostname -I
    10.0.1.101
    [root@mq1 ~]$vim /etc/hosts
    ...
    10.0.1.101 mq1.ljk.cn mq1
  2. 时间原因,选择 apt 安装,直接执行以下脚本

    #!/bin/sh
    
    ## If sudo is not available on the system,
    ## uncomment the line below to install it
    # apt-get install -y sudo
    
    sudo apt-get update -y
    
    ## Install prerequisites
    sudo apt-get install curl gnupg -y
    
    ## Install RabbitMQ signing key
    curl -fsSL https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo apt-key add -
    
    ## Install apt HTTPS transport
    sudo apt-get install apt-transport-https
    
    $distribution='bionic'  # ubuntu18.04
    sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list <<EOF
    ## 默认安装最新版本的 erlang 23.x,可以指定版本例如:erlang-22.x
    deb https://dl.bintray.com/rabbitmq-erlang/debian $distribution erlang
    ## Installs latest RabbitMQ release
    deb https://dl.bintray.com/rabbitmq/debian $distribution main
    EOF
    
    ## Update package indices
    sudo apt-get update -y
    
    ## Install rabbitmq-server and its dependencies
    sudo apt-get install rabbitmq-server -y --fix-missing
  3. 安装成功

    [root@zabbix src]$systemctl status rabbitmq-server.service
    ● rabbitmq-server.service - RabbitMQ broker
       Loaded: loaded (/lib/systemd/system/rabbitmq-server.service; enabled; vendor preset: enabled)
       Active: active (running) since Sun 2021-02-21 02:41:45 UTC; 1h 26min ago
     Main PID: 5992 (beam.smp)
       Status: "Initialized"
        Tasks: 22 (limit: 2289)
       CGroup: /system.slice/rabbitmq-server.service
               ├─5992 /usr/lib/erlang/erts-11.1.8/bin/beam.smp -W w -MBas ageffcbf -MHas ageffcbf -MBlmbcs 512 -MHlmbcs 512 -MMmcs 30 -P 1048576 -t 5000000 -stbt db -zd
               ├─6013 erl_child_setup 32768
               ├─6039 /usr/lib/erlang/erts-11.1.8/bin/epmd -daemon
               ├─6058 inet_gethost 4
               └─6059 inet_gethost 4
    ...
  4. 插件管理
    https://www.rabbitmq.com/management.html
    开启 web 管理界面

    [root@zabbix src]$which rabbitmq-plugins
    /usr/sbin/rabbitmq-plugins
    [root@zabbix src]$rabbitmq-plugins enable rabbitmq_management
    Enabling plugins on node rabbit@mq1:
    rabbitmq_management
    The following plugins have been configured:
      rabbitmq_management
      rabbitmq_management_agent
      rabbitmq_web_dispatch
    Applying plugin configuration to rabbit@mq1...
    The following plugins have been enabled:
      rabbitmq_management
      rabbitmq_management_agent
      rabbitmq_web_dispatch
    
    started 3 plugins.
    
    # 5672  消费者访问的 端口
    # 15672  web 管理端口
    # 25672  集群状态通信端口
    [root@mq1 ~]$rabbitmqctl list_users
    Listing users ...
    user    tags
    guest   [administrator]  # guest是初始化的用户,账号密码都是guest,但是只能使用localhost访问
    
    [root@mq1 ~]$rabbitmqctl add_user ljk 123456 # 添加用户
    Adding user "ljk" ...
    Done. Don't forget to grant the user permissions to some virtual hosts! See 'rabbitmqctl help set_permissions' to learn more.
    [root@mq1 ~]$rabbitmqctl set_user_tags ljk administrator # 用户授权
    Setting tags for user "ljk" to [administrator] ...
    [root@mq1 ~]$rabbitmqctl list_users
    Listing users ...
    user    tags
    ljk     [administrator]  # 授权成功
    guest   [administrator]

    使用账号 ljk,密码 123456,成功登录 web 管理界面:

    登录之后,我们很少进行更改,主要是查看 rabbitmq 的运行状态

RabbitMQ 集群部署

  • 普通模式:创建好 RabbitMQ 集群之后的默认模式
    所有的 mq 节点保存相同的元数据,即消息队列,但队列里的数据仅保存一份,消费者从 A 节点拉取消息,如果消息在 B 节点,那么 B 会将消息发送给 A 节点,然后消费者就能拉取到数据了
    缺点:单点失败
  • 镜像模式:把需要的队列做成镜像队列
    在普通模式的基础上,增加一些镜像策略,消息实体会主动在镜像节点间同步,而不是在 consumer 取数据时临时拉取,解决了单点失败的问题,但是性能下降,增加集群内部网络消耗一个
    队列想做成镜像队列,需要先设置 policy,然后客户端创建队列的时候,rabbitmq 集群根据“队列名称”自动设置是普通集群模式或镜像队列

推荐设计架构:

在一个 rabbitmq 集群里,有 3 台或以上机器,其中 1 台使用磁盘模式(数据保存到内存和磁盘),其它节点使用内存模式(数据只保存到内存),使用磁盘模式的节点,作为数据备份使用

Ubuntu 1804 安装集群版 RabbitMQ

10.0.1.101 mq1.ljk.cn mq1
10.0.1.102 mq1.ljk.cn mq2
10.0.1.103 mq1.ljk.cn mq3

Rabbitmq 的集群是依赖于 erlang 的集群来工作的,所以必须先构建起 erlang 的集群环境,而 Erlang 的集群中各节点是通过一个 magic cookie 来实现的,这个 cookie 存放在 /var/lib/rabbitmq/.erlang.cookie 中,文件是 400 的权限,所以必须保证各节点 cookie 保持一致,否则节点之间就无法通信

  1. 各节点安装 RabbitMQ 并安装插件,参考上面单机部署

  2. 各服务器关闭 RabbitMQ

    [root@mq1 ~]$systemctl stop rabbitmq-server.service
    [root@mq2 ~]$systemctl stop rabbitmq-server.service
    [root@mq3 ~]$systemctl stop rabbitmq-server.service
  3. 在 mq1 同步.erlang.cookie 至其他两台服务器

    [root@mq1 ~]$cd /var/lib/rabbitmq/
    [root@mq1 rabbitmq]$scp .erlang.cookie 10.0.1.102:/var/lib/rabbitmq
    [root@mq1 rabbitmq]$scp .erlang.cookie 10.0.1.103:/var/lib/rabbitmq
  4. 各服务器启动 RabbitMQ

    [root@mq1 ~]$systemctl start rabbitmq-server.service
    [root@mq2 ~]$systemctl start rabbitmq-server.service
    [root@mq3 ~]$systemctl start rabbitmq-server.service
  5. 查看当前集群状态

    [root@mq1 ~]$rabbitmqctl cluster_status
    [root@mq2 ~]$rabbitmqctl cluster_status
    [root@mq3 ~]$rabbitmqctl cluster_status
  6. 创建 RabbitMQ 集群:mq1、mq2 作为内存节点,mq3 作为磁盘节点

    [root@mq1 ~]$rabbitmqctl stop_app  #停止 app 服务
    [root@mq1 ~]$rabbitmqctl reset   #清空元数据
    [root@mq1 ~]$rabbitmqctl join_cluster rabbit@mq3 --ram  #将mq1添加到集群中,并成为内存节点,不加--ram默认是磁盘节点
    [root@mq1 ~]$rabbitmqctl start_app #启动 app 服务
    [root@mq2 ~]$rabbitmqctl stop_app
    [root@mq2 ~]$rabbitmqctl reset
    [root@mq2 ~]$rabbitmqctl join_cluster rabbit@mq3 --ram
    [root@mq2 ~]$rabbitmqctl start_app
  7. 将集群设置为镜像模式,任意节点执行以下命令

    [root@mq1 ~]$rabbitmqctl set_policy ha-all "#" '{"ha-mode":"all"}'
  8. 查看集群状态

    [root@mq1 ~]$rabbitmqctl cluster_status
    Cluster status of node rabbit@mq1 ...
    Basics
    
    Cluster name: rabbit@mq3.ljk.cn
    
    Disk Nodes
    
    rabbit@mq3
    
    RAM Nodes
    
    rabbit@mq1
    rabbit@mq2
    
    Running Nodes
    
    rabbit@mq1
    rabbit@mq2
    rabbit@mq3
    
    Versions
    
    rabbit@mq1: RabbitMQ 3.8.12 on Erlang 23.2.5
    rabbit@mq2: RabbitMQ 3.8.12 on Erlang 23.2.5
    rabbit@mq3: RabbitMQ 3.8.12 on Erlang 23.2.5
    
    Maintenance status
    
    Node: rabbit@mq1, status: not under maintenance
    Node: rabbit@mq2, status: not under maintenance
    Node: rabbit@mq3, status: not under maintenance
    
    Alarms
    
    (none)
    
    Network Partitions
    
    (none)
    
    Listeners
    
    Node: rabbit@mq1, interface: [::], port: 15672, protocol: http, purpose: HTTP API
    Node: rabbit@mq1, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
    Node: rabbit@mq1, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
    Node: rabbit@mq2, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
    Node: rabbit@mq2, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
    Node: rabbit@mq2, interface: [::], port: 15672, protocol: http, purpose: HTTP API
    Node: rabbit@mq3, interface: [::], port: 25672, protocol: clustering, purpose: inter-node and CLI tool communication
    Node: rabbit@mq3, interface: [::], port: 5672, protocol: amqp, purpose: AMQP 0-9-1 and AMQP 1.0
    Node: rabbit@mq3, interface: [::], port: 15672, protocol: http, purpose: HTTP API
    
    Feature flags
    
    Flag: drop_unroutable_metric, state: enabled
    Flag: empty_basic_get_metric, state: enabled
    Flag: implicit_default_bindings, state: enabled
    Flag: maintenance_mode_status, state: enabled
    Flag: quorum_queue, state: enabled
    Flag: user_limits, state: enabled
    Flag: virtual_host_metadata, state: enabled

RabbitMQ 常用命令

  • 创建 vhost

    [root@mq1 ~]$rabbitmqctl add_vhost test
    Adding vhost "test" ...
  • 列出所有 vhost

    [root@mq1 ~]$rabbitmqctl list_vhosts
    Listing vhosts ...
    name
    /
    test
  • 列出所有队列

    [root@mq1 ~]$rabbitmqctl list_queues
    Timeout: 60.0 seconds ...
    Listing queues for vhost / ...
  • 删除指定 vhost

    [root@mq1 ~]$rabbitmqctl delete_vhost test
    Deleting vhost "test" ...
  • 添加账户 jack 密码为 123456

    [root@mq1 ~]$rabbitmqctl add_user jack 123456
    Adding user "jack" ...
    Done. Don't forget to grant the user permissions to some virtual hosts! See 'rabbitmqctl help set_permissions' to learn more.
  • 更改用户密码

    [root@mq1 ~]$rabbitmqctl change_password jack 654321
    Changing password for user "jack" ...
  • 设置 jack 用户对 test 的 vhost 有读写权限,三个点为配置正则、读和写

    [root@mq1 ~]$rabbitmqctl set_permissions -p test jack ".*" ".*" ".*"

RabbitMQ API

RabbitMQ Cluster Monitor

集群状态监控

#!/bin/env python
# coding:utf-8
#Author: ZhangJie
import subprocess

running_list = []
error_list = []
false = "false"
true = "true"


def get_status():
    obj = subprocess.Popen(
        ("curl - s - u guest: guest http: // localhost: 15672/api/nodes & > / dev/null"), shell=True, stdout=subprocess.PIPE)

    data = obj.stdout.read()
    data1 = eval(data)
    for i in data1:
        if i.get("running") == "true":
            running_list.append(i.get("name"))
        else:
            error_list.append(i.get("name"))


def count_server():
    if len(running_list) < 3:   # 可以判断错误列表大于 0 或者运行列表小于 3,3未总计的节点数量
        print(101)              # 100 就是集群内有节点运行不正常了
    else:
        print(50)               # 50 为所有节点全部运行正常


def main():
    get_status()
    count_server()


if __name__ == "__main__":
    main()

内存使用监控

# cat rabbitmq_memory.py
#!/bin/env python
# coding:utf-8
#Author: ZhangJie
import subprocess
import sys

running_list = []
error_list = []
false = "false"
true = "true"


def get_status():
    obj = subprocess.Popen(
        ("curl - s - u guest: guest http: // localhost: 15672/api/nodes & > / dev/null"), shell=True, stdout=subprocess.PIPE)

    data = obj.stdout.read()
    data1 = eval(data)
    # print(data1)
    for i in data1:
        if i.get("name") == sys.argv[1]:
            print(i.get("mem_used"))


def main():
    get_status()


if __name__ == "__main__":
    main()

root@mq-server3: ~  # python3 rabbitmq_memory.py rabbit@mq-server1
85774336
root@mq-server3: ~  # python3 rabbitmq_memory.py rabbit@mq-server2
91099136
root@mq-server3: ~  # python3 rabbitmq_memory.py rabbit@mq-server3
96428032

RabbitMQ
http://blog.lujinkai.cn/运维/消息队列与微服务/RabbitMQ/
作者
像方便面一样的男子
发布于
2021年2月21日
更新于
2023年12月5日
许可协议