gpt4 book ai didi

apache-spark - 如何使用 pyspark 和自定义 python 函数处理 eventhub 流

转载 作者:行者123 更新时间:2023-12-02 02:56:06 27 4
gpt4 key购买 nike

我当前的设置是:

  • Spark 2.3.0 和 pyspark 2.2.1
  • 使用 Azure IOTHub/EventHub 的流媒体服务
  • 一些基于pandas、matplotlib等的自定义python函数

我正在使用 https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark-jupyter.md作为如何读取数据的示例,但是:

  • 不能使用 foreach sink,因为它没有在 python 中实现
  • 当我尝试调用 .rdd、.map 或 .flatMap 时出现异常:“必须使用 writeStream.start() 执行带有流源的查询”

获取流的每个元素并通过 python 函数传递它的正确方法是什么?

谢谢,

埃德

最佳答案

在第一步中,您定义一个数据帧,从您的 EventHub 或 IoT-Hub 中读取数据作为流:

from pyspark.sql.functions import *

df = spark \
.readStream \
.format("eventhubs") \
.options(**ehConf) \
.load()

数据以二进制形式存储在 body 属性中。要获取正文的元素,您必须定义结构:

from pyspark.sql.types import *

Schema = StructType([StructField("name", StringType(), True),
StructField("dt", LongType(), True),
StructField("main", StructType(
[StructField("temp", DoubleType()),
StructField("pressure", DoubleType())])),
StructField("coord", StructType(
[StructField("lon", DoubleType()),
StructField("lat", DoubleType())]))
])

并将架构应用于转换为字符串的主体:

from pyspark.sql.functions import *

rawData = df. \
selectExpr("cast(Body as string) as json"). \
select(from_json("json", Schema).alias("data")). \
select("data.*")

在生成的数据框中,您可以应用函数,例如。 G。在“名称”列上调用自定义函数 u_make_hash:

 parsedData=rawData.select('name', u_make_hash(rawData['name']).alias("namehash"))  

关于apache-spark - 如何使用 pyspark 和自定义 python 函数处理 eventhub 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49365852/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com