jq:如何为大型流媒体用例定制原子化功能?

jq: how to customize the atomize function for a large streaming use-case?

我有一个非常大的文件,其结构如下:

{ 
  "users": { ... },
  ...
  "stats": {
    "daily": {
      "k1": { ... },
      "k2": { ... },
      ...
      "kN": { ... }
    },
    "monthly": {
      "p1": { ... },
      "p2": { ... },
      ...
      "pN": { ... }
    }
  }
}

stats中只有两个键:dailymonthly,它们都包含非常大量的键值对。

我想分别传输 .stats.daily.stats.monthly 中的所有键值对。 如果文件很小,我会简单地做 jq '.stats.daily' myfile.jsonjq '.stats.monthly' myfile.json

我不知道如何编辑食谱中的 atomize 函数来执行我想要的操作。这是我正在尝试但不起作用的方法:

jq -nc --stream '
  def atomize(s):
    fromstream(foreach s as $in ( {previous:null, emit: null};
      if ($in | length == 2) and ($in|.[0][0]) != .previous and .previous != null
      then {emit: [[.previous]], previous: $in|.[0][0]}
      else { previous: ($in|.[0][0]), emit: null}
      end;
      (.emit // empty), $in) ) ;
  atomize(2|truncate_stream(inputs | select(.[0][0] == "daily"))

有人可以解释一下它是如何工作的以及如何针对我的用例修复它吗?谢谢

既然您已经表示要将 "daily" 值与 "monthly" 值分开处理,那么让我们关注前者。

为此,我们先使用 fromstreamtruncate_stream:

输入类似于给定的示例,但进行了调整以使其有效 JSON:

fromstream( 1|truncate_stream(1|truncate_stream(
  inputs | select( .[0][0] == "stats" and .[0][1] == "daily" ) )) )

会产生:

{"k1":{"a":[1]},"k2":{"a":[1]},"kN":{"a":[1]}}

如果你有jq 1.6那么上面的jq过滤器可以精简为:

fromstream(2|truncate_stream(
  inputs | select( .[0][0:2] == ["stats","daily"] ) ))

现在我们只需要用atomize代替fromstream就可以得到想要的结果。比如使用jq 1.6,我们看到:

atomize(2|truncate_stream(
  inputs | select( .[0][0:2] == ["stats","daily"] ) ))

会产生:

{"k1":{"a":[1]}}
{"k2":{"a":[1]}}
{"kN":{"a":[1]}}

调用

jq -n -c --stream -f program.jq input.json

效率提升

假设输入中的对象没有重复键,可以简化上述解决方案,以便在处理完感兴趣的键后,不再进行进一步处理。这可以使用下面定义的 run/3 来实现。流式解决方案则变为:

atomize( 1 | truncate_stream( 1 | truncate_stream(
  run( inputs; .[0][0:2]; ["stats", "daily"] ))))

或使用 jq 1.6:

atomize( 2 | truncate_stream(
  run( inputs; .[0][0:2]; ["stats", "daily"] )))

run/3

# emit the first run of items in the stream for which f == $value
def run(stream; f; $value):
  label $done
  | foreach stream as $x ( {};
      ($x | f) as $k
      | if .start then (if $k == $value then . else .stop = true end)
        elif $k == $value then .start = true
        else .
        end;
      if .stop then break $done 
      elif .start then $x
      else empty
      end );