RocketMQ 4.x
生产者
1 2 3 4 5
| <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.1</version> </dependency>
|
1 2 3
| rocketmq.name-server=192.168.1.131:9876 rocketmq.producer.group=my-group
|
1 2 3 4 5 6
| @SpringBootApplication public class MQProducerApplication { public static void main(String[] args) { SpringApplication.run(MQSpringBootApplication.class); } }
|
1 2 3 4 5 6 7 8 9 10 11 12
| @RunWith(SpringRunner.class) @SpringBootTest(classes = {MQSpringBootApplication.class}) public class ProducerTest {
@Autowired private RocketMQTemplate rocketMQTemplate;
@Test public void test1(){ rocketMQTemplate.convertAndSend("springboot-mq","hello springboot rocketmq"); } }
|
消费者
1 2 3 4 5 6 7 8 9 10
| @Slf4j @Component @RocketMQMessageListener(topic = "springboot-mq",consumerGroup = "springboot-mq-consumer-1") public class Consumer implements RocketMQListener<String> {
@Override public void onMessage(String message) { log.info("Receive message:"+message); } }
|
事务消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate") public class MyTransactionListener implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) { try { return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { return RocketMQLocalTransactionState.ROLLBACK; } }
@Override public RocketMQLocalTransactionState checkLocalTransaction(Message message) { return RocketMQLocalTransactionState.ROLLBACK; } }
|
通过实现RocketMQLocalTransactionListener
类,触发调度事务消息发送成半消息时,回调接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Service public class ProducerServiceImpl{ @Autowired private RocketMQTemplate rocketMQTemplate;
public void sendMessage(String message) { Map<String, Object> headers = new HashMap<>(); headers.put("myHeader", "myHeader");
Message<String> message = MessageBuilder.withPayload(message).copyHeaders(headers).build(); rocketMQTemplate.sendMessageInTransaction("myDestnation", message, basic); } }
|
顺序消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Service public class ProducerServiceImpl{ @Autowired private RocketMQTemplate rocketMQTemplate;
public void syncSendOrderly(String destination, Object payload, String hashKey) { MessageQueueSelector queueSelector = (List<MessageQueue> mqs, Message msg, Object arg) -> { int queueNum = Integer.valueOf(String.valueOf(arg)) % mqs.size(); return mqs.get(queueNum); };
rocketMQTemplate.setMessageQueueSelector(queueSelector); rocketMQTemplate.syncSendOrderly(destination, payload, hashKey); } }
|
参考资料