gpt4 book ai didi

amazon-kinesis - 仅当记录数超过 x 条时才启动 Kinesis 消费者?

转载 作者:行者123 更新时间:2023-12-04 12:56:47 26 4
gpt4 key购买 nike

有没有办法创建具有缓冲区限制的 Kinesis 使用者?赞 here :

#Flush when buffer exceeds 100000 Amazon Kinesis records, 64 MB size limit or when time since last buffer exceeds 1 hour
bufferByteSizeLimit = 67108864
bufferRecordCountLimit = 100000
bufferMillisecondsLimit = 3600000

本质上,我想开始 IRecordProcessor仅当有大量数据时。我不能使用上面的连接器代码,因为我需要 latest amazon-kinesis-client 的版本.

最佳答案

我最终实现了自己的解决方案。

  • 有一个 ConcurrentHashMap存储流数据
      private val recsMap = new ConcurrentHashMap[String, List[RecordStore]]
    private val currByteSize = new AtomicLong(0L)
    private val currRecordCount = new AtomicLong(0L)
    private val currSeconds = new AtomicLong(0L)
  • 更新计数器(按大小/时间/记录数量)
  • 达到计数器时刷新数据
      recsMap.foreach(write2File())
    // clean up
    recsMap.remove(writtenRecs())
  • 检查点和重置计数器
      // reset counters
    currByteSize.getAndSet(value)
    currRecordCount.getAndSet(value)
    currSeconds.getAndSet(value)
  • 关于amazon-kinesis - 仅当记录数超过 x 条时才启动 Kinesis 消费者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53575237/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com