- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在编写并行代码来枚举大量 CSV 文件,每个文件都包含历史股票数据(超过 6500 个代码),并计算每只股票是否已达到历史最高点。
我已经实现了一个线程池和 TThread 后代类,以在线程之间平均拆分符号列表,然后将这些线程分配到我的 i7 机器的单独核心。我将线程设置为每个线程在取消挂起之前都拥有它们在创建时所需的所有数据的副本,因此在线程处理时不需要锁定或继续。所有线程完成后,我将每个线程的结果数据汇总到主程序中。
我目前已经使用提到的几个多线程内存管理器测试了我的代码 https://stackoverflow.com/questions/6072269/need-multi-threading-memory-manager/6076407#6076407 .到目前为止,SapMM 似乎是最有效的一种,不会导致访问冲突。
问题在于,添加更多线程并没有相应地加快完成所有高点计算所需的时间。使用 2 个“core-d”线程不会将运行时间减少 1/2,但 3 个不会完全减少到 1/3,而 4 个不会减少接近 1/4。
线程数1、2、3、4
预计加速时间(毫米:秒)6:37,3:17 1/2,2:12 1/3,1:39 1/4
实际时间 (mm:ss)6:37,4:07,3:05,2:51
我已经到了需要一些额外的洞察力才能完全加速此操作的地步。我需要弄清楚为什么多核加速会下降,而不仅仅是“在问题的边缘涉猎”。那么,是什么导致此代码停止获得相应的 yield ,我需要做什么才能获得这些 yield ?简而言之,是否有其他方法可以加快我正在做的解析,例如而不是使用 TMemoryStream?
我正在使用的代码如下。
我正在使用 Delphi XE4 Enterprise。
在每个线程中,我循环遍历每个符号并:
(1) 已经过测试并且不需要任何时间(将所有 6500 一次加载到内存中所用的总时间不到 1 秒)。我在 (2) 中使用的过程是一直花费的时间,如下所示:
unit uTest;
implementation
uses
SysUtils, Math, Classes;
type
TDayIndexData = record
Date: TDate;
Open, High, Low, Close, AdjClose,
Volume: extended;
end;
type
TTimeUnit = (tuDay, tuWeek, tuMonth, tuYear);
TTimePeriod = record
Length: integer;
TimeUnit: TTimeUnit;
end;
//#NO CHANGE
const
AllDataPeriodStr = 'All Data';
type
TRatePeriod = record
PeriodStr: string;
TimePeriod: TTimePeriod;
end;
type
TFieldType = (ftDate, ftOpen, ftHigh, ftLow, ftClose, ftVolume, ftAdjClose);
const CSV_DELIM_CHARSET = [#0..#31, ',',#127];
type
TShallowEquityNewHighInfoRetrievalResults = record
Success: boolean;
High: extended;
end;
function ShallowEquityNewHighInfoRetrieval(
AStream: TStream;
ARatePeriod: TRatePeriod;
AGetNormalData: boolean = False): TShallowEquityNewHighInfoRetrievalResults;
var
vStreamSize: int64;
function EOF: boolean;
begin
Result := AStream.Position >= vStreamSize;//AStream.Size;
end;
procedure GotoEOF;
begin
AStream.Seek(0, soFromEnd);
end;
//#OPTIMIZE
//var
//vBuffer: FileString;
type
FileChar = AnsiChar;
FileString = AnsiString;
const
ResultCharSize = SizeOf(FileChar);
var
MRReadChar: FileChar;
procedure ReadNextChar;
begin
if not EOF then
AStream.Read(MRReadChar, SizeOf(MRReadChar)) else
raise EInvalidOperation.Create('Unexpected end of file found');
end;
var
vPossDelimChars: boolean;
procedure SkipExistingDelimChars;
begin
//*INTENTION: prevents redundant SkipDelimChars calls, which is destructive
if not vPossDelimChars then Exit;
//not requiring DelimChars
if EOF then Exit;
repeat
ReadNextChar;
until EOF or not (MRReadChar in CSV_DELIM_CHARSET);
//#*NOTE: technically can be true if EOF,
//but if EOF then CurChar is never used 3/13/2014
vPossDelimChars := False;
end;
function SOF: boolean;
begin
Result := AStream.Position = 0;
end;
function NextChars(ACount: integer): FileString;
begin
//#OPTIMIZE: condition
if ResultCharSize = 1 then
begin
SetLength(Result, Min(ACount, vStreamSize{AStream.Size} - AStream.Position));
AStream.Read(Pointer(Result)^, Length(Result));
AStream.Seek(-Length(Result), soFromCurrent);
end else
begin
SetLength(Result, Min(ACount, (vStreamSize{AStream.Size} - AStream.Position) div ResultCharSize));
AStream.Read(Pointer(Result)^, Length(Result) * ResultCharSize);
AStream.Seek(-Length(Result) * ResultCharSize, soFromCurrent);
end;
end;
procedure GotoNextChars(ACount: integer);
begin
//#OPTIMIZE: condition
if ResultCharSize = 1 then
AStream.Seek(ACount, soFromCurrent) else
AStream.Seek(ACount*SizeOf(FileChar), soFromCurrent);
end;
procedure GotoPrevChars(ACount: integer);
begin
//#OPTIMIZE: condition
if ResultCharSize = 1 then
AStream.Seek(-ACount, soFromCurrent) else
AStream.Seek(-ACount*SizeOf(FileChar), soFromCurrent);
end;
procedure GotoPreceedingEOLN(ForItem: boolean = False);
var
vOrigPos: integer;
const
NMinRowChars = 17;//Length('3-13-13,1,1,1,1,1')
begin
//assumes will not hit SOF
//assumes ending CRLF taken care of by other places
vOrigPos := AStream.Position;
vPossDelimChars := True;
while (NextChars(2) <> #13#10) or (AStream.Position = vOrigPos) do
if (Length(NextChars(2)) = 2) and (NextChars(2)[2] = #10) and
(AStream.Position < vOrigPos - SizeOf(FileChar)) then
begin
GotoNextChars(1);
Exit;
end else
if (AStream.Position = vOrigPos) and ForItem then
GotoPrevChars(NMinRowChars) else
GotoPrevChars(1);
end;
var
CurField: string;
CurCol: integer;
procedure InitParsingState;
begin
//Initialize Parsing State
CurCol := -1;
vPossDelimChars := True;
SkipExistingDelimChars;
vStreamSize := AStream.Size;
end;
procedure BacktrackTo(APos: integer; ASafeMode: boolean = False);
begin
if ASafeMode then
AStream.Seek(Pred(APos), soFromBeginning) else
AStream.Seek(APos, soFromBeginning);
ReadNextChar;
vPossDelimChars := False;
CurCol := Ord(High(TFieldType));
end;
procedure ReadQuotedText;
var
vHadPrevQuoteChar: boolean;
begin
vHadPrevQuoteChar := False;
while MRReadChar = '"' do
begin
if vHadPrevQuoteChar then
CurField := CurField + MRReadChar;
ReadNextChar;
while MRReadChar <> '"' do
begin
CurField := CurField + MRReadChar;
ReadNextChar;
end;
if EOF then
break;
ReadNextChar;
vHadPrevQuoteChar := True;
end;
end;
procedure GetNextFieldValue;
begin
if EOF then Exit;
CurCol := (CurCol+1) mod Succ(Ord(High(TFieldType)));
CurField := '';
if MRReadChar = '"' then
ReadQuotedText else
begin
repeat
CurField := CurField + MRReadChar;
if not EOF then
ReadNextChar;
until EOF or (MRReadChar in CSV_DELIM_CHARSET);
if EOF then
if not (MRReadChar in CSV_DELIM_CHARSET) then
CurField := CurField + MRReadChar;
end;
vPossDelimChars := True;
SkipExistingDelimChars;
end;
var
ColFieldTypes: array [Ord(Low(TFieldType))..Ord(High(TFieldType))] of TFieldType;
procedure ResolveCurColFieldType;
var
vField: string;
begin
vField := LowerCase(CurField);
if vField = 'date' then
ColFieldTypes[CurCol] := ftDate else
if vField = 'open' then
ColFieldTypes[CurCol] := ftOpen else
if vField = 'high' then
ColFieldTypes[CurCol] := ftHigh else
if vField = 'low' then
ColFieldTypes[CurCol] := ftLow else
if vField = 'close' then
ColFieldTypes[CurCol] := ftClose else
if vField = 'volume' then
ColFieldTypes[CurCol] := ftVolume else
if Pos('close', vField) > 0 then
ColFieldTypes[CurCol] := ftAdjClose else
raise EInvalidOperation.Create('Unrecognized file format: unrecognized column name found.');
end;
procedure WriteItemAsFieldValue(var AData: TDayIndexData);
begin
case ColFieldTypes[CurCol] of
ftDate:AData.Date := ExStrToDate(CurField);
ftOpen:AData.Open := StrToFloat(CurField);
ftHigh:AData.High := StrToFloat(CurField);
ftLow:AData.Low := StrToFloat(CurField);
ftClose:AData.Close := StrToFloat(CurField);
ftVolume:AData.Volume := StrToFloat(CurField);
ftAdjClose:AData.AdjClose := StrToFloat(CurField);
end;
end;
procedure VerifyFields;
var
iField: TFieldType;
iColumn: integer;
IsUsedFlags: array [Low(TFieldType)..High(TFieldType)] of boolean;
begin
//* Set all to false
for iField := Low(TFieldType) to High(TFieldType) do
IsUsedFlags[iField] := False;
//* set found to true
for iColumn := Low(ColFieldTypes) to High(ColFieldTypes) do
IsUsedFlags[ColFieldTypes[iColumn]] := True;
//* throw error on first one not found
for iField := Low(TFieldType) to High(TFieldType) do
if not IsUsedFlags[iField] then
begin
raise EInvalidOperation.Create('Bad file format: one or more column names are missing!');
break;
end;
end;
procedure LoadHeader;
var
iField: TFieldType;
begin
for iField := Low(TFieldType) to High(TFieldType) do
begin
GetNextFieldValue;
ResolveCurColFieldType;
end;
VerifyFields;
if EOF then
raise EInvalidOperation.Create('Cannot complete shallow Equity New High Info Retrieval: Not enough Data')
end;
procedure LoadRowInto(var ADayData: TDayIndexData);
var
iField: TFieldType;
begin
for iField := Low(TFieldType) to High(TFieldType) do
begin
GetNextFieldValue;
WriteItemAsFieldValue(ADayData);
end;
end;
var
OrderReversed: boolean;
vTopDay,
vBottomDay,
vFirstDay,
vEarlierDay,
vLastDay: TDayIndexData;
vBeginDate: TDate;
vBeforeLastDayPos,
vFirstDayPos,
vAfterFirstDayPos: integer;
function HasUnprocessedDays: boolean;
begin
//** use Position of stream because we don't always have the first day in the
// file, due to optimization
Result := (
((AStream.Position > vFirstDayPos) and not OrderReversed) or
(((AStream.Position < AStream.Size - SizeOf(FileChar)*Length(#13#10)) or
(AStream.Position < AStream.Size - SizeOf(FileChar)*Length(#10)))
and OrderReversed));
end;
function NotYetCoveredTimePeriod: boolean;
begin
Result :=
(ARatePeriod.PeriodStr = AllDataPeriodStr)
or
(
(ARatePeriod.PeriodStr <> AllDataPeriodStr) and
(vEarlierDay.Date >= vBeginDate)
);
end;
function FoundAllNeededData: boolean;
begin
Result := (
(ARatePeriod.PeriodStr <> AllDataPeriodStr) and
(vEarlierDay.Date <= vBeginDate)
) or
(ARatePeriod.PeriodStr = AllDataPeriodStr);
end;
procedure GotoLastDay;
begin
//** Goto End of File
GotoEOF;
//** Goto Just before Last Day
GotoPreceedingEOLN;
if (AStream.Position = AStream.Size - SizeOf(FileChar)*Length(#13#10)) or
(AStream.Position = AStream.Size - SizeOf(FileChar)*Length(#10)) then
GotoPreceedingEOLN;
SkipExistingDelimChars;
end;
procedure DetermineDataOrder;
begin
//#ASSUMPTION: assume end day at BOTTOM of file if latest data less than 2 days ago
//Problem when NDays = 2 ?
if Trunc(Now) - Trunc(vBottomDay.Date) >= 2 then
begin
//** Get Top Day
BacktrackTo(vFirstDayPos, True);
LoadRowInto(vTopDay);
//** Determine what order the data is in
OrderReversed := vBottomDay.Date < vTopDay.Date;
if not OrderReversed then
BacktrackTo(vBeforeLastDayPos, True);
if OrderReversed then
vFirstDay := vBottomDay else
vFirstDay := vTopDay;
if OrderReversed then
vLastDay := vTopDay else
vLastDay := vBottomDay;
end else
begin
OrderReversed := False;
//vLastDay := vTopDay;
vLastDay := vBottomDay;
end;
end;
procedure LoadPrevRow;
var
vBeforeDayPos: integer;
begin
GotoPreceedingEOLN(True);
vBeforeDayPos := AStream.Position;
SkipExistingDelimChars;
LoadRowInto(vEarlierDay);
AStream.Seek(vBeforeDayPos, soFromBeginning);
end;
begin
//* Initialize
Result.Success := False;
AStream.Seek(0, soFromBeginning);
InitParsingState;
//** Load CSV Header
LoadHeader;
vFirstDayPos := AStream.Position;
//** Get Last Day
GotoLastDay;
vBeforeLastDayPos := AStream.Position;
LoadRowInto(vBottomDay);
//** IF Only 1 Data Day:
if vFirstDayPos = vBeforeLastDayPos then
begin
//return results
Result.Success := True;
Result.High := vBottomDay.High;
Exit;
end;
//** Go back to Last Day in File
BacktrackTo(vBeforeLastDayPos);
//** Determine what order the data is in
DetermineDataOrder;
//** Determine Date to scan back to if opted for
if ARatePeriod.PeriodStr <> AllDataPeriodStr then
vBeginDate := MoveDateBack(vLastDay.Date, ARatePeriod.TimePeriod);
//* Initialize Loop Variables
Result.High := vLastDay.High;
vEarlierDay := vLastDay;
while HasUnProcessedDays and NotYetCoveredTimePeriod do
begin
//** Goto Previous Day's Row
if OrderReversed then
LoadRowInto(vEarlierDay) else
LoadPrevRow;
//** Update High
if NotYetCoveredTimePeriod then
Result.High := Max(Result.High, vEarlierDay.High);
end;
Result.Success := FoundAllNeededData;
end;
end.
下面是一个 CSV 示例。请注意,有时 CSV 行项目在文件中的顺序相反(最新日期在前)。
Date,Open,High,Low,Close,Volume,Adj Close
11/3/2014,12,12.06,11.75,11.98,19700,11.98
11/4/2014,12,12,10.62,11.55,39200,11.55
11/5/2014,11.6,11.85,11.6,11.85,3100,11.85
11/6/2014,11.85,11.85,11.85,11.85,0,11.85
11/7/2014,11.5,11.5,10.35,11,35900,11
11/10/2014,11.12,11.12,11.12,11.12,200,11.12
11/11/2014,11.5,11.5,11.5,11.5,200,11.5
11/12/2014,11.75,11.85,11.15,11.45,3500,11.45
11/13/2014,11.45,11.45,11.45,11.45,0,11.45
11/14/2014,11.45,11.45,11.45,11.45,0,11.45
11/17/2014,11.07,11.28,11.07,11.28,1600,11.28
11/18/2014,11.07,11.74,11.06,11.74,8100,11.74
11/19/2014,11.1,11.5,11,11.5,11600,11.5
11/20/2014,11.1,11.5,11.1,11.5,3100,11.5
11/21/2014,11.49,11.5,11.23,11.25,15100,11.25
11/24/2014,11.25,11.35,11.25,11.25,900,11.25
11/25/2014,11.48,11.5,11.25,11.5,355300,11.5
11/26/2014,11.75,11.75,11.5,11.5,261300,11.5
11/28/2014,11.75,11.8,11.75,11.8,16300,11.8
12/1/2014,11.25,11.8,11.02,11.5,23800,11.5
12/2/2014,11.6,11.6,11.47,11.5,57600,11.5
12/3/2014,11.57,11.75,11.41,11.69,240700,11.69
12/4/2014,11.74,11.75,11.49,11.65,41100,11.65
12/5/2014,11.65,11.85,11.56,11.8,267200,11.8
12/8/2014,11.8,11.85,11.68,11.8,168700,11.8
最佳答案
首先,尝试将 Intel Threading Building Blocks 作为内存管理器。它可以很好地扩展到至少 16 个内核(我在 Why multithreaded memory allocate/deallocate intensive application does not scale with number of threads? 中遇到了类似的问题
一般来说,即使在主线程执行循环中使用英特尔 TBB,也要避免动态分配/取消分配内存。这些操作的规模总是很糟糕。
输入数据可以分布在不同的硬盘上,如果它们连接到不同的 Controller 上就更好了。如果您事先知道大小,即在线程循环中处理之前,可以对输入数据进行内存映射。
尽可能优化单线程处理(分析),然后识别不随线程数(线程数作为参数)缩放的部分。这些部分必须重写。 I/O 读取操作可以缓存和/或读取 n * 簇 block 中的数据。
这些相当笼统的建议是基于我在 Windows 7(高达 12 ht 内核和高达 128 GB RAM)上以线程并行处理 TB 大小的输入数据时收集的经验。
关于multithreading - 多色,多线程加速: reading through CSVs using TMemoryStream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27431110/
我正在尝试搜索我的谷歌驱动器中的所有 csvs,并将它们存储在具有电子表格格式的特定文件夹中。 我已经通过名称对特定的 csv 进行了相同的尝试,但现在我想要所有的 csv,但我不知道该怎么做。 fu
在过去的一周里,我一直在绞尽脑汁思考这个问题,现在还想不出该怎么办。 我目前有一个应用程序可以接收调查数据,并将其保存为 surveydata-mm-dd-yyyy 格式的 csv 文件。这通常适用于
这个站点和一般编程的新手(背景是生物学家)。 无论如何,我有一个任务是获取文本文件名、计算唯一行数、计算总行数并将其输出到 csv 文件中。这是我在 Cygwin 中使用的代码 #!/bin/bash
我有一个大文件,导入到 Pandas 的单个数据框中。我正在使用 pandas 按数据框中的行数将文件分成许多段。 例如:10 行:文件 1 得到 [0:4]文件 2 得到 [5:9] 有没有一种方法
我有一大堆 csv 文件。我想创建一个允许我执行此操作的循环; df_20180731 = pd.read_csv('path/cust_20180731.csv') 对于大约 36 个文件中
关闭。这个问题需要更多focused .它目前不接受答案。 想改善这个问题吗?更新问题,使其仅关注一个问题 editing this post . 4 个月前关闭。 Improve this ques
我正在编写并行代码来枚举大量 CSV 文件,每个文件都包含历史股票数据(超过 6500 个代码),并计算每只股票是否已达到历史最高点。 我已经实现了一个线程池和 TThread 后代类,以在线程之间平
我正在尝试创建一个简单的 csv: dataframe.to_csv( psv_file_name, encoding='utf-8', header=True, sep="|
大家好。所以我有一组 csv 文件,我想将它们作为表插入到我的 Java 程序中的 sqlite 数据库中。我用谷歌搜索并搜索了 SO,但我找不到任何关于如何将这些 csv 文件插入 sqlite 数
我想优化下面的代码。它有效,但如果可以更简洁有效地完成,我想提出建议。 import os import glob import pandas as pd import numpy as np fil
这道题类似于简单的mysql操作- UPDATE hpaai_month_div t, fahafa_monthly s SET t.col1=s.col1 WHERE t.col2=s.col2 A
我是一名优秀的程序员,十分优秀!