gpt4 book ai didi

java - 如何更新 Spark 流中的广播变量?

转载 作者:IT老高 更新时间:2023-10-28 21:15:18 27 4
gpt4 key购买 nike

我相信,我有一个相对常见的 Spark 流用例:

我有一个对象流,我想根据一些引用数据进行过滤

最初,我认为使用广播变量来实现这将是一件非常简单的事情:

public void startSparkEngine {
Broadcast<ReferenceData> refdataBroadcast
= sparkContext.broadcast(getRefData());

final JavaDStream<MyObject> filteredStream = objectStream.filter(obj -> {
final ReferenceData refData = refdataBroadcast.getValue();
return obj.getField().equals(refData.getField());
}

filteredStream.foreachRDD(rdd -> {
rdd.foreach(obj -> {
// Final processing of filtered objects
});
return null;
});
}

但是,尽管很少,我的引用数据会定期更改

我的印象是我可以在驱动程序上修改和重新广播我的变量,它会传播到每个 worker ,但是 Broadcast对象不是 Serializable并且需要是final .

我有什么选择?我能想到的三个解决方案是:

  1. 将引用数据查找移动到 forEachPartitionforEachRdd所以它完全取决于 worker 。然而,引用数据存在于 REST API 中,因此我还需要以某种方式存储一个计时器/计数器,以停止远程访问流中的每个元素。

  2. 每次 refdata 更改时都使用新的广播变量重新启动 Spark 上下文。

  3. 将引用数据转换为RDD,然后join以这样的方式流式传输,我现在正在流式传输 Pair<MyObject, RefData> ,尽管这会将引用数据与每个对象一起发送。

最佳答案

通过@Rohan Aletty 扩展答案。这是一个基于 ttl 刷新广播变量的 BroadcastWrapper 的示例代码

public class BroadcastWrapper {

private Broadcast<ReferenceData> broadcastVar;
private Date lastUpdatedAt = Calendar.getInstance().getTime();

private static BroadcastWrapper obj = new BroadcastWrapper();

private BroadcastWrapper(){}

public static BroadcastWrapper getInstance() {
return obj;
}

public JavaSparkContext getSparkContext(SparkContext sc) {
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
return jsc;
}

public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext){
Date currentDate = Calendar.getInstance().getTime();
long diff = currentDate.getTime()-lastUpdatedAt.getTime();
if (var == null || diff > 60000) { //Lets say we want to refresh every 1 min = 60000 ms
if (var != null)
var.unpersist();
lastUpdatedAt = new Date(System.currentTimeMillis());

//Your logic to refresh
ReferenceData data = getRefData();

var = getSparkContext(sparkContext).broadcast(data);
}
return var;
}
}

您的代码如下所示:

public void startSparkEngine() {

final JavaDStream<MyObject> filteredStream = objectStream.transform(stream -> {
Broadcast<ReferenceData> refdataBroadcast = BroadcastWrapper.getInstance().updateAndGet(stream.context());

stream.filter(obj -> obj.getField().equals(refdataBroadcast.getValue().getField()));
});

filteredStream.foreachRDD(rdd -> {
rdd.foreach(obj -> {
// Final processing of filtered objects
});
return null;
});
}

这对我也适用于多集群。希望这会有所帮助

关于java - 如何更新 Spark 流中的广播变量?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33372264/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com