gpt4 book ai didi

hadoop - 在 ZooKeeper 中使用 Zab 协议(protocol)进行广播

转载 作者:可可西里 更新时间:2023-11-01 15:38:19 27 4
gpt4 key购买 nike

早上好

我是 ZooKeeper 及其协议(protocol)的新手,我对它的广播协议(protocol) Zab 很感兴趣。

能否提供一个使用Zookeeper的Zab协议(protocol)的简单java代码?我一直在搜索,但没有成功找到显示如何使用 Zab 的代码。

事实上,我需要的很简单,我有一个 MapReduce 代码,我希望所有映射器在成功找到更好的 X 值(即更大的值)时更新一个变量(比如说 X)。在这种情况下,领导者必须比较旧值和新值,然后将实际最佳值广播给所有映射器。我怎样才能在 Java 中做这样的事情?

提前致谢,问候

最佳答案

您不需要使用 Zab 协议(protocol)。相反,您可以按照以下步骤操作:

你在 Zookeeper 上有一个 Znode say/bigvalue。所有映射器在启动时读取存储在其中的值。他们还在 Znode 上监视数据变化。每当映射器获得更好的值时,它就会用更好的值更新 Znode。所有映射器都将收到数据更改事件的通知,他们会读取新的最佳值,然后重新建立对数据更改的监视。这样它们就与最新的最佳值同步,并且只要有更好的值就可以更新最新的最佳值。

实际上 zkclient 是一个非常适合与 Zookeeper 一起使用的库,它隐藏了很多复杂性 (https://github.com/sgroschupf/zkclient)。下面是一个示例,演示了如何观察 Znode“/bigvalue”的任何数据更改。

package geet.org;

import java.io.UnsupportedEncodingException;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.data.Stat;

public class ZkExample implements IZkDataListener, ZkSerializer {
public static void main(String[] args) {
String znode = "/bigvalue";
ZkExample ins = new ZkExample();
ZkClient cl = new ZkClient("127.0.0.1", 30000, 30000,
ins);
try {
cl.createPersistent(znode);
} catch (ZkNodeExistsException e) {
System.out.println(e.getMessage());
}
// Change the data for fun
Stat stat = new Stat();
String data = cl.readData(znode, stat);
System.out.println("Current data " + data + "version = " + stat.getVersion());
cl.writeData(znode, "My new data ", stat.getVersion());

cl.subscribeDataChanges(znode, ins);
try {
Thread.sleep(36000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
System.out.println("Detected data change");
System.out.println("New data for " + dataPath + " " + (String)data);
}

@Override
public void handleDataDeleted(String dataPath) throws Exception {
System.out.println("Data deleted " + dataPath);
}

@Override
public byte[] serialize(Object data) throws ZkMarshallingError {
if (data instanceof String){
try {
return ((String) data).getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return null;
}

@Override
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
try {
return new String(bytes, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return null;
}
}

关于hadoop - 在 ZooKeeper 中使用 Zab 协议(protocol)进行广播,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21907366/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com