小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

ActiveMQ學(xué)習(xí)系列(四)----消息持久化到mysql

 WindySky 2018-05-03

前記:目前學(xué)習(xí)還比較雜亂,還未找到系統(tǒng)化地學(xué)習(xí)ActiveMq的方法。在網(wǎng)上看到消息持久化的demo,了解了一下,在此記錄。

 

一、目前ActiveMq支持的持久化方法

url:http://activemq./persistence.html

1、Replicated LevelDB Store 

2、LevelDB Store

3、KahaDB

4、JDBC 配合其自帶的 high performance journal;根據(jù)官方說(shuō)法,它內(nèi)置的高性能journal的工作類似于在緩存層工作,消息會(huì)優(yōu)先寫入到j(luò)ournal,后臺(tái)的定時(shí)任務(wù)會(huì)每隔一段時(shí)間間隔去

查看需要寫入到j(luò)dbc的消息。

二、配置activemq.xml

1、修改persistenceAdapter

        <persistenceAdapter>
            <!-- <kahaDB directory="${activemq.data}/kahadb"/>-->
            <jdbcPersistenceAdapter dataSource="#my-ds"/>
        </persistenceAdapter>

上面我們注釋了默認(rèn)的kahadb,添加了jdbc數(shù)據(jù)源。

2、增加數(shù)據(jù)源

復(fù)制代碼
<bean id="my-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
                <property name="driverClassName" value="com.mysql.jdbc.Driver" />
                <property name="url" value="jdbc:mysql://192.168.2.140:3306/activemq?characterEncoding=utf-8" />
                <property name="username" value="root" />
                <property name="password" value="123456" />
                <property name="initialSize" value="5" />
                <property name="maxTotal" value="100" />
                <property name="maxIdle" value="30" />
                <property name="maxWaitMillis" value="10000" />
                <property name="minIdle" value="1" />
</bean>
復(fù)制代碼

這邊有幾個(gè)注意點(diǎn):

2.1:上面配置的數(shù)據(jù)源類對(duì)應(yīng)于dbcp2,如果1.x系列的jar包會(huì)報(bào)錯(cuò),classNotFound異常。

2.2:在主安裝目錄的lib目錄下,將mysql的jar包、dbcp的jar包放到該路徑下

/usr/local/apache-activemq-5.14.4/lib

2.3:建議用bin/activemq console方式啟動(dòng),可以及時(shí)查看錯(cuò)誤信息。我在啟動(dòng)時(shí),報(bào)以下錯(cuò)誤:

復(fù)制代碼
 WARN | Failure Details: Table 'xckk_star_act.ACTIVEMQ_ACKS' doesn't exist
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'xckk_star_act.ACTIVEMQ_ACKS' doesn't exist
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)[:1.8.0_121]
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)[:1.8.0_121]
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)[:1.8.0_121]
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)[:1.8.0_121]
        at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.Util.getInstance(Util.java:387)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:939)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3878)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3814)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2478)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2625)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2551)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1861)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2073)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2009)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.PreparedStatement.executeLargeUpdate(PreparedStatement.java:5094)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:1994)[mysql-connector-java-5.1.38.jar:5.1.38]
        at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:98)[commons-dbcp2-2.1.1.jar:2.1.1]
        at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:98)[commons-dbcp2-2.1.1.jar:2.1.1]
        at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doDeleteOldMessages(DefaultJDBCAdapter.java:832)[activemq-jdbc-store-5.14.4.jar:5.14.4]
        at org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.cleanup(JDBCPersistenceAdapter.java:349)[activemq-jdbc-store-5.14.4.jar:5.14.4]
        at org.apache.activemq.store.jdbc.JDBCPersistenceAdapter$3.run(JDBCPersistenceAdapter.java:327)[activemq-jdbc-store-5.14.4.jar:5.14.4]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)[:1.8.0_121]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)[:1.8.0_121]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)[:1.8.0_121]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)[:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)[:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)[:1.8.0_121]
        at java.lang.Thread.run(Thread.java:745)[:1.8.0_121]
復(fù)制代碼

在網(wǎng)上查詢結(jié)果,原來(lái)是數(shù)據(jù)庫(kù)需要字符集設(shè)置為latin1.

 

于是,新建了一個(gè)數(shù)據(jù)庫(kù),字符集設(shè)為L(zhǎng)atin1.然后啟動(dòng)ActiveMq,查看數(shù)據(jù)庫(kù),發(fā)現(xiàn)多了三張表:

在這邊將結(jié)構(gòu)導(dǎo)出來(lái),供手動(dòng)建表:

復(fù)制代碼
/*
SQLyog v10.2 
MySQL - 5.1.71 : Database - activemq
*********************************************************************
*/


/*!40101 SET NAMES utf8 */;

/*!40101 SET SQL_MODE=''*/;

/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`activemq` /*!40100 DEFAULT CHARACTER SET latin1 COLLATE latin1_bin */;

USE `activemq`;

/*Table structure for table `ACTIVEMQ_ACKS` */

DROP TABLE IF EXISTS `ACTIVEMQ_ACKS`;

CREATE TABLE `ACTIVEMQ_ACKS` (
  `CONTAINER` varchar(250) COLLATE latin1_bin NOT NULL,
  `SUB_DEST` varchar(250) COLLATE latin1_bin DEFAULT NULL,
  `CLIENT_ID` varchar(250) COLLATE latin1_bin NOT NULL,
  `SUB_NAME` varchar(250) COLLATE latin1_bin NOT NULL,
  `SELECTOR` varchar(250) COLLATE latin1_bin DEFAULT NULL,
  `LAST_ACKED_ID` bigint(20) DEFAULT NULL,
  `PRIORITY` bigint(20) NOT NULL DEFAULT '5',
  `XID` varchar(250) COLLATE latin1_bin DEFAULT NULL,
  PRIMARY KEY (`CONTAINER`,`CLIENT_ID`,`SUB_NAME`,`PRIORITY`),
  KEY `ACTIVEMQ_ACKS_XIDX` (`XID`)
) ENGINE=MyISAM DEFAULT CHARSET=latin1 COLLATE=latin1_bin;

