RocketMQ - CentOS 7.x 安装单机版并测试

小明 2025-04-29 12:13:21 4
【安装前环境准备】
检查是否安装好JDK(必要):java -version
查看CPU信息:
# cat /proc/cpuinfo
# lscpu
# getconf _NPROCESSORS_ONLN
# cat /sys/devices/system/cpu/online
# cat /proc/interrupts | egrep -i 'cpu
查看内存信息:
# free -hm
# cat /proc/meminfo
查看磁盘信息:
# lsblk
# fdisk -l   - 显示系统中的磁盘分区表信息,包括硬盘的大小、分区类型等
# df -hl
# df -a
# du -sh [目录名]
# du -sm [文件夹]`:查看指定文件夹的总M数
# du -h [目录名]`:查看指定文件夹下的所有文件大小(包含子文件夹)
# ls -lh /opt/install-file/
RocketMQ 防火墙端口开放
NameServer端口:9876
Broker端口:10911
Broker高可用(HA)端口:10912
Broker管理端口:10909(通常是10911 - 2,默认不开启)
FastRemoting端口:通常为 Broker 监听端口 + 2(例如 10911 + 2 = 10913)
状态:sudo systemctl status firewalld
启动:sudo systemctl start firewalld
查看:firewall-cmd --list-ports --permanent
添加防火墙开放访问端口:
firewall-cmd --zone=public --add-port=9876/tcp --permanent
firewall-cmd --zone=public --add-port=10909/tcp --permanent
firewall-cmd --zone=public --add-port=10911/tcp --permanent
firewall-cmd --zone=public --add-port=10912/tcp --permanent
加载:firewall-cmd --reload
重启:systemctl restart firewalld
停止:sudo systemctl stop firewalld
安装解压工具:yum install -y unzip
创建安装目录:mkdir -p /opt/soft/rocketmq
【正式安装 RocketMQ】
1.下载:https://dist.apache.org/repos/dist/release/rocketmq/4.9.8/rocketmq-all-4.9.8-bin-release.zip
2.解压到指定安装目录:unzip rocketmq-all-4.9.8-bin-release.zip -d /opt/soft/rocketmq/
3.重命名文件夹
# cd /opt/soft/rocketmq/
# mv rocketmq-all-4.9.8-bin-release rocketmq-all-4.9.8
目录结构说明:
 benchmark :存放的是性能测试脚本
 bin:可执行文件脚本文件
conf: 存放配置文件的目录
 lib:其他第三方依赖库
 LICENSE:授权信息
 NOTICE:版本公告信息
 README.md
