
apache-spark - Spark Structured Streaming with Kafka does not honor startingOffset="earliest"

Reposted · Author: 行者123 · Updated: 2023-12-04 21:29:40

I have set up Spark Structured Streaming (Spark 2.3.2) to read from Kafka (2.0.0). If messages arrive in the topic before the streaming job starts, I cannot consume from the beginning of the topic. Is it expected behavior that Spark streaming ignores Kafka messages produced before the job's initial run, even with .option("stratingOffsets","earliest")?

Steps to reproduce

  • Before starting the streaming job, create the test topic (single broker, single partition) and produce messages to it (3 messages in my case).
  • Start spark-shell with: spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2.3.1.0.0-78 --repositories http://repo.hortonworks.com/content/repositories/releases/
  • Run the Spark Scala code below.

    // Local
    val df = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9097")
    .option("failOnDataLoss","false")
    .option("stratingOffsets","earliest")
    .option("subscribe", "test")
    .load()

    // Sink Console
    val ds = df.writeStream.format("console").queryName("Write to console")
    .trigger(org.apache.spark.sql.streaming.Trigger.ProcessingTime("10 second"))
    .start()

    Expected vs. actual output

    I expect the stream to start from offset=1. However, it starts reading at offset=3. You can see the Kafka client actually resetting the starting offset:

    2019-06-18 21:22:57 INFO Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.

    I can see that Spark streaming does process messages I produce after the streaming job has started.

    Spark driver logs
    2019-06-18 21:22:57 INFO  AppInfoParser:109 - Kafka version : 2.0.0.3.1.0.0-78
    2019-06-18 21:22:57 INFO AppInfoParser:110 - Kafka commitId : 0f47b27cde30d177
    2019-06-18 21:22:57 INFO MicroBatchExecution:54 - Starting new streaming query.
    2019-06-18 21:22:57 INFO Metadata:273 - Cluster ID: LqofSZfjTu29BhZm6hsgsg
    2019-06-18 21:22:57 INFO AbstractCoordinator:677 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Discovered group coordinator localhost:9097 (id: 2147483647 rack: null)
    2019-06-18 21:22:57 INFO ConsumerCoordinator:462 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Revoking previously assigned partitions []
    2019-06-18 21:22:57 INFO AbstractCoordinator:509 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] (Re-)joining group
    2019-06-18 21:22:57 INFO AbstractCoordinator:473 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Successfully joined group with generation 1
    2019-06-18 21:22:57 INFO ConsumerCoordinator:280 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Setting newly assigned partitions [test-0]
    2019-06-18 21:22:57 INFO Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
    2019-06-18 21:22:58 INFO KafkaSource:54 - Initial offsets: {"test":{"0":3}}
    2019-06-18 21:22:58 INFO Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
    2019-06-18 21:22:58 INFO MicroBatchExecution:54 - Committed offsets for batch 0. Metadata OffsetSeqMetadata(0,1560910978083,Map(spark.sql.shuffle.partitions -> 200, spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider))
    2019-06-18 21:22:58 INFO KafkaSource:54 - GetBatch called with start = None, end = {"test":{"0":3}}

    Spark batch mode

    I was able to confirm that batch mode reads from the beginning of the topic, so this is not a Kafka retention-configuration issue.

    val df = spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9097")
    .option("subscribe", "test")
    .load()

    df.count // Long = 3
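
    For batch queries, the Kafka source also accepts explicit offset bounds via startingOffsets and endingOffsets (batch mode defaults to "earliest" and "latest" respectively, which is why the count above sees all 3 messages). A sketch, assuming the same local broker and topic as above:

    // Spell out the batch-mode defaults to make the read range explicit.
    val dfAll = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9097")
      .option("subscribe", "test")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()

    dfAll.count // 3 in the author's setup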

    Best answer

    Ha, it's a simple typo: "stratingOffsets" should be "startingOffsets".
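
    The Kafka source silently ignores unknown options, so the misspelled key falls back to the default startingOffsets = "latest", which is why the stream began at offset 3. For reference, here is the question's own reader snippet with only the option name corrected (everything else, including the local broker address, is unchanged from the post):

    val df = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9097")
      .option("failOnDataLoss", "false")
      .option("startingOffsets", "earliest") // was: "stratingOffsets"
      .option("subscribe", "test")
      .load()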

    Regarding "apache-spark - Spark Structured Streaming with Kafka does not honor startingOffset="earliest"", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/56659273/
