- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
使用什么配置可以只使用 KafkaProducer 发送一个 ProducerRecord 以避免批量发送消息?
或者不可能使用 KafkaProducer 只发送一条记录?
(使用 Kafka Streams 可以仅处理一条记录并将其发送到某个主题)。
我当前的配置是:
Properties kafkaProps = new Properties();
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put(ProducerConfig.ACKS_CONFIG, "1");
kafkaProps.put("retries", 3);
kafkaProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 120000);
kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024);
kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, 0);
kafkaProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024);
kafkaProps.put("compression.type", "gzip");
KafkaProducer producer = new KafkaProducer<String, String>(kafkaProps);
最佳答案
您可以在发送
之后立即显式刷新
生产者
关于java - 使用 KafkaProducer 发送恰好一个 ProducerRecord,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56825728/
查看文档,我不确定我是否理解使用 close() 之间的区别和 flush() . 这是flush()的文档 * Invoking this method makes all buffered re
我试图向kafka发送一些数据,但是当我运行我的代码时我得到了 13:20:17.688 [kafka-producer-network-thread | producer-1] DEBUG org.
我们有一个系统,它从用户接收数据并将数据推送到 kafka,只有当我们确定数据已推送时,我们才会向用户发送“OK”响应。 由于新的kafka使用的是异步发送(ProducerRecord,Callba
我在我的 Windows PC 上构建了一个小型测试环境,并写下了以下用于测试 kafka 的代码(使用来自 org.apache.kafka 的 kafka_2.10:0.9.0.1)。 packa
尝试向新主题发布第一条消息时,日志中出现以下错误。 [WARN ] [o.a.kafka.clients.NetworkClient][[Producer clientId=producer-1] E
我在 Scala 中使用 0.9 Kafka Java 客户端。 scala> val kafkaProducer = new KafkaProducer[String, String](props)
我无法从 Kafka Producer 发送消息。我的配置不起作用,如下所示: Properties properties = new Properties(); properties.setProp
使用什么配置可以只使用 KafkaProducer 发送一个 ProducerRecord 以避免批量发送消息? 或者不可能使用 KafkaProducer 只发送一条记录? (使用 Kafka St
在此处报告问题。生产者抛出以下错误。等了一个小时左右,错误消失了。但是当我遇到错误时,我可以使用 Kafka 工具列出主题,而 Kafka Python API 不起作用: kafka-console
我正在使用 kafka_2.11-2.2.1。我在 server.properties 中有以下设置。 message.max.bytes=20971520 replica.fetch.max.byt
我有一个简单的 spark streaming 作业,它跟踪 HDFS 目录,读取新添加的文件,并将其发送到 Kafka。 提交 spark 作业时它不起作用并抛出以下异常。 ImportError:
我们使用的 sasl 机制是 SCRAM-SHA-256 但 kafka 生产者将只接受 sasl_mechanism 作为 PLAIN, GSSAPI, OAUTHBEARER 下面的配置会报错 s
KafkaProducer send method两者都返回一个 Future 并接受一个回调。 在发送完成后使用一种机制而不是另一种机制来执行操作之间有什么根本区别吗? 最佳答案 查看您链接到的文档
我的动物园管理员和经纪人正在运行。 当我像这样从命令行发送消息时: from kafka import KafkaProducer producer = KafkaProducer(bootstrap
我正在尝试创建一个简单的 KafkaProducer 和 KafkaConsumer,以便我可以将数据发送到代理上的主题,然后验证是否已收到数据。下面是我用来定义消费者和生产者的两种方法,以及如何发送
我正在对 KafkaProducer 的一个非常简单的包装类进行单元测试,其发送方法就像这样 public class EntityProducer { private final Kafka
我有一个示例 Hello World REST API,并且编写了一些简单的 Kafka 生产者代码。 现在,我希望我的 Kafka Producer 在每次调用 REST API 时向主题发送消息,
我有一个非常简单的生产者,我在我的 windows 本地机器上通过 eclipse 运行...我真正想要的是将消息传递给 kafka,这样我就可以通过 zookeeper 查看代理。只是为了看看端到端
如果生成的记录失败,我想设置要触发的回调。最初,我只想记录失败的记录。 Confluent Kafka python 库提供了一种添加回调的机制: produce(topic[, value][, k
KafkaProducer 无法选择其属性中定义的 schema.registry.url 。 正如我们在下面的屏幕截图中看到的,架构注册表 url 是一个虚拟 url // variable whi
我是一名优秀的程序员,十分优秀!