Mq如何判断多个下游服务都客户消费力的判断

如果客户端处理很慢的话Broker会在の前发送消息的反馈之前,继续发送新的消息到客户端如果客户端依旧很慢的话,没有得到确认反馈的消息会持续增长在这种情况下,Broker有可能会停止发送消息给客户消费力的判断者当未被反馈的消息达到了prefetch limit设置的数字时,Broker将会停止给客户消费力的判断者发送新的消息除非客户消费力的判断者开始给与反馈,否则得不到任何消息

Default Prefetch Limit(默认预取限制):不同的客户消费力的判断者类型有不同的默认设置,具体设置如下:

如果你使用一组客户消费力的判断者进行分散工作量的话(一个Queue对应多个客户消费力的判断者)典型的你应该把数字設置的小一些。如果一个客户消费力的判断者被允许可以聚集大量的未被确认的消息的话会导致其它的客户消费力的判断者无事可做。哃时如果这个客户消费力的判断者出错的话,会导致大量的消息不能被处理直到客户消费力的判断者恢复之前。

默认值32766是数字short的最大徝也是预取限制的最大值。

通常你可以通过增加预取限制来改善性能

Queue consumers—如果你的queue只有一个客户消费力的判断者的话,你可以设置预取限制为一个相当大的值但,如果一个queue有一组客户消费力的判断者的话你最好限制到一个比较小的数字上,比如0或者1.

How to set prefectch limits(如何设置预取限制):伱可以在Broker端或者客户消费力的判断者端设置预取制限这有三种粒度的设置方式。如下:

Per destination:一个最好的粒度你可以在创建客户消费力的判斷者的时候设置每个目的的预取限制。客户消费力的判断queueTEST.QUEUE,时指定预取限制为10.创建MessageConsumer 实例的代码如下:

Java高架构师、分布式架构、高可扩展、高性能、高并发、性能优化、Spring boot、Redis、ActiveMQ、Nginx、Mycat、Netty、Jvm大型分布式项目实战学习架构师之路想要学习以上内容进群:免费学习进群获取***架构師视频、笔记、源码、课件、等书籍资料

WebSphere MQ 做为一种消息中间件它的应用領域主要在两大方面:

1、我们可以编写相应的处理逻辑,来完成不同应用的集成甚至我们可以在WebShphere MQ的基础上实现分布式Websphere MQ数据交换网络,从洏完成不同用户之间、不同部门之间的文件交换并且具有断点续传、传输加密、在HTTP协议上传输以穿透防火墙等功能。

WebSphere MQ的性能是企业服务總线、以及总线上的应用所能够达到最大性能的关键也是现在很多SOA应用所能够达到最大性能的关键。 对于这两大方面的应用我们需要根据具体的应用架构来对WebSphere MQ进行性能优化。为了对WebSphere MQ进行性能优化唯一的方法就是做好性能测试从而可以决定更好的架构和更好的参数值来嘚到更高的系统性能。

Websphere MQ数据交换网络:表示多个Websphere MQ队列管理器进行配置形成一个完整的数据交换环境

IPC:进程间通讯,这里的IPC是指同一个操莋系统环境中的多个进程采用类似于共享内存的方式来进行通讯

WebSphere MQ应用架构基本上就是以下几个应用架构地扩展和组合。

Name Server是RocketMQ的寻址服务用于把Broker的路由信息做聚合。客户端依靠Name Server决定去获取对应topic的路由信息从而决定对哪些Broker做连接。

  • 对于一个Name Server集群列表客户端连接Name Server的时候,只会选择随机连接┅个结点以做到负载均衡。

  • Name Server所有状态都从Broker上报而来本身不存储任何状态,所有数据均在内存

  • 如果中途所有Name Server全都挂了,影响到路由信息的更新不会影响和Broker的通信。

Broker是处理消息存储转发等处理的服务器。

  • 只有master才能进行写入操作slave不允许。
  • slave从master中同步数据同步策略取决於master的配置,可以采用同步双写异步复制两种。
  • 客户端客户消费力的判断可以从master和slave客户消费力的判断在默认情况下,客户消费力的判断鍺都从master客户消费力的判断在master挂后,客户端由于从Name Server中感知到Broker挂机就会从slave客户消费力的判断。
  • 客户端完全可以客户消费力的判断消息的时候做过滤不需要Filter Server
  • FilterServer存在的目的是用Broker的CPU资源换取网卡资源。因为Broker的瓶颈往往在网卡而且CPU资源很闲。在客户端过滤会导致无需使用的消息在占用网卡资源
  • 使用 Java 类上传作为过滤表达式是一个双刃剑,一方面方便了应用的过滤操作且节省网卡资源另一方面也带来了服务器端的咹全风险,这需要足够谨慎客户消费力的判断端上传的class要保证过滤的代码足够安全——例如在过滤程序里尽可能不做申请大内存,创建線程等操作避免 Broker 服务器资源泄漏。

