gpt4 book ai didi

apache-spark - Spark Streaming - 基于过滤器参数分割输入流的最佳方法

转载 作者:行者123 更新时间:2023-11-30 08:41:01 25 4
gpt4 key购买 nike

我目前尝试创建某种监控解决方案 - 将一些数据写入 kafka,然后我使用 Spark Streaming 读取这些数据并进行处理。

为了预处理机器学习和异常检测的数据,我想根据一些过滤器参数分割流。到目前为止,我了解到 DStream 本身不能拆分为多个流。

我主要面临的问题是许多算法(如 KMeans)仅采用连续数据,而不是离散数据,例如url 或其他字符串。

我的理想要求是:

  • 从 kafka 读取数据并根据读取的内容生成字符串列表
  • 根据该字符串列表生成多个流 -(分割流、过滤流或任何最佳实践)
  • 使用这些流为每个流训练不同的模型以获得基线,然后将随后出现的所有内容与基线进行比较

我很高兴收到任何有关如何解决我的问题的建议。我无法想象 Spark 中没有涵盖这种情况 - 但是直到现在我还没有找到可行的解决方案。

最佳答案

我认为使用过滤器和映射从原始 DStream 创建派生 DStream 应该足够了:

val numericFeaturesDStream = originalDStream.filter(e => predicate(e)).map(e => extractFeatures(e))
val otherNumericFeaturesDStream = originalDStream.filter(e => predicate2(e)).map(e => extractOtherFeatures(e))

请注意,这些 filtermap 步骤可以合并在一个 collect 步骤中(不要与无参数 RDD.collect 混淆)将数据传送给驱动程序!!!)

val featuresStream = originalDStream.transform(rdd => 
rdd.collect{case (a,b,c,d,e) if c=="client" => Vectors.parse(a)}
)
streamingKMeans.trainOn(featuresStream)

我们还可以将一组动态的过滤 DStream 保存到某个集合中。这里我们使用一个包含我们用来过滤的键的映射:

originalDStream.cache() // important for performance when a DStream is branched out.
// filterKeys: Set[String]
val dstreamByFilterKey = filterKeys.map(key => key -> originalDStream.filter(e => (getKey(e)==key)))
// do something with the different DStreams in this structure ...

这些片段是需要用实际逻辑来完成的代码示例。

关于apache-spark - Spark Streaming - 基于过滤器参数分割输入流的最佳方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44902723/

25 4 0