rocketmq topic queue同一个topic 可以多个消费组消费吗

&span style=&font-family: Arial, Helvetica, sans- background-color: rgb(255, 255, 255);&&简单写一个MQ的生产者和消费者。生产者生产4个topic。消费者订阅消费,多线程启动4个线程,每个线程新建一个消费者来消费一个topic的数据。rocketMQ本身就是多线程的,设置每个消费者的线程数为5个。例子如下:&/span&
生产者代码如下:
import javax.annotation.PreD
import com.alibaba.rocketmq.client.exception.MQClientE
import com.alibaba.rocketmq.client.producer.DefaultMQP
public class Producer {
public static DefaultMQProducer defaultMQ
static String rocketmqAddress =&10.20.18.20:9876&;
public static DefaultMQProducer init() {
if (null == defaultMQproducer){
defaultMQproducer = new DefaultMQProducer(&messageGroup&);
defaultMQproducer.setNamesrvAddr(rocketmqAddress);
defaultMQproducer.setInstanceName(&messageProducer&);
defaultMQproducer.setMaxMessageSize(9999999);
defaultMQproducer.start();
} catch (MQClientException e) {
return defaultMQ
@PreDestroy
public void preDestroy(){
if (defaultMQproducer != null) {
defaultMQproducer.shutdown();
import com.alibaba.fastjson.JSON;
import com.alibaba.rocketmq.client.producer.DefaultMQP
import com.alibaba.rocketmq.client.producer.SendR
import com.mon.message.M
public class ProduceCar {
public static void main(String args[]){
for (int i=1;i&=4;i++){
for (int j=1;j&=100;j++){
Message msg = new Message(&hello&+String.valueOf(i), JSON.toJSONString(i+&hello&+j).getBytes());
DefaultMQProducer difaultProducer = Producer.init();
SendResult sendResult = difaultProducer.send(msg);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
System.out.println(&finished&);
消费者代码如下:
import java.util.HashM
import java.util.M
import java.util.concurrent.E
import java.util.concurrent.E
public class Consumer {
public static void main(String args[]){
Executor executor = Executors.newFixedThreadPool(4);
Map&String,String& map = new HashMap();
map.put(&hello1&, &5&);
map.put(&hello2&, &5&);
map.put(&hello3&, &5&);
map.put(&hello4&, &5&);
for (String key : map.keySet()) {
//logger.aduit(&Key = {0}, Value = {1}&,entry.getKey(),entry.getValue());
ConsumerFactory runComsumer = new ConsumerFactory();
runComsumer.setTopics(key);
runComsumer.setInstanceName(&ConsumerInstance&+key);
runComsumer.setThreadNum(Integer.valueOf(map.get(key)));
runComsumer.setGroupName(&Consumer&+key);
executor.execute(runComsumer);
}catch(Exception e){
System.out.println(&create consumer err:{0}&+ e.getMessage());
import java.util.L
import com.alibaba.fastjson.JSON;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushC
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyC
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyS
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerC
import com.mon.message.MessageE
public class ConsumerFactory extends Thread{
public final static String ANDROID = &Android&;
public final static String IOS = &Ios&;
private String rocketmqAddress=&10.20.18.20:9876&;
int threadN
String instanceN
String groupN
DefaultMQPushConsumer consumer =
public void run() {
consumer = new DefaultMQPushConsumer(groupName);
consumer.setNamesrvAddr(rocketmqAddress);//MQ地址
consumer.setClientCallbackExecutorThreads(threadNum);//消费现场数量
consumer.setInstanceName(instanceName);//实例名称
consumer.subscribe(topics, &*&);
//注册监听
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List&MessageExt& msgs,
ConsumeConcurrentlyContext context) {
for (int i = 0; i & msgs.size(); i++) {
MessageExt msgExt =
msgs.get(i);
String msgId = msgExt.getMsgId();
String mesBody = new String(msgExt.getBody());
System.out.println(mesBody);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
consumer.start();
System.out.println(&线程执行中&);
} catch (Exception e) {
System.out.println(e);
public int getThreadNum() {
return threadN
public void setThreadNum(int threadNum) {
this.threadNum = threadN
public String getTopics() {
public void setTopics(String topics) {
this.topics =
public String getInstanceName() {
return instanceN
public void setInstanceName(String instanceName) {
this.instanceName = instanceN
public String getGroupName() {
return groupN
public void setGroupName(String groupName) {
this.groupName = groupN
本文已收录于以下专栏:
相关文章推荐
Producer的用途大家都很清楚,主要就是生产消息,那么分布式模式下与单队列模式不一样,如何能够充分利用分布式的优势,将生产的消息分布到不同的队列下呢?Rocket-MQ提供了3种不同模式的Prod...
本示例展示了一个RocketMQ producer的简单实现,通过解析文本文件获取输入数据,将数据经过Avro序列化后发送到RocketMQ。
程序通过stdin.xml配置文件获取主要参数值,std...
RocketMQ中的生产者组只能有一个在用的生产者:
Warning: Considering the provided producer is sufficiently powerful at s...
Consumer-集群Push模式-简介:
0、背景介绍
         Consumer主要用于向Broker请求Producer产生的消息,对其进行消费;对于RocketMQ,我们一定很好奇,如...
C端先启动和C端后启动
消息重试机制:P端和C端2中重试
1.Rocketmq生产者
public class Producer {
public static void main(String[] args) throws MQClientExc...
默认创建的topic读写队列数是4,用的3.5.8版本。生产者的源代码DEMO如下:
package com.xd.
import com.alibaba.rocketm...
一 nameserver
相对来说,nameserver的稳定性非常高。原因有二:
1 nameserver互相独立,彼此没有通信关系,单台nameserver挂掉,不影响其他nameserver...
1 同一个订阅组内不同Consumer实例订阅不同topic消费混乱问题调查
背景说明:
如图1左半部分,假设目前的关系如下:
broker: 两个,broker_a和broker...
他的最新文章
讲师:刘文志
讲师:陈伟
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)重复消费的问题的一个可能的问题:消费者消费消息时产生了异常,并没有返回CONSUME_SUCCESS标志。
我急于寻找解决方法,结果百度的结果都是一期多个消费者问题云云,根本没有解决我的问题。
我发现重复消费的消息和第一次消费的消息不同,多了一些重复消费的信息:
reconsumeTimes=1,2,…10
REAL_TOPIC也会是:%RETRY%XXXXX
这就是因为消息处理异常导致的消息重新消费,无路时重启服务端,还是通过mqadmin删除都没用,RocketMQ可以很好的保持消息,一定要消费成功才可以!
官方对comsumerMessage方法的实现建议是:
It is not recommend to throw exception,rather than returning ConsumeConcurrentlyStatus.RECONSUME_LATER if consumption failure
无论如何,都不要抛出异常,如果需要重新消费,可以返回RECONSUME_LATER主动要求重新消费。
下面是我的代码,我加入了catch Exception根异常来捕获业务处理的异常。
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List&MessageExt& msgs,
ConsumeConcurrentlyContext context) {
logger.debug(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
MessagePack msgpack = new MessagePack();
for (MessageExt msg : msgs){
byte[] data = msg.getBody();
RTMsgPack rtmsg = msgpack.read(data, RTMsgPack.class);
logger.debug("Receive a message:" + rtmsg);
anlysisRTMsgPack(rtmsg, engine);
} catch (IOException e) {
logger.error("Unpack RTMsg:", e);
} catch (Exception e1){
logger.warn("Unexcepted exception.", e1);
logger.debug("RETURN CONSUME SUCCESS.");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
本文已收录于以下专栏:
相关文章推荐
最近同事在调试环境中遇到了RocketMQ的重复消费问题,邀请我去解决下。
     先谈谈这个问题的背景,关于RocketMQ的重复消费结合官方和大家的使用经验,我们有这样的一个认知:
1.如果您此前未接触过RocketMQ,请先阅读附录部分,以便了解RocketMQ的整体架构和相关术语
2.文中的MQServer与Broker表示同一概念
分布式消息系统作为实现...
DefaultMQPushConsumerImpl.pullMessage(PullRequest)
(com.alibaba.rocketmq.client.impl.consumer){
在不开启事物的情况下 采用的是应答模式4(ActiveMQSession.AUTO_ACKNOWLEDGE)消费一次 应答一次
这时候消费失败了,由于没有配置死亡队列,消息就不会被消费堆积在队列中,...
本文仅记录排查和问题定位、解决的过程
由于业务需要这样一种场景,将消息按照id(业务id)尾号发送到对应的queue中,并启动10个消费者(单jvm,10个消费者组),从对应的queue中集群消费,如下图1所示(假设有两个...
Consumer-集群Push模式-简介:
0、背景介绍
         Consumer主要用于向Broker请求Producer产生的消息,对其进行消费;对于RocketMQ,我们一定很好奇,如...
RocketMq Exception &connect to
failed&问题原因分析和解决。
高并发实战之幂等处理
1. 前端重复提交选中的数据,应该后台只产生对应这个数据的一个反应结果。
2. 我们发起一笔付款请求,应该只扣用户账户一次钱,当遇到网络重发或系统bug重发,...
C端先启动和C端后启动
消息重试机制:P端和C端2中重试
他的最新文章
讲师:刘文志
讲师:陈伟
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)12955人阅读
JAVA/WEB(3)
开源项目学习(5)
Consumer-集群Push模式-简介:
0、背景介绍
&&&&&&&& Consumer主要用于向Broker请求Producer产生的消息,对其进行消费;对于RocketMQ,我们一定很好奇,如何实现分布式的Consumer消费,如何保证Consumer的顺序性,不重复性呢?
存在的问题:
1. 如果在集群模式中新增/减少 组(group) 消费者,可能会导致重复消费;原因是:
假设新增消费者前,ConsumerA正在消费MessageQueue-M,消费到第3个offset,
这个时候新增了ConsumerB,那么根据集群模式的AllocateMessageQueue的策略,可能MessageQueue-M被分配给了ConsumerB,这个时候ConsumerA由于消费的offset没有实时更新回去,会导致ConsumerB和ConsumerA之前的消费有重叠;
2. 消费失败怎么办?
3. 异常处理
4. 线程,Auto变量的使用
一、术语介绍
topic: 最细粒度的订阅单位,一个group可以订阅多个topic的消息
group: 组,一个组可以订阅多个topic
clientId: 一个服务(IP/机器)的标识,一个机器可以有多个group;同时,多个相同group的clientId组成一个集群,一起消费消息
messageQueue:消息队列,一个broker的一个topic有多个messageQueue
offset: 每一个消息队列里面还有偏移(commitOffset, offset)的区别,为什么有2个offset呢??
集群消费:
广播消费:
立即消费:
顺序消费:
消费位置:
offsetStore---------commitOffset:消费到的offset
PullRequest ------ offset的区别:拉取的位置
二、总体框架
三、数据结构
数据结构主要分为2个部分来讲解:
一部分是在MQClientInstance中进行统一管理的,不管是Consumer还是Producer,能够统一管理的部分都放在了这个区域;
还有一部分是在Consumer或Producer中区分管理的,比如各自订阅的MessageQueue,下面对这两个部分分别介绍;
-----------------------------------------------PartI:MQClientInstance---------------------------
1. TopicRouteData:
用于保存了所有的Queue信息,不管是consumer还是producer的
private String orderTopicC//brokerName:num count
private List&QueueData& queueD
private List&BrokerData& brokerD
private HashMap&String/* brokerAddr */, List&String&/* Filter Server */& filterServerT
2.QueueData:内部通过wirte或者read来区分queue属于Consumer(read)/Producer(write)
private String brokerN
private int readQueueN
private int writeQueueN
private int topicSynF
3.BrokerData:Broker的地址信息
private String brokerN
private HashMap&Long/* brokerId */, String/* broker address */& brokerA
4.&PullRequest:拉取的请求信息,包括所属组信息,要拉取的offset信息,Queue信息,消费进度信息
private String consumerG
private MessageQueue messageQ
private ProcessQueue processQ
private long nextO
5.&PullMessageService:拉取信息的服务,会不断遍历每一个PullRequest进行信息的拉取
private final LinkedBlockingQueue&PullRequest& pullRequestQueue = new LinkedBlockingQueue&PullRequest&();
private final MQClientInstance mQClientF
------------------------------------------------------------------Part II:区分consumer --------------------------------------------------------
1. TopicPublishInfo:这个是Producer使用的保存MessageQueue的数据结构
private boolean orderTopic =
private boolean haveTopicRouterInfo =
private List&MessageQueue& messageQueueList = new ArrayList&MessageQueue&();
private AtomicInteger sendWhichQueue = new AtomicInteger(0);
2. SubscriptionData:包装consumer的消费信息,比如topic,订阅的tags
public final static String SUB_ALL = &*&;
private boolean classFilterMode =
private String subS
private Set&String& tagsSet = new HashSet&String&();
private Set&Integer& codeSet = new HashSet&Integer&();
private long subVersion = System.currentTimeMillis();
3.RebalanceImpl
ConcurrentHashMap&String/* topic */, Set&MessageQueue&& topicSubscribeInfoTable
ConcurrentHashMap&String /* topic */, SubscriptionData& subscriptionInner
&MessageQueue, ProcessQueue& processQueueTable
4.MessageQueue
private String brokerN
private int queueId;
5. ProcessQueue
private final TreeMap&Long, MessageExt& msgTreeMap = new TreeMap&Long, MessageExt&();
private volatile long queueOffsetMax = 0L;
private final AtomicLong msgCount = new AtomicLong();
6.RemoteBrokerOffsetStore
private final MQClientInstance mQClientF
private final String groupN
private final AtomicLong storeTimesTotal = new AtomicLong(0);
private ConcurrentHashMap&MessageQueue, AtomicLong& offsetTable =
new ConcurrentHashMap&MessageQueue, AtomicLong&();
四、主要类管理(group, instance, topic)
4.1 DefaultMQPushConsumer(group):用于设置主要的参数,包括:组名,消费模式,消费offset,线程数目,批量拉去大小
4.2 DefaultMQPushConsumerImpl(group):包括RebalanceImpl,OffsetStore,AllocateStrategy
4.3 OffsetStore(group):有2种模式,集群模式和广播模式不同;第一种是:RemoteBrokerOffsetStore,第二种是LocalFileOffsetStore,它将会记录我们消费到的offset位置
4.4 RebalanceImpl(group):有2种模式,RebalancePushImpl,RebalancePullImpl,分别对应推拉2种模式的处理,它用于将所有的MessageQueue进行一个平均分配,然后进行消费;对于推的模式,会根据不同位置拉取;对于拉的模式,它的拉取位置永远是第0个;
4.5 PullMessageService:循环所有的PullRequest,不断调用pullMessage进行MessageQueue的拉取
4.6 RebalanceService:循环所有的Consumer,对所有的consumer调用doRebalance
4.7 AllocateMessageQueueStrategy:分配消息的策略,将所有的MessageQueue均分到各个instance上面
4.8 PullAPIWrapper
4.9 ConsumeMessageService:有2种模式,ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService,用于调用MessageListener进行具体消费
4.10 MessageListener:客户端实现的接口,用于业务逻辑处理
4.11 MQClientAPIImpl:用于网络连接处理
五、总体模块
Consumer主要分为以下几个模块:
1. Rebalance模块:
主要包含以下几个部分:
RebalanceImpl
AllocateMessageQueueStrategy
RebalanceService
新增PullRequest
主要工作如下:
用于将某个topic的mqSet按策略分配到各个消费者cidSet,解释一下各个术语:
mqSet:是可以消费的所有Queue,可以理解成一块大蛋糕;
cidSet:可以理解成该topic的所有消费者,吃这块蛋糕的人。
这里采用的策略是遍历每一个consumer,再遍历每一个consumer的topic,对每个topic调用rebalanceByTopic;这里的Average均分策略是获得所有的midSet和cidSet,然后将他们进行均分,按图说话:
A. midSet&cidSet
B.midSet & cidSet, 且midSet%cidSet!=0
C.midSet &= cidSet, 且midSet%cidSet=0
2. PullMessage模块:
主要包含以下几个部分:
PullMessageService
pullAPIWrapper
PullCallback
ConsumeMessageConcurrentlyService.processConsumeResult
ConsumeMessageConcurrentlyService.ConsumeRequest
主要工作如下:
遍历PullMessageService的pullRequestQueue,take每一个PullRequest,然后调用pullMessage进行消息的拉取.,拉取后调用PullCallback进行回调处理
3. RemoteBrokerOffsetStore模块
在offsetTable中维护了一个offset变量,对这个offset的操作有2种,第一种是操作RemoteBrokerOffsetStore里面的offsetTable来维护其本地offset;还有一种是persist,将这些变量固化到远程的broker中
3.1 updateConsumeOffsetToBroker
设置UpdateConsumerOffsetRequestHeader为头部,然后调用updateConsumerOffsetOneway,以UPDATE_CONSUMER_OFFSET为请求码,向broker服务器发送信息
3.2 设置removeOffset,将它从offsetTable里面移除
3.3 查询消费者序列long offset,queryConsumerOffset,QUERY_CONSUMER_OFFSET
4. Consumer模块:
这里和上面的PullMessage融合在一起处理,当pullMessage结束后,将会回调PullCallback。这里将调用consumeMessageService的submitConsumeRequest进行处理,而后更新offsetStore的消费位置等信息
5. update模块:
更新namesrv
更新topicRouteInfoFromServer:这里涉及到新增Subscribe
更新sendHeartbeat:注册consumer
更新persistAllConsumerSetInterval:更新offsetStore
更新线程池
6. 网络传输模块
MQClientAPIImpl
六、主要流程(Push+集群模式)
1. DefaultMQPushConsumer创建组&CID_001&
2. 调用subscribe,将会向rebalanceImpl中注册&topic,SubscriptionData&,用于后续的消息过滤
3. DefaultMQPushConsumerImpl.start()
3.1 copySubscription(): 将DefaultMQPushConsumer的subscribe信息复制到DefaultMQPushConsumerImpl里面
3.2 获取MQClientInstance
3.3 设置RebalanceImpl的信息
3.4 创建PullAIPWrapper
3.5 创建offsetStore,(BROADCATING)LocalFileOffsetStore,(CLUSTERING) RemoteBrokerOffsetStore
对应于,一个topic,对应了一个SubscriptionData,对应了很多的MessageQueue;
而每一个MessageQueue,又对应了ProcessQueue,ProcessQueue对应了每一个队列的消费进度
1.1 主要函数:lock, unlock,向函数给出的addr发出锁定,或者解锁mq的操作,以便于后续的消费
1.2 主要函数:doRebalance;遍历&String,SubscriptionData& subscriptionInner结构的每一个topic,调用rebalanceByTopic;
rebalanceByTopic:
1.2.1 如果是广播模式
1.2.2 如果是集群模式
1.2.2.1 首先得到topic对应的所有MessageQueue,mqAll,这个是消息队列
1.2.2.2 得到对应group下面所有的cidAll,这个是消费者队列
1.2.2.3 调用strategy.allocate得到该consumer要消费的Set&MessageQueue&allocateResultSet
1.2.2.4 调用updateProcessQueueTableRebalance(topic,allocateResultSet)来更新processQueueTable,
A.首先,遍历processQueueTable,找到其有,而allocateResultSet没有的,调用removeUnnecessaryMessageQueue将其删除;
B.其次,如果二者都有,但是在Push模式下,达到了pullExpired时间的,调用processQueueTable;
C. 遍历allocateResultSet,找到processQueueTable中没有的记录,将其加入到List&PullRequest&pullRequestList,同时将processQueueTable.put(mq, pullRequest.getProcessQueue())
D. 将上述新增的List&PullRequest&作为参数,调用dispatchPullRequest(pullRequestList);
未完待续,上述2个函数
removeUnnecessaryMessageQueue
dispatchPullRequest(pullRequestList);
七、一些实践阅读心得
1. HeartBeat:心跳需要进行加锁,因为心跳相当于注册,而unregister的时候相当于注销,加锁是防止在注销后,再进行注册,导致出问题,这里的临界变量是consumerTable
2. volatile:多线程操作某个变量时,使用该关键字可以防止由于编译器优化,导致从寄存器中读,而不是实时从内存读取
3. ConcurrentHashMap:分段加锁,保证线程安全
4. AtomicInteger:原子自增自减
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:1025769次
积分:4986
积分:4986
排名:第6003名
原创:65篇
转载:21篇
评论:148条
(1)(6)(1)(1)(2)(3)(1)(1)(1)(2)(1)(1)(2)(6)(1)(2)(3)(1)(10)(1)(4)(10)(1)(1)(2)(4)(1)(2)(2)(4)(2)(5)(3)(1)
(window.slotbydup = window.slotbydup || []).push({
id: '4740887',
container: s,
size: '250,250',
display: 'inlay-fix'一&机器部署
1、机器组成
7台机器,均为16G内存
每台服务器均有4个CPU,2核
2、运行环境配置
3、刷盘方式
每台机器master机器均采用异步刷盘方式
二&性能评测
1、评测目的
&&&测试consumer端的集群模式消费。
2、评测指标
&&&&(1)topic关联的readQueueNums读队列数值
&&&&(2)属于同一个consumerGroup的consumer个数
&&&&(3)所有consumer消费消息的总条数
&&&&(4)每个consumer消费消息,读取的队列Id
&&&&(5)部署集群中的master机器台数
3、评测逻辑
&&如果有&5&个队列,2&个&consumer,那么第一个&Consumer&消费&3&个队列,第二&consumer&消费&2&个队列。
&&&&如果Consumer&超过队列数量,那么多余的Consumer&将不能消费消息。
&&&&队列数量、Consumer数量、Replance结果如下表
Consumer数量
Reblance结果
C11-C20:0
4、评测过程
&&&&&&&(1)发送消息前,查看服务端的topic关联的队列个数。
&&&&&&&(2)producer端向topic名称为“clusterTopicTest”队列发送消息,定为20条,发送消息后并记录每条消息的msgId、queueId、offset等基本信息。
&&&&(3)配置consumer端,日志记录每个consumer端的instanceName、消息的offset、所消费队列queueId、消息的body、消息msgId,以及每个consumer消费消息的总条数。
&&&&(4)每次消费完之后,统计所有consumer端消费消息的总数,判断消息是否有丢失。
&&&&(5)每次消费完之后,分析每个consumer消费队列的queueId,判断队列是否达到了负载均衡。
&&&&(6)记topic的队列数为A,记consumer个数为B,做如下调整:
&&&&第一组:保持A不变,增加B,使得A&&&B,然后重复步骤1-5。
&&&&第二组:保持A不变,增加B,使得A&=&B,然后重复步骤1-5。
&&&&第三组:保持A不变,增加B,使得A&=&2&*&B,然后重复步骤1-5。
&&&&第三组:增加A,保持B不变,使得2&*&A&=&B,然后重复步骤1-5。
&&&&第五组:减少A,保持B不变,使得2&*&A&&&B,然后重复步骤1-5。
&&&&(7)注意:需要先启动所有consumer端,在启动producer端发送消息,这样才能在每个consumer端同时看到消息的消费情况,因为消息被消费的速率是很快的。
&&&&(8)注意:master机器个数,每台master机器上指定topic的队列数,两数值相乘,才是最终的rocketmq做负载均衡的队列个数。&(步骤6的master机器个数为2)
&&&&第一组,总发送条数20条
Consumer数量
Reblance结果
Reblance结果
Master机器
&&&&3个consumer消费消息总条数:8+3+3+3+3&=&20条
&&&&2台master机器,每个topic有8个队列,&期望的队列个数&2*8=16个,实际的队列个数&4+3+3+3+3&=&16个,可以看出期望、实际的queue分布是相同的结果。
&&&&producer的发送记录:
&&&&consumer1的消费记录:
&&&&consumer2的消费记录:
&&&&consumer3的消费记录:
&&&&consumer4的消费记录:
&&&&consumer5的消费记录:
&&&&第二组,总发送条数20条
Consumer数量
Reblance结果
Reblance结果
Master机器
&&&&8个consumer消费消息总条数:8+3+3+3+3&=&20条
&&&&2台master机器,每个topic有8个队列,&期望的队列个数&2*8=16个,实际的队列个数&2+2+2+2+2+2+2+2&=&16个,可以看出期望、实际的queue分布是相同的结果。
&&&&8个consumer消费消息总条数:4+2+2+2+2+2+4+2+2&=&20条
&&&&producer的发送记录:
&&&&consumer1的消费记录:
&&&&consumer2的消费记录:
&&&&consumer3的消费记录:
&&&&consumer4的消费记录:
&&&&consumer5的消费记录:
&&&&consumer6的消费记录:
&&&&consumer7的消费记录:
&&&&consumer8的消费记录:
第三组,总发送条数20条
Consumer数量
Reblance结果
Reblance结果
Master机器
&&&&8个consumer消费消息总条数:8+3+3+3+3&=&20条
&&&&2台master机器,每个topic有8个队列,&期望的队列个数&2*8=16个,实际的队列个数&4+4+4+4&=&16个,可以看出期望、实际的queue分布是相同的结果。
&&&&8个consumer消费消息总条数:8+4+4+4&=&20条
&&&&producer的发送记录:
&&&&consumer1的消费记录:
&&&&consumer2的消费记录:
&&&&consumer3的消费记录:
&&&&consumer4的消费记录:
第四组,总发送条数20条
Consumer数量
Reblance结果
Reblance结果
Master机器
&&&&8个consumer消费消息总条数:8+3+3+3+3&=&20条
&&&&2台master机器,每个topic有8个队列,&期望的队列个数&2*4=8个,实际的队列个数&1+1+1+1+1+1+1+1=&8个,可以看出期望、实际的queue分布是相同的结果。
&&&&8个consumer消费消息总条数:3+3+2+2+2+2+3+3&=&20条
&&&&producer的发送记录:
&&&&consumer1的消费记录:
&&&&consumer2的消费记录:
&&&&consumer3的消费记录:
&&&&consumer4的消费记录:
&&&&consumer5的消费记录:
&&&&consumer6的消费记录:
&&&&consumer7的消费记录:
&&&&consumer8的消费记录:
第五组,总发送条数20条
Consumer数量
Reblance结果
Reblance结果
Master机器
&&&&8个consumer消费消息总条数:8+3+3+3+3&=&20条
&&&&2台master机器,每个topic有8个队列,&期望的队列个数&2*3=6个,实际的队列个数&1+1+1+1+1+1+0&=&6个,可以看出期望、实际的queue分布是相同的结果。
&&&&8个consumer消费消息总条数:3+4+3+3+4+3+0&=&20条
&&&&producer的发送记录:
&&&&consumer1的消费记录:
&&&&consumer2的消费记录:
&&&&consumer3的消费记录:
&&&&consumer4的消费记录:
&&&&consumer5的消费记录:
&&&&consumer6的消费记录:
&&&&consumer7的消费记录:
二&评测结果
&&&&1、rocketmq集群消费模式,订阅消息的确达到了队列负载均衡,与这种负载均衡消费相关的因素有:&master机器个数、&特定topic的queue个数,这两个数值相乘,才是rocketmq最终计算队列的总数。
&&&&2、rocketmq的集群消费能力,保证消息准确性,完整性,所有被消费的消息总数与producer端发送的消息总数是一致的,不存在消息丢弃的情况。
&&&&3、分析consumer消费日志,说明每条消息在相同consumerGroup组的不同consumer端中仅仅只会被消费一次。
&&&&4、在集群消费模式下,如果consumer的总数,超过了队列总数,那么多余的consumer端将不能消费消息。
本文已收录于以下专栏:
相关文章推荐
Consumer-集群Push模式-简介:
0、背景介绍
         Consumer主要用于向Broker请求Producer产生的消息,对其进行消费;对于RocketMQ,我们一定很好奇,如...
C端先启动和C端后启动
消息重试机制:P端和C端2中重试
Broker集群部署方式主要有以下几种:(Slave 不可写,但可读)
我们知道,在早期的RocketMQ版本中,是有依赖ZK的。而现在的版本中,是去掉了对ZK的依赖,转而使用自己开发的NameSrv。并且这个NameSrv是无状态的,你可以随意的部署多台,其代码也非常简...
dubbo连接池爆满
标签: 连接池爆dubbo
08:45 2259人阅读 评论(1) 举报
 分类:
故障处理(1) 
版权声明...
一 机器部署
1、机器组成
7台机器,均为16G内存
每台服务器均有4个CPU,2核
2、运行环境配置
3、刷盘方式
每台机器master机器均采用异步刷盘方式
说到分布式事务,就会谈到那个经典的”账号转账”问题:2个账号,分布处于2个不同的DB,或者说2个不同的子系统里面,A要扣钱,B要加钱,如何保证原子性?
一般的思路都是通过消息中间件来实现“最终一...
早就听说过rocketmq,一直没时间去研究它
最近打算从hello world开始,,深入的学习rocketmq 
0.git下载源码本地编译
git地址  https://github.c...
rocketmq  广播消息
注册一个MessageListenerConcurrently 
* Licensed to the Apache Software Foundatio...
说到分布式事务,就会谈到那个经典的”账号转账”问题:2个账号,分布处于2个不同的DB,或者说2个不同的子系统里面,A要扣钱,B要加钱,如何保证原子性?一般的思路都是通过消息中间件来实现“最终一致性”:...
他的最新文章
讲师:刘文志
讲师:陈伟
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)

我要回帖

更多关于 rocketmq topic 的文章

 

随机推荐