gpt4 book ai didi

apache-spark - Apache Spark Streaming 中的定期广播

转载 作者:行者123 更新时间:2023-12-03 07:07:41 43 4
gpt4 key购买 nike

我正在实现一个用于文本分类的流学习器。我的实现中有一些单值参数需要在新流项到达时进行更新。例如,我想在做出新预测时改变学习率。但是,我怀疑是否有办法在初始广播后广播变量。那么如果我每次更新变量时都需要广播它,会发生什么情况呢?如果有一种方法可以做到这一点,或者有一个解决方法可以实现我想要在 Spark Streaming 中完成的任务,我会很高兴听到它。

提前致谢。

最佳答案

我通过在广播变量上创建一个包装类来实现此目的。包装类的 updateAndGet 方法返回刷新后的广播变量。我在 dStream.transform 中调用此函数 -> 根据 Spark 文档

http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation

转换操作指出:“提供的函数在每个批处理间隔中都会被调用。这允许您执行时变的 RDD 操作,即 RDD 操作、分区数、广播变量等。可以更改批处理之间。”

BroadcastWrapper 类看起来像:

public class BroadcastWrapper {
private Broadcast<ReferenceData> broadcastVar;
private Date lastUpdatedAt = Calendar.getInstance().getTime();

private static BroadcastWrapper obj = new BroadcastWrapper();

private BroadcastWrapper(){}

public static BroadcastWrapper getInstance() {
return obj;
}

public JavaSparkContext getSparkContext(SparkContext sc) {
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc);
return jsc;
}

public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext){
Date currentDate = Calendar.getInstance().getTime();
long diff = currentDate.getTime()-lastUpdatedAt.getTime();
if (var == null || diff > 60000) { //Lets say we want to refresh every 1 min = 60000 ms
if (var != null)
var.unpersist();
lastUpdatedAt = new Date(System.currentTimeMillis());

//Your logic to refresh
ReferenceData data = getRefData();

var = getSparkContext(sparkContext).broadcast(data);
}
return var;
}
}

您可以在允许RDD-RDD转换的stream.transform方法中使用此广播变量updateAndGet函数

objectStream.transform(stream -> {

Broadcast<Object> var = BroadcastWrapper.getInstance().updateAndGet(stream.context());

/**Your code to manipulate stream **/
});

请参阅此帖子中我的完整答案:https://stackoverflow.com/a/41259333/3166245

希望对你有帮助

关于apache-spark - Apache Spark Streaming 中的定期广播,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28573816/

43 4 0