|
前記:目前學(xué)習(xí)還比較雜亂,還未找到系統(tǒng)化地學(xué)習(xí)ActiveMq的方法。在網(wǎng)上看到消息持久化的demo,了解了一下,在此記錄。
一、目前ActiveMq支持的持久化方法 url:http://activemq./persistence.html 3、KahaDB 4、JDBC 配合其自帶的 high performance journal;根據(jù)官方說(shuō)法,它內(nèi)置的高性能journal的工作類似于在緩存層工作,消息會(huì)優(yōu)先寫入到j(luò)ournal,后臺(tái)的定時(shí)任務(wù)會(huì)每隔一段時(shí)間間隔去 查看需要寫入到j(luò)dbc的消息。 二、配置activemq.xml 1、修改persistenceAdapter <persistenceAdapter>
<!-- <kahaDB directory="${activemq.data}/kahadb"/>-->
<jdbcPersistenceAdapter dataSource="#my-ds"/>
</persistenceAdapter>
上面我們注釋了默認(rèn)的kahadb,添加了jdbc數(shù)據(jù)源。 2、增加數(shù)據(jù)源 <bean id="my-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver" /> <property name="url" value="jdbc:mysql://192.168.2.140:3306/activemq?characterEncoding=utf-8" /> <property name="username" value="root" /> <property name="password" value="123456" /> <property name="initialSize" value="5" /> <property name="maxTotal" value="100" /> <property name="maxIdle" value="30" /> <property name="maxWaitMillis" value="10000" /> <property name="minIdle" value="1" /> </bean> 這邊有幾個(gè)注意點(diǎn): 2.1:上面配置的數(shù)據(jù)源類對(duì)應(yīng)于dbcp2,如果1.x系列的jar包會(huì)報(bào)錯(cuò),classNotFound異常。 2.2:在主安裝目錄的lib目錄下,將mysql的jar包、dbcp的jar包放到該路徑下 /usr/local/apache-activemq-5.14.4/lib
2.3:建議用bin/activemq console方式啟動(dòng),可以及時(shí)查看錯(cuò)誤信息。我在啟動(dòng)時(shí),報(bào)以下錯(cuò)誤: WARN | Failure Details: Table 'xckk_star_act.ACTIVEMQ_ACKS' doesn't exist
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'xckk_star_act.ACTIVEMQ_ACKS' doesn't exist
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)[:1.8.0_121]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)[:1.8.0_121]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)[:1.8.0_121]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)[:1.8.0_121]
at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)[mysql-connector-java-5.1.38.jar:5.1.38]
at com.mysql.jdbc.Util.getInstance(Util.java:387)[mysql-connector-java-5.1.38.jar:5.1.38]
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:939)[mysql-connector-java-5.1.38.jar:5.1.38]
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3878)[mysql-connector-java-5.1.38.jar:5.1.38]
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3814)[mysql-connector-java-5.1.38.jar:5.1.38]
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2478)[mysql-connector-java-5.1.38.jar:5.1.38]
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2625)[mysql-connector-java-5.1.38.jar:5.1.38]
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2551)[mysql-connector-java-5.1.38.jar:5.1.38]
at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1861)[mysql-connector-java-5.1.38.jar:5.1.38]
at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2073)[mysql-connector-java-5.1.38.jar:5.1.38]
at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2009)[mysql-connector-java-5.1.38.jar:5.1.38]
at com.mysql.jdbc.PreparedStatement.executeLargeUpdate(PreparedStatement.java:5094)[mysql-connector-java-5.1.38.jar:5.1.38]
at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:1994)[mysql-connector-java-5.1.38.jar:5.1.38]
at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:98)[commons-dbcp2-2.1.1.jar:2.1.1]
at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:98)[commons-dbcp2-2.1.1.jar:2.1.1]
at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doDeleteOldMessages(DefaultJDBCAdapter.java:832)[activemq-jdbc-store-5.14.4.jar:5.14.4]
at org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.cleanup(JDBCPersistenceAdapter.java:349)[activemq-jdbc-store-5.14.4.jar:5.14.4]
at org.apache.activemq.store.jdbc.JDBCPersistenceAdapter$3.run(JDBCPersistenceAdapter.java:327)[activemq-jdbc-store-5.14.4.jar:5.14.4]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)[:1.8.0_121]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)[:1.8.0_121]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)[:1.8.0_121]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)[:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)[:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)[:1.8.0_121]
at java.lang.Thread.run(Thread.java:745)[:1.8.0_121]
在網(wǎng)上查詢結(jié)果,原來(lái)是數(shù)據(jù)庫(kù)需要字符集設(shè)置為latin1.
于是,新建了一個(gè)數(shù)據(jù)庫(kù),字符集設(shè)為L(zhǎng)atin1.然后啟動(dòng)ActiveMq,查看數(shù)據(jù)庫(kù),發(fā)現(xiàn)多了三張表:
在這邊將結(jié)構(gòu)導(dǎo)出來(lái),供手動(dòng)建表: /* SQLyog v10.2 MySQL - 5.1.71 : Database - activemq ********************************************************************* */ /*!40101 SET NAMES utf8 */; /*!40101 SET SQL_MODE=''*/; /*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */; /*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */; /*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */; /*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */; CREATE DATABASE /*!32312 IF NOT EXISTS*/`activemq` /*!40100 DEFAULT CHARACTER SET latin1 COLLATE latin1_bin */; USE `activemq`; /*Table structure for table `ACTIVEMQ_ACKS` */ DROP TABLE IF EXISTS `ACTIVEMQ_ACKS`; CREATE TABLE `ACTIVEMQ_ACKS` ( `CONTAINER` varchar(250) COLLATE latin1_bin NOT NULL, `SUB_DEST` varchar(250) COLLATE latin1_bin DEFAULT NULL, `CLIENT_ID` varchar(250) COLLATE latin1_bin NOT NULL, `SUB_NAME` varchar(250) COLLATE latin1_bin NOT NULL, `SELECTOR` varchar(250) COLLATE latin1_bin DEFAULT NULL, `LAST_ACKED_ID` bigint(20) DEFAULT NULL, `PRIORITY` bigint(20) NOT NULL DEFAULT '5', `XID` varchar(250) COLLATE latin1_bin DEFAULT NULL, PRIMARY KEY (`CONTAINER`,`CLIENT_ID`,`SUB_NAME`,`PRIORITY`), KEY `ACTIVEMQ_ACKS_XIDX` (`XID`) ) ENGINE=MyISAM DEFAULT CHARSET=latin1 COLLATE=latin1_bin; /*Table structure for table `ACTIVEMQ_LOCK` */ DROP TABLE IF EXISTS `ACTIVEMQ_LOCK`; CREATE TABLE `ACTIVEMQ_LOCK` ( `ID` bigint(20) NOT NULL, `TIME` bigint(20) DEFAULT NULL, `BROKER_NAME` varchar(250) COLLATE latin1_bin DEFAULT NULL, PRIMARY KEY (`ID`) ) ENGINE=MyISAM DEFAULT CHARSET=latin1 COLLATE=latin1_bin; /*Table structure for table `ACTIVEMQ_MSGS` */ DROP TABLE IF EXISTS `ACTIVEMQ_MSGS`; CREATE TABLE `ACTIVEMQ_MSGS` ( `ID` bigint(20) NOT NULL, `CONTAINER` varchar(250) COLLATE latin1_bin DEFAULT NULL, `MSGID_PROD` varchar(250) COLLATE latin1_bin DEFAULT NULL, `MSGID_SEQ` bigint(20) DEFAULT NULL, `EXPIRATION` bigint(20) DEFAULT NULL, `MSG` longblob, `PRIORITY` bigint(20) DEFAULT NULL, `XID` varchar(250) COLLATE latin1_bin DEFAULT NULL, PRIMARY KEY (`ID`), KEY `ACTIVEMQ_MSGS_MIDX` (`MSGID_PROD`,`MSGID_SEQ`), KEY `ACTIVEMQ_MSGS_CIDX` (`CONTAINER`), KEY `ACTIVEMQ_MSGS_EIDX` (`EXPIRATION`), KEY `ACTIVEMQ_MSGS_PIDX` (`PRIORITY`), KEY `ACTIVEMQ_MSGS_XIDX` (`XID`) ) ENGINE=MyISAM DEFAULT CHARSET=latin1 COLLATE=latin1_bin; /*!40101 SET SQL_MODE=@OLD_SQL_MODE */; /*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */; /*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */; /*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
三、測(cè)試消息持久化 package com.ckl.activemq; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class HelloActiveMQ { public static void main(String[] args) throws Exception { HelloWorldProducer producer = new HelloWorldProducer(); HelloWorldConsumer consumer = new HelloWorldConsumer(); Thread threadProducer = new Thread(producer); threadProducer.start(); //注釋掉消費(fèi)者,不然的話,馬上消費(fèi)者馬上把消息消費(fèi)了,來(lái)不及持久化 // Thread threadConsumer = new Thread(consumer); // threadConsumer.start(); } public static class HelloWorldProducer implements Runnable { public void run() { try { // Create a ConnectionFactory ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.2.140:61616"); // Create a Connection Connection connection = connectionFactory.createConnection(); connection.start(); // Create a Session Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); // Create the destination (Topic or Queue) Destination destination = session.createQueue("DemoQueue"); // Create a MessageProducer from the Session to the Topic or Queue MessageProducer producer = session.createProducer(destination); // producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); producer.setDeliveryMode(DeliveryMode.PERSISTENT);//消息需要持久化 // Create a messages String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode(); TextMessage message = session.createTextMessage(text); // Tell the producer to send the message System.out.println("Sent message: " + message.hashCode() + " : " + Thread.currentThread().getName()); producer.send(message); // Clean up session.close(); connection.close(); } catch (Exception e) { System.out.println("Caught: " + e); e.printStackTrace(); } } } public static class HelloWorldConsumer implements Runnable, ExceptionListener { public void run() { try { // Create a ConnectionFactory ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "tcp://192.168.2.140:61616"); // Create a Connection Connection connection = connectionFactory.createConnection(); connection.start(); connection.setExceptionListener(this); // Create a Session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Create the destination (Topic or Queue) Destination destination = session.createQueue("DemoQueue"); // Create a MessageConsumer from the Session to the Topic or // Queue MessageConsumer consumer = session.createConsumer(destination); // Wait for a message Message message = consumer.receive(1000); if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println("Received: " + text); } else { System.out.println("Received: " + message); } consumer.close(); session.close(); connection.close(); } catch (Exception e) { System.out.println("Caught: " + e); e.printStackTrace(); } } public synchronized void onException(JMSException ex) { System.out.println("JMS Exception occured. Shutting down client."); } } } 運(yùn)行后,效果如下:
四、單獨(dú)啟動(dòng)消費(fèi)者,查看數(shù)據(jù)庫(kù) 注釋掉上面的生產(chǎn)者,單獨(dú)啟動(dòng)消費(fèi)者后,發(fā)現(xiàn)數(shù)據(jù)庫(kù)中的消息已被消費(fèi)。
五、備注 Although the JDBC Store does not offer the best performance, it makes fairly simple to create a simple Master-Slave robust broker setup. 看到一段資料補(bǔ)充下,涉及到ActiveMq配置為主從模式時(shí),都會(huì)去嘗試連接該數(shù)據(jù)源,為了防止資源爭(zhēng)用出現(xiàn)的問(wèn)題,加了ACTIVEMQ_LOCK表。
|
|
|
來(lái)自: WindySky > 《ActiveMQ》