gpt4 book ai didi

scala - 如何将数据集写入Kafka主题?

转载 作者:行者123 更新时间:2023-12-03 00:33:07 25 4
gpt4 key购买 nike

我使用的是 Spark 2.1.0 和 Kafka 0.9.0。

我正在尝试将批处理 Spark 作业的输出推送到 kafka。该作业应该每小时运行一次,但不是流式运行。

在网上寻找答案时,我只能找到 kafka 与 Spark 流的集成,而没有找到与批处理作业集成的信息。

有人知道这样的事情是否可行吗?

谢谢

更新:

正如用户8371915所提到的,我尝试遵循Writing the output of Batch Queries to Kafka中所做的事情.

我使用了 Spark shell:

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

这是我尝试过的简单代码:

val df = Seq(("Rey", "23"), ("John", "44")).toDF("key", "value")
val newdf = df.select(to_json(struct(df.columns.map(column):_*)).alias("value"))
newdf.write.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "alerts").save()

但我收到错误:

java.lang.RuntimeException: org.apache.spark.sql.kafka010.KafkaSourceProvider does not allow create table as select.
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:497)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
... 50 elided

知道这与什么有关吗?

谢谢

最佳答案

tl;dr 您使用过时的 Spark 版本。 2.2 及更高版本中启用写入。

开箱即用,您可以使用 Kafka SQL 连接器(与结构化流处理相同)。包括

  • spark-sql-kafka 在您的依赖项中。
  • 将数据转换为至少包含 StringTypeBinaryType 类型的 value 列的 DataFrame
  • 将数据写入Kafka:

    df   
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", server)
    .save()

关注Structured Streaming docs了解详细信息(以 Writing the output of Batch Queries to Kafka 开头)。

关于scala - 如何将数据集写入Kafka主题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49694107/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com