
java - "Malformed data length is negative",当尝试使用带有 Avro 数据源的 kafka 的 Spark 结构化流时


So I have been trying Angel Conde's structured streaming with Kafka and Avro data: Structured-Streaming Avro

However, my data seems to be a bit more complex, with nested data inside it. Here is my code,

private static Injection<GenericRecord, byte[]> recordInjection;
private static StructType type;
private static final String SNOQTT_SCHEMA = "{"
        + "\"type\": \"record\","
        + "\"name\": \"snoqttv2\","
        + "\"fields\": ["
        + "  { \"name\": \"src_ip\", \"type\": \"string\" },"
        + "  { \"name\": \"classification\", \"type\": \"long\" },"
        + "  { \"name\": \"device_id\", \"type\": \"string\" },"
        + "  { \"name\": \"alert_msg\", \"type\": \"string\" },"
        + "  { \"name\": \"src_mac\", \"type\": \"string\" },"
        + "  { \"name\": \"sig_rev\", \"type\": \"long\" },"
        + "  { \"name\": \"sig_gen\", \"type\": \"long\" },"
        + "  { \"name\": \"dest_mac\", \"type\": \"string\" },"
        + "  { \"name\": \"packet_info\", \"type\": {"
        + "    \"type\": \"record\","
        + "    \"name\": \"packet_info\","
        + "    \"fields\": ["
        + "      { \"name\": \"DF\", \"type\": \"boolean\" },"
        + "      { \"name\": \"MF\", \"type\": \"boolean\" },"
        + "      { \"name\": \"ttl\", \"type\": \"long\" },"
        + "      { \"name\": \"len\", \"type\": \"long\" },"
        + "      { \"name\": \"offset\", \"type\": \"long\" }"
        + "    ],"
        + "    \"connect.name\": \"packet_info\" }},"
        + "  { \"name\": \"timestamp\", \"type\": \"string\" },"
        + "  { \"name\": \"sig_id\", \"type\": \"long\" },"
        + "  { \"name\": \"ip_type\", \"type\": \"string\" },"
        + "  { \"name\": \"dest_ip\", \"type\": \"string\" },"
        + "  { \"name\": \"priority\", \"type\": \"long\" }"
        + "],"
        + "\"connect.name\": \"snoqttv2\" }";

private static Schema.Parser parser = new Schema.Parser();
private static Schema schema = parser.parse(SNOQTT_SCHEMA);

static {
    recordInjection = GenericAvroCodecs.toBinary(schema);
    type = (StructType) SchemaConverters.toSqlType(schema).dataType();
}

public static void main(String[] args) throws StreamingQueryException {
    // Set log4j levels for development, directly from Java
    LogManager.getLogger("org.apache.spark").setLevel(Level.WARN);
    LogManager.getLogger("akka").setLevel(Level.ERROR);

    // Configuration for the streaming context and Spark context
    SparkConf conf = new SparkConf()
            .setAppName("Snoqtt-Avro-Structured")
            .setMaster("local[*]");

    // Initialize the Spark session
    SparkSession sparkSession = SparkSession
            .builder()
            .config(conf)
            .getOrCreate();

    // Reduce the number of shuffle partitions
    sparkSession.sqlContext().setConf("spark.sql.shuffle.partitions", "3");

    // Start the data stream from Kafka
    Dataset<Row> ds1 = sparkSession
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "snoqttv2")
            .option("startingOffsets", "latest")
            .load();

    // Start the streaming query
    // Note: this builds a 13-field Row, while `type` declares 14 fields
    // (including the nested packet_info) in Avro schema order.
    sparkSession.udf().register("deserialize", (byte[] data) -> {
        GenericRecord record = recordInjection.invert(data).get();
        return RowFactory.create(
                record.get("timestamp").toString(),
                record.get("device_id").toString(),
                record.get("ip_type").toString(),
                record.get("src_ip").toString(),
                record.get("dest_ip").toString(),
                record.get("src_mac").toString(),
                record.get("dest_mac").toString(),
                record.get("alert_msg").toString(),
                record.get("sig_rev").toString(),
                record.get("sig_gen").toString(),
                record.get("sig_id").toString(),
                record.get("classification").toString(),
                record.get("priority").toString());
    }, DataTypes.createStructType(type.fields()));

    ds1.printSchema();
    Dataset<Row> ds2 = ds1
            .select("value").as(Encoders.BINARY())
            .selectExpr("deserialize(value) as rows")
            .select("rows.*");

    ds2.printSchema();

    StreamingQuery query1 = ds2
            .groupBy("sig_id")
            .count()
            .writeStream()
            .queryName("Signature ID Count Query")
            .outputMode("complete")
            .format("console")
            .start();

    query1.awaitTermination();
}

It was all fun and games until I received my first batch of messages, and it ran into this error:

18/01/22 14:29:00 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 8) org.apache.spark.SparkException: Failed to execute user defined function($anonfun$27: (binary) => struct,timestamp:string,sig_id:bigint,ip_type:string,dest_ip:string,priority:bigint>) at ...

Caused by: com.twitter.bijection.InversionFailure: Failed to invert: [B@232f8415 at ...

Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -25 at ...

Am I doing something wrong? Or is my nested schema the root of all evil in my code? Thanks for any help, guys.

Best Answer

Just updated the repo with an example using a nested schema and the new Avro data source: Repo
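For readers who land here, a minimal sketch of what the "new Avro data source" route looks like, assuming Spark 2.4+ with the org.apache.spark:spark-avro package on the classpath (the variable name `parsed` is illustrative, and `ds1`/`SNOQTT_SCHEMA` refer to the question's code; the repo's actual code may differ):

import static org.apache.spark.sql.avro.functions.from_avro;
import static org.apache.spark.sql.functions.col;

// Decode the Kafka value bytes with the built-in Avro data source instead of
// bijection; nested records such as packet_info come back as struct columns.
Dataset<Row> parsed = ds1
        .select(from_avro(col("value"), SNOQTT_SCHEMA).as("data"))
        .select("data.*");

Note that from_avro expects plain Avro binary, which is also what bijection's GenericAvroCodecs.toBinary writes, so the producer side should not have to change.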

Before the new data source, I tried using the bijection library and hit the same error you posted; I fixed it by deleting the Kafka temporary folders in order to reset the old queued data.
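"Malformed data. Length is negative" typically means Avro's binary decoder read a varint length prefix from bytes that were not written with the expected schema or encoding, which is exactly what stale records serialized in a different format would produce. Since bijection's Injection.invert returns a scala.util.Try, one defensive option (a sketch under that assumption, not what the repo does) is to skip undecodable records instead of failing the task:

// Hedged sketch: drop records that cannot be inverted rather than throwing.
// Returning null yields null rows, which still need filtering downstream.
sparkSession.udf().register("deserialize", (byte[] data) -> {
    scala.util.Try<GenericRecord> attempt = recordInjection.invert(data);
    if (attempt.isFailure()) {
        return null; // e.g. old queued data written by a different serializer
    }
    GenericRecord record = attempt.get();
    return RowFactory.create(
            record.get("timestamp").toString() /* ... remaining fields as in the question */);
}, DataTypes.createStructType(type.fields()));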

Best,

Regarding java - "Malformed data length is negative" when trying to use Spark Structured Streaming from Kafka with an Avro data source, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/48392301/
