- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
任务是将消息从一个巨大的源主题路由到许多(几千个)目标主题。总体速率约为每秒几百万条记录。它现在几乎无法处理此类负载,我们正在寻找优化它的解决方案。但是,它似乎没有达到硬件或网络级别的任何限制,所以我认为它可以改进。延迟并不重要(几分钟延迟就可以),平均消息大小小于 1 KiB。
增加吞吐量最明显的方法是使 batch.size
和 linger.ms
更大。但问题是目标主题中的消息速率不同:取决于消息目标,速率可能从每秒几条消息到每秒数十万条消息不等。
据我了解(如果我错了,请纠正我),但是 batch.size
是每个分区的参数。因此,如果我们将 batch.size
设置得太大,我们就会耗尽内存,因为它乘以了许多目标主题,即使它们都只有一个分区。否则,如果 batch.size
较小,则生产者将过于频繁地向代理发送请求。在每个应用程序实例中,我们为所有目标主题使用一个生产者(ProduceRequest
可以包含不同主题的批处理)。为每个主题设置不同的参数的唯一方法是为每个主题使用单独的生产者,但这意味着数千个线程和许多上下文切换。
我们能否设置实际 ProduceRequest
的最小 大小,即像 batch.size
,但对于请求中的整体批处理,即某些东西与 max.request.size
相对?
或者有什么办法可以提高producer的吞吐量?
最佳答案
问题看起来可以解决,而且我们似乎已经解决了。 Kafka 流到 3k 主题不是什么大问题,但是有一些事情你应该注意:
Kafka-producer 尝试在开始时分配 batch.size * number_of_destination_partitions
内存。如果您的 batch.size
等于 10mb
和 3k
主题,每个主题有 1
分区,Kafka-producer 将需要至少 ~30gb
开始 ( source code )。因此,您拥有的目标分区越多,您需要设置的 batch.size
就越少,或者您需要的内存就越大。我们选择了小的 batch.size
每个目标主题的消息速率不会影响总体性能。 Kafka 生产者为每个请求发送多个批处理。这里 max.request.size
发挥作用( source code ,maxSize
是 max.request.size
)。 max.request.size
越高,每个请求可以发送的批处理越多。重要的是要了解达到 batch.size
或 linger.ms
不会立即触发向代理发送批处理。一旦批处理达到 batch.size
或 linger.ms
,它就会被标记为可发送,稍后将与其他批处理一起处理 (source code)。此外,batch.size
或 linger.ms
并不是将批处理标记为可发送的唯一原因(查看前面的链接)。这是实际发送批处理的地方(source code)。这就是为什么不需要每个目标主题的相同事件率,但仍然存在一些细微差别,接下来将进行描述。
2.1。关于 linger.ms
的几句话。不能确定它在这种情况下的作用。一方面,它越大,Kafka 生产者等待收集确切分区的消息的时间就越长,并且每个请求将发送该分区的更多数据。另一方面,似乎它越少,不同分区的更多批处理就可以打包到一个请求中。虽然不确定如何做得更好。
尽管 Kafka-producer 能够为每个请求发送多个批处理,但它不能为一个特定分区的每个请求发送多个批处理。这就是为什么如果目标主题的消息速率偏斜,则必须增加大多数加载主题的分区数以增加吞吐量。但始终有必要记住,分区数的增加会导致内存使用量的增加。
实际上,上面的信息帮助我们解决了性能问题。但可能还有其他我们还不知道的细微差别。
希望有用。
关于apache-kafka - 卡夫卡 : Throughput of producing to thousands of topics with different message rate,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/74275706/
我使用 streamsBuilder.table("myTopic") 为某个主题创建了一个 Ktable,并将其具体化为状态存储,以便我可以使用交互式查询. 每小时,我都想从该状态存储(以及关联的变
我正在kafka中进行数据复制。但是,kafka 日志文件的大小增长得非常快。一天大小达到 5 GB。作为这个问题的解决方案,我想立即删除已处理的数据。我正在 AdminClient 中使用删除记录方
我实际上正在使用 SASL 纯文本设置简单的 Kafka 身份验证并添加 ACL 授权。但是当我尝试使用数据时遇到问题。 [main] INFO org.apache.kafka.common.uti
我正在一个使用 Kafka 和 Akka Streams 的项目 reactive-kafka连接器。我们发现reactive-kafka使用它自己的调度程序(akka.kafka.default-d
我试图在HDP上运行简单的kafka生产者消费者示例,但面临以下异常。 [2016-03-03 18:26:38,683] WARN Fetching topic metadata with corr
我继承了一些正在实现到另一个项目中的 Kafka 代码,并遇到了一个问题...消费者收到来自生产者的 3995 条消息后,它崩溃并给出以下错误: ERROR Error while accepting
我正在尝试测试 Flink 程序以使用此 JSONKeyValueDeserializationSchema 类读取来自 Kafka 的 JSON 数据。但是我的 Intellij 没有找到这个类。我
我有一个简单的生产者-消费者设置:1 个生产者(作为一个线程)和 2 个消费者(作为 2 个进程)。生产者的run方法: def run(self): producer = K
我正在使用“node-rdkafka”npm 模块来构建用 Nodejs 编写的分布式服务架构。我们有一个计量用例,其中我们只允许每 n 秒消耗和处理一定数量的消息。例如,“主”主题有 100 条由生
我正在学习 Kafka,我想知道当我消费来自主题的消息时如何指定然后分区。 我找到了几张这样的图片: 这意味着一个消费者可以消费来自多个分区的消息,但一个分区只能由单个消费者(在消费者组内)读取。 此
我想从flink读取一个kafka主题 package Toletum.pruebas; import org.apache.flink.api.common.functions.MapFunctio
我阅读了 Kafka 网站上的文档,但是在尝试实现一个完整的最小示例(生产者 --> kafka --> 消费者)之后,我不太清楚“消费者状态”如何处理偏移量。 一些信息 我正在使用高级 API (J
刚开始使用Spring Kafka(2.1.4.RELEASE)和Kafka(1.0.0),但是当我添加事务时,处理速度降低了很多。 代码: spring.kafka.consumer.max-pol
我尝试在安全模式下使用kafka(0.9.1)。我会使用 Spark 读取数据,因此我必须将 JAAS conf 文件传递给 JVM。我使用这个 cmd 来开始我的工作: /opt/spa
目标:读取主题中的所有消息,然后终止进程。 我能够连续阅读以下消息: props.put("bootstrap.servers", kafkaBootstrapSrv); props.put("gro
我写了一个非常简单的 Flink 流作业,它使用 FlinkKafkaConsumer082 从 Kafka 获取数据。 protected DataStream getKafkaStream(Str
我使用的是kafka 2.10-0.9.0.1当我通过命令删除主题时,主题被标记为删除。 bin/kafka-topics.sh --zookeeper localhost:2181 --delete
当我实例化一个 Kafka 消费者时 KafkaConsumer consumer = new KafkaConsumer(props); 我收到这条消息 SLF4J: Failed to load
我有一个用例,我需要 100% 的可靠性、幂等性(无重复消息)以及我的 Kafka 分区中的顺序保留。我正在尝试使用事务 API 来设置概念证明来实现这一点。有一个名为“isolation.level
我们有一个 3 主机的 Kafka 集群。我们有 136 个主题,每个主题有 100 个分区,复制因子为 3。这使得我们的集群中有 13,600 个分区。 这是我们主题的合理配置吗? 最佳答案 太多了
我是一名优秀的程序员,十分优秀!