gpt4 book ai didi

c++ - 使用 Rfc4180CsvParser 将 CSV 导入 Vertica 并排除标题行

转载 作者:搜寻专家 更新时间:2023-10-31 02:19:14 24 4
gpt4 key购买 nike

有没有办法在通过 Rfc4180CsvParser 导入数据时排除标题行? COPY 命令有一个 SKIP 选项,但在使用 Vertica SDK 中提供的 CSV 解析器时,该选项似乎不起作用。

背景

作为背景,COPY命令本身不会读取 CSV 文件。对于简单的 CSV 文件,可以说 COPY schema.table FROM '/data/myfile.csv' DELIMITER ',' ENCLOSED BY '"'; 但这对于具有字符串值的数据文件会失败嵌入引号。

添加 ESCAPE AS '"' 会产生错误 ERROR 3169: ENCLOSED BY and ESCAPE AS can not be the same value。这是一个问题,因为 CSV 值是由 " 封闭和转义。

救援的 Vertica SDK CsvParser 扩展

Vertica 在 /opt/vertica/sdk/examples 下提供了一个 SDK,其中包含可以编译成扩展的 C++ 程序。其中之一是 /opt/vertica/sdk/examples/ParserFunctions/Rfc4180CsvParser.cpp

效果很好,如下所示:

 cd /opt/vertica/sdk/examples
make clean
vsql
==> CREATE LIBRARY Rfc4180CsvParserLib AS '/opt/vertica/sdk/examples/build/Rfc4180CsvParser.so';
==> COPY myschema.mytable FROM '/data/myfile.csv' WITH PARSER Rfc4180CsvParser();

问题

上面的代码效果很好,除了它将数据文件的第一行作为一行导入。 COPY 命令有一个 SKIP 1 选项,但这不适用于解析器。

问题

是否可以编辑 Rfc4180CsvParser.cpp 以跳过第一行,或者更好的是,采用一些参数来指定要跳过的行数?

程序只有 135 行,但我不知道在哪里/如何做这个切口。提示?

复制下面的整个程序,因为我没有看到要链接到的公共(public)存储库...

Rfc4180CsvParser.cpp

/* Copyright (c) 2005 - 2012 Vertica, an HP company -*- C++ -*- */

#include "Vertica.h"
#include "StringParsers.h"
#include "csv.h"

using namespace Vertica;

// Note, the class template is mostly for demonstration purposes,
// so that the same class can use each of two string-parsers.
// Custom parsers can also just pick a string-parser to use.

/**
* A parser that parses something approximating the "official" CSV format
* as defined in IETF RFC-4180: <http://tools.ietf.org/html/rfc4180>
* Oddly enough, many "CSV" files don't actually conform to this standard
* for one reason or another. But for sources that do, this parser should
* be able to handle the data.
* Note that the CSV format does not specify how to handle different
* data types; it is entirely a string-based format.
* So we just use standard parsers based on the corresponding column type.
*/
template <class StringParsersImpl>
class LibCSVParser : public UDParser {
public:
LibCSVParser() : colNum(0) {}

// Keep a copy of the information about each column.
// Note that Vertica doesn't let us safely keep a reference to
// the internal copy of this data structure that it shows us.
// But keeping a copy is fine.
SizedColumnTypes colInfo;

// An instance of the class containing the methods that we're
// using to parse strings to the various relevant data types
StringParsersImpl sp;

/// Current column index
size_t colNum;

/// Parsing state for libcsv
struct csv_parser parser;

// Format strings
std::vector<std::string> formatStrings;

/**
* Given a field in string form (a pointer to the first character and
* a length), submit that field to Vertica.
* `colNum` is the column number from the input file; how many fields
* it is into the current record.
*/
bool handleField(size_t colNum, char* start, size_t len) {
if (colNum >= colInfo.getColumnCount()) {
// Ignore column overflow
return false;
}
// Empty colums are null.
if (len==0) {
writer->setNull(colNum);
return true;
} else {
return parseStringToType(start, len, colNum, colInfo.getColumnType(c
olNum), writer, sp);
}
}

static void handle_record(void *data, size_t len, void *p) {
static_cast<LibCSVParser*>(p)->handleField(static_cast<LibCSVParser*>(p)
->colNum++, (char*)data, len);
}

static void handle_end_of_row(int c, void *p) {
// Ignore 'c' (the terminating character); trust that it's correct
static_cast<LibCSVParser*>(p)->colNum = 0;
static_cast<LibCSVParser*>(p)->writer->next();
}

virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input
, InputState input_state) {
size_t processed;
while ((processed = csv_parse(&parser, input.buf + input.offset, input.s
ize - input.offset,
handle_record, handle_end_of_row, this)) > 0) {
input.offset += processed;
}

if (input_state == END_OF_FILE && input.size == input.offset) {
csv_fini(&parser, handle_record, handle_end_of_row, this);
return DONE;
}

return INPUT_NEEDED;
}

