ZooKeeper Java サンプル
シンプルなウォッチクライアント
ZooKeeper Java APIを紹介するために、ここでは非常にシンプルなウォッチクライアントを開発します。このZooKeeperクライアントは、znodeの変更を監視し、プログラムの起動または停止によって応答します。
要件
クライアントには4つの要件があります。
- パラメータとして以下を受け取ります。
- ZooKeeperサービスのアドレス
- 監視対象となるznodeの名前
- 出力先ファイルの名前
- 引数付きの実行可能ファイル。
- znodeに関連付けられたフェッチデータを取得し、実行可能ファイルを開始します。
- znodeが変更された場合、クライアントは内容を再度フェッチし、実行可能ファイルを再起動します。
- znodeが消えた場合、クライアントは実行可能ファイルを強制終了します。
プログラム設計
慣例的に、ZooKeeperアプリケーションは2つのユニットに分割されます。1つは接続を維持し、もう1つはデータを監視します。このアプリケーションでは、Executorと呼ばれるクラスがZooKeeper接続を維持し、DataMonitorと呼ばれるクラスがZooKeeperツリー内のデータを監視します。また、Executorはメインスレッドを含み、実行ロジックを含みます。これは、ユーザーとのやり取りや、引数として渡した実行可能プログラムとのやり取りを担当します。このサンプル(要件に従って)は、znodeの状態に応じて、実行可能ファイルをシャットダウンおよび再起動します。
Executorクラス
Executorオブジェクトは、サンプルアプリケーションの主要なコンテナです。上記プログラム設計で説明したように、ZooKeeperオブジェクトとDataMonitorの両方を含みます。
// from the Executor class...
public static void main(String[] args) {
if (args.length < 4) {
System.err
.println("USAGE: Executor hostPort znode filename program [args ...]");
System.exit(2);
}
String hostPort = args[0];
String znode = args[1];
String filename = args[2];
String exec[] = new String[args.length - 3];
System.arraycopy(args, 3, exec, 0, exec.length);
try {
new Executor(hostPort, znode, filename, exec).run();
} catch (Exception e) {
e.printStackTrace();
}
}
public Executor(String hostPort, String znode, String filename,
String exec[]) throws KeeperException, IOException {
this.filename = filename;
this.exec = exec;
zk = new ZooKeeper(hostPort, 3000, this);
dm = new DataMonitor(zk, znode, null, this);
}
public void run() {
try {
synchronized (this) {
while (!dm.dead) {
wait();
}
}
} catch (InterruptedException e) {
}
}
Executorの役割は、コマンドラインで渡された実行可能ファイルを起動および停止することであるということを思い出してください。これは、ZooKeeperオブジェクトによって発生したイベントに応答して行います。上記のコードでわかるように、ExecutorはZooKeeperコンストラクタのWatcher引数として自分自身への参照を渡します。また、DataMonitorコンストラクタへのDataMonitorListener引数として自分自身への参照を渡します。Executorの定義に従って、これらの両方のインターフェースを実装します。
public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener {
...
Watcherインターフェースは、ZooKeeper Java APIによって定義されています。ZooKeeperは、コンテナに通知するために使用します。これは、process()
という1つのメソッドのみをサポートしており、ZooKeeperはこれを使用して、ZooKeeper接続やZooKeeperセッションの状態など、メインスレッドが関心を持つ可能性のある一般的なイベントを伝えます。この例では、ExecutorはこれらのイベントをDataMonitorに転送して、それらのイベントをどう処理するかを決定します。これは、慣例的に、Executorまたは何らかのExecutorのようなオブジェクトがZooKeeper接続を「所有」しているが、イベントを他のオブジェクトに委任してもよいという点を説明するためだけに行われます。また、これをウォッチイベントを発生させるデフォルトのチャネルとしても使用します。(これについては後で詳しく説明します。)
public void process(WatchedEvent event) {
dm.process(event);
}
一方、DataMonitorListenerインターフェースは、ZooKeeper APIの一部ではありません。これは、このサンプルアプリケーション用に設計された完全にカスタムなインターフェースです。DataMonitorオブジェクトは、コンテナ(Executorオブジェクトでもある)に通知するために使用します。DataMonitorListenerインターフェースは次のようになります。
public interface DataMonitorListener {
/**
* The existence status of the node has changed.
*/
void exists(byte data[]);
/**
* The ZooKeeper session is no longer valid.
*
* @param rc
* the ZooKeeper reason code
*/
void closing(int rc);
}
このインターフェースは、DataMonitorクラスで定義され、Executorクラスで実装されています。Executor.exists()
が呼び出されると、Executorは要件に従って起動するかシャットダウンするかを決定します。要件では、znodeが存在しなくなった場合に実行可能ファイルを強制終了するように指示していることを思い出してください。
Executor.closing()
が呼び出されると、ExecutorはZooKeeper接続が完全に消失したことへの応答として、自分自身をシャットダウンするかどうかを決定します。
ご想像のとおり、DataMonitorは、ZooKeeperの状態の変化に応じて、これらのメソッドを呼び出すオブジェクトです。
以下は、ExecutorのDataMonitorListener.exists()
およびDataMonitorListener.closing
の実装です。
public void exists( byte[] data ) {
if (data == null) {
if (child != null) {
System.out.println("Killing process");
child.destroy();
try {
child.waitFor();
} catch (InterruptedException e) {
}
}
child = null;
} else {
if (child != null) {
System.out.println("Stopping child");
child.destroy();
try {
child.waitFor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
FileOutputStream fos = new FileOutputStream(filename);
fos.write(data);
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
System.out.println("Starting child");
child = Runtime.getRuntime().exec(exec);
new StreamWriter(child.getInputStream(), System.out);
new StreamWriter(child.getErrorStream(), System.err);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void closing(int rc) {
synchronized (this) {
notifyAll();
}
}
DataMonitorクラス
DataMonitorクラスには、ZooKeeperロジックの主要部分があります。これは、ほとんどが非同期でイベントドリブンです。DataMonitorは、コンストラクタで次の処理を開始します。
public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
DataMonitorListener listener) {
this.zk = zk;
this.znode = znode;
this.chainedWatcher = chainedWatcher;
this.listener = listener;
// Get things started by checking if the node exists. We are going
// to be completely event driven
ZooKeeper.exists()
の呼び出しは、znodeの存在を確認し、ウォッチを設定し、自分自身への参照(this
)を完了コールバックオブジェクトとして渡します。この意味で、実際の処理はウォッチがトリガーされたときに発生するため、処理を開始します。
注意
完了コールバックとウォッチコールバックを混同しないでください。DataMonitorオブジェクトに実装されているメソッド
StatCallback.processResult()
であるZooKeeper.exists()
の完了コールバックは、サーバー上での非同期のウォッチ設定操作(ZooKeeper.exists()
による)が完了したときに呼び出されます。一方、ウォッチのトリガーは、Executorオブジェクトにイベントを送信します。これは、ExecutorがZooKeeperオブジェクトのWatcherとして登録されているためです。
余談ですが、DataMonitorは、この特定のウォッチイベントのWatcherとしても自分自身を登録できることに注意してください。これは、ZooKeeper 3.0.0(複数のWatcherのサポート)の新機能です。ただし、この例では、DataMonitorはWatcherとして登録しません。
ZooKeeper.exists()
操作がサーバーで完了すると、ZooKeeper APIはこの完了コールバックをクライアントで呼び出します。
public void processResult(int rc, String path, Object ctx, Stat stat) {
boolean exists;
switch (rc) {
case Code.Ok:
exists = true;
break;
case Code.NoNode:
exists = false;
break;
case Code.SessionExpired:
case Code.NoAuth:
dead = true;
listener.closing(rc);
return;
default:
// Retry errors
zk.exists(znode, true, this, null);
return;
}
byte b[] = null;
if (exists) {
try {
b = zk.getData(znode, false, null);
} catch (KeeperException e) {
// We don't need to worry about recovering now. The watch
// callbacks will kick off any exception handling
e.printStackTrace();
} catch (InterruptedException e) {
return;
}
}
if ((b == null && b != prevData)
|| (b != null && !Arrays.equals(prevData, b))) {
listener.exists(b);</emphasis>
prevData = b;
}
}
コードはまず、znodeの存在、致命的なエラー、および回復可能なエラーのエラーコードを確認します。ファイル(またはznode)が存在する場合、znodeからデータを取得し、状態が変化した場合はExecutorのexists()コールバックを呼び出します。getData呼び出しで例外処理を行う必要がないことに注意してください。エラーを引き起こす可能性のあるものについては、ウォッチが保留されているためです。ノードがZooKeeper.getData()
を呼び出す前に削除された場合、ZooKeeper.exists()
によって設定されたウォッチイベントがコールバックをトリガーします。通信エラーが発生した場合、接続が回復すると接続ウォッチイベントが発生します。
最後に、DataMonitorがウォッチイベントをどのように処理するかを見てください。
public void process(WatchedEvent event) {
String path = event.getPath();
if (event.getType() == Event.EventType.None) {
// We are are being told that the state of the
// connection has changed
switch (event.getState()) {
case SyncConnected:
// In this particular example we don't need to do anything
// here - watches are automatically re-registered with
// server and any watches triggered while the client was
// disconnected will be delivered (in order of course)
break;
case Expired:
// It's all over
dead = true;
listener.closing(KeeperException.Code.SessionExpired);
break;
}
} else {
if (path != null && path.equals(znode)) {
// Something has changed on the node, let's find out
zk.exists(znode, true, this, null);
}
}
if (chainedWatcher != null) {
chainedWatcher.process(event);
}
}
クライアント側のZooKeeperライブラリが、セッションの有効期限(Expiredイベント)が切れる前にZooKeeperへの通信チャネル(SyncConnectedイベント)を再確立できる場合、セッションのすべてのウォッチがサーバーで自動的に再確立されます(ウォッチの自動リセットはZooKeeper 3.0.0の新機能です)。詳細については、プログラマーガイドのZooKeeperウォッチを参照してください。この関数の少し下で、DataMonitorがznodeのイベントを取得すると、ZooKeeper.exists()
を呼び出して何が変更されたかを調べます。
完全なソースリスト
Executor.java
/**
* A simple example program to use DataMonitor to start and
* stop executables based on a znode. The program watches the
* specified znode and saves the data that corresponds to the
* znode in the filesystem. It also starts the specified program
* with the specified arguments when the znode exists and kills
* the program if the znode goes away.
*/
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class Executor
implements Watcher, Runnable, DataMonitor.DataMonitorListener
{
String znode;
DataMonitor dm;
ZooKeeper zk;
String filename;
String exec[];
Process child;
public Executor(String hostPort, String znode, String filename,
String exec[]) throws KeeperException, IOException {
this.filename = filename;
this.exec = exec;
zk = new ZooKeeper(hostPort, 3000, this);
dm = new DataMonitor(zk, znode, null, this);
}
/**
* @param args
*/
public static void main(String[] args) {
if (args.length < 4) {
System.err
.println("USAGE: Executor hostPort znode filename program [args ...]");
System.exit(2);
}
String hostPort = args[0];
String znode = args[1];
String filename = args[2];
String exec[] = new String[args.length - 3];
System.arraycopy(args, 3, exec, 0, exec.length);
try {
new Executor(hostPort, znode, filename, exec).run();
} catch (Exception e) {
e.printStackTrace();
}
}
/***************************************************************************
* We do process any events ourselves, we just need to forward them on.
*
* @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent)
*/
public void process(WatchedEvent event) {
dm.process(event);
}
public void run() {
try {
synchronized (this) {
while (!dm.dead) {
wait();
}
}
} catch (InterruptedException e) {
}
}
public void closing(int rc) {
synchronized (this) {
notifyAll();
}
}
static class StreamWriter extends Thread {
OutputStream os;
InputStream is;
StreamWriter(InputStream is, OutputStream os) {
this.is = is;
this.os = os;
start();
}
public void run() {
byte b[] = new byte[80];
int rc;
try {
while ((rc = is.read(b)) > 0) {
os.write(b, 0, rc);
}
} catch (IOException e) {
}
}
}
public void exists(byte[] data) {
if (data == null) {
if (child != null) {
System.out.println("Killing process");
child.destroy();
try {
child.waitFor();
} catch (InterruptedException e) {
}
}
child = null;
} else {
if (child != null) {
System.out.println("Stopping child");
child.destroy();
try {
child.waitFor();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
FileOutputStream fos = new FileOutputStream(filename);
fos.write(data);
fos.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
System.out.println("Starting child");
child = Runtime.getRuntime().exec(exec);
new StreamWriter(child.getInputStream(), System.out);
new StreamWriter(child.getErrorStream(), System.err);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
DataMonitor.java
/**
* A simple class that monitors the data and existence of a ZooKeeper
* node. It uses asynchronous ZooKeeper APIs.
*/
import java.util.Arrays;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;
public class DataMonitor implements Watcher, StatCallback {
ZooKeeper zk;
String znode;
Watcher chainedWatcher;
boolean dead;
DataMonitorListener listener;
byte prevData[];
public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
DataMonitorListener listener) {
this.zk = zk;
this.znode = znode;
this.chainedWatcher = chainedWatcher;
this.listener = listener;
// Get things started by checking if the node exists. We are going
// to be completely event driven
zk.exists(znode, true, this, null);
}
/**
* Other classes use the DataMonitor by implementing this method
*/
public interface DataMonitorListener {
/**
* The existence status of the node has changed.
*/
void exists(byte data[]);
/**
* The ZooKeeper session is no longer valid.
*
* @param rc
* the ZooKeeper reason code
*/
void closing(int rc);
}
public void process(WatchedEvent event) {
String path = event.getPath();
if (event.getType() == Event.EventType.None) {
// We are are being told that the state of the
// connection has changed
switch (event.getState()) {
case SyncConnected:
// In this particular example we don't need to do anything
// here - watches are automatically re-registered with
// server and any watches triggered while the client was
// disconnected will be delivered (in order of course)
break;
case Expired:
// It's all over
dead = true;
listener.closing(KeeperException.Code.SessionExpired);
break;
}
} else {
if (path != null && path.equals(znode)) {
// Something has changed on the node, let's find out
zk.exists(znode, true, this, null);
}
}
if (chainedWatcher != null) {
chainedWatcher.process(event);
}
}
public void processResult(int rc, String path, Object ctx, Stat stat) {
boolean exists;
switch (rc) {
case Code.Ok:
exists = true;
break;
case Code.NoNode:
exists = false;
break;
case Code.SessionExpired:
case Code.NoAuth:
dead = true;
listener.closing(rc);
return;
default:
// Retry errors
zk.exists(znode, true, this, null);
return;
}
byte b[] = null;
if (exists) {
try {
b = zk.getData(znode, false, null);
} catch (KeeperException e) {
// We don't need to worry about recovering now. The watch
// callbacks will kick off any exception handling
e.printStackTrace();
} catch (InterruptedException e) {
return;
}
}
if ((b == null && b != prevData)
|| (b != null && !Arrays.equals(prevData, b))) {
listener.exists(b);
prevData = b;
}
}
}