
java - Beam: writing per window element count with window boundaries

Reposted. Author: 行者123. Updated: 2023-12-02 01:47:54

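For a simple proof of concept, I'm trying to window click data into two-minute windows. All I want to do is write the count for each window, together with the window's boundaries, to BigQuery. When running my pipeline I keep getting the following error: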

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.RuntimeException: java.io.IOException: Insert failed: [{"errors":[{"debugInfo":"","location":"windowend","message":"This field is not a record.","reason":"invalid"}],"index":0}]

The pipeline looks like this:

// Creating the pipeline
Pipeline p = Pipeline.create(options);

// Window items
PCollection<TableRow> counts = p
    .apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(options.getTopic()))
    .apply("AddEventTimestamps", WithTimestamps.of(TotalCountPipeline::ExtractTimeStamp)
        .withAllowedTimestampSkew(Duration.standardDays(10000)))
    .apply("Window", Window.<String>into(
            FixedWindows.of(Duration.standardHours(options.getWindowSize())))
        .triggering(
            AfterWatermark.pastEndOfWindow()
                .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.standardDays(10000))
        .accumulatingFiredPanes())
    .apply("CalculateSum", Combine.globally(Count.<String>combineFn()).withoutDefaults())
    .apply("BigQueryFormat", ParDo.of(new FormatCountsFn()));

// Writing to BigQuery
counts.apply("WriteToBigQuery", BigQueryIO.writeTableRows()
    .to(options.getOutputTable())
    .withSchema(getSchema())
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

// Execute pipeline
p.run().waitUntilFinish();

My guess is that it has something to do with the BigQuery formatting function, which is implemented as follows:

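static class FormatCountsFn extends DoFn<Long, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c, BoundedWindow window) {
        TableRow row =
            new TableRow()
                // maxTimestamp() is the window's last millisecond (its end minus 1 ms), not its start.
                // toDateTime() returns a Joda DateTime object, not a string.
                .set("windowStart", window.maxTimestamp().toDateTime())
                .set("count", c.element().intValue());
        c.output(row);
    }
}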
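In `FormatCountsFn`, the value written under `windowStart` comes from `window.maxTimestamp()`, which for the interval windows created by `FixedWindows` is the last millisecond of the window rather than its start. The sketch below reproduces the fixed-window boundary arithmetic using only `java.time`, assuming the default zero window offset; the class and method names are hypothetical and are not Beam APIs.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical helper mirroring fixed-window alignment with zero offset:
 * the window containing a timestamp starts at the largest multiple of the
 * window size that is <= the timestamp, and ends one window size later.
 */
public class FixedWindowBounds {

    /** Start (inclusive) of the fixed window of the given size that contains ts. */
    static Instant windowStart(Instant ts, Duration size) {
        long millis = ts.toEpochMilli();
        long sizeMillis = size.toMillis();
        // floorMod keeps the alignment correct for timestamps before the epoch, too
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, sizeMillis));
    }

    /** End (exclusive) of that window. */
    static Instant windowEnd(Instant ts, Duration size) {
        return windowStart(ts, size).plus(size);
    }

    /** Largest timestamp still inside the window: the end minus one millisecond. */
    static Instant maxTimestamp(Instant ts, Duration size) {
        return windowEnd(ts, size).minusMillis(1);
    }

    public static void main(String[] args) {
        // Example click timestamp (made up for illustration)
        Instant click = Instant.parse("2018-11-26T10:03:27Z");
        Duration size = Duration.ofMinutes(2);
        System.out.println(windowStart(click, size));   // 2018-11-26T10:02:00Z
        System.out.println(windowEnd(click, size));     // 2018-11-26T10:04:00Z
        System.out.println(maxTimestamp(click, size));  // 2018-11-26T10:03:59.999Z
    }
}
```

So a field filled from `maxTimestamp()` carries the window's end (minus 1 ms); in Beam, the actual start and end of an `IntervalWindow` are available from its `start()` and `end()` methods.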

This was inspired by this post. Can anyone shed some light on this? I can't seem to figure it out.

Best answer

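As it turns out, the answer to this question has nothing to do with Beam windowing; it is purely a BigQuery issue. Writing a timestamp into a BigQuery row requires a string in the correct yyyy-MM-dd HH:mm:ss format, not the DateTime object I was providing. (The DateTime object is presumably serialized as a nested JSON object, which BigQuery then treats as a record, hence the "This field is not a record." error.)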
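A minimal sketch of the fix described above, using only `java.time`: window boundaries given as epoch milliseconds are formatted as yyyy-MM-dd HH:mm:ss strings in UTC before they are put into the row. It assumes the target columns are declared as TIMESTAMP (the schema returned by `getSchema()` is not shown), and the class name, method names, and field names are hypothetical; a plain `Map` stands in for `TableRow` so the example runs without Beam.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: formats window boundaries as strings instead of DateTime objects. */
public class WindowRowFormatter {

    // Format in UTC so the output does not depend on the worker's default time zone.
    private static final DateTimeFormatter BQ_TIMESTAMP =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

    /** Converts epoch milliseconds to a "yyyy-MM-dd HH:mm:ss" string (UTC). */
    static String toBigQueryTimestamp(long epochMillis) {
        return BQ_TIMESTAMP.format(Instant.ofEpochMilli(epochMillis));
    }

    /** Builds the fields that would be set on a TableRow (field names are illustrative). */
    static Map<String, Object> rowFields(long windowStartMillis, long windowEndMillis, long count) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("windowStart", toBigQueryTimestamp(windowStartMillis));
        fields.put("windowEnd", toBigQueryTimestamp(windowEndMillis));
        fields.put("count", count);
        return fields;
    }

    public static void main(String[] args) {
        long start = Instant.parse("2018-11-26T10:02:00Z").toEpochMilli();
        long end = Instant.parse("2018-11-26T10:04:00Z").toEpochMilli();
        System.out.println(rowFields(start, end, 42));
        // {windowStart=2018-11-26 10:02:00, windowEnd=2018-11-26 10:04:00, count=42}
    }
}
```

Inside the Beam `DoFn`, the millisecond values could come from `((IntervalWindow) window).start().getMillis()` and `((IntervalWindow) window).end().getMillis()`, since `FixedWindows` assigns `IntervalWindow`s; the resulting strings would then be passed to `TableRow.set(...)` in place of the `DateTime` object.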

Regarding java - Beam: writing per window element count with window boundaries, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/53474504/
