gpt4 book ai didi

java - 设置 STRUCT 类型的默认值时 Kafka Connect API 错误

转载 作者:太空宇宙 更新时间:2023-11-04 09:11:22 25 4
gpt4 key购买 nike

我想设置一个默认值STRUCT,代码如下:

        SchemaBuilder schemaBuilder = SchemaBuilder.struct().name("homeAddress")
.field("province", SchemaBuilder.STRING_SCHEMA)
.field("city", SchemaBuilder.STRING_SCHEMA);
Struct defaultValue = new Struct(schemaBuilder.build())
.put("province", "aaaa")
.put("city", "bbbb");
Schema dataSchema = SchemaBuilder.struct().name("personMessage")
.field("address", schemaBuilder.defaultValue(defaultValue).build()).build();
Struct struct = new Struct(dataSchema);

但是我得到了如下错误

Exception in thread "main" org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
Caused by: org.apache.kafka.connect.errors.DataException: Struct schemas do not match.
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:251)
at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
... 1 more

我挖掘了 ConnectSchema.validateValue 的代码并找到了抛出异常的原因,

value的schema类型是ConnectSchema,但另一个是SchemaBuilder,则抛出异常。

case STRUCT:
Struct struct = (Struct) value;
if (!struct.schema().equals(schema))
throw new DataException("Struct schemas do not match.");
struct.validate();

等于方法

        if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ConnectSchema schema = (ConnectSchema) o;
return Objects.equals(optional, schema.optional) &&
Objects.equals(version, schema.version) &&
Objects.equals(name, schema.name) &&
Objects.equals(doc, schema.doc) &&
Objects.equals(type, schema.type) &&
Objects.deepEquals(defaultValue, schema.defaultValue) &&
Objects.equals(fields, schema.fields) &&
Objects.equals(keySchema, schema.keySchema) &&
Objects.equals(valueSchema, schema.valueSchema) &&
Objects.equals(parameters, schema.parameters);

任何人都可以帮助如何设置类型STRUCT的默认值

下面是方法“defaultValue”的代码:

public SchemaBuilder defaultValue(Object value) {
checkCanSet(DEFAULT_FIELD, defaultValue, value);
checkNotNull(TYPE_FIELD, type, DEFAULT_FIELD);
try {
ConnectSchema.validateValue(this, value);
} catch (DataException e) {
throw new SchemaBuilderException("Invalid default value", e);
}
defaultValue = value;
return this;
}

如果我将 ConnectSchema.validateValue(this, value) 更改为 ConnectSchema.validateValue(this.builder(), value) 似乎就可以了,我不知道其他情况是否可以。

谢谢。

最佳答案

改变这个Struct defaultValue = new Struct(schemaBuilder.build())
Struct defaultValue = new Struct(schemaBuilder)

ConnectSchemaSchemaBuilder 两个类都实现 Schema schemaBuilder.schemaBuilder.build() 将所有值传递给 ConnectSchema 构造函数并返回新的 Schema 对象,并且该对象是不可变的。

看看这个。

System.out.println(struct); //empty 
System.out.println(struct.get("address")); //default values
System.out.println( ((Struct)struct.get("address")).getString("city") ); //default values
System.out.println( ((Struct)struct.get("address")).getString("province") ); //default values

默认值不会重写到结构字段。它们仅存在于架构定义中,如果字段为空,则从架构定义中返回默认值。

来自 Struct.class

  public Object get(Field field) {
Object val = this.values[field.index()];
if (val == null && field.schema().defaultValue() != null) {
val = field.schema().defaultValue();
}

return val;
}

关于java - 设置 STRUCT 类型的默认值时 Kafka Connect API 错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59639356/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com