ActiveMQ
高级特性
异步投递
ActiveMQ
支持同步、异步两种发送的模式将消息发送到Broker
,模式的选择对发送延时有巨大的影响。producer
能够达到怎样的产出率(产出率=发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著的提高发送的性能。
ActiveMQ
默认使用异步发送的模式,除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化的消息,这两种情况都是同步发送的。
如果没有使用事务且发送的是持久化消息,每一次发送都是同步发送的且会阻塞producer
直到broker
返回一个确认,表示消息已经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端,带来了很大的延时。
很多高性能的应用,允许在失败的情况下有少量的数据丢失,如果你的应用满足这个这点,你可以使用异步发送来提高生产率,即使发送的是持久化的消息。
异步发送可以最大化producer
端的发送效率。我们通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升producer
性能。
不过这也带来了额外的问题,就是需要消耗较多的Client
端内存,同时也会导致broker
端的性能增加。
此外它不能有效的确保消息的发送成功,在useAsyncSend=true
的情况下客户端需要容忍消息丢失的可能。
可以参考官网的三种异步投递配置方式:
异步发送确认机制
异步发送丢失消息的场景是:生产者设置了UseAsyncSend=true
,使用producer.send(msg)
持久发送消息。由于消息不阻塞,生产者会认为所有send
的消息均被成功发送至MQ。
如果MQ突然宕机,此时生产者端内存中尚未被发送至MQ的消息都会丢失。所以正确的异步发送方式是需要接收回调的。
同步发送和异步发送的区别:
- 同步发送等
send
不阻塞了就表示一定发送成功了。 - 异步发送需要接收回执并由客户端再判断一次是否发送成功。
1 | public class JmsProducer { |
延迟投递和定时投递
1 | <!--activemq.xml在boker属性上配置schedulerSupport="true"--> |
Property name | type | description |
---|---|---|
AMQ_SCHEDULED_DELAY | long | 延迟投递的时间 |
AMQ_SCHEDULED_PERIOD | long | 重复投递的时间间隔 |
AMQ_SCHEDULED_REPEAT | int | 重复投递次数 |
AMQ_SCHEDULED_CRON | String | Cron表达式 |
通过在ActiveMQ
的配置文件中开启定时调度SchedulerSupport="true"
,默认为false
。然后使用ScheduleMessage
类进行消息属性配置。
1 | //生产者 |
重试机制
那些情况会引发消息重发?
Client
用了transactions
且在session
中调用了rollback()
Client
用了transactions
且在调用commit()
之前关闭或者没有commit
Client
在CLINET_ACKNOWLEDGE
的传递模式下,在session
中调用了recover()
默认消息重发的时间间隔是每秒钟,重发次数是6次。
有毒消息Poison ACK
:
一个消息被redelivedred
超过默认的最大重发次数(默认6次)时。消费端会给MQ发送一个Poison ack
表示这个消息有毒,告诉broker
不要再发了。这个时候broker
会把这个消息放到DLQ
(死信队列),详情请参照官网。
属性 | 默认值 | 描述 |
---|---|---|
collisionAvoidanceFactor | 0.15 | 设置防止冲突范围的政府百分比,只有启动UseCollisionAvoidance参数时才生效,也就是在延迟时间再加一个 |
maximumRedelivers | 6 | 最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。 |
maximumRedeliveryDelay | -1 | 最大传送延迟,只在UseExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为20ms,第三次重连时间间隔为40ms,当重连时间间隔的大于最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 |
initialRedeliveryDelay | 1000L | 初始重发延迟时间 |
redeliveryDelay | 1000L | 重发延迟时间,当initialRedeliveryDelay=0时生效 |
useCollisionAvoidance | false | 启用防止冲装功能 |
useExponentialBackOff | false | 启用指数倍数递增的方式增加延迟时间 |
backOffMultiplier | 5 | 重连时间间隔递增倍数,只有值大于1和启动useExponentialBackOff参数时才生效 |
配置重试次数
1 | //修改最大重试次数 |
整合Spring
1 | <!--定义reDelivery重发机制--> |
死信队列
ActiveMQ
中引入了死信队列(Dead Letter queue
)的概念,即一条消息再被重发了多次后(默认为重发6次redeliveryConter==6
),将会被ActiveMQ
移入私信队列,开发人员可以在这个Queue
中查看处理出错的消息,进行人工干预,主要用来处理失败的消息,详情请查看官网。
- 一般生产环境中在使用MQ的时候设计两个队列:一个是核心业务队列,一个是死信队列。
- 核心业务队列,就是比如上图专门用来让订单系统发送订单消息的,然后另外一个私信队列就是用来处理异常情况的。
假设第三方物流系统故障了,此时无法请求,那么仓储系统每次消费到一条订单消息,尝试通知发货和配送都会遇到对方的接口报错。此时仓储系统就可以把这条消息拒绝访问或者标志位处理失败。一旦表这条消息处理失败后,MQ就会把这条消息转入提前设置好的一个死信队列中。
然后看到的就是,在第三方物流系统故障期间,所有订单消息全部处理失败,全部都会转入私信队列,然后你的仓储系统得专门有一个后台线程,监控第三方系统是否正常。一旦发现对方回复正常,这个后台线程就从私信队列消费处理失败的订单,重新执行发货和配送的通知逻辑。
共享死信队列
SharedDeadLetterStrategy
(共享死信队列),将所有的DeadLetter
保存在一个共享的队列中,这是ActiveMQ broker端默认的策略。
共享队列默认为ActiveMQ.DLQ
,可以通过deadLetterQueue
属性来设定。
1 | <deadLetterStrategy> |
个人死信队列
IndividualDeadLetterStrategy
(个人死信队列),把DeadLetter
放入各自的死信通道中。
- 对于
Queue
而言,死信通道的前缀默认为ActiveMQ.DLQ.Queue.
- 对于
Topic
而言,死信通道的前缀默认为ActiveMQ.DLQ.Topic.
比如队列Order
,那么它对应的死信队列通道为ActiveMQ.DLQ.Queue.Order
,我们使用queuePrefix
、topicPrefix
来指定上述前缀。
默认情况下,无论是Topic
还是Queue
,Broker
将使用Queue
来保存DeadLetter
,即死信通道通常为Queue
,不过开发也可以指定为Topic
。
1 | <policyEntry queue="order"> |
将队列Order
中出现的DeadLetter
保存在DLQ.Order
中,不过此时DLQ.Order
为Topic
。
属性useQueueForQueueMessages
设置使用队列保存死信队列,还可以设置useQueueForTopicMessages
,使用Topic来保存死信队列,默认为true
。
自动删除过期消息
有时需要直接删除过期的消息而不需要发送到死信队列中,processExpired
表示是否将过期消息放入私信队列,默认为true
。
1 | <!--"> "类似SQL的 * --> |
存放非持久消息
默认情况下,ActiveMQ
不会把非持久的死消息发送到死信队列中。processNonPersistent
表示是否将非持久化消息放入死信队列,默认为false
。
如果想把非持久化的消息发送到死信队列中,需要设置属性processNonPersistent="true"
1 | <policyEntry queue="> "> |
重复消费
如何保证消息不被重复消费?幂等性问题?
网络延迟传输中,会造成进行MQ重试中,在重试过程中,可能会造成重复消费。
如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消息的情况,就会导致主键冲突,避免数据库出现脏数据。
或者准备个第三方来做消息记录,以redis
为例,给消息分配一个全局id
,只要消费过该消息,将<id,message>
以K-V
形式写入redis
,消费者开始消费前,先去redis
中查询有没有消费记录即可。
最后更新: 2020年11月12日 12:20