删除字典字典中的循环数据以用于桑基图中
Removing circular data in a dictionary of dictionaries for use in Sankey diagram
我的个人会计应用程序遇到了 Sankey 图的挑战。问题是使用 Google 图表生成的 Sankey,如果现金在一个圆圈内移动,例如从账本 A 到账本 B 再到账本 C 再回到 A,则不会呈现。
脚本需要一个额外的步骤来评估数据,如果有一个圆周运动,那么它应该通过删除具有最低值的 link 来打破那个圆圈。
代码以字典的字典开头,其中包含在每对可能的分类账之间移动的现金数量。零意味着两个分类账之间没有现金流动。这部词典的词典要评价,圈子要破。
下面是一些基于@JacobDavis 回复的代码。它确实成功地找到了圈子,但并非总是如此。在下面的示例中,您可以看到 Ledger A 通向 B。但是 B 通向 C 和 D。代码仅检查 C,因此错过了由 D.
该代码尚未尝试通过删除 link 来打破循环。试图让它首先识别周期。
all_ledgers = {
"a": {"a": 0, "b": 1, "c": 0, "d": 0, "e": 0},
"b": {"a": 0, "b": 0, "c": 1, "d": 1, "e": 0},
"c": {"a": 0, "b": 0, "c": 0, "d": 0, "e": 1},
"d": {"a": 1, "b": 0, "c": 0, "d": 0, "e": 0},
"e": {"a": 0, "b": 0, "c": 0, "d": 0, "e": 0}
}
def evaluate_for_circular(dictionary_of_dictionaries):
output = [] # Will put circular ledgers into here.
def evaluate(start,target=None,cache=None):
if not target:
target=start
if not cache:
cache=[start]
# Here we are looking at a new row and will iterate through rows until we find our target (which is same as the row ledger)
print('Evaluatating. Start: '+str(start)+'. Target: '+str(target))
for ledger in dictionary_of_dictionaries[start].keys():
# We now iterate through the items in the row. We use the keys rather than values as we're looking
# for the target.
print('Dealing with ledger '+str(ledger))
print('Cache: '+str(cache))
if dictionary_of_dictionaries[start][ledger]>0 and ledger==target:
return ledger
elif dictionary_of_dictionaries[start][ledger]>0 and ledger not in cache:
cache.append(ledger)
return evaluate(ledger,target,cache)
#return evaluate(ledger,target)
return False
for dict in dictionary_of_dictionaries.keys():
print('--')
print('Starting evaluation of row '+str(dict))
if evaluate(dict):
output.append(dict)
if output:
return output
else:
return False
q = evaluate_for_circular(all_ledgers)
if q:
print("Circle found in data: "+str(q))
else:
print("No circle found in data.")
查看下面的示例。 “all_ledgers”应该代表您的 top-level 词典。它递归地循环遍历具有正值的键,如果它遇到初始目标(第一次迭代的起始值)则返回 True 。如果它通过了整个搜索而没有找到循环,则它 returns False。缓存是为了防止无限循环。
all_ledgers={'a':{'b':0,'c':1},
'b':{'a':1},
'c':{'b':1}}
def find_cycle(start,target=""):
if target=="":
target=start
try:
cache
except NameError:
cache = [start]
for ledger in all_ledgers[start].keys():
if all_ledgers[start][ledger]>0 and ledger==target:
return True
elif all_ledgers[start][ledger]>0 and ledger not in cache:
cache.append(ledger)
return find_cycle(ledger,target)
return False
if find_cycle('a'):
print("There is a cycle.")
else:
print("There is no cycle.")
根据@JacobDavis 的回答,您需要让它完成循环。现在它可以在找到第一个传输后过早退出。
ledgers = {
"a": {"a": 0, "b": 2, "c": 3, "d": 0, "e": 0},
"b": {"a": 0, "b": 0, "c": 1, "d": 1, "e": 0},
"c": {"a": 1, "b": 0, "c": 0, "d": 0, "e": 1},
"d": {"a": 1, "b": 0, "c": 0, "d": 0, "e": 0},
"e": {"a": 0, "b": 0, "c": 0, "d": 0, "e": 0}
}
def find_cycle(all_ledgers: dict, start: str, target="", cycle=None, v=None):
if cycle is None:
cycle = [start]
else:
cycle = cycle + [start]
if v is None:
v = []
if target == "":
target = start
for ledger in all_ledgers[start].keys():
if all_ledgers[start][ledger] > 0 and ledger == target:
v = v + [all_ledgers[start][ledger]]
return True, cycle, v
elif all_ledgers[start][ledger] > 0 and ledger not in cycle:
flag, cycle2, v = find_cycle(all_ledgers, ledger, target, cycle, v)
if flag: # this check will make the loop continue over all sub-ledgers
v = v + [all_ledgers[start][ledger]]
return flag, cycle2, v
return False, [], []
这应该找到第一个循环和return涉及的分类帐以及循环中的转移值(以相反的顺序)。
使用此信息打破循环的方法:
def break_cycle(all_ledgers: dict):
for sub_ledger in all_ledgers.keys():
chk, cyc, u = find_cycle(ledgers, sub_ledger)
u = list(reversed(u))
if chk:
print(f"There is a cycle starting in ledger {sub_ledger}: {cyc}")
print(f'cycle values found: {u}')
min_transfer_index = u.index(min(u))
ledger1 = cyc[min_transfer_index]
if min_transfer_index == len(cyc)-1:
ledger2 = cyc[0]
else:
ledger2 = cyc[min_transfer_index + 1]
print(f'setting the transfer value from {ledger1} to {ledger2} to 0')
ledgers[ledger1][ledger2] = 0
return True
else:
print(f"There is no cycle starting in ledger {sub_ledger}")
return False
最后打破所有循环:
while break_cycle(ledgers):
continue
请注意,这不是很有效,因为每次循环中断时它都会从头扫描整个数据集,但它应该可以完成工作。
我的个人会计应用程序遇到了 Sankey 图的挑战。问题是使用 Google 图表生成的 Sankey,如果现金在一个圆圈内移动,例如从账本 A 到账本 B 再到账本 C 再回到 A,则不会呈现。
脚本需要一个额外的步骤来评估数据,如果有一个圆周运动,那么它应该通过删除具有最低值的 link 来打破那个圆圈。
代码以字典的字典开头,其中包含在每对可能的分类账之间移动的现金数量。零意味着两个分类账之间没有现金流动。这部词典的词典要评价,圈子要破。
下面是一些基于@JacobDavis 回复的代码。它确实成功地找到了圈子,但并非总是如此。在下面的示例中,您可以看到 Ledger A 通向 B。但是 B 通向 C 和 D。代码仅检查 C,因此错过了由 D.
该代码尚未尝试通过删除 link 来打破循环。试图让它首先识别周期。
all_ledgers = {
"a": {"a": 0, "b": 1, "c": 0, "d": 0, "e": 0},
"b": {"a": 0, "b": 0, "c": 1, "d": 1, "e": 0},
"c": {"a": 0, "b": 0, "c": 0, "d": 0, "e": 1},
"d": {"a": 1, "b": 0, "c": 0, "d": 0, "e": 0},
"e": {"a": 0, "b": 0, "c": 0, "d": 0, "e": 0}
}
def evaluate_for_circular(dictionary_of_dictionaries):
output = [] # Will put circular ledgers into here.
def evaluate(start,target=None,cache=None):
if not target:
target=start
if not cache:
cache=[start]
# Here we are looking at a new row and will iterate through rows until we find our target (which is same as the row ledger)
print('Evaluatating. Start: '+str(start)+'. Target: '+str(target))
for ledger in dictionary_of_dictionaries[start].keys():
# We now iterate through the items in the row. We use the keys rather than values as we're looking
# for the target.
print('Dealing with ledger '+str(ledger))
print('Cache: '+str(cache))
if dictionary_of_dictionaries[start][ledger]>0 and ledger==target:
return ledger
elif dictionary_of_dictionaries[start][ledger]>0 and ledger not in cache:
cache.append(ledger)
return evaluate(ledger,target,cache)
#return evaluate(ledger,target)
return False
for dict in dictionary_of_dictionaries.keys():
print('--')
print('Starting evaluation of row '+str(dict))
if evaluate(dict):
output.append(dict)
if output:
return output
else:
return False
q = evaluate_for_circular(all_ledgers)
if q:
print("Circle found in data: "+str(q))
else:
print("No circle found in data.")
查看下面的示例。 “all_ledgers”应该代表您的 top-level 词典。它递归地循环遍历具有正值的键,如果它遇到初始目标(第一次迭代的起始值)则返回 True 。如果它通过了整个搜索而没有找到循环,则它 returns False。缓存是为了防止无限循环。
all_ledgers={'a':{'b':0,'c':1},
'b':{'a':1},
'c':{'b':1}}
def find_cycle(start,target=""):
if target=="":
target=start
try:
cache
except NameError:
cache = [start]
for ledger in all_ledgers[start].keys():
if all_ledgers[start][ledger]>0 and ledger==target:
return True
elif all_ledgers[start][ledger]>0 and ledger not in cache:
cache.append(ledger)
return find_cycle(ledger,target)
return False
if find_cycle('a'):
print("There is a cycle.")
else:
print("There is no cycle.")
根据@JacobDavis 的回答,您需要让它完成循环。现在它可以在找到第一个传输后过早退出。
ledgers = {
"a": {"a": 0, "b": 2, "c": 3, "d": 0, "e": 0},
"b": {"a": 0, "b": 0, "c": 1, "d": 1, "e": 0},
"c": {"a": 1, "b": 0, "c": 0, "d": 0, "e": 1},
"d": {"a": 1, "b": 0, "c": 0, "d": 0, "e": 0},
"e": {"a": 0, "b": 0, "c": 0, "d": 0, "e": 0}
}
def find_cycle(all_ledgers: dict, start: str, target="", cycle=None, v=None):
if cycle is None:
cycle = [start]
else:
cycle = cycle + [start]
if v is None:
v = []
if target == "":
target = start
for ledger in all_ledgers[start].keys():
if all_ledgers[start][ledger] > 0 and ledger == target:
v = v + [all_ledgers[start][ledger]]
return True, cycle, v
elif all_ledgers[start][ledger] > 0 and ledger not in cycle:
flag, cycle2, v = find_cycle(all_ledgers, ledger, target, cycle, v)
if flag: # this check will make the loop continue over all sub-ledgers
v = v + [all_ledgers[start][ledger]]
return flag, cycle2, v
return False, [], []
这应该找到第一个循环和return涉及的分类帐以及循环中的转移值(以相反的顺序)。
使用此信息打破循环的方法:
def break_cycle(all_ledgers: dict):
for sub_ledger in all_ledgers.keys():
chk, cyc, u = find_cycle(ledgers, sub_ledger)
u = list(reversed(u))
if chk:
print(f"There is a cycle starting in ledger {sub_ledger}: {cyc}")
print(f'cycle values found: {u}')
min_transfer_index = u.index(min(u))
ledger1 = cyc[min_transfer_index]
if min_transfer_index == len(cyc)-1:
ledger2 = cyc[0]
else:
ledger2 = cyc[min_transfer_index + 1]
print(f'setting the transfer value from {ledger1} to {ledger2} to 0')
ledgers[ledger1][ledger2] = 0
return True
else:
print(f"There is no cycle starting in ledger {sub_ledger}")
return False
最后打破所有循环:
while break_cycle(ledgers):
continue
请注意,这不是很有效,因为每次循环中断时它都会从头扫描整个数据集,但它应该可以完成工作。