ActiveMQ

消息队列的产品种类

activemq-1

常见的四大消息队列,其中RabbitMQ是使用erlang语言编写的,其他三个是Java语言编写的,kafka在大数据场景下比较常用,ActiveMQApache公司研发的消息队列,而RocketMQ是阿里基于ActiveMQkafka的研发的。

activemq-2

消息队列种类繁多,但是从技术的维度来讲,每个消息队列应该都具备以上各种机制,本篇文章主要针对ActiveMQ进行讲解,举一反三。

为什么要引入MQ

在没引入MQ之前,举个生活场景的例子,学生排队向老师请教问题,每个学生需要耗费5分钟的时间,这样导致学生将会被占用很长的时间,如下图:

activemq-3

在引入MQ后,可以将学生的问题以某种约定约束的格式进行收集,收集到指定的问题库中,当老师空闲出时间时将会去回答学生的问题,如下图:

activemq-4

在这种场景下,每个学生都只需要提交问题到问题库,然后可以做自己想做的事情,而不需要在原地等待老师解决完问题再离开,实现了微服务的异步通信。

微服务架构后,链式调用是我们写程序的一般流程,为了完成一个整体功能会将其拆分成多个函数(子模块),比如模块A调用模块B,模块B调用模块C,模块C调用模块D。

但是在大型分布式应用中,一个功能别后要调用许多接口,从单机架构过渡到分布式微服务架构的时候,会产生哪些问题?

  • 系统之间接口耦合比较严重
  • 面对大流量并发时,容易被冲垮
  • 等待同步存在性能问题

根据上述的问题,在设计系统时可以明确要达到的目标:

  • 要做到系统解耦,当新模块接进来时,可以做到代码改动最小:能够解耦
  • 设置流量缓冲池,可以让后端系统按照自身吞吐能力进行消费,不被冲垮:能够削峰
  • 强弱依赖梳理能将非关键调用链路的操作异步化并提升整体系统的吞吐能力:能够异步

MQ的作用定义

面向消息的中间件(message-orented middleware)MOM能够很好的解决以上的问题。

是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

通过提供消息传递消息排队模型在分布式环境下提供应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步等功能。

大致的过程是这样的:

发送者把消息发送给消息服务器,消息服务器将消息存放在若干队列/主题中,在合适的时候,消息服务器会将消息转发给接受者。

在这个过程中,发布和接受是异步的,也就是发送无需等待,而且发送者和接受者的生命周期也没有必然关系;

尤其在发布pub/订阅sub模式下,也可以完成一对多的通信,即让一个消息有多个接受者。

activemq-5

ActiveMQ

基础

ActiveMQ官网 ActiveMQ下载 通过解压下载包,进入bin目录执行activemq即可启动

  • 默认进程端口是61616
  • WEB网页地址:http://IP:8161
  • 默认用户名/密码:admin/admin

备注:ActiveMQ采用61616端口提供JMS服务,采用8161端口提供管理控制台服务

MQ标准API讲解

1
2
3
4
5
6
7
8
9
10
11
<!-- activeMQ所需要的jar包配置-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>

activemq-6

通过连接工厂ConnectionFactory创建连接,获取Session会话,Session会话按照约定格式创建消息Message、生产者Producer、消费者Consumer, 通过将消息发送给目的地Destination(队列Queue/主题Topic),生产者消费者通过目的地 生产 / 消费 消息。

在点对点的消息传递域中,目的地被称为队列(Queue)

在发布订阅消息作用域中,目的地被称为主题(Topic)

创建连接工厂

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//1.ActiveMQConnectionFactory的构造器
public ActiveMQConnectionFactory() {
this(DEFAULT_BROKER_URL);
}

public ActiveMQConnectionFactory(String brokerURL) {
this(createURI(brokerURL));
}

public ActiveMQConnectionFactory(URI brokerURL) {
setBrokerURL(brokerURL.toString());
}

public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) {
setUserName(userName);
setPassword(password);
setBrokerURL(brokerURL.toString());
}

public ActiveMQConnectionFactory(String userName, String password, String brokerURL) {
setUserName(userName);
setPassword(password);
setBrokerURL(brokerURL);
}

可以看到,创建连接工厂的构造器支持传参ActiveMQ的URL,当不传参用户名和密码时,将采用默认admin/admin,其中当使用无参的构造器时,会默认使用DEFAULT_BROKER_URL常量中的URL当做链接串。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL;
public static final String DEFAULT_BROKER_BIND_URL;

