gpt4 book ai didi

akka-stream - Alpakka KinesisSink : Can not push messages to Stream

转载 作者:行者123 更新时间:2023-12-04 03:26:08 25 4
gpt4 key购买 nike

我正在尝试使用 alpakka kinesis connector将消息发送到 Kinesis Stream 但我没有成功。我尝试了下面的代码,但我的流中没有任何内容。

implicit val sys = ActorSystem()
implicit val mat = ActorMaterializer()
implicit val kinesisAsync: AmazonKinesisAsync = AmazonKinesisAsyncClientBuilder.defaultClient()


val debug = Flow[PutRecordsRequestEntry].map { reqEntry =>
println(reqEntry)
reqEntry
}

val entry = new PutRecordsRequestEntry()
.withData(ByteBuffer.wrap("Hello World".getBytes))
.withPartitionKey(Random.nextInt.toString)

Source.tick(1.second, 1.second, entry).to(KinesisSink("myStreamName", KinesisFlowSettings.defaultInstance)).run()

// 2) Source.tick(1.second, 1.second,entry).via(debug).to(KinesisSink("myStreamName", inesisFlowSettings.defaultInstance)).run()
  • 使用 Sink.foreach(println)而不是 KinesisSink打印出 PutRecordsRequestEntry每 1 秒 => 预期
  • 使用 KinesisSink ,条目仅生成一次。

  • 我究竟做错了什么 ?

    我正在使用 KinesisSource 检查我的流并且正在阅读(用另一个流测试)

    AWS Kinesis 的监控仪表板也没有显示任何 PUT 请求。

    注 1:我尝试启用 alpakka 的调试日志但没有效果
    <logger name="akka.stream.alpakka.kinesis" level="DEBUG"/>

    在我的 logback.xml + 在根级别调试

    最佳答案

    下面要考虑的一些故障排除步骤 - 我希望它们有所帮助。

    我怀疑您可能缺少 Kinesis 客户端的凭据和/或区域配置。

    Kinesis Firehose

    Kinesis Producer Library(Alpakka 似乎正在使用)不适用于 Kinesis Firehose。如果您尝试写入 Firehose,这将不起作用。

    应用程序日志

    您可能希望为 Kinesis Producer Library 启用日志记录,而不仅仅是在 Alpakka 本身中。相关文档可在此处获得:

    Configuring the Kinesis Producer Library

    Configuration Defaults for Kinesis Producer Library

    AWS 侧日志记录

    AWS CloudTrail 自动为 Kinesis 流启用开箱即用,默认情况下,AWS 将为您保留 90 天的 CloudTrail 日志。

    https://docs.aws.amazon.com/streams/latest/dev/logging-using-cloudtrail.html

    您可以使用 CloudTrail 日志查看您的应用程序代表您对 Kinesis 进行的 API 调用。请求显示通常会出现适度延迟 - 但这会让您知道请求是否由于 IAM 权限不足或您的 AWS 资源配置的其他问题而失败。

    检查 SDK 身份验证

    Kinesis 客户端将使用 DefaultAWSCredentialsProviderChain 凭证提供程序向 AWS 发出请求。

    您需要确保提供具有 IAM 权限的有效 AWS 凭证以向 Kinesis 发出这些请求。如果您的代码在 AWS 上运行,则提供应用程序凭证的首选方式是使用 IAM Roles (在实例启动时指定)。

    在您的代码中构建客户端时,您还需要指定 AWS 区域。使用您的 application.properties用于配置此,或者如果您的应用程序是位于单个区域中的 CloudFormation 堆栈的一部分 - 使用 instance metadata当您的代码在 AWS 上运行时检索当前区域的服务。

    关于akka-stream - Alpakka KinesisSink : Can not push messages to Stream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48081710/

    25 4 0