gpt4 book ai didi

database - 同时访问数据库资源的算法

转载 作者:塔克拉玛干 更新时间:2023-11-03 05:22:45 25 4
gpt4 key购买 nike

不久前,我们实现了一个仓库管理应用程序,它可以跟踪我们在商店中拥有的每个产品的数量。我们解决了用数据库锁(select for update)并发访问数据的问题,但是当许多客户机试图从同一个存储中消费产品数量时,这种方法会导致性能低下。注意,我们只管理一小部分产品类型(少于10个),因此并发程度可能很高(而且,我们不关心库存重新填充)。我们曾想过将每个资源量分成更小的“bucket”,但这种方法可能会导致尝试消耗大于每个bucket容量的数量的客户机出现饥饿:我们应该管理bucket合并等等……
我的问题是:这个问题有一些被广泛接受的解决方案吗?我也在找学术文章,但题目似乎太宽泛了。
第1页:
我们的应用程序在集群环境中运行,因此我们不能依赖应用程序并发控制这个问题的目的是寻找一种算法,这种算法以不同于单行的方式构造和管理数据,但是保留了db事务(不管是否使用锁)所具有的所有优点。
P.S.2:对于您的信息,我们管理大量类似的仓库,示例集中在单个仓库上,但我们将所有数据保存在一个数据库中(价格都是相同的,等等)。

最佳答案

编辑:如果使用可以在多个进程/服务器(例如RabbitMQ)之间协调的排队程序,则下面的设置仍将在群集上工作。
您还可以使用更简单的队列算法,该算法只使用数据库,但缺点是它需要轮询(而rabbitmq这样的系统允许线程阻塞,直到消息可用)。创建一个请求表,其中有一列作为主键的uniquerequestIds(例如,一个随机uuid)、一个timestamp列、一个respourceType列和一个整数requestedQuantity列。您还需要一个logs表,其中一个惟一的requestId列作为主键,一个timestamp列,一个resourceType列,一个整数requestQuantity列,以及一个boolean/tinyint/whateversuccess列。
当客户机请求一定数量的resourcex时,它会生成一个随机的uuid,并使用uuid作为requestid向requests表添加一行,然后轮询logs表中的requestid。如果success列为true,则请求成功,否则失败。
带有数据库的服务器为每个资源分配一个线程或进程,例如processx负责resourcex。ProcessX从Requests表中检索resourceType = ResourceX中按时间戳排序的所有行,然后从请求中删除它们;然后按顺序处理每个请求,为每个成功的请求递减一个内存计数器,并在处理请求结束时更新Resources表中ResourceX的数量然后,它将每个请求及其success状态写入日志表。然后,它再次从requestType = RequestX的请求中检索所有请求,等等。
使用autoincrement整数作为请求主键,并让ProcessX按主键排序而不是按时间戳排序,可能会稍微高效一些。
一个选项是为每个资源分配一个DAOThread-这个线程是唯一访问该资源的数据库表的线程,因此在数据库级别没有锁定Workers(例如web会话)使用并发队列请求资源量-下面的示例使用javaBlockingQueue,但是大多数语言都有某种可以使用的并发队列实现。

public class Request {
final int value;
final BlockingQueue<ReturnMessage> queue;
}

public class ReturnMessage {
final int value;
final String resourceType;
final boolean isSuccess;
}

public class DAOThread implements Runnable {
private final int MAX_CHANGES = 10;
private String resourceType;
private int quantity;
private int changeCount = 0;
private DBTable table;
private BlockingQueue<Request> queue;

public DAOThread(DBTable table, BlockingQueue<Request> queue) {
this.table = table;
this.resourceType = table.select("resource_type");
this.quantity = table.select("quantity");
this.queue = queue;
}

public void run() {
while(true) {
Requester request = queue.take();
if(request.value <= quantity) {
quantity -= request.value;
if(++changeCount > MAX_CHANGES) {
changeCount = 0;
table.update("quantity", quantity);
}
request.queue.offer(new ReturnMessage(request.value, resourceType, true));
} else {
request.queue.offer(new ReturnMessage(request.value, resourceType, false));
}
}
}
}

public class Worker {
final Map<String, BlockingQueue<Request>> dbMap;
final SynchronousQueue<ReturnMessage> queue = new SynchronousQueue<>();

public class WorkerThread(Map<String, BlockingQueue<Request>> dbMap) {
this.dbMap = dbMap;
}

public boolean request(String resourceType, int value) {
dbMap.get(resourceType).offer(new Request(value, queue));
return queue.take();
}
}

工作线程将资源请求发送到相应的DAOThread队列;DAOThread按顺序处理这些请求,如果请求的值不超过数量,则更新本地资源数量并返回成功,否则保持数量不变并返回失败数据库只有在十次更新之后才会更新,以减少IO的数量;最大值更改越大,从系统故障中恢复就越复杂。您还可以有一个专用的IOThread来完成所有的数据库写操作,这样您就不需要重复任何日志记录或计时(例如,应该有一个计时器,每隔几秒钟就将当前数量刷新到数据库)。
worker使用synchronousqueue等待来自daothread的响应(synchronousqueue是一个只能容纳一个项的blockingqueue);如果worker正在其自己的线程中运行,则您可能希望将其替换为标准的多项blockingqueue,以便worker可以按任何顺序处理返回消息。
有些数据库(如 Riak数据库)对计数器具有本机支持,因此这可能会改进您的IO思想,减少或消除对最大值更改的需要。
通过引入 BufferThreads将请求缓冲到 DAOThreads,可以进一步提高吞吐量。
public class BufferThread implements Runnable {
final SynchronousQueue<ReturnMessage> returnQueue = new SynchronousQueue<>();
final int BUFFERSIZE = 10;

private DAOThread daoThread;
private BlockingQueue<Request> queue;
private ArrayList<Request> buffer = new ArrayList<>(BUFFERSIZE);
private int tempTotal = 0;

public BufferThread(DAOThread daoThread, BlockingQueue<Request> queue) {
this.daoThread = daoThread;
this.queue = queue;
}

public void run() {
while(true) {
Request request = queue.poll(100, TimeUnit.MILLISECONDS);
if(request != null) {
tempTotal += request.value;
buffer.add(request);
}
if(buffer.size() == BUFFERSIZE || request == null) {
daoThread.queue.offer(new Request(tempTotal, returnQueue));
ReturnMessage message = returnQueue.take();
if(message.isSuccess()) {
for(Request request: buffer) {
request.queue.offer(new ReturnMessage(request.value, daoThread.resourceType, message.isSuccess));
}
} else {
// send unbuffered requests to DAOThread to see if any can be satisfied
for(Request request: buffer) {
daoThread.queue.offer(request);
}
}
buffer.clear();
tempTotal = 0;
}
}
}
}

工作线程将其请求发送到BufferThreads,然后它们等待缓冲 BUFFERSIZE请求或等待100ms请求通过缓冲区( Request request = queue.poll(100, TimeUnit.MILLISECONDS)),此时它们将缓冲消息转发到 DAOThread每个 DAOThread可以有多个缓冲区,而不是向工作区发送 Map<String, BlockingQueue<Request>>,而是向每个 Map<String, ArrayList<BlockingQueue<Request>>>发送一个队列,工作区使用计数器或随机数生成器来确定要向哪个 BufferThread发送请求请注意,如果 BufferThread太大和/或您的 BUFFERSIZE太多,那么当工作人员等待缓冲区填满时,他们将遭受长时间的暂停。

关于database - 同时访问数据库资源的算法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26944277/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com