gpt4 book ai didi

scala - 如何将 Spark 数据帧写入 Kinesis Stream?

转载 作者:行者123 更新时间:2023-12-02 19:56:20 25 4
gpt4 key购买 nike

我正在使用 Spark 流从 kafka 主题创建数据框。我想将 Dataframe 写入 Kinesis Producer。我知道目前还没有官方的 API。但是互联网上有多个可用的 API,但遗憾的是,它们都不适合我。星火版本:2.2斯卡拉:2.11

我尝试使用 https://github.com/awslabs/kinesis-kafka-connector并 build jar 。但是由于此 jar 和 spark API 之间的包名称冲突而出现错误。请帮忙。

########### 这里是别人的代码:
spark-shell --jars spark-sql-kinesis_2.11-2.2.0.jar,spark-sql-kafka-0-10_2.11-2.1.0.jar,spark-streaming-kafka-0-10-assembly_2.10-2.1.0.jar --files kafka_client_jaas_spark.conf --properties-file gobblin_migration.conf --conf spark.port.maxRetries=100 --driver-java-options "-Djava.security.auth.login.config=kafka_client_jaas_spark.conf" --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas_spark.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas_spark.conf"

import java.io.File
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.spark.SparkException
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import scala.sys.process._
import org.apache.log4j.{ Logger, Level, LogManager, PropertyConfigurator }
import org.apache.spark.sql.streaming.Trigger

val streamingInputDF =spark.readStream.format("kafka").option("kafka.bootstrap.servers","bootstrap server").option("subscribe", "<kafkatopic>").option("startingOffsets", "latest").option("failOnDataLoss", "false").option("kafka.security.protocol", "SASL_PLAINTEXT").load()

val xdf=streamingInputDF.select(col("partition").cast("String").alias("partitionKey"),col("value").alias("data"))

xdf.writeStream.format("kinesis").option("checkpointLocation", "<hdfspath>").outputMode("Append").option("streamName", "kinesisstreamname").option("endpointUrl","kinesisendpoint").option("awsAccessKeyId", "accesskey").option("awsSecretKey","secretkey").start().awaitTermination()

对于 jar spark-sql-kinesis_2.11-2.2.0.jar,转到 quoble ,下载适用于您的 spark 版本的包,构建 jar。

如果您在公司网络后面,请在启动 spark 之前设置代理。导出 http_proxy= http://server-ip:port/导出 https_proxy= https://server-ip:port/

最佳答案

Kafka Connect 是一项服务,您可以向其发布连接器规范(在本例中为 kinesis),然后由该服务负责运行连接器。它在处理记录时也支持很多转换。 Kafka Connect 插件不适用于 Spark 应用程序。

如果您的用例要求您在处理记录时执行一些业务逻辑,那么您可以选择 Spark Streaming 或 Structured Streaming 方法。

如果您想采用基于 Spark 的方法,以下是我能想到的 2 个选项。

  1. 使用结构化流媒体。您可以为 Kinesis 使用 Strucuted 流连接器。你可以找到一个 here .可能还有其他人。这是我所知道的唯一稳定的开源连接器。您可以找到使用 Kinesis 作为接收器的示例 here .

  2. 使用 Kinesis Producer Libraryaws-java-sdk-kinesis从您的 Spark Streaming 应用程序发布记录的库。此处使用 KPL 是首选方法。您可以执行 mapPartitions 并为每个分区创建一个 Kinesis 客户端,并使用这些库发布记录。 AWS 文档中有大量关于这 2 个库的示例。

关于scala - 如何将 Spark 数据帧写入 Kinesis Stream?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56942802/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com