gpt4 book ai didi

spark-streaming - Kafkaconsumer 对多线程访问不安全

转载 作者:行者123 更新时间:2023-12-04 02:16:37 25 4
gpt4 key购买 nike

我使用下面的代码从 Kafka topic 读取数据,并处理数据。

JavaDStream<Row> transformedMessages = messages.flatMap(record -> processData(record))
.transform(new Function<JavaRDD<Row>, JavaRDD<Row>>() {
//JavaRDD<Row> records = ss.emptyDataFrame().toJavaRDD();
StructType schema = DataTypes.createStructType(fields);

public JavaRDD<Row> call(JavaRDD<Row> rdd) throws Exception {
records = rdd.union(records);
return rdd;
}
});

transformedMessages.foreachRDD(record -> {
//System.out.println("Aman" +record.count());
StructType schema = DataTypes.createStructType(fields);

Dataset ds = ss.createDataFrame(records, schema);
ds.createOrReplaceTempView("trades");
System.out.println(ds.count());
ds.show();

});

在运行代码时,我遇到以下异常:
Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1624)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1197)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)

事实上我只有一个 DStream,我不知道为什么我会收到这个异常。
我正在读取 Kafka 主题中的 3 个分区。我假设“createDirectStream”将创建 3 个消费者来读取数据。

下面是 KafkaConsumer 的代码,acquire 方法:
 private void acquire() {
this.ensureNotClosed();
long threadId = Thread.currentThread().getId();
if(threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
} else {
this.refcount.incrementAndGet();
}
}

最佳答案

Spark 2.2.0 有一个不使用缓存的解决方法。
只需使用 spark.streaming.kafka.consumer.cache.enabled 到 false .
看看这个 pull要求

关于spark-streaming - Kafkaconsumer 对多线程访问不安全,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44530234/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com