- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper”给Curator予高度评价。
引子和趣闻:
Zookeeper名字的由来是比较有趣的,下面的片段摘抄自《从PAXOS到ZOOKEEPER分布式一致性原理与实践》一书:
Zookeeper最早起源于雅虎的研究院的一个研究小组。在当时,研究人员发现,在雅虎内部很多大型的系统需要依赖一个类似的系统进行分布式协调,但是这些系统往往存在分布式单点问题。所以雅虎的开发人员就试图开发一个通用的无单点问题的分布式协调框架。在立项初期,考虑到很多项目都是用动物的名字来命名的(例如著名的Pig项目),雅虎的工程师希望给这个项目也取一个动物的名字。时任研究院的首席科学家Raghu Ramakrishnan开玩笑说:再这样下去,我们这儿就变成动物园了。此话一出,大家纷纷表示就叫动物园管理员吧——因为各个以动物命名的分布式组件放在一起,雅虎的整个分布式系统看上去就像一个大型的动物园了,而Zookeeper正好用来进行分布式环境的协调——于是,Zookeeper的名字由此诞生了。
Curator无疑是Zookeeper客户端中的瑞士军刀,它译作"馆长"或者’‘管理者’',不知道是不是开发小组有意而为之,笔者猜测有可能这样命名的原因是说明Curator就是Zookeeper的馆长(脑洞有点大:Curator就是动物园的园长)。
Curator包含了几个包:
curator-framework:对zookeeper的底层api的一些封装
curator-client:提供一些客户端的操作,例如重试策略等
curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等
Maven依赖(使用curator的版本:2.12.0,对应Zookeeper的版本为:3.4.x,如果跨版本会有兼容性问题,很有可能导致节点操作失败):
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
一个例子如下:
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
connectionInfo,
5000,
3000,
retryPolicy);
newClient静态工厂方法包含四个主要参数:
参数名 | 说明 |
---|---|
connectionString | 服务器列表,格式host1:port1,host2:port2,… |
retryPolicy | 重试策略,内建有四种重试策略,也可以自行实现RetryPolicy接口 |
sessionTimeoutMs | 会话超时时间,单位毫秒,默认60000ms |
connectionTimeoutMs | 连接创建超时时间,单位毫秒,默认60000ms |
核心参数变为流式设置,一个列子如下:
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
为了实现不同的Zookeeper业务之间的隔离,需要为每个业务分配一个独立的命名空间(NameSpace),即指定一个Zookeeper的根路径(官方术语:为Zookeeper添加“Chroot”特性)。例如(下面的例子)当客户端指定了独立命名空间为“/base”,那么该客户端对Zookeeper上的数据节点的操作都是基于该目录进行的。通过设置Chroot可以将客户端应用与Zookeeper服务端的一课子树相对应,在多个应用共用一个Zookeeper集群的场景下,这对于实现不同应用之间的相互隔离十分有意义。
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.namespace("base")
.build();
当创建会话成功,得到client的实例然后可以直接调用其start( )方法:
client.start();
Zookeeper的节点创建模式:
**创建一个节点,初始内容为空 **
client.create().forPath("path");
注意:如果没有设置节点属性,节点创建模式默认为持久化节点,内容默认为空
创建一个节点,附带初始化内容
client.create().forPath("path","init".getBytes());
创建一个节点,指定创建模式(临时节点),内容为空
client.create().withMode(CreateMode.EPHEMERAL).forPath("path");
创建一个节点,指定创建模式(临时节点),附带初始化内容
client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());
创建一个节点,指定创建模式(临时节点),附带初始化内容,并且自动递归创建父节点
client.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("path","init".getBytes());
这个creatingParentContainersIfNeeded()接口非常有用,因为一般情况开发人员在创建一个子节点必须判断它的父节点是否存在,如果不存在直接创建会抛出NoNodeException,使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点。
删除一个节点
client.delete().forPath("path");
注意,此方法只能删除叶子节点,否则会抛出异常。
删除一个节点,并且递归删除其所有的子节点
client.delete().deletingChildrenIfNeeded().forPath("path");
删除一个节点,强制指定版本进行删除
client.delete().withVersion(10086).forPath("path");
删除一个节点,强制保证删除
client.delete().guaranteed().forPath("path");
guaranteed()接口是一个保障措施,只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功。
**注意:**上面的多个流式接口是可以自由组合的,例如:
client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");
读取一个节点的数据内容
client.getData().forPath("path");
注意,此方法返的返回值是byte[ ];
读取一个节点的数据内容,同时获取到该节点的stat
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");
更新一个节点的数据内容
client.setData().forPath("path","data".getBytes());
注意:该接口会返回一个Stat实例
更新一个节点的数据内容,强制指定版本进行更新
client.setData().withVersion(10086).forPath("path","data".getBytes());
client.checkExists().forPath("path");
注意:该方法返回一个Stat实例,用于检查ZNode是否存在的操作. 可以调用额外的方法(监控或者后台处理)并在最后调用forPath( )指定要操作的ZNode
client.getChildren().forPath("path");
注意:该方法的返回值为List,获得ZNode的子节点Path列表。 可以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的父ZNode
CuratorFramework的实例包含inTransaction( )接口方法,调用此方法开启一个ZooKeeper事务. 可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交。一个例子如下:
client.inTransaction().check().forPath("path")
.and()
.create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
.and()
.setData().withVersion(10086).forPath("path","data2".getBytes())
.and()
.commit();
上面提到的创建、删除、更新、读取等方法都是同步的,Curator提供异步接口,引入了BackgroundCallback接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback接口中一个重要的回调值为CuratorEvent,里面包含事件类型、响应吗和节点的详细信息。
CuratorEventType
事件类型 | 对应CuratorFramework实例的方法 |
---|---|
CREATE | #create() |
DELETE | #delete() |
EXISTS | #checkExists() |
GET_DATA | #getData() |
SET_DATA | #setData() |
CHILDREN | #getChildren() |
SYNC | #sync(String,Object) |
GET_ACL | #getACL() |
SET_ACL | #setACL() |
WATCHED | #Watcher(Watcher) |
CLOSING | #close() |
响应码(#getResultCode())
响应码 | 意义 |
---|---|
0 | OK,即调用成功 |
-4 | ConnectionLoss,即客户端与服务端断开连接 |
-110 | NodeExists,即节点已经存在 |
-112 | SessionExpired,即会话过期 |
一个异步创建节点的例子如下:
Executor executor = Executors.newFixedThreadPool(2);
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground((curatorFramework, curatorEvent) -> { System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
},executor)
.forPath("path");
注意:如果#inBackground()方法不指定executor,那么会默认使用Curator的EventThread去进行异步处理。
提醒:首先你必须添加curator-recipes依赖,下文仅仅对recipes一些特性的使用进行解释和举例,不打算进行源码级别的探讨
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
重要提醒:强烈推荐使用ConnectionStateListener监控连接的状态,当连接状态为LOST,curator-recipes下的所有Api将会失效或者过期,尽管后面所有的例子都没有使用到ConnectionStateListener。
Zookeeper原生支持通过注册Watcher来进行事件监听,但是开发者需要反复注册(Watcher只能单次注册单次使用)。Cache是Curator中对事件监听的包装,可以看作是对事件监听的本地缓存视图,能够自动为开发者处理反复注册监听。Curator提供了三种Watcher(Cache)来监听结点的变化。
Path Cache用来监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态,而状态的更变将通过PathChildrenCacheListener通知。
实际使用时会涉及到四个类:
通过下面的构造函数创建Path Cache:
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
想使用cache,必须调用它的start
方法,使用完后调用close
方法。 可以设置StartMode来实现启动的模式
有没有一种方法可以让我使用 Curator 来观察领导层的变化?当领导者改变时,我需要所有的追随者都知道这已经发生了,这样他们才能发现新的领导者是谁。追随者想知道领导者是谁,这似乎是一种标准,但我无法
给CuratorFrameworkFactory#newClient的连接字符串如何看起来像?到目前为止,我还没有在网上找到任何信息,JavaDoc 也没有告诉我正确的格式。 最佳答案 根据 this
本文整理了Java中org.apache.flink.shaded.curator.org.apache.curator.utils.ZKPaths类的一些代码示例,展示了ZKPaths类的具体用法。
本文整理了Java中org.apache.flink.shaded.curator.org.apache.curator.utils.ZKPaths.getNodeFromPath()方法的一些代码示
本文整理了Java中org.apache.flink.shaded.curator.org.apache.curator.utils.ZKPaths.deleteChildren()方法的一些代码示例
我正在尝试来自策展人的基本代码 http://curator.readthedocs.io/en/4.0/examples.html 我尝试 import elasticsearch import c
只是想知道是否有任何 API 可以使用 Curator Framework 中的 CuratorFramework 类来知道特定节点是领导者。我正在使用 LeaderLatch,但即使 Node 是领
Apache Curator library for ZooKeeper 使用了一个很好的“流畅”语法。例如,要修改事务中的多个节点,代码如下所示: client.inTransaction().
我正在尝试使用CLI探索Elasticsearch策展人 https://www.elastic.co/guide/en/elasticsearch/client/curator/current/in
使用Elasticsearch策展人,如何删除与模式匹配的所有索引,但最新的除外? 我尝试使用filtertype: age,但似乎无法满足我的需要。 最佳答案 您需要两个过滤器:pattern(以匹
我想删除超过 7 天的 Elasticsearch 索引。所以我安装了curator 4.2,因为我的elasticsearch版本是5.0.0(4.x之前的curator版本与elasticsear
场景: 使用Curator实现分布式互斥体。如果已经获取互斥量的客户端由于网络故障而与 zk 服务器断开连接,则 Curator 将尝试重新连接。假设最终重连成功,但是最后一个 session 已经过
我创建了一个 Curator 客户端,如下所示: RetryPolicy retryPolicy = new RetryNTimes(3, 1000); CuratorFramewor
创建路径时,我在以下代码中收到 NodeExists 错误。 CuratorTransaction transaction = curatorFramework.inTransaction(); tr
我已将 Curator 设置为通过此过滤器删除旧的 Elasticsearch 索引: (...) filters: - filtertype: pattern kind: regex val
我正在尝试测试 Apache Curator 中的可撤销锁定。我有两个尝试获取锁的线程。如果第一个测试获取了锁,第二个线程可以要求第一个线程释放锁,以便第二个线程可以获取它 Retry
我目前正在使用 Apache Curator 来外部化共享资源(数据库中的一行)的锁定。总结一下这个问题,我正在运行一个服务的 2 个实例(使用 Spring Boot),我们将其称为服务 A,并调用
我知道 Apache Curator 可以实现基于 zookeeper 的分布式锁功能。根据 Apache Curator 官方网站上发布的文档,它看起来非常容易使用。例如: RetryPolicy
本文整理了Java中org.apache.curator.utils.ZKPaths类的一些代码示例,展示了ZKPaths类的具体用法。这些代码示例主要来源于Github/Stackoverflow/
本文整理了Java中com.bazaarvoice.curator.dropwizard.ZooKeeperConfiguration类的一些代码示例,展示了ZooKeeperConfiguratio
我是一名优秀的程序员,十分优秀!