使用 pig 从 ES 加载数据并在 HDFS 中存储为 avro
Load data from ES and store as avro in HDFS using pig
我在 ElasticSearch 上有一些数据需要发送到 HDFS。我正在尝试使用 pig(这是我第一次使用它),但是我在为我的数据定义正确的模式时遇到了一些问题。
首先,我尝试使用选项 'es.output.json=true'
和 org.elasticsearch.hadoop.pig.EsStorage
加载 JSON,我可以正确地 load/dump 数据,并将它们保存为JSON 使用 STORE A INTO 'hdfs://path/to/store';
到 HDFS。后来在HIVE上定义了一个外部的table,我就可以查询这个数据了。这是运行良好的完整示例(我从代码中删除了所有 SSL 属性):
REGISTER /path/to/commons-httpclient-3.1.jar;
REGISTER /path/to/elasticsearch-hadoop-5.3.0.jar;
A = LOAD 'my-index/log' USING org.elasticsearch.hadoop.pig.EsStorage(
'es.nodes=https://addr1:port,https://addr2:port2,https://addr3:port3',
'es.query=?q=*',
'es.output.json=true');
STORE A INTO 'hdfs://path/to/store';
如何将我的数据作为 AVRO 存储到 HDFS?我想我需要使用 AvroStorage
,但我还应该定义一个加载数据的模式,或者 JSON 就足够了吗?我尝试使用 LOAD...USING...AS
命令定义模式并设置 es.mapping.date.rich=false
而不是 es.output.json=true
(我的数据非常复杂,有地图的地图和类似的东西),但它不起作用.我不确定问题是出在语法上,还是出在方法本身上。如果能提示要遵循的正确方向,那就太好了。
更新
这是我尝试使用 es.mapping.date.rich=false
的示例。我的问题是,如果一个字段为空,所有字段的顺序都会错误。
A = LOAD 'my-index/log' USING org.elasticsearch.hadoop.pig.EsStorage(
'es.nodes=https://addr1:port,https://addr2:port2,https://addr3:port3',
'es.query=?q=*',
'es.mapping.date.rich=false')
AS(
field1:chararray,
field2:chararray,
field3:map[chararray,fieldMap:map[],chararray],
field4:chararray,
field5:map[]
);
B = FOREACH A GENERATE field1, field2;
STORE B INTO 'hdfs://path/to/store' USING AvroStorage('
{
"type" : "foo1",
"name" : "foo2",
"namespace" : "foo3",
"fields" : [ {
"name" : "field1",
"type" : ["null","string"],
"default" : null
}, {
"name" : "field2",
"type" : ["null","string"],
"default" : null
} ]
}
');
对于未来的读者,我决定使用 spark
,因为它比 pig
快得多。要在 hdfs 上保存 avro
个文件,我使用 databrick
库。
我在 ElasticSearch 上有一些数据需要发送到 HDFS。我正在尝试使用 pig(这是我第一次使用它),但是我在为我的数据定义正确的模式时遇到了一些问题。
首先,我尝试使用选项 'es.output.json=true'
和 org.elasticsearch.hadoop.pig.EsStorage
加载 JSON,我可以正确地 load/dump 数据,并将它们保存为JSON 使用 STORE A INTO 'hdfs://path/to/store';
到 HDFS。后来在HIVE上定义了一个外部的table,我就可以查询这个数据了。这是运行良好的完整示例(我从代码中删除了所有 SSL 属性):
REGISTER /path/to/commons-httpclient-3.1.jar;
REGISTER /path/to/elasticsearch-hadoop-5.3.0.jar;
A = LOAD 'my-index/log' USING org.elasticsearch.hadoop.pig.EsStorage(
'es.nodes=https://addr1:port,https://addr2:port2,https://addr3:port3',
'es.query=?q=*',
'es.output.json=true');
STORE A INTO 'hdfs://path/to/store';
如何将我的数据作为 AVRO 存储到 HDFS?我想我需要使用 AvroStorage
,但我还应该定义一个加载数据的模式,或者 JSON 就足够了吗?我尝试使用 LOAD...USING...AS
命令定义模式并设置 es.mapping.date.rich=false
而不是 es.output.json=true
(我的数据非常复杂,有地图的地图和类似的东西),但它不起作用.我不确定问题是出在语法上,还是出在方法本身上。如果能提示要遵循的正确方向,那就太好了。
更新
这是我尝试使用 es.mapping.date.rich=false
的示例。我的问题是,如果一个字段为空,所有字段的顺序都会错误。
A = LOAD 'my-index/log' USING org.elasticsearch.hadoop.pig.EsStorage(
'es.nodes=https://addr1:port,https://addr2:port2,https://addr3:port3',
'es.query=?q=*',
'es.mapping.date.rich=false')
AS(
field1:chararray,
field2:chararray,
field3:map[chararray,fieldMap:map[],chararray],
field4:chararray,
field5:map[]
);
B = FOREACH A GENERATE field1, field2;
STORE B INTO 'hdfs://path/to/store' USING AvroStorage('
{
"type" : "foo1",
"name" : "foo2",
"namespace" : "foo3",
"fields" : [ {
"name" : "field1",
"type" : ["null","string"],
"default" : null
}, {
"name" : "field2",
"type" : ["null","string"],
"default" : null
} ]
}
');
对于未来的读者,我决定使用 spark
,因为它比 pig
快得多。要在 hdfs 上保存 avro
个文件,我使用 databrick
库。