gpt4 book ai didi

java - 自定义无限制源在Google Cloud DataFlow中如何工作?

转载 作者:塔克拉玛干 更新时间:2023-11-02 08:40:39 26 4
gpt4 key购买 nike

我正在尝试对Google Cloud Dataflow实施自定义无限源,以从Amazon Kinesis队列中读取。为了正确实现检查点,我想了解该机制的工作原理。

DataFlow如何工作

我试图通过阅读DataFlow文档来了解检查点,但是缺少一些关键的内容,因此我阅读了MillWheel论文。首先,让我解释一下我如何理解本文提出的概念。在数据流API方面,我将重点关注强大的生产设置中源及其消费者之间的交互:

在源上调用

  • createReader(),并将null值作为CheckpointMark传递给
  • 在阅读器实例上调用
  • start()
  • advance()在阅读器
  • 上被调用了X次
  • 现在,工作人员决定进行检查点标记。它在阅读器上调用getCheckpointMark()
  • 检查点由工作人员
  • 保留
    在检查点对象上调用
  • finalizeCheckpoint() 到目前为止读取的
  • 数据将发送给使用者,该使用者将记录存储在缓存中,以便对重复数据进行重复数据删除以进行可能的重试
  • 使用者将ACK发送到源。此时,从源中删除检查点,并在接受ACK时,使用者从缓存中删除记录(因为此时源将不会重试)
  • 如果源未能接收到ACK,则它将创建新的读取器实例,并通过最后一个检查点作为参数,并将重试将数据发送给使用者。如果使用者收到重试数据,它将尝试对
  • 进行重复数据删除
  • 一切重复。我不清楚它是如何发生的:第一个阅读器实例是否用于继续从流中读取?还是创建具有空检查点标记的新阅读器以执行此操作?还是第二台读取器(带有检查点数据)用于继续从流中读取数据?

  • PubSub与Kinesis

    现在,请让我说说Kinesis队列的工作方式,因为它与Pub / Sub有很大的不同(就我所了解的Pub / Sub的工作原理而言,我并不是一直在使用它)。

    发布/订阅

    我看到Pub / Sub拉模型在很大程度上依赖ACK,即客户端收到的消息被ACK,然后Pub / Sub中的“内部检查点”向前移动->这意味着即将到来的拉请求将在上一个ACK之后接收到连续记录。

    运动学

    Kinesis pull接口(这里根本没有推送)与您与文件进行交互的方式更相似。您可以在流中的任何位置开始读取(特殊值TRIM_HORIZON是流中最旧的记录,而LATEST是流中的最新记录),然后使用迭代器逐条记录前进(迭代器存储在服务器端,并且有5个分钟到期时间(如果未使用)。服务器没有ACK-客户端有责任跟踪流中的位置,并且您始终可以重新读取旧记录(当然,除非它们已过期)。

    问题/问题
  • 检查点应如何显示?在给定检查点的情况下,读取器是否应该仅读取与之相关的部分数据,还是应该从检查点读取所有数据?换句话说,我的检查点应该是:“x和y之间的数据”还是“x之后的所有数据”?
  • 我知道第一个读者将空值作为检查点标记,这非常好-这意味着我应该从应用程序开发人员定义的点开始读取。但是DataFlow可以像这样创建其他具有null的阅读器吗(例如,我想像一下当jvm死亡时,DataFlow会创建一个新的具有以null作为检查点的阅读器的阅读器)吗?在这种情况下,我不知道我的起始位置是什么,因为我可能已经使用以前的读取器读取了一些数据,但是现在进度的标记已经丢失了。
  • 在用户端用于记录重复数据删除的ID是什么?它是getCurrentRecordId返回的值吗?我问这个问题,是因为我考虑过使用流中的位置,因为它在特定流中是唯一的。但是,如果我以后通过拼合它们来加入几个运动学源,会发生什么->这将导致不同记录可能共享相同ID的情况。我是否应该使用(流名称,位置)元组作为id(在这种情况下是唯一的)。

  • 干杯,
    普热梅克

    最佳答案

    我们很高兴看到您将Kinesis与Dataflow结合使用。我们希望使用GitHub project向我们的contrib connector for Kinesis发出拉取请求。在开发过程中,我们也很乐意通过GitHub查看您的代码,并在那里提供反馈。

    检查点应如何显示?在给定检查点的情况下,读取器是否应该仅读取与之相关的部分数据,还是应该从检查点读取所有数据?换句话说,我的检查点应该是:“x和y之间的数据”还是“x之后的所有数据”?

    检查点标记应代表“该阅读器已生成并完成的数据”。例如,如果读者负责特定的分片,则检查点标记可能由分片标识符和已成功读取的该分片内的最后一个序列号Y组成,指示“已生成所有包含Y的数据”。

    我知道第一个读者将空值作为检查点标记,这很好-这意味着我应该从应用程序开发人员定义的点开始阅读。但是DataFlow可以像这样创建其他具有null的阅读器吗(例如,我想像一下当jvm死亡时,DataFlow会创建一个新的具有以null作为检查点的阅读器的阅读器)吗?在这种情况下,我不知道我的起始位置是什么,因为我可能已经使用以前的读取器读取了一些数据,但是现在进度的标记已经丢失了。

    即使在JVM故障中,最终的检查点也会保留。换句话说,当JVM死亡时,将使用最后确定的最后一个检查点来构造阅读器。除非打算从源头开始读取,否则在您的场景中当JVM在首次成功调用finalizeCheckpoint()之前死亡时,您不应看到使用空检查点创建的读取器。您可以使用新读取器上的检查点标记为从下一个要读取的记录开始的同一分片构造一个新的迭代器,并且可以继续进行而不会丢失数据。

    在用户端用于记录重复数据删除的ID是什么?它是由getCurrentRecordId返回的值吗?我问这个问题,是因为我考虑过使用流中的位置,因为它在特定流中是唯一的。但是,如果我以后通过拼合它们来加入几个运动学源,会发生什么->这将导致不同记录可能共享相同ID的情况。我是否应该使用(流名称,位置)元组作为id(在这种情况下是唯一的)。

    在Dataflow中,每个UnboundedSource(实现getCurrentRecordId并重写requiresDeduping以返回true)都将自行删除重复数据。因此,记录ID仅要求对于同一源实例是唯一的。来自两个不同来源的记录可以使用相同的记录ID,并且在展平期间不会将它们视为“重复项”。因此,如果Amazon Kinesis保证所有记录的ID在全局上都是唯一的(跨流中的所有分片)并且是永久的(例如,跨重新分片操作),那么这些ID应该适合用作记录ID。

    请注意,getCurrentRecordIdUnboundedReader的可选方法-如果您的检查点方案唯一标识每个记录,则无需实现它。 Kinesis使您可以按序列号顺序读取记录,并且看起来序列号是全局唯一的。因此,您可以将每个分片分配给generateInitialSplits中的不同工作人员,并且每个工作人员可能永远不会产生重复数据-在这种情况下,您可能根本不必担心记录ID。

    大多数答案都假设简单的情况,即您的Kinesis流永远不会更改其分片。另一方面,如果流上的分片发生变化,那么您的解决方案将变得更加复杂。例如,每个工作人员可能负责一个以上的分片,因此检查点标记将是分片的映射->序列号而不是序列号。而且拆分和合并的碎片可能会在不同的Dataflow工作人员之间移动以平衡负载,并且可能很难保证两个不同的工作人员都不会两次读取Kinesis记录。在这种情况下,将Kinesis记录ID与您所描述的语义一起使用就足够了。

    关于java - 自定义无限制源在Google Cloud DataFlow中如何工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34118103/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com