- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
本文整理了Java中com.pinterest.secor.common.ZookeeperConnector
类的一些代码示例,展示了ZookeeperConnector
类的具体用法。这些代码示例主要来源于Github
/Stackoverflow
/Maven
等平台,是从一些精选项目中提取出来的代码,具有较强的参考意义,能在一定程度帮忙到你。ZookeeperConnector
类的具体详情如下:
包路径:com.pinterest.secor.common.ZookeeperConnector
类名称:ZookeeperConnector
[英]ZookeeperConnector implements interactions with Zookeeper.
[中]ZookePerConnector实现与Zookeeper的交互。
代码示例来源:origin: pinterest/secor
public static void main(String[] args) {
try {
CommandLine commandLine = parseArgs(args);
String command = commandLine.getOptionValue("command");
if (!command.equals("delete_committed_offsets")) {
throw new IllegalArgumentException(
"command has to be one of \"delete_committed_offsets\"");
}
SecorConfig config = SecorConfig.load();
ZookeeperConnector zookeeperConnector = new ZookeeperConnector(config);
String topic = commandLine.getOptionValue("topic");
if (commandLine.hasOption("partition")) {
int partition =
((Number) commandLine.getParsedOptionValue("partition")).intValue();
TopicPartition topicPartition = new TopicPartition(topic, partition);
zookeeperConnector.deleteCommittedOffsetPartitionCount(topicPartition);
} else {
zookeeperConnector.deleteCommittedOffsetTopicCount(topic);
}
} catch (Throwable t) {
LOG.error("Zookeeper client failed", t);
System.exit(1);
}
}
}
代码示例来源:origin: pinterest/secor
private String getCommittedOffsetTopicPath(String topic) {
return getCommittedOffsetGroupPath() + "/" + topic;
}
代码示例来源:origin: pinterest/secor
public void setCommittedOffsetCount(TopicPartition topicPartition, long count)
throws Exception {
ZooKeeper zookeeper = mZookeeperClient.get();
String offsetPath = getCommittedOffsetPartitionPath(topicPartition);
LOG.info("creating missing parents for zookeeper path {}", offsetPath);
createMissingParents(offsetPath);
byte[] data = Long.toString(count).getBytes();
try {
LOG.info("setting zookeeper path {} value {}", offsetPath, count);
// -1 matches any version
zookeeper.setData(offsetPath, data, -1);
} catch (KeeperException.NoNodeException exception) {
zookeeper.create(offsetPath, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
代码示例来源:origin: pinterest/secor
public void deleteCommittedOffsetTopicCount(String topic) throws Exception {
ZooKeeper zookeeper = mZookeeperClient.get();
List<Integer> partitions = getCommittedOffsetPartitions(topic);
for (Integer partition : partitions) {
TopicPartition topicPartition = new TopicPartition(topic, partition);
String offsetPath = getCommittedOffsetPartitionPath(topicPartition);
LOG.info("deleting path {}", offsetPath);
zookeeper.delete(offsetPath, -1);
}
}
代码示例来源:origin: pinterest/secor
topicPartition.getPartition());
mZookeeperConnector.lock(lockPath);
try {
long zookeeperCommittedOffsetCount = mZookeeperConnector.getCommittedOffsetCount(
topicPartition);
if (zookeeperCommittedOffsetCount == committedOffsetCount) {
mZookeeperConnector.setCommittedOffsetCount(topicPartition, lastSeenOffset + 1);
mOffsetTracker.setCommittedOffsetCount(topicPartition, lastSeenOffset + 1);
if (isOffsetsStorageKafka) {
mZookeeperConnector.unlock(lockPath);
代码示例来源:origin: pinterest/secor
Mockito.verify(mZookeeperConnector).lock(lockPath);
PowerMockito.verifyStatic();
FileUtil.moveToCloud(
+ "some_other_partition/10_0_00000000000000000010");
Mockito.verify(mFileRegistry).deleteTopicPartition(mTopicPartition);
Mockito.verify(mZookeeperConnector).setCommittedOffsetCount(
mTopicPartition, 1L);
Mockito.verify(mOffsetTracker).setCommittedOffsetCount(mTopicPartition,
1L);
Mockito.verify(mZookeeperConnector).unlock(lockPath);
代码示例来源:origin: pinterest/secor
protected void verify(String zookeeperPath, String expectedOffsetPath) {
ZookeeperConnector zookeeperConnector = new ZookeeperConnector();
PropertiesConfiguration properties = new PropertiesConfiguration();
properties.setProperty("kafka.zookeeper.path", zookeeperPath);
properties.setProperty("secor.kafka.group", "secor_cg");
SecorConfig secorConfig = new SecorConfig(properties);
zookeeperConnector.setConfig(secorConfig);
Assert.assertEquals(expectedOffsetPath, zookeeperConnector.getCommittedOffsetGroupPath());
}
}
代码示例来源:origin: pinterest/secor
/**
* Init the Uploader with its dependent objects.
*
* @param config Secor configuration
* @param offsetTracker Tracker of the current offset of topics partitions
* @param fileRegistry Registry of log files on a per-topic and per-partition basis
* @param uploadManager Manager of the physical upload of log files to the remote repository
* @param metricCollector component that ingest metrics into monitoring system
*/
public void init(SecorConfig config, OffsetTracker offsetTracker, FileRegistry fileRegistry,
UploadManager uploadManager, MessageReader messageReader, MetricCollector metricCollector) {
init(config, offsetTracker, fileRegistry, uploadManager, messageReader,
new ZookeeperConnector(config), metricCollector);
}
代码示例来源:origin: pinterest/secor
public void testDeleteTopicPartition() throws Exception {
Mockito.when(
mZookeeperConnector.getCommittedOffsetCount(mTopicPartition))
.thenReturn(31L);
Mockito.when(
mOffsetTracker.setCommittedOffsetCount(mTopicPartition, 30L))
.thenReturn(11L);
Mockito.when(mOffsetTracker.getLastSeenOffset(mTopicPartition))
.thenReturn(20L);
mUploader.applyPolicy(false);
Mockito.verify(mFileRegistry).deleteTopicPartition(mTopicPartition);
}
代码示例来源:origin: pinterest/secor
public long getCommittedOffsetCount(TopicPartition topicPartition) throws Exception {
ZooKeeper zookeeper = mZookeeperClient.get();
String offsetPath = getCommittedOffsetPartitionPath(topicPartition);
try {
byte[] data = zookeeper.getData(offsetPath, false, null);
return Long.parseLong(new String(data));
} catch (KeeperException.NoNodeException exception) {
LOG.warn("path {} does not exist in zookeeper", offsetPath);
return -1;
}
}
代码示例来源:origin: pinterest/secor
public void testUploadFiles() throws Exception {
Mockito.when(
mZookeeperConnector.getCommittedOffsetCount(mTopicPartition))
.thenReturn(11L);
Mockito.when(
Mockito.verify(mZookeeperConnector).lock(lockPath);
PowerMockito.verifyStatic();
FileUtil.moveToCloud(
+ "some_other_partition/10_0_00000000000000000010");
Mockito.verify(mFileRegistry).deleteTopicPartition(mTopicPartition);
Mockito.verify(mZookeeperConnector).setCommittedOffsetCount(
mTopicPartition, 21L);
Mockito.verify(mOffsetTracker).setCommittedOffsetCount(mTopicPartition,
21L);
Mockito.verify(mZookeeperConnector).unlock(lockPath);
代码示例来源:origin: pinterest/secor
@Override
public void init(SecorConfig config) {
mConfig = config;
mZookeeperConnector = new ZookeeperConnector(mConfig);
mKafkaMessageTimestampFactory = new KafkaMessageTimestampFactory(mConfig.getKafkaMessageTimestampClass());
}
}
代码示例来源:origin: pinterest/secor
@Override
public Message getCommittedMessage(TopicPartition topicPartition) throws Exception {
SimpleConsumer consumer = null;
try {
long committedOffset = mZookeeperConnector.getCommittedOffsetCount(topicPartition) - 1;
if (committedOffset < 0) {
return null;
}
consumer = createConsumer(topicPartition);
if (consumer == null) {
return null;
}
return getMessage(topicPartition, committedOffset, consumer);
} catch (MessageDoesNotExistException e) {
// If a RuntimeEMessageDoesNotExistException exception is raised,
// the message at the last comitted offset does not exist in Kafka.
// This is usually due to the message being compacted away by the
// Kafka log compaction process.
//
// That is no an exceptional situation - in fact it can be normal if
// the topic being consumed by Secor has a low volume. So in that
// case, simply return null
LOG.warn("no committed message for topic {} partition {}", topicPartition.getTopic(), topicPartition.getPartition());
return null;
} finally {
if (consumer != null) {
consumer.close();
}
}
}
代码示例来源:origin: pinterest/secor
public void deleteCommittedOffsetPartitionCount(TopicPartition topicPartition)
throws Exception {
String offsetPath = getCommittedOffsetPartitionPath(topicPartition);
ZooKeeper zookeeper = mZookeeperClient.get();
LOG.info("deleting path {}", offsetPath);
zookeeper.delete(offsetPath, -1);
}
代码示例来源:origin: pinterest/secor
public PartitionFinalizer(SecorConfig config) throws Exception {
mConfig = config;
Class kafkaClientClass = Class.forName(mConfig.getKafkaClientClass());
this.mKafkaClient = (KafkaClient) kafkaClientClass.newInstance();
this.mKafkaClient.init(config);
mZookeeperConnector = new ZookeeperConnector(mConfig);
mMessageParser = (TimestampedMessageParser) ReflectionUtil.createMessageParser(
mConfig.getMessageParserClass(), mConfig);
mQuboleClient = new QuboleClient(mConfig);
if (mConfig.getFileExtension() != null && !mConfig.getFileExtension().isEmpty()) {
mFileExtension = mConfig.getFileExtension();
} else if (mConfig.getCompressionCodec() != null && !mConfig.getCompressionCodec().isEmpty()) {
CompressionCodec codec = CompressionUtil.createCompressionCodec(mConfig.getCompressionCodec());
mFileExtension = codec.getDefaultExtension();
} else {
mFileExtension = "";
}
mLookbackPeriods = config.getFinalizerLookbackPeriods();
LOG.info("Lookback periods: " + mLookbackPeriods);
}
代码示例来源:origin: pinterest/secor
public void testTrimFiles() throws Exception {
Mockito.when(
mZookeeperConnector.getCommittedOffsetCount(mTopicPartition))
.thenReturn(21L);
代码示例来源:origin: pinterest/secor
public List<String> getCommittedOffsetTopics() throws Exception {
ZooKeeper zookeeper = mZookeeperClient.get();
String offsetPath = getCommittedOffsetGroupPath();
List<String> topics = zookeeper.getChildren(offsetPath, false);
LinkedList<String> result = new LinkedList<String>();
for (String topicPath : topics) {
String[] elements = topicPath.split("/");
String topic = elements[elements.length - 1];
result.add(topic);
}
return result;
}
代码示例来源:origin: pinterest/secor
public ProgressMonitor(SecorConfig config)
throws Exception
{
mConfig = config;
mZookeeperConnector = new ZookeeperConnector(mConfig);
try {
Class timestampClass = Class.forName(mConfig.getKafkaClientClass());
this.mKafkaClient = (KafkaClient) timestampClass.newInstance();
this.mKafkaClient.init(config);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new RuntimeException(e);
}
mMessageParser = (MessageParser) ReflectionUtil.createMessageParser(
mConfig.getMessageParserClass(), mConfig);
mPrefix = mConfig.getMonitoringPrefix();
if (Strings.isNullOrEmpty(mPrefix)) {
mPrefix = "secor";
}
if (mConfig.getStatsDHostPort() != null && !mConfig.getStatsDHostPort().isEmpty()) {
HostAndPort hostPort = HostAndPort.fromString(mConfig.getStatsDHostPort());
mStatsDClient = new NonBlockingStatsDClient(null, hostPort.getHostText(), hostPort.getPort(),
mConfig.getStatsDDogstatsdConstantTags());
}
}
代码示例来源:origin: pinterest/secor
modificationAgeSec >= mConfig.getMaxFileAgeSeconds() ||
isRequiredToUploadAtTime(topicPartition)) {
long newOffsetCount = mZookeeperConnector.getCommittedOffsetCount(topicPartition);
long oldOffsetCount = mOffsetTracker.setCommittedOffsetCount(topicPartition,
newOffsetCount);
就像添加一类“data-pin-pin”而不是“data-pin-nopin”? slider 中有图像,但它们都是背景图像,因此当前 pin 按钮不起作用。 谢谢! 最佳答案 Pinterest 无
通常,要获取 Pinterest 版 block 的 RSS 提要,只需将“.rss”添加到 URL 末尾即可。 例如,对于 http://www.pinterest.com/philchairez/
我正在搜索 Pinterest 的提要移动应用程序 API。 我可以知道如何从 Pinterest 获取搜索提要吗? API。 最佳答案 这里有文档 https://developers.pinter
Pinterest Widget Builder允许灵活地创建小部件以放置在您的网站上。我在 this page 上加了一个,但您可以为小部件设置的宽度似乎有限制。例如,我将宽度设置为 1170,但它
自今年年初(整整 7 个月!)以来,关于 Pinterest API 中的搜索功能出现了几个问题。 示例:我在这里得到与我们的 friend leonardo 相同的回复:Pinterest API
我们希望将网站设置为 oEmbed 提供商。 官方文档 ( http://oembed.com/ ) 说: Configuration for oEmbed is very simple. Provi
有没有办法在 React Native 中使用 flexbox 来实现 Masonry/Pinterest 风格的列? 最佳答案 在 React Native 中,远程图像不会在加载时调整大小(参见“
我想知道是否有相应的方法可以打开 pinterest 链接,如果安装了 pinterest 应用程序,或者如果它不在 Facebook 等设备中,则使用浏览器。对于 Facebook,我使用以下代码
对于 iOS 8,Pinterest 有一个共享扩展。如何设置图片来源网址和描述属性?我正在使用 UIActivityViewController。我是否使用 UIImage、NSURL 用于 sou
目前,我正在尝试重新创建类似 Instagram/Pinterest iOS 个人资料页面的内容,您可以在具有不同数据和布局的部分之间滑动,同时标题不会水平滚动,而是与每个部分垂直滚动。与当前 Ins
在页面 https://developers.pinterest.com/tools/access_token/您可以生成访问 token 。有谁知道那个 token 的生命周期是多少?它会在固定时间
我们的 prod pinterest 应用程序有问题。 获得 token 后: "{"access_token": "AabcYgRUKiaBI45HYM72teXO6fZaFQoEhVxkxaREo
我正在寻找一种从给定用户名获取板名称列表的方法。我知道 pinterest 已经为给定用户的所有 pin 和给定 pinboard 的所有 pin 提供了 rss。 来自给定用户的所有 Pin 图:p
这个文档很清楚:http://developers.pinterest.com/api_docs/oauth_code_exchange/ 我需要 code用访问 token 交换它。但是访问此代码的
任何人都可以对(未受过教育的)猜测如何使用未发布的Pinterest API进行分页吗? 例如,此链接:https://api.pinterest.com/v3/pidgets/boards/grai
我正在通过使用此url访问Pinterest API以获取用户信息,但是我找不到如何为Pinterest生成访问 token 的方法。 根据这个blog post,它说 Pinterest uses
关闭。这个问题是not reproducible or was caused by typos .它目前不接受答案。 想改善这个问题吗?更新问题,使其成为 on-topic对于堆栈溢出。 4年前关闭。
尝试实现“固定它”按钮,但它会为每张图片返回以下问题。我们在整个网站上使用 SSL,我想知道这是否相关。 有什么想法吗? {"route_pattern": "^/resource/:name/:me
我集成了“Pin It”按钮以在 pinterest.com 上分享我的产品图片。但它不会在我网站上的 pin it 按钮前显示 pin 计数。我写了下面的代码。 这是我网站上的固定按钮 &这个
我有一个 AngularJS 应用程序,它是一个单页应用程序。在每个单独的页面上都会加载 Pinterest 板,这是通过 Pinterest 的小部件构建器完成的: This can be foun
我是一名优秀的程序员,十分优秀!