/*Table structure for table `ACTIVEMQ_LOCK` */

DROP TABLE IF EXISTS `ACTIVEMQ_LOCK`;

CREATE TABLE `ACTIVEMQ_LOCK` (
  `ID` bigint(20) NOT NULL,
  `TIME` bigint(20) DEFAULT NULL,
  `BROKER_NAME` varchar(250) COLLATE latin1_bin DEFAULT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=MyISAM DEFAULT CHARSET=latin1 COLLATE=latin1_bin;

/*Table structure for table `ACTIVEMQ_MSGS` */

DROP TABLE IF EXISTS `ACTIVEMQ_MSGS`;

CREATE TABLE `ACTIVEMQ_MSGS` (
  `ID` bigint(20) NOT NULL,
  `CONTAINER` varchar(250) COLLATE latin1_bin DEFAULT NULL,
  `MSGID_PROD` varchar(250) COLLATE latin1_bin DEFAULT NULL,
  `MSGID_SEQ` bigint(20) DEFAULT NULL,
  `EXPIRATION` bigint(20) DEFAULT NULL,
  `MSG` longblob,
  `PRIORITY` bigint(20) DEFAULT NULL,
  `XID` varchar(250) COLLATE latin1_bin DEFAULT NULL,
  PRIMARY KEY (`ID`),
  KEY `ACTIVEMQ_MSGS_MIDX` (`MSGID_PROD`,`MSGID_SEQ`),
  KEY `ACTIVEMQ_MSGS_CIDX` (`CONTAINER`),
  KEY `ACTIVEMQ_MSGS_EIDX` (`EXPIRATION`),
  KEY `ACTIVEMQ_MSGS_PIDX` (`PRIORITY`),
  KEY `ACTIVEMQ_MSGS_XIDX` (`XID`)
) ENGINE=MyISAM DEFAULT CHARSET=latin1 COLLATE=latin1_bin;

/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
復(fù)制代碼

 

三、測(cè)試消息持久化

復(fù)制代碼
package com.ckl.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class HelloActiveMQ {
    public static void main(String[] args) throws Exception {
        HelloWorldProducer producer = new HelloWorldProducer();
        HelloWorldConsumer consumer = new HelloWorldConsumer();

        Thread threadProducer = new Thread(producer);
        threadProducer.start();
      //注釋掉消費(fèi)者,不然的話,馬上消費(fèi)者馬上把消息消費(fèi)了,來(lái)不及持久化
//        Thread threadConsumer = new Thread(consumer);
//        threadConsumer.start();
    }

    public static class HelloWorldProducer implements Runnable {
        public void run() {
            try {
                // Create a ConnectionFactory
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.2.140:61616");

                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();

                // Create a Session
                Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("DemoQueue");

                // Create a MessageProducer from the Session to the Topic or Queue
                MessageProducer producer = session.createProducer(destination);
//                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);//消息需要持久化

                // Create a messages
                String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
                TextMessage message = session.createTextMessage(text);

                // Tell the producer to send the message
                System.out.println("Sent message: " + message.hashCode() + " : " + Thread.currentThread().getName());
                producer.send(message);

                // Clean up
                session.close();
                connection.close();
            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }
    }
    public static class HelloWorldConsumer implements Runnable, ExceptionListener {
        public void run() {
            try {
                // Create a ConnectionFactory
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                        "tcp://192.168.2.140:61616");
                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();
                connection.setExceptionListener(this);
                // Create a Session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("DemoQueue");
                // Create a MessageConsumer from the Session to the Topic or
                // Queue
                MessageConsumer consumer = session.createConsumer(destination);
                // Wait for a message
                Message message = consumer.receive(1000);
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    System.out.println("Received: " + text);
                } else {
                    System.out.println("Received: " + message);
                }
                consumer.close();
                session.close();
                connection.close();
            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }

        public synchronized void onException(JMSException ex) {
            System.out.println("JMS Exception occured.  Shutting down client.");
        }
    }
}
復(fù)制代碼

運(yùn)行后,效果如下:

 四、單獨(dú)啟動(dòng)消費(fèi)者,查看數(shù)據(jù)庫(kù)

注釋掉上面的生產(chǎn)者,單獨(dú)啟動(dòng)消費(fèi)者后,發(fā)現(xiàn)數(shù)據(jù)庫(kù)中的消息已被消費(fèi)。

 五、備注

復(fù)制代碼
Although the JDBC Store does not offer the best performance, it makes fairly simple to create a simple Master-Slave robust broker setup. 
When a group of Active MQ brokers is configured to use a shared database, they’ll all try to connect and grab a lock in the lock table,
but only one will succeed and become the master. The remaining brokers will be slaves, and will be in a wait state, not accepting client
connections until the master fails.
The lock table, is ACTIVEMQ_LOCK , and it is used to ensure that only one Active MQ broker instance can access the database at one time.
If an Active MQ broker can’t grab the database lock, that broker won’t initialize fully, and will wait until the lock becomes free, or it’s shut down.
復(fù)制代碼

看到一段資料補(bǔ)充下,涉及到ActiveMq配置為主從模式時(shí),都會(huì)去嘗試連接該數(shù)據(jù)源,為了防止資源爭(zhēng)用出現(xiàn)的問(wèn)題,加了ACTIVEMQ_LOCK表。

 

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多