gpt4 book ai didi

logstash - 无法读取 Kafka - Avro Schema 消息

转载 作者:行者123 更新时间:2023-12-02 21:43:06 25 4
gpt4 key购买 nike

这个问题有解决办法吗???我无法读取 KAFKA-AVRO 架构消息。我正在尝试将消息从logstash发送到KAFKA再到HDFS。

以下是技术堆栈:

  1. Logstash 2.3 - 当前生产版本
  2. Confluence 3.0。
  3. 插件:A。 Logstash-kafka-输出插件b. Logstash-编解码器-avro。
  4. 动物园管理员:3.4.6
  5. 卡夫卡:0.10.0.0

Logstash 配置文件如下所示:

input {
stdin{}
}

filter {
mutate {
remove_field => ["@timestamp","@version"]
}
}

output {
kafka {
topic_id => 'logstash_logs14'

codec => avro {
schema_uri => "/opt/logstash/bin/schema.avsc"
}
}
}

schema.avsc 文件如下所示:

{
"type":"record",
"name":"myrecord",
"fields":[
{"name":"message","type":"string"},
{"name":"host","type":"string"}
]
}

运行了以下命令:

  1. 在自己的终端中启动 Zookeeper

    ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

2 在自己的终端中启动Kafka

./bin/kafka-server-start ./etc/kafka/server.properties

3 在自己的终端中启动架构注册表

./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

4 从logstash 目录中,运行以下命令

bin/logstash -f ./bin/logstash.conf

5 运行上述命令后输入您希望发送到 kafka 的日志消息 例如:“ Hello World ”

6 使用 Kafka 中的主题

./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic logstash_logs14 --from-beginning
_While consuming we get the following error:_

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/confluent-common/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/schema-registry/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Processed a total of 1 messages
[2016-06-08 18:42:41,627] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2016-06-08 18:42:41,627] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

请告诉我如何解决这个问题

谢谢,乌彭德拉

最佳答案

您如何向 Kafka 写入/发布?您看到 SerializationException 是因为数据不是使用 schema-registry(或 KafkaAvroSerializer)写入的,但在使用 schema-registry 时,kafka-avro-console-consumer 内部使用 schema-registry(或 KafkaAvroDeserializer),它期望数据以某种格式(特别是 <magic byte><schemaId><data> )。如果您使用 kafka-avro-console- Producer 写入 avro 数据,那么您不应该得到此异常,或者您可以设置 KafkaAvroSerializer在键和值序列化程序的生产者属性中,并设置 schema-registry-url。

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");

关于logstash - 无法读取 Kafka - Avro Schema 消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37704629/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com