gpt4 book ai didi

c# - 使用生产者/消费者模式和 SqlBulkCopy 将平面文件分块处理到 SQL Server 数据库中

转载 作者:行者123 更新时间:2023-11-30 12:18:31 26 4
gpt4 key购买 nike

希望大家多多包涵。我想提供尽可能多的信息。主要问题是如何创建一个将由多个线程使用的结构(如堆栈),这些线程将弹出一个值并使用它来处理一个大的平面文件,并可能一次又一次地循环直到整个文件被处理。如果一个文件有 100.000 条记录,可以由 5 个线程使用 2.000 行 block 处理然后每个线程将得到 10 个 block 来处理。

我的目标是移动平面文件中的数据(带有 Header...Subheader...Detail、Detail、Detail、...Detail、SubFooter、Subheader...Detail、Detail、Detail、...Detail , 子页脚,Subheader...Detail, Detail, Detail, ...Detail, SubFooter, Footer structure) into OLTP DB that has recovery mode to Simple (possible Full) into 3 tables: 1st representing Subheader's unique key present in Subheader row, 第二个中间表 SubheaderGroup,表示 2000 条记录 block 中的详细信息行分组(需要将 Subheader 的标识 PK 作为其 FK,第三个表示 FK 指向 Subheader PK 的详细信息行。

我正在进行手动事务管理,因为我可以有数万个详细信息行我正在使用一个特殊字段,该字段在加载期间在目标表中设置为 0,然后在文件处理结束时我正在执行事务更新,将此值更改为 1,这可以向其他应用程序发出加载完成的信号。

我想将这个平面文件分成多个相等的部分(行数相同),这些部分可以用多个线程处理,并使用从目标表元数据创建的 IDataReader 使用 SqlBulkCopy 导入。

我想使用生产者/消费者模式(如以下链接中的解释 - pdf 分析和代码示例)来使用带有 SqlBulkCopyOptions.TableLock 选项的 SqlBulkCopy。 http://sqlblog.com/blogs/alberto_ferrari/archive/2009/11/30/sqlbulkcopy-performance-analysis.aspx此模式允许创建多个生产者,并且需要等量的消费者订阅生产者才能使用该行。

在TestSqlBulkCopy项目中,DataProducer.cs文件中有一个模拟产生数千条记录的方法。

public void Produce (DataConsumer consumer, int numberOfRows) {
int bufferSize = 100000;
int numberOfBuffers = numberOfRows / bufferSize;

for (int bufferNumber = 0; bufferNumber < numberOfBuffers; bufferNumber++) {
DataTable buffer = consumer.GetBufferDataTable ();

for (int rowNumber = 0; rowNumber < bufferSize; rowNumber++) {
object[] values = GetRandomRow (consumer);
buffer.Rows.Add (values);
}
consumer.AddBufferDataTable (buffer);
}
}

此方法将在新线程的上下文中执行。我希望这个新线程只读取一个独特的原始平面文件 block ,另一个线程将开始处理下一个 block 。然后,消费者将使用 SqlBulkCopy ADO.NET 类将数据(发送给他们的数据)移动到 SQL Server 数据库。

