gpt4 book ai didi

java - 以大小受限的 block 将数据发送到数据库

转载 作者:搜寻专家 更新时间:2023-11-01 03:16:54 25 4
gpt4 key购买 nike

我有一个方法接受一个参数 Partition枚举。通过传递 partition 的不同值,此方法将在同一时间段由多个后台线程(最多 15 个)调用。 .这里dataHoldersByPartitionPartition 的 map 和 ConcurrentLinkedQueue<DataHolder> .

  private final ImmutableMap<Partition, ConcurrentLinkedQueue<DataHolder>> dataHoldersByPartition;

//... some code to populate entry in `dataHoldersByPartition`

private void validateAndSend(final Partition partition) {
ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
int totalSize = 0;
DataHolder dataHolder;
while ((dataHolder = dataHolders.poll()) != null) {
byte[] clientKeyBytes = dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8);
if (clientKeyBytes.length > 255)
continue;

byte[] processBytes = dataHolder.getProcessBytes();
int clientKeyLength = clientKeyBytes.length;
int processBytesLength = processBytes.length;

int additionalLength = clientKeyLength + processBytesLength;
if (totalSize + additionalLength > 50000) {
Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);
// here size of `message.serialize()` byte array should always be less than 50k at all cost
sendToDatabase(message.getAddress(), message.serialize());
clientKeyBytesAndProcessBytesHolder = new HashMap<>();
totalSize = 0;
}
clientKeyBytesAndProcessBytesHolder.put(clientKeyBytes, processBytes);
totalSize += additionalLength;
}
// calling again with remaining values only if clientKeyBytesAndProcessBytesHolder is not empty
if(!clientKeyBytesAndProcessBytesHolder.isEmpty()) {
Message message = new Message(partition, clientKeyBytesAndProcessBytesHolder);
// here size of `message.serialize()` byte array should always be less than 50k at all cost
sendToDatabase(message.getAddress(), message.serialize());
}
}

下面是我的Message类:

public final class Message {
private final byte dataCenter;
private final byte recordVersion;
private final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder;
private final long address;
private final long addressFrom;
private final long addressOrigin;
private final byte recordsPartition;
private final byte replicated;

public Message(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder, Partition recordPartition) {
this.clientKeyBytesAndProcessBytesHolder = clientKeyBytesAndProcessBytesHolder;
this.recordsPartition = (byte) recordPartition.getPartition();
this.dataCenter = Utils.CURRENT_LOCATION.get().datacenter();
this.recordVersion = 1;
this.replicated = 0;
long packedAddress = new Data().packAddress();
this.address = packedAddress;
this.addressFrom = 0L;
this.addressOrigin = packedAddress;
}

// Output of this method should always be less than 50k always
public byte[] serialize() {
int bufferCapacity = getBufferCapacity(clientKeyBytesAndProcessBytesHolder); // 36 + dataSize + 1 + 1 + keyLength + 8 + 2;

ByteBuffer byteBuffer = ByteBuffer.allocate(bufferCapacity).order(ByteOrder.BIG_ENDIAN);
// header layout
byteBuffer.put(dataCenter).put(recordVersion).putInt(clientKeyBytesAndProcessBytesHolder.size())
.putInt(bufferCapacity).putLong(address).putLong(addressFrom).putLong(addressOrigin)
.put(recordsPartition).put(replicated);

// now the data layout
for (Map.Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
byte keyType = 0;
byte[] key = entry.getKey();
byte[] value = entry.getValue();
byte keyLength = (byte) key.length;
short valueLength = (short) value.length;

ByteBuffer dataBuffer = ByteBuffer.wrap(value);
long timestamp = valueLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();

byteBuffer.put(keyType).put(keyLength).put(key).putLong(timestamp).putShort(valueLength)
.put(value);
}
return byteBuffer.array();
}

private int getBufferCapacity(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
int size = 36;
for (Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
size += 1 + 1 + 8 + 2;
size += entry.getKey().length;
size += entry.getValue().length;
}
return size;
}

// getters and to string method here
}

基本上,我必须确保每当 sendToDatabase方法被调用,大小为 message.serialize()无论如何,字节数组都应该小于 50k。我的sendToDatabase方法发送来自 serialize 的字节数组方法。由于这种情况,我正在做以下验证以及其他一些事情。在方法中,我将迭代 dataHolders CLQ 和我将提取 clientKeyBytesprocessBytes从中。这是我正在做的验证:

  • 如果clientKeyBytes如果长度大于 255,那么我将跳过它并继续迭代。
  • 我会继续增加 totalSize变量将是 clientKeyLength 的总和和 processBytesLength ,还有这个 totalSize长度应始终小于 50000 字节。
  • 一旦达到 50000 限制,我将发送 clientKeyBytesAndProcessBytesHolder映射到 sendToDatabase方法并清除 map ,重置totalSize到 0 并重新开始填充。
  • 如果它没有达到那个限制并且dataHolders变空了,然后它会发送它有的任何东西。

我相信我当前的代码中存在一些错误,因为我的情况可能导致某些记录未正确发送或丢失在某处,我无法解决这个问题。看起来要正确地达到这个 50k 条件我可能必须使用 getBufferCapacity在调用 sendToDatabase 之前正确计算大小的方法方法?

最佳答案

我检查了你的代码,按照你的逻辑看起来不错。正如您所说,它将始终存储小于 50K 的信息,但实际上它会存储直到 50K 的信息。要使其小于 50K,您必须将 if 条件更改为 if (totalSize + additionalLength >= 50000)

如果您的代码仍然不能满足您的要求,即当 totalSize + additionalLength 大于 50k 时存储信息,我可以建议您一些想法。

由于有超过 50 个线程调用此方法,您需要考虑将代码中的两个部分同步。一个是全局变量,它是一个容器 dataHoldersByPartition 对象。如果在此容器对象中发生多个并发和并行搜索,结果可能并不完美。只需检查容器类型是否同步。如果没有像下面这样制作这个 block :-

synchronized(this){
ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
}

现在,我只能给出两个建议来解决这个问题。一个是代替 if (totalSize + additionalLength > 50000) 你可以检查对象的大小 clientKeyBytesAndProcessBytesHolder if(sizeof(clientKeyBytesAndProcessBytesHolder) >= 50000)(检查 java 中 sizeof 的适当方法)。第二个是缩小范围以检查它是否是多线程的副作用。所有这些建议都是为了找出问题所在的区域,并且只应从您的角度进行修复。

首先检查您的方法 validateAndSend 是否完全满足您的要求。为此,首先同步整个 validateAndSend 方法并检查是否一切正常或仍然有相同的结果。如果仍然有相同的结果,这意味着它不是因为多线程,而是您的编码不符合要求。如果它工作正常,则意味着它是多线程的问题。如果方法同步正在解决您的问题但降低了性能,您只需从中删除同步并集中可能导致问题的代码的每个小块并使其同步块(synchronized block)并在仍未解决您的问题时将其删除。这样,您最终找到了实际造成问题的代码块,并将其保留为同步以最终修复它。

例如第一次尝试:-

  `private synchronize void validateAndSend`

第二次尝试:从方法中删除同步关键字并执行以下步骤:-

           synchronize(this){
Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);
sendToDatabase(message.getAddress(), message.serialize());
}

如果您认为我没有正确理解您的意思,请告诉我。

关于java - 以大小受限的 block 将数据发送到数据库,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46899081/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com