- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试对消费者群体进行实验
这是我的代码片段
public final class App {
private static final int INTERVAL = 5000;
public static void main(String[] args) throws Exception {
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "xxx:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", true);
kafkaParams.put("auto.commit.interval.ms","1000");
kafkaParams.put("security.protocol","SASL_PLAINTEXT");
kafkaParams.put("sasl.kerberos.service.name","kafka");
kafkaParams.put("retries","3");
kafkaParams.put(GROUP_ID_CONFIG,"mygroup");
kafkaParams.put("request.timeout.ms","210000");
kafkaParams.put("session.timeout.ms","180000");
kafkaParams.put("heartbeat.interval.ms","3000");
Collection<String> topics = Arrays.asList("venkat4");
SparkConf conf = new SparkConf();
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(INTERVAL));
final JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
stream.mapToPair(
new PairFunction<ConsumerRecord<String, String>, String, String>() {
@Override
public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
return new Tuple2<>(record.key(), record.value());
}
}).print();
ssc.start();
ssc.awaitTermination();
}
}
当我同时运行两个 Spark 流作业时,它失败并出现错误
Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition venkat4-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315) at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197) at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333) at scala.Option.orElse(Option.scala:289)
根据这个https://www.wisdomjobs.com/e-university/apache-kafka-tutorial-1342/apache-kafka-consumer-group-example-19004.html创建具有同一组的单独的 kafka 消费者实例将创建分区的重新平衡。我相信消费者不会容忍这种重新平衡。我该如何解决这个问题
下面是使用的命令
SPARK_KAFKA_VERSION=0.10 Spark2-submit --num-executors 2 --master yarn --deploy-mode client --files jaas.conf#jaas.conf,hive.keytab#hive.keytab --driver-java-options “-Djava.security.auth.login.config=./jaas.conf”--class Streaming.App --conf“spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf “--conf Spark.streaming.kafka.consumer.cache.enabled=false 1-1.0-SNAPSHOT.jar
最佳答案
Per this https://www.wisdomjobs.com/e-university/apache-kafka-tutorial-1342/apache-kafka-consumer-group-example-19004.html creation of separate instance of kafka consumer with same group will create a rebalance of partitions. I believe the rebalance is not being tolerated by the consumer. How should I fix this
现在所有分区仅由一个消费者使用。如果数据摄取率很高,消费者以摄取速度消费数据可能会很慢。
在同一个consumergroup中添加更多consumer来消费某个topic的数据,提高消费率。 Spark Streaming 使用这种方法在 Kafka 分区和 Spark 分区之间实现 1:1 并行度。 Spark 将在内部处理它。
如果消费者的数量多于主题分区的数量,那么它将处于空闲状态并且资源未得到充分利用。始终建议消费者应小于或等于分区数。
Kafka will re-balance, if more processes/threads are added. The ZooKeeper can be reconfigured by Kafka cluster, if any consumer or broker fails to send heartbeat to ZooKeeper.
每当任何代理发生故障或向现有主题添加新分区时,Kafka 都会重新平衡分区存储。这是 kafka 特有的如何平衡代理中各个分区的数据。
Spark 流在 Kafka 分区和 Spark 分区之间提供简单的 1:1 并行性。如果您没有使用 ConsumerStraties.Assign 提供任何分区详细信息,则从给定主题的所有分区进行消费。
Kafka assigns the partitions of a topic to the consumer in a group, so that each partition is consumed by exactly one consumer in the group. Kafka guarantees that a message is only ever read by a single consumer in the group.
当您启动第二个 Spark Streaming 作业时,另一个消费者尝试使用同一消费者组 ID 中的同一分区。所以它会抛出错误。
val alertTopics = Array("testtopic")
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> sparkJobConfig.kafkaBrokers,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> sparkJobConfig.kafkaConsumerGroup,
"auto.offset.reset" -> "latest"
)
val streamContext = new StreamingContext(sparkContext, Seconds(sparkJobConfig.streamBatchInterval.toLong))
val streamData = KafkaUtils.createDirectStream(streamContext, PreferConsistent, Subscribe[String, String](alertTopics, kafkaParams))
如果您想使用特定于分区的 Spark 作业,请使用以下代码。
val topicPartitionsList = List(new TopicPartition("topic",1))
val alertReqStream1 = KafkaUtils.createDirectStream(streamContext, PreferConsistent, ConsumerStrategies.Assign(topicPartitionsList, kafkaParams))
https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html#consumerstrategies
Consumers can join a group by using the samegroup.id.
val topicPartitionsList = List(new TopicPartition("topic",3), new TopicPartition("topic",4))
val alertReqStream2 = KafkaUtils.createDirectStream(streamContext, PreferConsistent, ConsumerStrategies.Assign(topicPartitionsList, kafkaParams))
再添加两个消费者就是添加到同一个 groupid 中。
请阅读 Spark-Kafka 集成指南。 https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html
希望这有帮助。
关于apache-spark - 2 个具有相同消费者组 ID 的 Spark Stream 作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50551651/
我正在尝试实现具有以下签名的方法: public static Pair, Stream> flatten(Iterator, Stream>> iterator); 该方法的目标是将每种流类型展平
我有两个流从两个不同的 api 获取。 Stream get monthOutStream => monthOutController.stream; Stream get resultOutStre
Stream.of(int[])返回 Stream ,而 Stream.of(String[])返回 Stream . 为什么这两种方法的行为不同?两者都应该返回 Stream和 Stream或 St
我正在使用 rxdart在 dart 中处理流的包。我被困在处理一个特殊的问题上。 请看一下这个虚拟代码: final userId = BehaviorSubject(); Stream getSt
我到处都找遍了,还是没弄明白。我知道你可以用流建立两个关联: 用于支持数据存储的包装器意味着作为消费者和供应商之间的抽象层 数据随着时间的推移变得可用,而不是一次全部 SIMD 代表单指令,多数据;在
考虑下面的代码: List l=new ArrayList<>(); l.add(23);l.add(45);l.add(90); Stream str=l.stream
我有一个大型主干/requirejs 应用程序,我想迁移到 webpack,最新的“webpack”:“^4.27.1”,但我遇到了一个我无法解决的错误。 我一直在阅读 https://webpack
我正在使用 xmpp 开发聊天应用程序,根据我们的要求,我们有三台服务器 Apache Tomcat 7、ejabbered 2.1.11 和 mysql 5.5, to run xmppbot on
我知道如何使用 Java 库,并且我可以编写一些循环来执行我需要的操作,但问题更多,为什么 scala.collection.JavaConverters 中没有任何内容或scala.collecti
我正在尝试创建一个单一的衬里,它应该计算一个非常长的文本文件中的唯一单词。独特的词例如:márya fëdorovna scarlet-liveried,...所以基本上都是非英语词。 我的问题是我的
如果我有以下情况: StreamWriter MySW = null; try { Stream MyStream = new FileStream("asdf.txt"); MySW =
有人可以帮我将以下语句转换为 Java8: 我有一个像这样的 HashMap : private Map, List>> someMap; 我想在java8中转换以下逻辑: private Strin
有人可以帮我将以下语句转换为 Java8: 我有一个像这样的 HashMap : private Map, List>> someMap; 我想在java8中转换以下逻辑: private Strin
考虑两种测试方法parallel()和sequential(): @Test public void parallel() throws Exception { System.ou
我是 NodeJS 的新手,我基本上想做的是通过 HTTP 将 .pdf 上传到我的服务器。我正在使用 POST rquest 来处理 Content-Type multipart/form-data
哪个更好:MemoryStream.WriteTo(Stream destinationStream) 或 Stream.CopyTo(Stream destinationStream)?? 我正在谈
给定一个 Stream,我想创建一个新的 Stream,其中的元素在它们之间有时间延迟。 我尝试使用 tokio_core::reactor::Timeout 和 Stream 的 and_then
我是 Kafka Streams 和 Spring Cloud Stream 的新手,但在将集成相关代码移动到属性文件方面已经阅读了有关它的好东西,因此开发人员可以主要专注于事物的业务逻辑方面。 这里
源代码看起来非常相似:pump , pipe .为什么我要使用一个而不是另一个?一个只是另一个的更好版本吗? 最佳答案 Stream.pipe 现在显然是自 0.3.x 以来的首选方法,因此尽可能尝试
我正在寻找是否有更好的方法来解决我不得不使用这些签名的困境(注意:由于 Spock 测试,T[][] 是必需的,我提供 T[][] 作为数据提供商) 我的方法签名是: public T[][] cr
我是一名优秀的程序员,十分优秀!