gpt4 book ai didi

spring - Kafka Consumer 和 Spark-Kafka-Consumer 的区别

转载 作者:行者123 更新时间:2023-12-04 14:20:05 25 4
gpt4 key购买 nike

我有一个 kafka 主题,我正在通过 Kafka Producer 发送数据。现在,在消费者方面,我有两种选择。

<强>1。使用 KafkaConsumer - 下面是 kafkaConsumer 的代码,它从主题中读取数据并且工作正常。

  @EnableKafka
@Configuration
@PropertySource("kaafka.properties")
public class RawEventKafkaConsumer {

private static final Logger logger = LoggerFactory.getLogger(RawEventKafkaConsumer.class);
@Autowired
private DataModelServiceImpl dataModelServiceImpl;

private PolicyExecutor policyExecutor;

public RawEventKafkaConsumer() {
policyExecutor = new PolicyExecutor();
}


@Value("${spring.kafka.topic}")
private String rawEventTopicName;

@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;

@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootStrapServer;

@Value("${spring.kafka.consumer.group-id}")
private String groupId;

@Value("${spring.kafka.producer.key-serializer}")
private String keySerializer;

@Value("${spring.kafka.producer.value-serializer}")
private String valueSerializer;

@Value("${spring.kafka.consumer.key-deserializer}")
private String keyDeserializer;

@Value("${spring.kafka.consumer.value-deserializer}")
private String valueDeserializer;



@Bean
public DefaultKafkaConsumerFactory<String, BaseDataModel> rawEventConsumer() {
Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return new DefaultKafkaConsumerFactory<>(consumerProperties);
}

@Bean(name="kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, BaseDataModel> kafkaListenerContainerFactory() {
logger.info("kafkaListenerContainerFactory called..");
ConcurrentKafkaListenerContainerFactory<String, BaseDataModel> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(rawEventConsumer());
return factory;
}

@KafkaListener(topics = "rawEventTopic", containerFactory = "kafkaListenerContainerFactory")
public void listen(String baseDataModel) {

ObjectMapper mapper = new ObjectMapper();
BaseDataModel csvDataModel;
try {
csvDataModel = mapper.readValue(baseDataModel, BaseDataModel.class);

//saving the datamodel in elastic search.
//dataModelServiceImpl.save(csvDataModel);
System.out.println("Message received " + csvDataModel.toString());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

}

<强>2。使用 Spark Stream 消费 kafkaTopic 数据 - 代码如下 -

 @Service
public class RawEventSparkStreamConsumer {

private final Logger logger = LoggerFactory.getLogger(RawEventSparkStreamConsumer.class);

@Autowired
private DataModelServiceImpl dataModelServiceImpl;


@Autowired
private JavaStreamingContext streamingContext;

@Autowired
private JavaInputDStream<ConsumerRecord<String, String>> messages;


@PostConstruct
private void sparkRawEventConsumer() {

ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(()->{
messages.foreachRDD((rdd) -> {
System.out.println("RDD coming *************************______________________________---------------------.." + rdd.count());
rdd.foreach(record -> {
System.out.println("Data is comming...." + record);
});
});

streamingContext.start();

try {
streamingContext.awaitTermination();
} catch (InterruptedException e) { // TODO Auto-generated catch block
e.printStackTrace();
}
});

}
}

consumer kafka consumer和Spark streaming都成功从topic中读取数据。现在我有一个问题,如果两者都在做同样的事情(从主题中读取数据)那么

  1. 两者有什么区别?
  2. 我还面临另一个问题,kafka consume 和 Spark consumer 类都在同一个代码库中,所以如果我同时使用这两个类,那么 kafkaConsumer 代码将无法正常工作。

谢谢。

最佳答案

简短的回答是,与 Kafka 消费者仅在单个 JVM 中运行并且您手动运行同一应用程序的多个实例以将其扩展相比,您需要一个 Spark 集群以分布式方式运行 Spark 代码。

换句话说,您将以不同的方式运行它们。 spark-submitjava -jar。我不相信使用 Spring 会改变

另一个区别是“普通消费者”对 Kafka 配置有更多控制权,您一次只能获得一条记录。 Spark RDD 可以是许多事件,并且它们必须都是相同的“模式”,除非你想要复杂的解析逻辑,这比使用 ConsumerRecord 值更难编写 RDD 对象,这些值是为你。


总的来说,我认为将它们结合起来不是一个好主意。

如果他们正在阅读同一个主题,那么 Kafka 消费者协议(protocol)只能为每个分区分配一个消费者......目前还不清楚你的主题有多少个分区,但这可以解释为什么一个可以工作,但不能另一个

关于spring - Kafka Consumer 和 Spark-Kafka-Consumer 的区别,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55807101/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com