gpt4 book ai didi

java - Spark Structured Streaming with RabbitMQ 源码

转载 作者:塔克拉玛干 更新时间:2023-11-02 20:22:59 26 4
gpt4 key购买 nike

我正在尝试为 Structured Streaming 编写一个自定义接收器,它将使用来自 RabbitMQ 的消息。Spark recently released DataSource V2 API,这看起来很有前途。由于它抽象了很多细节,我想使用这个 API 是为了简单和性能。但是,由于它很新,因此可用的资源不多。我需要经验丰富的 Spark 人员进行一些说明,因为他们会更容易掌握关键点。我们开始吧:

我的起点是博客文章系列,第一部分 here .它展示了如何在没有流功能的情况下实现数据源。为了制作一个流媒体源,我稍微改变了它们,因为我需要实现 MicroBatchReadSupport代替(或补充)DataSourceV2 .

为了提高效率,明智的做法是让多个 spark 执行器同时使用 RabbitMQ,即来自同一个队列。如果我没有混淆的话,输入的每个分区 - 在 Spark 的术语中 - 对应于来自队列的消费者 - 在 RabbitMQ 的术语中。因此,我们需要为输入流设置多个分区,对吧?

类似于part 4 of the series , 我实现了 MicroBatchReader如下:

@Override
public List<DataReaderFactory<Row>> createDataReaderFactories() {
int partition = options.getInt(RMQ.PARTITICN, 5);
List<DataReaderFactory<Row>> factories = new LinkedList<>();
for (int i = 0; i < partition; i++) {
factories.add(new RMQDataReaderFactory(options));
}
return factories;
}

我正在返回一个工厂列表,希望列表中的每个实例都被用来创建一个读者,这也是一个消费者。这种方法正确吗?

我希望我的接收者是可靠的,即在每条处理过的消息之后(或者至少写入 chekpoint 目录以供进一步处理),我需要将其确认回 RabbitMQ。问题从这里开始:这些工厂是在驱动程序中创建的,实际的读取过程通过 DataReader 在执行程序中进行。秒。然而,commit方法是 MicroBatchReader 的一部分,而不是 DataReader。由于每个 MicroBatchReader 我有很多 DataReader,我应该如何将这些消息返回给 RabbitMQ?或者我应该在 next 时确认在 DataReader 上调用方法?安全吗?如果是这样,那么 commit 函数的目的是什么?

CLARIFICATION: OBFUSCATION: 答案中提供的关于重命名某些类/函数的链接(除了那里的解释)使一切都更加清晰 比以前更糟。引自 there :

Renames:

  • DataReaderFactory to InputPartition

  • DataReader to InputPartitionReader

...

InputPartition's purpose is to manage the lifecycle of the associated reader, which is now called InputPartitionReader, with an explicit create operation to mirror the close operation. This was no longer clear from the API because DataReaderFactory appeared to be more generic than it is and it isn't clear why a set of them is produced for a read.

编辑: 然而,docs清楚地说“读取器工厂将被序列化并发送给执行器,然后数据读取器将在执行器上创建并进行实际读取。”

为了使消费者可靠,我必须仅在特定消息在 Spark 端提交后才确认该消息。 Note that必须在传递消息的同一连接上确认消息,但在驱动程序节点调用提交函数。我如何在工作节点/执行节点上提交?

最佳答案

<删除>> 我正在返回一个工厂列表,希望列表中的每个实例都被用来创建一个读者,这也是一个消费者。这种做法正确吗?源 [socket][1] 源实现有一个线程将消息推送到内部 ListBuffer。换句话说,有一个消费者(线程)填满了内部 ListBuffer,它**然后**由 `planInputPartitions`(`createDataReaderFactories` 已 [重命名][2] 为 `planInputPartitions`)分成多个分区。此外,根据 [MicroBatchReadSupport][3] 的 Javadoc> 执行引擎将在流式查询开始时创建一个微批处理读取器,交替调用 setOffsetRange 和 createDataReaderFactories 以处理每个批处理,然后在执行完成时调用 stop()。请注意,由于重启或故障恢复,单个查询可能会多次执行。换句话说,应该调用 `createDataReaderFactories` **多次** 次,根据我的理解,这表明每个 `DataReader` 负责一个静态输入分区,这意味着 DataReader 不应该是消费者。----------> 但是,commit 方法是 MicroBatchReader 的一部分,而不是 DataReader ... 如果是这样,那么 commit 函数的目的是什么?提交函数的部分原因可能是防止 MicroBatchReader 的内部缓冲区变大。通过提交偏移量,您可以有效地从缓冲区中删除小于偏移量的元素,因为您 promise 不再处理它们。您可以使用 `batches.trimStart(offsetDiff)` 在套接字源代码中看到这种情况


<删除>我不确定是否要实现一个可靠的接收器,所以我希望有一个更有经验的 Spark 人过来解决你的问题,因为我也很感兴趣!希望这可以帮助!

编辑

我只研究了 socket , 和 wiki-edit来源。这些资源还没有准备好生产,这是问题所在没有寻找的东西。相反,kafka source 是更好的起点,与上述来源不同,它有多个像作者正在寻找的消费者。

但是,也许如果您正在寻找不可靠的来源,上面的 socket 和 wikiedit 来源提供了一个不太复杂的解决方案。

关于java - Spark Structured Streaming with RabbitMQ 源码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50684667/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com