ActiveMQ - hunters007
定义:
默认提供JMS服务端口(即后台端口):61616。默认外部访问端口:8161。默认用户名和密码:admin,admin
作用:解耦,削峰,异步
安装:安装解压后,普通启动./bin/activemq start 有日志记录启动:./bin/activemq start > /activeMQ/log/run_activemq.log
pom文件
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.9</version> </dependency> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>3.16</version> </dependency> <!--内嵌ActiveMQ--> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.5</version> </dependency> <!--spring整合activemq--> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.23.RELEASE</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.9</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>4.3.23.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>4.3.23.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> <version>4.3.23.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-orm</artifactId> <version>4.3.23.RELEASE</version> </dependency> <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjrt</artifactId> <version>1.6.1</version> </dependency> <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> <version>1.5.4</version> </dependency> <dependency> <groupId>cglib</groupId> <artifactId>cglib</artifactId> <version>2.1_2</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.25</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.3</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.18</version> <scope>provided</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies>
View Code
Application文件
ApplicationContext.yml文件
application.yml
server:
port: 8888
spring:
activemq:
broker-url: tcp://120.79.233.40:61616
user: admin
password: admin
jms:
pub-sub-domain: false #false = Queue true = Topic
#自定义队列名
myqueue: boot-activemq-queue
View Code
队列Queue和主题Topic区别(发送和接受消息的类型都保持一致):
Queue:生产者生成在这个队列,消费者消费这个队列。
Topic:消费者订阅这个主题,生产者生成的消息会被每个订阅了这个主题的消费者消费掉。(无订阅的Topic发送的消息为废消息)
5种消息类型:
TextMessage,MapMessage,BytesMessage,StreamMessage,ObjectMessage
消息可靠性
持久性:
持久(服务器宕机,消息还在) 生产者.setDeliveryMode(DeliveryMode.PERSISTENT)
非持久(服务器宕机,消息不存在) 生产者.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
事务:
开启了事务需要进行提交commit()
生产者不commit会导致消息不发送。消费者不commit会导致重复消费
签收(ack):
自动签收
手动签收 ---- 消费者需要ack签收不然会重复消费。有事务状态下,签收为自动签收,写手动无效。
允许重复消息
传输协议:
支持的协议:UDP,SSL,HTTPS,VM,TCP,NIO
修改协议(2选1):
- 添加新协议NIO,端口和其他协议不同(配置文件activemq.xml配置<transportConnectors>,name=”openwrite”表示默认协议)
修改/conf/activemq.xml#在<transportConnectors>里添加
<transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true">
#重启activemq #生产者和消费者的链接tcp://... 改为为 nio://... - 添加 一个端口支持TCP,NIO等多种协议
#在<transportConnectors>里添加
<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000& wireFormat.maxFrameSize=104857600&org.activemq.transport.nio.SelectorManager.corePoolSize=20& org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50">
#生产者和消费者的连接可以为tcp或者nio。其他特殊的协议需要有对应的特殊代码
存储持久化JDBC(Mysql):
- 将mysql的驱动包放在mq的lib文件夹下
- 持久化配置
修改前先备份:cp /conf/activemq.xml /conf/activemq.xml.bk
修改 : vim /conf/activemq.xml#替换persistenceAdapter的默认kahadb
<persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> #与下面的bean的id对应 #jdbcPersistenceAdapter有个createTablesOnStartip不写默认为true,启动就在mysql创建表。第一次启动设置为true,一般启动完改为false </persistenceAdapter> #将下面这段复制在</broker> 和 </beans> 之间 <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
#注意修改为自己的 ip,端口,数据库,账号,密码 <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="root"/> <property name="poolPreparedStatements" value="true"/>
</bean> - 创建你所配置的数据库
- 重启activemq后打开activemq管理页面,能打开管理页面则去数据库查看数据库是否有生成三张表。
- 执行上面的生产者代码,注意需要开启持久化(否则不进数据库)
正常效果是,queue生产者生成消息后,数据库(activemq_msgs表)有消息的数据,消费者消费后表中数据被删除。
topic生产者生产的订阅记录放在activemq_acks,记录订阅的人员。 -
不足:JDBC每次消息都要进行读写。通过journal告诉缓存提高性能(原理:先在journal消费。一定时间后把剩下的写入mysql)
加入journal:()
注释掉其他的persistenceAdapter
<persistenceFactory>
<journalPersistenceAdapterFactory
journalLogFiles="4"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#mysql-ds"
dataDirectory="activemq-data" />
</persistenceFactory>
面试题:
如何保证高可用?
高可用也就是部署多个,挂了有另一个顶替。可采用Zookeeper + LevelDB进行多节节点集群。
有关异步投递的问题?
ActiveMQ默认用的是异步消息投递。【开启异步投递:tcp://…?jms.useAsyncSend=true,或者ActiveMQConnectionFactory对象.setUseAsYncSend(true)】
没有使用事务发送持久化消息(此时的消息发送方式会变成同步) / 允许失败有少量数据丢失如何保证消息发送成功?
ActiveMQMessageProducer对象有提供一个send()方法,通过重写AsyncCallback()方法,但消息发送成功、发送失败都会调用这个回调方法,此时可以根据需求进行判断消息是否发送成功。(如给消息ID,回调的时候拿到的ID是否和发送的消息一致)。
ActiveMQMessageProducer对象.send(message,new AsyncCallback(){
重写成功和失败的方法
})
注意:同步发送等 send不阻塞为发送成功,异步所有消息都是视为成功。未发送的数据遇到宕机会全部丢失
将MessageProducer 改为 ActiveMQMessageProducer。
延迟投递和定时投递?
配置开启(软件conf文件夹下activemq.xml):<broker …. schedulerSupport=”true”>
代码设置(生产者和消费者):、message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 延迟时间);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 重复投递间隔时间);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 重复投递次数);
消息重试机制?
触发:
无commit() / 调用commit()之前关闭了Client;
调用了rollback();
需要手动签收情况下调用了recover();
默认:重发间隔1秒;重发次数为6次(超过次数会被列入DLQ,也就是死信队列。消费端会给broker发送“poison ack” 表示消息有毒,放入DLQ后不再给消费者消费。DLQ的作用是处理失败消息)。如果要修改默认则实例化RedeliveryPolicy进行设置,也可通过bean方式。
如何不被重复消费?(2种思路)
1.如果是需要插入数据库的。给消息一个唯一ID,当插入时有重复会触发主键冲突来避免脏数据。.
2.将要发送的消息的id和message以K,V键值对存入redis数据库,下次消息消费查询redis是否有重复来避免重复消费。