jq能否跨文件进行聚合
Can jq perform aggregation across files
我正在尝试确定一个 program/software,它将允许我有效地获取大量大型 CSV 文件(总计 40+ GB)并输出具有特定格式的 JSON 文件需要导入 Elasticsearch (ES)。
jq 可以像这样有效地获取数据吗:
file1:
id,age,gender,wave
1,49,M,1
2,72,F,0
file2:
id,time,event1
1,4/20/2095,V39
1,4/21/2095,T21
2,5/17/2094,V39
通过 id 聚合它(这样来自多个文件中 CSV 行的所有 JSON 文档都属于单个 id 条目),输出如下:
{"index":{"_index":"forum_mat","_type":"subject","_id":"1"}}
{"id":"1","file1":[{"filen":"file1","id":"1","age":"49","gender":"M","wave":"1"}],"file2":[{"filen":"file2","id":"1","time":"4/20/2095","event1":"V39"},{"filen":"file2","id":"1","time":"4/21/2095","event1":"T21"}]}
{"index":{"_index":"forum_mat","_type":"subject","_id":"2"}}
{"id":"2","file1":[{"filen":"file1","id":"2","age":"72","gender":"F","wave":"0"}],"file2":[{"filen":"file2","id":"2","time":"5/17/2094","event1":"V39"}]}
我在 Matlab 中写了一个脚本,但我担心它会很慢。我可能需要几个月的时间来处理所有 40+GB 的数据。我 informed 认为 Logstash(ES 的首选数据输入工具)不擅长这种类型的聚合。
我相信以下内容可以满足您的要求,但我不完全理解您的输入文件和您包含的输出之间的联系。希望这至少会让你走上正轨。
该程序假设您的所有数据都适合内存。它使用 JSON 对象作为快速查找的字典,因此应该非常高效。
此处采用的方法将 csv 到 json 的转换与聚合分开,因为前者可能有更好的方法。 (参见示例 the jq Cookbook entry on convert-a-csv-file-with-headers-to-json。)
第一个文件 (scsv2json.jq) 用于将简单的 CSV 文件转换为 JSON。第二个文件 (aggregate.jq) 进行聚合。有了这些:
$ (jq -R -s -f scsv2json.jq file1.csv ;\
jq -R -s -f scsv2json.jq file2.csv) |\
jq -s -c -f aggregate.jq
[{"id":"1",
"file1":{"age":"49","gender":"M","wave":"1"},
"file2":{"time":"4/21/2095","event1":"T21"}},
{"id":"2",
"file1":{"age":"72","gender":"F","wave":"0"},
"file2":{"time":"5/17/2094","event1":"V39"}}]
请注意,"id" 已从输出的内部对象中删除。
aggregate.jq:
# Input: an array of objects, each with an "id" field
# such that (tostring|.id) is an index.
# Output: a dictionary keyed by the id field.
def todictionary:
reduce .[] as $row ( {}; . + { ($row.id | tostring): $row } );
def aggregate:
.[0] as $file1
| .[1] as $file2
| ($file1 | todictionary) as $d1
| ($file2 | todictionary) as $d2
| ( [$file1[].id] + [$file2[].id] | unique ) as $keys
| reduce ($keys[] | tostring) as $k
( [];
. + [{"id": $k,
"file1": ($d1[$k] | del(.id)),
"file2": ($d2[$k] | del(.id)) }] );
aggregate
scsv2json.jq
def objectify(headers):
. as $in
| reduce range(0; headers|length) as $i
({}; .[headers[$i]] = ($in[$i]) );
def csv2table:
def trim: sub("^ +";"") | sub(" +$";"");
split("\n") | map( split(",") | map(trim) );
def csv2json:
csv2table
| .[0] as $headers
| reduce (.[1:][] | select(length > 0) ) as $row
( []; . + [ $row|objectify($headers) ]);
csv2json
以上假设使用的是支持正则表达式的 jq 版本。如果您的 jq 不支持正则表达式,只需省略修剪。
这是一种内存密集度较低的方法。它只需要 file1
保存在内存中:第二个文件一次处理一行。
调用方式如下:
$ jq -n -R --argfile file1 <(jq -R -s -f scsv2json.jq file1.csv)\
-f aggregate.jq file2.csv
其中scsv2json.jq如前post所示。这里不再重复,主要是因为(正如别处指出的那样)一些其他以相同方式将 CSV 转换为 JSON 的程序可能是合适的。
aggregate.jq:
def objectify(headers):
. as $in
| reduce range(0; headers|length) as $i
({}; .[headers[$i]] = ($in[$i]) );
def csv2table:
def trim: sub("^ +";"") | sub(" +$";"");
split("\n") | map( split(",") | map(trim) );
# Input: an array of objects, each with an "id" field
# such that (tostring|.id) is an index.
# Output: a dictionary keyed by the id field.
def todictionary:
reduce .[] as $row ( {}; . + { ($row.id | tostring): $row } );
# input: {"id": ID } + OBJECT2
# dict: {ID: OBJECT1, ...}
# output: {id: ID, "file1": OBJECT1, "file2": OBJECT2}
def aggregate(dict):
.id as $id
| (dict[$id] | del(.id)) as $o1
| {"id": $id,
"file1": $o1,
"file2": del(.id) };
# $file1 is the JSON version of file1.csv -- an array of objects
(input | csv2table[0]) as $headers
| inputs
| csv2table[0]
| objectify($headers)
| ($file1 | todictionary) as $d1
| aggregate($d1)
这是一种对 jq 内存要求非常小的方法。它假定您已经能够将所有 .csv 文件合并到一个 JSON 数组流(或文件)中,格式为:
[id, sourceFile, baggage]
其中 id 的值是按排序顺序排列的。流可能如下所示:
[1,"file1", {"a":1}]
[1,"file2", {"b":1}]
[1,"file3", {"c":1}]
[2,"file1", {"d":1}]
[2,"file2", {"e":1}]
[3,"file1", {"f":1}]
此初步步骤需要全局排序,因此您可能需要 select 仔细排序实用程序。
文件源数量不限;不需要每个数组都放在一行上;并且 id 值不必是整数——例如,它们可以是字符串。
让我们假设以上内容在名为 combined.json 的文件中,并且 aggregate.jq 具有如下所示的内容。然后调用:
$ jq -c -n -f aggregate.jq combined.json
会产生:
{"id":1,"file1":{"a":1},"file2":{"b":1},"file3":{"c":1}}
{"id":2,"file1":{"d":1},"file2":{"e":1}}
{"id":3,"file1":{"f":1}}
更正:aggregate.jq:
foreach (inputs,null) as $row
# At each iteration, if .emit then emit it
( {"emit": null, "current": null};
if $row == null
then {emit: .current, current: null} # signal EOF
else {id: $row[0], ($row[1]) : $row[2] } as $this
| if .current == null
then {emit: null, current: $this}
elif $row[0] == .current.id
then .emit = null | .current += $this
else {emit: .current, current: $this}
end
end;
if .emit then .emit else empty end
)
正如其中一条评论所建议的那样,我最终使用 SQL 以我需要的格式导出 JSON。另一个 帮了大忙。最后,我选择将给定的 SQL table 输出到它自己的 JSON 文件,而不是合并它们(文件大小变得难以管理)。这是执行此操作的代码结构,以便您为 Bulk API 和 JSON 数据行生成命令行:
create or replace function format_data_line(command text, data_str text)
returns setof text language plpgsql as $$
begin
return next command;
return next
replace(
regexp_replace(data_str,
'(\d\d\d\d-\d\d-\d\d)T', ' ', 'g'),
e' \n ', '');
end $$;
COPY (
with f_1 as(
SELECT id, json_agg(fileX.*) AS tag
FROM forum.file3
GROUP BY id
)
SELECT
format_data_line(
format('{"update":{"_index":"forum2","_type":"subject","_id":%s}}',a.id),
format('{"doc":{"id":%s,"fileX":%s}}',
a.id, a.tag))
FROM f_1 a
) TO '/path/to/json/fileX.json';
使用批量 API 导入较大的文件也被证明是有问题的(内存不足 Java 错误)因此需要一个脚本来仅将数据的子集发送到 Curl(对于Elasticsearch 中的索引)在给定时间。该脚本的基本结构是:
#!/bin/bash
FILE=
INC=100
numline=`wc -l $FILE | awk '{print }'`
rm -f output/$FILE.txt
for i in `seq 1 $INC $numline`; do
TIME=`date +%H:%M:%S`
echo "[$TIME] Processing lines from $i to $((i + INC -1))"
rm -f intermediates/interm_file_$i.json
sed -n $i,$((i +INC - 1))p $FILE >> intermediates/interm_file_$i.json
curl -s -XPOST localhost:9200/_bulk --data-binary @intermediates/interm_file_$i.json >> output/$FILE.txt
done
应在脚本文件目录下创建一个"intermediates" 目录。该脚本可以在命令行上保存为 "ESscript" 和 运行:
./ESscript fileX.json
我正在尝试确定一个 program/software,它将允许我有效地获取大量大型 CSV 文件(总计 40+ GB)并输出具有特定格式的 JSON 文件需要导入 Elasticsearch (ES)。
jq 可以像这样有效地获取数据吗:
file1:
id,age,gender,wave
1,49,M,1
2,72,F,0
file2:
id,time,event1
1,4/20/2095,V39
1,4/21/2095,T21
2,5/17/2094,V39
通过 id 聚合它(这样来自多个文件中 CSV 行的所有 JSON 文档都属于单个 id 条目),输出如下:
{"index":{"_index":"forum_mat","_type":"subject","_id":"1"}}
{"id":"1","file1":[{"filen":"file1","id":"1","age":"49","gender":"M","wave":"1"}],"file2":[{"filen":"file2","id":"1","time":"4/20/2095","event1":"V39"},{"filen":"file2","id":"1","time":"4/21/2095","event1":"T21"}]}
{"index":{"_index":"forum_mat","_type":"subject","_id":"2"}}
{"id":"2","file1":[{"filen":"file1","id":"2","age":"72","gender":"F","wave":"0"}],"file2":[{"filen":"file2","id":"2","time":"5/17/2094","event1":"V39"}]}
我在 Matlab 中写了一个脚本,但我担心它会很慢。我可能需要几个月的时间来处理所有 40+GB 的数据。我 informed 认为 Logstash(ES 的首选数据输入工具)不擅长这种类型的聚合。
我相信以下内容可以满足您的要求,但我不完全理解您的输入文件和您包含的输出之间的联系。希望这至少会让你走上正轨。
该程序假设您的所有数据都适合内存。它使用 JSON 对象作为快速查找的字典,因此应该非常高效。
此处采用的方法将 csv 到 json 的转换与聚合分开,因为前者可能有更好的方法。 (参见示例 the jq Cookbook entry on convert-a-csv-file-with-headers-to-json。)
第一个文件 (scsv2json.jq) 用于将简单的 CSV 文件转换为 JSON。第二个文件 (aggregate.jq) 进行聚合。有了这些:
$ (jq -R -s -f scsv2json.jq file1.csv ;\
jq -R -s -f scsv2json.jq file2.csv) |\
jq -s -c -f aggregate.jq
[{"id":"1",
"file1":{"age":"49","gender":"M","wave":"1"},
"file2":{"time":"4/21/2095","event1":"T21"}},
{"id":"2",
"file1":{"age":"72","gender":"F","wave":"0"},
"file2":{"time":"5/17/2094","event1":"V39"}}]
请注意,"id" 已从输出的内部对象中删除。
aggregate.jq:
# Input: an array of objects, each with an "id" field
# such that (tostring|.id) is an index.
# Output: a dictionary keyed by the id field.
def todictionary:
reduce .[] as $row ( {}; . + { ($row.id | tostring): $row } );
def aggregate:
.[0] as $file1
| .[1] as $file2
| ($file1 | todictionary) as $d1
| ($file2 | todictionary) as $d2
| ( [$file1[].id] + [$file2[].id] | unique ) as $keys
| reduce ($keys[] | tostring) as $k
( [];
. + [{"id": $k,
"file1": ($d1[$k] | del(.id)),
"file2": ($d2[$k] | del(.id)) }] );
aggregate
scsv2json.jq
def objectify(headers):
. as $in
| reduce range(0; headers|length) as $i
({}; .[headers[$i]] = ($in[$i]) );
def csv2table:
def trim: sub("^ +";"") | sub(" +$";"");
split("\n") | map( split(",") | map(trim) );
def csv2json:
csv2table
| .[0] as $headers
| reduce (.[1:][] | select(length > 0) ) as $row
( []; . + [ $row|objectify($headers) ]);
csv2json
以上假设使用的是支持正则表达式的 jq 版本。如果您的 jq 不支持正则表达式,只需省略修剪。
这是一种内存密集度较低的方法。它只需要 file1 保存在内存中:第二个文件一次处理一行。
调用方式如下:
$ jq -n -R --argfile file1 <(jq -R -s -f scsv2json.jq file1.csv)\
-f aggregate.jq file2.csv
其中scsv2json.jq如前post所示。这里不再重复,主要是因为(正如别处指出的那样)一些其他以相同方式将 CSV 转换为 JSON 的程序可能是合适的。
aggregate.jq:
def objectify(headers):
. as $in
| reduce range(0; headers|length) as $i
({}; .[headers[$i]] = ($in[$i]) );
def csv2table:
def trim: sub("^ +";"") | sub(" +$";"");
split("\n") | map( split(",") | map(trim) );
# Input: an array of objects, each with an "id" field
# such that (tostring|.id) is an index.
# Output: a dictionary keyed by the id field.
def todictionary:
reduce .[] as $row ( {}; . + { ($row.id | tostring): $row } );
# input: {"id": ID } + OBJECT2
# dict: {ID: OBJECT1, ...}
# output: {id: ID, "file1": OBJECT1, "file2": OBJECT2}
def aggregate(dict):
.id as $id
| (dict[$id] | del(.id)) as $o1
| {"id": $id,
"file1": $o1,
"file2": del(.id) };
# $file1 is the JSON version of file1.csv -- an array of objects
(input | csv2table[0]) as $headers
| inputs
| csv2table[0]
| objectify($headers)
| ($file1 | todictionary) as $d1
| aggregate($d1)
这是一种对 jq 内存要求非常小的方法。它假定您已经能够将所有 .csv 文件合并到一个 JSON 数组流(或文件)中,格式为:
[id, sourceFile, baggage]
其中 id 的值是按排序顺序排列的。流可能如下所示:
[1,"file1", {"a":1}]
[1,"file2", {"b":1}]
[1,"file3", {"c":1}]
[2,"file1", {"d":1}]
[2,"file2", {"e":1}]
[3,"file1", {"f":1}]
此初步步骤需要全局排序,因此您可能需要 select 仔细排序实用程序。
文件源数量不限;不需要每个数组都放在一行上;并且 id 值不必是整数——例如,它们可以是字符串。
让我们假设以上内容在名为 combined.json 的文件中,并且 aggregate.jq 具有如下所示的内容。然后调用:
$ jq -c -n -f aggregate.jq combined.json
会产生:
{"id":1,"file1":{"a":1},"file2":{"b":1},"file3":{"c":1}}
{"id":2,"file1":{"d":1},"file2":{"e":1}}
{"id":3,"file1":{"f":1}}
更正:aggregate.jq:
foreach (inputs,null) as $row
# At each iteration, if .emit then emit it
( {"emit": null, "current": null};
if $row == null
then {emit: .current, current: null} # signal EOF
else {id: $row[0], ($row[1]) : $row[2] } as $this
| if .current == null
then {emit: null, current: $this}
elif $row[0] == .current.id
then .emit = null | .current += $this
else {emit: .current, current: $this}
end
end;
if .emit then .emit else empty end
)
正如其中一条评论所建议的那样,我最终使用 SQL 以我需要的格式导出 JSON。另一个
create or replace function format_data_line(command text, data_str text)
returns setof text language plpgsql as $$
begin
return next command;
return next
replace(
regexp_replace(data_str,
'(\d\d\d\d-\d\d-\d\d)T', ' ', 'g'),
e' \n ', '');
end $$;
COPY (
with f_1 as(
SELECT id, json_agg(fileX.*) AS tag
FROM forum.file3
GROUP BY id
)
SELECT
format_data_line(
format('{"update":{"_index":"forum2","_type":"subject","_id":%s}}',a.id),
format('{"doc":{"id":%s,"fileX":%s}}',
a.id, a.tag))
FROM f_1 a
) TO '/path/to/json/fileX.json';
使用批量 API 导入较大的文件也被证明是有问题的(内存不足 Java 错误)因此需要一个脚本来仅将数据的子集发送到 Curl(对于Elasticsearch 中的索引)在给定时间。该脚本的基本结构是:
#!/bin/bash
FILE=
INC=100
numline=`wc -l $FILE | awk '{print }'`
rm -f output/$FILE.txt
for i in `seq 1 $INC $numline`; do
TIME=`date +%H:%M:%S`
echo "[$TIME] Processing lines from $i to $((i + INC -1))"
rm -f intermediates/interm_file_$i.json
sed -n $i,$((i +INC - 1))p $FILE >> intermediates/interm_file_$i.json
curl -s -XPOST localhost:9200/_bulk --data-binary @intermediates/interm_file_$i.json >> output/$FILE.txt
done
应在脚本文件目录下创建一个"intermediates" 目录。该脚本可以在命令行上保存为 "ESscript" 和 运行:
./ESscript fileX.json