gpt4 book ai didi

java - 使用 Flink 时,字数统计数字总是在变化

转载 作者:行者123 更新时间:2023-12-02 08:52:21 27 4
gpt4 key购买 nike

我正在尝试使用 flink 创建字数统计示例。这是link对于单词数据(这是来自 flink github 帐户的示例)

当我用简单的java程序计算单词时:

public static void main(String[] args) throws Exception {
int count = 0;
for (String eachSentence : WordCountData.WORDS){
String[] splittedSentence = eachSentence.toLowerCase().split("\\W+");
for (String eachWord: splittedSentence){
count++;
}
}
System.out.println(count);
// result is 287
}

现在,当我使用 flink 执行此操作时,首先我会将句子拆分为单词。

DataStream<Tuple2<String, Integer>> readWordByWordStream = splitSentenceWordByWord(wordCountDataSource);

//...
public DataStream<Tuple2<String, Integer>> splitSentenceWordByWord(DataStream<String> wordDataSourceStream)
{
DataStream<Tuple2<String, Integer>> wordByWordStream = wordDataSourceStream.flatMap(new TempTransformation());
return wordByWordStream;
}

  • 这是我的TempTransformation类:
public class TempTransformation extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

@Override
public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception
{
String[] splittedSentence = input.toLowerCase().split("\\W+");
for (String eachWord : splittedSentence)
{
collector.collect(new Tuple2<String, Integer>(eachWord, 1));
}
}
}
  • 现在我将通过将其转换为 KeyedStream(按单词键入)来计算单词数
    public SingleOutputStreamOperator<String> keyedStreamExample(DataStream<Tuple2<String, Integer>> wordByWordStream)
{
return wordByWordStream.keyBy(0).timeWindow(Time.milliseconds(1)).apply(new TempWindowFunction());
}
  • TempWindowFunction():
public class TempWindowFunction extends RichWindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow> {
private Logger logger = LoggerFactory.getLogger(TempWindowFunction.class);
private int count = 0;
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception
{
logger.info("Key is:' {} ' and collected element for that key and count: {}", (Object) tuple.getField(0), count);
StringBuilder builder = new StringBuilder();
for (Tuple2 each : input)
{
String key = (String) each.getField(0);
Integer value = (Integer) each.getField(1);
String tupleStr = "[ " + key + " , " + value + "]";
builder.append(tupleStr);
count ++;
}
logger.info("All tuples {}", builder.toString());
logger.info("Exit method");
logger.info("----");
}
}
  • 在 Flink 的本地环境中运行此作业后,输出始终在变化,以下是一些示例:
18:09:40,086 INFO  com.sampleFlinkProject.transformations.TempWindowFunction     - Key is:' rub ' and collected element for that key and count: 86
18:09:40,086 INFO TempWindowFunction - All tuples [ rub , 1]
18:09:40,086 INFO TempWindowFunction - Exit method
18:09:40,086 INFO TempWindowFunction - ----
18:09:40,086 INFO TempWindowFunction - Key is:' for ' and collected element for that key and count: 87
18:09:40,086 INFO TempWindowFunction - All tuples [ for , 1]
18:09:40,086 INFO TempWindowFunction - Exit method
18:09:40,086 INFO TempWindowFunction - ----

// another running outputs:

18:36:21,660 INFO TempWindowFunction - Key is:' for ' and collected element for that key and count: 103
18:36:21,660 INFO TempWindowFunction - All tuples [ for , 1]
18:36:21,660 INFO TempWindowFunction - Exit method
18:36:21,660 INFO TempWindowFunction - ----
18:36:21,662 INFO TempWindowFunction - Key is:' coil ' and collected element for that key and count: 104
18:36:21,662 INFO TempWindowFunction - All tuples [ coil , 1]
18:36:21,662 INFO TempWindowFunction - Exit method
18:36:21,662 INFO TempWindowFunction - ----
  • 最后,这是执行设置
//...
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setParallelism(1);
//...
  • 为什么 Flink 每次执行都会给出不同的输出?

最佳答案

应用程序中不确定性的一个来源是处理时间窗口(1 毫秒长)。每当您使用处理时间进行窗口处理时,窗口最终都会包含在该时间间隔内发生并得到处理的所有事件。 (事件时间窗口确实具有确定性,因为它们基于事件中的时间戳。)窗口如此短会夸大这种效果。

关于java - 使用 Flink 时,字数统计数字总是在变化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60694613/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com