使用 Rfc4180CsvParser 将 CSV 导入 Vertica 并排除 header 行
Import CSV into Vertica using Rfc4180CsvParser and exclude header row
有没有办法在通过 Rfc4180CsvParser 导入数据时排除 header 行? COPY
命令有一个 SKIP
选项,但在使用 Vertica SDK 中提供的 CSV 解析器时该选项似乎不起作用。
背景
作为背景,COPY
命令本身不会读取 CSV 文件。对于简单的 CSV 文件,可以说 COPY schema.table FROM '/data/myfile.csv' DELIMITER ',' ENCLOSED BY '"';
但对于具有带嵌入引号的字符串值的数据文件,这将失败。
添加 ESCAPE AS '"'
将产生错误 ERROR 3169: ENCLOSED BY and ESCAPE AS can not be the same value
。这是一个问题,因为在 CSV 中,值既用 " 包围,内嵌的引号也用 " 来转义。
用 Vertica SDK 的 CsvParser 扩展来解决
Vertica 在 /opt/vertica/sdk/examples
下提供了一个 SDK,其中包含可以编译成扩展的 C++ 程序。其中之一是 /opt/vertica/sdk/examples/ParserFunctions/Rfc4180CsvParser.cpp
。
效果很好,如下所示:
cd /opt/vertica/sdk/examples
make clean
vsql
==> CREATE LIBRARY Rfc4180CsvParserLib AS '/opt/vertica/sdk/examples/build/Rfc4180CsvParser.so';
==> COPY myschema.mytable FROM '/data/myfile.csv' WITH PARSER Rfc4180CsvParser();
问题
上面的工作很好除了它将数据文件的第一行作为一行导入。 COPY
命令有一个 SKIP 1
选项,但这不适用于解析器。
问题
是否可以编辑 Rfc4180CsvParser.cpp
以跳过第一行,或者更好的是,采用一些参数来指定要跳过的行数?
程序只有 135 行,但我没有看出应该在哪里、以什么方式做这个改动。有什么提示吗?
下面复制了整个程序,因为我没有找到可以链接的公开代码仓库...
Rfc4180CsvParser.cpp
/* Copyright (c) 2005 - 2012 Vertica, an HP company -*- C++ -*- */
#include "Vertica.h"
#include "StringParsers.h"
#include "csv.h"
using namespace Vertica;
// Note, the class template is mostly for demonstration purposes,
// so that the same class can use each of two string-parsers.
// Custom parsers can also just pick a string-parser to use.
/**
* A parser that parses something approximating the "official" CSV format
* as defined in IETF RFC-4180: <http://tools.ietf.org/html/rfc4180>
* Oddly enough, many "CSV" files don't actually conform to this standard
* for one reason or another. But for sources that do, this parser should
* be able to handle the data.
* Note that the CSV format does not specify how to handle different
* data types; it is entirely a string-based format.
* So we just use standard parsers based on the corresponding column type.
*/
template <class StringParsersImpl>
class LibCSVParser : public UDParser {
public:
    LibCSVParser() : colNum(0) {}

    // Keep a copy of the information about each column.
    // Note that Vertica doesn't let us safely keep a reference to
    // the internal copy of this data structure that it shows us.
    // But keeping a copy is fine.
    SizedColumnTypes colInfo;

    // An instance of the class containing the methods that we're
    // using to parse strings to the various relevant data types
    StringParsersImpl sp;

    /// Current column index (reset to 0 at the end of every row)
    size_t colNum;

    /// Parsing state for libcsv
    struct csv_parser parser;

    // Format strings (only consumed by the FormattedStringParsers setup())
    std::vector<std::string> formatStrings;

