
apache-beam - Writing via TextIO.write with Sessions windowing raises GroupByKey consumption exception

Reposted. Author: 行者123. Updated: 2023-12-04 22:52:29

In Apache Beam 2.0.0, when using session windows and writing to files via TextIO.write, the call to TextIO.write() produces the following exception:
java.lang.IllegalStateException: GroupByKey must have a valid Window merge function. Invalid because: WindowFn has already been consumed by previous GroupByKey
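The exception occurs even though there are no intervening GroupByKeys that could consume the windowing. I have included the code: the main function demonstrates the problem, and a helper filename-policy writer class for 2.0.0 is included.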

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;


public class TestSessionWindowToFile {
  /**
   * Support class: a filename policy for getting one file per window.
   * See https://github.com/apache/beam/blob/release-2.0.0/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
   */
  public static class LocalPerWindowFiles extends FileBasedSink.FilenamePolicy {
    private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinute();
    private final String prefix;

    public LocalPerWindowFiles(String prefix) {
      this.prefix = prefix;
    }

    public String filenamePrefixForWindow(IntervalWindow window) {
      return String.format("%s-%s-%s",
          prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
    }

    @Override
    public ResourceId windowedFilename(
        ResourceId outputDirectory, WindowedContext context, String extension) {
      IntervalWindow window = (IntervalWindow) context.getWindow();
      String filename = String.format(
          "%s-%s-of-%s%s",
          filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
          extension);
      return outputDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
    }

    @Override
    public ResourceId unwindowedFilename(
        ResourceId outputDirectory, Context context, String extension) {
      throw new UnsupportedOperationException("Unsupported.");
    }
  }


  /**
   * Creating session windows and then asking TextIO to write the results leads to
   * "java.lang.IllegalStateException: GroupByKey must have a valid Window merge function.
   * Invalid because: WindowFn has already been consumed by previous GroupByKey"
   */
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    PCollection<String> input = p.apply(
        Create.timestamped(
            TimestampedValue.of("this", new Instant(1)),
            TimestampedValue.of("is", new Instant(2)),
            TimestampedValue.of("a", new Instant(3)),
            TimestampedValue.of("test", new Instant(4)),
            TimestampedValue.of("test", new Instant(5)),
            TimestampedValue.of("test", new Instant(50)),
            TimestampedValue.of("test", new Instant(51)),
            TimestampedValue.of("test", new Instant(52))
        )
    );

    PCollection<String> windowedInputs = input
        // session windowing fails:
        .apply(Window.into(Sessions.withGapDuration(new Duration(10))));
        // sliding windowing succeeds:
        //.apply(Window.into(SlidingWindows.of(new Duration(30)).every(new Duration(10))));

    // Count the elements so that the sessioning is clearer
    PCollection<KV<String, Long>> counts =
        windowedInputs.apply(Count.perElement());
    PCollection<String> writeableStrings = counts.apply("Convert to text",
        ParDo.of(new DoFn<KV<String, Long>, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            String word = c.element().getKey();
            Long count = c.element().getValue();
            c.output(String.format("%s,%d", word, count));
          }
        }));

    writeableStrings
        .apply(TextIO.write()
            .to("i_am_ignored_given_filename_policy")
            .withFilenamePolicy(new LocalPerWindowFiles("results/testSessionWindow"))
            .withWindowedWrites()
            .withNumShards(1)
        );
    p.run();
  }
}

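I have not seen any effect from clarifying intent around watermarks/triggering, timestamp combining, or Window.remerge()ing, nor from using Beam 2.1.0 (Beam 2.1.0 includes a default filename policy that knows how to write windowed files as well as non-windowed files).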

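Logging indicates that the sessions are built correctly, and SlidingWindows successfully produces output files (using a variant such as .apply(Window.into(SlidingWindows.of(new Duration(30)).every(new Duration(10)))); in place of Sessions). This suggests that the interaction of session windows + TextIO.write is either misconfigured or misbehaving.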
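
For reference, here is a minimal sketch in plain Java (no Beam dependency) of the grouping that session windowing with a 10 ms gap would be expected to produce for the test input, per key. It assumes each element starts in the window [timestamp, timestamp + gap) and that strictly overlapping windows of the same key are merged. The names SessionSketch, sessions and format are hypothetical, and this is not Beam's implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not part of the Beam SDK.
class SessionSketch {

  /** Returns one "key,count [start, end)" line per merged session, sorted by key. */
  static List<String> sessions(String[] keys, long[] timestamps, long gap) {
    // Collect the timestamps of each key; TreeMap keeps the output order stable.
    Map<String, List<Long>> byKey = new TreeMap<>();
    for (int i = 0; i < keys.length; i++) {
      byKey.computeIfAbsent(keys[i], k -> new ArrayList<>()).add(timestamps[i]);
    }

    List<String> out = new ArrayList<>();
    for (Map.Entry<String, List<Long>> entry : byKey.entrySet()) {
      List<Long> ts = entry.getValue();
      Collections.sort(ts);
      long start = ts.get(0);
      long end = start + gap;
      long count = 1;
      for (int i = 1; i < ts.size(); i++) {
        long t = ts.get(i);
        if (t < end) {
          // Assumed merge rule: [t, t + gap) overlaps the current session, so extend it.
          end = Math.max(end, t + gap);
          count++;
        } else {
          // Gap exceeded: close the current session and start a new one.
          out.add(format(entry.getKey(), count, start, end));
          start = t;
          end = t + gap;
          count = 1;
        }
      }
      out.add(format(entry.getKey(), count, start, end));
    }
    return out;
  }

  static String format(String key, long count, long start, long end) {
    return String.format("%s,%d [%d, %d)", key, count, start, end);
  }

  public static void main(String[] args) {
    // Same words and millisecond timestamps as the Create.timestamped(...) input.
    String[] keys = {"this", "is", "a", "test", "test", "test", "test", "test"};
    long[] ts = {1, 2, 3, 4, 5, 50, 51, 52};
    for (String line : sessions(keys, ts, 10)) {
      System.out.println(line);
    }
  }
}
```

Under these assumptions "test" falls into two separate sessions, [4, 15) with 2 elements and [50, 62) with 3 elements, while every other word gets a single one-element session.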

How can this code be modified to write one text file per key + window start + window end grouping?

Best answer

This is a bug in the WriteFiles transform. I have filed https://issues.apache.org/jira/browse/BEAM-3122. Unfortunately, I can't think of a workaround other than fixing the bug.

Regarding "apache-beam - Writing via TextIO.write with Sessions windowing raises GroupByKey consumption exception", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/46983318/
