gpt4 book ai didi

scala - 在 Scala 中导入 avro 模式

转载 作者:行者123 更新时间:2023-12-01 09:37:02 24 4
gpt4 key购买 nike

我正在编写一个简单的 Twitter 程序,我正在使用 Kafka 阅读推文并希望使用 Avro 进行序列化。到目前为止,我刚刚在 Scala 中设置了 Twitter 配置,现在想使用此配置阅读推文。

如何在我的程序中导入文件 tweets.avsc 中定义的以下 avro 架构?

{
"namespace": "tweetavro",
"type": "record",
"name": "Tweet",
"fields": [
{"name": "name", "type": "string"},
{"name": "text", "type": "string"}
]
}

我按照网络上的一些示例显示了类似 import tweetavro.Tweet 以在 Scala 中导入模式,以便我们可以像

def main (args: Array[String]) {
val twitterStream = TwitterStream.getStream
twitterStream.addListener(new OnTweetPosted(s => sendToKafka(toTweet(s))))
twitterStream.filter(filterUsOnly)
}

private def toTweet(s: Status): Tweet = {
new Tweet(s.getUser.getName, s.getText)
}

private def sendToKafka(t:Tweet) {
println(toJson(t.getSchema).apply(t))
val tweetEnc = toBinary[Tweet].apply(t)
val msg = new KeyedMessage[String, Array[Byte]](KafkaTopic, tweetEnc)
kafkaProducer.send(msg)
}

我遵循相同的规则并在 pom.xml

中使用以下插件
<!-- AVRO MAVEN PLUGIN -->
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.7.7</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/scala/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>


<!-- MAVEN COMPILER PLUGIN -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>

完成所有这些之后,我仍然无法import tweetavro.Tweet

有人可以帮忙吗?

谢谢!

最佳答案

你也可以使用 avro4s .根据模式定义您的案例类(或生成它)。我们称该类为 Tweet。然后创建一个 AvroOutputStream,它将从案例类中推断模式,并用于序列化实例。然后我们可以写入字节数组,并将其发送到 kafka。例如:

val tweet: Tweet= ... // the instance you want to serialize

val out = new ByteArrayOutputStream // we collect the serialized output in this
val avro = AvroOutputStream[Tweet](out) // you specify the type here as well
avro.write(tweet)
avro.close()

val bytes = out.toByteArray
val msg = new KeyedMessage[String, Array[Byte]](KafkaTopic, bytes)
kafkaProducer.send(msg)

关于scala - 在 Scala 中导入 avro 模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31763571/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com