- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我编写了一个类来将 UUID 类型的对象自定义编码为字节,以便在 kafka 和 avro 之间传输。
要使用此类,我将 @AvroEncode(using=UUIDAsBytesEncoding.class)
放在目标对象中的 uuid 变量上方。 (这是由 apache avro reflect 库实现的)
我很难弄清楚如何让我的消费者自动使用自定义解码器。 (还是我必须进去手动解码?)。
这是我的 UUIDAsBytesEncoder 扩展 CustomEncoding 类:
public class UUIDAsBytesEncoding extends CustomEncoding<UUID> {
public UUIDAsBytesEncoding() {
List<Schema> union = Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES));
union.get(1).addProp("CustomEncoding", "UUIDAsBytesEncoding");
schema = Schema.createUnion(union);
}
@Override
protected void write(Object datum, Encoder out) throws IOException {
if(datum != null) {
// encode the position of the data in the union
out.writeLong(1);
// convert uuid to bytes
byte[] bytes = new byte[16];
Conversion.uuidToByteArray(((UUID) datum),bytes,0,16);
// encode length of data
out.writeLong(16);
// write the data
out.writeBytes(bytes);
} else {
// position of null in union
out.writeLong(0);
}
}
@Override
protected UUID read(Object reuse, Decoder in) throws IOException {
System.out.println("READING");
Long size = in.readLong();
Long leastSig = in.readLong();
Long mostSig = in.readLong();
return new UUID(mostSig, leastSig);
}
}
write 方法和编码工作正常,但 read 方法在反序列化时永远不会被调用。我将如何在消费者中实现它?
注册表中的架构如下所示:
{"type":"record","name":"Request","namespace":"xxxxxxx.xxx.xxx","fields":[{"name":"password","type":"string"},{"name":"email","type":"string"},{"name":"id","type":["null",{"type":"bytes","CustomEncoding":"UUIDAsBytesEncoding"}],"default":null}]} `
如果消费者不能自动使用该信息来使用 UUIDAsBytesEncoding 读取方法,那么我如何在我的消费者中找到标有该标签的数据?
我也在使用 confluent schema-registry。
如有任何帮助,我们将不胜感激!
最佳答案
最终找到了解决方案。编码不正确——内置的 writeBytes() 方法会自动为您写入长度。
然后在消费者中,我们必须通过 GenericDatumWriter 写入二进制流,然后使用 ReflectDatumReader 从二进制流中读取。这将自动调用 UUIAsBytesEncoding read() 方法并反序列化 UUID。
我的消费者看起来像这样(作为消费者组执行服务的一部分 walkthrough here ):
/**
* Start a single consumer instance
* This will use the schema built into the IndexedRecord to decode and create key/value for the message
*/
public void run() {
ConsumerIterator it = this.stream.iterator();
while (it.hasNext()) {
MessageAndMetadata messageAndMetadata = it.next();
try {
String key = (String) messageAndMetadata.key();
IndexedRecord value = (IndexedRecord) messageAndMetadata.message();
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
GenericDatumWriter<Object> genericRecordWriter = new GenericDatumWriter<>(value.getSchema());
genericRecordWriter.write(value, EncoderFactory.get().directBinaryEncoder(bytes, null));
ReflectDatumReader<T> reflectDatumReader = new ReflectDatumReader<>(value.getSchema());
T newObject = reflectDatumReader.read(null, DecoderFactory.get().binaryDecoder(bytes.toByteArray(), null));
IOUtils.closeQuietly(bytes);
System.out.println("************CONSUMED: " + key + ": "+ newObject);
} catch(SerializationException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
System.out.println("Shutting down Thread: " + this.threadNumber);
}
然后新的 UUIDAsBytesEncoding 看起来像:
public class UUIDAsBytesEncoding extends CustomEncoding<UUID> {
public UUIDAsBytesEncoding() {
List<Schema> union = Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.BYTES));
union.get(1).addProp("CustomEncoding", "UUIDAsBytesEncoding");
schema = Schema.createUnion(union);
}
@Override
protected void write(Object datum, Encoder out) throws IOException {
if(datum != null) {
// encode the position of the data in the union
out.writeLong(1);
// convert uuid to bytes
byte[] bytes = new byte[16];
Conversion.uuidToByteArray(((UUID) datum), bytes, 0, 16);
// write the data
out.writeBytes(bytes);
} else {
// position of null in union
out.writeLong(0);
}
}
@Override
protected UUID read(Object reuse, Decoder in) throws IOException {
// get index in union
int index = in.readIndex();
if (index == 1) {
// read in 16 bytes of data
ByteBuffer b = ByteBuffer.allocate(16);
in.readBytes(b);
// convert
UUID uuid = Conversion.byteArrayToUuid(b.array(), 0);
return uuid;
} else {
// no uuid present
return null;
}
}
}
这也是如何实现 CustomEncoding avro 类的示例。当前版本的 avro 没有内置 UUID 序列化程序,因此这是该问题的解决方案。
关于java - Avro 在消费者端通过 kafka 自定义解码 UUID,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31931549/
我试图再次将反射的 UUID 转换回实际的 UUID 对象,但找不到方法,当我打印反射值时它看起来是正确的,但在尝试转换时我找不到方法。 package main import ( "fmt"
我想知道 UUID 是否是唯一的,即使它们是在不同的系统上生成的,这些系统可能采用不同的算法。例如,如果您在 MySQL 和 .Net 中生成了一堆 UUID,碰撞的可能性会更高,还是所有系统都使用完
是否可以一个接一个地创建两个重复的 UUID?我不熟悉 UUID 是如何生成的,但我猜想如果您在同一毫秒内从同一 MAC 地址创建了两个单独的 UUID,那么它们将完全相同。这是真的吗? 我想我是在问
当我使用 python uuid 模块中的 UUID() 函数检查我们的测试 uuid 之一时,我遇到了这种奇怪的行为。 从 uuid 导入 UUID uuid1 = UUID('00000000-0
开始使用 java.util.UUID。我的问题是如果我有两个 UUID 变量,比如 u1 和 u2,并且我想检查它们是否相等,我可以安全地使用表达式 u1 == u2 还是必须编写 u1 .equa
我浏览了 python UUID 模块的文档。 >>> uuid.uuid4() UUID('82fe5629-6680-4b13-a4e3-7a082f10e038') >>> uuid.uuid4
我正在创建一个程序,我在其中大量使用 UUID 来识别用户和组等内容。鉴于 UUID 已经被占用的可能性极低,我是否应该担心发生碰撞的可能性? 最佳答案 这在很大程度上取决于 A)您的要求 B)底层实
您应该使用哪个版本的 UUID?我看到很多帖子解释了每个版本的含义,但我很难弄清楚什么最适合哪些应用程序。 最佳答案 有两种不同的方式生成 UUID。 如果您只需要一个唯一 ID,则需要版本 1 或版
我知道我们可以轻松提取 uuid 版本号。有没有可靠的方法来提取时间戳、MAC 地址等信息? 谢谢! 最佳答案 符合标准的 UUID 可能是多种变体之一,它看起来像这样: AAAAAAAA-BBBB-
我可以干净地使用私有(private) UUID 变体/版本吗? 我使用我基本上认为是大整数的随机 UUID。现在,我想生成一个“私有(private)”UUID,它不基于众所周知的 5 个变体/版本
我已阅读 man 页面,但我不明白 name 和 namespace 的用途。 For version 3 and version 5 UUIDs the additional command lin
我目前正在项目中使用 boost::uuids::uuid,并且我想序列化包含 boost::uuids::uuid 的对象。我尝试了下面的简单示例,但出现错误: /usr/include/boost
我正在使用 Datastax Java 驱动程序在 Cassandra 数据库中执行基本的插入语句。我的主键列是uuid类型。从我在官方文档中看到的,在 Cassandra 中调用 uuid() 函数
会抛出异常吗? UUID() 是否会悄无声息地失败?是否有任何情况下“myStatus”来自 myStatus = True myUUID = uuid.UUID( someWeirdValue )
在我的 Android 应用程序中,我有这种采用 UUID 的方法。不幸的是,当我这样做时: OverviewEvent overviewevent = eventAdapter.getOvervie
我有一个简单的 mongo 迁移框架,它正在执行一些传递给它的脚本。 现在我想将我的 LUUID 迁移到 UUID。我写了以下内容: function fixIds(collectionName) {
我有一个非常奇怪的问题是我得到一个有效的 UUID 不是一个有效的 UUID,例如: 'fd31b6b5-325d-4b65-b496-d7e4d16c8a93' is not a valid UUI
我正在测试 Goa对于一个 API。我想使用 uuid 作为 ID 数据类型。我在 controller.go 中修改了以下函数: // Show runs the show action. func
我有一个包含 uuid 和系统列的表。我需要一个查询来仅返回具有 system=1 的 uuid,而不返回具有 system= 1 和 2 的 uuid 最佳答案 SELECT * FROM
我很想了解在 Avro 中编码一种非常特定类型的数据的最佳实践:UUID。 最佳答案 到目前为止,我发现的唯一方法是定义自定义 UUID: { "namespace" : "your.namesp
我是一名优秀的程序员,十分优秀!