import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooKeeper;
/**
*
* @description Zookeeper Session演示類
* @author zhangchaoyang
* @date 2014-6-22
*/
public class SessionDemo {
/**
* zoo.cfg中的配置:
*
* <pre>
* tickTime=2000
* minSessionTimeout=4000(至少是tickTime的2倍)
* maxSessionTimeout=40000(最大是tickTime的20倍)
* </pre>
*
* 如果客戶端建立連接時(shí)指定的TIMEOUT不在[minSessionTimeout,maxSessionTimeout]區(qū)間內(nèi),
* 服務(wù)端會(huì)強(qiáng)制把它修改到該區(qū)間內(nèi)
*/
private static final int TIMEOUT = 40000; // Session
// Timeout設(shè)為40秒,因?yàn)樾奶芷跒?秒,所以如果server向client連續(xù)發(fā)送20個(gè)心跳都收不到回應(yīng),則Session過期失效
private static ZooKeeper zkp = null;
private static void connect() throws IOException {
zkp = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT, null);
}
private static void createNode() throws KeeperException,
InterruptedException {
if (zkp != null) {
zkp.create("/znodename", "znodedata".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
}
private static String getData() throws KeeperException,
InterruptedException {
if (zkp != null) {
Stat stat = zkp.exists("/znodename", false);
return new String(zkp.getData("/znodename", false, stat));
}
return null;
}
private static void disconnect() throws InterruptedException {
if (zkp != null) {
zkp.close();
}
}
/**
* 休息,在此期間我們有三種選擇:<br>
* <ol>
* <li>永久性斷開網(wǎng)絡(luò)連接
* <li>斷開網(wǎng)絡(luò)連接一段時(shí)間timespan后再連上,其中timespan<{@code TIMEOUT}
* <li>斷開網(wǎng)絡(luò)連接一段時(shí)間timespan后再連上,其中timespan>{@code TIMEOUT}
* </ol>
*/
private static void sleepForNetworkDisturbances() {
try {
Thread.sleep(2 * TIMEOUT);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
try {
connect();
} catch (IOException e) {
System.err
.println("Can't create zookeeper client, please check the network.");
}
System.out.println("Session build.");
try {
createNode();
} catch (Exception e) {
System.err.println("Create znode failed.");
}
System.out.println("znode created.");
sleepForNetworkDisturbances();
try {
String data = getData();
if (data != null) {
// 在“休息”期間做了第2件事情,Sesion沒有過期,EPHEMERAL節(jié)點(diǎn)依然存在
System.out.println("data=" + data);
}
} catch (KeeperException e) {
e.printStackTrace();
// 在“休息”期間做了第1件事情
if (e instanceof ConnectionLossException) {
System.err
.println("Oops, network is disconnected. Retry getData().");
// 如果session沒有失效,而僅僅是網(wǎng)絡(luò)異常,則可以重新嘗試獲取數(shù)據(jù),可能在重試時(shí)網(wǎng)絡(luò)已經(jīng)正常了
try {
Thread.sleep(1000);
String data = getData();
if (data != null) {
System.out.println("data=" + data);
} else {
System.out.println("can't get data.");
}
} catch (Exception e1) {
e1.printStackTrace();
}
}
// 在“休息”期間做了第3件事情,則session會(huì)過期
else if (e instanceof SessionExpiredException) {
System.err
.println("Session Expired, client will reconnect and create znode again.");
// 當(dāng)發(fā)再Session Expired時(shí),必須重新建立連接,即new一個(gè)ZooKeeper
try {
connect();
createNode();
String data = getData();
if (data != null) {
System.out.println("data=" + data);
} else {
System.out.println("can't get data.");
}
} catch (Exception e1) {
e1.printStackTrace();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
disconnect();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Client disconnected.");
}
}
Watcher可以注冊(cè)watcher的方法:getData、exists、getChildren。
可以觸發(fā)watcher的方法:create、delete、setData。連接斷開的情況下觸發(fā)的watcher會(huì)丟失。
一個(gè)Watcher實(shí)例是一個(gè)回調(diào)函數(shù),被回調(diào)一次后就被移除了。如果還需要關(guān)注數(shù)據(jù)的變化,需要再次注冊(cè)watcher。
New ZooKeeper時(shí)注冊(cè)的watcher叫default watcher,它不是一次性的,只對(duì)client的連接狀態(tài)變化作出反應(yīng)。
什么樣的操作會(huì)產(chǎn)生什么類型的事件:
什么操作會(huì)觸發(fā)EventType.None? 事件類型與watcher的對(duì)應(yīng)關(guān)系:
操作與watcher的對(duì)應(yīng)關(guān)系:
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
/**
*
* @description Zookeeper Watcher演示類
* @author zhangchaoyang
* @date 2014-6-22
*/
public class WatcherDemo {
private static ZooKeeper zkp = null;
private static final int TIMEOUT = 6000;
private static Watcher getWatcher(final String msg) {
return new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(msg + "上的監(jiān)聽被觸發(fā)\t事件類型" + event.getType()
+ "\t發(fā)生變化的節(jié)點(diǎn)" + event.getPath());
}
};
}
public static void main(String[] args) throws IOException, KeeperException,
InterruptedException {
System.out.println("--------------1----------------");
zkp = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT,
getWatcher("CONNECT"));
Thread.sleep(1000);
System.out.println("--------------2----------------");
zkp.create("/znodename", "znodedata".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
zkp.create("/znodename/childnode", new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
Stat stat = zkp.exists("/znodename", getWatcher("EXISTS"));
zkp.getChildren("/", getWatcher("GETCHILDREN"));
zkp.getData("/znodename", getWatcher("GETDATA"), stat);
stat = zkp.exists("/znodename/childnode", getWatcher("EXISTS"));
zkp.getChildren("/znodename", getWatcher("GETCHILDREN"));
zkp.getData("/znodename/childnode", getWatcher("GETDATA"), stat);
// zkp.close();
zkp = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT,
getWatcher("CONNECT"));
Thread.sleep(1000);
System.out.println("--------------3----------------");
zkp.delete("/znodename/childnode", -1);
zkp.delete("/znodename", -1);
zkp.close();
}
}
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
*
* @description 自定義持久性的zookeeper watcher
* @author zhangchaoyang
* @date 2014-6-22
*/
public class PersistWatcher {
private static final int TIMEOUT = 6000;
private static final String znode = "/globalconfnode";
private static String globalConfData = "";
private static Watcher getConnectWatcher() {
return new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType().equals(EventType.None)) {
System.out.println("連接狀態(tài)發(fā)生變化。");
}
}
};
}
private static Watcher getExistsWatcher(final ZooKeeper zkp) {
return new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
if (event.getType().equals(EventType.NodeDataChanged)
|| event.getType().equals(EventType.NodeCreated)) {
// 節(jié)點(diǎn)被創(chuàng)建或修改時(shí)更新緩存中的值
Stat stat = zkp.exists(znode, this);// 再次注冊(cè)監(jiān)聽
String data = new String(
zkp.getData(znode, false, stat));
globalConfData = data;
} else if (event.getType().equals(EventType.NodeDeleted)) {
// 節(jié)點(diǎn)被刪除時(shí)報(bào)警
System.out
.println("global configuration node have been deleted!");
try {
// 再次注冊(cè)監(jiān)聽
zkp.exists(znode, this);
} catch (KeeperException e) {
if (e instanceof ConnectionLossException) {
System.out.println("連接已斷開。");
}
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
}
public static void main(String[] args) {
try {
ZooKeeper zkp = new ZooKeeper("192.168.119.96:2181/app/learn",
TIMEOUT,
getConnectWatcher());
zkp.exists(znode, getExistsWatcher(zkp));
zkp.create(znode, "config_value".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
Thread.sleep(500);// 修改節(jié)點(diǎn)后必須sleep,等待watcher回調(diào)完成
System.out.println(globalConfData);
for (int i = 0; i < 4; i++) {
zkp.setData(znode, ("config_value" + i).getBytes(), -1);
Thread.sleep(500);// 修改節(jié)點(diǎn)后必須sleep,等待watcher回調(diào)完成
System.out.println(globalConfData);
}
zkp.close();// EPHEMERAL節(jié)點(diǎn)會(huì)被刪除,但Session并不會(huì)馬上失效(只不過ConnectionLoss了),所以還是會(huì)觸發(fā)watcher
try {
// 此時(shí)Session已失效
zkp.exists(znode, false);
} catch (KeeperException e) {
if (e instanceof SessionExpiredException)
System.out.println("Session已失效。");
}
} catch (IOException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
ACL每個(gè)節(jié)點(diǎn)有單獨(dú)的ACL,子節(jié)點(diǎn)不能繼承父節(jié)點(diǎn)的ACL
ACL有三個(gè)維度:schema,id,permision
Schema有7種:
world: 它下面只有一個(gè)id, 叫anyone
auth: 它不需要id
digest: 它對(duì)應(yīng)的id為username:BASE64(SHA1(password))
ip: 它對(duì)應(yīng)的id為客戶機(jī)的IP地址,設(shè)置的時(shí)候可以設(shè)置一個(gè)ip段,比如ip:192.168.1.0/16, 表示匹配前16個(gè)bit的IP段
super: 在這種scheme情況下,對(duì)應(yīng)的id擁有超級(jí)權(quán)限
sasl: sasl的對(duì)應(yīng)的id,是一個(gè)通過了kerberos認(rèn)證的用戶id
Permission有5種:
CREATE(c),DELETE(d),READ(r),WRITE(w),ADMIN(a)
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoAuthException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
/**
*
* @description Zookeeper ACL演示類
* @author zhangchaoyang
* @date 2014-6-22
*/
public class AclDemo {
private static final int TIMEOUT = 6000;
public static void main(String[] args) throws IOException, KeeperException,
InterruptedException {
ZooKeeper zkp = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT,
null);
String schema = "digest";// schema類型有:world,auth,digest,ip,super
String auth = "username:password";
zkp.addAuthInfo(schema, auth.getBytes());
List<ACL> acls = new ArrayList<ACL>();
for (ACL id : Ids.CREATOR_ALL_ACL) {
acls.add(id);
}
zkp.create("/znodename", "znodedata".getBytes(), acls,
CreateMode.PERSISTENT);
ZooKeeper zoo = null;
try {
zoo = new ZooKeeper("192.168.119.96:2181/app/learn", TIMEOUT, null);
System.out.println("采用不合法的認(rèn)證方式:");
String badAuthentication = "username:wrongpass";
zoo.addAuthInfo(schema, badAuthentication.getBytes());
zoo.getData("/znodename", null, null);
} catch (KeeperException e) {
if (e instanceof NoAuthException) {
System.out.println("認(rèn)證失?。? + e.getMessage());
}
System.out.println("采用合法的認(rèn)證方式:");
zoo.addAuthInfo(schema, auth.getBytes());
String data = new String(zoo.getData("/znodename", null, null));
if (data != null) {
System.out.println("認(rèn)證成功:data=" + data);
}
} finally {
if (zoo != null && zoo.getState().isAlive()) {
zoo.close();
}
}
zkp.delete("/znodename", -1);
zkp.close();
}
}
開源工具menagerie基于ZooKeeper實(shí)現(xiàn)了分布式的:
ReentrantZkLock
ReentrantZkReadWriteLock
Semaphore
CyclicBarrier
CountDownLatch
BlockingQueue
HashMap
ListSet
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
/**
*
* @description 使用ZooKeeper實(shí)現(xiàn)分布式鎖
* @author zhangchaoyang
* @date 2014-6-22
*/
public class ZooKeeperLock {
private static Logger logger = Logger.getLogger(ZooKeeperLock.class);
private static ZooKeeper zk = null;
private static final int TIMEOUT = 1000 * 60;
private static String connStr = null;
public static void setServerPath(String path) {
connStr = path + "/app/bqas/lock";
logger.info("ZooKeeperLock zookeeper node:" + connStr);
}
public static boolean getLock(String lockname) throws KeeperException,
InterruptedException, IOException {
connect(connStr, TIMEOUT);
if (lockname.contains("-")) {
throw new RuntimeException("鎖名稱不能包含'-'");
}
boolean lock = false;
String path = zk.create("/" + lockname + "-", new byte[0],
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
int selfIndex = getIndex(path);
List<String> children = zk.getChildren("/", false);
int min = getMinIndex(children);
if (min == selfIndex) {
lock = true;
}
return lock;
}
public static void releaseLock(String lockname)
throws InterruptedException, KeeperException {
disconnect();
}
private static int getIndex(String str) {
int index = -1;
int pos = str.lastIndexOf("-");
if (pos >= 0) {
try {
index = Integer.parseInt(str.substring(pos + 1));
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
return index;
}
private static int getMinIndex(List<String> list) {
int min = Integer.MAX_VALUE;
for (String ele : list) {
int index = getIndex(ele);
if (index < 0) {
throw new RuntimeException("SEQUENTIAL節(jié)點(diǎn)名中不包含數(shù)字:" + ele);
}
if (index < min) {
min = index;
}
}
return min;
}
private static void waitUntilConnected(CountDownLatch connectedLatch) {
if (States.CONNECTING == zk.getState()) {
try {
connectedLatch.await();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
public static boolean connect(String hostPath, int sessionTimeout) {
if (zk == null || zk.getState() == States.CLOSED) {
try {
CountDownLatch connectedLatch = new CountDownLatch(1);
Watcher watcher = new ConnectedWatcher(connectedLatch);
zk = new ZooKeeper(hostPath, sessionTimeout, watcher);
waitUntilConnected(connectedLatch);
} catch (Exception e) {
logger.error("Connect to Zookeeper failed:", e);
return false;
}
}
return true;
}
public static boolean disconnect() {
if (zk != null) {
if (States.CLOSED != zk.getState()) {
try {
zk.close();
} catch (InterruptedException e) {
logger.error("Disconnect from Zookeeper failed:", e);
return false;
}
}
}
return true;
}
static class ConnectedWatcher implements Watcher {
private CountDownLatch connectedLatch;
ConnectedWatcher(CountDownLatch connectedLatch) {
this.connectedLatch = connectedLatch;
}
@Override
public void process(WatchedEvent event) {
// 事件狀態(tài)為SyncConnected時(shí),說明與服務(wù)端的連接已建立好
if (event.getState() == KeeperState.SyncConnected) {
connectedLatch.countDown();
}
}
}
public static void main(String[] args) {
String lockname = "writeHitCount2DBlock";
System.out.println("begin to run.");
ZooKeeperLock.setServerPath("192.168.119.96:2181");
try {
boolean havelock = ZooKeeperLock.getLock(lockname);
if (havelock) {
Date date = new Date();
System.out
.println("I got the lock,and I will write DB!" + date);
Thread.sleep(1000);// 休息一段時(shí)間之后再釋放鎖
}
System.out.println("Job done, I will release the lock.");
ZooKeeperLock.releaseLock(lockname);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|