ActiveMQ
消息持久化
消息持久化是一种MQ服务器宕机了,消息不会丢失的机制。为避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制。
ActiveMQ消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。
就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等再试图将消息发送给接收者,成功则将消息从存储中删除,失败则继续尝试发送。
消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需要把消息发送出去。
AMQ Message Store
AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储在一个个文件中,文件的默认大小为32M,当一个存储文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本,现在已经不使用了。
KahaDB消息存储
KahaDB是从ActiveMQ5.4开始到至今的默认持久化插件,可用于任何场景,提高性能和恢复能力。
消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。
KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模型进行了优化。
数据被追加到data logs中,当不在需要log文件中的数据的时候,log会被丢弃。
1 | <!--activemq.xml的配置--> |
在ActiveMQ安装目录下的data/kahadb文件夹中,只有四类文件和一个lock,跟ActiveMQ的其他几种文件存储引擎相比十分简洁。

db-<Number>.log:KahaDB存储消息到预定义大小的数据记录文件中,文件命名为db-<Number>.log。当数据文件已满时,一个新的文件会随之创建,Number数值也会随之递增,它随着消息数量的增加,如每32M一个文件,文件名按照数字进行编号,如db-1.log、db-2.log、db-3.log……当不再有引用到数据文件中的任何消息时,文件会被删除或者归档。db.data:该文件包含了持久化的
BTree索引,索引了消息数据记录中的消息,他是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-<Number>.log里面存储的消息。db.free:当
db.data文件里哪些页面是空闲的,文件具体内容是所有的空闲页的ID,方便后续db.data建立索引时使用,保证索引的连续性,没有碎片。db.redo:用来进行消息恢复,如果
KahaDB消息存储在强制退出后启动,用于恢复Btree索引。Lock:文件锁,标识当前获得
KahaDB读取权限的Broker。
LevelDB消息存储
这种文件系统时从ActiveMQ5.8之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库储存形式,但是它提供比KahaDB更快的持久性。
但它不能使用自定义B-Tree实现来索引与写日志,而是使用基于LevelDB的索引。
1 | <!--默认的配置如下:--> |
JDBC消息存储
以Mysql数据库为例,需要将Mysql数据库的驱动包mysql-connector-java-5.1.38.jar添加到ActiveMQ目录下的lib文件夹中。
然后修改activemq.xml配置文件,按照如下修改:
1 | <!--原KahaDB的配置--> |
dataSource指定将要引用的持久化数据库的bean名称,createTablesOnStartup参数表示是否在启动的时候创建数据库表,默认值是true,这样每次启动都会去创建数据库表了,一般是第一次启动的时候设置为true,之后改成false。
上文指定了一个数据库实例mysql-ds,所以需要创建一个mysql-ds的实例,通过activemq.xml中的<broker>标签外设置bean
1 | <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> |
其中的org.apache.commons.dbcp2.BasicDataSource是JDBC驱动包自带的,当然也可以替换成C3P0或者Druid,但是lib文件夹就需要再添加C3p0或者Druid相关的依赖包。
然后创建一个与上方配置相同的数据库(activemq),执行语句CREATE DATABASE activemq 。
如果配置正常且启动成功,将会在数据库创建三张表ACTIVEMQ_MSGS、ACTIVEMQ_ACKS、ACTIVEMQ_LOCK。
ACTIVE_MSGS
| 列名 | 意义 |
|---|---|
| ID | 自增的数据库主键 |
| CONTAINER | 消息的Destination |
| MSGID_PROD | 消息发送者的主键 |
| MSG_SEQ | 消息发送的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID |
| EXPIRATION | 消息过期时间,存储的是从1970-01-01到现在的毫秒数 |
| MSG | 消息本体的Java序列化对象的二进制数据 |
| PRIORITY | 优先级,0-9,数值越大优先级越高 |
ACTIVEMQ_ACKS
activemq_acks用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存。
| 列名 | 意义 |
|---|---|
| CONTAINER | 消息的Destination |
| SUB_DEST | 如果是使用Static集群,这个字段会有集群其他系统的信息 |
| CLIENT_ID | 每个订阅者都必须有一个唯一的客户端ID用以区分 |
| SUB_NAME | 订阅者名称 |
| SELECTOR | 选择器,可以只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作 |
| LAST_ACKED_ID | 记录消费过的消息的ID |
ACTIVEMQ_LOCK
表activemq_lock在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,其他的只能作为备份等待Master Broker不可用,才可能称为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker。
队列
在点对点类型中:
- 当
DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中; - 当
DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中。 - 而且点对点类型中消息一旦被
Consumer消费就从broker中删除。
1 | //开启消息持久化 |
能够看到开启消息持久化后,生产者发送消息到队列中,通过查询activemq_msgs表能够看到数据变化情况。

主题
开启消息持久化,先启动消费者订阅再运行生产者,可以看到activemq_acks的变化情况。

总结
一定要开启消息持久化:
如果是队列:
在没有消费者消费的情况下会将消息保存到
activemq_msgs表中,只要有任意一个消费者已经消费过了,消费之后这些消息将会被立刻删除。如果是主题:
一般是先启动消费者订阅然后再生产的情况下会将消息保存到
activemq_acks。数据库
Jar包:记得需要使用到的相关
Jar文件放置到ActiveMQ安装路径下的lib目录。mysql-jdbc驱动包和对应的数据库连接池Jar包createTablesOnStartup属性:在
jdbcPersistenceAdapter标签中设置了这个属性为true时,在第一次启动ActiveMQ时,ActiveMQ服务节点会自动创建所需要的数据表,启动完成后可以更改为false。下划线:
java.lang.illeglStateException:BeanFactory not initalized or already closed,这是因为操作系统机器名中有“_”符号,请更改机器名并重启后即可解决问题。
JDBC增强版
JDBC Message store With ActiveMQ Journal,简称JDBC增强版,这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库和读库。
ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。
当消费者的消费速度能够及时跟上生产者消息的生产速度时,Journal文件能够大大减少需要写入到DB中的消息。
举个例子:
生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%以上的消息,那么这个时候只需要同步剩余的10%的消息到DB。
如果消费者消费的速度很慢,这时候journal文件可以使消息以批量方式写到DB。
1 | <!--原JDBC的配置--> |
总结
持久化消息主要是指MQ所在的服务器宕机后消息不会丢失的机制。
持久化机制演化过程:
从最初的AMQ Message Store方案到ActiveMQ V4版本中推出的High performance journal(高性能事务支持)附件,并且同步推出了关于关系型数据库的存储方案。
ActiveMQ V5.3版本中又推出了对KahaBD的支持(V5.4版本后成为ActiveMQ默认的持久化方案),后来AciveMQ V5.8版本开始支持LevelDB,到现在,v5.9+版本提供了标准的Zookeeper+LevelDB集群化方案。
ActiveMQ的消息持久化机制:
AMQ: 基于日志文件KahaDB:基于日志文件,从ActiveMQ 5.4开始默认的持久化插件JDBC:基于第三方数据库- LevelDB:基于文件的本地数据库储存,从
ActiveMQ 5.8版本之后又推出了LevelDB的持久化引擎性能高于KahaDB Replicated LevelDB Store:从ActiveMQ 5.9提供了基于LevelDB和Zookeeper的数据复制方式,用于Master-slave方式的首选数据复制方案。
无论使用哪种持久化方式,消息的存储逻辑都是一致的:
就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试。消息中心启动以后首先要检查指定的存储位置,如果有未发送的消息,则需要把消息发送出去。
最后更新: 2021年01月26日 15:01
原始链接: https://midkuro.gitee.io/2020/05/23/activemq-persistent/