gpt4 book ai didi

C# 将数据插入 SQL 数据库的最快方法

转载 作者:太空狗 更新时间:2023-10-29 17:54:46 25 4
gpt4 key购买 nike

我正在从外部源(通过 Lightstreamer)接收(流)数据到我的 C# 应用程序中。我的 C# 应用程序从监听器接收数据。来自监听器的数据存储在队列(ConcurrentQueue)中。使用 TryDequeue 将队列每 0.5 秒清理一次到数据表中。然后使用 SqlBulkCopy 将 DataTable 复制到 SQL 数据库中。SQL 数据库将从登台表到达的新数据处理到最终表中。我目前每天收到大约 300'000 行(在接下来的几周内可能会大幅增加),我的目标是从收到数据到它们在最终 SQL 表中可用为止的时间保持在 1 秒以内。目前,我每秒必须处理的最大行数约为 50 行。

不幸的是,由于接收到越来越多的数据,我的逻辑性能越来越慢(仍然远低于 1 秒,但我想继续改进)。主要瓶颈(到目前为止)是将暂存数据(在 SQL 数据库上)处理到最终表中。为了提高性能,我想将临时表切换为内存优化表。最终表已经是内存优化表,因此它们肯定可以很好地协同工作。

我的问题:

  1. 有没有办法对内存优化表使用 SqlBulkCopy(C# 之外)? (据我所知目前还没有办法)
  2. 对于将从我的 C# 应用程序接收到的数据写入内存优化暂存表的最快方法有什么建议吗?

编辑(带解决方案):

在评论/答案和性能评估之后,我决定放弃批量插入并使用 SQLCommand 将我的数据作为表值参数的 IEnumerable 移交到 native 编译存储过程中,以将数据直接存储在我的内存优化中最终表(以及现在用作存档的“暂存”表的副本)。性能显着提高(甚至我还没有考虑并行插入(将在稍后阶段))。

部分代码如下:

内存优化的自定义表类型(将数据从C#移交给SQL(存储过程):

CREATE TYPE [Staging].[CityIndexIntradayLivePrices] AS TABLE(
[CityIndexInstrumentID] [int] NOT NULL,
[CityIndexTimeStamp] [bigint] NOT NULL,
[BidPrice] [numeric](18, 8) NOT NULL,
[AskPrice] [numeric](18, 8) NOT NULL,
INDEX [IndexCityIndexIntradayLivePrices] NONCLUSTERED
(
[CityIndexInstrumentID] ASC,
[CityIndexTimeStamp] ASC,
[BidPrice] ASC,
[AskPrice] ASC
)
)
WITH ( MEMORY_OPTIMIZED = ON )

用于将数据插入最终表和暂存(在本例中用作存档)的 native 编译存储过程:

create procedure [Staging].[spProcessCityIndexIntradayLivePricesStaging]
(
@ProcessingID int,
@CityIndexIntradayLivePrices Staging.CityIndexIntradayLivePrices readonly
)
with native_compilation, schemabinding, execute as owner
as
begin atomic
with (transaction isolation level=snapshot, language=N'us_english')


-- store prices

insert into TimeSeries.CityIndexIntradayLivePrices
(
ObjectID,
PerDateTime,
BidPrice,
AskPrice,
ProcessingID
)
select Objects.ObjectID,
CityIndexTimeStamp,
CityIndexIntradayLivePricesStaging.BidPrice,
CityIndexIntradayLivePricesStaging.AskPrice,
@ProcessingID
from @CityIndexIntradayLivePrices CityIndexIntradayLivePricesStaging,
Objects.Objects
where Objects.CityIndexInstrumentID = CityIndexIntradayLivePricesStaging.CityIndexInstrumentID


-- store data in staging table

insert into Staging.CityIndexIntradayLivePricesStaging
(
ImportProcessingID,
CityIndexInstrumentID,
CityIndexTimeStamp,
BidPrice,
AskPrice
)
select @ProcessingID,
CityIndexInstrumentID,
CityIndexTimeStamp,
BidPrice,
AskPrice
from @CityIndexIntradayLivePrices


end

IEnumerable 填充了来自队列的:

private static IEnumerable<SqlDataRecord> CreateSqlDataRecords()
{


// set columns (the sequence is important as the sequence will be accordingly to the sequence of columns in the table-value parameter)

SqlMetaData MetaDataCol1;
SqlMetaData MetaDataCol2;
SqlMetaData MetaDataCol3;
SqlMetaData MetaDataCol4;

MetaDataCol1 = new SqlMetaData("CityIndexInstrumentID", SqlDbType.Int);
MetaDataCol2 = new SqlMetaData("CityIndexTimeStamp", SqlDbType.BigInt);
MetaDataCol3 = new SqlMetaData("BidPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale
MetaDataCol4 = new SqlMetaData("AskPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale


// define sql data record with the columns

SqlDataRecord DataRecord = new SqlDataRecord(new SqlMetaData[] { MetaDataCol1, MetaDataCol2, MetaDataCol3, MetaDataCol4 });


// remove each price row from queue and add it to the sql data record

LightstreamerAPI.PriceDTO PriceDTO = new LightstreamerAPI.PriceDTO();

while (IntradayQuotesQueue.TryDequeue(out PriceDTO))
{

DataRecord.SetInt32(0, PriceDTO.MarketID); // city index market id
DataRecord.SetInt64(1, Convert.ToInt64((PriceDTO.TickDate.Replace(@"\/Date(", "")).Replace(@")\/", ""))); // @ is used to avoid problem with / as escape sequence)
DataRecord.SetDecimal(2, PriceDTO.Bid); // bid price
DataRecord.SetDecimal(3, PriceDTO.Offer); // ask price

yield return DataRecord;

}


}

每 0.5 秒处理一次数据:

public static void ChildThreadIntradayQuotesHandler(Int32 CityIndexInterfaceProcessingID)
{


try
{

// open new sql connection

using (SqlConnection TimeSeriesDatabaseSQLConnection = new SqlConnection("Data Source=XXX;Initial Catalog=XXX;Integrated Security=SSPI;MultipleActiveResultSets=false"))
{


// open connection

TimeSeriesDatabaseSQLConnection.Open();


// endless loop to keep thread alive

while(true)
{


// ensure queue has rows to process (otherwise no need to continue)

if(IntradayQuotesQueue.Count > 0)
{


// define stored procedure for sql command

SqlCommand InsertCommand = new SqlCommand("Staging.spProcessCityIndexIntradayLivePricesStaging", TimeSeriesDatabaseSQLConnection);


// set command type to stored procedure

InsertCommand.CommandType = CommandType.StoredProcedure;


// define sql parameters (table-value parameter gets data from CreateSqlDataRecords())

SqlParameter ParameterCityIndexIntradayLivePrices = InsertCommand.Parameters.AddWithValue("@CityIndexIntradayLivePrices", CreateSqlDataRecords()); // table-valued parameter
SqlParameter ParameterProcessingID = InsertCommand.Parameters.AddWithValue("@ProcessingID", CityIndexInterfaceProcessingID); // processing id parameter


// set sql db type to structured for table-value paramter (structured = special data type for specifying structured data contained in table-valued parameters)

ParameterCityIndexIntradayLivePrices.SqlDbType = SqlDbType.Structured;


// execute stored procedure

InsertCommand.ExecuteNonQuery();


}


// wait 0.5 seconds

Thread.Sleep(500);


}

}

}
catch (Exception e)
{

// handle error (standard error messages and update processing)

ThreadErrorHandling(CityIndexInterfaceProcessingID, "ChildThreadIntradayQuotesHandler (handler stopped now)", e);

};


}

最佳答案

使用 SQL Server 2016(它还不是 RTM,但在内存优化表方面已经比 2014 好很多)。然后使用 memory-optimized table variable或者只是爆炸一大堆 native stored procedure调用一个事务,每个事务执行一次插入,具体取决于您的场景中更快的速度(这会有所不同)。需要注意的几点:

  • 在一个事务中进行多次插入对于节省网络往返次数至关重要。虽然内存操作非常快,但 SQL Server 仍然需要确认每个操作。
  • 根据您生成数据的方式,您可能会发现并行化插入可以加快速度(不要过度;您会很快达到饱和点)。不要在这里自作聪明;杠杆 async/await和/或 Parallel.ForEach .
  • 如果要传递表值参数,最简单的方法是传递 DataTable作为参数值,但这不是最有效的方法——传递一个 IEnumerable<SqlDataRecord>。 .您可以使用迭代器方法生成值,因此只会分配固定数量的内存。

您必须进行一些试验才能找到传递数据的最佳方式;这在很大程度上取决于数据的大小以及获取数据的方式。

关于C# 将数据插入 SQL 数据库的最快方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36725999/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com