ActiveMQ

 

定义:

  默认提供JMS服务端口(即后台端口):61616。默认外部访问端口:8161。默认用户名和密码:admin,admin

作用:解耦,削峰,异步

安装:安装解压后,普通启动./bin/activemq start    有日志记录启动:./bin/activemq start > /activeMQ/log/run_activemq.log  

pom文件

<dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.9</version>
        </dependency>
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>3.16</version>
        </dependency>
        <!--内嵌ActiveMQ-->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.5</version>
        </dependency>
        <!--spring整合activemq-->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.3.23.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.9</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>4.3.23.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>4.3.23.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>4.3.23.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-orm</artifactId>
            <version>4.3.23.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjrt</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>1.5.4</version>
        </dependency>
        <dependency>
            <groupId>cglib</groupId>
            <artifactId>cglib</artifactId>
            <version>2.1_2</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>

View Code

 

Application文件

ApplicationContext.yml文件
       application.yml
        server:
          port: 8888
        spring:
          activemq:
            broker-url: tcp://120.79.233.40:61616
            user: admin
            password: admin
          jms:
            pub-sub-domain: false     #false = Queue  true = Topic
        #自定义队列名
        myqueue: boot-activemq-queue

View Code

 

队列Queue和主题Topic区别(发送和接受消息的类型都保持一致):

Queue:生产者生成在这个队列,消费者消费这个队列。

Topic:消费者订阅这个主题,生产者生成的消息会被每个订阅了这个主题的消费者消费掉。(无订阅的Topic发送的消息为废消息)

5种消息类型:

TextMessage,MapMessage,BytesMessage,StreamMessage,ObjectMessage

消息可靠性

    持久性:
持久(服务器宕机,消息还在) 生产者.setDeliveryMode(DeliveryMode.PERSISTENT)
非持久(服务器宕机,消息不存在) 生产者.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
事务:
开启了事务需要进行提交commit()
生产者不commit会导致消息不发送。消费者不commit会导致重复消费
签收(ack):
自动签收
手动签收 ---- 消费者需要ack签收不然会重复消费。有事务状态下,签收为自动签收,写手动无效。
允许重复消息

传输协议:

支持的协议:UDP,SSL,HTTPS,VM,TCP,NIO

修改协议(2选1):

  1. 添加新协议NIO,端口和其他协议不同(配置文件activemq.xml配置<transportConnectors>,name=”openwrite”表示默认协议)
    修改/conf/activemq.xml
    #在<transportConnectors>里添加
    <transportConnector name="nio" uri="nio://0.0.0.0:61618?trace=true">
    #重启activemq #生产者和消费者的链接tcp://... 改为为 nio://...
  2. 添加 一个端口支持TCP,NIO等多种协议
    #在<transportConnectors>里添加
    <transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000&amp; wireFormat.maxFrameSize=104857600&amp;org.activemq.transport.nio.SelectorManager.corePoolSize=20&amp; org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50">
    #生产者和消费者的连接可以为tcp或者nio。其他特殊的协议需要有对应的特殊代码

存储持久化JDBC(Mysql):

  1. 将mysql的驱动包放在mq的lib文件夹下
  2. 持久化配置 
    修改前先备份:cp /conf/activemq.xml /conf/activemq.xml.bk 
    修改              :  vim /conf/activemq.xml
    #替换persistenceAdapter的默认kahadb
    <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> #与下面的bean的id对应 #jdbcPersistenceAdapter有个createTablesOnStartip不写默认为true,启动就在mysql创建表。第一次启动设置为true,一般启动完改为false </persistenceAdapter> #将下面这段复制在</broker> 和 </beans> 之间 <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
       #注意修改为自己的 ip,端口,数据库,账号,密码
    <property name="url" value="jdbc:mysql://localhost:3306/activemq?relaxAutoCommit=true"/> <property name="username" value="root"/> <property name="password" value="root"/> <property name="poolPreparedStatements" value="true"/>
    </bean>
  3. 创建你所配置的数据库
  4. 重启activemq后打开activemq管理页面,能打开管理页面则去数据库查看数据库是否有生成三张表。
  5. 执行上面的生产者代码,注意需要开启持久化(否则不进数据库)

     正常效果是,queue生产者生成消息后,数据库(activemq_msgs表)有消息的数据,消费者消费后表中数据被删除。
    topic生产者生产的订阅记录放在activemq_acks,记录订阅的人员。

  6. 不足:JDBC每次消息都要进行读写。通过journal告诉缓存提高性能(原理:先在journal消费。一定时间后把剩下的写入mysql)
    加入journal:()
    注释掉其他的persistenceAdapter
    <persistenceFactory>
    <journalPersistenceAdapterFactory
    journalLogFiles="4"
    journalLogFileSize="32768"
    useJournal="true"
    useQuickJournal="true"
    dataSource="#mysql-ds"
    dataDirectory="activemq-data" />
    </persistenceFactory>

面试题:

     如何保证高可用?

高可用也就是部署多个,挂了有另一个顶替。可采用Zookeeper + LevelDB进行多节节点集群。

     有关异步投递的问题?

ActiveMQ默认用的是异步消息投递。【开启异步投递:tcp://…?jms.useAsyncSend=true,或者ActiveMQConnectionFactory对象.setUseAsYncSend(true)】

没有使用事务发送持久化消息(此时的消息发送方式会变成同步) /  允许失败有少量数据丢失如何保证消息发送成功?

ActiveMQMessageProducer对象有提供一个send()方法,通过重写AsyncCallback()方法,但消息发送成功、发送失败都会调用这个回调方法,此时可以根据需求进行判断消息是否发送成功。(如给消息ID,回调的时候拿到的ID是否和发送的消息一致)。

ActiveMQMessageProducer对象.send(message,new AsyncCallback(){

                  重写成功和失败的方法

})

注意:同步发送等 send不阻塞为发送成功,异步所有消息都是视为成功。未发送的数据遇到宕机会全部丢失

将MessageProducer 改为 ActiveMQMessageProducer。

     延迟投递和定时投递?

配置开启(软件conf文件夹下activemq.xml):<broker …. schedulerSupport=”true”>

代码设置(生产者和消费者):、message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 延迟时间);

message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 重复投递间隔时间);

message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 重复投递次数);

     消息重试机制?

触发:

无commit()  /  调用commit()之前关闭了Client;

调用了rollback();

需要手动签收情况下调用了recover();

默认:重发间隔1秒;重发次数为6次(超过次数会被列入DLQ,也就是死信队列。消费端会给broker发送“poison ack” 表示消息有毒,放入DLQ后不再给消费者消费。DLQ的作用是处理失败消息)。如果要修改默认则实例化RedeliveryPolicy进行设置,也可通过bean方式。

如何不被重复消费?(2种思路)

1.如果是需要插入数据库的。给消息一个唯一ID,当插入时有重复会触发主键冲突来避免脏数据。.

2.将要发送的消息的id和message以K,V键值对存入redis数据库,下次消息消费查询redis是否有重复来避免重复消费。

 

 

 

版权声明:本文为hunter-007原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/hunter-007/p/11131465.html