gpt4 book ai didi

java - 如何在特定用例的reduceByKeyAndWindow()中实现invFunc

转载 作者:行者123 更新时间:2023-12-01 11:03:00 30 4
gpt4 key购买 nike

我正在使用 Spark Streaming 来处理文件流。多个文件批量到达并 Spark 处理所有文件中的数据。我的用途是获取后续批处理的文件中每条记录的总和。例如:

  • 键:key_1 值:10 --> 批处理1
  • 键:key_1 值:05 --> 批处理1
  • 键:key_1 值:19 --> 批处理2
  • 键:key_1 值:11 --> 批处理3
  • 键:key_1 值:10 --> 批处理4

我需要输出如下内容:

  • 处理第一批后,我需要输出 => key: key_1 val: 15
  • 处理第二批后,我需要输出 => key: key_1 val: 34
  • 处理完第三批后,我需要输出为 => key: key_1 val: 45
  • 处理完第 4 批后,我需要输出为 => key: key_1 val: 55
  • 处理完第 5 批后,我需要输出为 => key: key_1 val: 55

我使用 reduceByKeyAndWindow() 编写的 Spark 代码如下:

JavaPairDStream<String, Summary> grpSumRDD = sumRDD.reduceByKeyAndWindow(GET_GRP_SUM, Durations.minutes(2*batchInterval), Durations.minutes(batchInterval));

private static final Function2<Summary, Summary, Summary> GET_GRP_SUM = new Function2<Summary, Summary, Summary>() {
private static final long serialVersionUID = 1L;

public Summary call(Summary s1, Summary s2) throws Exception {
try {

Summary s = new Summary();

long grpCnt = s1.getDelta() + s2.getDelta();
s.setDeltaSum(grpCnt);

return s;
} catch (Exception e) {
logger.error(" ==== error in CKT_GRP_SUM ==== :"+e);
return new Summary();
}
}

};

我从上述实现中得到的输出如下:

  • 处理第一批后,我得到输出 => key: key_1 value: 15
  • 处理第二批后,我得到输出 => key: key_1 value: 34
  • 处理第三批后,我得到输出 => key: key_1 value: 30
  • 处理第四批后,我得到输出 => key: key_1 value: 21
  • 处理第 5 批后,我得到输出 => key: key_1 value: 10

根据reduceByKeyAndWindow()的输出,它似乎正在计算先前批处理数据和当前批处理数据的聚合。但我的要求是对上一批的聚合数据和当前批处理的数据进行聚合。所以按照上面的例子在第 4 批和第 5 批结束时,它应该输出为 [(((15)+19)+11)+10 = 55]。

我读到关于reduceByKeyAndWindow()invFunc可以实现以获得预期的输出。我尝试实现它类似于GET_GRP_SUM,但它没有给我预期的结果。任何有关正确实现以获得所需输出的帮助将不胜感激。

我正在使用 java 1.8.45 和 Spark 版本 1.4.1 以及 hadoop 版本 2.7.1 。

我使用reduceByKeyAndWindow()在invFunc上的实现

JavaPairDStream<String, Summary> grpSumRDD = sumRDD.reduceByKeyAndWindow(GET_GRP_SUM, INV_GET_GRP_SUM, Durations.minutes(2*batchInterval), Durations.minutes(batchInterval));

private static final Function2<Summary, Summary, Summary> INV_GET_GRP_SUM = new Function2<Summary, Summary, Summary>() {
private static final long serialVersionUID = 1L;

public Summary call(Summary s1, Summary s2) throws Exception {
try {

Summary s = new Summary();

long grpCnt = s1.getDelta() + s2.getDelta();
s.setDeltaSum(grpCnt);

return s;

} catch (Exception e) {
logger.error(" ==== error in INV_GET_GRP_SUM ==== :"+e);
return new Summary();
}
}
};

我已经像上面一样实现了 invFunc,但这没有给我预期的输出。我在这里分析的是 s1 和 s2 给我以前批处理的聚合值,我认为我不太确定。

我尝试更改我的 invFunc 实现,如下所示:

private static final Function2<Summary, Summary, Summary> INV_GET_GRP_SUM = new Function2<Summary, Summary, Summary>() {
private static final long serialVersionUID = 1L;

public Summary call(Summary s1, Summary s2) throws Exception {
try {

return s1;

} catch (Exception e) {
logger.error(" ==== error in INV_GET_GRP_SUM ==== :"+e);
return new Summary();
}
}
};

这个实现给了我预期的输出。但我面临的问题是,reduceByKeyAndWindow() 和 invFunc 不会自动删除旧的键。我又浏览了几篇文章,发现我需要编写自己的过滤函数,该函数将删除具有 0 值(无值)的旧键。

我再次不确定如何编写过滤函数来删除具有 0 值(无值)的旧键,因为我没有具体了解 s1 和 s2 返回到 INV_GET_GRP_SUM 的内容。

最佳答案

使用UpdateStateByKey

您查看了吗updateStateByKey()来自流媒体 API?它允许您在批处理间隔之间维护键值对的状态,不断用与其关联的新信息(值)更新每个键。这非常适合您的用例,因为数据的先前状态将包含每个键的聚合总和,直到最新状态。有关此功能的更多信息可以在其用法 here 中找到在一个例子中here .

关于该函数的一个注意事项是它需要启用检查点,以便可以在每次迭代时保存状态。

(编辑:)

使用ReduceByKeyAndWindow

关于使用 reduceKeyAndWindow()call() 的第二个参数普通 funcinvFunc 的方法分别是添加新元素和减去旧元素。本质上,您是通过从新的时间片添加元素(您正在使用 GET_GRP_SUM 执行此操作)并从旧时间片中减去元素(您没有使用 INV_GET_GRP_SUM 执行此操作)来实现此窗口化减少。请注意,在第一次尝试中,您将旧值重新添加回当前窗口中的值,而在第二次尝试中,您将忽略移出窗口的值。

要从移出窗口的元素中减去旧值,您可能需要 INV_GET_GRP_SUM有类似于下面的逻辑(并且可以找到类似的正确实现 here ):

public Summary call(Summary s1, Summary s2) throws Exception {
try {

long grpCnt = s1.getDelta() - s2.getDelta();
s.setDeltaSum(grpCnt);

} catch (Exception e) {
logger.error(" ==== error in INV_GET_GRP_SUM ==== :"+e);
return new Summary();
}
}

对于您的另一个问题,似乎确实有一种方法可以过滤掉过期的 key ,并且正如您所提到的,它确实涉及编写过滤器函数。从 API 可以看出,此过滤器函数接受您的键值对并返回一个 boolean 值,该 boolean 值将设置为 true (如果您想保留该对)或 false (如果您想删除该对)。在这种情况下,由于您希望在值为零时删除您的对,因此您可以执行以下操作:

private static final Function<scala.Tuple2<String, Summary>, Boolean> FILTER_EXPIRED = new Function<scala.Tuple2<String, Summary>, Boolean>() {
public Boolean call(scala.Tuple2<String, Summary> s) {
return s.productElement(1) > 0;
}
}

然后你可以将其传递到你的reduceByKeyAndWindow()中函数(请注意,您应该在此处传入分区参数来确定 DStream 中的 RDD 将使用多少个分区):

JavaPairDStream<String, Summary> grpSumRDD = sumRDD.reduceByKeyAndWindow(GET_GRP_SUM, INV_GET_GRP_SUM, Durations.minutes(2*batchInterval), Durations.minutes(batchInterval), partitions, FILTER_EXPIRED);

关于java - 如何在特定用例的reduceByKeyAndWindow()中实现invFunc,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33197680/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com