
apache-spark - Two Spark Streaming jobs with the same consumer group ID

Reposted. Author: 行者123. Updated: 2023-12-02 20:23:51

I am experimenting with consumer groups.

Here is my code snippet:

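import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

public final class App {

    // Batch interval in milliseconds.
    private static final int INTERVAL = 5000;

    public static void main(String[] args) throws Exception {

        // Kafka consumer settings shared by the driver and the executors.
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "xxx:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", true);
        kafkaParams.put("auto.commit.interval.ms", "1000");
        kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
        kafkaParams.put("sasl.kerberos.service.name", "kafka");
        kafkaParams.put("retries", "3"); // producer setting; has no effect on a consumer
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup");
        kafkaParams.put("request.timeout.ms", "210000");
        kafkaParams.put("session.timeout.ms", "180000");
        kafkaParams.put("heartbeat.interval.ms", "3000");
        Collection<String> topics = Arrays.asList("venkat4");

        SparkConf conf = new SparkConf();
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(INTERVAL));

        // Subscribe lets Kafka's group management decide which partitions this job reads.
        final JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
            );

        // Print each batch as (key, value) pairs.
        stream.mapToPair(
            new PairFunction<ConsumerRecord<String, String>, String, String>() {
                @Override
                public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
                    return new Tuple2<>(record.key(), record.value());
                }
            }).print();

        ssc.start();
        ssc.awaitTermination();
    }
}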

When I run two Spark Streaming jobs at the same time, it fails with this error:

Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition venkat4-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
    at scala.Option.orElse(Option.scala:289)

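According to https://www.wisdomjobs.com/e-university/apache-kafka-tutorial-1342/apache-kafka-consumer-group-example-19004.html, creating a separate Kafka consumer instance with the same group triggers a rebalance of the partitions. I believe the consumer is not tolerating that rebalance. How should I fix this?

Below is the command I used: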

SPARK_KAFKA_VERSION=0.10 spark2-submit --num-executors 2 --master yarn --deploy-mode client --files jaas.conf#jaas.conf,hive.keytab#hive.keytab --driver-java-options "-Djava.security.auth.login.config=./jaas.conf" --class Streaming.App --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" --conf spark.streaming.kafka.consumer.cache.enabled=false 1-1.0-SNAPSHOT.jar

Best answer

Per this https://www.wisdomjobs.com/e-university/apache-kafka-tutorial-1342/apache-kafka-consumer-group-example-19004.html creating a separate Kafka consumer instance with the same group will trigger a rebalance of the partitions. I believe the rebalance is not being tolerated by the consumer. How should I fix this?


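With a single consumer in the group, all partitions are consumed by that one consumer. If the data ingestion rate is high, the consumer may be too slow to keep up with it.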


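Adding more consumers to the same consumer group to read from a topic raises the consumption rate. Spark Streaming uses this approach to achieve 1:1 parallelism between Kafka partitions and Spark partitions, and Spark handles it internally.

If there are more consumers than topic partitions, the extra consumers sit idle and their resources are underutilized. The usual recommendation is to keep the number of consumers less than or equal to the number of partitions.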
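The idle-consumer case can be illustrated with a toy model. The sketch below is not Kafka's actual assignor; it assumes a plain round-robin spread of partition ids over hypothetical consumer names, which is enough to show that a third consumer gets nothing when the topic has only two partitions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class GroupAssignmentSketch {

    // Spread partitions 0..partitionCount-1 over the consumers in round-robin order.
    // Assumption: a simplified stand-in for a real partition assignor.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Three consumers, two partitions: the third consumer receives nothing.
        List<String> consumers = Arrays.asList("consumer-1", "consumer-2", "consumer-3");
        System.out.println(assign(consumers, 2));
        // prints {consumer-1=[0], consumer-2=[1], consumer-3=[]}
    }
}
```

Each partition still has exactly one owner, so adding consumers beyond the partition count adds no throughput.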

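Kafka rebalances a group when more processes or threads are added to it. A rebalance is also triggered when a consumer fails to send heartbeats in time. Brokers keep their own sessions with ZooKeeper, while consumers on the 0.10 API used here send their heartbeats to the group coordinator broker.

Whenever a broker fails or new partitions are added to an existing topic, Kafka rebalances partition storage. This is Kafka's own mechanism for balancing partition data across the brokers.

Spark Streaming provides simple 1:1 parallelism between Kafka partitions and Spark partitions. If you do not supply partition details with ConsumerStrategies.Assign, the stream consumes from all partitions of the given topic.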

Kafka assigns the partitions of a topic to the consumer in a group, so that each partition is consumed by exactly one consumer in the group. Kafka guarantees that a message is only ever read by a single consumer in the group.

When you start the second Spark Streaming job, another consumer tries to consume the same partitions under the same consumer group ID, so the error is thrown.
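The stack trace in the question shows the failure coming from a seekToEnd call made by the driver. A minimal sketch of that sequence follows. It assumes the first job's driver still tracks both partitions after a rebalance has moved one of them to the second job. ToyConsumer is a hypothetical stand-in written for this illustration, not the Kafka client class.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class RebalanceFailureSketch {

    // Hypothetical stand-in for a consumer: it only knows its current assignment.
    static final class ToyConsumer {
        private Set<Integer> assigned = new HashSet<>();

        // Replace the assignment, as a group rebalance would.
        void rebalanceTo(Integer... partitions) {
            assigned = new HashSet<>(Arrays.asList(partitions));
        }

        // Rejects any partition this consumer does not currently own.
        void seekToEnd(Set<Integer> partitions) {
            for (int partition : partitions) {
                if (!assigned.contains(partition)) {
                    throw new IllegalStateException(
                        "No current assignment for partition " + partition);
                }
            }
        }
    }

    // Returns the error message, or null if the seek succeeded.
    static String seekAfterSecondJobJoins() {
        ToyConsumer driverConsumer = new ToyConsumer();
        driverConsumer.rebalanceTo(0, 1);   // job 1 alone owns both partitions
        Set<Integer> tracked = new HashSet<>(Arrays.asList(0, 1)); // partitions the driver tracks
        driverConsumer.rebalanceTo(0);      // job 2 joins; partition 1 moves away
        try {
            driverConsumer.seekToEnd(tracked);
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(seekAfterSecondJobJoins());
        // prints No current assignment for partition 1
    }
}
```

In this model the error appears only because the tracked set and the assigned set diverge, which is why giving each job a fixed, non-overlapping partition list avoids it.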

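import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// sparkJobConfig and sparkContext are defined elsewhere in the job.
val alertTopics = Array("testtopic")

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> sparkJobConfig.kafkaBrokers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> sparkJobConfig.kafkaConsumerGroup,
  "auto.offset.reset" -> "latest"
)

val streamContext = new StreamingContext(sparkContext, Seconds(sparkJobConfig.streamBatchInterval.toLong))

// Subscribe consumes from every partition of the listed topics.
val streamData = KafkaUtils.createDirectStream(streamContext, PreferConsistent, Subscribe[String, String](alertTopics, kafkaParams))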

If you want each Spark job to read specific partitions, use the following code.

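import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies

val topicPartitionsList = List(new TopicPartition("topic", 1))

val alertReqStream1 = KafkaUtils.createDirectStream(streamContext, PreferConsistent, ConsumerStrategies.Assign[String, String](topicPartitionsList, kafkaParams))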

https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html#consumerstrategies

Consumers can join a group by using the same group.id.

val topicPartitionsList = List(new TopicPartition("topic", 3), new TopicPartition("topic", 4))

val alertReqStream2 = KafkaUtils.createDirectStream(streamContext, PreferConsistent, ConsumerStrategies.Assign[String, String](topicPartitionsList, kafkaParams))

This adds two more consumers to the same group ID.

Please read the Spark-Kafka integration guide: https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html

Hope this helps.
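When several jobs each receive a hand-written partition list, the lists must not overlap. The sketch below shows one possible way to derive and check such lists. Both helper names are hypothetical, and the contiguous-slice split is an assumption chosen for simplicity rather than anything Spark or Kafka prescribes. Each resulting id would still need to be wrapped in a TopicPartition before being passed to ConsumerStrategies.Assign.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class DisjointPartitionPlan {

    // Contiguous slice of partition ids for job jobIndex out of jobCount jobs.
    static List<Integer> partitionsForJob(int jobIndex, int jobCount, int partitionCount) {
        int base = partitionCount / jobCount;
        int extra = partitionCount % jobCount;   // the first `extra` jobs take one more
        int start = jobIndex * base + Math.min(jobIndex, extra);
        int size = base + (jobIndex < extra ? 1 : 0);
        List<Integer> slice = new ArrayList<>();
        for (int p = start; p < start + size; p++) {
            slice.add(p);
        }
        return slice;
    }

    // True when no partition id appears in more than one job's list.
    static boolean isDisjoint(List<List<Integer>> plans) {
        Set<Integer> seen = new HashSet<>();
        for (List<Integer> plan : plans) {
            for (int p : plan) {
                if (!seen.add(p)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Five partitions split across two jobs.
        List<Integer> job0 = partitionsForJob(0, 2, 5);
        List<Integer> job1 = partitionsForJob(1, 2, 5);
        System.out.println(job0 + " " + job1);                     // prints [0, 1, 2] [3, 4]
        System.out.println(isDisjoint(Arrays.asList(job0, job1))); // prints true
    }
}
```

Running the disjointness check before submitting the jobs catches a list that accidentally names a partition twice.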

Regarding "apache-spark - Two Spark Streaming jobs with the same consumer group ID", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/50551651/
