
java - How to define a LogicalType in Avro (java)

Reposted. Author: 搜寻专家. Updated: 2023-11-01 01:24:46

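I need to be able to mark certain fields in an AVRO schema so that they are encrypted at serialization time.

A logical type makes it possible to mark the fields and, together with a custom conversion, should let AVRO encrypt them transparently.


I had some trouble finding documentation on how to define and use a new logical type in AVRO (avro_1.8.2#Logical+Types).
So I decided to share what I found in an answer, to make life easier for others and to get some feedback in case I am doing something wrong.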

Best answer

First, I defined the logical type as:

import org.apache.avro.LogicalType;
import org.apache.avro.Schema;

public class EncryptedLogicalType extends LogicalType {
    // The key used to refer to this type in a schema
    public static final String ENCRYPTED_LOGICAL_TYPE_NAME = "encrypted";

    EncryptedLogicalType() {
        super(ENCRYPTED_LOGICAL_TYPE_NAME);
    }

    // Reject any schema whose underlying type is not bytes
    @Override
    public void validate(Schema schema) {
        super.validate(schema);
        if (schema.getType() != Schema.Type.BYTES) {
            throw new IllegalArgumentException(
                    "Logical type 'encrypted' must be backed by bytes");
        }
    }
}

Then the new conversion:

import java.nio.ByteBuffer;
import org.apache.avro.Conversion;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;

public class EncryptedConversion extends Conversion<ByteBuffer> {
    // A single shared instance serves every field. This has to change if the conversion needs
    // runtime information (e.g. an encryption key or a tenant ID); in that case get() should
    // return the appropriate conversion for each key.
    private static final EncryptedConversion INSTANCE = new EncryptedConversion();
    public static EncryptedConversion get() { return INSTANCE; }
    private EncryptedConversion() { super(); }

    // This conversion takes a ByteBuffer and returns a ByteBuffer
    @Override
    public Class<ByteBuffer> getConvertedType() { return ByteBuffer.class; }

    @Override
    public String getLogicalTypeName() { return EncryptedLogicalType.ENCRYPTED_LOGICAL_TYPE_NAME; }

    // fromBytes and toBytes are overridden because this conversion is backed by bytes. Other
    // methods may need overriding for other backing types, in which case
    // EncryptedLogicalType#validate(Schema) must be updated to accept those types as well.

    // Avro calls fromBytes when reading (raw bytes -> logical value), so decryption goes here.
    // __decryptionLogic__ is a placeholder for your own code.
    @Override
    public ByteBuffer fromBytes(ByteBuffer value, Schema schema, LogicalType type) {
        ByteBuffer decryptedValue = __decryptionLogic__(value);
        return decryptedValue;
    }

    // Avro calls toBytes when writing (logical value -> raw bytes), so encryption goes here.
    // __encryptionLogic__ is a placeholder for your own code.
    @Override
    public ByteBuffer toBytes(ByteBuffer value, Schema schema, LogicalType type) {
        ByteBuffer encryptedValue = __encryptionLogic__(value);
        return encryptedValue;
    }
}
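The answer leaves `__encryptionLogic__` and `__decryptionLogic__` as placeholders. One possible way to fill them, shown here only as a sketch, is AES-GCM from the JDK's `javax.crypto` package. The class name `AesGcmSketch`, the choice of AES-GCM, the `IV || ciphertext` layout, and the way the key is passed in are all assumptions made for illustration; none of them come from the original answer or from Avro.

```java
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

// Hypothetical helper, not part of Avro or of the original answer.
final class AesGcmSketch {
    private static final int IV_BYTES = 12;   // nonce size commonly used with GCM
    private static final int TAG_BITS = 128;  // authentication tag length
    private static final SecureRandom RANDOM = new SecureRandom();

    private AesGcmSketch() {}

    // Encrypts the remaining bytes of `plain`. Output layout: IV followed by ciphertext and tag.
    static ByteBuffer encrypt(ByteBuffer plain, SecretKey key) {
        try {
            byte[] iv = new byte[IV_BYTES];
            RANDOM.nextBytes(iv); // fresh random nonce for every value
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            byte[] encrypted = cipher.doFinal(copyRemaining(plain));
            ByteBuffer out = ByteBuffer.allocate(iv.length + encrypted.length);
            out.put(iv).put(encrypted);
            out.flip();
            return out;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("encryption failed", e);
        }
    }

    // Reverses encrypt(): reads the IV prefix, then decrypts and authenticates the rest.
    static ByteBuffer decrypt(ByteBuffer encrypted, SecretKey key) {
        try {
            ByteBuffer in = encrypted.duplicate(); // leave the caller's position untouched
            byte[] iv = new byte[IV_BYTES];
            in.get(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            return ByteBuffer.wrap(cipher.doFinal(copyRemaining(in)));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("decryption failed", e);
        }
    }

    // Copies the remaining bytes without moving the position of the given buffer.
    private static byte[] copyRemaining(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }
}
```

With such a helper, `__encryptionLogic__(value)` could become `AesGcmSketch.encrypt(value, key)`, where `key` comes from wherever the application keeps its keys. Key management is deliberately left out of the sketch.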

The .avsc schema file looks something like this:

{
  "name": "MyMessageWithEncryptedField",
  "type": "record",
  "fields": [
    {"name": "payload", "type": {"type": "bytes", "logicalType": "encrypted"}},
    ...

Finally, in the MyMessageWithEncryptedField.java class generated from the schema file, I added the method that returns the conversion:

@Override
public Conversion<?> getConversion(int fieldIndex) {
    // Look the conversion up from the field's schema so it does not have to be coded per field.
    Schema fieldSchema = SCHEMA$.getFields().get(fieldIndex).schema();
    if (fieldSchema.getLogicalType() != null
            && EncryptedLogicalType.ENCRYPTED_LOGICAL_TYPE_NAME.equals(fieldSchema.getLogicalType().getName())) {
        // Runtime information could be passed to get() here, e.g. a tenant ID found in the data structure.
        return EncryptedConversion.get();
    }
    return null;
}
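The comments above mention passing runtime information such as a tenant ID to `get()`. A minimal sketch of what the lookup behind such a `get(tenantId)` might look like, using only the JDK, is given below. `PerTenantCache` is a hypothetical helper name, and the answer does not say how per-tenant conversions would actually be built, so the factory function is an assumption too.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical helper, not part of Avro: keeps one value (for example one
// conversion instance) per tenant ID and creates it on first use.
final class PerTenantCache<T> {
    private final ConcurrentMap<String, T> byTenant = new ConcurrentHashMap<>();
    private final Function<String, T> factory;

    PerTenantCache(Function<String, T> factory) {
        this.factory = factory;
    }

    // Returns the cached value for the tenant; the factory runs at most once per tenant ID.
    T get(String tenantId) {
        return byTenant.computeIfAbsent(tenantId, factory);
    }
}
```

A per-tenant `EncryptedConversion.get(tenantId)` could then delegate to a static `PerTenantCache<EncryptedConversion>`, provided the conversion gains a constructor that accepts whatever the tenant needs (such as its key).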

To make it work, I still have to register the type at runtime:

LogicalTypes.register(EncryptedLogicalType.ENCRYPTED_LOGICAL_TYPE_NAME, new LogicalTypes.LogicalTypeFactory() {
    private final LogicalType encryptedLogicalType = new EncryptedLogicalType();

    @Override
    public LogicalType fromSchema(Schema schema) {
        return encryptedLogicalType;
    }
});

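Some notes:

  • If your logical type needs additional properties passed in from the schema definition, you can modify the LogicalType class accordingly; take avro.lang.java.avro.src.main.java.org.apache.avro.LogicalTypes.Decimal as an example
  • The last piece of code (the registration) currently runs before my logic starts, but I plan to move it into a static block inside the class generated from the schema (MyMessageWithEncryptedField.java)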
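For the second note, the position of the static block matters. Java runs static initializers in textual order, and as far as I can tell Avro's schema parser silently skips a `logicalType` name that has not been registered yet, so the block has to come before the field that parses the schema. The sketch below illustrates this with a hypothetical stand-in class and a minimal one-field schema written for the example; it needs the Avro library on the classpath and uses a plain `LogicalType` instead of `EncryptedLogicalType` only to stay self-contained.

```java
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

// Hypothetical stand-in for a generated record class; only the static parts are shown.
final class StaticRegistrationSketch {
    static final String ENCRYPTED = "encrypted";

    // Must appear before SCHEMA: static initializers run from top to bottom.
    static {
        LogicalTypes.register(ENCRYPTED, new LogicalTypes.LogicalTypeFactory() {
            @Override
            public LogicalType fromSchema(Schema schema) {
                // The real code would return an EncryptedLogicalType here.
                return new LogicalType(ENCRYPTED);
            }
        });
    }

    // Minimal schema assumed for this example (a single encrypted bytes field).
    static final Schema SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"MyMessageWithEncryptedField\",\"fields\":["
            + "{\"name\":\"payload\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"encrypted\"}}]}");

    // Returns the logical type name attached to the payload field, or null if none was attached.
    static String payloadLogicalTypeName() {
        LogicalType logicalType = SCHEMA.getField("payload").schema().getLogicalType();
        return logicalType == null ? null : logicalType.getName();
    }
}
```

If the static block were moved below `SCHEMA`, the same lookup would be expected to yield no logical type for `payload`, and `getConversion` would then never return the encrypted conversion.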

Regarding java - How to define a LogicalType in Avro (java), a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/49034266/
