gpt4 book ai didi

java - 通过流功能流时间戳

转载 作者:行者123 更新时间:2023-11-30 10:37:09 25 4
gpt4 key购买 nike

每次使用 Spark Streaming 运行批处理时,如何/是否可能生成随机数或获取系统时间?

我有两个处理一批消息的函数:1 - 首先处理 key ,创建一个文件 (csv) 并写入 header 2 - 第二个处理每条消息并将数据添加到 csv

我希望将每个批处理的文件存储在单独的文件夹中:

/output/folderBatch1/file1.csv, file2.csv, etc.csv
/output/folderBatch2/file1.csv, file2.csv, etc.csv
/output/folderBatch3/file1.csv, file2.csv, etc.csv

如何创建一个变量,即使只是一个 Spark Streaming 可以使用的简单计数器?

下面的代码获取系统时间,但因为它是“普通 Java”,所以它只执行一次,并且在批处理的每次运行中都是相同的值。

JavaPairInputDStream<String, byte[]> messages;
messages = KafkaUtils.createDirectStream(
jssc,
String.class,
byte[].class,
StringDecoder.class,
DefaultDecoder.class,
kafkaParams,
topicsSet
);

/**
* Declare what computation needs to be done
*/
JavaPairDStream<String, Iterable<byte[]>> groupedMessages = messages.groupByKey();

String time = Long.toString(System.currentTimeMillis()); //this is only ever run once and is the same value for each batch!

groupedMessages.map(new WriteHeaders(time)).print();

groupedMessages.map(new ProcessMessages(time)).print();

谢谢,KA.

最佳答案

您可以通过额外的 map 添加时间戳呼唤并顺其自然。这意味着不是 Iterable<byte[]> 类型的值,您的值为 Tuple2<Long, Iterable<byte[]>) :

JavaDStream<Tuple2<String, Tuple2<Long, Iterable<byte[]>>>> groupedWithTimeStamp = 
groupedMessages
.map((Function<Tuple2<String, Iterable<byte[]>>,
Tuple2<String, Tuple2<Long, Iterable<byte[]>>>>) kvp ->
new Tuple2<>(kvp._1, new Tuple2<>(System.currentTimeMillis(), kvp._2)));

现在您在每个 map 中都有时间戳从现在开始,您可以通过以下方式访问它:

groupedWithTimeStamp.map(value -> value._2._1); // This will access the timestamp.

关于java - 通过流功能流时间戳,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40238160/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com