ActiveMQ

消息持久化

消息持久化是一种MQ服务器宕机了,消息不会丢失的机制。为避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制

ActiveMQ消息持久化机制有JDBC,AMQKahaDBLevelDB,无论使用哪种持久化方式,消息的存储逻辑都是一致的。

就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等再试图将消息发送给接收者,成功则将消息从存储中删除,失败则继续尝试发送。

消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需要把消息发送出去。

AMQ Message Store

AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储在一个个文件中,文件的默认大小为32M,当一个存储文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本,现在已经不使用了。

KahaDB消息存储

KahaDBActiveMQ5.4开始到至今的默认持久化插件,可用于任何场景,提高性能和恢复能力。

消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址

KahaDB是一个专门针对消息持久化的解决方案,它对典型的消息使用模型进行了优化。

数据被追加到data logs中,当不在需要log文件中的数据的时候,log会被丢弃。

1
2
3
4
<!--activemq.xml的配置-->
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb" />
</persistenceAdapter>

ActiveMQ安装目录下的data/kahadb文件夹中,只有四类文件和一个lock,跟ActiveMQ的其他几种文件存储引擎相比十分简洁。

activemq-16

  1. db-<Number>.log

    KahaDB存储消息到预定义大小的数据记录文件中,文件命名为db-<Number>.log。当数据文件已满时,一个新的文件会随之创建,Number数值也会随之递增,它随着消息数量的增加,如每32M一个文件,文件名按照数字进行编号,如db-1.logdb-2.logdb-3.log……当不再有引用到数据文件中的任何消息时,文件会被删除或者归档。

  2. db.data

    该文件包含了持久化的BTree索引,索引了消息数据记录中的消息,他是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-<Number>.log里面存储的消息。

  3. db.free

    db.data文件里哪些页面是空闲的,文件具体内容是所有的空闲页的ID,方便后续db.data建立索引时使用,保证索引的连续性,没有碎片。

  4. db.redo

    用来进行消息恢复,如果KahaDB消息存储在强制退出后启动,用于恢复Btree索引。

  5. Lock

    文件锁,标识当前获得KahaDB读取权限的Broker

LevelDB消息存储

这种文件系统时从ActiveMQ5.8之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库储存形式,但是它提供比KahaDB更快的持久性。

但它不能使用自定义B-Tree实现来索引与写日志,而是使用基于LevelDB的索引。

1
2
3
4
<!--默认的配置如下:-->
<persistenceAdapter>
<levelDB directory="activemq-data" />
</persistenceAdapter>

JDBC消息存储

Mysql数据库为例,需要将Mysql数据库的驱动包mysql-connector-java-5.1.38.jar添加到ActiveMQ目录下的lib文件夹中。

然后修改activemq.xml配置文件,按照如下修改:

1
2
3
4
5
6
7
8
<!--原KahaDB的配置-->
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb" />
</persistenceAdapter>
<!--修改成JDBC配置-->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true" />
</persistenceAdapter>

dataSource指定将要引用的持久化数据库的bean名称,createTablesOnStartup参数表示是否在启动的时候创建数据库表,默认值是true,这样每次启动都会去创建数据库表了,一般是第一次启动的时候设置为true,之后改成false

上文指定了一个数据库实例mysql-ds,所以需要创建一个mysql-ds的实例,通过activemq.xml中的<broker>标签外设置bean

1
2
3
4
5
6
7
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> 
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="123456"/>
<property name="poolPreparedStatements" value="true"/>
</bean>

其中的org.apache.commons.dbcp2.BasicDataSourceJDBC驱动包自带的,当然也可以替换成C3P0或者Druid,但是lib文件夹就需要再添加C3p0或者Druid相关的依赖包。

然后创建一个与上方配置相同的数据库(activemq),执行语句CREATE DATABASE activemq

如果配置正常且启动成功,将会在数据库创建三张表ACTIVEMQ_MSGSACTIVEMQ_ACKSACTIVEMQ_LOCK

ACTIVE_MSGS

列名 意义
ID 自增的数据库主键
CONTAINER 消息的Destination
MSGID_PROD 消息发送者的主键
MSG_SEQ 消息发送的顺序,MSGID_PROD+MSG_SEQ可以组成JMS的MessageID
EXPIRATION 消息过期时间,存储的是从1970-01-01到现在的毫秒数
MSG 消息本体的Java序列化对象的二进制数据
PRIORITY 优先级,0-9,数值越大优先级越高

