ActiveMQ
消息队列的产品种类
常见的四大消息队列,其中RabbitMQ
是使用erlang
语言编写的,其他三个是Java
语言编写的,kafka
在大数据场景下比较常用,ActiveMQ
是Apache
公司研发的消息队列,而RocketMQ
是阿里基于ActiveMQ
和kafka
的研发的。
消息队列种类繁多,但是从技术的维度来讲,每个消息队列应该都具备以上各种机制,本篇文章主要针对ActiveMQ
进行讲解,举一反三。
为什么要引入MQ
在没引入MQ之前,举个生活场景的例子,学生排队向老师请教问题,每个学生需要耗费5分钟的时间,这样导致学生将会被占用很长的时间,如下图:
在引入MQ后,可以将学生的问题以某种约定约束的格式进行收集,收集到指定的问题库中,当老师空闲出时间时将会去回答学生的问题,如下图:
在这种场景下,每个学生都只需要提交问题到问题库,然后可以做自己想做的事情,而不需要在原地等待老师解决完问题再离开,实现了微服务的异步通信。
微服务架构后,链式调用是我们写程序的一般流程,为了完成一个整体功能会将其拆分成多个函数(子模块),比如模块A调用模块B,模块B调用模块C,模块C调用模块D。
但是在大型分布式应用中,一个功能别后要调用许多接口,从单机架构过渡到分布式微服务架构的时候,会产生哪些问题?
- 系统之间接口耦合比较严重
- 面对大流量并发时,容易被冲垮
- 等待同步存在性能问题
根据上述的问题,在设计系统时可以明确要达到的目标:
- 要做到系统解耦,当新模块接进来时,可以做到代码改动最小:能够解耦
- 设置流量缓冲池,可以让后端系统按照自身吞吐能力进行消费,不被冲垮:能够削峰
- 强弱依赖梳理能将非关键调用链路的操作异步化并提升整体系统的吞吐能力:能够异步
MQ的作用定义
面向消息的中间件(message-orented middleware
)MOM能够很好的解决以上的问题。
是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
通过提供消息传递和消息排队模型在分布式环境下提供应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步等功能。
大致的过程是这样的:
发送者把消息发送给消息服务器,消息服务器将消息存放在若干队列/主题中,在合适的时候,消息服务器会将消息转发给接受者。
在这个过程中,发布和接受是异步的,也就是发送无需等待,而且发送者和接受者的生命周期也没有必然关系;
尤其在发布pub
/订阅sub
模式下,也可以完成一对多的通信,即让一个消息有多个接受者。
ActiveMQ
基础
ActiveMQ官网 ActiveMQ下载 通过解压下载包,进入bin
目录执行activemq
即可启动
- 默认进程端口是61616;
- WEB网页地址:
http://IP:8161
- 默认用户名/密码:
admin/admin
备注:ActiveMQ
采用61616端口提供JMS服务,采用8161端口提供管理控制台服务
MQ标准API讲解
1 2 3 4 5 6 7 8 9 10 11
| <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.9</version> </dependency> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>3.16</version> </dependency>
|
通过连接工厂ConnectionFactory
创建连接,获取Session
会话,Session
会话按照约定格式创建消息Message
、生产者Producer
、消费者Consumer
, 通过将消息发送给目的地Destination
(队列Queue
/主题Topic
),生产者消费者通过目的地 生产 / 消费 消息。
在点对点的消息传递域中,目的地被称为队列(Queue
)
在发布订阅消息作用域中,目的地被称为主题(Topic
)
创建连接工厂
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public ActiveMQConnectionFactory() { this(DEFAULT_BROKER_URL); }
public ActiveMQConnectionFactory(String brokerURL) { this(createURI(brokerURL)); }
public ActiveMQConnectionFactory(URI brokerURL) { setBrokerURL(brokerURL.toString()); }
public ActiveMQConnectionFactory(String userName, String password, URI brokerURL) { setUserName(userName); setPassword(password); setBrokerURL(brokerURL.toString()); }
public ActiveMQConnectionFactory(String userName, String password, String brokerURL) { setUserName(userName); setPassword(password); setBrokerURL(brokerURL); }
|
可以看到,创建连接工厂的构造器支持传参ActiveMQ的URL,当不传参用户名和密码时,将采用默认admin/admin
,其中当使用无参的构造器时,会默认使用DEFAULT_BROKER_URL
常量中的URL当做链接串。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL; public static final String DEFAULT_BROKER_BIND_URL;
static{ final String defaultURL = "tcp://" + DEFAULT_BROKER_HOST + ":" + DEFAULT_BROKER_PORT; String bindURL = null;
bindURL = (bindURL == null || bindURL.isEmpty()) ? defaultURL : bindURL; DEFAULT_BROKER_BIND_URL = bindURL; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| private static final String DEFAULT_BROKER_HOST; private static final int DEFAULT_BROKER_PORT; static{ String host = null; String port = null;
host = (host == null || host.isEmpty()) ? "localhost" : host; port = (port == null || port.isEmpty()) ? "61616" : port; DEFAULT_BROKER_HOST = host; DEFAULT_BROKER_PORT = Integer.parseInt(port); }
|
生产者编码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class JmsProducer { private static final String ACTIVE_URL = "tcp://192.168.1.132:61616"; private static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVE_URL); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(QUEUE_NAME); MessageProducer producer = session.createProducer(queue); for (int i = 1; i <= 3; i++) { TextMessage message = session.createTextMessage("message---" + i); producer.send(message); } producer.close(); session.close(); connection.close(); System.out.println("生产者发送消息队列queue01完毕"); } }
|
通过访问http://192.168.1.132:8161
,登录后能够看到产生了3条消息
消费者编码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class JmsConsumer { private static final String ACTIVE_URL = "tcp://192.168.1.132:61616"; private static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVE_URL); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue(QUEUE_NAME); MessageConsumer consumer = session.createConsumer(queue); while (true) { TextMessage message = (TextMessage) consumer.receive(4000L); if (message != null) { System.out.println(message.getText()); } else { break; } } consumer.close(); session.close(); connection.close(); System.out.println("消费者消费消息队列query01完毕"); } }
|
当执行消费者之后,被消费的消息Messages Dequeued
为3。消息被消费,然后等待四秒后消费者退出循环,所以待消费者数量Number Of Consumers
为0,没有消费者继续等待消费消息。
1 2 3 4 5
|
Message receive() throws JMSException;
Message receive(long timeout) throws JMSException;
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if (message != null && message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("消费消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } });
consumer.setMessageListener((message) -> { if (message != null && message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { System.out.println("消费消息:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } });
|
JMS开发步骤
- 创建一个
ConnectionFactory
工厂
- 通过
ConnectionFactory
来创建JMS Connection
- 启动
JMS Connection
- 通过
Connection
创建JMS Session
- 创建
JMS Destination
- 创建
JMS Producer
或者创建JMS Message
并设置Destination
- 创建
JMS Consumer
或者注册JMS Message Listener
- 发送或者接受
JMS message(s)
- 关闭所有
JMS
资源(Connection、Session、Producer、Consumer
)
队列
在点对点的消息传递域中,目的地被称为队列(queue
)
每个消息只能有一个消费者,类似1对1的关系。好比个人快递自己领取自己的。
消息的生产者和消费者之间没有时间上的相关性、无论消费者在生产者发送消息的时候是否处于运行状态,消费者都可以提取消息。好比我们的发送短信,发送者发送后不见得接受者会即收即看。
当有多个消费者时,将采取类似负载均衡的策略,将消息均分到各个消费者中。
消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息。
主题
在发布订阅消息传递域中,目的地被称为主题(topic
)
生产者将消息发布到topic
中,每个消息可以有多个消费者,属于 1:N的关系
生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息
生产者生产时,topic
不保存消息,它是无状态的不落地,假设无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者。
JMS
规范允许客户创建持久订阅,在这一程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息,好比如微信公众号订阅。
1 2 3
|
Topic topic = session.createTopic(TOPIC_NAME);
|