    /**
     * Given a field in string form (a pointer to the first character and
     * a length), submit that field to Vertica.
     * `colNum` is the column number from the input file; how many fields
     * it is into the current record.
     *
     * Returns false when the field lies beyond the last target column,
     * true when an empty field was stored as NULL, and otherwise whatever
     * parseStringToType() reports for the column's type.
     */
    bool handleField(size_t colNum, char* start, size_t len) {
        if (colNum >= colInfo.getColumnCount()) {
            // Ignore column overflow
            return false;
        }
        // Empty columns are null.
        if (len == 0) {
            writer->setNull(colNum);
            return true;
        } else {
            return parseStringToType(start, len, colNum,
                                     colInfo.getColumnType(colNum), writer, sp);
        }
    }

    /**
     * libcsv per-field callback. `p` is the LibCSVParser instance that was
     * handed to csv_parse()/csv_fini(). The field is forwarded to
     * handleField() and the column index is advanced; the result of
     * handleField() is not inspected here.
     */
    static void handle_record(void *data, size_t len, void *p) {
        LibCSVParser *self = static_cast<LibCSVParser*>(p);
        self->handleField(self->colNum++, static_cast<char*>(data), len);
    }

    /**
     * libcsv end-of-row callback. Resets the column index and moves the
     * writer on to the next row.
     */
    static void handle_end_of_row(int c, void *p) {
        // Ignore 'c' (the terminating character); trust that it's correct
        LibCSVParser *self = static_cast<LibCSVParser*>(p);
        self->colNum = 0;
        self->writer->next();
    }

    /**
     * Feed the unread part of `input` to libcsv, advancing `input.offset`
     * by the amount csv_parse() reports each time, until it reports 0.
     * Returns DONE once the input state is END_OF_FILE and the whole buffer
     * has been consumed (after flushing libcsv with csv_fini()); otherwise
     * returns INPUT_NEEDED.
     */
    virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input,
                                InputState input_state) {
        size_t processed;
        while ((processed = csv_parse(&parser, input.buf + input.offset,
                                      input.size - input.offset,
                                      handle_record, handle_end_of_row, this)) > 0) {
            input.offset += processed;
        }

        if (input_state == END_OF_FILE && input.size == input.offset) {
            csv_fini(&parser, handle_record, handle_end_of_row, this);
            return DONE;
        }

        return INPUT_NEEDED;
    }

    // Defined out of line below; specialized for FormattedStringParsers.
    virtual void setup(ServerInterface &srvInterface, SizedColumnTypes &returnType);

    /// Release the libcsv parser state initialized in setup().
    virtual void destroy(ServerInterface &srvInterface, SizedColumnTypes &returnType) {
        csv_free(&parser);
    }
};
/**
 * Generic setup: initialize the libcsv parser state and take a copy of the
 * column type information for later use by handleField().
 */
template <class StringParsersImpl>
void LibCSVParser<StringParsersImpl>::setup(ServerInterface &srvInterface,
                                            SizedColumnTypes &returnType) {
    csv_init(&parser, CSV_APPEND_NULL);
    colInfo = returnType;
}
/**
 * Setup for the FormattedStringParsers variant: same initialization as the
 * generic version, plus sizing `formatStrings` to one entry per column
 * (new entries are empty strings) and handing them to the string parser.
 */
template <>
void LibCSVParser<FormattedStringParsers>::setup(ServerInterface &srvInterface,
                                                 SizedColumnTypes &returnType) {
    csv_init(&parser, CSV_APPEND_NULL);
    colInfo = returnType;

    // One format string per output column.
    const size_t columnCount = returnType.getColumnCount();
    if (columnCount != formatStrings.size()) {
        formatStrings.resize(columnCount, "");
    }

    sp.setFormats(formatStrings);
}
/**
 * Parser factory, templated on the string-parser implementation that the
 * created LibCSVParser should use.
 */
template <class StringParsersImpl>
class LibCSVParserFactoryTmpl : public ParserFactory {
public:
    /// Nothing to do at plan time.
    virtual void plan(ServerInterface &srvInterface,
                      PerColumnParamReader &perColumnParamReader,
                      PlanContext &planCtxt)
    {
    }