ACTIVEMQ_ACKS

activemq_acks用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存。

列名 意义
CONTAINER 消息的Destination
SUB_DEST 如果是使用Static集群,这个字段会有集群其他系统的信息
CLIENT_ID 每个订阅者都必须有一个唯一的客户端ID用以区分
SUB_NAME 订阅者名称
SELECTOR 选择器,可以只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作
LAST_ACKED_ID 记录消费过的消息的ID

ACTIVEMQ_LOCK

activemq_lock在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,其他的只能作为备份等待Master Broker不可用,才可能称为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker

队列

在点对点类型中:

  • DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中;
  • DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中。
  • 而且点对点类型中消息一旦被Consumer消费就从broker中删除。
1
2
//开启消息持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);

能够看到开启消息持久化后,生产者发送消息到队列中,通过查询activemq_msgs表能够看到数据变化情况。

activemq-17

主题

开启消息持久化,先启动消费者订阅再运行生产者,可以看到activemq_acks的变化情况。

activemq-18

总结

一定要开启消息持久化

  • 如果是队列:

    在没有消费者消费的情况下会将消息保存到activemq_msgs表中,只要有任意一个消费者已经消费过了,消费之后这些消息将会被立刻删除

  • 如果是主题:

    一般是先启动消费者订阅然后再生产的情况下会将消息保存到activemq_acks

  • 数据库Jar包:

    记得需要使用到的相关Jar文件放置到ActiveMQ安装路径下的lib目录。mysql-jdbc驱动包和对应的数据库连接池Jar包

  • createTablesOnStartup属性:

    jdbcPersistenceAdapter标签中设置了这个属性为true时,在第一次启动ActiveMQ时,ActiveMQ服务节点会自动创建所需要的数据表,启动完成后可以更改为false

  • 下划线:

    java.lang.illeglStateException:BeanFactory not initalized or already closed,这是因为操作系统机器名中有“_”符号,请更改机器名并重启后即可解决问题。

JDBC增强版

JDBC Message store With ActiveMQ Journal,简称JDBC增强版,这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库和读库。

ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。

当消费者的消费速度能够及时跟上生产者消息的生产速度时,Journal文件能够大大减少需要写入到DB中的消息。

举个例子:

​ 生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%以上的消息,那么这个时候只需要同步剩余的10%的消息到DB

如果消费者消费的速度很慢,这时候journal文件可以使消息以批量方式写到DB。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<!--原JDBC的配置-->
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true" />
</persistenceAdapter>
<!--修改成JDBC Journal配置-->
<persistenceAdapter>
<journalPersistenceAdapterFactory
journalLogFiles="4"
journalLogFilSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#mysql-ds"
dataDirectory="activemq-data" />
</persistenceAdapter>

总结

持久化消息主要是指MQ所在的服务器宕机后消息不会丢失的机制。

持久化机制演化过程:

从最初的AMQ Message Store方案到ActiveMQ V4版本中推出的High performance journal(高性能事务支持)附件,并且同步推出了关于关系型数据库的存储方案。

ActiveMQ V5.3版本中又推出了对KahaBD的支持(V5.4版本后成为ActiveMQ默认的持久化方案),后来AciveMQ V5.8版本开始支持LevelDB,到现在,v5.9+版本提供了标准的Zookeeper+LevelDB集群化方案。

ActiveMQ的消息持久化机制:

  • AMQ: 基于日志文件
  • KahaDB:基于日志文件,从ActiveMQ 5.4开始默认的持久化插件
  • JDBC:基于第三方数据库
  • LevelDB:基于文件的本地数据库储存,从ActiveMQ 5.8版本之后又推出了LevelDB的持久化引擎性能高于KahaDB
  • Replicated LevelDB Store:从ActiveMQ 5.9提供了基于LevelDBZookeeper的数据复制方式,用于Master-slave方式的首选数据复制方案。

无论使用哪种持久化方式,消息的存储逻辑都是一致的:

就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等,然后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试。消息中心启动以后首先要检查指定的存储位置,如果有未发送的消息,则需要把消息发送出去。

最后更新: 2021年01月26日 15:01

原始链接: https://midkuro.gitee.io/2020/05/23/activemq-persistent/

× 请我吃糖~
打赏二维码