gpt4 book ai didi

java - Kafka AVRO 消费者 : MySQL Decimal to Java Decimal

转载 作者:行者123 更新时间:2023-11-29 07:29:13 24 4
gpt4 key购买 nike

我正在尝试使用 MySQL 表中的记录,该表包含 3 列 (Axis, Price, lastname) 及其数据类型 (int, decimal(14,4), varchar( 50)) 分别。

我插入了一条包含以下数据的记录 (1, 5.0000, John)

以下 Java 代码(使用 MySQL 连接器在 Confluent 平台中创建的主题中的 AVRO 记录)读取十进制列:Price,作为 java.nio.HeapByteBuffer 类型,所以我无法达到该列的值当我收到它时。

有没有办法将接收到的数据提取或转换为 Java 十进制或 double 据类型?

这是 MySQL 连接器属性文件:-

{
"name": "mysql-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"incrementing.column.name": "Axis",
"tasks.max": "1",
"table.whitelist": "ticket",
"mode": "incrementing",
"topic.prefix": "mysql-",
"name": "mysql-source",
"validate.non.null": "false",
"connection.url": "jdbc:mysql://localhost:3306/ticket?
user=user&password=password"
}
}

这是代码:-

    public static void main(String[] args) throws InterruptedException, 
IOException {

Properties props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");


props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

String topic = "sql-ticket";
final Consumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(props);
consumer.subscribe(Arrays.asList(topic));

try {
while (true) {
ConsumerRecords<String, GenericRecord> records = consumer.poll(100);
for (ConsumerRecord<String, GenericRecord> record : records) {
System.out.printf("value = %s \n", record.value().get("Price"));
}
}
} finally {
consumer.close();
}

}

enter image description here

最佳答案

好吧,我终于找到了解决方案。

Heapbytebuffer 需要转换为 byte[] 数组,然后我使用 BigInteger 从创建的字节数组构造值,然后我创建了一个 BigDecimal 变量,它采用 BigInteger 的值,我用 movePointLeft(4) 设置小数点,这是比例(在我的例子中:4)一切都按预期进行。

    ByteBuffer buf = (ByteBuffer) record.value().get(("Price"));
byte[] arr = new byte[buf.remaining()];
buf.get(arr);
BigInteger bi =new BigInteger(1,arr);
BigDecimal bd = new BigDecimal(bi).movePointLeft(4);
System.out.println(bd);

这是结果(左边是输出,右边是 MySQL):-

enter image description here

关于java - Kafka AVRO 消费者 : MySQL Decimal to Java Decimal,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52319875/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com