
multithreading - Multicore, multithreaded speedup: reading through CSVs using TMemoryStream

Reposted. Author: 行者123. Updated: 2023-12-03 17:59:09

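I'm writing parallel code that enumerates a large number of CSV files, each containing historical stock data (for more than 6,500 symbols), and calculates whether each stock has hit an all-time high.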

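I have implemented a thread pool and a TThread descendant class that split the symbol list evenly among the threads, and those threads are then assigned to separate cores on my i7 machine. Each thread receives its own copy of all the data it needs when it is created, before it is resumed, so no locking or other synchronization is needed while the threads run. After all threads finish, I aggregate each thread's results in the main program.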
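The even split described above might look like the following minimal sketch. `ChunkBounds` is a hypothetical helper and not the asker's code. It gives worker `AIndex` a contiguous block of symbol indexes, and the blocks differ in size by at most one.

```pascal
program ChunkSplit;

{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils;

// Hypothetical helper: returns the contiguous 0-based index range
// [AFirst, ALast] handled by worker AIndex when ACount items are split among
// AParts workers. The first (ACount mod AParts) workers get one extra item,
// so chunk sizes differ by at most one.
procedure ChunkBounds(ACount, AParts, AIndex: Integer; out AFirst, ALast: Integer);
var
  Base, Extra: Integer;
begin
  Base := ACount div AParts;
  Extra := ACount mod AParts;
  if AIndex < Extra then
  begin
    AFirst := AIndex * (Base + 1);
    ALast := AFirst + Base;           // Base + 1 items
  end
  else
  begin
    AFirst := Extra * (Base + 1) + (AIndex - Extra) * Base;
    ALast := AFirst + Base - 1;       // Base items (empty if Base = 0)
  end;
end;

var
  I, First, Last: Integer;
begin
  // 6500 symbols across 4 worker threads
  for I := 0 to 3 do
  begin
    ChunkBounds(6500, 4, I, First, Last);
    Writeln(Format('thread %d: symbols %d..%d', [I, First, Last]));
  end;
end.
```

With 6,500 symbols and 4 threads, each worker gets 1,625 symbols: 0..1624, 1625..3249, 3250..4874 and 4875..6499. An even split by count assumes every file costs about the same to process. Because the CSV files differ in length, a shared counter from which each thread takes the next unprocessed symbol can balance the load better.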

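So far I have tested my code with several of the multithreaded memory managers mentioned at https://stackoverflow.com/questions/6072269/need-multi-threading-memory-manager/6076407#6076407. Of these, SapMM seems to be the most efficient one that does not cause access violations.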

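The problem is that adding more threads does not reduce the time needed to finish all the high calculations proportionally. Two threads, each on its own core, do not cut the running time in half. Three threads do not quite cut it to 1/3, and four threads come nowhere near 1/4.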

Threads:            1      2               3               4
Expected (mm:ss):   6:37   3:18.5 (1/2)    2:12.33 (1/3)   1:39.25 (1/4)
Actual (mm:ss):     6:37   4:07            3:05            2:51
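One way to read these timings is Amdahl's law. The model treats a run as a part that stays serial plus a fraction p that divides evenly across n threads, and p is then fitted to the measured times: 6:37 = 397 s, 4:07 = 247 s, 3:05 = 185 s and 2:51 = 171 s. The measured speedups are 1.61, 2.15 and 2.32. This is only a curve fit under that assumption, not a diagnosis.

```latex
\begin{aligned}
T_n &= T_1\left[(1-p) + \frac{p}{n}\right], \qquad T_1 = 397\,\text{s} \\
\frac{T_2}{T_1} &= \frac{247}{397} = 1 - \frac{p}{2}
  \;\Rightarrow\; p = \frac{300}{397} \approx 0.756 \\
T_n &\approx 97\,\text{s} + \frac{300\,\text{s}}{n}
  \;\Rightarrow\; T_3 \approx 197\,\text{s}\ (\text{measured } 185), \quad
  T_4 \approx 172\,\text{s}\ (\text{measured } 171) \\
S_\infty &= \frac{1}{1-p} = \frac{397}{97} \approx 4.1
\end{aligned}
```

Under this model, about 97 s of the single-thread run (roughly a quarter) behaves as if it were serialized. That caps the speedup near 4x no matter how many cores are added. Plausible shared resources include the memory manager, disk I/O and memory bandwidth, but the timings alone cannot distinguish between them.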

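I've reached the point where I need some additional insight to get the full speedup. I need to understand why the multicore speedup drops off, rather than just dabbling at the edges of the problem. What is causing this code to stop getting proportional gains, and what do I need to do to get them? More broadly, are there other ways to speed up the parsing I'm doing, for example something other than TMemoryStream?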

The code I'm using is shown below.

I'm using Delphi XE4 Enterprise.

In each thread, I loop through the symbols and, for each one:

  1. Load the symbol's historical data with TMemoryStream.LoadFromFile.
  2. Retrieve the symbol's high price directly from the TMemoryStream, using a function I wrote.

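Step (1) has been tested and takes essentially no time: loading all 6,500 files into memory at once takes less than 1 second in total. The routine I use in step (2) is where all the time goes. It is listed below: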

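unit uTest;

interface

implementation

uses
SysUtils, Math, Classes;
// ExStrToDate and MoveDateBack are helper routines defined elsewhere in the
// project (not shown here).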

type
TDayIndexData = record
Date: TDate;
Open, High, Low, Close, AdjClose,
Volume: extended;
end;

type
TTimeUnit = (tuDay, tuWeek, tuMonth, tuYear);

TTimePeriod = record
Length: integer;
TimeUnit: TTimeUnit;
end;

//#NO CHANGE
const
AllDataPeriodStr = 'All Data';

type
TRatePeriod = record
PeriodStr: string;
TimePeriod: TTimePeriod;
end;

type
TFieldType = (ftDate, ftOpen, ftHigh, ftLow, ftClose, ftVolume, ftAdjClose);

const CSV_DELIM_CHARSET = [#0..#31, ',',#127];

type
TShallowEquityNewHighInfoRetrievalResults = record
Success: boolean;
High: extended;
end;

function ShallowEquityNewHighInfoRetrieval(
AStream: TStream;
ARatePeriod: TRatePeriod;
AGetNormalData: boolean = False): TShallowEquityNewHighInfoRetrievalResults;

var
vStreamSize: int64;

function EOF: boolean;
begin
Result := AStream.Position >= vStreamSize;//AStream.Size;
end;

procedure GotoEOF;
begin
AStream.Seek(0, soFromEnd);
end;

//#OPTIMIZE
//var
//vBuffer: FileString;

type
FileChar = AnsiChar;
FileString = AnsiString;

const
ResultCharSize = SizeOf(FileChar);

var
MRReadChar: FileChar;

procedure ReadNextChar;
begin
if not EOF then
AStream.Read(MRReadChar, SizeOf(MRReadChar)) else
raise EInvalidOperation.Create('Unexpected end of file found');
end;

var
vPossDelimChars: boolean;

procedure SkipExistingDelimChars;
begin
//*INTENTION: prevents redundant SkipDelimChars calls, which is destructive
if not vPossDelimChars then Exit;

//not requiring DelimChars

if EOF then Exit;

repeat
ReadNextChar;
until EOF or not (MRReadChar in CSV_DELIM_CHARSET);

//#*NOTE: technically can be true if EOF,
//but if EOF then CurChar is never used 3/13/2014
vPossDelimChars := False;
end;

function SOF: boolean;
begin
Result := AStream.Position = 0;
end;

function NextChars(ACount: integer): FileString;
begin
//#OPTIMIZE: condition
if ResultCharSize = 1 then
begin
SetLength(Result, Min(ACount, vStreamSize{AStream.Size} - AStream.Position));
AStream.Read(Pointer(Result)^, Length(Result));
AStream.Seek(-Length(Result), soFromCurrent);
end else
begin
SetLength(Result, Min(ACount, (vStreamSize{AStream.Size} - AStream.Position) div ResultCharSize));
AStream.Read(Pointer(Result)^, Length(Result) * ResultCharSize);
AStream.Seek(-Length(Result) * ResultCharSize, soFromCurrent);
end;
end;

procedure GotoNextChars(ACount: integer);
begin
//#OPTIMIZE: condition
if ResultCharSize = 1 then
AStream.Seek(ACount, soFromCurrent) else
AStream.Seek(ACount*SizeOf(FileChar), soFromCurrent);
end;

procedure GotoPrevChars(ACount: integer);
begin
//#OPTIMIZE: condition
if ResultCharSize = 1 then
AStream.Seek(-ACount, soFromCurrent) else
AStream.Seek(-ACount*SizeOf(FileChar), soFromCurrent);
end;

procedure GotoPreceedingEOLN(ForItem: boolean = False);
var
vOrigPos: integer;

const
NMinRowChars = 17;//Length('3-13-13,1,1,1,1,1')

begin
//assumes will not hit SOF
//assumes ending CRLF taken care of by other places
vOrigPos := AStream.Position;

vPossDelimChars := True;

while (NextChars(2) <> #13#10) or (AStream.Position = vOrigPos) do
if (Length(NextChars(2)) = 2) and (NextChars(2)[2] = #10) and
(AStream.Position < vOrigPos - SizeOf(FileChar)) then
begin
GotoNextChars(1);

Exit;
end else
if (AStream.Position = vOrigPos) and ForItem then
GotoPrevChars(NMinRowChars) else
GotoPrevChars(1);
end;

var
CurField: string;
CurCol: integer;

procedure InitParsingState;
begin
//Initialize Parsing State
CurCol := -1;
vPossDelimChars := True;
SkipExistingDelimChars;
vStreamSize := AStream.Size;
end;

procedure BacktrackTo(APos: integer; ASafeMode: boolean = False);
begin
if ASafeMode then
AStream.Seek(Pred(APos), soFromBeginning) else
AStream.Seek(APos, soFromBeginning);

ReadNextChar;
vPossDelimChars := False;
CurCol := Ord(High(TFieldType));
end;

procedure ReadQuotedText;
var
vHadPrevQuoteChar: boolean;
begin
vHadPrevQuoteChar := False;
while MRReadChar = '"' do
begin
if vHadPrevQuoteChar then
CurField := CurField + MRReadChar;
ReadNextChar;

while MRReadChar <> '"' do
begin
CurField := CurField + MRReadChar;
ReadNextChar;
end;

if EOF then
break;

ReadNextChar;
vHadPrevQuoteChar := True;
end;
end;

procedure GetNextFieldValue;
begin
if EOF then Exit;

CurCol := (CurCol+1) mod Succ(Ord(High(TFieldType)));
CurField := '';
if MRReadChar = '"' then
ReadQuotedText else
begin
repeat
CurField := CurField + MRReadChar;
if not EOF then
ReadNextChar;
until EOF or (MRReadChar in CSV_DELIM_CHARSET);
if EOF then
if not (MRReadChar in CSV_DELIM_CHARSET) then
CurField := CurField + MRReadChar;
end;
vPossDelimChars := True;

SkipExistingDelimChars;
end;

var
ColFieldTypes: array [Ord(Low(TFieldType))..Ord(High(TFieldType))] of TFieldType;

procedure ResolveCurColFieldType;
var
vField: string;
begin
vField := LowerCase(CurField);
if vField = 'date' then
ColFieldTypes[CurCol] := ftDate else
if vField = 'open' then
ColFieldTypes[CurCol] := ftOpen else
if vField = 'high' then
ColFieldTypes[CurCol] := ftHigh else
if vField = 'low' then
ColFieldTypes[CurCol] := ftLow else
if vField = 'close' then
ColFieldTypes[CurCol] := ftClose else
if vField = 'volume' then
ColFieldTypes[CurCol] := ftVolume else
if Pos('close', vField) > 0 then
ColFieldTypes[CurCol] := ftAdjClose else
raise EInvalidOperation.Create('Unrecognized file format: unrecognized column name found.');
end;

procedure WriteItemAsFieldValue(var AData: TDayIndexData);
begin
case ColFieldTypes[CurCol] of
ftDate:AData.Date := ExStrToDate(CurField);
ftOpen:AData.Open := StrToFloat(CurField);
ftHigh:AData.High := StrToFloat(CurField);
ftLow:AData.Low := StrToFloat(CurField);
ftClose:AData.Close := StrToFloat(CurField);
ftVolume:AData.Volume := StrToFloat(CurField);
ftAdjClose:AData.AdjClose := StrToFloat(CurField);
end;
end;

procedure VerifyFields;
var
iField: TFieldType;
iColumn: integer;

IsUsedFlags: array [Low(TFieldType)..High(TFieldType)] of boolean;

begin
//* Set all to false
for iField := Low(TFieldType) to High(TFieldType) do
IsUsedFlags[iField] := False;

//* set found to true
for iColumn := Low(ColFieldTypes) to High(ColFieldTypes) do
IsUsedFlags[ColFieldTypes[iColumn]] := True;

//* throw error on first one not found
for iField := Low(TFieldType) to High(TFieldType) do
if not IsUsedFlags[iField] then
begin
raise EInvalidOperation.Create('Bad file format: one or more column names are missing!');
break;
end;
end;

procedure LoadHeader;
var
iField: TFieldType;

begin
for iField := Low(TFieldType) to High(TFieldType) do
begin
GetNextFieldValue;
ResolveCurColFieldType;
end;

VerifyFields;

if EOF then
raise EInvalidOperation.Create('Cannot complete shallow Equity New High Info Retrieval: Not enough Data')
end;

procedure LoadRowInto(var ADayData: TDayIndexData);
var
iField: TFieldType;
begin
for iField := Low(TFieldType) to High(TFieldType) do
begin
GetNextFieldValue;
WriteItemAsFieldValue(ADayData);
end;
end;

var
OrderReversed: boolean;

vTopDay,
vBottomDay,

vFirstDay,
vEarlierDay,
vLastDay: TDayIndexData;

vBeginDate: TDate;

vBeforeLastDayPos,
vFirstDayPos,
vAfterFirstDayPos: integer;

function HasUnprocessedDays: boolean;
begin
//** use Position of stream because we don't always have the first day in the
// file, due to optimization
Result := (
((AStream.Position > vFirstDayPos) and not OrderReversed) or

(((AStream.Position < AStream.Size - SizeOf(FileChar)*Length(#13#10)) or
(AStream.Position < AStream.Size - SizeOf(FileChar)*Length(#10)))
and OrderReversed));
end;

function NotYetCoveredTimePeriod: boolean;
begin
Result :=
(ARatePeriod.PeriodStr = AllDataPeriodStr)
or
(
(ARatePeriod.PeriodStr <> AllDataPeriodStr) and
(vEarlierDay.Date >= vBeginDate)
);
end;

function FoundAllNeededData: boolean;
begin
Result := (
(ARatePeriod.PeriodStr <> AllDataPeriodStr) and
(vEarlierDay.Date <= vBeginDate)
) or
(ARatePeriod.PeriodStr = AllDataPeriodStr);
end;

procedure GotoLastDay;
begin
//** Goto End of File
GotoEOF;

//** Goto Just before Last Day
GotoPreceedingEOLN;
if (AStream.Position = AStream.Size - SizeOf(FileChar)*Length(#13#10)) or
(AStream.Position = AStream.Size - SizeOf(FileChar)*Length(#10)) then
GotoPreceedingEOLN;

SkipExistingDelimChars;
end;

procedure DetermineDataOrder;
begin
//#ASSUMPTION: assume end day at BOTTOM of file if latest data less than 2 days ago
//Problem when NDays = 2 ?

if Trunc(Now) - Trunc(vBottomDay.Date) >= 2 then
begin
//** Get Top Day
BacktrackTo(vFirstDayPos, True);
LoadRowInto(vTopDay);

//** Determine what order the data is in
OrderReversed := vBottomDay.Date < vTopDay.Date;

if not OrderReversed then
BacktrackTo(vBeforeLastDayPos, True);

if OrderReversed then
vFirstDay := vBottomDay else
vFirstDay := vTopDay;

if OrderReversed then
vLastDay := vTopDay else
vLastDay := vBottomDay;
end else
begin
OrderReversed := False;

//vLastDay := vTopDay;
vLastDay := vBottomDay;
end;
end;

procedure LoadPrevRow;
var
vBeforeDayPos: integer;

begin
GotoPreceedingEOLN(True);

vBeforeDayPos := AStream.Position;

SkipExistingDelimChars;
LoadRowInto(vEarlierDay);

AStream.Seek(vBeforeDayPos, soFromBeginning);
end;

begin
//* Initialize
Result.Success := False;
AStream.Seek(0, soFromBeginning);
InitParsingState;

//** Load CSV Header
LoadHeader;
vFirstDayPos := AStream.Position;

//** Get Last Day
GotoLastDay;
vBeforeLastDayPos := AStream.Position;
LoadRowInto(vBottomDay);

//** IF Only 1 Data Day:
if vFirstDayPos = vBeforeLastDayPos then
begin
//return results
Result.Success := True;
Result.High := vBottomDay.High;
Exit;
end;

//** Go back to Last Day in File
BacktrackTo(vBeforeLastDayPos);

//** Determine what order the data is in
DetermineDataOrder;

//** Determine Date to scan back to if opted for
if ARatePeriod.PeriodStr <> AllDataPeriodStr then
vBeginDate := MoveDateBack(vLastDay.Date, ARatePeriod.TimePeriod);

//* Initialize Loop Variables
Result.High := vLastDay.High;
vEarlierDay := vLastDay;

while HasUnProcessedDays and NotYetCoveredTimePeriod do
begin
//** Goto Previous Day's Row
if OrderReversed then
LoadRowInto(vEarlierDay) else
LoadPrevRow;

//** Update High
if NotYetCoveredTimePeriod then
Result.High := Max(Result.High, vEarlierDay.High);
end;

Result.Success := FoundAllNeededData;
end;

end.
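In the routine above, every character goes through a virtual `TStream.Read` call in `ReadNextChar`. `CurField := CurField + MRReadChar` can reallocate the field string for each character appended. `NextChars` allocates a new string on every call, and `GotoPreceedingEOLN` calls it up to three times per backward step. In every thread, all of those heap operations pass through the shared memory manager.

The sketch below is one allocation-free alternative, built on several assumptions. It scans the buffer behind `TMemoryStream.Memory` with a `PAnsiChar` and parses numbers without creating strings. It takes the column index as a parameter instead of resolving it from the header. `ParseDecimal` and `MaxOfColumn` are hypothetical names. The parser only handles plain decimals such as `12.06`: no exponent, no thousands separator, `.` as the decimal point, and at most 18 digits. That matches the sample data. The sketch also computes only the all-data maximum and does not reproduce the original's time-period logic.

```pascal
program ScanHighs;

{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils, Classes;

// Parses a plain decimal such as '12.06' or '-3' in [P, PEnd) without
// creating a string. Digits are accumulated in an Int64 and divided once.
function ParseDecimal(P, PEnd: PAnsiChar): Double;
var
  Neg: Boolean;
  Mantissa, Divisor: Int64;
begin
  Mantissa := 0;
  Divisor := 1;
  Neg := (P < PEnd) and (P^ = '-');
  if Neg then
    Inc(P);
  while (P < PEnd) and (P^ in ['0'..'9']) do
  begin
    Mantissa := Mantissa * 10 + (Ord(P^) - Ord('0'));
    Inc(P);
  end;
  if (P < PEnd) and (P^ = '.') then
  begin
    Inc(P);
    while (P < PEnd) and (P^ in ['0'..'9']) do
    begin
      Mantissa := Mantissa * 10 + (Ord(P^) - Ord('0'));
      Divisor := Divisor * 10;
      Inc(P);
    end;
  end;
  Result := Mantissa / Divisor;
  if Neg then
    Result := -Result;
end;

// Returns the maximum of column ACol (0-based) over all data rows of a CSV
// held in memory, skipping the header line. Row order is irrelevant for a
// maximum, so newest-first files need no special handling.
function MaxOfColumn(ABuf: PAnsiChar; ALen, ACol: Integer): Double;
var
  P, PEnd, FieldStart: PAnsiChar;
  Col: Integer;
  HaveValue: Boolean;
  V: Double;
begin
  Result := 0;
  HaveValue := False;
  P := ABuf;
  PEnd := ABuf + ALen;
  while (P < PEnd) and (P^ <> #10) do   // skip the header line
    Inc(P);
  if P < PEnd then
    Inc(P);
  while P < PEnd do
  begin
    Col := 0;                           // move to column ACol on this line
    while (P < PEnd) and (Col < ACol) and (P^ <> #10) do
    begin
      if P^ = ',' then
        Inc(Col);
      Inc(P);
    end;
    if Col = ACol then
    begin
      FieldStart := P;
      while (P < PEnd) and not (P^ in [',', #13, #10]) do
        Inc(P);
      if P > FieldStart then
      begin
        V := ParseDecimal(FieldStart, P);
        if not HaveValue or (V > Result) then
          Result := V;
        HaveValue := True;
      end;
    end;
    while (P < PEnd) and (P^ <> #10) do // skip the rest of the line
      Inc(P);
    if P < PEnd then
      Inc(P);
  end;
end;

const
  Sample: AnsiString =
    'Date,Open,High,Low,Close,Volume,Adj Close'#13#10 +
    '11/3/2014,12,12.06,11.75,11.98,19700,11.98'#13#10 +
    '11/4/2014,12,12,10.62,11.55,39200,11.55'#13#10 +
    '12/8/2014,11.8,11.85,11.68,11.8,168700,11.8'#13#10;

var
  Stream: TMemoryStream;
begin
  Stream := TMemoryStream.Create;
  try
    // The real program would call Stream.LoadFromFile(FileName) here
    Stream.WriteBuffer(PAnsiChar(Sample)^, Length(Sample));
    // Column 2 is High in the sample header (the original resolves it by name)
    Writeln(FormatFloat('0.00',
      MaxOfColumn(PAnsiChar(Stream.Memory), Integer(Stream.Size), 2)));
  finally
    Stream.Free;
  end;
end.
```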

Below is a sample CSV. Note that in some files the rows appear in reverse order, with the most recent date first.

Date,Open,High,Low,Close,Volume,Adj Close
11/3/2014,12,12.06,11.75,11.98,19700,11.98
11/4/2014,12,12,10.62,11.55,39200,11.55
11/5/2014,11.6,11.85,11.6,11.85,3100,11.85
11/6/2014,11.85,11.85,11.85,11.85,0,11.85
11/7/2014,11.5,11.5,10.35,11,35900,11
11/10/2014,11.12,11.12,11.12,11.12,200,11.12
11/11/2014,11.5,11.5,11.5,11.5,200,11.5
11/12/2014,11.75,11.85,11.15,11.45,3500,11.45
11/13/2014,11.45,11.45,11.45,11.45,0,11.45
11/14/2014,11.45,11.45,11.45,11.45,0,11.45
11/17/2014,11.07,11.28,11.07,11.28,1600,11.28
11/18/2014,11.07,11.74,11.06,11.74,8100,11.74
11/19/2014,11.1,11.5,11,11.5,11600,11.5
11/20/2014,11.1,11.5,11.1,11.5,3100,11.5
11/21/2014,11.49,11.5,11.23,11.25,15100,11.25
11/24/2014,11.25,11.35,11.25,11.25,900,11.25
11/25/2014,11.48,11.5,11.25,11.5,355300,11.5
11/26/2014,11.75,11.75,11.5,11.5,261300,11.5
11/28/2014,11.75,11.8,11.75,11.8,16300,11.8
12/1/2014,11.25,11.8,11.02,11.5,23800,11.5
12/2/2014,11.6,11.6,11.47,11.5,57600,11.5
12/3/2014,11.57,11.75,11.41,11.69,240700,11.69
12/4/2014,11.74,11.75,11.49,11.65,41100,11.65
12/5/2014,11.65,11.85,11.56,11.8,267200,11.8
12/8/2014,11.8,11.85,11.68,11.8,168700,11.8
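Because row order varies between files, one option is to detect it by comparing the dates of the first and last data rows directly. This is a sketch, not the asker's method: `DetermineDataOrder` in the listing uses a heuristic based on `Now`. `DateKey` and `IsNewestFirst` are hypothetical helpers. They assume US `M/D/YYYY` dates in the first column, a header line, and at least one data row with a comma after the date. For an all-data maximum the order does not matter. It only matters when scanning back over a limited period.

```pascal
program DetectOrder;

{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils;

// Hypothetical helper: turns an 'M/D/YYYY' date starting at P into the
// sortable integer YYYYMMDD. No validation; US month/day order assumed.
function DateKey(P: PAnsiChar): Integer;
var
  Parts: array[0..2] of Integer;
  I: Integer;
begin
  for I := 0 to 2 do
  begin
    Parts[I] := 0;
    while P^ in ['0'..'9'] do
    begin
      Parts[I] := Parts[I] * 10 + (Ord(P^) - Ord('0'));
      Inc(P);
    end;
    if P^ = '/' then
      Inc(P);
  end;
  Result := Parts[2] * 10000 + Parts[0] * 100 + Parts[1];
end;

// True when the first data row is newer than the last one (newest first).
function IsNewestFirst(const S: AnsiString): Boolean;
var
  First, Last: Integer;
begin
  // First data row begins just after the header's LF (S is 1-based)
  First := 1;
  while (First <= Length(S)) and (S[First] <> #10) do
    Inc(First);
  Inc(First);
  // Last data row: skip trailing line breaks, then back up to the previous LF
  Last := Length(S);
  while (Last > 0) and (S[Last] in [#13, #10]) do
    Dec(Last);
  while (Last > 0) and (S[Last] <> #10) do
    Dec(Last);
  Inc(Last);
  Result := DateKey(PAnsiChar(S) + First - 1) > DateKey(PAnsiChar(S) + Last - 1);
end;

begin
  // Rows taken from the sample above, oldest first
  Writeln(IsNewestFirst(
    'Date,Open,High,Low,Close,Volume,Adj Close'#13#10 +
    '11/3/2014,12,12.06,11.75,11.98,19700,11.98'#13#10 +
    '12/8/2014,11.8,11.85,11.68,11.8,168700,11.8'#13#10));
end.
```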

Best answer

First, try Intel Threading Building Blocks as the memory manager. It scales well to at least 16 cores. (I ran into a similar problem in "Why multithreaded memory allocate/deallocate intensive application does not scale with number of threads?")

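In general, avoid dynamic memory allocation and deallocation inside the threads' main processing loops, even when using Intel TBB. These operations always scale badly.

  • Allocate enough (fixed) memory for processing before the loop starts, and free it after the loop ends.
  • Inside the read/processing loop, restrict yourself to pointer operations. Copy memory only when necessary.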
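Applied to the per-thread file loading in this question, the first bullet could look like the following minimal sketch. Each worker owns one buffer, allocated before its symbol loop and grown only when a file is larger than any seen so far. `TReusableBuffer` and `WriteText` are hypothetical names, not from the answer. `TFileStream` still creates one small object per file, but the file contents land in the reused buffer.

```pascal
program ReuseBuffer;

{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils, Classes;

type
  // Hypothetical per-thread buffer: allocated once before the symbol loop and
  // grown (never shrunk) only when a file is larger than any seen so far, so
  // the steady-state loop does no heap allocation for file contents.
  TReusableBuffer = class
  private
    FData: TBytes;
    FCount: Integer;
  public
    constructor Create(AInitialCapacity: Integer);
    procedure LoadFile(const AFileName: string);
    property Data: TBytes read FData;    // valid bytes: Data[0..Count-1]
    property Count: Integer read FCount;
  end;

constructor TReusableBuffer.Create(AInitialCapacity: Integer);
begin
  inherited Create;
  SetLength(FData, AInitialCapacity);
end;

procedure TReusableBuffer.LoadFile(const AFileName: string);
var
  Stream: TFileStream;
begin
  Stream := TFileStream.Create(AFileName, fmOpenRead or fmShareDenyWrite);
  try
    FCount := Integer(Stream.Size);      // files under 2 GB assumed
    if FCount > Length(FData) then
      SetLength(FData, FCount);          // grow only when needed
    if FCount > 0 then
      Stream.ReadBuffer(FData[0], FCount);
  finally
    Stream.Free;
  end;
end;

// Hypothetical demo helper: writes AText to a new file
procedure WriteText(const AFileName: string; const AText: AnsiString);
var
  Stream: TFileStream;
begin
  Stream := TFileStream.Create(AFileName, fmCreate);
  try
    if AText <> '' then
      Stream.WriteBuffer(AText[1], Length(AText));
  finally
    Stream.Free;
  end;
end;

var
  Buf: TReusableBuffer;
begin
  WriteText('reuse_demo.csv', 'Date,High'#13#10'11/3/2014,12.06'#13#10);
  Buf := TReusableBuffer.Create(64 * 1024);  // once, before the loop
  try
    // In a worker thread this call would sit inside the per-symbol loop
    Buf.LoadFile('reuse_demo.csv');
    Writeln('loaded ', Buf.Count, ' bytes into a ', Length(Buf.Data), '-byte buffer');
  finally
    Buf.Free;
  end;
  DeleteFile('reuse_demo.csv');
end.
```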

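The input data can be spread across different hard disks, ideally attached to different controllers. If you know the input sizes in advance, that is, before processing starts in the thread loop, the input data can be memory-mapped.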
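Memory mapping can be sketched with the Win32 API (`CreateFile`, `CreateFileMapping`, `MapViewOfFile`), which Delphi exposes through the `Windows` unit. This sketch is Windows-only. `CountLinesMapped` is a hypothetical helper that just counts LF bytes to show pointer access to the mapped view. It assumes files under 4 GB because it ignores the high size word. It also skips empty files, since `CreateFileMapping` cannot map a zero-length file.

```pascal
program MapDemo;

{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  Windows, SysUtils;

// Hypothetical helper: maps AFileName read-only and counts LF bytes through
// a PAnsiChar. The OS pages the file in on demand; no heap buffer is used.
function CountLinesMapped(const AFileName: string): Integer;
var
  HFile, HMap: THandle;
  View: Pointer;
  P, PEnd: PAnsiChar;
  Size: DWORD;
begin
  Result := 0;
  HFile := CreateFile(PChar(AFileName), GENERIC_READ, FILE_SHARE_READ, nil,
    OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, 0);
  if HFile = INVALID_HANDLE_VALUE then
    RaiseLastOSError;
  try
    Size := GetFileSize(HFile, nil);    // files under 4 GB assumed
    if Size = 0 then
      Exit;                             // zero-length files cannot be mapped
    HMap := CreateFileMapping(HFile, nil, PAGE_READONLY, 0, 0, nil);
    if HMap = 0 then
      RaiseLastOSError;
    try
      View := MapViewOfFile(HMap, FILE_MAP_READ, 0, 0, 0);
      if View = nil then
        RaiseLastOSError;
      try
        P := PAnsiChar(View);
        PEnd := P + Size;
        while P < PEnd do
        begin
          if P^ = #10 then
            Inc(Result);
          Inc(P);
        end;
      finally
        UnmapViewOfFile(View);
      end;
    finally
      CloseHandle(HMap);
    end;
  finally
    CloseHandle(HFile);                 // also runs after the early Exit
  end;
end;

var
  F: TextFile;
begin
  AssignFile(F, 'map_demo.csv');
  Rewrite(F);
  Writeln(F, 'Date,Open,High,Low,Close,Volume,Adj Close');
  Writeln(F, '11/3/2014,12,12.06,11.75,11.98,19700,11.98');
  CloseFile(F);
  Writeln('lines: ', CountLinesMapped('map_demo.csv'));
  DeleteFile('map_demo.csv');
end.
```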

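Optimize the single-threaded processing as much as possible (profile it). Then, with the thread count as a parameter, identify the parts that do not scale with the number of threads; those parts must be rewritten. I/O reads can be cached, and/or data can be read in blocks of n * cluster size.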
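Reading in blocks of n * cluster size can be sketched with `TFileStream.Read` into a buffer that the caller keeps between files. The 4 KB cluster size is an assumption: it is a common NTFS default, but the real value depends on the volume. The factor n = 16 is also an assumption. `CountLinesBlocked` is a hypothetical helper. A real CSV parser would also have to handle a row split across two blocks, which simple LF counting does not need.

```pascal
program BlockRead;

{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$APPTYPE CONSOLE}

uses
  SysUtils, Classes;

const
  ClusterSize = 4096;               // assumption: typical NTFS cluster size
  BlockSize = 16 * ClusterSize;     // n = 16 clusters per read (64 KB)

// Hypothetical helper: counts LF bytes in a file, reading it in BlockSize
// chunks into ABuffer. The caller keeps ABuffer between files, so it is
// allocated once and then reused.
function CountLinesBlocked(const AFileName: string; var ABuffer: TBytes): Integer;
var
  Stream: TFileStream;
  BytesRead, I: Integer;
begin
  Result := 0;
  if Length(ABuffer) < BlockSize then
    SetLength(ABuffer, BlockSize);
  Stream := TFileStream.Create(AFileName, fmOpenRead or fmShareDenyWrite);
  try
    repeat
      BytesRead := Stream.Read(ABuffer[0], BlockSize);
      for I := 0 to BytesRead - 1 do
        if ABuffer[I] = 10 then
          Inc(Result);
    until BytesRead < BlockSize;    // a short (or zero) read means end of file
  finally
    Stream.Free;
  end;
end;

var
  Buffer: TBytes;
  F: TextFile;
  I: Integer;
begin
  // Build a demo file of 10000 rows, several blocks long
  AssignFile(F, 'block_demo.csv');
  Rewrite(F);
  for I := 1 to 10000 do
    Writeln(F, '11/3/2014,12,12.06,11.75,11.98,19700,11.98');
  CloseFile(F);
  Writeln('lines: ', CountLinesBlocked('block_demo.csv', Buffer));
  DeleteFile('block_demo.csv');
end.
```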

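These rather general suggestions are based on experience I gained processing terabyte-sized input data in parallel threads on Windows 7, on machines with up to 12 HT cores and up to 128 GB of RAM.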

A matching question, "multithreading - Multicore, multithreaded speedup: reading through CSVs using TMemoryStream", is on Stack Overflow: https://stackoverflow.com/questions/27431110/
