- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我正在从事一个监控基于微服务的系统的项目。我创建的模拟微服务产生数据并将其上传到亚马逊Kinesis,现在我使用亚马逊的这段代码来生产和使用 Kinesis。但是我不明白如何添加更多处理器( worker )将处理相同的记录列表(可能同时),这意味着我正在尝试弄清楚在哪里以及如何将我的代码插入到我在下面添加的亚马逊添加代码中。
我的程序中将有两个处理器:
我非常感谢您提供一些指导,因为这是我的第一个行业项目,而且我对 AWS 还是个新手(尽管我已经阅读了很多相关内容)。谢谢!
这是从这个链接中获取的来自亚马逊的代码: https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/src/com/amazonaws/services/kinesis/producer/sample/SampleConsumer.java
/*
* Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.producer.sample;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
/**
* If you haven't looked at {@link SampleProducer}, do so first.
*
* <p>
* As mentioned in SampleProducer, we will check that all records are received
* correctly by the KCL by verifying that there are no gaps in the sequence
* numbers.
*
* <p>
* As the consumer runs, it will periodically log a message indicating the
* number of gaps it found in the sequence numbers. A gap is when the difference
* between two consecutive elements in the sorted list of seen sequence numbers
* is greater than 1.
*
* <p>
* Over time the number of gaps should converge to 0. You should also observe
* that the range of sequence numbers seen is equal to the number of records put
* by the SampleProducer.
*
* <p>
* If the stream contains data from multiple runs of SampleProducer, you should
* observe the SampleConsumer detecting this and resetting state to only count
* the latest run.
*
* <p>
* Note if you kill the SampleConsumer halfway and run it again, the number of
* gaps may never converge to 0. This is because checkpoints may have been made
* such that some records from the producer's latest run are not processed
* again. If you observe this, simply run the producer to completion again
* without terminating the consumer.
*
* <p>
* The consumer continues running until manually terminated, even if there are
* no more records to consume.
*
* @see SampleProducer
* @author chaodeng
*
*/
public class SampleConsumer implements IRecordProcessorFactory {
private static final Logger log = LoggerFactory.getLogger(SampleConsumer.class);
// All records from a run of the producer have the same timestamp in their
// partition keys. Since this value increases for each run, we can use it
// determine which run is the latest and disregard data from earlier runs.
private final AtomicLong largestTimestamp = new AtomicLong(0);
// List of record sequence numbers we have seen so far.
private final List<Long> sequenceNumbers = new ArrayList<>();
// A mutex for largestTimestamp and sequenceNumbers. largestTimestamp is
// nevertheless an AtomicLong because we cannot capture non-final variables
// in the child class.
private final Object lock = new Object();
/**
* One instance of RecordProcessor is created for every shard in the stream.
* All instances of RecordProcessor share state by capturing variables from
* the enclosing SampleConsumer instance. This is a simple way to combine
* the data from multiple shards.
*/
private class RecordProcessor implements IRecordProcessor {
@Override
public void initialize(String shardId) {}
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
long timestamp = 0;
List<Long> seqNos = new ArrayList<>();
for (Record r : records) {
// Get the timestamp of this run from the partition key.
timestamp = Math.max(timestamp, Long.parseLong(r.getPartitionKey()));
// Extract the sequence number. It's encoded as a decimal
// string and placed at the beginning of the record data,
// followed by a space. The rest of the record data is padding
// that we will simply discard.
try {
byte[] b = new byte[r.getData().remaining()];
r.getData().get(b);
seqNos.add(Long.parseLong(new String(b, "UTF-8").split(" ")[0]));
} catch (Exception e) {
log.error("Error parsing record", e);
System.exit(1);
}
}
synchronized (lock) {
if (largestTimestamp.get() < timestamp) {
log.info(String.format(
"Found new larger timestamp: %d (was %d), clearing state",
timestamp, largestTimestamp.get()));
largestTimestamp.set(timestamp);
sequenceNumbers.clear();
}
// Only add to the shared list if our data is from the latest run.
if (largestTimestamp.get() == timestamp) {
sequenceNumbers.addAll(seqNos);
Collections.sort(sequenceNumbers);
}
}
try {
checkpointer.checkpoint();
} catch (Exception e) {
log.error("Error while trying to checkpoint during ProcessRecords", e);
}
}
@Override
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
log.info("Shutting down, reason: " + reason);
try {
checkpointer.checkpoint();
} catch (Exception e) {
log.error("Error while trying to checkpoint during Shutdown", e);
}
}
}
/**
* Log a message indicating the current state.
*/
public void logResults() {
synchronized (lock) {
if (largestTimestamp.get() == 0) {
return;
}
if (sequenceNumbers.size() == 0) {
log.info("No sequence numbers found for current run.");
return;
}
// The producer assigns sequence numbers starting from 1, so we
// start counting from one before that, i.e. 0.
long last = 0;
long gaps = 0;
for (long sn : sequenceNumbers) {
if (sn - last > 1) {
gaps++;
}
last = sn;
}
log.info(String.format(
"Found %d gaps in the sequence numbers. Lowest seen so far is %d, highest is %d",
gaps, sequenceNumbers.get(0), sequenceNumbers.get(sequenceNumbers.size() - 1)));
}
}
@Override
public IRecordProcessor createProcessor() {
return this.new RecordProcessor();
}
public static void main(String[] args) {
KinesisClientLibConfiguration config =
new KinesisClientLibConfiguration(
"KinesisProducerLibSampleConsumer",
SampleProducer.STREAM_NAME,
new DefaultAWSCredentialsProviderChain(),
"KinesisProducerLibSampleConsumer")
.withRegionName(SampleProducer.REGION)
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
final SampleConsumer consumer = new SampleConsumer();
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
consumer.logResults();
}
}, 10, 1, TimeUnit.SECONDS);
new Worker.Builder()
.recordProcessorFactory(consumer)
.config(config)
.build()
.run();
}
}
最佳答案
您的问题非常广泛,但这里有一些关于 Kinesis 消费者的建议,希望与您的用例相关。
每个 Kinesis 流都被分成一个或多个分片。每个分片都有限制,例如每秒不能将超过 MiB 的数据写入分片,并且每秒不能向单个分片发起超过 5 个 GetRecords(消费者的 processRecords 在后台调用)请求碎片。 (请参阅约束的完整列表 here。)如果您处理的数据量接近或超过这些约束,您会希望增加流中的分片数量。
当您只有一个消费者应用程序和一个工作人员时,它负责处理相应流的所有分片。如果有多个工作人员,他们每个人都负责分片的某个子集,因此每个分片都分配给一个且只有一个工作人员(如果您查看消费者日志,您会发现这被称为分片上的“租约”)。
如果您希望拥有多个独立摄取 Kinesis 流量和处理记录的处理器,您需要注册两个单独的消费者应用程序。在您上面引用的代码中,应用程序名称是 KinesisClientLibConfiguration 构造函数的第一个参数。请注意,即使它们是单独的消费者应用程序,每秒 5 个 GetRecords 的总数限制仍然适用。
换句话说,您需要有两个独立的进程,一个将实例化与 DB 对话的消费者,另一个将实例化更新 GUI 的消费者:
KinesisClientLibConfiguration databaseSaverKclConfig =
new KinesisClientLibConfiguration(
"DatabaseSaverKclApp",
"your-stream",
new DefaultAWSCredentialsProviderChain(),
// I believe worker ids don't need to be unique, but it's a good practice to make them unique so you can easily identify the workers
"unique-worker-id")
.withRegionName(SampleProducer.REGION)
// this only matters the very first time your consumer is launched, subsequent launches will read the checkpoint from the previous runs
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
final IRecordProcessorFactory databaseSaverConsumer = new DatabaseSaverConsumer();
KinesisClientLibConfiguration guiUpdaterKclConfig =
new KinesisClientLibConfiguration(
"GuiUpdaterKclApp",
"your-stream",
new DefaultAWSCredentialsProviderChain(),
"unique-worker-id")
.withRegionName(SampleProducer.REGION)
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);
final IRecordProcessorFactory guiUpdaterConsumer = new GuiUpdaterConsumer();
DatabaseSaverConsumer和GuiUpdaterConsumer的实现呢?他们每个人都需要在 processRecords 方法中实现自定义逻辑。您需要确保它们中的每一个都在此方法中完成了正确的工作量,并且检查点逻辑是合理的。让我们来解读一下:
说到 GUI,您用什么来显示数据,为什么 Kinesis 消费者需要更新它,而不是 GUI 本身查询底层数据存储?
无论如何,我希望这对您有所帮助。如果您有更具体的问题,请告诉我。
关于java - 如何处理运动流记录? (多处理器),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55943491/
我为 4 套接字服务器的大对象( double 矩阵)编写 NUMA-aaware 缓存。我观察到套接字间通信是我的应用程序的瓶颈。因此,我希望不同套接字上的线程具有单独的矩阵缓存。我已将线程限制到特
这个问题在这里已经有了答案: 关闭 12 年前。 Possible Duplicate: Parsing JSON using C? 处理 JSON 的最佳 C 库是什么? http://www.j
我一直在使用递归 SpinTax 处理器,如 here 所示, 它适用于较小的字符串。然而,当字符串超过 20KB 时,它开始耗尽内存,这就成了一个问题。 如果我有这样的字符串: {Hello|How
C# 中是否有一个#define 允许我在编译时知道我是针对 x86 (Win32) 还是针对 x64 (Win64) 进行编译? 最佳答案 默认情况下没有办法做到这一点。原因是 C# 代码不是针对特
我不确定 SO 是否是提出这个问题的最佳场所。如果没有,请告诉我应该去哪个姊妹网站。 我一直在阅读一篇关于英特尔的可信执行技术 (TXT) 的论文,其中包含以下我似乎无法理解的文字: “英特尔创建了一
我需要一个工具来针对 执行 XSLT非常大 XML 文件。需要明确的是,我不需要任何东西来设计、编辑或调试 XSLT,只需执行它们即可。我正在使用的转换已经很好地优化了,但是大文件导致我尝试过的工具(
我正在学习Apache Camel。 能否请您解释一下关于Apache Camel的处理器,组件和端点之间的区别。 最佳答案 我建议所有刚接触Apache Camel的人阅读这篇文章,它很好地解释了C
我想知道在 Camel 处理器上获得同步的方法。 我在 docs 找到的唯一相关内容: Note that there is no concurrency or locking issue when
我看到这个 https://issues.apache.org/jira/browse/NIFI-78在 jira 上,但它引用了 java。有没有办法将 nifi 进程映射到服务器上的线程,以便我可
我有以下用例: 在一个应用程序中,我使用 X 线程消费一些消息,其中我有一个这样定义的 Consumer 实现: public interface Consumer { onMessage(
CPU12 处理器中是否有提供简单 NOT 功能的代码? 最佳答案 这应该是 the datasheet您正在寻找。没有可用的logical NOT,您必须自己编写代码。 关于assembly - 不
我对 Oracle XDK 中包含的 Java XSLT 处理器与 Oracle DB 中嵌入并由 SQL XMLtransform 函数使用的 XSLT 处理器之间的关系感到困惑。 这些是相同的野兽
我正在试用 Camel,发现它是一个方便的端点集成工具。我已经设置了以下实验性应用程序: 第一个端点是一个简单的 http-get 请求(在命令行上使用 curl)。这与使用 Jetty 的中央交换机
我正在为一个应用程序使用 Apache Camel 和 Spring Boot。我需要从目录中读取数据,然后解码读取的 xml,然后处理解码的对象以在其中设置更多数据,然后再次对其进行编码并将其发送到
我已经知道如何编写自定义处理器(扩展org.apache.nifi.processor.AbstractProcessor)。我已经使用了这种技术,并且也可以轻松创建自定义 org.apache.ni
是否有任何用 python 编写的 EasyList 处理器/解析器? http://easylist.adblockplus.org/en/ 最佳答案 找到了!就像一个月后:( http://adb
我有一个无法安装任何东西的开发(说来话长)。我只需要使用纯 HTML/浏览器 JS 进行开发,并且我想使用 CSS 预处理器。我喜欢 SCSS (SASS),但为了使用它,我必须在我的机器上安装 ru
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 要求我们推荐或查找工具、库或最喜欢的场外资源的问题对于 Stack Overflow 来说是偏离主题的,
对于新手来说,是直接从 ARM 处理器的数据表和用户手册开始,还是先了解一下 ARM 世界然后再继续? 最佳答案 当我开始使用一项(对我而言)新技术时,我首先会找到尽可能多的数据表和应用说明,然后直接
我使用 AMD FX X6 6300 型处理器。 (它支持虚拟化,我的 BIOS 设置为 ON) 我安装了“英特尔 x86 仿真器加速器”。当我尝试运行 Intel 加速器设置时,我得到该设置无法安装
我是一名优秀的程序员,十分优秀!