gpt4 book ai didi

apache-spark - Spark 结构化流式传输具有独特消息模式的多个 Kafka 主题

转载 作者:行者123 更新时间:2023-12-05 04:06:35 26 4
gpt4 key购买 nike

当前状态:

今天我构建了一个 Spark Structured Streaming 应用程序,它使用一个包含 JSON 消息的 Kafka 主题。嵌入在 Kafka 主题的值中包含一些关于消息字段的源和模式的信息。该消息的一个非常简化的版本如下所示:

{
"source": "Application A",
"schema": [{"col_name": "countryId", "col_type": "Integer"}, {"col_name": "name", "col_type": "String"}],
"message": {"countryId": "21", "name": "Poland"}
}

今天系统中有一些 Kafka 主题,我已经为每个主题部署了这个 Spark Structured Streaming 应用程序,使用 subscribe 选项。应用程序应用主题的独特模式(通过批量读取 Kafka 主题中的第一条消息并映射模式来破解)并以 Parquet 格式将其写入 HDFS。

期望状态:

我的组织很快就会开始制作越来越多的主题,我认为这种每个主题一个 Spark 应用程序的模式不会很好地扩展。最初似乎 subscribePattern 选项对我来说效果很好,因为这些主题在某种程度上具有层次结构形式,但现在我坚持应用架构并写入 HDFS 中的不同位置。

将来我们很可能会有数千个主题,但希望只有 25 个左右的 Spark 应用程序。

有没有人对如何实现这一点有建议?

最佳答案

与您的 kafka 生产者一起发送这些事件时,您还可以发送一个键和一个值。如果每个事件都有它的事件类型作为键,当从主题中读取流时,您还可以获得键:

val kafkaKvPair = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]

然后您可以过滤要处理的事件:

val events = kafkaKvPair
.filter(f => f._1 == "MY_EVENT_TYPE")

通过这种方式,如果您在一个 Spark 应用程序中订阅了多个主题,您可以处理任意数量的事件类型。

关于apache-spark - Spark 结构化流式传输具有独特消息模式的多个 Kafka 主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49781119/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com