virtual void setup(ServerInterface &srvInterface, SizedColumnTypes &returnTy
pe);
virtual void destroy(ServerInterface &srvInterface, SizedColumnTypes &return
Type) {
csv_free(&parser);
}
};

template <class StringParsersImpl>
void LibCSVParser<StringParsersImpl>::setup(ServerInterface &srvInterface, Sized
ColumnTypes &returnType) {
csv_init(&parser, CSV_APPEND_NULL);
colInfo = returnType;
}

template <>
void LibCSVParser<FormattedStringParsers>::setup(ServerInterface &srvInterface,
SizedColumnTypes &returnType) {
csv_init(&parser, CSV_APPEND_NULL);
colInfo = returnType;
if (formatStrings.size() != returnType.getColumnCount()) {
formatStrings.resize(returnType.getColumnCount(), "");
}
sp.setFormats(formatStrings);
}

template <class StringParsersImpl>
class LibCSVParserFactoryTmpl : public ParserFactory {
public:
virtual void plan(ServerInterface &srvInterface,
PerColumnParamReader &perColumnParamReader,
PlanContext &planCtxt) {}

virtual UDParser* prepare(ServerInterface &srvInterface,
PerColumnParamReader &perColumnParamReader,
PlanContext &planCtxt,
const SizedColumnTypes &returnType)
{
return vt_createFuncObj(srvInterface.allocator,
LibCSVParser<StringParsersImpl>);
}
};

typedef LibCSVParserFactoryTmpl<StringParsers> LibCSVParserFactory;
RegisterFactory(LibCSVParserFactory);

typedef LibCSVParserFactoryTmpl<FormattedStringParsers> FormattedLibCSVParserFac
tory;
RegisterFactory(FormattedLibCSVParserFactory);

最佳答案

快速而肮脏的方法就是硬编码。它使用回调到 handle_end_of_row。跟踪行号,只是不处理第一行。像这样的东西:

static void handle_end_of_row(int c, void *ptr) {
// Ignore 'c' (the terminating character); trust that it's correct
LibCSVParser *p = static_cast<LibCSVParser*>(ptr);
p->colNum = 0;

if (rowcnt <= 0) {
p->bad_field = "";
rowcnt++;
} else if (p->bad_field.empty()) {
p->writer->next();
} else {
// libcsv doesn't give us the whole row to reject.
// So just write to the log.
// TODO: Come up with something more clever.
if (p->currSrvInterface) {
p->currSrvInterface->log("Invalid CSV field value: '%s' Row skipped.",
p->bad_field.c_str());
}
p->bad_field = "";
}
}

此外,最好在 process 中初始化 rownum = 0,因为我认为它会为您的 COPY 语句中的每个文件调用它。可能有更聪明的方法来做到这一点。基本上,这只会处理记录然后丢弃它。

至于一般支持 SKIP...请查看 TraditionalCSVParser 以了解如何处理参数传递。您必须将它添加到解析器因子 prepare 并将值发送到 LibCSVParser 类并覆盖 getParameterType。然后在 LibCSVParser 中需要在构造函数中接受参数,并修改 process 以跳过第一个 skip 行。然后使用该值代替上面硬编码的 0

关于c++ - 使用 Rfc4180CsvParser 将 CSV 导入 Vertica 并排除标题行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33723698/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com