gpt4 book ai didi

java - 在 Google Dataflow 中作为 DataflowPipelineRunner 运行时访问资源文件

转载 作者:行者123 更新时间:2023-11-30 08:33:03 26 4
gpt4 key购买 nike

在我的项目中,我试图将一些元数据添加到我的管道中处理的数据中。元数据位于 src 文件夹旁边名为 resources 的子文件夹中的 DBF 文件中。

src 文件夹包含主类,我有几个包(IO、处理、聚合、实用程序)。

我在定义管道的主类中读取和处理包含元数据的文件。我用来访问文件的代码如下:

File temp1 = new File("resources/xxx.dbf");

我检查文件是否被发现使用:

LOG.info(temp1.exists())

运行良好。

有消息以字符串形式传入,我使用 PubSubIO 读取这些消息。我使用此文件的内容来填充包含键和值的 Map。

Map<String, ArrayList<Double>> sensorToCoordinates = coordinateData.getSensorLocations();

然后我在我创建的名为“SensorValues”的自定义类中设置了一个静态变量:

SensorValue.setKeyToCoordinates(sensorToCoordinates);

当我使用 ParDo 函数(从 PCollection 到 PCollection)将传入消息从字符串解析为 SensorValue 类时,该映射用于 SensorValue 类的构造函数。

使用 DirectPipelineRunner 运行这段代码效果很好。但是,当我使用 DataflowPipelineRunner 并尝试访问 SensorValue 构造函数中的映射时,我遇到了 NullPointerException。

现在我想知道为什么 setter 在使用 DataflowPipelineRunner 时不工作(我猜这与在多个工作人员之间分配执行有关)以及使用任何静态资源文件的最佳做法是什么丰富您的管道?

最佳答案

您是对的,问题是因为 ParDo 的执行分配给了多个工作人员。他们没有本地文件,他们可能没有 map 的内容。

这里有几个选项:

  1. 将文件放入 GCS,让管道读取文件的内容(使用 TextIO 或类似的东西)并将其用作 side-input到您以后的处理。

  2. 将文件包含在管道的资源中,并将其加载到需要它的 DoFnstartBundle 中(将来会有方法使这种情况发生的频率低于每个 bundle )。

  3. 您可以将映射的内容序列化为 DoFn 的参数,方法是将其作为非静态字段传递给该类的构造函数。

随着此文件大小的增加,选项 1 更好(因为它可以支持将其拆分成多个部分并进行查找),而选项 2 可能会减少检索文件的网络流量。选项 3 仅在文件非常小的情况下才有效,因为它会显着增加序列化 DoFn 的大小,这可能导致作业太大而无法提交到 Dataflow 服务。

关于java - 在 Google Dataflow 中作为 DataflowPipelineRunner 运行时访问资源文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39649173/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com