小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

使用MQTT協(xié)議+Redis緩存實現(xiàn)APP登錄頂號功能 | jwcqc個人筆記 | IT癮

 WindySky 2020-01-15

大家在玩游戲或使用QQ等IM工具時,想必都見到過彈出被頂號或者是您的賬號于xx時間在另一設(shè)備登錄,您已被迫下線這樣的提示,然后不得不點退出按鈕退出整個應(yīng)用,或者點擊重新登錄把另一設(shè)備再頂下來。最近我參與的一個項目,正好就有這樣的需求,而且,由于我們項目中已經(jīng)使用到了MQTT協(xié)議進行消息推送,實現(xiàn)遠程控制,后臺用Java實現(xiàn),緩存使用了Redis,因此,正好可以利用現(xiàn)有的技術(shù)來實現(xiàn)這個功能。

實現(xiàn)的思路大概如下:首先,登錄時不僅需要賬號密碼,還可以將設(shè)備關(guān)鍵信息記錄下來,如設(shè)備型號(Android|iPhone)、登錄時間、登錄IP、設(shè)備唯一標識(UUID)等,這就需要前臺登錄功能與后臺接口一起配合實現(xiàn),并在后臺把userId已經(jīng)相關(guān)設(shè)備信息保存到Redis中,當在另外一臺新設(shè)備上登錄同一賬號時,將userId對應(yīng)的相關(guān)登錄設(shè)備信息直接進行覆蓋,此時如果舊設(shè)備進行重連時,因為該uuid已經(jīng)不是當前服務(wù)端的uuid了,所以直接返回下線通知,為了進行友好提示,也可以將新登錄設(shè)備的主要信息(設(shè)備型號、登錄時間)進行返回。

下面簡單介紹一下實現(xiàn)的方法。

軟件安裝

Linux下mqtt服務(wù)器Apollo的安裝

下載

選擇一個目錄用來下載保存
下載地址: http://activemq./apollo/download.html
官網(wǎng)教程: http://activemq./apollo/documentation/getting-started.html
目前版本是 apache-apollo-1.7.1-unix-distro.tar .gz

創(chuàng)建broker

一個broker實例是一個文件夾,其中包含所有的配置文件及運行時的數(shù)據(jù),不如日志和消息數(shù)
據(jù)。Apollo強烈建議不要把實例同安裝文件放在一起。在linux操作系統(tǒng)下面,建議將實例建在
/var/lib/目錄下面

首先解壓:tar -zxvf apache-apollo-1.7.1-unix-distro.tar.gz
選擇一個目錄存放解壓后的文件,我放在了/server/下,解壓后的文件夾為 apache-apollo-1.7.1

開始創(chuàng)建broker實例:

             
1
2
             
cd /var/lib
sudo /server/apache-apollo-1.7.1/bin/apollo create mybroker

下圖是Apache官方給的一些建議截圖:

啟動broker實例

啟動broker實例可以有兩種方法,如下圖中所示:

可以執(zhí)行

             
1
             
/var/lib/mybroker/bin/apollo-broker run

或者

             
1
2
             
sudo ln -s "/var/lib/mybroker/bin/apollo-broker-service" /etc/init.d/
/etc/init.d/apollo-broker-service start

使其作為一個service進行啟動,以后系統(tǒng)重啟后只需運行/etc/init.d/apollo-broker-service start

訪問Apollo的監(jiān)控頁面: http://localhost:61680/默認用戶名、密碼為為 admin/password

Linux下Redis的安裝與配置

Redis的安裝非常簡單,已經(jīng)有現(xiàn)成的Makefile文件,解壓后在src目錄下使用make命令完成編譯即可,redis-benchmark、redis-cli、redis-server、redis-stat 這四個文件,加上一個 redis.conf 就構(gòu)成了整個redis的最終可用包。它們的作用如下:

redis-server:Redis服務(wù)器的daemon啟動程序
redis-cli:Redis命令行操作工具。當然,你也可以用telnet根據(jù)其純文本協(xié)議來操作
redis-benchmark:Redis性能測試工具,測試Redis在你的系統(tǒng)及你的配置下的讀寫性能
redis-stat:Redis狀態(tài)檢測工具,可以檢測Redis當前狀態(tài)參數(shù)及延遲狀況

下載安裝:

            
1
2
3
4
5
            
wget http://download./redis-stable.tar.gz
tar xzf redis-stable.tar.gz
cd redis-stable
make
make install

啟動

編譯后生成的可執(zhí)行文件:
redis-server 是Redis的服務(wù)器,啟動Redis即運行redis-server
redis-cli 是Redis自帶的Redis命令行客戶端,學習Redis的重要工具

./redis-server & 不指定配置直接運行,這時采用默認配置,無密碼
./redis-server –port 6379 僅指定端口
./redis-server ../redis.conf 指定配置文件

