
java - How do I process Kinesis stream records? (multiple processors)

Reposted · Author: 塔克拉玛干 · Updated: 2023-11-03 06:37:09

I'm working on a project that monitors a microservice-based system. The mock microservices I created produce data and upload it to Amazon Kinesis, and I'm using Amazon's code below to produce to and consume from Kinesis. What I don't understand is how to add more processors (workers) that will operate on the same record list (possibly concurrently); in other words, I'm trying to figure out where and how to plug my own code into the Amazon code I've included below.

My program will have two processors:

  1. One that saves every record to a database.
  2. One that updates a GUI showing the system's monitoring state, provided it can compare the current transaction against the valid transactions. The valid transactions will also be stored in the database. This means we'll be able to see all the data flowing through the system and understand how each request is handled from start to finish.

I'd really appreciate some guidance, as this is my first industry project and I'm also new to AWS (although I've read a lot about it). Thanks!

Here is the Amazon code, taken from this link: https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/src/com/amazonaws/services/kinesis/producer/sample/SampleConsumer.java

/*
* Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazonaws.services.kinesis.producer.sample;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;

/**
* If you haven't looked at {@link SampleProducer}, do so first.
*
* <p>
* As mentioned in SampleProducer, we will check that all records are received
* correctly by the KCL by verifying that there are no gaps in the sequence
* numbers.
*
* <p>
* As the consumer runs, it will periodically log a message indicating the
* number of gaps it found in the sequence numbers. A gap is when the difference
* between two consecutive elements in the sorted list of seen sequence numbers
* is greater than 1.
*
* <p>
* Over time the number of gaps should converge to 0. You should also observe
* that the range of sequence numbers seen is equal to the number of records put
* by the SampleProducer.
*
* <p>
* If the stream contains data from multiple runs of SampleProducer, you should
* observe the SampleConsumer detecting this and resetting state to only count
* the latest run.
*
* <p>
* Note if you kill the SampleConsumer halfway and run it again, the number of
* gaps may never converge to 0. This is because checkpoints may have been made
* such that some records from the producer's latest run are not processed
* again. If you observe this, simply run the producer to completion again
* without terminating the consumer.
*
* <p>
* The consumer continues running until manually terminated, even if there are
* no more records to consume.
*
* @see SampleProducer
* @author chaodeng
*
*/
public class SampleConsumer implements IRecordProcessorFactory {
    private static final Logger log = LoggerFactory.getLogger(SampleConsumer.class);

    // All records from a run of the producer have the same timestamp in their
    // partition keys. Since this value increases for each run, we can use it
    // to determine which run is the latest and disregard data from earlier runs.
    private final AtomicLong largestTimestamp = new AtomicLong(0);

    // List of record sequence numbers we have seen so far.
    private final List<Long> sequenceNumbers = new ArrayList<>();

    // A mutex for largestTimestamp and sequenceNumbers. largestTimestamp is
    // nevertheless an AtomicLong because we cannot capture non-final variables
    // in the child class.
    private final Object lock = new Object();

    /**
     * One instance of RecordProcessor is created for every shard in the stream.
     * All instances of RecordProcessor share state by capturing variables from
     * the enclosing SampleConsumer instance. This is a simple way to combine
     * the data from multiple shards.
     */
    private class RecordProcessor implements IRecordProcessor {
        @Override
        public void initialize(String shardId) {}

        @Override
        public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
            long timestamp = 0;
            List<Long> seqNos = new ArrayList<>();

            for (Record r : records) {
                // Get the timestamp of this run from the partition key.
                timestamp = Math.max(timestamp, Long.parseLong(r.getPartitionKey()));

                // Extract the sequence number. It's encoded as a decimal
                // string and placed at the beginning of the record data,
                // followed by a space. The rest of the record data is padding
                // that we will simply discard.
                try {
                    byte[] b = new byte[r.getData().remaining()];
                    r.getData().get(b);
                    seqNos.add(Long.parseLong(new String(b, "UTF-8").split(" ")[0]));
                } catch (Exception e) {
                    log.error("Error parsing record", e);
                    System.exit(1);
                }
            }

            synchronized (lock) {
                if (largestTimestamp.get() < timestamp) {
                    log.info(String.format(
                            "Found new larger timestamp: %d (was %d), clearing state",
                            timestamp, largestTimestamp.get()));
                    largestTimestamp.set(timestamp);
                    sequenceNumbers.clear();
                }

                // Only add to the shared list if our data is from the latest run.
                if (largestTimestamp.get() == timestamp) {
                    sequenceNumbers.addAll(seqNos);
                    Collections.sort(sequenceNumbers);
                }
            }

            try {
                checkpointer.checkpoint();
            } catch (Exception e) {
                log.error("Error while trying to checkpoint during ProcessRecords", e);
            }
        }

        @Override
        public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
            log.info("Shutting down, reason: " + reason);
            try {
                checkpointer.checkpoint();
            } catch (Exception e) {
                log.error("Error while trying to checkpoint during Shutdown", e);
            }
        }
    }

    /**
     * Log a message indicating the current state.
     */
    public void logResults() {
        synchronized (lock) {
            if (largestTimestamp.get() == 0) {
                return;
            }

            if (sequenceNumbers.size() == 0) {
                log.info("No sequence numbers found for current run.");
                return;
            }

            // The producer assigns sequence numbers starting from 1, so we
            // start counting from one before that, i.e. 0.
            long last = 0;
            long gaps = 0;
            for (long sn : sequenceNumbers) {
                if (sn - last > 1) {
                    gaps++;
                }
                last = sn;
            }

            log.info(String.format(
                    "Found %d gaps in the sequence numbers. Lowest seen so far is %d, highest is %d",
                    gaps, sequenceNumbers.get(0), sequenceNumbers.get(sequenceNumbers.size() - 1)));
        }
    }

    @Override
    public IRecordProcessor createProcessor() {
        return this.new RecordProcessor();
    }

    public static void main(String[] args) {
        KinesisClientLibConfiguration config =
                new KinesisClientLibConfiguration(
                        "KinesisProducerLibSampleConsumer",
                        SampleProducer.STREAM_NAME,
                        new DefaultAWSCredentialsProviderChain(),
                        "KinesisProducerLibSampleConsumer")
                .withRegionName(SampleProducer.REGION)
                .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

        final SampleConsumer consumer = new SampleConsumer();

        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                consumer.logResults();
            }
        }, 10, 1, TimeUnit.SECONDS);

        new Worker.Builder()
                .recordProcessorFactory(consumer)
                .config(config)
                .build()
                .run();
    }
}

Best Answer

Your question is very broad, but here are some thoughts on Kinesis consumers that are hopefully relevant to your use case.

Every Kinesis stream is partitioned into one or more shards. There are limits imposed per shard, e.g. you can't write more than 1 MiB of data per second into a shard, and you can't issue more than 5 GetRecords requests (which is what the consumer's processRecords calls under the hood) per second to a single shard. (See the full list of constraints here.) If you are working with amounts of data that approach or exceed these constraints, you'd want to increase the number of shards in your stream.
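
As a quick sanity check, those per-shard write limits (1 MiB/sec and 1,000 records/sec, per the Kinesis documentation) give a back-of-the-envelope shard count. This is only an illustrative sketch; the class and method names are mine, not part of any AWS API:

```java
public class ShardEstimator {
    // Documented per-shard ingress limits for Kinesis Data Streams.
    static final double MIB_PER_SEC_PER_SHARD = 1.0;
    static final int RECORDS_PER_SEC_PER_SHARD = 1000;

    // Minimum number of shards needed to absorb the given write rate.
    static int requiredShards(double mibPerSec, int recordsPerSec) {
        int byThroughput = (int) Math.ceil(mibPerSec / MIB_PER_SEC_PER_SHARD);
        int byCount = (int) Math.ceil(recordsPerSec / (double) RECORDS_PER_SEC_PER_SHARD);
        return Math.max(1, Math.max(byThroughput, byCount));
    }

    public static void main(String[] args) {
        // 3.5 MiB/sec and 2,500 records/sec: throughput is the bottleneck.
        System.out.println(requiredShards(3.5, 2500)); // prints 4
    }
}
```

If either dimension of your traffic pushes the estimate above your current shard count, that is the signal to reshard.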

When you have only one consumer application with one worker, that worker is responsible for all shards of the corresponding stream. With multiple workers, each of them takes responsibility for some subset of the shards, so that every shard is assigned to one and only one worker (if you watch the consumer logs, you'll see this referred to as a worker taking a "lease" on a shard).

If you want multiple processors that each independently ingest the Kinesis traffic and process the records, you need to register two separate consumer applications. In the code you quoted above, the application name is the first parameter of the KinesisClientLibConfiguration constructor. Note that even though they are separate consumer applications, the total limit of 5 GetRecords requests per second per shard still applies.

In other words, you need two separate processes: one instantiates the consumer that talks to the DB, the other instantiates the consumer that updates the GUI:

KinesisClientLibConfiguration databaseSaverKclConfig =
        new KinesisClientLibConfiguration(
                "DatabaseSaverKclApp",
                "your-stream",
                new DefaultAWSCredentialsProviderChain(),
                // I believe worker ids don't need to be unique, but it's a good practice to make them unique so you can easily identify the workers
                "unique-worker-id")
        .withRegionName(SampleProducer.REGION)
        // this only matters the very first time your consumer is launched; subsequent launches will read the checkpoint from the previous runs
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

final IRecordProcessorFactory databaseSaverConsumer = new DatabaseSaverConsumer();

KinesisClientLibConfiguration guiUpdaterKclConfig =
        new KinesisClientLibConfiguration(
                "GuiUpdaterKclApp",
                "your-stream",
                new DefaultAWSCredentialsProviderChain(),
                "unique-worker-id")
        .withRegionName(SampleProducer.REGION)
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON);

final IRecordProcessorFactory guiUpdaterConsumer = new GuiUpdaterConsumer();

What about the implementations of DatabaseSaverConsumer and GuiUpdaterConsumer? Each of them needs to implement custom logic in the processRecords method. You need to make sure each one does the right amount of work inside this method, and that its checkpointing logic is sound. Let's unpack that:

  • Suppose processRecords takes 10 seconds to process 100 records, but the corresponding shard receives 500 records in 10 seconds. Every subsequent call to processRecords would fall further behind the shard. That means either some work needs to be pulled out of processRecords, or the number of shards needs to be scaled up.
  • Conversely, if processRecords takes only 0.1 seconds, processRecords would be called 10 times per second, exceeding the allotted 5 transactions per second per shard. If I understand/remember correctly, there is no way to add a pause between subsequent calls to processRecords in the KCL configuration, so you'd have to add a sleep in your own code.
  • Checkpointing: each worker needs to keep track of its progress, so that if it is unexpectedly interrupted and another worker takes over the same shard, it knows where to pick up. This is usually done in one of two ways: at the beginning of processRecords, or at the end. In the former case you're saying "I'm fine with skipping some records in the stream, but I definitely don't want to process them twice"; in the latter, "I'm fine with processing some records twice, but I absolutely can't lose any of them." (When you need the best of both worlds, i.e. to process records exactly once, you need to keep the state in some data store outside the workers.) In your case, the database writer most likely needs to checkpoint after processing; I'm less sure about the GUI one.
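
The checkpoint-at-the-end shape from the last bullet can be sketched as follows. To keep it self-contained, this uses a tiny stand-in for the KCL's IRecordProcessorCheckpointer and a list as a fake database; all names here are placeholders, not part of the AWS SDK:

```java
import java.util.ArrayList;
import java.util.List;

public class DatabaseSaverSketch {
    // Stand-in for the KCL's IRecordProcessorCheckpointer interface.
    interface Checkpointer { void checkpoint() throws Exception; }

    // Fake "database" so the sketch runs without a real data store.
    static final List<String> savedRecords = new ArrayList<>();

    static void saveToDatabase(String record) { savedRecords.add(record); }

    // A processRecords that checkpoints AFTER doing its work gives
    // at-least-once semantics: records may be reprocessed after a crash,
    // but none are lost.
    static void processRecords(List<String> records, Checkpointer checkpointer) {
        for (String r : records) {
            saveToDatabase(r); // persist first...
        }
        try {
            checkpointer.checkpoint(); // ...then record progress
        } catch (Exception e) {
            // log and carry on; checkpointing can be retried on a later call
        }
    }

    public static void main(String[] args) {
        processRecords(List.of("rec-1", "rec-2"), () -> {});
        System.out.println(savedRecords); // prints [rec-1, rec-2]
    }
}
```

Checkpointing at the start instead (before the loop) would flip the trade-off to at-most-once: no duplicates, but records in flight during a crash are skipped.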

Speaking of the GUI: what are you using to display the data, and why does a Kinesis consumer need to update it, rather than the GUI itself querying the underlying data store?

无论如何,我希望这对您有所帮助。如果您有更具体的问题,请告诉我。

Regarding "java - How do I process Kinesis stream records? (multiple processors)", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/55943491/
