ActiveMQ

高级特性

异步投递

ActiveMQ支持同步、异步两种发送的模式将消息发送到Broker,模式的选择对发送延时有巨大的影响。producer能够达到怎样的产出率(产出率=发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著的提高发送的性能。

ActiveMQ默认使用异步发送的模式,除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化的消息,这两种情况都是同步发送的。

如果没有使用事务且发送的是持久化消息,每一次发送都是同步发送的且会阻塞producer直到broker返回一个确认,表示消息已经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端,带来了很大的延时。

很多高性能的应用,允许在失败的情况下有少量的数据丢失,如果你的应用满足这个这点,你可以使用异步发送来提高生产率,即使发送的是持久化的消息。

异步发送可以最大化producer端的发送效率。我们通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升producer性能。

不过这也带来了额外的问题,就是需要消耗较多的Client端内存,同时也会导致broker端的性能增加。

此外它不能有效的确保消息的发送成功,在useAsyncSend=true的情况下客户端需要容忍消息丢失的可能。

可以参考官网的三种异步投递配置方式:

activemq-23

异步发送确认机制

异步发送丢失消息的场景是:生产者设置了UseAsyncSend=true,使用producer.send(msg)持久发送消息。由于消息不阻塞,生产者会认为所有send的消息均被成功发送至MQ。

如果MQ突然宕机,此时生产者端内存中尚未被发送至MQ的消息都会丢失所以正确的异步发送方式是需要接收回调的

同步发送和异步发送的区别:

  • 同步发送等send不阻塞了就表示一定发送成功了。
  • 异步发送需要接收回执并由客户端再判断一次是否发送成功。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class JmsProducer {
//ActiveMQ服务器的链接地址
private static final String ACTIVE_URL = "tcp://192.168.1.132:61616";
//队列名称
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVE_URL);
//设置异步投递
factory.setUseAsyncSend(true);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
//注意:强转成ActiveMQMessageProducer类
ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer(queue);
for (int i = 1; i <= 3; i++) {
TextMessage message = session.createTextMessage("message---" + i);
message.setJMSMessageID(UUID.randomUUID.toString());
String messageID = message.getJMSMessageID();
//使用具备回调函数的API发送消息
producer.send(message, new AsyncCallback() {
@Override
public void onSuccess() {
System.out.println(messageID + "has been send");
}
@Override
public void onException(JMSException exception) {
System.out.println(messageID + "fail to send");
}
});
}
producer.close();
session.close();
connection.close();
System.out.println("生产者发送消息队列queue01完毕");
}
}

延迟投递和定时投递

1
2
<!--activemq.xml在boker属性上配置schedulerSupport="true"-->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
Property name type description
AMQ_SCHEDULED_DELAY long 延迟投递的时间
AMQ_SCHEDULED_PERIOD long 重复投递的时间间隔
AMQ_SCHEDULED_REPEAT int 重复投递次数
AMQ_SCHEDULED_CRON String Cron表达式

通过在ActiveMQ的配置文件中开启定时调度SchedulerSupport="true",默认为false。然后使用ScheduleMessage类进行消息属性配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//生产者
public class JmsProducer {
private static final String ACTIVE_URL = "tcp://192.168.1.132:61616";
private static final String QUEUE_NAME = "queue-delay";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
//设置延迟投递时间
long delay = 3 * 1000;
//设置重复投递的时间间隔
long period = 4 * 1000;
//重复投递次数
int repeat = 5;
for (int i = 1; i <= 3; i++) {
TextMessage message = session.createTextMessage("delay-message---" + i);
message.setLongProperty(ScheduleMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduleMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduleMessage.AMQ_SCHEDULED_REPEAT, repeat);
producer.send(message);
}
producer.close();
session.close();
connection.close();
}
}

重试机制

那些情况会引发消息重发?

  • Client用了transactions且在session中调用了rollback()
  • Client用了transactions且在调用commit()之前关闭或者没有commit
  • ClientCLINET_ACKNOWLEDGE的传递模式下,在session中调用了recover()

默认消息重发的时间间隔是每秒钟,重发次数是6次

有毒消息Poison ACK

一个消息被redelivedred超过默认的最大重发次数(默认6次)时。消费端会给MQ发送一个Poison ack表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(死信队列),详情请参照官网

activemq-24

属性 默认值 描述
collisionAvoidanceFactor 0.15 设置防止冲突范围的政府百分比,只有启动UseCollisionAvoidance参数时才生效,也就是在延迟时间再加一个
maximumRedelivers 6 最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。
maximumRedeliveryDelay -1 最大传送延迟,只在UseExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为20ms,第三次重连时间间隔为40ms,当重连时间间隔的大于最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。
initialRedeliveryDelay 1000L 初始重发延迟时间
redeliveryDelay 1000L 重发延迟时间,当initialRedeliveryDelay=0时生效
useCollisionAvoidance false 启用防止冲装功能
useExponentialBackOff false 启用指数倍数递增的方式增加延迟时间
backOffMultiplier 5 重连时间间隔递增倍数,只有值大于1和启动useExponentialBackOff参数时才生效