最好還是使用最后一種方式進行啟動

如果只是在本機連接,那麼使用默認配置文件不會有什么問題,但是,如果是連接遠程服務(wù)器端的Redis,則需要對配置文件進行一些修改:

             
1
2
3
             
requirepass foobared
#bind 127.0.0.1 ##注釋掉
protected-mode no ##從yes改成no

至于如何將Redis設(shè)置后臺服務(wù),開機自啟等,這里就不介紹了,可以去搜索一下。

功能實現(xiàn)

后臺接口

Redis客戶端使用的是Jedis,如下代碼是一個對Jedis簡單的封裝

            
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
            
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.exceptions.JedisException;
import java.util.ResourceBundle;
/**
* Jedis Cache 工具類
*/
public class JedisUtils {
private static Logger logger = LoggerFactory.getLogger(JedisUtils.class);
private static JedisPool jedisPool;
/**
* 讀取相關(guān)的配置
*/
static {
ResourceBundle resourceBundle = ResourceBundle.getBundle("redis");
int maxActive = Integer.parseInt(resourceBundle.getString("redis.pool.maxActive"));
int maxIdle = Integer.parseInt(resourceBundle.getString("redis.pool.maxIdle"));
int maxWait = Integer.parseInt(resourceBundle.getString("redis.pool.maxWait"));
int port = Integer.parseInt(resourceBundle.getString("redis.port"));
int timeout = Integer.parseInt(resourceBundle.getString("redis.timeout"));
String ip = resourceBundle.getString("redis.ip");
String auth = resourceBundle.getString("redis.auth");
JedisPoolConfig config = new JedisPoolConfig();
//設(shè)置最大連接數(shù)
config.setMaxTotal(maxActive);
//設(shè)置最大空閑數(shù)
config.setMaxIdle(maxIdle);
//設(shè)置超時時間
config.setMaxWaitMillis(maxWait);
//初始化連接池
jedisPool = new JedisPool(config, ip, port, timeout, auth);
}
/**
* 獲取緩存
* @param key 鍵
* @return 值
*/
public static String get(String key) {
String value = null;
Jedis jedis = null;
try {
jedis = getResource();
if (jedis.exists(key)) {
value = jedis.get(key);
value = StringUtils.isNotBlank(value) && !"nil".equalsIgnoreCase(value) ? value : null;
logger.debug("get {} = {}", key, value);
}
} catch (Exception e) {
logger.warn("get {} = {}", key, value, e);
} finally {
returnResource(jedis);
}
return value;
}
/**
* 設(shè)置緩存
* @param key 鍵
* @param value 值
* @param cacheSeconds 超時時間,0為不超時
* @return
*/
public static String set(String key, String value, int cacheSeconds) {
String result = null;
Jedis jedis = null;
try {
jedis = getResource();
result = jedis.set(key, value);
if (cacheSeconds != 0) {
jedis.expire(key, cacheSeconds);
}
logger.debug("set {} = {}", key, value);
} catch (Exception e) {
logger.warn("set {} = {}", key, value, e);
} finally {
returnResource(jedis);
}
return result;
}
/**
* 刪除緩存
* @param key 鍵
* @return
*/
public static long del(String key) {
long result = 0;
Jedis jedis = null;
try {
jedis = getResource();
if (jedis.exists(key)){
result = jedis.del(key);
logger.debug("del {}", key);
}else{
logger.debug("del {} not exists", key);
}
} catch (Exception e) {
logger.warn("del {}", key, e);
} finally {
returnResource(jedis);
}
return result;
}
/**
* 緩存是否存在
* @param key 鍵
* @return
*/
public static boolean exists(String key) {
boolean result = false;
Jedis jedis = null;
try {
jedis = getResource();
result = jedis.exists(key);
logger.debug("exists {}", key);
} catch (Exception e) {
logger.warn("exists {}", key, e);
} finally {
returnResource(jedis);
}
return result;
}
/**
* 獲取資源
* @return
* @throws JedisException
*/
public static Jedis getResource() throws JedisException {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
} catch (JedisException e) {
logger.warn("getResource.", e);
returnBrokenResource(jedis);
throw e;
}
return jedis;
}
/**
* 歸還資源
* @param jedis
*/
public static void returnBrokenResource(Jedis jedis) {
if (jedis != null) {
jedisPool.returnBrokenResource(jedis);
}
}
/**
* 釋放資源
* @param jedis
*/
public static void returnResource(Jedis jedis) {
if (jedis != null) {
jedisPool.returnResource(jedis);
}
}
}