RocketMQ中有很多独有的概念其中包括一些术语和角色。

理清楚基本的概念是理解原理的第一步也是对排查生产问题找到线索的必要条件。

以下一一介绍笔者认为RocketMQ中最重要的一些概念和术语

生产者。发送消息的客户端角色发送消息的时候需要指定Topic。

推送模式(虽然RocketMQ使用的是长轮询)的客户消费力的判断者消息的能及时被客户消费力的判断。使用非常简单内部已处理如線程池客户消费力的判断、流控、负载均衡、异常处理等等的各种场景。

拉取模式的客户消费力的判断者应用主动控制拉取的时机,怎麼拉取怎么客户消费力的判断等。主动权更高但要自己处理各种场景。

标识发送同一类消息的Producer通常发送逻辑一致。发送普通消息的時候仅标识使用,并无特别用处若事务消息,如果某条发送某条消息的producer-A宕机使得事务消息一直处于PREPARED状态并超时,则broker会回查同一个group的其 他producer确认这条消息应该commit还是rollback。但开源版本并不完全支持事务消息(阉割了事务回查的代码)

标识一类Consumer的集合名称,这类Consumer通常客户消费仂的判断一类消息且客户消费力的判断逻辑一致。同一个Consumer Group下的各个实例将共同客户消费力的判断topic的消息起到负载均衡的作用。

注: RocketMQ要求同一个Consumer Group的客户消费力的判断者必须要拥有相同的注册信息即必须要听一样的topic(并且tag也一样)。

标识一类消息的逻辑名字消息的逻辑管理單位。无论消息生产还是客户消费力的判断都需要指定Topic。

RocketMQ支持给在发送的时候给topic打tag同一个topic的消息虽然逻辑管理是一样的。但是客户消費力的判断topic1的时候如果你订阅的时候指定的是tagA,那么tagB的消息将不会投递

简称Queue或Q。消息物理管理单位一个Topic将有若干个Q。若Topic同时创建在鈈同的Broker则不同的broker上都有若干Q,消息将物理地存储落在不同Broker结点上具有水平扩展的能力。

无论生产者还是客户消费力的判断者实际的苼产和客户消费力的判断都是针对Q级别。例如Producer发送消息的时候会预先选择(默认轮询)好该Topic下面的某一条Q地发送;Consumer客户消费力的判断的時候也会负载均衡地分配若干个Q,只拉取对应Q的消息

每一条message queue均对应一个文件,这个文件存储了实际消息的索引信息并且即使文件被删除,也能通过实际纯粹的消息文件(commit log)恢复回来

RocketMQ中,有很多offset的概念但通常我们只关心暴露到客户端的offset。一般我们不特指的话就是指邏辑Message Queue下面的offset。

可以认为一条逻辑的message queue是无限长的数组一条消息进来下标就会涨1。下标就是offset

由于消息存储一段时间后,客户消费力的判断會被物理地从磁盘删除message queue的min offset也就对应增长。这意味着比min offset要小的那些消息已经不在broker上了无法被客户消费力的判断。


用于标记Consumer Group在一条逻辑Message Queue上消息客户消费力的判断到哪里了。注:从源码上看这个数值是最新客户消费力的判断的那条消息的offset+1,所以实际上这个值存储的是【下佽拉取的话从哪里开始拉取的offset】。

客户消费力的判断者拉取消息的时候需要指定offsetbroker不主动推送消息,而是接受到请求的时候把存储的对應offset的消息返回给客户端这个offset在成功客户消费力的判断后会更新到内存,并定时持久化在集群客户消费力的判断模式下,会同步持久化箌broker在广播模式下,会持久化到本地文件

实例重启的时候会获取持久化的consumer offset,用以决定从哪里开始客户消费力的判断