    /// Create the parser object using the allocator exposed by srvInterface.
    virtual UDParser* prepare(ServerInterface &srvInterface,
                              PerColumnParamReader &perColumnParamReader,
                              PlanContext &planCtxt,
                              const SizedColumnTypes &returnType)
    {
        typedef LibCSVParser<StringParsersImpl> ParserImpl;
        return vt_createFuncObj(srvInterface.allocator, ParserImpl);
    }
};
// Register one factory per string-parser implementation.
typedef LibCSVParserFactoryTmpl<StringParsers> LibCSVParserFactory;
RegisterFactory(LibCSVParserFactory);

typedef LibCSVParserFactoryTmpl<FormattedStringParsers> FormattedLibCSVParserFactory;
RegisterFactory(FormattedLibCSVParserFactory);
快速而肮脏的方法是对其进行硬编码。它正在使用 handle_end_of_row
的回调。跟踪行号,只是不处理第一行。类似于:
static void handle_end_of_row(int c, void *ptr) {
// Ignore 'c' (the terminating character); trust that it's correct
LibCSVParser *p = static_cast<LibCSVParser*>(ptr);
p->colNum = 0;
if (rowcnt <= 0) {
p->bad_field = "";
rowcnt++;
} else if (p->bad_field.empty()) {
p->writer->next();
} else {
// libcsv doesn't give us the whole row to reject.
// So just write to the log.
// TODO: Come up with something more clever.
if (p->currSrvInterface) {
p->currSrvInterface->log("Invalid CSV field value: '%s' Row skipped.",
p->bad_field.c_str());
}
p->bad_field = "";
}
}
此外,最好在 process
中初始化 rowcnt = 0
,因为我认为它会为您的 COPY
语句中的每个文件调用它。可能有更聪明的方法来做到这一点。基本上,这只会处理记录然后丢弃它。
至于一般支持 SKIP
...查看 TraditionalCSVParser
如何处理参数传递。您必须将它添加到解析器工厂(ParserFactory)的 prepare
并将值发送到 LibCSVParser
class 并覆盖 getParameterType
。然后在LibCSVParser
中需要在构造函数中接受参数,修改process
跳过前skip
行。然后使用该值代替上面的硬编码 0
。
有没有办法在通过 Rfc4180CsvParser 导入数据时排除 header 行? COPY
命令有一个 SKIP
选项,但在使用 Vertica SDK 中提供的 CSV 解析器时该选项似乎不起作用。
背景
作为背景,COPY
命令本身不会读取 CSV 文件。对于简单的 CSV 文件,可以说 COPY schema.table FROM '/data/myfile.csv' DELIMITER ',' ENCLOSED BY '"';
但对于具有带嵌入引号的字符串值的数据文件,这将失败。
添加 ESCAPE AS '"'
将产生错误 ERROR 3169: ENCLOSED BY and ESCAPE AS can not be the same value
。这是一个问题,因为在 CSV 中,值既用 " 包围,内嵌的引号也用 " 来转义。
用 Vertica SDK 的 CsvParser 扩展来解决
Vertica 在 /opt/vertica/sdk/examples
下提供了一个 SDK,其中包含可以编译成扩展的 C++ 程序。其中之一是 /opt/vertica/sdk/examples/ParserFunctions/Rfc4180CsvParser.cpp
。
效果很好,如下所示:
cd /opt/vertica/sdk/examples
make clean
vsql
==> CREATE LIBRARY Rfc4180CsvParserLib AS '/opt/vertica/sdk/examples/build/Rfc4180CsvParser.so';
==> COPY myschema.mytable FROM '/data/myfile.csv' WITH PARSER Rfc4180CsvParser();
问题
上面的工作很好除了它将数据文件的第一行作为一行导入。 COPY
命令有一个 SKIP 1
选项,但这不适用于解析器。
问题
是否可以编辑 Rfc4180CsvParser.cpp
以跳过第一行,或者更好的是,采用一些参数来指定要跳过的行数?
程序只有 135 行,但我没有看出应该在哪里、以什么方式做这个改动。有什么提示吗?
下面复制了整个程序,因为我没有找到可以链接的公开代码仓库...
Rfc4180CsvParser.cpp
/* Copyright (c) 2005 - 2012 Vertica, an HP company -*- C++ -*- */
#include "Vertica.h"
#include "StringParsers.h"
#include "csv.h"
using namespace Vertica;
// Note, the class template is mostly for demonstration purposes,
// so that the same class can use each of two string-parsers.
// Custom parsers can also just pick a string-parser to use.
/**
* A parser that parses something approximating the "official" CSV format
* as defined in IETF RFC-4180: <http://tools.ietf.org/html/rfc4180>
* Oddly enough, many "CSV" files don't actually conform to this standard
* for one reason or another. But for sources that do, this parser should
* be able to handle the data.
* Note that the CSV format does not specify how to handle different
* data types; it is entirely a string-based format.
* So we just use standard parsers based on the corresponding column type.
*/
template <class StringParsersImpl>
class LibCSVParser : public UDParser {
public:
    LibCSVParser() : colNum(0) {}

