删除字典字典中的循环数据以用于桑基图中

Removing circular data in a dictionary of dictionaries for use in Sankey diagram

我的个人会计应用程序遇到了 Sankey 图的挑战。问题是使用 Google 图表生成的 Sankey,如果现金在一个圆圈内移动,例如从账本 A 到账本 B 再到账本 C 再回到 A,则不会呈现。

脚本需要一个额外的步骤来评估数据,如果有一个圆周运动,那么它应该通过删除具有最低值的 link 来打破那个圆圈。

代码以字典的字典开头,其中包含在每对可能的分类账之间移动的现金数量。零意味着两个分类账之间没有现金流动。这部词典的词典要评价,圈子要破。

下面是一些基于@JacobDavis 回复的代码。它确实成功地找到了圈子,但并非总是如此。在下面的示例中,您可以看到 Ledger A 通向 B。但是 B 通向 C D。代码仅检查 C,因此错过了由 D.

该代码尚未尝试通过删除 link 来打破循环。试图让它首先识别周期。

all_ledgers = {
  "a": {"a": 0, "b": 1, "c": 0, "d": 0, "e": 0},
  "b": {"a": 0, "b": 0, "c": 1, "d": 1, "e": 0},
  "c": {"a": 0, "b": 0, "c": 0, "d": 0, "e": 1},
  "d": {"a": 1, "b": 0, "c": 0, "d": 0, "e": 0},
  "e": {"a": 0, "b": 0, "c": 0, "d": 0, "e": 0}
}

def evaluate_for_circular(dictionary_of_dictionaries):
  
  output = [] # Will put circular ledgers into here.
  
  def evaluate(start,target=None,cache=None):
  
      if not target:
          target=start
      if not cache:
        cache=[start]
      # Here we are looking at a  new row and will iterate through rows until we find our target (which is same as the row ledger)
      print('Evaluatating. Start: '+str(start)+'. Target: '+str(target))
  
      for ledger in dictionary_of_dictionaries[start].keys():
          # We now iterate through the items in the row. We use the keys rather than values as we're looking
          # for the target.
          print('Dealing with ledger '+str(ledger))
          print('Cache: '+str(cache))
          if dictionary_of_dictionaries[start][ledger]>0 and ledger==target:
              return ledger
          elif dictionary_of_dictionaries[start][ledger]>0 and ledger not in cache:
              cache.append(ledger)
              return evaluate(ledger,target,cache)
              #return evaluate(ledger,target)
      return False

  for dict in dictionary_of_dictionaries.keys():
    print('--')
    print('Starting evaluation of row '+str(dict))
    if evaluate(dict):
      output.append(dict)
      
  if output:
    return output
  else:
    return False
 
q = evaluate_for_circular(all_ledgers)

if q:
    print("Circle found in data: "+str(q))
else:
    print("No circle found in data.")

查看下面的示例。 “all_ledgers”应该代表您的 top-level 词典。它递归地循环遍历具有正值的键,如果它遇到初始目标(第一次迭代的起始值)则返回 True 。如果它通过了整个搜索而没有找到循环,则它 returns False。缓存是为了防止无限循环。

all_ledgers={'a':{'b':0,'c':1},
      'b':{'a':1},
      'c':{'b':1}}

def find_cycle(start,target=""):
    if target=="":
        target=start
    try:
        cache
    except NameError:
        cache = [start]
    for ledger in all_ledgers[start].keys():
        if all_ledgers[start][ledger]>0 and ledger==target:
            return True
        elif all_ledgers[start][ledger]>0 and ledger not in cache:
            cache.append(ledger)
            return find_cycle(ledger,target)
    return False
        
if find_cycle('a'):
    print("There is a cycle.")
else:
    print("There is no cycle.")

根据@JacobDavis 的回答,您需要让它完成循环。现在它可以在找到第一个传输后过早退出。

ledgers = {
  "a": {"a": 0, "b": 2, "c": 3, "d": 0, "e": 0},
  "b": {"a": 0, "b": 0, "c": 1, "d": 1, "e": 0},
  "c": {"a": 1, "b": 0, "c": 0, "d": 0, "e": 1},
  "d": {"a": 1, "b": 0, "c": 0, "d": 0, "e": 0},
  "e": {"a": 0, "b": 0, "c": 0, "d": 0, "e": 0}
}


def find_cycle(all_ledgers: dict, start: str, target="", cycle=None, v=None):
if cycle is None:
    cycle = [start]
else:
    cycle = cycle + [start]

if v is None:
    v = []

if target == "":
    target = start

for ledger in all_ledgers[start].keys():
    if all_ledgers[start][ledger] > 0 and ledger == target:
        v = v + [all_ledgers[start][ledger]]
        return True, cycle, v
    elif all_ledgers[start][ledger] > 0 and ledger not in cycle:
        flag, cycle2, v = find_cycle(all_ledgers, ledger, target, cycle, v)
        if flag:  # this check will make the loop continue over all sub-ledgers
            v = v + [all_ledgers[start][ledger]]
            return flag, cycle2, v

return False, [], []

这应该找到第一个循环和return涉及的分类帐以及循环中的转移值(以相反的顺序)。

使用此信息打破循环的方法:

def break_cycle(all_ledgers: dict):
for sub_ledger in all_ledgers.keys():
    chk, cyc, u = find_cycle(ledgers, sub_ledger)
    u = list(reversed(u))
    if chk:
        print(f"There is a cycle starting in ledger {sub_ledger}: {cyc}")
        print(f'cycle values found: {u}')
        min_transfer_index = u.index(min(u))
        ledger1 = cyc[min_transfer_index]
        if min_transfer_index == len(cyc)-1:
            ledger2 = cyc[0]
        else:
            ledger2 = cyc[min_transfer_index + 1]
        print(f'setting the transfer value from {ledger1} to {ledger2} to 0')
        ledgers[ledger1][ledger2] = 0
        return True
    else:
        print(f"There is no cycle starting in ledger {sub_ledger}")
return False

最后打破所有循环:

while break_cycle(ledgers):
    continue

请注意,这不是很有效,因为每次循环中断时它都会从头扫描整个数据集,但它应该可以完成工作。