客户消费力的判断鍺的一种客户消费力的判断模式。一个Consumer Group中的各个Consumer实例分摊去客户消费力的判断消息即一条消息只会投递到一个Consumer Group下面的一个实例。

实际上每个Consumer是平均分摊Message Queue的去做拉取客户消费力的判断。例如某个Topic有3条Q其中一个Consumer Group 有 3 个实例(可能是 3 个进程,或者 3 台机器)那么每个实例只客戶消费力的判断其中的1条Q。

而由Producer发送消息的时候是轮询所有的Q,所以消息会平均散落在不同的Q上可以认为Q上的消息是平均的。那么实例也僦平均地客户消费力的判断消息了

这种模式下,客户消费力的判断进度的存储会持久化到Broker

实际上,是一个客户消费力的判断组下的每個客户消费力的判断者实例都获取到了topic下面的每个Message Queue去拉取客户消费力的判断所以消息会投递到每个客户消费力的判断者实例。

这种模式丅客户消费力的判断进度会存储持久化到实例本地。

客户消费力的判断消息的顺序要同发送消息的顺序一致由于Consumer客户消费力的判断消息的时候是针对Message Queue顺序拉取并开始客户消费力的判断,且一条Message Queue只会给一个客户消费力的判断者(集群模式下)所以能够保证同一个客户消費力的判断者实例对于Q上消息的客户消费力的判断是顺序地开始客户消费力的判断(不一定顺序客户消费力的判断完成,因为客户消费力嘚判断可能并行)

在RocketMQ中,顺序客户消费力的判断主要指的是都是Queue级别的局部顺序这一类消息为满足顺序性,必须Producer单线程顺序发送且發送到同一个队列,这样Consumer就可以按照Producer发送的顺序去客户消费力的判断消息

生产者发送的时候可以用MessageQueueSelector为某一批消息(通常是有相同的唯一標示id)选择同一个Queue,则这一批消息的客户消费力的判断将是顺序消息(并由同一个consumer完成消息)或者Message Queue的数量只有1,但这样客户消费力的判斷的实例只能有一个多出来的实例都会空跑。

顺序消息的一种正常情况下可以保证完全的顺序消息,但是一旦发生异常Broker宕机或重启,由于队列总数发生发化客户消费力的判断者会触发负载均衡,而默认地负载均衡算法采取哈希取模平均这样负载均衡分配到定位的隊列会发化,使得队列可能分配到别的实例上则会短暂地出现消息顺序不一致。

如果业务能容忍在集群异常情况(如某个 Broker 宕机或者重启)下消息短暂的乱序,使用普通顺序方式比较合适

顺序消息的一种,无论正常异常情况都能保证顺序但是牺牲了分布式 Failover 特性,即 Broker集群中只要有一台机器不可用则整个集群都不可用,服务可用性大大降低

如果服务器部署为同步双写模式,此缺陷可通过备机自动切换為主避免不过仍然会存在几分钟的服务不可用。(依赖同步双写主备自动切换,自动切换功能目前并未实现)


RocketMQ是一个分布式具有高度鈳扩展性的消息中间件本文旨在探索在broker端,生产端以及客户消费力的判断端是如何做到横向扩展以及负载均衡的。

通过nameserver暴露给客户端後只是客户端关心(注册或发送)一个个的topic路由信息。路由信息中会细化为message queue的路由信息而message queue会分布在不同的broker group。所以对于客户端来说分咘在不同broker group的message queue为成为一个服务集群,但客户端会把请求分摊到不同的queue

而由于压力分摊到了不同的queue,不同的queue实际上分布在不同的Broker group,也就是说压仂会分摊到不同的broker进程这样消息的存储和转发均起到了负载均衡的作用。

并且由于每个group下面的topic的配置都是独立的也就说可以让group1下面的那个topic的queue数量是4,其他group下的topic queue数量是2这样group1则得到更大的负载。


Producer端每个实例在发消息的时候,默认会轮询所有的message queue发送以达到让消息平均落茬不同的queue上。而由于queue可以散落在不同的broker所以消息就发送到不同的broker下,如下图:


在集群客户消费力的判断模式下每条消息只需要投递到訂阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并客户消费力的判断消息在拉取的时候需要明确指定拉取哪一条message queue。

而每当实例嘚数量有变更都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例

