gpt4 book ai didi

apache-spark - spark结构化流动态字符串过滤器

转载 作者:行者123 更新时间:2023-12-01 04:39:43 24 4
gpt4 key购买 nike

我们正在尝试对结构化流应用程序使用动态过滤器。

假设我们有以下 Spark 结构化流应用程序的伪实现:



spark.readStream()
.format("kafka")
.option(...)
...
.load()
.filter(getFilter()) <-- dynamic staff - def filter(conditionExpr: String):
.writeStream()
.format("kafka")
.option(.....)
.start();

和 getFilter 返回字符串

String getFilter() {
// dynamic staff to create expression
return expression; // eg. "column = true";
}

在当前版本的 Spark 中是否有可能具有动态过滤条件?我的意思是 getFilter()方法应该动态返回过滤条件(假设它每 10 分钟刷新一次)。我们试图研究广播变量,但不确定结构化流媒体是否支持这样的事情。

提交作业后似乎无法更新作业的配置。作为部署,我们使用 yarn .

每个建议/选项都受到高度赞赏。

编辑:
假设 getFilter()返回:

(columnA = 1 AND columnB = true) OR customHiveUDF(columnC, 'input') != 'required' OR columnD > 8

10 分钟后,我们可以有小的变化(在第一个 OR 之前没有第一个表达式),并且可能我们可以有一个新的表达式( columnA = 2 ),例如:

customHiveUDF(columnC, 'input') != 'required' OR columnD > 10 OR columnA = 2

目标是为一个 Spark 应用程序设置多个过滤器,并且不要提交多个作业。

最佳答案

广播变量在这里应该没问题。您可以编写类型化过滤器,例如:

query.filter(x => x > bv.value).writeStream(...)

其中 bv 是 Broadcast多变的。您可以按照此处所述进行更新: How can I update a broadcast variable in spark streaming?

其他解决方案是提供即 RCP 或 RESTful 端点并每 10 分钟询问此端点。例如(Java,因为这里更简单):
class EndpointProxy {

Configuration lastValue;
long lastUpdated
public static Configuration getConfiguration (){

if (lastUpdated + refreshRate > System.currentTimeMillis()){
lastUpdated = System.currentTimeMillis();
lastValue = askMyAPI();
}
return lastValue;
}
}


query.filter (x => x > EndpointProxy.getConfiguration().getX()).writeStream()

编辑:用户问题的hacky解决方法:

您可以创建 1 行 View :
//confsDF 应该在一些驱动程序端的单例中
var confsDF = Seq(some content).toDF("someColumn")
and then use:
query.crossJoin(confsDF.as("conf")) // cross join as we have only 1 value
.filter("hiveUDF(conf.someColumn)")
.writeStream()...

new Thread() {
confsDF = Seq(some new data).toDF("someColumn)
}.start();

此 hack 依赖于 Spark 默认执行模型 - 微批次。在每个触发器中都会重新构建查询,因此应考虑新数据。

您还可以在线程中执行以下操作:
Seq(some new data).toDF("someColumn).createOrReplaceTempView("conf")

然后在查询中:
.crossJoin(spark.table("conf"))

两者都应该工作。请记住,它不适用于连续处理模式

关于apache-spark - spark结构化流动态字符串过滤器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50906604/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com