gpt4 book ai didi

scala - 将带有参数的案例类作为案例类转换为 Avro 消息以发送到 Kafka

转载 作者:行者123 更新时间:2023-12-03 01:38:19 42 4
gpt4 key购买 nike

我使用的案例类具有嵌套案例类和Seq[嵌套案例类]问题是当我尝试使用 KafkaAvroSerializer 对其进行序列化时,它会抛出:

Caused by: java.lang.IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord
at io.confluent.kafka.serializers.AbstractKafkaAvroSerDe.getSchema(AbstractKafkaAvroSerDe.java:115)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:71)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:60)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:879)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:841)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:728)```

最佳答案

如果您想将 Avro 与 Scala 结构(例如案例类)一起使用,我建议您使用 Avro4s 。它对所有 scala 功能具有 native 支持,甚至可以根据您的模型创建架构(如果您需要的话)。

自动类型类派生存在一些问题。这是我学到的。

至少使用avro4s版本2.0.4

一些宏生成带有编译器警告的代码,并且还会破坏疣去除器。我们必须添加以下注释才能编译我们的代码(有时错误是找不到隐式错误,但它是由宏生成代码中的错误引起的):

@com.github.ghik.silencer.silent
@SuppressWarnings(Array("org.wartremover.warts.Null", "org.wartremover.warts.AsInstanceOf", "org.wartremover.warts.StringPlusAny"))

下一个自动类型类派生一次仅适用于一个级别。我创建了一个对象来保存架构的所有 SchemaForDecoderEncoder 实例。然后,我从最内部的类型开始显式地构建类型类实例。我还使用隐式来验证每个 ADT 是否能够在移至下一个 ADT 之前解决。例如:

sealed trait Notification
object Notification {
final case class Outstanding(attempts: Int) extends Notification
final case class Complete(attemts: Int, completedAt: Instant) extends Notification
}

sealed trait Job
final case class EnqueuedJob(id: String, enqueuedAt: Instant) extends Job
final case class RunningJob(id: String, enqueuedAt: Instant, startedAt: Instant) extends Job
final case class FinishedJob(id: String, enqueuedAt: Instant, startedAt: Instant, completedAt: Instant) extends Job

object Schema {

// Explicitly define schema for ADT instances
implicit val schemaForNotificationComplete: SchemaFor[Notification.Complete] = SchemaFor.applyMacro
implicit val schemaForNotificationOutstanding: SchemaFor[Notification.Outstanding] = SchemaFor.applyMacro

// Verify Notification ADT is defined
implicitly[SchemaFor[Notification]]
implicitly[Decoder[Notification]]
implicitly[Encoder[Notification]]

// Explicitly define schema, decoder and encoder for ADT instances
implicit val schemaForEnqueuedJob: SchemaFor[EnqueuedJob] = SchemaFor.applyMacro
implicit val decodeEnqueuedJob: Decoder[EnqueuedJob] = Decoder.applyMacro
implicit val encodeEnqueuedJob: Encoder[EnqueuedJob] = Encoder.applyMacro

implicit val schemaForRunningJob: SchemaFor[RunningJob] = SchemaFor.applyMacro
implicit val decodeRunningJob: Decoder[RunningJob] = Decoder.applyMacro
implicit val encodeRunningJob: Encoder[RunningJob] = Encoder.applyMacro

implicit val schemaForFinishedJob: SchemaFor[FinishedJob] = SchemaFor.applyMacro
implicit val decodeFinishedJob: Decoder[FinishedJob] = Decoder.applyMacro
implicit val encodeFinishedJob: Encoder[FinishedJob] = Encoder.applyMacro

// Verify Notification ADT is defined
implicitly[Encoder[Job]]
implicitly[Decoder[Job]]
implicitly[SchemaFor[Job]]

// And so on until complete nested ADT is defined
}

关于scala - 将带有参数的案例类作为案例类转换为 Avro 消息以发送到 Kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56172798/

42 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com