ActiveMQ

JMS

JavaEE是一套使用JAVA进行企业级Web应用开发的大家一致遵循的工业标准。JavaEE平台提供了一个基于组件的方法来加快设计、开发、装配及部署企业应用程序。

JavaEE的13种核心技术规范:

  1. JDBC(Java Database)数据库连接
  2. JNDI(Java Naming and Directory Interfaces)Java 的命名和目录接口
  3. EJB(Enterprise JavaBean)
  4. RMI(Remote Method Invoke)远程方法调用
  5. Java IDL(Interface Description Language)/CORBA(Common Object Broker Architecture)Java 接口定义语言/公用对象请求代理程序体系结构
  6. JSP(Java Server Pages)
  7. Servlet
  8. XML(Extensible Markup Language)可扩展标记语言
  9. JMS(Java Message Service)Java 消息服务
  10. JTA(Java Transaction API)Java 事务 API
  11. JTS(Java Transaction Service)Java 事务服务
  12. JavaMail
  13. JAF(JavaBean Activation Framework)

JMS全称是Java Message Service(Java消息服务是JAVA EE的一门技术)。JMS的客户端之间可以通过JMS服务进行异步的消息传输。JMS用于和面向消息的中间件相互通信的应用程序接口(API)。它既支持点对点的域,有支持发布/订阅(publish/subscribe)类型的域,并且提供对下列类型的支持:

  • 经认可的消息传递
  • 事务型消息的传递
  • 一致性消息和具有持久性的订阅者支持。

JMS消息系统带来的好处:

  1. 提供消息灵活性;
  2. 松散耦合
  3. 异步性。

JMS消息系统带来的好处:1、提供消息灵活性;2、松散耦合;3、异步性。

activemq-11

消息队列的比较

特性 ActiveMQ RabbitMQ Kafka RocketMQ
PRODUCER-COMSUMER 支持 支持 支持 支持
PUBLISH-SUBSCRIBE 支持 支持 支持 支持
REQUEST-REPLY 支持 支持 - 支持
API完备性 低(静态配置)
多语言支持 支持,JAVA优先 语言无关 支持,JAVA优先 支持
单机呑吐量 万级 万级 十万级 单机万级
消息延迟 - 微秒级 毫秒级 -
可用性 高(主从) 高(主从) 非常高(分布式)
消息丢失 - 理论上不会丢失 -
消息重复 - 可控制 理论上会有重复 -
文档的完备性
提供快速入门
首次部署难度 -

JMS的组成结构和特点

  1. JMS provider:实现JMS接口和规范的消息中间件
  2. JMS producer:消息生产者,创建和发送JMS消息的客户端应用
  3. JMS consumer:消息消费者,接受和处理JMS消息的客户端应用
  4. JMS message :消息,包括消息头、消息体、消息属性

消息头

  1. JMSDestination:消息发送的目的地,主要指QueueTopic

  2. JMSDeliveryMode:非持久和持久模式

    一条持久性消息:应该被传送一次仅仅一次,意味着如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。

    一条非持久的消息:最多传送一次,这意味着服务器出现故障,该消息将永远丢失。

  3. JMSExpiration:消息过期时间,默认永不过期

    消息过期时间等于Destinationsend方法中的timeToLive值加上发送时刻的GMT时间值。

    如果消息TimeToLive值等于零,则JMSExpiration被设置为零,表示该消息永不过期。

    如果发送后,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。

  4. JMSPriority:消息优先级

    从0-9 十个级别,0到4是普通消息,5到9是加急消息。

    JMS不要求MQ严格按照这是个优先级发送消息,但必须保证加急消息要先于普通消息到达,默认是4级

  5. JMSMessageID:唯一识别每个消息的标识,由MQ产生。

消息体

  1. TextMessage:普通字符串消息,包含一个String
  2. MapMessage:一个Map类型的消息,KeyString类型,而值为Java的基本类型
  3. BytesMessage:二进制数组消息,包含一个byte[]
  4. StreamMessage:Java数据流,用标准流操作来顺序的填充和读取
  5. ObjectMessage:对象消息,包含一个可序列化的Java对象
1
2
3
//例子
MapMessage mapMessage = session.createMapMessage();
mapMessage.setBoolean("key", false);

发送和接受的消息体类型必须一致对应。

消息属性

如果需要除消息头字段以外的值,那么可以使用消息属性,主要用于识别/去重/重点标注消息

他们是以属性名和属性值对的形式制定的。可以将属性,可以看做是消息头的扩展,属性指定一些消息头没有包括的附加消息。比如可以在属性里指定消息选择器。

1
2
3
4
TextMessage message = session.createTextMessage();
message.setText(text);
//自定义属性
message.setStringProperty("userName","张三");

JMS的可靠性

持久性

Persistent持久性参数说明:

  • 非持久:当服务器宕机,消息不存在

    1
    2
    MessageProducer producer = session.createProducer(queue);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
  • 持久:当服务器宕机,消息依然存在

    1
    2
    MessageProducer producer = session.createProducer(queue);
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);

当不配置持久化时,队列默认的消息默认是持久化消息,此模式保证这些消息只被传送一次和成功使用一次。对这些消息,可靠性是优先考虑因素。

