gpt4 book ai didi

java - 通过 ByteBuffer 和 CQL 3 将 Java 对象序列化到 Cassandra 1.2

转载 作者:太空狗 更新时间:2023-10-29 22:49:12 24 4
gpt4 key购买 nike

我将下面的代码拼凑在一起,没有做任何复杂的事情——只是创建一个 byte[] 变量,将它写入 Cassandra 中的一个 blob 字段(v1.2,通过新的 Datastax CQL 库),然后再次读回。

当我放入它时它有 3 个元素长,而当我读回它时它有 84 个元素长...!这意味着我实际尝试做的事情(序列化 Java 对象)在尝试时失败并出现 org.apache.commons.lang.SerializationException: java.io.StreamCorruptedException: invalid stream header: 81000008 错误再次反序列化。

下面是一些演示我的问题的示例代码:

import java.nio.ByteBuffer;

import org.apache.commons.lang.SerializationUtils;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class TestCassandraSerialization {


private Cluster cluster;
private Session session;

public TestCassandraSerialization(String node) {
connect(node);
}

private void connect(String node) {
cluster = Cluster.builder().addContactPoint(node).build();
Metadata metadata = cluster.getMetadata();
System.out.printf("Connected to %s\n", metadata.getClusterName());
for (Host host: metadata.getAllHosts()) {
System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n",
host.getDatacenter(), host.getAddress(), host.getRack());
}
session = cluster.connect();
}

public void setUp() {
session.execute("CREATE KEYSPACE test_serialization WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};");

session.execute("CREATE TABLE test_serialization.test_table (id text PRIMARY KEY, data blob)");
}

public void tearDown() {
session.execute("DROP KEYSPACE test_serialization");
}

public void insertIntoTable(String key, byte[] data) {
PreparedStatement statement = session.prepare("INSERT INTO test_serialization.test_table (id,data) VALUES (?, ?)");
BoundStatement boundStatement = new BoundStatement(statement);
session.execute(boundStatement.bind(key,ByteBuffer.wrap(data)));
}

public byte[] readFromTable(String key) {
String q1 = "SELECT * FROM test_serialization.test_table WHERE id = '"+key+"';";

ResultSet results = session.execute(q1);
for (Row row : results) {
ByteBuffer data = row.getBytes("data");
return data.array();
}
return null;
}


public static boolean compareByteArrays(byte[] one, byte[] two) {
if (one.length > two.length) {
byte[] foo = one;
one = two;
two = foo;
}

// so now two is definitely the longer array
for (int i=0; i<one.length; i++) {
//System.out.printf("%d: %s\t%s\n", i, one[i], two[i]);
if (one[i] != two[i]) {
return false;
}
}
return true;
}


public static void main(String[] args) {
TestCassandraSerialization tester = new TestCassandraSerialization("localhost");

try {
tester.setUp();
byte[] dataIn = new byte[]{1,2,3};
tester.insertIntoTable("123", dataIn);
byte[] dataOut = tester.readFromTable("123");

System.out.println(dataIn);
System.out.println(dataOut);

System.out.println(dataIn.length); // prints "3"
System.out.println(dataOut.length); // prints "84"

System.out.println(compareByteArrays(dataIn, dataOut)); // prints false

String toSave = "Hello, world!";
dataIn = SerializationUtils.serialize(toSave);
tester.insertIntoTable("toSave", dataIn);
dataOut = tester.readFromTable("toSave");

System.out.println(dataIn.length); // prints "20"
System.out.println(dataOut.length); // prints "104"


// The below throws org.apache.commons.lang.SerializationException: java.io.StreamCorruptedException: invalid stream header: 81000008
String hasLoaded = (String) SerializationUtils.deserialize(dataOut);
System.out.println(hasLoaded);

} finally {
tester.tearDown();
}
}
}

看起来正确的东西进入了数据库:

cqlsh:flight_cache> select * from test_serialization.test_table;

id | data
--------+--------------------------------------------
123 | 0x010203
toSave | 0xaced000574000d48656c6c6f2c20776f726c6421

cqlsh:flight_cache>

所以它看起来像是读取而不是写入二进制数据时的错误。谁能给我任何关于我做错了什么的指示?

最佳答案

问题几乎可以肯定是因为 ByteBuffer.array() 返回的数组是完整的后备数组,但数据可能只包含在其中的一部分。

返回的有效数据从 ByteBuffer.arrayOffset() 开始,长度为 ByteBuffer.remaining()。要获取仅包含有效数据的字节数组,请在 readFromTable 中使用以下代码:

byte[] result = new byte[data.remaining()];
data.get(result);

然后您的数据就在结果中,您可以返回它。

关于java - 通过 ByteBuffer 和 CQL 3 将 Java 对象序列化到 Cassandra 1.2,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17282361/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com