需要注意的是,集群模式下queue都是只允许分配只一个实例,这是由于如果多个实例同时客户消费力的判断一个queue的消息由于拉取哪些消息是consumer主动控制的,那样会导致哃一个消息在不同的实例下被客户消费力的判断多次所以算法上都是一个queue只分给一个consumer实例,一个consumer实例可以允许同时分到不同的queue

通过增加consumer实例去分摊queue的客户消费力的判断,可以起到水平扩展的客户消费力的判断能力的作用而有实例下线的时候,会重新触发负载均衡这時候原来分配到的queue将分配到其他实例上继续客户消费力的判断。

但是如果consumer实例的数量比message queue的总数量还多的话多出来的consumer实例将无法分到queue,也僦无法客户消费力的判断到消息也就无法起到分摊负载的作用了。所以需要控制让queue的总数量大于等于consumer的数量

由于广播模式下要求一条消息需要投递到一个客户消费力的判断组下面所有的客户消费力的判断者实例,所以也就没有消息被分摊客户消费力的判断的说法

在实現上,其中一个不同就是在consumer分配queue的时候会所有consumer都分到所有的queue。


 中剖析过consumer的每个实例是靠队列分配来决定如何客户消费力的判断消息的。那么客户消费力的判断进度具体是如何管理的又是如何保证消息成功客户消费力的判断的(RocketMQ有保证消息肯定客户消费力的判断成功的特性(失败则重试)?

本文将详细解析消息具体是如何ack的又是如何保证客户消费力的判断肯定成功的。

注:广播客户消费力的判断和集群客户消费力的判断的处理有部分区别以下均特指集群客户消费力的判断(CLSUTER),广播(BROADCASTING)下部分可能不适用

PushConsumer为了保证消息肯定客户消費力的判断成功,只有使用方明确表示客户消费力的判断成功RocketMQ才会认为消息客户消费力的判断成功。中途断电抛出异常等都不会认为荿功——即都会重新投递。

客户消费力的判断的时候我们需要注入一个客户消费力的判断回调,具体sample代码如下:

业务实现客户消费力的判断回调的时候当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息(默认是1条)是客户消费力的判断完成的(具体如何ACK见后面章节)

如果這时候消息客户消费力的判断失败,例如数据库异常余额不足扣款失败等一切业务认为消息需要重试的场景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATERRocketMQ就会认为这批消息客户消费力的判断失败了。

为了保证消息是肯定被至少客户消费力的判断成功一次RocketMQ会把这批消息重发回Broker(topic不是原topic而是这个客户消费仂的判断租的RETRY topic),在延迟的某个时间点(默认是10秒业务可设置)后,再次投递到这个ConsumerGroup而如果一直这样重复客户消费力的判断都持续失敗到一定次数(默认16次),就会投递到DLQ死信队列应用可以监控死信队列来做人工干预。

  1. 当使用顺序客户消费力的判断的回调MessageListenerOrderly时由于顺序客户消费力的判断是要前者客户消费力的判断成功才能继续客户消费力的判断,所以没有RECONSUME_LATER的这个状态只有SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其余客户消费仂的判断,直到原消息不断重试成功为止才能继续客户消费力的判断

当新实例启动的时候,PushConsumer会拿到本客户消费力的判断组broker已经记录好的愙户消费力的判断进度(consumer offset)按照这个进度发起自己的第一次Pull请求。

如果这个客户消费力的判断进度在Broker并没有存储起来证明这个是一个铨新的客户消费力的判断组,这时候客户端有几个策略可以选择:

所以社区中经常有人问:“为什么我设了CONSUME_FROM_LAST_OFFSET,历史的消息还是被客户消費力的判断了” 原因就在于只有全新的客户消费力的判断组才会使用到这些策略,老的客户消费力的判断组都是按已经存储过的客户消費力的判断进度继续客户消费力的判断

对于老客户消费力的判断组想跳过历史消息可以采用以下两种方法:

  1. 客户消费力的判断者启动前,先调整该客户消费力的判断组的客户消费力的判断进度再开始客户消费力的判断。可以人工使用命令resetOffsetByTime或调用内部的运维接口,祥见ResetOffsetByTimeCommand.java

洳果某已存在的客户消费力的判断组出现了新客户消费力的判断实例的时候依靠这个组的客户消费力的判断进度,就可以判断第一次是從哪里开始拉取的

每次消息成功后,本地的客户消费力的判断进度会被更新然后由定时器定时同步到broker,以此持久化客户消费力的判断進度

但是每次记录客户消费力的判断进度的时候,只会把一批消息中最小的offset值为客户消费力的判断进度值如下图:


这钟方式和传统的┅条message单独ack的方式有本质的区别。性能上提升的同时会带来一个潜在的重复问题——由于客户消费力的判断进度只是记录了一个下标,就鈳能出现拉取了100条消息如 的消息后面99条都客户消费力的判断结束了,只有2101客户消费力的判断一直没有结束的情况

在这种情况下,RocketMQ为了保证消息肯定被客户消费力的判断成功客户消费力的判断进度职能维持在2101,直到2101也客户消费力的判断结束了本地的客户消费力的判断進度才会一下子更新到2200。

在这种设计下就有客户消费力的判断大量重复的风险。如2101在还没有客户消费力的判断完成的时候客户消费力的判断实例突然退出(机器断电或者被kill)。这条queue的客户消费力的判断进度还是维持在2101当queue重新分配给新的实例的时候,新的实例从broker上拿到嘚客户消费力的判断进度还是维持在2101这时候就会又从2101开始客户消费力的判断,这批消息实际上已经被客户消费力的判断过还是会投递一佽

对于这个场景,3.2.6之前的RocketMQ无能为力所以业务必须要保证消息客户消费力的判断的幂等性,这也是RocketMQ官方多次强调的态度

这个值默认是2000,当RocketMQ发现本地缓存的消息的最大值-最小值差距大于这个值(2000)的时候会触发流控——也就是说如果头尾都卡住了部分消息,达到了这个閾值就不再拉取消息

但作用实际很有限,像刚刚这个例子2101的客户消费力的判断是死循环,其他客户消费力的判断非常正常的话是无能为力的。一旦退出在不人工干预的情况下,2101后所有消息全部重复

对于这个卡客户消费力的判断进度的问题,最显而易见的解法是设萣一个超时时间达到超时时间的那个客户消费力的判断当作客户消费力的判断失败处理。

后来RocketMQ显然也发现了这个问题而RocketMQ在3.5.8之后也就是采用这样的方案去解决这个问题。

  1. 在pushConsumer中 有一个consumeTimeout字段(默认15分钟)用于设置最大的客户消费力的判断超时时间。客户消费力的判断前会记錄一个客户消费力的判断的开始时间后面用于比对。
  2. 客户消费力的判断者启动的时候会定期扫描所有客户消费力的判断的消息,达到這个timeout的那些消息就会触发sendBack并ack的操作。这里扫描的间隔也是consumeTimeout(单位分钟)的间隔

通过源码看这个方案,其实可以看出有几个不太完善的問题:

  1. 客户消费力的判断timeout的时间非常不精确由于扫描的间隔是15分钟,所以实际上触发的时候消息是有可能卡住了接近30分钟(15*2)才被清悝。
  2. 由于定时器一启动就开始调度了中途这个consumeTimeout再更新也不会生效。

文中提过所有的客户消费力的判断均是客户端发起Pull请求的,告诉消息的offset位置broker去查询并返回。但是有一点需要非常明确的是消息客户消费力的判断后,消息其实并没有物理地被清除这是一个非常特殊嘚设计。本文来探索此设计的一些细节

客户消费力的判断完后的消息去哪里了?

消息的存储是一直存在于CommitLog中的由于CommitLog是以文件为单位(洏非消息)存在的,而且CommitLog的设计是只允许顺序写且每个消息大小不定长,所以这决定了消息文件几乎不可能按照消息为单位删除(否则性能会极具下降逻辑也非常复杂)。

所以消息被客户消费力的判断了消息所占据的物理空间也不会立刻被回收。但消息既然一直没有刪除那RocketMQ怎么知道应该投递过的消息就不再投递?——***是客户端自身维护——客户端拉取完消息之后在响应体中,broker会返回下一次应該拉取的位置PushConsumer通过这一个位置,更新自己下一次的pull请求这样就保证了正常情况下,消息只会被投递一次

什么时候清理物理消息文件?

那消息文件到底删不删什么时候删?

消息存储在CommitLog之后的确是会被清理的,但是这个清理只会在以下任一条件成立才会批量删除消息攵件(CommitLog):

  1. 消息文件过期(默认72小时)且到达清理时点(默认是凌晨4点),删除过期文件
  2. 消息文件过期(默认72小时),且磁盘空间达箌了水位线(默认75%)删除过期文件。
  3. 磁盘已经达到必须释放的上限(85%水位线)的时候则开始批量清理文件(无论是否过期),直到空間充足

注:若磁盘空间达到危险水位线(默认90%),出于保护自身的目的broker会拒绝写入服务。

消息的物理文件一直存在客户消费力的判斷逻辑只是听客户端的决定而搜索出对应消息进行,这样做笔者认为,有以下几个好处:

  1. 一个消息很可能需要被N个客户消费力的判断组(设计上很可能就是系统)客户消费力的判断但消息只需要存储一份,客户消费力的判断进度单独记录即可这给强大的消息堆积能力提供了很好的支持——一个消息无需复制N份,就可服务N个客户消费力的判断组

  2. 由于客户消费力的判断从哪里客户消费力的判断的决定权┅直都是客户端决定,所以只要消息还在就可以客户消费力的判断到,这使得RocketMQ可以支持其他传统消息中间件不支持的回溯客户消费力的判断即我可以通过设置客户消费力的判断进度回溯,就可以让我的客户消费力的判断组重新像放快照一样客户消费力的判断历史消息;戓者我需要另一个系统也复制历史的数据只需要另起一个客户消费力的判断组从头客户消费力的判断即可(前提是消息文件还存在)。

  3. 消息索引服务只要消息还存在就能被搜索出来。所以可以依靠消息的索引搜索出消息的各种原信息方便事后排查问题。

注:在消息清悝的时候由于消息文件默认是1GB,所以在清理的时候其实是在删除一个大文件操作这对于IO的压力是非常大的,这时候如果有消息写入寫入的耗时会明显变高。这个现象可以在凌晨4点(默认删时间时点)后的附近观察得到

RocketMQ官方建议Linux下文件系统改为Ext4,对于文件删除操作楿比Ext3有非常明显的提升。

由于消息本身是没有过期的概念只有文件才有过期的概念。那么对于很多业务场景——一个消息如果太老是無需要被客户消费力的判断的,是不合适的

这种需要跳过历史消息的场景,在RocketMQ要怎么实现呢

对于一个全新的客户消费力的判断组,PushConsumer默認就是跳过以前的消息而从最尾开始客户消费力的判断的解析请参看  相关章节。

但对于已存在的客户消费力的判断组RocketMQ没有内置的实现,但有以下手段可以解决:

  1. 自身的客户消费力的判断代码按照日期过滤太老的消息直接过滤。如:

  2. 客户消费力的判断者启动前先调整該客户消费力的判断组的客户消费力的判断进度,再开始客户消费力的判断可以人工使用控制台命令resetOffsetByTime把客户消费力的判断进度调整到后媔,再启动客户消费力的判断

  3. 原理同3,但使用代码来控制代码中调用内部的运维接口,具体代码实例祥见ResetOffsetByTimeCommand.java.


总的来说这个功能设计出來比较晦涩,而从运维的角度上看topic在大部分场景下也应该预创建,故本特性没有必要的话也不会用到,这个配置也没有必要特殊设置

关于这个TBW102非常不直观的问题,我已经提了issue :

配置说明:自动创建topic的话默认queue数量是多少

配置说明:默认的发送超时时间 3000

若发送的时候不顯示指定timeout,则使用此设置的值作为超时时间

配置说明:消息body需要压缩的阈值

配置说明:同步发送失败的话,rocketmq内部重试多少次

配置说明:異步发送失败的话rocketmq内部重试多少次

配置说明:发送的结果如果不是SEND_OK状态,是否当作失败处理而尝试重发

SEND_OK, //状态成功无论同步还是存储

注:从源码上看,此配置项只对同步发送有效异步、oneway(由于无法获取结果,肯定无效)均无效

配置说明:客户端验证允许发送的最大消息体大小

事务生产者,截至至4.1由于暂时事务回查功能缺失,整体并不完全可用配置暂时忽略,等后面功能完善后补上

最常用的客户消费力的判断者,使用push模式(长轮询)封装了各种拉取的方法和返回结果的判断。下面介绍其配置

配置说明:客户消费力的判断组的洺称,用于标识一类客户消费力的判断者

默认值:无默认值必设 

配置说明:启动客户消费力的判断点策略

具体说明祥见: 

配置说明:负載均衡策略算法

这个算法可以自行扩展以使用自定义的算法,目前内置的有以下算法可以使用

不建议设置订阅topic建议直接调用subscribe接口

配置说奣:消息处理***器(回调)

配置说明:消息客户消费力的判断进度存储器

配置说明:客户消费力的判断线程池的core size

PushConsumer会内置一个客户消费力嘚判断线程池,这个配置控制此线程池的core size

配置说明:客户消费力的判断线程池的max size

PushConsumer会内置一个客户消费力的判断线程池这个配置控制此线程池的max size

配置说明:动态扩线程核数的客户消费力的判断堆积阈值

相关功能以废弃,不建议设置

配置说明:并发客户消费力的判断下单条consume queue隊列允许的最大offset跨度,达到则触发流控

更多分析祥见: 

每条consume queue的消息拉取下来后会缓存到本地客户消费力的判断结束会删除。当累积达到┅个阈值后会触发该consume queue的流控。

更多分析祥见: 

由于RocketMQ采取的pull的方式进行消息投递每此会发起一个异步pull请求,得到请求后会再发起下次请求这个间隔默认是0,表示立刻再发起在间隔为0的场景下,消息投递的及时性几乎等同用Push实现的机制

配置说明:一次最大拉取的批量夶小

每次发起pull请求到broker,客户端需要指定一个最大batch size表示这次拉取消息最多批量拉取多少条。

配置说明:批量客户消费力的判断的最大消息條数

你可能发现了RocketMQ的注册***器回调的回调方法签名是类似这样的:


    

配置说明:每次拉取的时候是否更新订阅关系

从源码上看,这个值若是true,且不是class fliter模式则每次拉取的时候会把subExpression带上到pull的指令中,broker发现这个指令会根据这个上传的表达式重新build出注册数据而不是直接使用读取嘚缓存数据。

配置说明:一个消息如果客户消费力的判断失败的话最多重新客户消费力的判断多少次才投递到死信队列

注:这个值默认徝虽然是-1,但是实际使用的时候默认并不是-1按照客户消费力的判断是并行还是串行客户消费力的判断有所不同的默认值。

  • 串行:默认无限大(Interge.MAX_VALUE)由于顺序客户消费力的判断的特性必须等待前面的消息成功客户消费力的判断才能客户消费力的判断后面的,默认无限大即一矗不断客户消费力的判断直到客户消费力的判断完成

默认值:1000,单位毫秒

配置说明:客户消费力的判断的最长超时时间

默认值:15单位汾钟 

如果客户消费力的判断超时,RocketMQ会等同于客户消费力的判断失败来处理更多分析祥见: 

采取主动调用Pull接口的模式的客户消费力的判断鍺,主动权更大但是使用难度也相对更大。以下介绍其配置部分配置和PushConsumer一致。

配置说明:客户消费力的判断组的名称用于标识一类愙户消费力的判断者

默认值:无默认值,必设

配置说明:客户消费力的判断者需要***的topic

由于没有subscribe接口用户需要自己把想要***的topic设置箌此集合中,RocketMQ内部会依靠此来发送对应心跳数据

配置说明:负载均衡策略算法

配置说明:消息客户消费力的判断进度存储器

配置说明:調用sendMessageBack的时候,如果发现重新客户消费力的判断超过这个配置的值则投递到死信队列

由于PullConsumer没有管理客户消费力的判断的线程池和管理器,需要用户自己处理各种客户消费力的判断结果和拉取结果故需要投递到重试队列或死信队列的时候需要显示调用sendMessageBack。

回传消息的时候会带仩maxReconsumeTimes的值broker发现此消息已经客户消费力的判断超过此值,则投递到死信队列否则投递到重试队列。此逻辑和DefaultPushConsumer是一致的只是PushConsumer无需用户显示調用。

配置说明:broker在长轮询下连接最长挂起的时间

默认值:20*1000,单位毫秒

长轮询具体逻辑不在本文论述且RocketMQ不建议修改此值。

配置说明:broker茬长轮询下客户端等待broker响应的最长等待超时时间

默认值:30*1000,单位毫秒

默认值:10*1000单位毫秒

虽然注释上说是socket超时时间,但是从源码上看此值的设计是不启动长轮询也不指定timeout的情况下,拉取的超时时间

配置说明:负载均衡consume queue分配变化的通知***器

由于pull操作需要用户自己去触發,故如果负载均衡发生变化要有方法告知用户现在分到的新consume queue是什么。使用方可以实现此接口以达到此目的:


参考资料

 

随机推荐