|
經(jīng)過上面兩篇博客已經(jīng)對ZMQ和如何編譯ZMQ有所了解了,在第一篇文章中介紹了ZMQ有三種模式,這篇博客就寫一個關(guān)于發(fā)布訂閱模式的Demo,希望對讀者能夠起到學習的作用,那要是在深入學習的話,可以參照zmq的官方和它自身帶的實例代碼學習。 在寫DEMO之前需要將上篇博客編譯之后的lib文件配置到環(huán)境變量中, C:\Users\CJQ\Desktop\zeromq-2.2.0\lib;C:\Users\CJQ\Desktop\zeromq-2.2.0\zeromq-jzmq\lib 也就是將zmq的lib文件夾和jzmq的文件夾放到環(huán)境變量中。 之后我們來寫發(fā)布端代碼, - package com.wh.mq.demo2;
-
- import org.zeromq.ZMQ;
- import org.zeromq.ZMQ.Context;
- import org.zeromq.ZMQ.Socket;
-
- public class Sync_PUB {
-
- public static void main(String[] args) throws InterruptedException {
- Context context = ZMQ.context(1);
- Socket publisher = context.socket(ZMQ.PUB);
- publisher.bind("tcp://*:5561");
- //zmq發(fā)送速度太快,在訂閱者尚未與發(fā)布者建立聯(lián)系時,已經(jīng)開始了數(shù)據(jù)發(fā)布
- Thread.sleep(1000);
-
- int update_nbr;
-
- for (update_nbr = 20; update_nbr < 40; update_nbr++) {
- Stringa="{\"magicNum\":\"CHINSOFT\",\"varName\":\"ZJ_YD_1\",\"varType\":\"5\",\"varValue\":"+update_nbr+",\"varQuality\":\"1111\",\"varTime\":"+System.currentTimeMillis()/1000+"}";
- publisher.send(a.getBytes(), ZMQ.NOBLOCK);
- System.out.println(update_nbr);
- Thread.sleep(1000);
- }
-
-
- publisher.close();
- context.term();
- }
- }
發(fā)布端需要通過context.socket(ZMQ.PUB)表示為發(fā)布端,通過bind方法來創(chuàng)建發(fā)布端連接,等待訂閱者連接。 之后通過send方法將數(shù)據(jù)發(fā)送到出去。 之后來寫訂閱端代碼 - public classSync_SUB1 {
-
- publicstaticvoidmain(String[] args) {
- Context context = ZMQ.context(1);
- Socket subscriber = context.socket(ZMQ.SUB);
- subscriber.connect("tcp://localhost:5561");
-
- //設置訂閱條件"setsockopt"
- subscriber.subscribe("".getBytes());
- int update_nbr = 0;
- while (true) {
- byte[] stringValue = subscriber.recv(0);
-
- String string = new String(stringValue);
-
- update_nbr++;
- System.out.println("Received " + update_nbr + " updates. :"+ string);
- }
- }
- }
客戶端通過connect進行連接,之后通過recv來進行數(shù)據(jù)接收。 到此為止發(fā)布訂閱Demo就寫完了,通過這三篇博客能夠?qū)MQ有了初步的認識和簡單實用,希望這三篇博客對學習zmq的讀者有所幫助。
|