RocketMQ 4.x 1 2 3 4 5 <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-client</artifactId > <version > 4.4.0</version > </dependency >
消息发送者步骤分析
1 2 3 4 5 6 1.创建消息生产者producer,并制定生产者组名 2.指定Nameserver地址 3.启动producer 4.创建消息对象,指定主题Topic、Tag和消息体 5.发送消息 6.关闭生产者producer
消息消费者步骤分析
1 2 3 4 5 1.创建消费者Consumer,制定消费者组名 2.指定Nameserver地址 3.订阅主题Topic和Tag 4.设置回调函数,处理消息 5.启动消费者consumer
生产者 发送同步消息 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class SyncProducer { public static void main (String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name" ); producer.setNamesrvAddr("localhost:9876" ); producer.start(); for (int i = 0 ; i < 100 ; i++) { Message msg = new Message("TopicTest" , "TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); SendResult sendResult = producer.send(msg); System.out.printf("%s%n" , sendResult); } producer.shutdown(); } }
发送异步消息 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class AsyncProducer { public static void main (String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name" ); producer.setNamesrvAddr("localhost:9876" ); producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0 ); for (int i = 0 ; i < 100 ; i++) { final int index = i; Message msg = new Message("TopicTest" , "TagA" , "OrderID188" , "Hello world" .getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess (SendResult sendResult) { System.out.printf("%-10d OK %s %n" , index, sendResult.getMsgId()); } @Override public void onException (Throwable e) { System.out.printf("%-10d Exception %s %n" , index, e); e.printStackTrace(); } }); } producer.shutdown(); } }
发送单向消息 这种方式主要用在不特别关心发送结果的场景,例如日志发送。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class OnewayProducer { public static void main (String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name" ); producer.setNamesrvAddr("localhost:9876" ); producer.start(); for (int i = 0 ; i < 100 ; i++) { Message msg = new Message("TopicTest" , "TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); producer.sendOneway(msg); } producer.shutdown(); } }
消费者 负载均衡模式 消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public static void main (String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1" ); consumer.setNamesrvAddr("localhost:9876" ); consumer.subscribe("Test" , "*" ); consumer.setMessageModel(MessageModel.CLUSTERING); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n" , Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n" ); }
广播模式 消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public static void main (String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1" ); consumer.setNamesrvAddr("localhost:9876" ); consumer.subscribe("Test" , "*" ); consumer.setMessageModel(MessageModel.BROADCASTING); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n" , Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n" ); }
顺序消息 RocketMQ
每个主题都会有若干个队列,分布于集群中各个broker
上,分布规律如下:
每个queue
存储了主题Topic
的消息,在默认的情况下,消息发送会采取Round Robin
轮询方式把消息发送到不同的queue
( 分区队列)中,而消费消息的时候,从多个queue
上拉取消息,这种情况发送和消费是不能保证顺序的 ,因为消费者在处于多线程的情况下,无法完全地按照发送消息的queue
顺序消费消息。
举个例子:一个订单的顺序流程是:创建(A)、付款(B)、推送(C)、完成(D)。在业务上,需要保证相同订单号,他们消费消息的顺序严格一致,而在默认的情况下,由于多线程地从不同的queue
中消费消息,顺序就无法与 A、B、C、D保持一致。
消息有序指的是可以按照消息的发送顺序来消费(FIFO
)。RocketMQ
可以严格的保证消息有序,可以分为分区有序或者全局有序。
如何保证消息顺序:
如果控制发送的顺序消息依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。
一个主题中,它只使用一个queue,发送和消费的策略均使用同一个queue,则表示是全局有序。一般情况下,不需要使用全局有序,因为一个主题只使用一个queue的情况下,会严重降低了系统的并发度。
一个主题中,有多个queue参与,同一个业务编号的消息需要被发送到同一个queue中,不同业务编号可以使用不同的queue,则为分区有序,即相对每个queue,消息都是有序的。
如何实现消息顺序:
第一点,消息顺序发送,多线程发送的消息无法保证有序性,因此,需要业务方在发送时,针对同一个业务编号(如同一笔订单)的消息需要保证在一个线程内顺序发送,在上一个消息发送成功后,在进行下一个消息的发送。对应到mq中,消息发送方法就得使用同步发送,异步发送无法保证顺序性
第二点,消息顺序存储,Topic
主题下会存在多个queue
,要保证消息的顺序存储,同一个业务编号的消息需要被发送到一个queue
中。对应到编码中,需要使用MessageQueueSelector
来选择要发送的queue
,即对业务编号进行hash,然后根据队列数量对hash值取余,将消息发送到一个queue中
第三点,消息顺序消费,要保证消息顺序消费,同一个queue
就只能被一个消费者所消费,因此对broker
中消费队列加锁是无法避免的。同一时刻,一个消费队列只能被一个消费者消费,消费者内部,也只能有一个消费线程来消费该队列。即,同一时刻,一个消费队列只能被一个消费者中的一个线程消费。
下面用订单进行分区有序的示例。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
顺序消息生产 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 public class Producer { public static void main (String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name" ); producer.setNamesrvAddr("127.0.0.1:9876" ); producer.start(); String[] tags = new String[]{"TagA" , "TagC" , "TagD" }; List<OrderStep> orderList = new Producer().buildOrders(); Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss" ); String dateStr = sdf.format(date); for (int i = 0 ; i < 10 ; i++) { String body = dateStr + " Hello RocketMQ " + orderList.get(i); Message msg = new Message("TopicTest" , tags[i % tags.length], "KEY" + i, body.getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select (List<MessageQueue> mqs, Message msg, Object arg) { Long id = (Long) arg; long index = id % mqs.size(); return mqs.get((int ) index); } }, orderList.get(i).getOrderId()); System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s" , sendResult.getSendStatus(), sendResult.getMessageQueue().getQueueId(), body)); } producer.shutdown(); } private static class OrderStep { private long orderId; private String desc; public long getOrderId () { return orderId; } public void setOrderId (long orderId) { this .orderId = orderId; } public String getDesc () { return desc; } public void setDesc (String desc) { this .desc = desc; } @Override public String toString () { return "OrderStep{" + "orderId=" + orderId + ", desc='" + desc + '\'' + '}' ; } } private List<OrderStep> buildOrders () { List<OrderStep> orderList = new ArrayList<OrderStep>(); OrderStep orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L ); orderDemo.setDesc("创建" ); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L ); orderDemo.setDesc("创建" ); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L ); orderDemo.setDesc("付款" ); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L ); orderDemo.setDesc("创建" ); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L ); orderDemo.setDesc("付款" ); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L ); orderDemo.setDesc("付款" ); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111065L ); orderDemo.setDesc("完成" ); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L ); orderDemo.setDesc("推送" ); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103117235L ); orderDemo.setDesc("完成" ); orderList.add(orderDemo); orderDemo = new OrderStep(); orderDemo.setOrderId(15103111039L ); orderDemo.setDesc("完成" ); orderList.add(orderDemo); return orderList; } }
顺序消费消息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public class ConsumerInOrder { public static void main (String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3" ); consumer.setNamesrvAddr("127.0.0.1:9876" ); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest" , "TagA || TagC || TagD" ); consumer.registerMessageListener(new MessageListenerOrderly() { Random random = new Random(); @Override public ConsumeOrderlyStatus consumeMessage (List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true ); for (MessageExt msg : msgs) { System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody())); } try { TimeUnit.SECONDS.sleep(random.nextInt(10 )); } catch (Exception e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started." ); } }
延时消息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class ScheduledMessageProducer { public static void main (String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup" ); producer.start(); int totalMessagesToSend = 100 ; for (int i = 0 ; i < totalMessagesToSend; i++) { Message message = new Message("TestTopic" , ("Hello scheduled message " + i).getBytes()); message.setDelayTimeLevel(3 ); producer.send(message); } producer.shutdown(); } }
1 2 private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" ;
现在RocketMQ
并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18
批量消息 批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK
,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:
1 2 3 4 5 6 7 8 9 10 11 String topic = "BatchTest" ; List<Message> messages = new ArrayList<>(); messages.add(new Message(topic, "TagA" , "OrderID001" , "Hello world 0" .getBytes())); messages.add(new Message(topic, "TagA" , "OrderID002" , "Hello world 1" .getBytes())); messages.add(new Message(topic, "TagA" , "OrderID003" , "Hello world 2" .getBytes())); try { producer.send(messages); } catch (Exception e) { e.printStackTrace(); }
如果消息的总长度可能大于4MB时,这时候最好把消息进行分割
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public class ListSplitter implements Iterator <List <Message >> { private final int SIZE_LIMIT = 1024 * 1024 * 4 ; private final List<Message> messages; private int currIndex; public ListSplitter (List<Message> messages) { this .messages = messages; } @Override public boolean hasNext () { return currIndex < messages.size(); } @Override public List<Message> next () { int nextIndex = currIndex; int totalSize = 0 ; for (; nextIndex < messages.size(); nextIndex++) { Message message = messages.get(nextIndex); int tmpSize = message.getTopic().length() + message.getBody().length; Map<String, String> properties = message.getProperties(); for (Map.Entry<String, String> entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } tmpSize = tmpSize + 20 ; if (tmpSize > SIZE_LIMIT) { if (nextIndex - currIndex == 0 ) { nextIndex++; } break ; } if (tmpSize + totalSize > SIZE_LIMIT) { break ; } else { totalSize += tmpSize; } } List<Message> subList = messages.subList(currIndex, nextIndex); currIndex = nextIndex; return subList; } } ListSplitter splitter = new ListSplitter(messages); while (splitter.hasNext()) { try { List<Message> listItem = splitter.next(); producer.send(listItem); } catch (Exception e) { e.printStackTrace(); } }
过滤消息 在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:
1 2 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE" ); consumer.subscribe("TOPIC" , "TAGA || TAGB || TAGC" );
消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 ------------ | message | |----------| a > 5 AND b = 'abc' | a = 10 | --------------------> Gotten | b = 'abc'| | c = true | ------------ ------------ | message | |----------| a > 5 AND b = 'abc' | a = 1 | --------------------> Missed | b = 'abc'| | c = true | ------------
SQL基本语法 RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
数值比较,比如:>,>=,<,<=,BETWEEN,=;
字符比较,比如:=,<>,IN;
IS NULL 或者 IS NOT NULL;
逻辑符号 AND,OR,NOT;
常量支持类型为:
数值,比如:123,3.1415;
字符,比如:‘abc’,必须用单引号包裹起来;
NULL ,特殊的常量
布尔值,TRUE 或 FALSE
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
1 public void subscribe (finalString topic, final MessageSelector messageSelector)
消息生产者 发送消息时,你能通过putUserProperty
来设置消息的属性
1 2 3 4 5 6 7 8 9 10 11 DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name" ); producer.start(); Message msg = new Message("TopicTest" , tag, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); msg.putUserProperty("a" , String.valueOf(i)); SendResult sendResult = producer.send(msg); producer.shutdown();
消息消费者 用MessageSelector.bySql来使用sql筛选消息
1 2 3 4 5 6 7 8 9 10 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4" ); consumer.subscribe("TopicTest" , MessageSelector.bySql("a between 0 and 3" ); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start();
事务消息 流程分析
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
事务消息发送及提交 (1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
事务补偿 (1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
事务消息状态 事务消息共有三种状态,提交状态、回滚状态、中间状态:
TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
发送事务消息 创建事务性生产者 使用 TransactionMQProducer
类创建生产者,并指定唯一的 ProducerGroup
,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在请参考前一节。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class Producer { public static void main (String[] args) throws MQClientException, InterruptedException { TransactionListener transactionListener = new TransactionListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("group6" ); producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876" ); producer.setTransactionListener(transactionListener); producer.start(); String[] tags = new String[]{"TagA" , "TagB" , "TagC" }; for (int i = 0 ; i < 3 ; i++) { try { Message msg = new Message("TransactionTopic" , tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null ); System.out.printf("%s%n" , sendResult); TimeUnit.SECONDS.sleep(1 ); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } } }
实现事务的监听接口 当发送半消息成功时,我们使用 executeLocalTransaction
方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTranscation
方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class TransactionListenerImpl implements TransactionListener { @Override public LocalTransactionState executeLocalTransaction (Message msg, Object arg) { System.out.println("执行本地事务" ); if (StringUtils.equals("TagA" , msg.getTags())) { return LocalTransactionState.COMMIT_MESSAGE; } else if (StringUtils.equals("TagB" , msg.getTags())) { return LocalTransactionState.ROLLBACK_MESSAGE; } else { return LocalTransactionState.UNKNOW; } } @Override public LocalTransactionState checkLocalTransaction (MessageExt msg) { System.out.println("MQ检查消息Tag【" +msg.getTags()+"】的本地事务执行结果" ); return LocalTransactionState.COMMIT_MESSAGE; } }
使用限制
事务消息不支持延时消息和批量消息。
为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax
参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax
) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener
类来修改这个行为。
事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout
参数。
事务性消息可能不止一次被检查或消费。
提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
参考资料