static{
//可以看到,默认的URL是以TCP协议开头
final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT;
//用户配置的MQ服务器地址
String bindURL = null;
/*
省略部分代码
*/
//当不存在用户配置的地址时,使用默认地址defaultURL
bindURL = (bindURL == null || bindURL.isEmpty()) ? defaultURL : bindURL;
DEFAULT_BROKER_BIND_URL = bindURL;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static final String DEFAULT_BROKER_HOST;
private static final int DEFAULT_BROKER_PORT;
static{
String host = null;
String port = null;
/*
省略部分代码
*/
//当找不到用户配置的IP、端口时,使用默认的localhost和61616
host = (host == null || host.isEmpty()) ? "localhost" : host;
port = (port == null || port.isEmpty()) ? "61616" : port;
DEFAULT_BROKER_HOST = host;
DEFAULT_BROKER_PORT = Integer.parseInt(port);
}

生产者编码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class JmsProducer {
//ActiveMQ服务器的链接地址
private static final String ACTIVE_URL = "tcp://192.168.1.132:61616";
//队列名称
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1.创建连接工厂,按照给定的URL地址,采用默认的用户名和密码
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVE_URL);
//2.通过连接工厂,获得连接Connection并访问
Connection connection = factory.createConnection();
connection.start();
//3.创建会话session
//两个参数,第一个叫事务,第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4.创建目的地(具体是队列还是主题)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消息的生产者
MessageProducer producer = session.createProducer(queue);
//6.使用MessageProducer生产3条消息发送到MQ队列里面
for (int i = 1; i <= 3; i++) {
//7.创建消息
TextMessage message = session.createTextMessage("message---" + i);
//8.通过MessageProducer发送给MQ
producer.send(message);
}
//9.关闭资源
producer.close();
session.close();
connection.close();
System.out.println("生产者发送消息队列queue01完毕");
}
}

通过访问http://192.168.1.132:8161,登录后能够看到产生了3条消息

activemq-7

消费者编码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class JmsConsumer {
private static final String ACTIVE_URL = "tcp://192.168.1.132:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVE_URL);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
//创建消费者
MessageConsumer consumer = session.createConsumer(queue);
//循环监听
while (true) {
//发送的什么消息类型,就需要使用什么消息类型接收
//接收消息等待四秒,当四秒后依旧没消息返回则不等待
TextMessage message = (TextMessage) consumer.receive(4000L);
if (message != null) {
System.out.println(message.getText());
} else {
break;
}
}
consumer.close();
session.close();
connection.close();
System.out.println("消费者消费消息队列query01完毕");
}
}

activemq-8

当执行消费者之后,被消费的消息Messages Dequeued为3。消息被消费,然后等待四秒后消费者退出循环,所以待消费者数量Number Of Consumers为0,没有消费者继续等待消费消息。

1
2
3
4
5
//消费者接收消息的方式
//1.阻塞式接收消息,当无消息则一直等待直到有消息返回
Message receive() throws JMSException;
//2.设置等待时长,当超过该时长则不等待,直接返回
Message receive(long timeout) throws JMSException;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//通过使用监听的方式消费消息,用于替换上述while循环
//异步非阻塞方式(监听器onMessage())
//订阅者或接受者通过MessageConsumer的setMessage(MessageListener)注册一个消息监听器
//当消息到达之后,系统自动调用监听器的onMessage(Message message)方法
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message != null && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});

//或者使用lambda表达式
consumer.setMessageListener((message) -> {
if (message != null && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});

JMS开发步骤

  1. 创建一个ConnectionFactory工厂
  2. 通过ConnectionFactory来创建JMS Connection
  3. 启动JMS Connection
  4. 通过Connection创建JMS Session
  5. 创建JMS Destination
  6. 创建JMS Producer或者创建JMS Message并设置Destination
  7. 创建JMS Consumer或者注册JMS Message Listener
  8. 发送或者接受JMS message(s)
  9. 关闭所有JMS资源(Connection、Session、Producer、Consumer

队列

在点对点的消息传递域中,目的地被称为队列(queue

每个消息只能有一个消费者,类似1对1的关系。好比个人快递自己领取自己的。

消息的生产者和消费者之间没有时间上的相关性、无论消费者在生产者发送消息的时候是否处于运行状态,消费者都可以提取消息。好比我们的发送短信,发送者发送后不见得接受者会即收即看。

当有多个消费者时,将采取类似负载均衡的策略,将消息均分到各个消费者中

消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息

activemq-9

主题

在发布订阅消息传递域中,目的地被称为主题(topic

生产者将消息发布到topic中,每个消息可以有多个消费者,属于 1:N的关系

生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息

生产者生产时,topic不保存消息,它是无状态的不落地,假设无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者

JMS规范允许客户创建持久订阅,在这一程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息,好比如微信公众号订阅

activemq-10

1
2
3
//与队列不同的编码在于创建目的地
//Queue queue = session.createQueue(QUEUE_NAME);
Topic topic = session.createTopic(TOPIC_NAME);

最后更新: 2021年01月20日 22:05

原始链接: https://midkuro.gitee.io/2020/05/23/activemq-mq/

× 请我吃糖~
打赏二维码