gpt4 book ai didi

apache-kafka - Kafka Streams JDBC Source Long 不兼容

转载 作者:行者123 更新时间:2023-12-04 04:14:24 27 4
gpt4 key购买 nike

问题:在设置 Kafka 管道以使用 Kafka Connect JDBC 源和 Avro 序列化器和反序列化器提取数据后,当我尝试使用 Kafka Streams Java 应用程序将该数据读入 KStream 时,出现以下错误。

org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8

我已尝试尽可能地遵循现有示例,但有些事情并没有意义。我将在下面包含所有代码/附加信息,但这里有几个问题......

  1. 我目前理解的最大差距之一是什么被用于 Avro 记录的“KEY”?在我身上出错的行(在运行时)与我告诉 KStream 键是 LONG 的事实有关,但是当检索 Avro 记录时,长度小于 8(LONG 的预期长度类型)。
    当我设置我的 JDBC 源时,那里没有任何东西可以识别 key 是什么 - 我在文档中没有看到任何东西可以让我相信我可以指定 key ,尽管我已经尝试过:

    curl -X POST \
    -H "Content-Type: application/json" \
    --data 'see next code block for formatted data' \
    http://localhost:8083/connectors

    // This is the data chunk used above but in a string - broke it apart for readability here
    {
    "name": "source-jdbc-ldw_applications",
    "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": 1,
    "connection.url": "jdbc:sqlserver://dbserver;databaseName=dbname;user=kafkareader;password=kafkareader;",
    "mode": "incrementing",
    "incrementing.column.name": "ApplicationID",
    "topic.prefix": "source-jdbc-",
    "poll.interval.ms": 30000,
    "table.whitelist": "LDW_Applications",
    "transforms": "setSchema",
    "transforms.setSchema.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
    "transforms.setSchema.schema.name": "com.mycompany.avro.Application",
    "transforms.setSchema.schema.version": "1"
    }
    }

有了上面的内容,我得到了运行报告的以下架构:

curl http://localhost:8081/subjects/source-jdbc-LDW_Applications-value/versions/1 |jq

这是它的输出:

{
"subject": "source-jdbc-LDW_Applications-value",
"version": 1,
"id": 9,
"schema": "{\"type\":\"record\",\"name\":\"Application\",\"namespace\":\"com.baydynamics.avro\",\"fields\":[{\"name\":\"ApplicationID\",\"type\":\"long\"},{\"name\":\"Name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"Description\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"Group\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"OwnerUserID\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"RiskScore\",\"type\":[\"null\",{\"type\":\"int\",\"connect.type\":\"int16\"}],\"default\":null},{\"name\":\"RiskRating\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ServiceLevelTierID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"LossPotentialID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"ConfidentialityRequirementID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"IntegrityRequirementID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"AvailabilityRequirementID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"ApplicationCategoryID\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.version\":1,\"connect.name\":\"com.baydynamics.avro.Application\"}"
}

要使该架构更漂亮:

{
"type":"record",
"name":"Application",
"namespace":"com.baydynamics.avro",
"fields":[
{
"name":"ApplicationID",
"type":"long"
},
{
"name":"Name",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"Description",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"Group",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"OwnerUserID",
"type":[
"null",
"long"
],
"default":null
},
{
"name":"RiskScore",
"type":[
"null",
{
"type":"int",
"connect.type":"int16"
}
],
"default":null
},
{
"name":"RiskRating",
"type":[
"null",
"string"
],
"default":null
},
{
"name":"ServiceLevelTierID",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"LossPotentialID",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"ConfidentialityRequirementID",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"IntegrityRequirementID",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"AvailabilityRequirementID",
"type":[
"null",
"int"
],
"default":null
},
{
"name":"ApplicationCategoryID",
"type":[
"null",
"long"
],
"default":null
}
],
"connect.version":1,
"connect.name":"com.baydynamics.avro.Application"
}

再一次,我没有看到任何东西表明上面的任何特定字段将是记录的键。

然后我进入了 Kafka Streams,我尝试将这些数据带入 KStream 中……然后它爆炸了……

final KStream<Long, Application> applicationStream = builder.stream(Serdes.Long(), applicationSerde, VULNERABILITY_TOPIC);

所以,事情是这样的,因为我知道幕后存储的数据在 SQL Server 中是一个 BIGINT,在 Java 中映射到一个 LONG,我将 KStream 的键类型设为 Long,然后我使用 Serdes.Long( ) 解串器作为 KStream 构建器的参数。

调试时,我看到原始记录的长度为 7,这就是它抛出错误的原因。显然 Avro 以一种更好压缩的方式序列化事物?我不知道。无论如何,问题是我什至不知道它认为它实际使用的是什么 key ?!那么谁知道呢——也许我对 Long 的假设是不正确的,因为它实际上并没有使用 ApplicationID 作为键?为什么我什至会认为它是?!

如有任何帮助,我们将不胜感激。我知道那里有很多信息,但简而言之..

  1. 使用 JDBC Kafka 连接将数据推送到主题
  2. 数据正在进入主题 - 我可以通过控制台看到它
  3. 尝试将该数据推送到流中,以便我可以对数据做一些很棒的事情,但由于 Serdes 与 Avro 记录不兼容,它在尝试填充流时失败了

更新 1:根据下面 Randall 的建议,我尝试了 SMT(单消息转换),现在我的每条记录都有一个 key ,这是朝着正确方向迈出的极好的一步,但出于某种原因,似乎并没有强制执行转换为 Long (INT64) 具有任何实际效果。我用 SMT 截取了连接器配置的一些屏幕截图,生成的记录(现在有一个 key !)和我在 Kafka 流中看到的相同错误: Screenshots mentioned above

最佳答案

Confluent JDBC source connector不生成带有键的记录。 feature request添加此支持已被记录。

与此同时,您可以使用单个消息转换从值中提取一些字段来创建键。内置 ValueToKey transform正是这样做的。 This blog post有一个 SMT 的例子。

关于apache-kafka - Kafka Streams JDBC Source Long 不兼容,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48504992/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com