
apache-kafka - Kafka does not receive binary data

Reposted. Author: 行者123. Updated: 2023-12-04 02:06:16

I am trying to send an audio clip in binary format to a Kafka topic.

However, Kafka never receives the message.

Here is my producer:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.BasicConfigurator;

public class AudioProducer {

    public static void main(String[] args) {
        BasicConfigurator.configure();
        System.out.println("program started");

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "broker-host:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        // Client-side limits raised to 25 MB (26214400 bytes).
        properties.put("batch.size", 26214400);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 2 * 26214400);
        properties.put("max.request.size", 26214400);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(properties);
        try {
            // Read the whole audio file into memory; the path is the first argument.
            byte[] temp = Files.readAllBytes(Paths.get(args[0]));
            System.out.println("input path:" + args[0]);
            // send() is asynchronous; the returned Future is ignored here,
            // so a failed send is not reported by this program.
            producer.send(new ProducerRecord<String, byte[]>("audio-queue", "test-key", temp));
        } catch (IOException e) {
            e.printStackTrace();
        }
        producer.close();
        System.out.println("program completed");
    }

}

Here is the output with Kafka debug logging enabled:

program started
0 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
compression.type = none
metric.reporters = []
metadata.max.age.ms = 300000
metadata.fetch.timeout.ms = 60000
acks = all
batch.size = 26214400
reconnect.backoff.ms = 10
bootstrap.servers = [broker-host:9092]
receive.buffer.bytes = 32768
retry.backoff.ms = 100
buffer.memory = 52428800
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
retries = 0
max.request.size = 26214400
block.on.buffer.full = true
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
metrics.sample.window.ms = 30000
send.buffer.bytes = 131072
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
linger.ms = 1
client.id =

86 [main] DEBUG org.apache.kafka.clients.producer.internals.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [Node(broker-host, 9092)], partitions = [])
105 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Starting Kafka producer I/O thread.
106 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - Kafka producer started
input path:AUD_0030.wav
190 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -1
190 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Init connection to node -1 for sending metadata request in the next iteration
190 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at broker-host:9092.
251 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -1
261 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node -1
351 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Trying to send metadata request to node -1
361 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, payload=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=producer-1}, body={topics=[audio-queue]})) to node -1
977 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Metadata - Updated cluster metadata version 2 to Cluster(nodes = [Node(1, broker-host, 9092)], partitions = [Partition(topic = audio-queue, partition = 0, leader = 1, replicas = [1,], isr = [1,]])
1021 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Beginning shutdown of Kafka producer I/O thread, sending remaining records.
1021 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 1 at 01hw508208.india.tcs.com:9092.
1037 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 1
11511 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Shutdown of Kafka producer I/O thread has completed.
11512 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - The Kafka producer has closed.
program completed

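However, the same program works fine on the same topic with string messages.

I also checked the Kafka logs on the broker node. I can find only the string messages there, not the binary ones.

Best answer

Since it solved the problem, I am posting my comment as an answer.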

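Kafka is not a file server and performs best with messages in the kilobyte range. By default the maximum message size is about 1 MB; it can be raised by setting max.message.bytes to a higher value (that is the topic-level name; the broker-wide property is message.max.bytes).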
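As an illustration of that limit, here is a minimal sketch of a size check that could run before the payload is handed to the producer. The class name, method name, and the 1,000,000-byte constant are assumptions made for this example, not part of the Kafka API; the real limit is whatever the broker or topic is configured with.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical pre-flight check; not part of the Kafka client library. */
final class MessageSizeCheck {

    // Assumption: roughly the 1 MB default mentioned above.
    // Replace with the limit actually configured on the broker or topic.
    static final int ASSUMED_DEFAULT_LIMIT_BYTES = 1_000_000;

    /** Returns true when the key and value bytes together fit within limitBytes. */
    static boolean fitsWithin(String key, byte[] value, int limitBytes) {
        int keyBytes = (key == null) ? 0 : key.getBytes(StandardCharsets.UTF_8).length;
        // Record framing adds a little overhead on top of this;
        // only the raw payload bytes are compared here.
        return (long) keyBytes + value.length <= limitBytes;
    }
}
```

In the producer from the question, a call such as MessageSizeCheck.fitsWithin("test-key", temp, MessageSizeCheck.ASSUMED_DEFAULT_LIMIT_BYTES) before send() would flag a multi-megabyte WAV file up front. Separately, calling get() on the Future returned by producer.send(...) makes a failed send surface as an exception instead of passing silently.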

Raising that limit also means that the consumers' maximum fetch size (in the new consumer API) should be increased through fetch.max.bytes.
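A minimal sketch of keeping the client-side limits in step with a raised message limit might look like the following. The class and method names are hypothetical; the property keys are Kafka client settings. max.request.size is the producer setting already used in the question, fetch.max.bytes is the consumer setting named above, and max.partition.fetch.bytes is an additional consumer setting included here as an assumption about what a deployment may also need. Only the size-related keys are shown, so connection and serializer settings still have to be added.

```java
import java.util.Properties;

/** Hypothetical helper that derives size-related client settings from one limit. */
final class LargeMessageConfig {

    /** Producer side: allow requests up to maxMessageBytes. */
    static Properties producerProps(int maxMessageBytes) {
        Properties props = new Properties();
        props.setProperty("max.request.size", String.valueOf(maxMessageBytes));
        return props;
    }

    /** Consumer side: allow fetches at least as large as one maximum-size message. */
    static Properties consumerProps(int maxMessageBytes) {
        Properties props = new Properties();
        props.setProperty("fetch.max.bytes", String.valueOf(maxMessageBytes));
        // Assumption: the per-partition fetch limit is raised to match as well.
        props.setProperty("max.partition.fetch.bytes", String.valueOf(maxMessageBytes));
        return props;
    }
}
```

Deriving both sets of properties from a single number makes it harder for the producer, the topic, and the consumer to drift apart when the limit is changed later.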

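Large messages carry a performance penalty. When sending larger files, consider storing the file in a storage system (for example S3) and passing only the URI of the file through Kafka.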
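The idea of passing a reference instead of the file can be sketched as below. This is an illustration under stated assumptions: a local directory stands in for the storage system (the answer suggests S3, which would need its own client), and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical sketch: store the file elsewhere and send only its URI. */
final class ClaimCheck {

    /** Copies the file into storageDir (a stand-in for S3) and returns the copy's URI. */
    static URI store(Path file, Path storageDir) {
        try {
            Files.createDirectories(storageDir);
            Path target = storageDir.resolve(file.getFileName());
            Files.copy(file, target, StandardCopyOption.REPLACE_EXISTING);
            return target.toUri();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Encodes the URI as the small record value to send in place of the file bytes. */
    static byte[] toMessageValue(URI location) {
        return location.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```

With this in place, the producer from the question would send ClaimCheck.toMessageValue(ClaimCheck.store(Paths.get(args[0]), storageDir)) as the record value, where storageDir is a directory of your choosing. The record then stays well inside the kilobyte range regardless of the audio file's size, and consumers fetch the file from storage using the URI.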

Regarding apache-kafka - Kafka does not receive binary data, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/43137066/
