gpt4 book ai didi

java - 将spark sql 2.4.4数据帧中的Avro类型消息生成到Kafka

转载 作者:行者123 更新时间:2023-12-02 09:02:39 26 4
gpt4 key购买 nike

我正在尝试使用 Spark SQL 将 Avro 消息写入 Kafka。有人可以建议我如何在java中实现它吗?我找到了 scala 引用代码,但没有找到 Java。

我尝试过,但抛出错误,并且在哪里可以配置架构注册表。

aggr.selectExpr("CAST(order_id AS String) AS key", "to_avro(struct(*)) AS value").write().format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "aggr_topic").save();

或者请将 scala 代码复制到 java 。

val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaURL)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryURL).as("key"),
from_avro($"value", "t-value", schemaRegistryURL).as("value"))

提前致谢。

最佳答案

该代码与 Java 中的代码完全相同,除了 val df

顺便说一下,

from_avro 仅存在于 databricks 环境中,无论如何,您都需要 writeStreamto_avro

另一种方法是使用foreachPartition将dataframe转换为RDD,然后手动创建一个新的KafkaProducer来发送事件

您可能还对 https://github.com/AbsaOSS/ABRiS 感兴趣

关于java - 将spark sql 2.4.4数据帧中的Avro类型消息生成到Kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60053038/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com