    // Keep a copy of the information about each column.
    // Note that Vertica doesn't let us safely keep a reference to
    // the internal copy of this data structure that it shows us.
    // But keeping a copy is fine.
    SizedColumnTypes colInfo;

    // An instance of the class containing the methods that we're
    // using to parse strings to the various relevant data types
    StringParsersImpl sp;

    /// Current column index (reset to 0 at the end of every row)
    size_t colNum;

    /// Parsing state for libcsv
    struct csv_parser parser;

    // Format strings (only consumed by the FormattedStringParsers setup())
    std::vector<std::string> formatStrings;

    /**
     * Given a field in string form (a pointer to the first character and
     * a length), submit that field to Vertica.
     * `colNum` is the column number from the input file; how many fields
     * it is into the current record.
     *
     * Returns false when the field lies beyond the last target column,
     * true when an empty field was stored as NULL, and otherwise whatever
     * parseStringToType() reports for the column's type.
     */
    bool handleField(size_t colNum, char* start, size_t len) {
        if (colNum >= colInfo.getColumnCount()) {
            // Ignore column overflow
            return false;
        }
        // Empty columns are null.
        if (len == 0) {
            writer->setNull(colNum);
            return true;
        } else {
            return parseStringToType(start, len, colNum,
                                     colInfo.getColumnType(colNum), writer, sp);
        }
    }

    /**
     * libcsv per-field callback. `p` is the LibCSVParser instance that was
     * handed to csv_parse()/csv_fini(). The field is forwarded to
     * handleField() and the column index is advanced; the result of
     * handleField() is not inspected here.
     */
    static void handle_record(void *data, size_t len, void *p) {
        LibCSVParser *self = static_cast<LibCSVParser*>(p);
        self->handleField(self->colNum++, static_cast<char*>(data), len);
    }

    /**
     * libcsv end-of-row callback. Resets the column index and moves the
     * writer on to the next row.
     */
    static void handle_end_of_row(int c, void *p) {
        // Ignore 'c' (the terminating character); trust that it's correct
        LibCSVParser *self = static_cast<LibCSVParser*>(p);
        self->colNum = 0;
        self->writer->next();
    }

    /**
     * Feed the unread part of `input` to libcsv, advancing `input.offset`
     * by the amount csv_parse() reports each time, until it reports 0.
     * Returns DONE once the input state is END_OF_FILE and the whole buffer
     * has been consumed (after flushing libcsv with csv_fini()); otherwise
     * returns INPUT_NEEDED.
     */
    virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input,
                                InputState input_state) {
        size_t processed;
        while ((processed = csv_parse(&parser, input.buf + input.offset,
                                      input.size - input.offset,
                                      handle_record, handle_end_of_row, this)) > 0) {
            input.offset += processed;
        }

        if (input_state == END_OF_FILE && input.size == input.offset) {
            csv_fini(&parser, handle_record, handle_end_of_row, this);
            return DONE;
        }

        return INPUT_NEEDED;
    }

    // Defined out of line below; specialized for FormattedStringParsers.
    virtual void setup(ServerInterface &srvInterface, SizedColumnTypes &returnType);

    /// Release the libcsv parser state initialized in setup().
    virtual void destroy(ServerInterface &srvInterface, SizedColumnTypes &returnType) {
        csv_free(&parser);
    }
};
/**
 * Generic setup: initialize the libcsv parser state and take a copy of the
 * column type information for later use by handleField().
 */
