gpt4 book ai didi

mysql - 如何使SQL查询线程启动,然后在获得结果之前做其他工作?

转载 作者:行者123 更新时间:2023-12-03 15:00:41 28 4
gpt4 key购买 nike

我有一个程序,可以执行有限形式的多线程。它用Delphi编写,并使用libmysql.dll(C API)访问MySQL服务器。该程序必须处理一长串记录,每条记录大约需要0.1s。将其视为一个大循环。所有数据库访问均由工作线程完成,该工作线程可以预取下一条记录或写入结果,因此主线程不必等待。

在此循环的顶部,我们首先等待预取线程,获取结果,然后让预取线程执行下一条记录的查询。这个想法是预取线程将立即发送查询,并在主线程完成循环时等待结果。

它通常以这种方式工作。但是请注意,没有什么可以确保预取线程立即运行。我发现通常直到主线程循环并开始等待预取时才发送查询。

我通过在启动预取线程后立即调用sleep(0)来解决此问题。这样,主线程放弃了剩余的时间片,希望预取线程现在可以运行并发送查询。然后,该线程将在等待时进入休眠状态,这允许主线程再次运行。
当然,操作系统中运行着更多的线程,但这确实在某种程度上起作用。

我真正想要发生的是主线程发送查询,然后让工作线程等待结果。使用libmysql.dll我打电话

result := mysql_query(p.SqlCon,pChar(p.query));

在工作线程中。相反,我想让主线程调用类似
mysql_threadedquery(p.SqlCon,pChar(p.query),thread);

一旦数据消失,这将交出任务。

有人知道吗?

这确实是一个调度问题,所以我可以尝试以更高的优先级启动预取线程,然后在发送查询后降低其优先级。但是同样,我没有任何mysql调用可以将发送查询与接收结果分开。

也许它在那里,我只是不知道。请赐教。

补充问题:

有谁认为可以通过以比主线程更高的优先级运行预取线程来解决此问题?这个想法是,预取将立即抢占主线程并发送查询。然后它将进入休眠状态,等待服务器回复。同时主线程将运行。

补充:当前实现的细节

该程序对MySQL数据库中包含的数据执行计算。有3300万个项目,每秒添加的数量更多。该程序连续运行,处理新项目,有时还会重新分析旧项目。它从表中获取要分析的项目列表,因此在传递(当前项目)开始时,它知道将需要的下一个项目ID。

由于每个项目都是独立的,因此这是进行多处理的理想目标。最简单的方法是在多台计算机上运行程序的多个实例。该程序通过性能分析,重写和算法重新设计进行了高度优化。但是,如果没有数据不足,则单个实例仍会使用100%的CPU内核。我在两个四核工作站上运行4-8个副本。但是以这种速度,他们必须花时间在MySQL服务器上等待。 (服务器/数据库架构的优化是另一个主题。)

我在该过程中实现了多线程,只是为了避免阻塞SQL调用。这就是为什么我称其为“有限多线程”。辅助线程有一项任务:发送命令并等待结果。 (好的,有两个任务。)

原来,有6个阻止任务与6个表相关联。这些读取数据中的两个,其他四个写入结果。这些足够相似,可以由常见的Task结构定义。指向此Task的指针将传递到线程池管理器,该线程池管理器分配一个线程来执行工作。主线程可以通过任务结构检查任务状态。

这使主线程代码非常简单。当它需要执行Task1时,它会等待Task1不忙,将SQL命令放入Task1中并放手。当Task1不再繁忙时,它将包含结果(如果有)。

写入结果的4个任务是微不足道的。主线程在进入下一个项目时有一个Task写记录。完成该项目后,可确保在开始另一个写入之前已完成之前的写入。

2个阅读线程不那么琐碎。通过将读取传递给线程然后等待结果,将无法获得任何结果。而是,这些任务会预取下一项的数据。因此,执行此阻塞任务的主线程会检查预取是否已完成;如有必要,等待完成预取,然后从任务中获取数据。最后,它使用下一个项目ID重新发出任务。

这个想法是让prefetch任务立即发出查询并等待MySQL服务器。然后,主线程可以处理当前项目,并且在它从下一个项目开始时,它所需的数据就在预取任务中。

这样就完成了线程,线程池,同步,数据结构等工作。一切正常。我剩下的是一个调度问题。

调度问题是这样的:所有的速度提高都是在服务器获取下一个项目时处理当前项目。我们在处理当前项目之前发出预取任务,但是如何保证它开始呢? OS调度程序不知道预取任务立即发出查询很重要,然后它什么也没有做,只有等待。

OS调度程序试图做到“公平”,并允许每个任务在指定的时间片内运行。我最糟糕的情况是:主线程接收其切片并发出预取,然后完成当前项并必须等待下一项。等待释放剩余的时间片,因此调度程序将启动预取线程,该线程将发出查询,然后等待。现在,两个线程都在等待。当服务器指示查询完成时,预取线程重新启动,并请求结果(数据集),然后休眠。服务器提供结果后,预取线程将唤醒,标记“任务已完成”并终止。最后,主线程重新启动并从完成的Task中获取数据。