然后在登錄接口中,當判斷完登錄的用戶名密碼正確后,可以參考如下代碼的思路去實現(xiàn),首先判斷Redis中是否已保存有這個userId對用的值,有的話說明當前已經(jīng)有登錄,需要被替換到,同時使用MQTT發(fā)送消息給客戶端使其退出,Redis中不存在則只需保存userId和uuidStr即可

            
1
2
3
4
5
6
7
8
9
10
11
12
            
String uuidStr = ""; //這個值從APP端傳過來
// 先判斷Redis中是否已經(jīng)有,有的話需要替換掉
if(JedisUtils.get(userId) != null && !JedisUtils .get(userId).equals(uuidStr)) {
MqttClient client = MyMqttClient.getInstance();
String topic = "TOPIC/LOGIN_LOGOUT";
client.subscribe(topic, 1);
MyMqttClient.sendMessage("Log out", topic);
client.unsubscribe(topic);
}
JedisUtils.set(userId, uuidStr, 0);

至于MQTT協(xié)議的實現(xiàn),這里使用的是Paho,如果后臺項目是使用Maven構(gòu)建的話,在pom.xml中加入如下幾行即可:

            
1
2
3
4
5
            
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.1.0</version>
</dependency>

然后對其進行了一個簡單的封裝

            
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
            
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MyMqttClient {
private MyMqttClient() {}
private static MqttClient mqttClientInstance = null;
private static MqttConnectOptions options;
//靜態(tài)工廠方法
public static synchronized MqttClient getInstance() {
try {
if (mqttClientInstance == null) {
mqttClientInstance = new MqttClient("tcp://125.216.242.151:61613",
MqttClient.generateClientId(), new MemoryPersistence());
options = new MqttConnectOptions();
//設(shè)置是否清空session,這里如果設(shè)置為false表示服務(wù)器會保留客戶端的連接記錄,這里設(shè)置為true表示每次連接到服務(wù)器都以新的身份連接
options.setCleanSession(true);
//設(shè)置連接的用戶名
options.setUserName("admin");
//設(shè)置連接的密碼
options.setPassword("password".toCharArray());
// 設(shè)置超時時間 單位為秒
options.setConnectionTimeout(10);
// 設(shè)置會話心跳時間 單位為秒 服務(wù)器會每隔1.5*20秒的時間向客戶端發(fā)送個消息判斷客戶端是否在線,但這個方法并沒有重連的機制
options.setKeepAliveInterval(20);
mqttClientInstance.connect(options);
}
return mqttClientInstance;
}catch (Exception e){
e.printStackTrace();
return null;
}
}
public static void sendMessage(String content, String myTopic) {
MqttTopic topic = getInstance().getTopic(myTopic);
MqttMessage message = new MqttMessage();
message.setQos(1);
message.setRetained(false);
message.setPayload(content.getBytes());
try {
MqttDeliveryToken token = topic.publish(message);
} catch (MqttException e) {
e.printStackTrace();
}
}
public static MqttConnectOptions getOptions(){
return options;
}
}

app端

客戶端的做法思路也很簡單,由于使用了MQTT,因此客戶端和服務(wù)器端其實已經(jīng)保持了一個長連接,可以為客戶端寫一個MQTTService,隨時監(jiān)聽服務(wù)器推送過來的消息進行處理

            
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
            
//為MTQQ client設(shè)置回調(diào)
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
//連接丟失后,一般在這里面進行重連
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//publish后會執(zhí)行到這里
}
@Override
public void messageArrived(String topicName, MqttMessage message) throws Exception {
if(message.toString().equals("Log out")) {
handler.post(new Runnable() {
@Override
public void run() {
AlertDialog.Builder builder = new AlertDialog.Builder(getApplicationContext());
builder.setMessage("被頂號了");
builder.setNegativeButton("退出", new DialogInterface.OnClickListener() {
@Override
public void onClick(DialogInterface dialog, int which) {
// TODO 退出當前賬號,在這里簡單粗暴的結(jié)束了應(yīng)用
stopSelf();
android.os.Process.killProcess(android.os.Process.myPid());
}
});
Dialog dialog = builder.create();
dialog.setCanceledOnTouchOutside(false);
dialog.getWindow().setType(WindowManager.LayoutParams.TYPE_SYSTEM_ALERT);
dialog.show();
}
});
}
}
});

總結(jié)

上述代碼可能在嚴謹性和可靠性上還會存在一些問題,還需要經(jīng)過不斷的完善,但思路是很明確的。在這里尤其要安利一下MTQQ,現(xiàn)在越來越多的產(chǎn)品都是基于這個協(xié)議進行開發(fā),進行消息推送等。它開銷很小,支持各種流行編程語言,能夠適應(yīng)不穩(wěn)定的網(wǎng)絡(luò)傳輸需求,在未來幾年,相信MQTT的應(yīng)用會越來越廣。

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多