如何对动作的依赖树进行排序和分块,以便您可以在每个步骤中将尽可能多的动作打包在一起?

How to sort and chunk a dependency tree of actions, so you can batch as many actions as possible together at each step?

假设您有一堆操作 creating/inserting 记录到一堆不同的数据库 table 中。您有一些记录可以插入而不依赖于任何其他插入的输出。你有一些需要等待另一件事完成。还有其他人需要等待很多事情完成,这些事情可能会在流程中的不同时间完成。

你如何编写一个算法来对依赖树中的操作进行排序和分块,以便可以最佳地批处理插入/数据库操作?通过最佳批处理,我的意思是如果您可以一次将 10 条记录插入同一个 table,那么就这样做。任何时候你都可以批量插入,你应该尽量减少数据库的数量calls/inserts.

这是示例代码的一个片段,我使用简单的数据结构将其与伪造的操作序列结合在一起,以捕获所有必需的信息。

...
{ action: 'create', table: 'tb1', set: 'key12', input: {
  p: { type: 'binding', path: ['key10', 'z'] },
  q: { type: 'binding', path: ['key11', 'a'] }
} },
{ action: 'create', table: 'tb4', set: 'key13' },
{ action: 'create', table: 'tb3', set: 'key14' },
{ action: 'create', table: 'tb4', set: 'key15', input: {
  a: { type: 'binding', path: ['key8', 'z'] },
} },
...

请注意,我们的“依赖节点项”有 4 个可能的属性:

鉴于此,应该有可能以某种方式构建一个简单的算法,将操作分块成子集,这些子集可以 运行 并行和批处理。例如,我们下面的代码最终会像这样的结构(我手动创建了这个输出,所以可能会有错误,尽管我认为我做对了。哦,请注意,虽然 table 是编号的,但它没有' 必然向他们暗示命令,只是简单的名称可供选择):

// this is "close to" the desired result, because
// there might be a slightly different sort applied
// to this when implemented, but the chunking pattern
// and grouping of elements should be exactly like This
// (I am pretty sure, I manually did this 
// so there could be small mistakes, but I doubt it)
const closeToDesiredResult = [
  [
    [
      { action: 'create', table: 'tb1', set: 'key1' },
      { action: 'create', table: 'tb1', set: 'key21' },
    ],
    [
      { action: 'create', table: 'tb2', set: 'key2' },
      { action: 'create', table: 'tb2', set: 'key3' },
      { action: 'create', table: 'tb2', set: 'key23' },
    ],
    [
      { action: 'create', table: 'tb4', set: 'key6' },
      { action: 'create', table: 'tb4', set: 'key8' },
      { action: 'create', table: 'tb4', set: 'key13' },
    ],
    [
      { action: 'create', table: 'tb3', set: 'key5' },
      { action: 'create', table: 'tb3', set: 'key7' },
      { action: 'create', table: 'tb3', set: 'key9' },
      { action: 'create', table: 'tb3', set: 'key14' },
      { action: 'create', table: 'tb3', set: 'key24' },
    ],
    [
      { action: 'create', table: 'tb6', set: 'key17' },
    ],
    [
      { action: 'create', table: 'tb5', set: 'key16' },
    ]
  ],
  [
    [
      { action: 'create', table: 'tb1', set: 'key4', input: {
        x: { type: 'binding', path: ['key2', 'baz'] }
      } },
    ],
    [
      { action: 'create', table: 'tb3', set: 'key10', input: {
        y: { type: 'binding', path: ['key6', 'foo'] },
        z: { type: 'binding', path: ['key1', 'bar'] }
      } },
    ],
    [
      { action: 'create', table: 'tb4', set: 'key15', input: {
        a: { type: 'binding', path: ['key8', 'z'] },
      } },
    ]
  ],
  [
    [
      { action: 'create', table: 'tb1', set: 'key12', input: {
        p: { type: 'binding', path: ['key10', 'z'] },
        q: { type: 'binding', path: ['key11', 'a'] }
      } },
    ],
    [
      { action: 'create', table: 'tb4', set: 'key11', input: {
        a: { type: 'binding', path: ['key10', 'z'] },
        b: { type: 'binding', path: ['key1', 'bar'] }
      } },
    ],
    [
      { action: 'create', table: 'tb6', set: 'key18', input: {
        m: { type: 'binding', path: ['key4', 'x'] },
      } },
      { action: 'create', table: 'tb6', set: 'key19', input: {
        m: { type: 'binding', path: ['key4', 'x'] },
        n: { type: 'binding', path: ['key13', 'a'] },
      } },
    ]
  ],
  [
    [
      { action: 'create', table: 'tb2', set: 'key22', input: {
        w: { type: 'binding', path: ['key18', 'm'] },
        x: { type: 'binding', path: ['key17', 'm'] },
      } },
    ],
    [
      { action: 'create', table: 'tb6', set: 'key20', input: {
        m: { type: 'binding', path: ['key18', 'm'] },
        n: { type: 'binding', path: ['key17', 'm'] },
      } },
    ]
  ]
]

注意结果数组中有 4 个顶级块。这些是主要步骤。然后在每一步中,所有的东西都按table分组,所以它们都可以并行运行,并且在每个table组中,它们都可以批量插入。轰.

你会如何实现这个,我的头脑似乎很难理解?

const actionTree = generateActionTree()
const chunkedActionTree = chunkDependencyTree(actionTree)

function chunkDependencyTree(list) {
  const independentOnesMapByTableName = {}
  list.forEach(node => {
    // easy case
    if (!node.input) {
      const group = independentOnesMapByTableName[node.table]
        = independentOnesMapByTableName[node.table] ?? []
      group.push(node)
    } else {
      // I am at a loss for words...
    }
  })
}

function generateActionTree() {
  // this would be constructed through a bunch of real-world
  // functions, queuing up all the actions
  // and pointing outputs to inputs.
  return [
    { action: 'create', table: 'tb1', set: 'key1' },
    { action: 'create', table: 'tb2', set: 'key2' },
    { action: 'create', table: 'tb2', set: 'key3' },
    { action: 'create', table: 'tb3', set: 'key5' },
    { action: 'create', table: 'tb4', set: 'key6' },
    { action: 'create', table: 'tb3', set: 'key7' },
    { action: 'create', table: 'tb4', set: 'key8' },
    { action: 'create', table: 'tb3', set: 'key9' },
    { action: 'create', table: 'tb3', set: 'key10', input: {
      y: { type: 'binding', path: ['key6', 'foo'] },
      z: { type: 'binding', path: ['key1', 'bar'] }
    } },
    { action: 'create', table: 'tb1', set: 'key4', input: {
      x: { type: 'binding', path: ['key2', 'baz'] }
    } },
    { action: 'create', table: 'tb4', set: 'key11', input: {
      a: { type: 'binding', path: ['key10', 'z'] },
      b: { type: 'binding', path: ['key1', 'bar'] }
    } },
    { action: 'create', table: 'tb1', set: 'key12', input: {
      p: { type: 'binding', path: ['key10', 'z'] },
      q: { type: 'binding', path: ['key11', 'a'] }
    } },
    { action: 'create', table: 'tb4', set: 'key13' },
    { action: 'create', table: 'tb3', set: 'key14' },
    { action: 'create', table: 'tb4', set: 'key15', input: {
      a: { type: 'binding', path: ['key8', 'z'] },
    } },
    { action: 'create', table: 'tb5', set: 'key16' },
    { action: 'create', table: 'tb6', set: 'key17' },
    { action: 'create', table: 'tb6', set: 'key18', input: {
      m: { type: 'binding', path: ['key4', 'x'] },
    } },
    { action: 'create', table: 'tb6', set: 'key19', input: {
      m: { type: 'binding', path: ['key4', 'x'] },
      n: { type: 'binding', path: ['key13', 'a'] },
    } },
    { action: 'create', table: 'tb6', set: 'key20', input: {
      m: { type: 'binding', path: ['key18', 'm'] },
      n: { type: 'binding', path: ['key17', 'm'] },
    } },
    { action: 'create', table: 'tb1', set: 'key21' },
    { action: 'create', table: 'tb2', set: 'key22', input: {
      w: { type: 'binding', path: ['key18', 'm'] },
      x: { type: 'binding', path: ['key17', 'm'] },
    } },
    { action: 'create', table: 'tb2', set: 'key23' },
    { action: 'create', table: 'tb3', set: 'key24' },
  ]
}

我认为这大致是 topological sorting,但不太确定如何将其应用于这种特定情况。

我不清楚你的数据结构。什么是单字母 ID pq 等?我也不明白 table 的作用。您可以一次写入同一个 table 中插入多个项目,不是吗?我假设这些 tihngs 在根排序问题中无关紧要。

我将 set 字段视为“作业”,并将 inputs 中提到的相应键视为依赖项:必须在它之前完成的作业。

我没有时间在这里详述,而且我手边没有 javascript 环境,所以在 Python.

让我们从冗长的数据结构中提取依赖关系图。然后寻找“水平”。第一层是所有没有依赖关系的节点。第二个是在任何先前级别等中满足依赖关系的所有节点。冲洗并重复。

请注意,与我在评论中的想法不同,这不是传统定义的水平图。

此外,我不会费心使用数据结构来提高效率。您可以在 O(n log n) 时间内完成。我的代码是 O(n^2).

抱歉,如果我误解了你的问题。也很抱歉这里未经测试,可能有错误的实现。

from collections import defaultdict

def ExtractGraph(cmds):
  """Gets a dependency graph from verbose command input data."""
  graph = defaultdict(set)
  for cmd in cmds:
    node = cmd['set']
    graph[node].update(set())
    inputs = cmd.get('input')
    if inputs:
      for _, val in inputs.items():
        graph[node].add(val['path'][0])
  return graph

def FindSources(graph):
  """Returns the nodes of the given graph having no dependencies."""
  sources = set()
  for node, edges in graph.items():
    if not edges:
      sources.add(node)
  return sources

def GetLevels(dependencies):
  """Returns sequence levels satisfying given dependency graph."""
  sources = FindSources(dependencies)
  level = set(sources)
  done = set(level)
  todos = dependencies.keys() - done
  levels = []
  while level:
    levels.append(level)
    # Next level is jobs that have all dependencies done
    new_level = set()
    # A clever data structure could find the next level in O(k log n)
    # for a level size of k and n jobs. This needs O(n).
    for todo in todos:
      if dependencies[todo].issubset(done):
        new_level.add(todo)
    todos.difference_update(new_level)
    done.update(new_level)
    level = new_level
  return levels

cmds = [
    { 'action' : 'create', 'table' : 'tb1', 'set' : 'key1' },
    { 'action' : 'create', 'table' : 'tb2', 'set' : 'key2' },
    { 'action' : 'create', 'table' : 'tb2', 'set' : 'key3' },
    { 'action' : 'create', 'table' : 'tb3', 'set' : 'key5' },
    { 'action' : 'create', 'table' : 'tb4', 'set' : 'key6' },
    { 'action' : 'create', 'table' : 'tb3', 'set' : 'key7' },
    { 'action' : 'create', 'table' : 'tb4', 'set' : 'key8' },
    { 'action' : 'create', 'table' : 'tb3', 'set' : 'key9' },
    { 'action' : 'create', 'table' : 'tb3', 'set' : 'key10', 'input' : {
      'y' : { 'type' : 'binding', 'path' : ['key6', 'foo'] },
      'z' : { 'type' : 'binding', 'path' : ['key1', 'bar'] }
    } },
    { 'action' : 'create', 'table' : 'tb1', 'set' : 'key4', 'input' : {
      'x' : { 'type' : 'binding', 'path' : ['key2', 'baz'] }
    } },
    { 'action' : 'create', 'table' : 'tb4', 'set' : 'key11', 'input' : {
      'a' : { 'type' : 'binding', 'path' : ['key10', 'z'] },
      'b' : { 'type' : 'binding', 'path' : ['key1', 'bar'] }
    } },
    { 'action' : 'create', 'table' : 'tb1', 'set' : 'key12', 'input' : {
      'p' : { 'type' : 'binding', 'path' : ['key10', 'z'] },
      'q' : { 'type' : 'binding', 'path' : ['key11', 'a'] }
    } },
    { 'action' : 'create', 'table' : 'tb4', 'set' : 'key13' },
    { 'action' : 'create', 'table' : 'tb3', 'set' : 'key14' },
    { 'action' : 'create', 'table' : 'tb4', 'set' : 'key15', 'input' : {
      'a' : { 'type' : 'binding', 'path' : ['key8', 'z'] },
    } },
    { 'action' : 'create', 'table' : 'tb5', 'set' : 'key16' },
    { 'action' : 'create', 'table' : 'tb6', 'set' : 'key17' },
    { 'action' : 'create', 'table' : 'tb6', 'set' : 'key18', 'input' : {
      'm' : { 'type' : 'binding', 'path' : ['key4', 'x'] },
    } },
    { 'action' : 'create', 'table' : 'tb6', 'set' : 'key19', 'input' : {
      'm' : { 'type' : 'binding', 'path' : ['key4', 'x'] },
      'n' : { 'type' : 'binding', 'path' : ['key13', 'a'] },
    } },
    { 'action' : 'create', 'table' : 'tb6', 'set' : 'key20', 'input' : {
      'm' : { 'type' : 'binding', 'path' : ['key18', 'm'] },
      'n' : { 'type' : 'binding', 'path' : ['key17', 'm'] },
    } },
    { 'action' : 'create', 'table' : 'tb1', 'set' : 'key21' },
    { 'action' : 'create', 'table' : 'tb2', 'set' : 'key22', 'input' : {
      'w' : { 'type' : 'binding', 'path' : ['key18', 'm'] },
      'x' : { 'type' : 'binding', 'path' : ['key17', 'm'] },
    } },
    { 'action' : 'create', 'table' : 'tb2', 'set' : 'key23' },
    { 'action' : 'create', 'table' : 'tb3', 'set' : 'key24' },
]
    
dependencies = ExtractGraph(cmds)
levels = GetLevels(dependencies)
print(levels)

当运行时,这找到了几个级别:

[
{'key9', 'key3', 'key24', 'key7', 'key17', 'key8', 'key21', 'key1',
     'key5', 'key2', 'key16', 'key6', 'key23', 'key13', 'key14'}, 
{'key15', 'key4', 'key10'}, 
{'key19', 'key18', 'key11'}, 
{'key22', 'key12', 'key20'}
]

为了进行抽查,让我们看看 key12。它有 10 和 11 作为依赖项。 key10 有 6 和 1。key11 有 10 和 1。键 1 和 6 有 none。我们发现

  • 级别 0 中的 1 和 6(无依赖项),
  • 10(需要 1 和 6)级别 1,
  • 2 级 11(需要 10 和 1),
  • 3 级 12(需要 10 和 11)。

每项工作在满足其依赖性后立即完成。所以这是令人鼓舞的。但是,必须进行更彻底的测试。

如果您需要将这些级别进一步分组为每个 table 的单独插入,这是一个 post 处理步骤。一个级别内的结果插入可以并行完成。

我们有这个:

const actionTree = [
    { action: 'create', table: 'tb1', set: 'key1' },
    { action: 'create', table: 'tb2', set: 'key2' },
...
    { action: 'create', table: 'tb3', set: 'key24' },
  ];

我们要填写这个:

batches = [];

假设我们只需要确保所有相关的输入集都已经被插入,并且输入的第二项 path: ['key8', 'z'](在本例中为 z)不会影响任何东西,因为集是原子的,我们所要做的就是:

batches = [];

batched = () => batches.reduce((p,a)=>[...p,...a],[]);
unbatched = () => actionTree.filter(b=>batched().indexOf(b)<0);

nextbatchfilter = (a) => (!("input" in a))||(Object.values(a.input).filter(i=>batched().map(a=>a.set).indexOf(i.path[0])<0).length==0);

while (unbatched().length>0)
    batches.push(unbatched().filter(nextbatchfilter));
    if (batches[batches.length-1].length==0) {
        console.log("could not build dependency graph with all items, "+unbatched().length.toString()+" items remaining")
        break; // avoid infinite loop in case of impossible tree
    }

此处batched()通过展平batches显示哪些动作已被批处理;而 unbatched() 则相反,它显示了哪些操作仍需要进行批处理。 nextbachfilter 显示未批处理的哪些可以批处理。有了这些就可以完成我们的工作了。

可以修改代码以减少在计算 unbatchedbatched 时过多的 cpu 返工,方法是保留中间状态:

batches = [];

unbatched = Array.from(actionTree);

nextbatchfilter = (a) => (!("input" in a))||(Object.values(a.input).filter(i=>!(batchedsets.has(i.path[0]))).length==0);

batchedsets = new Set();
while (unbatched.length>0) {
    nextbatch = unbatched.filter(nextbatchfilter);
    if (nextbatch.length==0) {
        console.log("could not build dependency graph with all items, "+unbatched.length.toString()+" items remaining")
        break; // avoid infinite loop in case of impossible tree
    }
    unbatched = unbatched.filter(a=>!nextbatchfilter(a));
    batches.push(nextbatch);
    nextbatch.forEach(a=>batchedsets.add(a.set));
}

在这两种情况下,batches 输出不会按 tables 对操作进行分组,为了查看按 table 分组的操作,就像在示例中一样,只需要:

batches.map(b=>Array.from(new Set(b.map(a=>a.table))).map(t=>b.filter(a=>a.table==t)));

可选地,它可以通过已经到位的这个组来构建。

编辑:为两种解决方案添加了无限循环保护