为了避免这种最坏情况的调度,我需要某种方法来确保在主线程继续处理当前项之前发出预取查询。到目前为止,我已经想到了三种方法来做到这一点:
  • 在发出预取任务之后,主线程立即调用Sleep(0)。这应该放弃其其余时间片。然后,我希望调度程序运行预取线程,该线程将发出查询,然后等待。然后,调度程序应该重新启动主线程(我希望。)听起来很糟糕,但实际上总比没有好。
  • 我可能会以比主线程更高的优先级来发出预取线程。这将导致调度程序立即运行它,即使它必须抢占主线程也是如此。它还可能会产生不良影响。后台工作线程获得更高的优先级似乎是不自然的。
  • 我可能会异步发出查询。即,将发送查询与接收结果分开。这样,我可以让主线程使用mysql_send_query(非阻塞)发送预取并继续当前项目。然后,当需要下一个项目时,它将调用mysql_read_query,它将阻塞直到数据可用为止。

  • 请注意,解决方案3甚至不使用工作线程。这看起来是最好的答案,但是需要重写一些底层代码。我目前正在寻找这种异步客户端-服务器访问的示例。

    我也希望对这些方法有经验的意见。我错过了什么,还是做错了什么?请注意,这都是工作代码。我不是在问如何做,而是要如何做得更好/更快。

    最佳答案

    Still, a single instance utilizes 100% of a CPU core when not data-starved. I run 4-8 copies on two quad-core workstations.



    我在这里有一个概念上的问题。在您的情况下,我将创建一个多进程解决方案,每个进程都在其单线程中完成所有工作,或者我将创建一个多线程解决方案,该解决方案仅限于任何特定计算机上的单个实例。一旦决定使用多个线程并接受增加的复杂性和难以修复的错误的可能性,则应最大程度地利用它们。将单个进程与多个线程一起使用可以使您使用不同数量的线程来读写数据库以及处理数据。线程数甚至可能在程序运行时发生变化,数据库和处理线程的比率也可能会变化。只有在您可以从程序的单个点控制所有线程的情况下,才可以进行这种工作的动态分区,而在多个进程中是不可能的。

    I implemented multi-threading in the process solely to avoid blocking on the SQL calls.



    有了多个过程,就没有必要这样做了。如果您的进程有时不消耗I/O,则它们不占用CPU资源,因此您可能只需要运行比计算机具有内核更多的进程即可。但是,此时您就有问题要知道要生成多少个进程,如果计算机也执行其他工作,则该进程可能会随着时间的流逝而再次发生变化。可以以相对简单的方式使单个进程中的线程解决方案适应不断变化的环境。

    So the threading, a thread pool, the synchronization, data structures, etc. are all done. And that all works. What I'm left with is a Scheduling Problem.



    您应该将其留给操作系统。只需拥有必要的线程池的单个进程即可。类似于以下内容:
  • 许多线程从数据库中读取记录,并将它们添加到具有上限的生产者-消费者队列,该上限在N到2 * N之间,其中N是系统中处理器核心的数量。这些线程将在整个队列上阻塞,并且它们可以具有更高的优先级,因此,它们将被安排为在队列有更多空间并且变得不受阻塞后立即运行。由于大多数情况下它们将在I/O上被阻止,因此它们的较高优先级应该不是问题。
    我不知道线程数是多少,您需要测量。
  • 许多处理线程,可能是系统中每个处理器内核一个。他们将从上一点提到的队列中获取工作项,如果该队列为空,则在该队列的块中。已处理的工作项目应转到另一个队列。
  • 多个线程,它们从第二个队列中获取已处理的工作项并将数据写回到数据库中。为了使第二个队列也应该有一个上限,以免将已处理的数据写回到数据库中不会导致已处理的数据堆积并填满您所有的进程内存空间。

  • 需要确定线程数,但是所有调度将由OS调度程序执行。关键是要有足够的线程来利用所有CPU内核,以及足够数量的辅助线程以使它们保持忙碌并处理其输出。如果这些线程来自池,那么您也可以在运行时自由调整它们的数量。

    Omni Thread Library提供了针对任务,任务池,生产者使用者队列以及实现此目标所需的所有其他解决方案。否则,您可以使用互斥锁编写自己的队列。

    The Scheduling Problem is this: All the speed gain is in processing the current Item while the server is fetching the next Item. We issue the prefetch task before processing the current item, but how do we guarantee that it starts?



    通过给予它更高的优先级。

    The OS scheduler does not know that it's important for the prefetch task to issue the query right away



    它会知道线程是否具有更高的优先级。

    The OS scheduler is trying to be "fair" and allow each task to run for an assigned time slice.



    仅适用于相同优先级的线程。较低优先级的线程不会获得任何CPU切片,而同一进程中的较高优先级的线程可以运行。
    [编辑:这并不完全正确,最后提供了更多信息。但是,它与事实非常接近,可以确保更高优先级的网络线程尽快发送和接收数据。]

    1. Right after issuing the prefetch task, the main thread calls Sleep(0).


    调用 Sleep()是强制线程按特定顺序执行的错误方法。根据它们执行的工作的优先级来设置线程优先级,并在不需要运行更高优先级的线程时使用OS原语来阻止它们。

    I could possibly issue the prefetch thread at a higher priority than the main thread. That should cause the scheduler to run it right away, even if it must preempt the main thread. It may also have undesirable effects. It seems unnatural for a background worker thread to get a higher priority.



    没有什么不自然的。这是使用线程的预期方式。您只需要确保更高优先级的线程迟早会阻塞,并且进入OS进行I/O(文件或网络)的任何线程都会阻塞。在我上面概述的方案中,高优先级线程也会在队列上阻塞。

    I could possibly issue the query asynchronously.



    我不会去那里。当您编写用于多个同时连接的服务器并且每个连接的线程成本过高时,此技术可能是必需的,但是在线程解决方案中阻止网络访问应该可以正常工作。

    编辑:

    感谢Jeroen Pluimers,让戳记可以更深入地了解这一点。正如他在评论中提供的链接中的信息所示,我的发言

    No lower priority thread will get any slice of CPU while a higher priority thread in the same process is runnable.



    是不正确的。长时间未运行的低优先级线程会获得随机优先级提升,即使更高优先级的线程可以运行,它的迟早也会获得CPU份额。有关此的更多信息,请参见 "Priority Inversion and Windows NT Scheduler"

    为了测试这一点,我用Delphi创建了一个简单的演示:
    type
    TForm1 = class(TForm)
    Label1: TLabel;
    Label2: TLabel;
    Label3: TLabel;
    Label4: TLabel;
    Label5: TLabel;
    Label6: TLabel;
    Timer1: TTimer;
    procedure FormCreate(Sender: TObject);
    procedure FormDestroy(Sender: TObject);
    procedure Timer1Timer(Sender: TObject);
    private
    fLoopCounters: array[0..5] of LongWord;
    fThreads: array[0..5] of TThread;
    end;

    var
    Form1: TForm1;

    implementation

    {$R *.DFM}

    // TTestThread

    type
    TTestThread = class(TThread)
    private
    fLoopCounterPtr: PLongWord;
    protected
    procedure Execute; override;
    public
    constructor Create(ALowerPriority: boolean; ALoopCounterPtr: PLongWord);
    end;

    constructor TTestThread.Create(ALowerPriority: boolean;
    ALoopCounterPtr: PLongWord);
    begin
    inherited Create(True);
    if ALowerPriority then
    Priority := tpLower;
    fLoopCounterPtr := ALoopCounterPtr;
    Resume;
    end;

    procedure TTestThread.Execute;
    begin
    while not Terminated do
    InterlockedIncrement(PInteger(fLoopCounterPtr)^);
    end;

    // TForm1

    procedure TForm1.FormCreate(Sender: TObject);
    var
    i: integer;
    begin
    for i := Low(fThreads) to High(fThreads) do
    // fThreads[i] := TTestThread.Create(True, @fLoopCounters[i]);
    fThreads[i] := TTestThread.Create(i >= 4, @fLoopCounters[i]);
    end;

    procedure TForm1.FormDestroy(Sender: TObject);
    var
    i: integer;
    begin
    for i := Low(fThreads) to High(fThreads) do begin
    if fThreads[i] <> nil then
    fThreads[i].Terminate;
    end;
    for i := Low(fThreads) to High(fThreads) do
    fThreads[i].Free;
    end;

    procedure TForm1.Timer1Timer(Sender: TObject);
    begin
    Label1.Caption := IntToStr(fLoopCounters[0]);
    Label2.Caption := IntToStr(fLoopCounters[1]);
    Label3.Caption := IntToStr(fLoopCounters[2]);
    Label4.Caption := IntToStr(fLoopCounters[3]);
    Label5.Caption := IntToStr(fLoopCounters[4]);
    Label6.Caption := IntToStr(fLoopCounters[5]);
    end;

    这将创建6个线程(在我的4核计算机上),或者全部具有较低的优先级,或者具有4个具有正常优先级和2个具有较低优先级的线程。在第一种情况下,所有6个线程都运行,但是CPU时间份额却截然不同:

    在第二种情况下,4个线程在CPU时间份额大致相等的情况下运行,但是其他两个线程也占有CPU的份额很少:

    但是CPU时间所占的份额非常小,远远低于其他线程所获得的百分比。

    回到您的问题:一个使用具有自定义优先级的多个线程并通过生产者-消费者队列耦合的程序应该是一个可行的解决方案。在正常情况下,数据库线程将大部分时间在网络操作或队列上阻塞。 Windows调度程序将确保即使是较低优先级的线程也不会完全饿死。

    关于mysql - 如何使SQL查询线程启动,然后在获得结果之前做其他工作?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3974746/

    28 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com