配置重试次数

1
2
3
4
5
6
7
8
9
10
11
12
13
//修改最大重试次数
public class JmsConsumer_Redelivery {
private static final String ACTIVE_URL = "tcp://192.168.1.132:61616";
private static final String QUEUE_NAME = "queue-redelivery";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVE_URL);
//使用自身的配置类,设置重试次数3次
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(3);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
//省略后续代码...
}
}

整合Spring

1
2
3
4
5
6
7
8
9
<!--定义reDelivery重发机制-->
<bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
<property name="maximumRedeliveries" value="3"></property>
</bean>
<!--创建连接工厂并指定配置-->
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.1.132:61616"></property>
<property name="redeliveryPolicy" ref="activeMQRedelivery" />
</bean>

死信队列

ActiveMQ中引入了死信队列Dead Letter queue)的概念,即一条消息再被重发了多次后(默认为重发6次redeliveryConter==6),将会被ActiveMQ移入私信队列,开发人员可以在这个Queue中查看处理出错的消息,进行人工干预,主要用来处理失败的消息,详情请查看官网

activemq-25

activemq-26

  • 一般生产环境中在使用MQ的时候设计两个队列:一个是核心业务队列,一个是死信队列
  • 核心业务队列,就是比如上图专门用来让订单系统发送订单消息的,然后另外一个私信队列就是用来处理异常情况的。

假设第三方物流系统故障了,此时无法请求,那么仓储系统每次消费到一条订单消息,尝试通知发货和配送都会遇到对方的接口报错。此时仓储系统就可以把这条消息拒绝访问或者标志位处理失败。一旦表这条消息处理失败后,MQ就会把这条消息转入提前设置好的一个死信队列中。

然后看到的就是,在第三方物流系统故障期间,所有订单消息全部处理失败,全部都会转入私信队列,然后你的仓储系统得专门有一个后台线程,监控第三方系统是否正常。一旦发现对方回复正常,这个后台线程就从私信队列消费处理失败的订单,重新执行发货和配送的通知逻辑。

共享死信队列

SharedDeadLetterStrategy(共享死信队列),将所有的DeadLetter保存在一个共享的队列中,这是ActiveMQ broker端默认的策略。

共享队列默认为ActiveMQ.DLQ,可以通过deadLetterQueue属性来设定。

1
2
3
<deadLetterStrategy>
<sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE" />
</deadLetterStrategy>

个人死信队列

IndividualDeadLetterStrategy(个人死信队列),把DeadLetter放入各自的死信通道中。

  • 对于Queue而言,死信通道的前缀默认为ActiveMQ.DLQ.Queue.
  • 对于Topic而言,死信通道的前缀默认为ActiveMQ.DLQ.Topic.

比如队列Order,那么它对应的死信队列通道为ActiveMQ.DLQ.Queue.Order,我们使用queuePrefixtopicPrefix来指定上述前缀。

默认情况下,无论是Topic还是QueueBroker将使用Queue来保存DeadLetter,即死信通道通常为Queue,不过开发也可以指定为Topic

1
2
3
4
5
<policyEntry queue="order">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="false" />
</deadLetterStrategy>
</policyEntry>

将队列Order中出现的DeadLetter保存在DLQ.Order中,不过此时DLQ.OrderTopic

属性useQueueForQueueMessages设置使用队列保存死信队列,还可以设置useQueueForTopicMessages,使用Topic来保存死信队列,默认为true

自动删除过期消息

有时需要直接删除过期的消息而不需要发送到死信队列中,processExpired表示是否将过期消息放入私信队列,默认为true

1
2
3
4
5
6
<!--"> "类似SQL的 * -->
<policyEntry queue="> " >
<deadLetterStrategy>
<sharedDeadLetterStragegy processExpired="false" />
</deadLetterStrategy>
</policyEntry>

存放非持久消息

默认情况下,ActiveMQ不会把非持久的死消息发送到死信队列中。processNonPersistent表示是否将非持久化消息放入死信队列,默认为false

如果想把非持久化的消息发送到死信队列中,需要设置属性processNonPersistent="true"

1
2
3
4
5
<policyEntry queue="> ">
<deadLetterStrategy>
<sharedDeadLetterStrategy processNonPersistent="true" />
</deadLetterStrategy>
</policyEntry>

重复消费

如何保证消息不被重复消费?幂等性问题?

​ 网络延迟传输中,会造成进行MQ重试中,在重试过程中,可能会造成重复消费。

如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消息的情况,就会导致主键冲突,避免数据库出现脏数据。

或者准备个第三方来做消息记录,以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>K-V形式写入redis,消费者开始消费前,先去redis中查询有没有消费记录即可。

最后更新: 2020年11月12日 12:20

原始链接: https://midkuro.gitee.io/2020/05/23/activemq-feature/

× 请我吃糖~
打赏二维码