- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
在 Apache Beam 2.0.0 中使用 session 窗口并通过 TextIO.write 写入文件时,通过调用 TextIO.write() 生成以下异常:java.lang.IllegalStateException: GroupByKey must have a valid Window merge function. Invalid because: WindowFn has already been consumed by previous GroupByKey
即使没有干预 GroupByKey
也会发生异常s 可能会消耗窗口。我已经包含了代码——主函数说明了问题,并包含了一个 2.0.0 的辅助策略编写器类。
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
public class TestSessionWindowToFile {
/**
* Support class: a filename policy for getting one file per window.
* See https://github.com/apache/beam/blob/release-2.0.0/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
*/
public static class LocalPerWindowFiles extends FileBasedSink.FilenamePolicy {
private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinute();
private final String prefix;
public LocalPerWindowFiles(String prefix) {
this.prefix = prefix;
}
public String filenamePrefixForWindow(IntervalWindow window) {
return String.format("%s-%s-%s",
prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
}
@Override
public ResourceId windowedFilename(
ResourceId outputDirectory, WindowedContext context, String extension) {
IntervalWindow window = (IntervalWindow) context.getWindow();
String filename = String.format(
"%s-%s-of-%s%s",
filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
extension);
return outputDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
}
@Override
public ResourceId unwindowedFilename(
ResourceId outputDirectory, Context context, String extension) {
throw new UnsupportedOperationException("Unsupported.");
}
}
/**
* Creating a session windows and then asking TextIO to write the results leads to
* "java.lang.IllegalStateException: GroupByKey must have a valid Window merge function.
* Invalid because: WindowFn has already been consumed by previous GroupByKey"
*/
public static void main(String[] args) {
Pipeline p = Pipeline.create();
PCollection<String> input = p.apply(
Create.timestamped(
TimestampedValue.of("this", new Instant(1)),
TimestampedValue.of("is", new Instant(2)),
TimestampedValue.of("a", new Instant(3)),
TimestampedValue.of("test", new Instant(4)),
TimestampedValue.of("test", new Instant(5)),
TimestampedValue.of("test", new Instant(50)),
TimestampedValue.of("test", new Instant(51)),
TimestampedValue.of("test", new Instant(52))
)
);
PCollection<String> windowedInputs = input
// session windowing fails:
.apply(Window.into(Sessions.withGapDuration(new org.joda.time.Duration(10))));
// sliding windowing succeeds:
//.apply(Window.into(SlidingWindows.of(new Duration(30)).every(new Duration(10))));
// Invoke counting of elements so that sessioning is more clear
PCollection<KV<String, Long>> counts =
windowedInputs.apply(Count.perElement());
PCollection<String> writeableStrings = counts.apply("Convert to text",
ParDo.of(new DoFn<KV<String, Long>, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
String word = c.element().getKey();
Long count = c.element().getValue();
c.output(String.format("%s,%d", word, count));
}
}));
writeableStrings
.apply(TextIO.write()
.to("i_am_ignored_given_filename_policy")
.withFilenamePolicy(new LocalPerWindowFiles("results/testSessionWindow"))
.withWindowedWrites()
.withNumShards(1)
);
p.run();
}
}
.apply( Window.into(SlidingWindows.of(new Duration(30)).every(new Duration(10))));
之类的变体代替
Sessions
)。这表明 session 窗口 + TextIO.write 的交互配置错误或行为异常。
最佳答案
这是 WriteFiles 转换中的一个错误。我已提交 https://issues.apache.org/jira/browse/BEAM-3122 .不幸的是,除了修复错误之外,我想不出解决方法。
关于apache-beam - 通过带有 session 窗口的 TextIO.write 写入引发 GroupByKey 消费异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46983318/
我正在执行 UPDATE .WRITE() 语句,并发现它显然只有在您像这样定义它时才有效: string sql = "UPDATE [dbo].[Table] SET [Column].WRITE
我在 Unix 系统上用 C 编程。我知道: write(fd,"ABCD",4); 比这样做更好: write(fd, "A", 1); write(fd, "B", 1); write(fd, "
func hash(s string) uint32 { h := fnv.New32a() h.Write([]byte(s)) return h.Sum32() } 对于这
在经典的 asp 页面中,有人告诉我您可以使用 vbscript 或 jscript。而 jscript 就是 javascript。 所以我不确定 Response.Write、Response.W
当 openssl 子进程尝试 write() 到本地目录时,我收到此错误。在调用 write() 之前连接已关闭。它没有与 ssl 连接,因为我什至无法从 nodejs 文档启动示例代码。 我错过了
最近我在试验netty。我遇到了以下问题: ctx.channel().write(new TextWebSocketFrame("hello")) 没有在客户端返回 hello,但是 ctx.cha
请解释以下内容: def feed(data): import os print "DATA LEN: %s" % len(data) f = open("copy", "w") f.
有什么区别debug.write 和 Trace.write ?每个应该什么时候使用? 最佳答案 在典型的发布构建配置中,Debug class 被禁用并且什么都不做。 Trace但是,仍然可以在发行
我只是想知道,就性能而言,哪个更好(我在 FileStream 中使用 StreamWriter): 多次调用 Stream.Write(): StreamWriter sw = new Stream
我发现自己写给 stringwriter,然后在函数末尾执行 resp.Write(sw.ToString())。这是不必要的吗?如果我多次使用 HttpResponse.Write,即使我的页面是
我正在尝试通过 JavaScript 文件从 electron 打开一个新窗口,它可以工作,并打开了新窗口,但我无法将 HTML/文本写入新文件。我收到那个错误: Cannot read proper
我们对 QIODevice::write 的一般行为和具体的 QTcpSocket 实现感到非常困惑。有一个 similar question已经,但答案并不令人满意。主要的混淆源于分别提到的 byt
我知道这听起来像是一个愚蠢的问题: write(*,*) 和 write(6,*) ?我在我研究所的 super 计算机上运行一个复杂的代码,它通过一个不同于 6 的单元号输出一个数据文件,显然编译的
我有一个结构体,它可以通过一系列复杂的方法调用转换为文本,其中包含大量 write!调用。此文本可以写入文件或调试日志。我正在决定是否使用 fmt::Write 或 io::Write .我不能真正使
已关闭。这个问题是 not reproducible or was caused by typos 。目前不接受答案。 这个问题是由拼写错误或无法再重现的问题引起的。虽然类似的问题可能是 on-top
In the C standard library, an output can't be followed by an input and vice versa. 对于Linux API,可以在re
我希望能够为一件事做 document.write。然后延迟半秒,然后再记录。写一些。你知道这是否可能吗?而且,如果是这样,怎么办?到目前为止,我已经尝试过了,但没有奏效: document.writ
为什么通过 onclick 属性调用的 write() 函数解析为 document.write() 并替换文档?有什么办法可以阻止这种情况发生吗? Write Function Alternat
我想创建一个包含多个“页面”的文本文件,并将每个页面的字节偏移量记录在一个单独的文件中。为此,我将字符串打印到主输出文件并使用 bytes_written += file.write(str) 计算字
关闭。这个问题需要更多focused .它目前不接受答案。 想改进这个问题吗? 更新问题,使其只关注一个问题 editing this post . 关闭 8 年前。 Improve this qu
我是一名优秀的程序员,十分优秀!