可靠性的另一个重要方面是确保持久化消息传递至目标后,消息服务在向消费者传送他们之前不会丢失这些消息。

持久化Topic-订阅主题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//生产者
public class JmsTopicProducer {
private static final String ACTIVE_URL = "tcp://192.168.1.132:61616";
private static final String TOPIC_NAME = "topic-Persist";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
//设置生产者的持久化消息
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//设置完了持久化配置,然后再启动连接
connection.start();
for (int i = 1; i <= 3; i++) {
TextMessage message = session.createTextMessage("持久化消息:message---" + i);
producer.send(message);
}
producer.close();
session.close();
connection.close();
System.out.println("生产者发送消息队列topic完毕");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//订阅者
public class JmsTopicConsumer {
private static final String ACTIVE_URL = "tcp://192.168.1.132:61616";
private static final String TOPIC_NAME = "topic-Persist";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = factory.createConnection();
//设置订阅者的ID
connection.setClientID("zhangsan");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
//创建持久化的订阅者
TopicSubscriber subscriber = session.createDurableSubscriber(topic, "remark...");
//设置完毕后再启动start
connection.start();
//监听主题
subscriber.setMessageListener((message) -> {
if (message != null && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("收到持久化订阅消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
// 控制台不灭
System.in.read();
subscriber.close();
session.close();
connection.close();
System.out.println("订阅消费完毕");
}
}

需要先启动订阅者,再启动生产者,订阅主题需要为每个订阅者设置ClientID并创建订阅者,这时候生产消息时可以看到,订阅者处于激活状态并接收并消费了三条消息。

activemq-12

这时候把订阅者后台关闭,订阅者从激活状态变为离线状态,可以看到如下图:

activemq-13

当订阅者重新订阅时,会从离线状态重新变为激活状态。

切记:一定要先运行一次消费者,等于向MQ注册,类似于我订阅了这个主题,然后再运行生产者发送信息,此时无论消费者是否在线,都会接收到,不在线的话,下次链接时,会把没有收过的消息都接收下来

事务

Transaction事务偏生产者,签收偏消费者

1
2
3
4
5
6
//第一个参数是事务开关,第二个是签收参数
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//提交事务
session.commit();
//回滚事务
session.rollback();
  • false:只要执行send,就会进入到队列中。关闭事务,那第二个签收参数的设置需要有效
  • true:先执行send再执行commit,消息才被真正的提交到队列中,消息需要批量发送,需要缓冲区处理

生产者/消费者设置了事务开关等于true时,只有执行了session.commit();消息才会被生产/消费,所以事务的级别是比签收的级别高的。

签收

1
2
3
4
5
6
7
8
9
//Acknowledge签收方式
//1.自动签收--常用
Session.AUTO_ACKNOWLEDGE;
//2.手动签收--常用
Session.CLIENT_ACKNOWLEDGE;
//3.可允许重复的签收
Session.UPS_OK_ACKNOWLEDGE;
//4.和事务组合的签收
Session.SESSION_TRANSACTED;

非事务的情况下,默认行为是自动签收,当使用手动签收时,客户端需要调用message.acknowledge()方法手动签收。

在事务的情况下,只有commit后才能将全部消息变为已消费。在事务性会话中,当一个事务被成功提交则消息被自动签收,如果事务回滚,则消息会再次被传送

非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode

JMS的点对点总结

点对点模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使消息的异步传输称为可能。

如果在Session关闭是有部分消息已被收到但还没有被签收(acknowledged),那么当消费者下次连接到相同的队列时,这些消息还会被再次接收。

队列可以长久地保存消息直到消费者收到消息。消费者不需要因为担心消息会丢西而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势

JMS的发布订阅总结

JMS Pub/Sub模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作topic

主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscribe)从主题订阅消息。

非持久订阅

主题使得消息订阅和消息发布者保持互相独立,不需要接触 即可保证消息的传送。

非持久订阅只有当客户处于激活状态,也就是和MQ保持连接状态才能收到发送到某个主题的消息。

如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。

持久订阅

客户端首先向MQ注册一个自己的身份ID识别号,当这个客户端处于离线时,生产者会为这个ID保存所有发送到主题的消息,当客户端再次连接到MQ时会根据消费者的ID得到所有当自己处于离线时发送到主题的消息。

非持久订阅状态下,不能恢复或重新派送一个未签收的消息。持久订阅才能恢复或重新派送一个未签收的消息。

Broker

MQ消息服务器实例被称作Broker,作为server提供消息核心服务。

在linux环境下通过指定不同的配置文件启动多个MQ服务器实例的命令如下

1
./activemq start xbean:file/apache-active-5.15.9/conf/my-activemq.xml

嵌入式Broker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<!-- pom.xml增加依赖-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.5</version>
</dependency>
1
2
3
4
5
6
7
8
9
public class EmbedBroker {
public static void main(String[] args) throws Exception{
//ActiveMQ也支持在vm中通信基于嵌入式的broker
BrokerService brokerService = newBrokerService();
brokerService.setUseJmx(true);
brokerService.addConnector("tcp://localhost:61616");
brokerService.start();
}
}

最后更新: 2021年01月20日 20:39

原始链接: https://midkuro.gitee.io/2020/05/23/activemq-jms/

× 请我吃糖~
打赏二维码