gpt4 book ai didi

apache-kafka - 发布和使用不同类型消息的最佳方式是什么?

转载 作者:行者123 更新时间:2023-12-01 06:15:23 25 4
gpt4 key购买 nike

卡夫卡 0.8V

我想发布/使用 byte[] 对象、java bean 对象、可序列化对象等等。

为这种类型的场景定义发布者和消费者的最佳方式是什么?当我消费来自消费者迭代器的消息时,我不知道它是什么类型的消息。任何人都可以为我提供有关如何设计此类场景的指南吗?

最佳答案

我为每个 Kafka 主题强制执行单一模式或对象类型。这样,当您收到消息时,您就知道自己收到了什么。

至少,您应该决定给定的主题是要保存 binary 还是 string 数据,并根据这些数据进一步编码。

例如,您可以有一个名为 Schema 的主题,其中包含存储为字符串的 JSON 编码对象。

如果您使用 JSON 和一种松散类型的语言(如 JavaScript),可能很容易在同一主题中存储具有不同架构的不同对象。使用 JavaScript,您只需调用 JSON.parse(...),查看生成的对象,然后弄清楚您想要用它做什么。

但是你不能用像 Scala 这样的严格类型的语言来做到这一点。 Scala JSON 解析器通常希望您将 JSON 解析为已定义的 Scala 类型,通常是 case class。他们不适用于此模型。

一个解决方案是保持一个模式/一个主题规则,但稍微作弊:将一个对象包装在一个对象中。一个典型的例子是一个 Action 对象,其中您有一个描述操作的 header ,以及一个具有依赖于 header 中列出的操作类型的架构的有效负载对象。想象一下这个伪架构:

{name: "Action", fields: [
{name: "actionType", type: "string"},
{name: "actionObject", type: "string"}
]}

这样,即使在强类型语言中,您也可以执行以下操作(同样这是伪代码):

action = JSONParser[Action].parse(msg)
switch(action.actionType) {
case "foo" => var foo = JSONParser[Foo].parse(action.actionObject)
case "bar" => var bar = JSONParser[Bar].parse(action.actionObject)
}

这种方法的一个巧妙之处在于,如果您有一个消费者只等待特定的 action.actionType,并且将忽略所有其他的,那么它是非常轻量级的只解码 header 并推迟解码 action.actionObject 直到需要的时候。

到目前为止,这都是关于字符串编码数据的。如果你想使用二进制数据,当然你也可以将它包装在 JSON 中,或者像 XML 这样的许多基于字符串的编码中的任何一种。但是也有很多二进制编码系统,比如 Thrift 和 Avro 。其实上面的伪schema是基于Avro的。您甚至可以在 Avro 中做一些很酷的事情,比如模式演变,除其他外,它提供了一种非常巧妙的方式来处理上述 Action 用例——而不是将对象包装在对象中,您可以定义一个schema 是其他模式的一个子集,只解码你想要的字段,在本例中只是 action.actionType 字段。这是对 schema evolution 的非常出色的描述.

简而言之,我推荐的是:

  1. 选择基于模式的编码系统(无论是 JSON、XML、Avro、随便)
  2. 每个主题规则执行一个模式

关于apache-kafka - 发布和使用不同类型消息的最佳方式是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36586251/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com