- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
这是我编写的 Apache Beam PTransform:
public class NormalizeTransform
extends PTransform<PCollection<String>, PCollection<SimpleTable>> {
@Override
public PCollection<SimpleTable> expand(PCollection<String> lines) {
ExtractFields extract_i = new ExtractFields();
PCollection<SimpleTable> table = lines
.apply("Extracting data model fields from lines",
ParDo.of(extract_i));
}
public class ExtractFields extends DoFn<String, SimpleTable> {
@ProcessElement
public void processElement(ProcessContext c){
try {
String line = c.element();
// fill table
for (Table_Struct st: this.struct){
String o = line.substring(st.pos_1, st.pos_2));
this.table.getClass().getField(st.Field_Name).set(
this.table, o);
}
c.output(this.table);
}
}
并且偶尔会出现以下错误IllegalMutationException
,这意味着我重复运行代码,有时有效,有时无效。
org.apache.beam.sdk.util.IllegalMutationException: PTransform Transform/Extracting data model fields from lines/ParMultiDo(ExtractFields) mutated value after it was output (new value was ). Values must not be mutated in any way after being output.
at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.commit(ImmutabilityCheckingBundleFactory.java:135)
at org.apache.beam.runners.direct.EvaluationContext.commitBundles(EvaluationContext.java:214)
at org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:163)
at org.apache.beam.runners.direct.ExecutorServiceParallelExecutor$TimerIterableCompletionCallback.handleResult(ExecutorServiceParallelExecutor.java:268)
at org.apache.beam.runners.direct.TransformExecutor.finishBundle(TransformExecutor.java:168)
at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:109)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
我认为我没有专门更改代码中任何位置的值的任何输出。 MutationDetectors 将比较两个值:previousValue 和 newValue。就我而言, previousValue 通常是一个输入值,而 newValue 是另一个输入值。为什么 Transform 会尝试使用一个输入值来修改另一个输入值?
最佳答案
我不确定 this.table
来自哪里。
但为了帮助您理解错误消息,请记住,可能会在多个输入上调用 processElement
。第一次调用将输出 this.table
。下一次调用将在输出之前改变 this.table
。
如果此突变发生在第一次调用输出 this.table
之后且下游代码有机会读取 this.table
之前,您将得到不正确的结果。因此,此错误表明您在输出引用后更改了 this.table
的内容 - 这是您不应该做的事情。
考虑 (1) 输出 this.table
的副本或 (2) 将表创建为本地字段。例如:
@ProcessElement
public void processElement(ProcessContext c){
try {
String line = c.element();
Table table = /* create the table */;
// fill table
for (Table_Struct st: this.struct){
String o = line.substring(st.pos_1, st.pos_2));
this.table.getClass().getField(st.Field_Name)
.set(table, o);
}
c.output(table);
}
}
另请注意,在每个 processElement
中执行反射可能会比预期慢。如果可以直接修改字段,可能会更好。
关于java - 来自 Beam PTransform 的 IllegalMutationException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44378033/
我是该项目的新手,我正在尝试在 Dataflow 和数据库之间创建连接器。 文档明确指出我应该使用 Source 和 Sink,但我看到很多人直接使用与 PInput 或 PDone 关联的 PTra
我使用 Beam 在本地构建并设法运行了一个令人满意的管道,我已准备好将作业发送到 DataFlow。 我计划只使用 save_main_session 管道选项来 pickle 我的 session
我正在使用 Apache Beam 为 TensorFlow 预处理数据。我想根据数据集中的示例数量选择 TFRecord 分片的数量。代码的相关部分是: EXAMPLES_PER_SHARD = 5
这是我编写的 Apache Beam PTransform: public class NormalizeTransform extends PTransform, PCollection> {
我正在尝试将 PTransform 应用于 PCollectionTuple,但无法弄清楚编译器为什么会提示。 我想这样做是为了将连接某些 csv 行所需的多个步骤抽象为单个 PTransform(P
升级到 Beam 2.0 后 Pipeline类(class)没有 getOptions()上课了。 我有一个复合 PTransform依赖于获取其 expand 中的选项方法: public cla
我是一名优秀的程序员,十分优秀!