gpt4 book ai didi

java - AWS Lambda : Java : Kinesis Events

转载 作者:行者123 更新时间:2023-11-30 06:06:34 25 4
gpt4 key购买 nike

我无法找到我正在尝试做的事情的示例...

我想用 Java 创建 Lambda 函数。我以为我会始终使用 Javascript 来执行 Lambda 函数,但在这种情况下,我最终会重新使用已经用 Java 编写的应用程序逻辑,因此这是有道理的。

过去,我编写过由 Kinesis 事件触发的 Javascript Lambda 函数。 super 简单,函数接收事件作为参数,做一些事情,瞧。我想用 Java 做同样的事情。非常简单:

Kinesis 事件 -> 触发函数 -> (Java) 接收 Kinesis 事件,并对其执行操作

有人有这种用例的经验吗?

最佳答案

这是我编写的一些示例代码,用于在内部演示相同的概念。此代码将事件从一个流转发到另一个流。

请注意,如果转发出现错误,此代码不会处理重试,也不意味着在生产环境中具有高性能,但它确实演示了如何处理来自发布流的记录。

import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class KinesisToKinesis {

private LambdaLogger logger;
final private AmazonKinesisClient kinesisClient = new AmazonKinesisClient();

public PutRecordsResult eventHandler(KinesisEvent event, Context context) {
logger = context.getLogger();
if (event == null || event.getRecords() == null) {
logger.log("Event contains no data" + System.lineSeparator());
return null;
} else {
logger.log("Received " + event.getRecords().size() +
" records from " + event.getRecords().get(0).getEventSourceARN() + System.lineSeparator());
}

final Long startTime = System.currentTimeMillis();

// set up the client
Region region;
final Map<String, String> environmentVariables = System.getenv();
if (environmentVariables.containsKey("AWS_REGION")) {
region = Region.getRegion(Regions.fromName(environmentVariables.get("AWS_REGION")));
} else {
region = Region.getRegion(Regions.US_WEST_2);
logger.log("Using default region: " + region.toString() + System.lineSeparator());
}
kinesisClient.setRegion(region);

Long elapsed = System.currentTimeMillis() - startTime;
logger.log("Finished setup in " + elapsed + " ms" + System.lineSeparator());

PutRecordsRequest putRecordsRequest = new PutRecordsRequest().withStreamName("usagecounters-global");
List<PutRecordsRequestEntry> putRecordsRequestEntryList = event.getRecords().parallelStream()
.map(r -> new PutRecordsRequestEntry()
.withData(ByteBuffer.wrap(r.getKinesis().getData().array()))
.withPartitionKey(r.getKinesis().getPartitionKey()))
.collect(Collectors.toList());

putRecordsRequest.setRecords(putRecordsRequestEntryList);

elapsed = System.currentTimeMillis() - startTime;
logger.log("Processed " + putRecordsRequest.getRecords().size() +
" records in " + elapsed + " ms" + System.lineSeparator());

PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest);
elapsed = System.currentTimeMillis() - startTime;
logger.log("Forwarded " + putRecordsRequest.getRecords().size() +
" records to Kinesis " + putRecordsRequest.getStreamName() +
" in " + elapsed + " ms" + System.lineSeparator());
return putRecordsResult;
}
}

关于java - AWS Lambda : Java : Kinesis Events,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43768028/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com