
java - Apache Flink: Custom InputFormat only runs with parallelism of 1

Reposted. Author: 行者123. Updated: 2023-11-30 02:13:04

I am implementing a custom input format for Apache Flink. I created a dummy input format that returns 3 rows.

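import java.io.IOException;

import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.types.Row;

public class ElasticsearchInputFormat extends GenericInputFormat<Row> {

    // Number of rows emitted for the split that is currently open.
    private int recordsEmitted = 0;

    @Override
    public void configure(Configuration parameters) {
        System.out.println("configuring");
    }

    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
        return cachedStatistics;
    }

    @Override
    public void open(GenericInputSplit split) throws IOException {
        System.out.println("opening: " + split);
        super.open(split);
        // Start counting from zero for every split this instance opens.
        recordsEmitted = 0;
    }

    @Override
    public void close() throws IOException {
        System.out.println("closing");
        super.close();
    }

    @Override
    public boolean reachedEnd() throws IOException {
        // No side effects here, so calling this more than once is safe.
        return recordsEmitted >= 3;
    }

    @Override
    public Row nextRecord(Row reuse) throws IOException {
        // Build a two-field dummy row and count it as emitted.
        Row r = new Row(2);
        r.setField(0, "osman");
        r.setField(1, "wow");
        recordsEmitted++;
        return r;
    }
}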

My sample code is as follows:

final ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
env.setParallelism(8);

DataSource<Row> input = env.createInput(new ElasticsearchInputFormat());

input.print();

However, even though the parallelism is set to 8, it prints:

configuring
opening: GenericSplit (0/1)
closing
osman,wow
osman,wow
osman,wow

Why is it not parallelized? I want to have multiple splits so that the input can be consumed in parallel by other operators.

Best answer

createCollectionsEnvironment() returns a special environment whose parallelism is implicitly 1, regardless of what setParallelism() is given. From the Javadocs:

Creates a {@link CollectionEnvironment} that uses Java Collections underneath. This will execute in a single thread in the current JVM. It is very fast but will fail if the data does not fit into memory. parallelism will always be 1. This is useful during implementation and for debugging.
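
Once the job runs in an environment that honors the parallelism setting (for example one obtained from ExecutionEnvironment.getExecutionEnvironment() or ExecutionEnvironment.createLocalEnvironment(8)), the format is opened once per split, so a dummy format like the one in the question would likely produce the same rows from every parallel instance instead of sharing the work. The following is a minimal sketch, in plain Java with no Flink dependency, of one way to divide a fixed number of records among splits so each record is produced once. The class SplitRanges and its method rangeFor are hypothetical names introduced for illustration; in a real format the two split arguments would come from GenericInputSplit.getSplitNumber() and GenericInputSplit.getTotalNumberOfSplits().

```java
// Hypothetical helper, not part of Flink: divides `totalRecords` record
// indices among `totalSplits` splits so every record is produced exactly once.
public class SplitRanges {

    // Returns {startInclusive, endExclusive} for the given split.
    // Assumption: splitNumber is zero-based and smaller than totalSplits.
    public static int[] rangeFor(int splitNumber, int totalSplits, int totalRecords) {
        int base = totalRecords / totalSplits;
        int remainder = totalRecords % totalSplits;
        // The first `remainder` splits take one extra record each.
        int start = splitNumber * base + Math.min(splitNumber, remainder);
        int length = base + (splitNumber < remainder ? 1 : 0);
        return new int[] {start, start + length};
    }

    public static void main(String[] args) {
        int totalSplits = 8;  // matches env.setParallelism(8) in the question
        int totalRecords = 3; // the dummy format's three rows
        for (int split = 0; split < totalSplits; split++) {
            int[] r = rangeFor(split, totalSplits, totalRecords);
            System.out.println("split " + split + "/" + totalSplits
                    + " -> records [" + r[0] + ", " + r[1] + ")");
        }
    }
}
```

With 3 records and 8 splits, splits 0 to 2 each get one record and splits 3 to 7 get an empty range, so a format built this way would report reachedEnd() immediately for the empty splits.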

Regarding java - Apache Flink: Custom InputFormat only runs with parallelism of 1, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/49543205/
