RocketMQ 4.x

生产者

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
1
2
3
# application.properties
rocketmq.name-server=192.168.1.131:9876
rocketmq.producer.group=my-group
1
2
3
4
5
6
@SpringBootApplication
public class MQProducerApplication {
public static void main(String[] args) {
SpringApplication.run(MQSpringBootApplication.class);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MQSpringBootApplication.class})
public class ProducerTest {

@Autowired
private RocketMQTemplate rocketMQTemplate;

@Test
public void test1(){
rocketMQTemplate.convertAndSend("springboot-mq","hello springboot rocketmq");
}
}

消费者

1
2
3
4
5
6
7
8
9
10
@Slf4j
@Component
@RocketMQMessageListener(topic = "springboot-mq",consumerGroup = "springboot-mq-consumer-1")
public class Consumer implements RocketMQListener<String> {

@Override
public void onMessage(String message) {
log.info("Receive message:"+message);
}
}

事务消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//绑定rocketMQ的bean
@RocketMQTransactionListener(rocketMQTemplateBeanName = "rocketMQTemplate")
public class MyTransactionListener implements RocketMQLocalTransactionListener {

// arg 是调用时传递的参数
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
try {
//业务逻辑
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}

@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}

通过实现RocketMQLocalTransactionListener类,触发调度事务消息发送成半消息时,回调接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Service
public class ProducerServiceImpl{
@Autowired
private RocketMQTemplate rocketMQTemplate;


public void sendMessage(String message) {
Map<String, Object> headers = new HashMap<>();
headers.put("myHeader", "myHeader");

Message<String> message = MessageBuilder.withPayload(message).copyHeaders(headers).build();
rocketMQTemplate.sendMessageInTransaction("myDestnation", message, basic);
}
}

顺序消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Service
public class ProducerServiceImpl{
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送同步顺序消息
*
* @param destination 目的地
* @param payload 消息对象
* @param hashKey 顺序消息相同的key值
*/
public void syncSendOrderly(String destination, Object payload, String hashKey) {
MessageQueueSelector queueSelector = (List<MessageQueue> mqs, Message msg, Object arg) -> {
int queueNum = Integer.valueOf(String.valueOf(arg)) % mqs.size();
return mqs.get(queueNum);
};

rocketMQTemplate.setMessageQueueSelector(queueSelector);
rocketMQTemplate.syncSendOrderly(destination, payload, hashKey);
}
}

参考资料

最后更新: 2020年12月31日 11:48

原始链接: https://midkuro.gitee.io/2020/07/20/rocketmq-springboot/

× 请我吃糖~
打赏二维码