所以这里的问题是关于主程序规定每个线程应该处理 lineFrom 到 lineTo 的内容,我认为这应该在线程创建期间发生。第二种解决方案可能是线程共享一些结构并使用它们独有的东西(如线程号或序列号)来查找共享结构(可能是堆栈并弹出一个值(在执行时锁定堆栈)然后下一个线程将然后拾取下一个值。主程序将拾取平面文件并确定 block 的大小并创建堆栈。

那么有人可以提供一些代码片段,关于多个线程如何处理一个文件并且只获取该文件的唯一部分的伪代码吗?

谢谢,拉德

最佳答案

对我来说效果最好的是使用队列来保存未处理的工作,并使用字典来跟踪进行中的工作:

  1. 创建一个 worker 类(Class)文件名、起始行和行数并有一个更新方法做数据库插入。传递一个回调方法,工作人员在完成时用来发出信号。
  2. 加载一个带有 worker 实例的队列类,每个 block 一个。
  3. 生成一个调度程序线程,该线程使一个worker 实例,启动它的更新方法,并将 worker 实例添加到字典中,以其线程的 ManagedThreadId 为键。做这个直到你的最大允许线程达到计数,如所述字典.计数。调度员等待直到一个线程结束然后启动另一个。有几种等待方式。
  4. 当每个线程结束时,它的回调从中删除其 ManagedThreadId字典。如果线程退出因为错误(例如连接超时)然后回调可以重新插入工作人员进入队列。这是个好地方更新您的用户界面。
  5. 您的用户界面可以显示事件线程、总进度和每个 block 的时间。它可以让用户调整事件线程的数量、暂停处理、显示错误或提前停止。
  6. 当队列和字典为空时,您就完成了。

作为控制台应用程序的演示代码:

using System;
using System.Collections.Generic;
using System.Threading;

namespace threadtest
{
public delegate void DoneCallbackDelegate(int idArg, bool successArg, string messageArg);

class Program
{
static void Main(string[] args)
{
Supervisor supv = new Supervisor();
supv.LoadQueue();
supv.Dispatch();
}
}

public class Supervisor
{
public Queue<Worker> pendingWork = new Queue<Worker>();
public Dictionary<int, Worker> activeWork = new Dictionary<int, Worker>();

private object pendingLock = new object();
private object activeLock = new object();

private int maxThreads = 200;

public void LoadQueue()
{
for (int i = 0; i < 1000; i++)
{
Worker worker = new Worker();
worker.Callback = new DoneCallbackDelegate(WorkerFinished);
lock (pendingLock)
{
pendingWork.Enqueue(worker);
}
}
}

public void Dispatch()
{
int activeThreadCount;

while (true)
{
lock (activeLock) { activeThreadCount = activeWork.Count; }
while (true)
{
lock (activeLock)
{
if (activeWork.Count == maxThreads) break;
}
lock (pendingWork)
{
if (pendingWork.Count > 0)
{
Worker worker = pendingWork.Dequeue();
Thread thread = new Thread(new ThreadStart(worker.DoWork));
thread.IsBackground = true;
worker.ThreadId = thread.ManagedThreadId;
lock (activeLock) { activeWork.Add(worker.ThreadId, worker); }
thread.Start();
}
else
{
break;
}
}
}
Thread.Sleep(200); // wait to see if any workers are done (many ways to do this)

lock (pendingLock)
lock (activeLock)
{
if ((pendingWork.Count == 0) && (activeWork.Count == 0)) break;
}
}
}

// remove finished threads from activeWork, resubmit if necessary, and update UI
public void WorkerFinished(int idArg, bool successArg, string messageArg)
{
lock (pendingLock)
lock (activeLock)
{
Worker worker = activeWork[idArg];
activeWork.Remove(idArg);
if (!successArg)
{
// check the message or something to see if you should resubmit thread
pendingWork.Enqueue(worker);
}
// update UI
int left = Console.CursorLeft;
int top = Console.CursorTop;
Console.WriteLine(string.Format("pending:{0} active:{1} ", pendingWork.Count, activeWork.Count));
Console.SetCursorPosition(left, top);
}
}
}

public class Worker
{
// this is where you put in your problem-unique stuff
public int ThreadId { get; set; }

DoneCallbackDelegate callback;
public DoneCallbackDelegate Callback { set { callback = value; } }

public void DoWork()
{
try
{
Thread.Sleep(new Random().Next(500, 5000)); // simulate some effort
callback(ThreadId, true, null);
}
catch (Exception ex)
{
callback(ThreadId, false, ex.ToString());
}
}
}
}

关于c# - 使用生产者/消费者模式和 SqlBulkCopy 将平面文件分块处理到 SQL Server 数据库中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/2066503/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com