
C#: batch/block export of a large SQL Server table using a DataReader and a CSV writer

Reposted. Author: 行者123. Updated: 2023-12-05 01:09:19

I have developed a CSV batch writer, but the process seems quite slow compared to BCP. My only requirement is to export large tables that have no identity or primary key column into multiple small CSV files, each named with its batch id.

The problem with BCP is that it only writes one large file.

My current process: read the data and write it to a MemoryStream with a CSV writer; I keep checking whether the MemoryStream has grown past a given batch size, and when it has, I asynchronously copy the stream's contents out to a text file.

I can export 250 MB batch files without hitting out-of-memory exceptions,

but the process takes about five times longer than a BCP export.

Is there a better way to do a batched CSV export than what I am doing?

Please advise.

Best Answer

A few options come to mind:

Use FETCH/OFFSET

If the source query can be batched cheaply on the SQL Server side (for example, a clustered index you can key off of), FETCH and OFFSET are essentially free.

If the table is a heap, FETCH/OFFSET is not really an option, but you might consider adding a clustered index anyway, since there are few good arguments against one (though doing so on a 100 GB table would be expensive :)

bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 0 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch1.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 20000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch2.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 40000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch3.csv -S Server -U sa -P Password -w
bcp "SELECT * FROM DemoTable ORDER BY ClusteredKey OFFSET 60000 ROWS FETCH NEXT 20000 ROWS ONLY" queryout fetch4.csv -S Server -U sa -P Password -w
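Rather than writing each command line by hand, the batch commands above can be generated. This is a sketch; the row counts, table, key, and credentials are the same placeholders used in the examples:

```csharp
using System;
using System.Collections.Generic;

class BcpCommandGenerator
{
    // Builds one bcp command line per FETCH/OFFSET batch.
    public static IEnumerable<string> Commands(long totalRows, int batchSize)
    {
        int fileNo = 1;
        for (long offset = 0; offset < totalRows; offset += batchSize)
        {
            yield return
                $"bcp \"SELECT * FROM DemoTable ORDER BY ClusteredKey " +
                $"OFFSET {offset} ROWS FETCH NEXT {batchSize} ROWS ONLY\" " +
                $"queryout fetch{fileNo++}.csv -S Server -U sa -P Password -w";
        }
    }

    static void Main()
    {
        foreach (var cmd in Commands(80000, 20000))
            Console.WriteLine(cmd); // prints the four commands shown above
    }
}
```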

Use SqlDataReader

Based on measurements against a ~1.2 GB table, a naive implementation of a C# CSV SQL export (below) achieved 75% of BCP's performance on the same table and system. (It also has the benefit of handling the CSV format correctly with respect to embedded commas, quotes, and CRLFs.)

using System;
using System.Data.SqlClient;
using System.IO;
using System.Text;

static void Main(string[] args)
{
    var con = new SqlConnection(@"Server=(local);Database=Demo;User Id=sa;Password=bar;");
    con.Open();

    var sqr = new SqlCommand("SELECT * FROM dbo.Table", con);

    using (var reader = sqr.ExecuteReader())
    using (var tw = File.CreateText("out.csv"))
    {
        while (reader.Read())
        {
            for (int i = 0; i < reader.FieldCount; i++)
            {
                if (i != 0)
                {
                    tw.Write(',');
                }

                var val = FormatValue(reader[i]);
                if (val == null)
                {
                    // null/DBNull -> empty field
                }
                else if (val.IndexOfAny(new[] { '"', ',', '\r', '\n' }) >= 0)
                {
                    // quote the field and double any embedded quotes
                    tw.Write('"');
                    tw.Write(val.Replace("\"", "\"\""));
                    tw.Write('"');
                }
                else
                {
                    tw.Write(val);
                }
            }
            tw.Write("\r\n");
        }
    }
}

private static string FormatValue(object v)
{
    if (v == null || v is DBNull)
    {
        return null;
    }
    if (v is DateTime dt)
    {
        return dt.ToString("O"); // round-trip format
    }
    if (v is DateTimeOffset dto)
    {
        return dto.ToString("O");
    }
    if (v is byte[] ba)
    {
        // binary -> 0xABCD... hex literal
        var sb = new StringBuilder(2 + ba.Length * 2);
        sb.Append("0x");
        for (int i = 0; i < ba.Length; i++)
        {
            sb.Append(ba[i].ToString("X2"));
        }
        return sb.ToString();
    }
    return v.ToString();
}
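The quoting rule used above can be checked in isolation. `CsvEscape.Escape` below is a hypothetical helper (not part of the original answer) that mirrors the writer's logic: quote any field containing a quote, comma, CR, or LF, and double embedded quotes:

```csharp
using System;

static class CsvEscape
{
    // Mirrors the quoting branch of the CSV writer above.
    public static string Escape(string val)
    {
        if (val == null) return string.Empty;                   // empty field
        if (val.IndexOfAny(new[] { '"', ',', '\r', '\n' }) >= 0)
            return "\"" + val.Replace("\"", "\"\"") + "\"";     // quote + double quotes
        return val;                                             // safe as-is
    }

    static void Main()
    {
        Console.WriteLine(Escape("plain"));      // plain
        Console.WriteLine(Escape("a,b"));        // "a,b"
        Console.WriteLine(Escape("say \"hi\"")); // "say ""hi"""
    }
}
```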

Performance appears to be limited by the GC dealing with so many string allocations, so if more performance is required, translating this to a non-CLR language such as C++ would likely match BCP's performance.

Use SSIS

SSIS can perform all of these steps in one package. The exact steps are probably best left to another question, but it basically amounts to synthesizing a "file number" column and using a Flat File destination. Bad example of this

Use SSIS to produce one large CSV, then split it

If you use SSIS (directly, or via using the Export Data Wizard), you will get an RFC 4180-compliant CSV file that can then be split. An example tool for splitting such a file:

using System;
using System.IO;

class Program
{
    static void Main(string[] args)
    {
        int n = 0;
        using (var src = File.OpenRead("rfc4180_in.csv"))
        using (var dst = new CsvRfc4180SplittingWriteStream(
            () => File.Create($"rfc4180_out{n++}.csv"),
            100 /* MB per chunk */ * 1024 * 1024))
        {
            src.CopyTo(dst);
        }
    }
}

/// <summary>
/// Abstract class which uses ParseDataGetCutPoint to split the output into streams
/// at least cutAfterPosition bytes long.
/// </summary>
abstract class SplittingWriteStream : Stream
{
    private long _TotalPosition;
    private long CurrentStreamPos;
    private readonly long CutAfterPosition;
    private readonly Func<Stream> StreamCtor;
    private Stream CurrentStream;

    public SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
    {
        if (cutAfterPosition < 0L)
        {
            throw new ArgumentOutOfRangeException(nameof(cutAfterPosition));
        }
        this.CutAfterPosition = cutAfterPosition;

        this.StreamCtor = createStream ?? throw new ArgumentNullException(nameof(createStream));
        this.CurrentStream = createStream();
    }

    protected override void Dispose(bool disposing) => CurrentStream.Dispose();

    public override void Flush() => CurrentStream.Flush();

    public override void Write(byte[] buffer, int offset, int count)
    {
        // Only ask the parser for a cut point once the current chunk is big enough
        var cutPoint = ParseDataGetCutPoint(buffer, offset, count, getCutPoint: CurrentStreamPos > CutAfterPosition);
        if (cutPoint < 0)
        {
            CurrentStream.Write(buffer, offset, count);
            CurrentStreamPos += count;
        }
        else
        {
            if (cutPoint > 0)
            {
                CurrentStream.Write(buffer, offset, cutPoint);
            }

            try
            {
                CurrentStream.Dispose();
            }
            finally
            {
                CurrentStream = null;
                CurrentStream = StreamCtor();
            }

            if (cutPoint != count)
            {
                CurrentStream.Write(buffer, offset + cutPoint, count - cutPoint);
            }
            // only the bytes written after the cut count toward the new chunk
            CurrentStreamPos = count - cutPoint;
        }

        _TotalPosition += count;
    }

    protected abstract int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint);

    #region Stream write-only stubs

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();

    public override long Position
    {
        get => _TotalPosition;
        set => throw new NotSupportedException();
    }

    #endregion
}

class CsvRfc4180SplittingWriteStream : SplittingWriteStream
{
    public CsvRfc4180SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    bool inQuotedString;
    bool lastWasQuote;

    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        int? cutPoint = null;
        for (int n = 0; n < count; n++)
        {
            var i = n + offset;
            StepState(buffer[i]);

            // cut at a CRLF, but only when it is not inside a quoted field
            if (getCutPoint && !inQuotedString && cutPoint == null
                && buffer[i] == '\r' && n + 1 < count && buffer[i + 1] == '\n')
            {
                cutPoint = n + 2; // split after the CRLF, like the other splitters
            }
        }

        return cutPoint ?? -1;
    }

    private void StepState(byte v)
    {
        var isQuote = v == '"';
        if (lastWasQuote)
        {
            lastWasQuote = false;

            if (isQuote)
            {
                // Doubled quote:
                //   inside a quoted string  == literal escaped quote
                //   outside a quoted string == empty string
                // -> no state change either way
            }
            else
            {
                // quote followed by a non-quote == toggle quoted string
                inQuotedString ^= true;
            }
        }
        else
        {
            lastWasQuote = isQuote;
        }
    }
}
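The point of this state machine is that a CRLF inside a quoted field is data, not a record boundary, so a naive byte-level CRLF split would corrupt the file. A minimal standalone re-implementation of the same two-flag scan (written for this illustration, not part of the splitter) shows the difference:

```csharp
using System;
using System.Text;

class CrlfScanDemo
{
    // Returns the byte index just past the first record-terminating CRLF,
    // ignoring CRLFs inside quoted fields; -1 if none is found.
    public static int FirstRecordEnd(byte[] b)
    {
        bool inQuoted = false, lastWasQuote = false;
        for (int i = 0; i < b.Length - 1; i++)
        {
            bool isQuote = b[i] == (byte)'"';
            if (lastWasQuote)
            {
                lastWasQuote = false;
                if (!isQuote) inQuoted ^= true; // quote then non-quote toggles
            }
            else
            {
                lastWasQuote = isQuote;
            }

            if (!inQuoted && b[i] == (byte)'\r' && b[i + 1] == (byte)'\n')
                return i + 2;
        }
        return -1;
    }

    static void Main()
    {
        // The CRLF at bytes 2-3 sits inside quotes, so the record ends at byte 10.
        var data = Encoding.ASCII.GetBytes("\"a\r\nb\",c\r\nnext,row\r\n");
        Console.WriteLine(FirstRecordEnd(data)); // 10
    }
}
```

A naive `IndexOf("\r\n")` on the same bytes would cut at offset 2 and split the quoted field in half.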

Use BCP, then split on the fly

If BCP is required, and its (poor) handling of CSV is tolerable, it can write to a named pipe stream that is split on the fly.

using System;
using System.Diagnostics;
using System.IO;
using System.IO.Pipes;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        Thread copyThread;
        var pipeId = $"bcp_{Guid.NewGuid():n}";
        // bcp requires a read/write pipe
        using (var np = new NamedPipeServerStream(pipeId))
        {
            copyThread = new Thread(_1 =>
            {
                np.WaitForConnection();
                int n = 0;
                // Use CrlfUtf16leSplittingWriteStream with -w (UTF-16 little-endian)
                // Use CrlfUtf8SplittingWriteStream otherwise (UTF-8 / ANSI / ASCII / OEM)
                using (var dst = new CrlfUtf16leSplittingWriteStream(
                    () => File.Create($"rfc4180_out{n++}.csv"),
                    100 /* MB per chunk */ * 1024 * 1024))
                {
                    np.CopyTo(dst);
                }
            });
            copyThread.Name = "Write thread";
            copyThread.IsBackground = true;
            copyThread.Start();

            var bcp = Process.Start(
                @"C:\Program Files\Microsoft SQL Server\Client SDK\ODBC\170\Tools\Binn\bcp.exe",
                $@"FWDB.Rx.RxBatches out \\.\pipe\{pipeId} -S (local) -U sa -P abc -w -t,");
            bcp.WaitForExit();
        }
        copyThread.Join();
    }
}

class CrlfUtf16leSplittingWriteStream : SplittingWriteStream
{
    public CrlfUtf16leSplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        if (getCutPoint)
        {
            for (int n = 0; n < count - 3 /* CR 00 LF 00 */; n++)
            {
                var i = n + offset;
                if (buffer[i] == '\r' && buffer[i + 1] == 0
                    && buffer[i + 2] == '\n' && buffer[i + 3] == 0)
                {
                    // split after the CRLF
                    return n + 4;
                }
            }
        }

        return -1;
    }
}

class CrlfUtf8SplittingWriteStream : SplittingWriteStream
{
    public CrlfUtf8SplittingWriteStream(Func<Stream> createStream, long cutAfterPosition)
        : base(createStream, cutAfterPosition)
    {
    }

    protected override int ParseDataGetCutPoint(byte[] buffer, int offset, int count, bool getCutPoint)
    {
        if (getCutPoint)
        {
            for (int n = 0; n < count - 1 /* CR LF */; n++)
            {
                var i = n + offset;
                if (buffer[i] == '\r' && buffer[i + 1] == '\n')
                {
                    // split after the CRLF
                    return n + 2;
                }
            }
        }

        return -1;
    }
}

(SplittingWriteStream and CsvRfc4180SplittingWriteStream are the same classes shown in the previous section, reused here unchanged.)

Regarding "C#: batch/block export of a large SQL Server table using a DataReader and CSV writer", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/65283808/