4.配置环境变量
执行路径:/opt/soft/rocketmq/rocketmq-all-4.9.8/bin
查看网卡eth0的IP地址:ifconfig
# vim /etc/profile
添加 NAMESRV_ADDR 环境变量配置:
# rocketmq config
export NAMESRV_ADDR=192.168.1.210:9876
保存生效:source /etc/profile
5.修改启动脚本
需要修改两个启动脚本:runserver.sh 与 runbroker.sh
脚本位置:/opt/soft/rocketmq/rocketmq-all-4.9.8/bin/
先备份好原来的配置,然后开始修改
a.修改runserver.sh脚本: vim runserver.sh
  找到 choose_gc_options() 函数,根据你的jdk版本修改启动配置参数:
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
根据你的机器配置修改java参数,比如:-Xms4g -Xmx4g -Xmn2g 改为 -Xms512m -Xmx512m -Xmn256m
b.修改runbroker.sh脚本: vim runbroker.sh
# 同样找到 choose_gc_log_directory ,修改函数中的java虚拟机参数配置
如:JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g" 改为JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g"
6.修改配置文件
# cd /opt/soft/rocketmq/rocketmq-all-4.9.8/conf/
# ll
# vim broker.conf
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
在文件文本后面添加如下配置:
namesrvAddr=192.168.1.210:9876   -- IP地址根据自己的通信地址设置
autoCreateTopicEnable=true
brokerIP1=192.168.1.210
其他参数说明:
brokerClusterName - MQ集群的名称,我们改为RocketMQ-Cluster。
brokerName - 队列的名字,配置为broker-a。
brokerId - 队列的id,0代表是“主”,其他正整数代表着“从”。
deleteWhen=04 - 代表着commitLog过期了,就会被删除。
fileReservedTime - commitLog的过期时间,单位是小时,这里配置的是48小时。
brokerRole - 队列的角色,ASYNC_MASTER是异步主。
flushDiskType - 保存磁盘的方式,异步保存。
配置参数说明:
namesrvAddr:nameSrv地址,如果nameSrv与broker在同一台服务器上运行,可以配置为localhost+端口。集群中最好配置为对外提供服务的IP地址+端口
autoCreateTopicEnable:true说明自动创建主题,false则需要手动创建
brokeIP1:这个一定要配置为对外提供服务的IP地址
7.启动服务
如果机器配置不够,启动前先清一下缓存
清除PageCache页面高速缓存:sudo sync && echo 3 | sudo tee /proc/sys/vm/drop_caches
清除dentries和inodes,即目录项和索引节点:sudo sync && echo 2 > /proc/sys/vm/drop_caches
a.先启动namesrv
# cd /opt/soft/rocketmq/rocketmq-all-4.9.8/bin/
后台启动:nohup sh mqnamesrv &
后台带日志启动: nohup sh mqnamesrv  > ../namesrv.log 2>&1 &
b.然后启动broker:
后台启动:nohup sh mqbroker -c ../conf/broker.conf &
后台指定日志启动:nohup sh mqbroker -c ../conf/broker.conf > ../broker.log 2>&1 &
8.安装可视化管理控制台
参考官方文档下载 rocketmq-dashboard-1.0.0-source-release.zip 源码包解压,按照文档说明进行配置和打包
官方打包指导:https://github.com/apache/rocketmq-dashboard
最终得到可运行的jar包:rocketmq-dashboard-1.0.0.jar
a.配置rocketmq-dashboard-1.0.0\src\main\resources\application.properties 等
b.使用mvn clean package -Dmaven.test.skip=true 打包获取可运行的jar包:rocketmq-dashboard-1.0.0.jar
c.mkdir -p /opt/soft/rocketmq/rocketmq-dashboard
d.将打包好的jar包上传到新建的 rocketmq-dashboard 目录下
e.进入rocketmq-dashboard目录,启动运行控制台:
启动运行控制台:
# nohup java -jar rocketmq-dashboard-1.0.0.jar --server.port=8090 --rocketmq.config.namesrvAddr=127.0.0.1:9876 > dashboard.log 2>&1 &
访问控制台:ip:8090
9.查看服务启动状态
查看进程:
# jps -l
8753 rocketmq-dashboard-1.0.0.jar
407 org.apache.rocketmq.broker.BrokerStartup
32377 org.apache.rocketmq.namesrv.NamesrvStartup
8781 sun.tools.jps.Jps
查看端口:
# netstat -npl|grep :8090
tcp6       0      0 :::8090                 :::*                    LISTEN      8753/java
# netstat -npl|grep :9876
tcp6       0      0 :::9876                 :::*                    LISTEN      32377/java
# netstat -npl|grep :10911
tcp6       0      0 :::10911                :::*                    LISTEN      407/java
# netstat -npl|grep :10912
tcp6       0      0 :::10912                :::*                    LISTEN      407/java
10.测试
引入客户端依赖:rocketmq-client


    org.apache.rocketmq
    rocketmq-client
    4.9.8

编写生产者测试代码:
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class TestSender {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        // 1.创建消息生产者producer,并指定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("my-test-group");
        // 2.指定 nameserver 地址
        producer.setNamesrvAddr("192.168.1.210:9876");
        // 设置发送超时时间
        producer.setSendMsgTimeout(10000);
        // 3.启动生产者 - producer
        producer.start();
        // 4.构建消息对象
        Message message = new Message();
        message.setTopic("test-topic");
        message.setTags("testTag");
        message.setBody(("Test MQ from sync-main, 今年666!").getBytes());
        // 5.发送消息
        SendResult result = producer.send(message, 10000);
        String msgId = result.getMsgId();
        int queueId = result.getMessageQueue().getQueueId();
        String offsetMegId = result.getOffsetMsgId();
        long offset = result.getQueueOffset();
        SendStatus sendStatus = result.getSendStatus();
        String sendMsg = "同步消息发送状态:"+sendStatus+"\t"+"消息id:"+msgId+"\t 消费者队列id:"+queueId +"\t offsetMegId:"+offsetMegId+"\t offset:"+offset;
        System.out.println("发送的消息:" + sendMsg);
        // 6.关闭生产者
        producer.shutdown();
    }
}
编写消费者测试代码:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class TestConsumer {
    private static final Logger log = LoggerFactory.getLogger(TestConsumer.class);
    public static void main(String[] args) throws MQClientException {
        
        // 1.创建消息消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-test-group");
        // 2.设置NameServer
        consumer.setNamesrvAddr("192.168.1.210:9876");
        // 3.指定订阅的主题和标签
        consumer.subscribe("test-topic","*");
        // 4.注册监听器与编写回调函数
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgList, ConsumeConcurrentlyContext context) {
                log.info("Received Message size =>{}", msgList.size());
                for (MessageExt msg : msgList) {
                    System.out.println("消费端接收到消息主题为: " + msg.getTopic() + "的消息, 队列ID为:" + msg.getQueueId());
                    System.out.println("消费端接收到消息内容为 " + new String(msg.getBody()));
                }
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5.启动消费者
        consumer.start();
        System.out.printf("MQ Consumer Started.%n");
    }
    
}
运行测试,如果没问题,说明 RocketMQ 安装成功!
()()
The End
微信