
java - Why does Apache Flink drop events from the data stream?

Reposted · Author: 行者123 · Updated: 2023-11-30 02:13:45

In the unit test below, a number of events given by numberOfElements is generated and fed in as a data stream. The test fails randomly at the following line:

assertEquals(numberOfElements, CollectSink.values.size());

What would explain Apache Flink skipping events?

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

import static java.lang.Thread.sleep;
import static org.junit.Assert.assertEquals;

public class FlinkTest {

    StreamExecutionEnvironment env;

    @Before
    public void setup() {
        env = StreamExecutionEnvironment.createLocalEnvironment();
    }

    @Test
    public void testStream1() throws Exception {
        testStream();
    }

    @Test
    public void testStream2() throws Exception {
        testStream();
    }

    @Test
    public void testStream3() throws Exception {
        testStream();
    }

    @Test
    public void testStream4() throws Exception {
        testStream();
    }

    @Test
    public void testStream() throws Exception {

        final int numberOfElements = 50;

        DataStream<Tuple2<String, Integer>> tupleStream = env.fromCollection(getCollectionOfBucketImps(numberOfElements));
        CollectSink.values.clear();
        tupleStream.addSink(new CollectSink());
        env.execute();
        sleep(2000);

        assertEquals(numberOfElements, getCollectionOfBucketImps(numberOfElements).size());
        assertEquals(numberOfElements, CollectSink.values.size());
    }

    public static List<Tuple2<String, Integer>> getCollectionOfBucketImps(int numberOfElements) throws InterruptedException {
        List<Tuple2<String, Integer>> records = new ArrayList<>();
        for (int i = 0; i < numberOfElements; i++) {
            records.add(new Tuple2<>(Integer.toString(i % 10), i));
        }
        return records;
    }

    // a testing sink that collects results into a shared static list
    private static class CollectSink implements SinkFunction<Tuple2<String, Integer>> {

        public static final List<Tuple2<String, Integer>> values = new ArrayList<>();

        @Override
        public synchronized void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
            values.add(value);
        }
    }
}

For example, any one of the testStreamX cases fails randomly.

Context: the code runs with a parallelism setting of 8, because the CPU it runs on has 8 cores.

Best Answer

I don't know your job's parallelism (I suppose it is the maximum that Flink can assign). It looks like you have a race condition on the sink's additions to the shared values list.
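The race is easy to reproduce outside Flink: with parallelism 8 the runtime creates eight CollectSink instances, so the `synchronized` modifier on invoke locks each instance separately while all of them mutate the same static ArrayList. A minimal sketch with plain threads (no Flink; the SafeSink name and thread counts are hypothetical) of how a single shared lock, rather than per-instance synchronization, keeps the list consistent:

```java
import java.util.ArrayList;
import java.util.List;

public class SinkRaceDemo {
    // Shared across all "sink instances", like CollectSink.values in the question
    static final List<Integer> values = new ArrayList<>();
    // One lock shared by every instance; a synchronized invoke() would only lock `this`
    static final Object lock = new Object();

    static class SafeSink {
        void invoke(int value) {
            synchronized (lock) { // class-wide lock, not per-instance
                values.add(value);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int parallelism = 8, perThread = 1000;
        Thread[] threads = new Thread[parallelism];
        for (int i = 0; i < parallelism; i++) {
            SafeSink sink = new SafeSink(); // one instance per subtask, as Flink would create
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) sink.invoke(j);
            });
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        System.out.println(values.size()); // 8000: every add() is serialized by the shared lock
    }
}
```

With only per-instance synchronization, concurrent `ArrayList.add` calls can interleave and silently lose elements, which is exactly the symptom of the failing assertion.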

Solution

I ran your example code with the environment parallelism set to 1 and everything worked. The documentation example on testing uses this approach (link to documentation).

@Before
public void setup() {
    env = StreamExecutionEnvironment.createLocalEnvironment();
    env.setParallelism(1);
}

Better

You can set the parallelism to 1 only on the sink operator and keep the parallelism of the rest of the pipeline. In the example below I added an extra map function, with a forced parallelism of 8 on the map operator.

public void testStream() throws Exception {

    final int numberOfElements = 50;

    DataStream<Tuple2<String, Integer>> tupleStream = env.fromCollection(getCollectionOfBucketImps(numberOfElements));
    CollectSink.values.clear();
    tupleStream
        .map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                stringIntegerTuple2.f0 += "- concat something";
                return stringIntegerTuple2;
            }
        }).setParallelism(8)
        .addSink(new CollectSink()).setParallelism(1);
    env.execute();
    sleep(2000);

    assertEquals(numberOfElements, getCollectionOfBucketImps(numberOfElements).size());
    assertEquals(numberOfElements, CollectSink.values.size());
}
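Independently of parallelism settings, the shared test list itself can also be made thread-safe so that concurrent sink subtasks never lose an element. A stdlib-only sketch (no Flink; thread and element counts are hypothetical) of the Collections.synchronizedList pattern:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SynchronizedListDemo {
    public static void main(String[] args) throws InterruptedException {
        // The synchronized wrapper makes each add() atomic, so no external lock is needed
        List<Integer> values = Collections.synchronizedList(new ArrayList<>());
        int parallelism = 8, perThread = 1000;
        Thread[] threads = new Thread[parallelism];
        for (int i = 0; i < parallelism; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) values.add(j);
            });
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        System.out.println(values.size()); // 8000: no updates lost
    }
}
```

Declaring CollectSink's static values field this way would let the test keep full parallelism on every operator.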

Regarding java - Why does Apache Flink drop events from the data stream?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/49278579/