template <class StringParsersImpl>
void LibCSVParser<StringParsersImpl>::setup(ServerInterface &srvInterface,
                                            SizedColumnTypes &returnType) {
    csv_init(&parser, CSV_APPEND_NULL);
    colInfo = returnType;
}
/**
 * Setup for the FormattedStringParsers variant: same initialization as the
 * generic version, plus sizing `formatStrings` to one entry per column
 * (new entries are empty strings) and handing them to the string parser.
 */
template <>
void LibCSVParser<FormattedStringParsers>::setup(ServerInterface &srvInterface,
                                                 SizedColumnTypes &returnType) {
    csv_init(&parser, CSV_APPEND_NULL);
    colInfo = returnType;

    // One format string per output column.
    const size_t columnCount = returnType.getColumnCount();
    if (columnCount != formatStrings.size()) {
        formatStrings.resize(columnCount, "");
    }

    sp.setFormats(formatStrings);
}
/**
 * Parser factory, templated on the string-parser implementation that the
 * created LibCSVParser should use.
 */
template <class StringParsersImpl>
class LibCSVParserFactoryTmpl : public ParserFactory {
public:
    /// Nothing to do at plan time.
    virtual void plan(ServerInterface &srvInterface,
                      PerColumnParamReader &perColumnParamReader,
                      PlanContext &planCtxt)
    {
    }

    /// Create the parser object using the allocator exposed by srvInterface.
    virtual UDParser* prepare(ServerInterface &srvInterface,
                              PerColumnParamReader &perColumnParamReader,
                              PlanContext &planCtxt,
                              const SizedColumnTypes &returnType)
    {
        typedef LibCSVParser<StringParsersImpl> ParserImpl;
        return vt_createFuncObj(srvInterface.allocator, ParserImpl);
    }
};
// Register one factory per string-parser implementation.
typedef LibCSVParserFactoryTmpl<StringParsers> LibCSVParserFactory;
RegisterFactory(LibCSVParserFactory);

typedef LibCSVParserFactoryTmpl<FormattedStringParsers> FormattedLibCSVParserFactory;
RegisterFactory(FormattedLibCSVParserFactory);
快速而肮脏的方法是对其进行硬编码。它正在使用 handle_end_of_row
的回调。跟踪行号,只是不处理第一行。类似于:
static void handle_end_of_row(int c, void *ptr) {
// Ignore 'c' (the terminating character); trust that it's correct
LibCSVParser *p = static_cast<LibCSVParser*>(ptr);
p->colNum = 0;
if (rowcnt <= 0) {
p->bad_field = "";
rowcnt++;
} else if (p->bad_field.empty()) {
p->writer->next();
} else {
// libcsv doesn't give us the whole row to reject.
// So just write to the log.
// TODO: Come up with something more clever.
if (p->currSrvInterface) {
p->currSrvInterface->log("Invalid CSV field value: '%s' Row skipped.",
p->bad_field.c_str());
}
p->bad_field = "";
}
}
此外,最好在 process
中初始化 rowcnt = 0
,因为我认为它会为您的 COPY
语句中的每个文件调用它。可能有更聪明的方法来做到这一点。基本上,这只会处理记录然后丢弃它。
至于一般支持 SKIP
...查看 TraditionalCSVParser
如何处理参数传递。您必须将它添加到解析器工厂(ParserFactory)的 prepare
并将值发送到 LibCSVParser
class 并覆盖 getParameterType
。然后在LibCSVParser
中需要在构造函数中接受参数,修改process
跳过前skip
行。然后使用该值代替上面的硬编码 0
。