- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个 kafka 主题,我正在通过 Kafka Producer 发送数据。现在,在消费者方面,我有两种选择。
<强>1。使用 KafkaConsumer - 下面是 kafkaConsumer 的代码,它从主题中读取数据并且工作正常。
@EnableKafka
@Configuration
@PropertySource("kaafka.properties")
public class RawEventKafkaConsumer {
private static final Logger logger = LoggerFactory.getLogger(RawEventKafkaConsumer.class);
@Autowired
private DataModelServiceImpl dataModelServiceImpl;
private PolicyExecutor policyExecutor;
public RawEventKafkaConsumer() {
policyExecutor = new PolicyExecutor();
}
@Value("${spring.kafka.topic}")
private String rawEventTopicName;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootStrapServer;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.producer.key-serializer}")
private String keySerializer;
@Value("${spring.kafka.producer.value-serializer}")
private String valueSerializer;
@Value("${spring.kafka.consumer.key-deserializer}")
private String keyDeserializer;
@Value("${spring.kafka.consumer.value-deserializer}")
private String valueDeserializer;
@Bean
public DefaultKafkaConsumerFactory<String, BaseDataModel> rawEventConsumer() {
Map<String, Object> consumerProperties = new HashMap<>();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return new DefaultKafkaConsumerFactory<>(consumerProperties);
}
@Bean(name="kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, BaseDataModel> kafkaListenerContainerFactory() {
logger.info("kafkaListenerContainerFactory called..");
ConcurrentKafkaListenerContainerFactory<String, BaseDataModel> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(rawEventConsumer());
return factory;
}
@KafkaListener(topics = "rawEventTopic", containerFactory = "kafkaListenerContainerFactory")
public void listen(String baseDataModel) {
ObjectMapper mapper = new ObjectMapper();
BaseDataModel csvDataModel;
try {
csvDataModel = mapper.readValue(baseDataModel, BaseDataModel.class);
//saving the datamodel in elastic search.
//dataModelServiceImpl.save(csvDataModel);
System.out.println("Message received " + csvDataModel.toString());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
<强>2。使用 Spark Stream 消费 kafkaTopic 数据 - 代码如下 -
@Service
public class RawEventSparkStreamConsumer {
private final Logger logger = LoggerFactory.getLogger(RawEventSparkStreamConsumer.class);
@Autowired
private DataModelServiceImpl dataModelServiceImpl;
@Autowired
private JavaStreamingContext streamingContext;
@Autowired
private JavaInputDStream<ConsumerRecord<String, String>> messages;
@PostConstruct
private void sparkRawEventConsumer() {
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(()->{
messages.foreachRDD((rdd) -> {
System.out.println("RDD coming *************************______________________________---------------------.." + rdd.count());
rdd.foreach(record -> {
System.out.println("Data is comming...." + record);
});
});
streamingContext.start();
try {
streamingContext.awaitTermination();
} catch (InterruptedException e) { // TODO Auto-generated catch block
e.printStackTrace();
}
});
}
}
consumer kafka consumer和Spark streaming都成功从topic中读取数据。现在我有一个问题,如果两者都在做同样的事情(从主题中读取数据)那么
谢谢。
最佳答案
简短的回答是,与 Kafka 消费者仅在单个 JVM 中运行并且您手动运行同一应用程序的多个实例以将其扩展相比,您需要一个 Spark 集群以分布式方式运行 Spark 代码。
换句话说,您将以不同的方式运行它们。 spark-submit
与 java -jar
。我不相信使用 Spring 会改变
另一个区别是“普通消费者”对 Kafka 配置有更多控制权,您一次只能获得一条记录。 Spark RDD 可以是许多事件,并且它们必须都是相同的“模式”,除非你想要复杂的解析逻辑,这比使用 ConsumerRecord
值更难编写 RDD 对象,这些值是为你。
总的来说,我认为将它们结合起来不是一个好主意。
如果他们正在阅读同一个主题,那么 Kafka 消费者协议(protocol)只能为每个分区分配一个消费者......目前还不清楚你的主题有多少个分区,但这可以解释为什么一个可以工作,但不能另一个
关于spring - Kafka Consumer 和 Spark-Kafka-Consumer 的区别,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55807101/
我已经开始研究 MassTransit 并正在编写将处理消息的类。当我从 Consumes 实现接口(interface)时我有四个选项:All , Selected , For和 Context .
我正在尝试找出消费者群体级别是否也有任何抵消。在 Kafka 中,Consumer Offset 是在 Consumer group 级别还是在该 consumer group 内的单个消费者? 最佳
我有一个我不理解的 java 编译器错误。看来消费者 和 Consumer(带有 T 扩展对象)在方法签名参数中不等效。请查看以下代码: import java.util.function.Consu
我在泛型方面遇到了一些麻烦,尽管找到了解决方法,但我不明白是什么阻止了我的代码编译。 我有一个显示 TreeTableView 的 JavaFX 项目:
C++11 标准定义了一个内存模型(1.7、1.10),其中包含内存排序,大致为“顺序一致”、“获取”、“消耗”、“释放”和“放松”。同样粗略地,一个程序只有在它是无种族的情况下才是正确的,如果所有
我有一个 kafka 主题,我正在通过 Kafka Producer 发送数据。现在,在消费者方面,我有两种选择。 1。使用 KafkaConsumer - 下面是 kafkaConsumer 的代码
我有四个当前消费者在 Amazon AWS 上收听同一个队列。从队列中拉取消息时,有时会出现同一条消息被两个不同的消费者消费的情况。请看下面的日志: 18:01:46,515 [jmsContaine
我正在设计一个系统,其中将有 n 个生产者和 m 个消费者,其中 n 和 m 是数字,n != m。 我想这样设计系统, 任何生产者在生产时不得阻止其他生产者 任何消费者都不应在消费时阻止其他消费者
我们有一个系统,我们希望将记录(例如联系人、客户、机会)从我们的系统推送到 SalesForce。 为此,我们使用了 ForceToolKit for .Net .我们成功地将联系人记录从我们的系统推
我怎样才能写一个方法来组合 Stream的 Consumers成单个 Consumer使用 Consumer.andThen(Consumer) ? 我的第一个版本是: Consumer combi
我需要开始使用 kafka。我很难理解消费者应该收到什么:据我了解,我们可以通过多种方式配置消费者: 示例 1: @KafkaListener(topics = "topic_name) public
我需要开始使用 kafka。我很难理解消费者应该收到什么:据我了解,我们可以通过多种方式配置消费者: 示例 1: @KafkaListener(topics = "topic_name) public
我正在尝试在我的 scala play 应用程序中创建消费者 secret / key 对,但我似乎无法让它正常工作。我有以下代码 import org.apache.commons.codec.bi
我通过传递用户(消费者)名称使用 .NET 应用程序,我需要从 Salesforce 检索消费者 key 和消费者 key ,我该如何实现。 最佳答案 Consumer Key 和 Consumer
我想设置 至 0 .这似乎是另一个问题 ( JMS queue with multiple consumers ) 的答案,并在此 article 中进行了描述。在第 17.1.1 章中。我使用 JN
I have send message api to my users.When I send to message from my x numbers I need to wait 10-15
我有一个 java Kafka 消费者,我在其中批量获取 ConsumerRecords 进行处理。示例代码如下- while (true) { ConsumerRecords records
我正在为 iPhone 编写 Twitter/Facebook 应用程序。我有自己的 Apache/PHP 服务器。我只想把Consumer Key放在app里,然后我把Consumer Secret
Spring AMQP:比较多个消费者与每个消费者多个线程的性能 我正处于从 Spring 文档学习 Spring AMQP 的阶段。我不清楚提高异步消息消费率的首选方法:根据 Spring 文档 (
我正在制作一个需要 oAuth 1.0 身份验证的应用程序。我可以访问客户提供的消费者 key 和消费者 secret 。我曾尝试使用 AFNetworking 进行此操作,但效果不佳。有人可以建议我
我是一名优秀的程序员,十分优秀!