- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我有一个方法接受一个参数 Partition
枚举。通过传递 partition
的不同值,此方法将在同一时间段由多个后台线程(最多 15 个)调用。 .这里dataHoldersByPartition
是 Partition
的 map 和 ConcurrentLinkedQueue<DataHolder>
.
private final ImmutableMap<Partition, ConcurrentLinkedQueue<DataHolder>> dataHoldersByPartition;
//... some code to populate entry in `dataHoldersByPartition`
private void validateAndSend(final Partition partition) {
ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
int totalSize = 0;
DataHolder dataHolder;
while ((dataHolder = dataHolders.poll()) != null) {
byte[] clientKeyBytes = dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8);
if (clientKeyBytes.length > 255)
continue;
byte[] processBytes = dataHolder.getProcessBytes();
int clientKeyLength = clientKeyBytes.length;
int processBytesLength = processBytes.length;
int additionalLength = clientKeyLength + processBytesLength;
if (totalSize + additionalLength > 50000) {
Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);
// here size of `message.serialize()` byte array should always be less than 50k at all cost
sendToDatabase(message.getAddress(), message.serialize());
clientKeyBytesAndProcessBytesHolder = new HashMap<>();
totalSize = 0;
}
clientKeyBytesAndProcessBytesHolder.put(clientKeyBytes, processBytes);
totalSize += additionalLength;
}
// calling again with remaining values only if clientKeyBytesAndProcessBytesHolder is not empty
if(!clientKeyBytesAndProcessBytesHolder.isEmpty()) {
Message message = new Message(partition, clientKeyBytesAndProcessBytesHolder);
// here size of `message.serialize()` byte array should always be less than 50k at all cost
sendToDatabase(message.getAddress(), message.serialize());
}
}
下面是我的Message
类:
public final class Message {
private final byte dataCenter;
private final byte recordVersion;
private final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder;
private final long address;
private final long addressFrom;
private final long addressOrigin;
private final byte recordsPartition;
private final byte replicated;
public Message(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder, Partition recordPartition) {
this.clientKeyBytesAndProcessBytesHolder = clientKeyBytesAndProcessBytesHolder;
this.recordsPartition = (byte) recordPartition.getPartition();
this.dataCenter = Utils.CURRENT_LOCATION.get().datacenter();
this.recordVersion = 1;
this.replicated = 0;
long packedAddress = new Data().packAddress();
this.address = packedAddress;
this.addressFrom = 0L;
this.addressOrigin = packedAddress;
}
// Output of this method should always be less than 50k always
public byte[] serialize() {
int bufferCapacity = getBufferCapacity(clientKeyBytesAndProcessBytesHolder); // 36 + dataSize + 1 + 1 + keyLength + 8 + 2;
ByteBuffer byteBuffer = ByteBuffer.allocate(bufferCapacity).order(ByteOrder.BIG_ENDIAN);
// header layout
byteBuffer.put(dataCenter).put(recordVersion).putInt(clientKeyBytesAndProcessBytesHolder.size())
.putInt(bufferCapacity).putLong(address).putLong(addressFrom).putLong(addressOrigin)
.put(recordsPartition).put(replicated);
// now the data layout
for (Map.Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
byte keyType = 0;
byte[] key = entry.getKey();
byte[] value = entry.getValue();
byte keyLength = (byte) key.length;
short valueLength = (short) value.length;
ByteBuffer dataBuffer = ByteBuffer.wrap(value);
long timestamp = valueLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
byteBuffer.put(keyType).put(keyLength).put(key).putLong(timestamp).putShort(valueLength)
.put(value);
}
return byteBuffer.array();
}
private int getBufferCapacity(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
int size = 36;
for (Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) {
size += 1 + 1 + 8 + 2;
size += entry.getKey().length;
size += entry.getValue().length;
}
return size;
}
// getters and to string method here
}
基本上,我必须确保每当 sendToDatabase
方法被调用,大小为 message.serialize()
无论如何,字节数组都应该小于 50k。我的sendToDatabase
方法发送来自 serialize
的字节数组方法。由于这种情况,我正在做以下验证以及其他一些事情。在方法中,我将迭代 dataHolders
CLQ 和我将提取 clientKeyBytes
和 processBytes
从中。这是我正在做的验证:
clientKeyBytes
如果长度大于 255,那么我将跳过它并继续迭代。totalSize
变量将是 clientKeyLength
的总和和 processBytesLength
,还有这个 totalSize
长度应始终小于 50000 字节。clientKeyBytesAndProcessBytesHolder
映射到 sendToDatabase
方法并清除 map ,重置totalSize
到 0 并重新开始填充。dataHolders
变空了,然后它会发送它有的任何东西。我相信我当前的代码中存在一些错误,因为我的情况可能导致某些记录未正确发送或丢失在某处,我无法解决这个问题。看起来要正确地达到这个 50k 条件我可能必须使用 getBufferCapacity
在调用 sendToDatabase
之前正确计算大小的方法方法?
最佳答案
我检查了你的代码,按照你的逻辑看起来不错。正如您所说,它将始终存储小于 50K 的信息,但实际上它会存储直到 50K 的信息。要使其小于 50K,您必须将 if 条件更改为 if (totalSize + additionalLength >= 50000)
。
如果您的代码仍然不能满足您的要求,即当 totalSize + additionalLength
大于 50k 时存储信息,我可以建议您一些想法。
由于有超过 50 个线程调用此方法,您需要考虑将代码中的两个部分同步。一个是全局变量,它是一个容器 dataHoldersByPartition
对象。如果在此容器对象中发生多个并发和并行搜索,结果可能并不完美。只需检查容器类型是否同步。如果没有像下面这样制作这个 block :-
synchronized(this){
ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
}
现在,我只能给出两个建议来解决这个问题。一个是代替 if (totalSize + additionalLength > 50000)
你可以检查对象的大小 clientKeyBytesAndProcessBytesHolder
if(sizeof(clientKeyBytesAndProcessBytesHolder) >= 50000)
(检查 java 中 sizeof 的适当方法)。第二个是缩小范围以检查它是否是多线程的副作用。所有这些建议都是为了找出问题所在的区域,并且只应从您的角度进行修复。
首先检查您的方法 validateAndSend
是否完全满足您的要求。为此,首先同步整个 validateAndSend
方法并检查是否一切正常或仍然有相同的结果。如果仍然有相同的结果,这意味着它不是因为多线程,而是您的编码不符合要求。如果它工作正常,则意味着它是多线程的问题。如果方法同步正在解决您的问题但降低了性能,您只需从中删除同步并集中可能导致问题的代码的每个小块并使其同步块(synchronized block)并在仍未解决您的问题时将其删除。这样,您最终找到了实际造成问题的代码块,并将其保留为同步以最终修复它。
例如第一次尝试:-
`private synchronize void validateAndSend`
第二次尝试:从方法中删除同步关键字并执行以下步骤:-
synchronize(this){
Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);
sendToDatabase(message.getAddress(), message.serialize());
}
如果您认为我没有正确理解您的意思,请告诉我。
关于java - 以大小受限的 block 将数据发送到数据库,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46899081/
我的问题是如何在 python 中创建一个简单的数据库。我的例子是: User = { 'Name' : {'Firstname', 'Lastname'}, 'Address' : {'Street
我需要创建一个与远程数据库链接的应用程序! mysql 是最好的解决方案吗? Sqlite 是唯一的本地解决方案吗? 我使用下面的方法,我想知道它是否是最好的方法! NSString *evento
给定两台 MySQL 服务器,一台本地,一台远程。两者都有一个包含表 bohica 的数据库 foobar。本地服务器定义了用户 'myadmin'@'%' 和 'myadmin'@'localhos
我有以下灵活的搜索查询 Select {vt:code},{vt:productcode},{vw:code},{vw:productcode} from {abcd AS vt JOIN wxyz
好吧,我的电脑开始运行有点缓慢,所以我重置了 Windows,保留了我的文件。因为我的大脑还没有打开,所以我忘记事先备份我的 MySQL 数据库。我仍然拥有所有原始文件,因此我实际上仍然拥有数据库,但
如何将我的 Access 数据库 (.accdb) 转换为 SQLite 数据库 (.sqlite)? 请,任何帮助将不胜感激。 最佳答案 1)如果要转换 db 的结构,则应使用任何 DB 建模工具:
系统检查发现了一些问题: 警告:?:(mysql.W002)未为数据库连接“默认”设置 MySQL 严格模式 提示:MySQL 的严格模式通过将警告升级为错误来修复 MySQL 中的许多数据完整性问题
系统检查发现了一些问题: 警告:?:(mysql.W002)未为数据库连接“默认”设置 MySQL 严格模式 提示:MySQL 的严格模式通过将警告升级为错误来修复 MySQL 中的许多数据完整性问题
我想在相同的 phonegap 应用程序中使用 android 数据库。 更多说明: 我创建了 phonegap 应用程序,但 phonegap 应用程序不支持服务,所以我们已经在 java 中为 a
Time Tracker function clock() { var mytime = new Date(); var seconds
我需要在现有项目上实现一些事件的显示。我无法更改数据库结构。 在我的 Controller 中,我(从 ajax 请求)传递了一个时间戳,并且我需要显示之前的 8 个事件。因此,如果时间戳是(转换后)
我有一个可以收集和显示各种测量值的产品(不会详细介绍)。正如人们所期望的那样,显示部分是一个数据库+建立在其之上的网站(使用 Symfony)。 但是,我们可能还会创建一个 API 来向第三方公开数据
我们将 SQL Server 从 Azure VM 迁移到 Azure SQL 数据库。 Azure VM 为 DS2_V2、2 核、7GB RAM、最大 6400 IOPS Azure SQL 数据
我正在开发一个使用 MongoDB 数据库的程序,但我想问在通过 Java 执行 SQL 时是否可以使用内部数据库进行测试,例如 H2? 最佳答案 你可以尝试使用Testcontainers Test
已关闭。此问题不符合Stack Overflow guidelines 。目前不接受答案。 已关闭 9 年前。 此问题似乎与 a specific programming problem, a sof
我正在尝试使用 MSI 身份验证(无需用户名和密码)从 Azure 机器学习服务连接 Azure SQL 数据库。 我正在尝试在 Azure 机器学习服务上建立机器学习模型,目的是我需要数据,这就是我
我在我的 MySQL 数据库中使用这个查询来查找 my_column 不为空的所有行: SELECT * FROM my_table WHERE my_column != ""; 不幸的是,许多行在
我有那个基地:http://sqlfiddle.com/#!2/e5a24/2这是 WordPress 默认模式的简写。我已经删除了该示例不需要的字段。 如您所见,我的结果是“类别 1”的两倍。我喜欢
我有一张这样的 table : mysql> select * from users; +--------+----------+------------+-----------+ | userid
我有表: CREATE TABLE IF NOT EXISTS `category` ( `id` int(11) NOT NULL, `name` varchar(255) NOT NULL
我是一名优秀的程序员,十分优秀!