gpt4 book ai didi

google-bigquery - 当使用无界 PCollection 从 TextIO 到 BigQuery 时,数据卡在 BigQueryIO 内部的 Reshuffle/GroupByKey 中

转载 作者:行者123 更新时间:2023-12-04 15:06:08 25 4
gpt4 key购买 nike

我正在使用 TextIO 从云存储读取。因为我想让工作连续运行,所以我使用 watchForNewFiles。

为了完整性,如果我使用有界 PCollections(批处理模式下没有 watchForNewFiles 和 BigQueryIO),我读取的数据工作正常,因此没有数据问题。

我有 p.run().waitUntilFinish();在我的代码中,所以管道运行。它不会给出任何错误。

Apache Beam 版本为 2.8.0

PCollection<String> stream =
p.apply("Read File", TextIO
.read()
.from(options.getInput())
.watchForNewFiles(
Duration.standardMinutes(1),
Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1))
)
.withCompression(Compression.AUTO));

这工作得很好,并在文件可用时立即读取。 PCollection 是无界的,包含来自这些文件的文本行。

做了一些改造后
PCollection<List<String>> lines = stream.apply("Parse CSV",
ParDo.of(new ParseCSV())
);

PCollection<TableRow> rows = lines.apply("Convert to BQ",
ParDo.of(new BigQueryConverter(schema))
);

ParseCSV 步骤通过 outputWithTimestamp 向其接收器添加时间戳。

我最终得到了准备流式传输到 BigQuery 的 TableRows PCollection。
为此,我使用
WriteResult result = rows.apply("WriteToBigQuery",
BigQueryIO.
<TableRow>write()
.withFormatFunction(input -> input)
.withSchema(bqSchema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withExtendedErrorInfo()
.to(options.getOutput())

);

这永远不会将数据写入 BigQuery。如果我查看 UI,我会发现 BigQueryIO 确实如此
  • 分片表写入
  • TagWithUniqueId
  • 改组
  • 窗口.进入
  • GroupByKey

  • 数据进入和离开前两步。但从来没有重新洗牌。这只读取数据,但从不传递数据。 Reshuffle 中的步骤是 GroupByKey。

    由于集合是无界的,我试图用
    lines = lines.apply(Window.configure()
    .<List<String>>into(FixedWindows
    .of(Duration.standardSeconds(10))
    )
    );

    这应该强制执行 GroupByKey 的任何操作在 10 秒后释放窗口。但事实并非如此。
    lines = lines.apply(Window.configure()
    .<List<String>>into(FixedWindows
    .of(Duration.standardSeconds(10))
    )
    .triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10)))
    .withAllowedLateness(Duration.standardSeconds(0))
    .discardingFiredPanes()
    );

    在处理时间上添加特定触发器也无济于事。
    有什么线索吗?提前致谢!

    最佳答案

    一种解决方法可能是(对我有用)为每个元素分配一个新键,并强制数据流使用 Reshuffle 或 GroupByKey 解耦转换。

    streams.apply(WithKeys.of(input -> 1)).setCoder(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
    .apply(Reshuffle.of())
    .apply(MapElements.via(new SimpleFunction<KV<Integer, String>, String>() {
    @Override
    public String apply(KV<Integer, String> input) {
    return input.getValue();
    }
    }))
    .apply("convertToTableRow", ...)
    .apply("WriteToBigQuery", ...)

    key 可以是示例中的常量,也可以是随机的。如果您选择随机,那么您必须将范围设置得足够小以适应 JVM 内存。赞 ThreadLocalRandom.current().nextInt(0, 5000)

    关于google-bigquery - 当使用无界 PCollection 从 TextIO 到 BigQuery 时,数据卡在 BigQueryIO 内部的 Reshuffle/GroupByKey 中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53266689/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com