gpt4 book ai didi

apache-flink - Flink : How to handle external app configuration changes in flink

转载 作者:行者123 更新时间:2023-12-04 04:38:15 28 4
gpt4 key购买 nike

我的要求是在一天内流式传输数百万条记录,并且它对外部配置参数有很大的依赖性。例如,用户可以随时在 Web 应用程序中更改所需的设置,并且在进行更改后,必须使用新的应用程序配置参数进行流式传输。这些是应用程序级别的配置,我们还有一些动态排除参数,每个数据都必须通过和过滤。

我看到 flink 没有在所有任务管理器和子任务之间共享的全局状态。拥有一个集中式缓存是一种选择,但对于每个参数,我都必须从缓存中读取它,这会增加延迟。请就处理此类场景的更好方法以及其他应用程序如何处理它提出建议。谢谢。

最佳答案

更新正在运行的流应用程序的配置是常见的要求。在 Flink 的 DataStream API 中,这可以使用所谓的 CoFlatMapFunction 来完成。它处理两个输入流。其中一个流可以是数据流,另一个可以是控制流。

以下示例展示了如何动态调整过滤掉超过特定长度的字符串的用户函数。

val data: DataStream[String] = ???
val control: DataStream[Int] = ???

val filtered: DataStream[String] = data
// broadcast all control messages to the following CoFlatMap subtasks
.connect(control.broadcast)
// process data and control messages
.flatMap(new DynLengthFilter)


class DynLengthFilter extends CoFlatMapFunction[String, Int, String] with Checkpointed[Int] {

var length = 0

// filter strings by length
override def flatMap1(value: String, out: Collector[String]): Unit = {
if (value.length < length) {
out.collect(value)
}
}

// receive new filter length
override def flatMap2(value: Int, out: Collector[String]): Unit = {
length = value
}

override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Int = length

override def restoreState(state: Int): Unit = {
length = state
}
}
DynLengthFilter用户函数实现 Checkpointed过滤器长度接口(interface)。如果发生故障,此信息会自动恢复。

关于apache-flink - Flink : How to handle external app configuration changes in flink,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39693919/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com