ActiveMQ
消息持久化
消息持久化是一种MQ
服务器宕机了,消息不会丢失的机制。为避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制。
ActiveMQ
消息持久化机制有JDBC
,AMQ
,KahaDB
和LevelDB
,无论使用哪种持久化方式,消息的存储逻辑都是一致的。
就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等再试图将消息发送给接收者,成功则将消息从存储中删除,失败则继续尝试发送。
消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需要把消息发送出去。
AMQ Message Store
AMQ
是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储在一个个文件中,文件的默认大小为32M,当一个存储文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3
之前的版本,现在已经不使用了。
KahaDB消息存储
KahaDB
是从ActiveMQ5.4
开始到至今的默认持久化插件,可用于任何场景,提高性能和恢复能力。
消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。
KahaDB
是一个专门针对消息持久化的解决方案,它对典型的消息使用模型进行了优化。
数据被追加到data logs
中,当不在需要log
文件中的数据的时候,log
会被丢弃。
1 | <!--activemq.xml的配置--> |
在ActiveMQ
安装目录下的data/kahadb
文件夹中,只有四类文件和一个lock
,跟ActiveMQ
的其他几种文件存储引擎相比十分简洁。
db-<Number>.log
:KahaDB
存储消息到预定义大小的数据记录文件中,文件命名为db-<Number>.log
。当数据文件已满时,一个新的文件会随之创建,Number
数值也会随之递增,它随着消息数量的增加,如每32M一个文件,文件名按照数字进行编号,如db-1.log
、db-2.log
、db-3.log
……当不再有引用到数据文件中的任何消息时,文件会被删除或者归档。db.data
:该文件包含了持久化的
BTree
索引,索引了消息数据记录中的消息,他是消息的索引文件,本质上是B-Tree
(B树),使用B-Tree
作为索引指向db-<Number>.log
里面存储的消息。db.free
:当
db.data
文件里哪些页面是空闲的,文件具体内容是所有的空闲页的ID,方便后续db.data
建立索引时使用,保证索引的连续性,没有碎片。db.redo
:用来进行消息恢复,如果
KahaDB
消息存储在强制退出后启动,用于恢复Btree
索引。Lock
:文件锁,标识当前获得
KahaDB
读取权限的Broker
。
LevelDB消息存储
这种文件系统时从ActiveMQ5.8
之后引进的,它和KahaDB
非常相似,也是基于文件的本地数据库储存形式,但是它提供比KahaDB
更快的持久性。
但它不能使用自定义B-Tree
实现来索引与写日志,而是使用基于LevelDB
的索引。
1 | <!--默认的配置如下:--> |
JDBC消息存储
以Mysql
数据库为例,需要将Mysql
数据库的驱动包mysql-connector-java-5.1.38.jar
添加到ActiveMQ
目录下的lib
文件夹中。
然后修改activemq.xml
配置文件,按照如下修改:
1 | <!--原KahaDB的配置--> |
dataSource
指定将要引用的持久化数据库的bean
名称,createTablesOnStartup
参数表示是否在启动的时候创建数据库表,默认值是true
,这样每次启动都会去创建数据库表了,一般是第一次启动的时候设置为true
,之后改成false
。
上文指定了一个数据库实例mysql-ds
,所以需要创建一个mysql-ds
的实例,通过activemq.xml
中的<broker>
标签外设置bean
1 | <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> |
其中的org.apache.commons.dbcp2.BasicDataSource
是JDBC
驱动包自带的,当然也可以替换成C3P0
或者Druid
,但是lib
文件夹就需要再添加C3p0
或者Druid
相关的依赖包。
然后创建一个与上方配置相同的数据库(activemq
),执行语句CREATE DATABASE activemq
。
如果配置正常且启动成功,将会在数据库创建三张表ACTIVEMQ_MSGS
、ACTIVEMQ_ACKS
、ACTIVEMQ_LOCK
。
ACTIVE_MSGS
列名 | 意义 |
---|---|
ID | 自增的数据库主键 |
CONTAINER | 消息的Destination |
MSGID_PROD | 消息发送者的主键 |
MSG_SEQ | 消息发送的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID |
EXPIRATION | 消息过期时间,存储的是从1970-01-01到现在的毫秒数 |
MSG | 消息本体的Java序列化对象的二进制数据 |
PRIORITY | 优先级,0-9,数值越大优先级越高 |
ACTIVEMQ_ACKS
activemq_acks
用于存储订阅关系。如果是持久化Topic
,订阅者和服务器的订阅关系在这个表保存。
列名 | 意义 |
---|---|
CONTAINER | 消息的Destination |
SUB_DEST | 如果是使用Static集群,这个字段会有集群其他系统的信息 |
CLIENT_ID | 每个订阅者都必须有一个唯一的客户端ID用以区分 |
SUB_NAME | 订阅者名称 |
SELECTOR | 选择器,可以只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作 |
LAST_ACKED_ID | 记录消费过的消息的ID |
ACTIVEMQ_LOCK
表activemq_lock
在集群环境中才有用,只有一个Broker
可以获得消息,称为Master Broker
,其他的只能作为备份等待Master Broker
不可用,才可能称为下一个Master Broker
。这个表用于记录哪个Broker
是当前的Master Broker
。
队列
在点对点类型中:
- 当
DeliveryMode
设置为NON_PERSISTENCE
时,消息被保存在内存中; - 当
DeliveryMode
设置为PERSISTENCE
时,消息保存在broker
的相应的文件或者数据库中。 - 而且点对点类型中消息一旦被
Consumer
消费就从broker
中删除。
1 | //开启消息持久化 |
能够看到开启消息持久化后,生产者发送消息到队列中,通过查询activemq_msgs
表能够看到数据变化情况。
主题
开启消息持久化,先启动消费者订阅再运行生产者,可以看到activemq_acks
的变化情况。
总结
一定要开启消息持久化:
如果是队列:
在没有消费者消费的情况下会将消息保存到
activemq_msgs
表中,只要有任意一个消费者已经消费过了,消费之后这些消息将会被立刻删除。如果是主题:
一般是先启动消费者订阅然后再生产的情况下会将消息保存到
activemq_acks
。数据库
Jar
包:记得需要使用到的相关
Jar
文件放置到ActiveMQ
安装路径下的lib
目录。mysql-jdbc
驱动包和对应的数据库连接池Jar包createTablesOnStartup
属性:在
jdbcPersistenceAdapter
标签中设置了这个属性为true
时,在第一次启动ActiveMQ
时,ActiveMQ
服务节点会自动创建所需要的数据表,启动完成后可以更改为false
。下划线:
java.lang.illeglStateException:BeanFactory not initalized or already closed
,这是因为操作系统机器名中有“_”符号,请更改机器名并重启后即可解决问题。
JDBC增强版
JDBC Message store With ActiveMQ Journal
,简称JDBC增强版,这种方式克服了JDBC Store
的不足,JDBC
每次消息过来,都需要去写库和读库。
ActiveMQ Journal
,使用高速缓存写入技术,大大提高了性能。
当消费者的消费速度能够及时跟上生产者消息的生产速度时,Journal
文件能够大大减少需要写入到DB中的消息。
举个例子:
生产者生产了1000条消息,这1000条消息会保存到journal
文件,如果消费者的消费速度很快的情况下,在journal
文件还没有同步到DB之前,消费者已经消费了90%以上的消息,那么这个时候只需要同步剩余的10%的消息到DB。
如果消费者消费的速度很慢,这时候journal
文件可以使消息以批量方式写到DB。
1 | <!--原JDBC的配置--> |
总结
持久化消息主要是指MQ所在的服务器宕机后消息不会丢失的机制。
持久化机制演化过程:
从最初的AMQ Message Store
方案到ActiveMQ V4
版本中推出的High performance journal
(高性能事务支持)附件,并且同步推出了关于关系型数据库的存储方案。
ActiveMQ V5.3
版本中又推出了对KahaBD
的支持(V5.4
版本后成为ActiveMQ
默认的持久化方案),后来AciveMQ V5.8
版本开始支持LevelDB
,到现在,v5.9+
版本提供了标准的Zookeeper
+LevelDB
集群化方案。
ActiveMQ
的消息持久化机制:
AMQ
: 基于日志文件KahaDB
:基于日志文件,从ActiveMQ 5.4
开始默认的持久化插件JDBC
:基于第三方数据库- LevelDB:基于文件的本地数据库储存,从
ActiveMQ 5.8
版本之后又推出了LevelDB
的持久化引擎性能高于KahaDB
Replicated LevelDB Store
:从ActiveMQ 5.9
提供了基于LevelDB
和Zookeeper
的数据复制方式,用于Master-slave
方式的首选数据复制方案。
无论使用哪种持久化方式,消息的存储逻辑都是一致的:
就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试。消息中心启动以后首先要检查指定的存储位置,如果有未发送的消息,则需要把消息发送出去。
最后更新: 2021年01月26日 15:01
原始链接: https://midkuro.gitee.io/2020/05/23/activemq-persistent/