How do I filter a 'reduce inputs' over a large stream of objects?

I am using 'reduce inputs' to accumulate a map of unique keys whose values are an aggregate count and a duration total.

Currently it runs over every input:
reduce inputs as $r
({};
  ("Pipeline:" + $r.m."topic.type") as $topic
  | ("Channel:" + $r.channel) as $channel
  | ("Campaign:" + $r.campaign) as $campaign
  | ("Cellcode:" + $r.cellcode) as $cellcode
  | ("Tracking:" + $r.tracking) as $tracking
  | ("Template:" + $r.m."template.id") as $template
  | ("Event:" + $r.name) as $event
  | ("Reason:" + $r.reason) as $reason
  | ($r.duration|tonumber) as $duration
  | ($topic + ":" + $channel + ":" + $campaign + ":" + $cellcode + ":" + $tracking + ":" + $template + ":" + $event + ":" + $reason) as $key
  | .[$key][0] += 1
  | .[$key][1] += $duration
)

I can't figure out where to put a select() filter so that I reduce only over the entries that pass a 'select($r.type == "AUDIT_CHANNEL")' check, skipping the 2 "type":"AUDIT_SYSTEM" events in this test data:

{"type":"AUDIT_CHANNEL","name":"DROPPED","reason":"INVALID_MAIL_META_DATA","start":"1472083067058","duration":"91","end":"1472083067149","dc":"dev","pool":"raptor-app","host.name":"L-SEA-10002721","host.ip":"10.236.67.80","rlogid":"tfsqiu.dvw9%3FJ*P%40G*25671246-156befd00b2-0x293","channel":"EMAIL","m":{"audited":"1472083067058","created":"1472083066974","enabled":"true","entity.common.version":"1","template.id":"2840df6d-d9e8-4f27-e8b5-918c122d4561","template.version":"17","topic.curname":"eddude-default-topic","topic.curtype":"DEFAULT","topic.dc":"LVS","topic.name":"eddude-default-topic","topic.part":"5","topic.type":"DEFAULT"},"id":"0AEC4350-1C6E2FC9B80-0156BEF9ED92-0000000000000003","campaign":"999","contract":"a5872a5c-8912-dd63-583f-61fa8db3efde","user":1276847275,"cellcode":"","age":"175"}

{"type":"AUDIT_SYSTEM","name":"ROTATED","start":"1472083081033","duration":"0","end":"1472083081033","dc":"dev","pool":"raptor-app","host.name":"L-SEA-10002721","host.ip":"10.236.67.80","rlogid":"tfsqiu.dvw9%3FJ*P%40G*25671246-156befd3749-0xce"}

{"type":"AUDIT_SYSTEM","name":"ROTATED","start":"1472083141034","duration":"0","end":"1472083141034","dc":"dev","pool":"raptor-app","host.name":"L-SEA-10002721","host.ip":"10.236.67.80","rlogid":"tfsqiu.dvw9%3FJ*P%40G*25671246-156befe21aa-0xce"}

{"type":"AUDIT_CHANNEL","name":"RECEIVED","start":"1472083158860","duration":"109","end":"1472083158969","dc":"dev","pool":"raptor-app","host.name":"L-SEA-10002721","host.ip":"10.236.67.80","rlogid":"tfsqiu.dvw9%3FJ*P%40G*25671246-156befe674c-0x10f","channel":"EMAIL","m":{"audited":"1472083158860","created":"1472083158860","enabled":"true","entity.common.version":"1","template.id":"2840df6d-d9e8-4f27-e8b5-918c122d4561","template.version":"17","topic.curname":"eddude-default-topic","topic.curtype":"DEFAULT","topic.dc":"LVS","topic.name":"eddude-default-topic","topic.part":"5","topic.type":"DEFAULT"},"id":"0AEC4350-1C6E2FC9B80-0156BEF9ED92-0000000000000004","campaign":"999","contract":"a5872a5c-8912-dd63-583f-61fa8db3efde","user":1276847275,"cellcode":"","age":"109"}

I have tried putting it in front of the reduce, inside the reduce, and so on, but I don't get the desired output:

{
  "Pipeline:DEFAULT:Channel:EMAIL:Campaign:999:Cellcode::Tracking::Template:2840df6d-d9e8-4f27-e8b5-918c122d4561:Event:DROPPED:Reason:INVALID_MAIL_META_DATA": [
    1,
    91
  ],
  "Pipeline:DEFAULT:Channel:EMAIL:Campaign:999:Cellcode::Tracking::Template:2840df6d-d9e8-4f27-e8b5-918c122d4561:Event:RECEIVED:Reason:": [
    1,
    109
  ]
}

Do I have to do the filtering in a separate run entirely outside the reduce, or do I just not know how to do this with a single filter-and-reduce?

By the way, assume this input is a huge stream of millions of records, from which a few hundred unique "keys" are computed for accumulation.

inputs produces one result for each input it reads. You want to filter those inputs by type, so that is where you put your filter:

reduce (inputs | select(.type == "AUDIT_CHANNEL")) as $r ...
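
One detail worth checking when trying this: inputs only yields the whole stream if jq is started with -n (--null-input); otherwise the first record is consumed as `.` and never reaches the reduce. A minimal sketch, assuming a hypothetical three-record stream and simply counting the records that pass the select:

```shell
# Hypothetical three-record stream; only the two AUDIT_CHANNEL records should be counted.
count=$(printf '%s\n' \
  '{"type":"AUDIT_CHANNEL","duration":"91"}' \
  '{"type":"AUDIT_SYSTEM","duration":"0"}' \
  '{"type":"AUDIT_CHANNEL","duration":"109"}' |
  jq -n 'reduce (inputs | select(.type == "AUDIT_CHANNEL")) as $r (0; . + 1)')
echo "$count"
```

Because the select sits inside the parenthesized source expression, the reduce body never sees the AUDIT_SYSTEM records at all.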

Here is how I would write your filter:

reduce (inputs | select(.type == "AUDIT_CHANNEL")) as $r ({};
    ([
        "Pipeline", $r.m."topic.type",
        "Channel",  $r.channel,
        "Campaign", $r.campaign,
        "Cellcode", $r.cellcode,
        "Tracking", $r.tracking,
        "Template", $r.m."template.id",
        "Event",    $r.name,
        "Reason",   $r.reason
    ] | join(":")) as $key
    | .[$key] |= [ .[0]+1, .[1]+($r.duration|tonumber) ]
)
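
To try this kind of filter end to end, something like the following should work. This is only a sketch: the key is shortened to three fields and the records are trimmed-down stand-ins for the sample data, keeping only the fields the shortened filter reads. It assumes jq 1.5 or later, where join treats null as an empty string.

```shell
# Trimmed-down, hypothetical stand-ins for the sample records.
result=$(printf '%s\n' \
  '{"type":"AUDIT_CHANNEL","name":"DROPPED","reason":"INVALID_MAIL_META_DATA","duration":"91","channel":"EMAIL"}' \
  '{"type":"AUDIT_SYSTEM","name":"ROTATED","duration":"0"}' \
  '{"type":"AUDIT_CHANNEL","name":"RECEIVED","duration":"109","channel":"EMAIL"}' |
  jq -n -c '
    # Shortened key (Channel, Event, Reason only) to keep the sketch small.
    reduce (inputs | select(.type == "AUDIT_CHANNEL")) as $r ({};
        ([
            "Channel", $r.channel,
            "Event",   $r.name,
            "Reason",  $r.reason
        ] | join(":")) as $key
        | .[$key] |= [ .[0]+1, .[1]+($r.duration|tonumber) ]
    )')
echo "$result"
```

With -n the records are pulled one at a time by inputs, so only the accumulator object (a few hundred keys in the scenario described) has to be held in memory rather than the whole stream.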