Python 中的 Deduplication/merging 个可变数据
Deduplication/merging of mutable data in Python
问题的高级视图
我有 X 个来源,其中包含有关我们环境中资产(hostname、IP、MAC、os 等)的信息。这些来源包含 1500 到 150k 的条目(至少我现在使用的条目)。我的脚本支持os查询它们中的每一个,收集数据,通过合并来自不同来源的相同资产的信息来删除重复数据,以及 return 所有条目的统一列表。我当前的实现确实有效,但对于更大的数据集来说速度很慢。我很好奇是否有更好的方法来完成我想做的事情。
Universal problem
Deduplication of data by merging similar entries with the caveat that merging two assets might change whether the resulting asset will be similar to the third asset that was similar to the first two before merging.
Example:
~ similarity, + merging
(before) A ~ B ~ C
(after) (A+B) ~ C or (A+B) !~ C
我试图寻找有同样问题的人,我只找到了 What is an elegant way to remove duplicate mutable objects in a list in Python?,但它不包括对我来说至关重要的数据合并。
类用过
为了便于阅读和理解而进行了简化,删除了不需要的部分 - 一般功能完好无损。
class Entry:
def __init__(self, source: List[str], mac: List[str] = [], ip: List[str] = [], hostname: List[str] = [], os: OS = OS.UNKNOWN, details: dict = {}):
# SO: Sorting and sanitization removed for simplicity
self.source = source
self.mac = mac
self.ip = ip
self.hostname = hostname
self.os = os
self.details = details
def __eq__(self, other):
if isinstance(other, Entry):
return (self.source == other.source and
self.os == other.os and
self.hostname == other.hostname and
self.mac == other.mac and
self.ip == other.ip)
return NotImplemented
def is_similar(self, other) -> bool:
def same_entry(l1: list, l2: list) -> bool:
return not set(l1).isdisjoint(l2)
if isinstance(other, Entry):
if self.os == OS.UNKNOWN or other.os == OS.UNKNOWN or self.os == other.os:
empty_hostnames = self.hostname == [] or other.hostname == []
empty_macs = self.mac == [] or other.mac == []
return (same_entry(self.hostname, other.hostname) or
(empty_hostnames and same_entry(self.mac, other.mac)) or
(empty_hostnames and empty_macs and same_entry(self.ip, other.ip)))
return False
def merge(self, other: 'Entry'):
self.source = _merge_lists(self.source, other.source)
self.hostname = _merge_lists(self.hostname, other.hostname)
self.mac = _merge_lists(self.mac, other.mac)
self.ip = _merge_lists(self.ip, other.ip)
self.os = self.os if self.os != OS.UNKNOWN else other.os
self.details = _merge_dicts(self.details, other.details)
def representation(self) -> str:
# Might be useful if anyone wishes to run the code
return f'<Entry from {self.source}: hostname={self.hostname}, MAC={self.mac}, IP={self.ip}, OS={self.os.value}, details={self.details}>'
def _merge_lists(l1: list, l2: list):
return list(set(l1) | set(l2))
def _merge_dicts(d1: dict, d2: dict):
"""
Merge two dicts without overwriting any data.
"""
# If either is empty, return the other one
if not d1:
return d2
if not d2:
return d1
if d1 == d2:
return d1
result = d1
for k, v in d2.items():
if k in result:
result[k + '_'] = v
else:
result[k] = v
return result
class OS(Enum):
'''
Enum specifying the operating system of the asset.
'''
UNKNOWN = 'Unknown'
WINDOWS = 'Windows'
LINUX = 'Linux'
MACOS = 'MacOS'
算法
每个算法都获取来自不同来源的条目列表的列表,eq:
entries = [[entries from source A], [entries from source B], ..., [entries from source Z]]
主要去重功能
这是每个算法中使用的主要功能。它获取来自 2 个不同来源的条目列表,并将其合并到包含资产的列表中,并在需要时合并信息。
这可能是我需要帮助的部分ost。这是我能想到的唯一方法。正因为如此,我专注于如何 运行 这个函数快好几倍,但就减少 运行 时间而言,让这个函数更快是最好的。
def deduplicate(en1: List[Entry], en2: List[Entry]) -> List[Entry]:
"""
Deduplicates entries from provided lists by merging similar entries.
Entries in the lists are supposed to be already deduplicated.
"""
# If either is empty, return the other one
if not en1:
return en2
if not en2:
return en1
result = []
# Iterate over longer and check for similar in shorter
if len(en2) > len(en1):
en1, en2 = en2, en1
for e in en1:
# walrus operator in Python 3.8 or newer
while (similar := next((y for y in en2 if y.is_similar(e)), None)) is not None:
e.merge(similar)
en2.remove(similar)
del similar
result.append(e)
result.extend(en2)
return result
正常重复数据删除(例如使用集合)在这里不适用的原因是因为将一个条目与另一个新条目合并可能会变得相似,例如:
In [2]: e1 = Entry(['SRC_A'], [], ['1.1.1.1'], [], OS.UNKNOWN)
In [3]: e2 = Entry(['SRC_A'], ['aa:bb:cc:dd:ee:ff'], ['1.1.1.1'], [], OS.UNKNOWN)
In [4]: e3 = Entry(['SRC_A'], ['aa:bb:cc:dd:ee:ff'], [], [], OS.UNKNOWN)
In [5]: e1.is_similar(e2)
Out[5]: True
In [6]: e1.is_similar(e3) # at first it's not similar
Out[6]: False
In [7]: e1.merge(e2)
In [8]: e1.is_similar(e3) # but after merging it is
Out[8]: True
第一种方法 - 顺序
我的第一个想法是最简单的,只是简单的递归。
def dedup_multiple(lists: List[List[Entry]]) -> List[Entry]:
"""Deduplication helper allowing for providing more than 2 sources."""
if len(lists) == 1:
return lists[0]
return deduplicate(lists[0], dedup_multiple(lists[1:]))
第二种方法 - 使用池的多线程
这就是我目前使用的方法。到目前为止,它是最快的并且相当简单。
def async_dedup(lists: List[List[Entry]]) -> List[Entry]:
"""Asynchronous deduplication helper allowing for providing more than 2 sources."""
with mp.Pool() as pool:
while len(lists) > 1:
if len(lists) % 2 == 1:
lists.append([])
data = [(lists[i], lists[i+1]) for i in range(0, len(lists), 2)]
lists = pool.map_async(_internal_deduplication, data).get()
return lists[0]
def _internal_deduplication(en):
return deduplicate(*en)
但我很快意识到,如果一项任务比其他任务花费的时间长得多(例如,因为对最大源进行重复数据删除),其他所有任务都会等待而不是工作。
第三种方法 - 使用队列和进程的多线程
当我试图加快第二种方法时,我遇到了 across and Filling a queue and managing multiprocessing in python,我想出了以下解决方案。
def async_dedup2(lists: List[List[Entry]]) -> List[Entry]:
tasks_number = min(os.cpu_count(), len(lists) // 2)
args = lists[:tasks_number]
with mp.Manager() as manager:
queue = manager.Queue()
for l in lists[tasks_number:]:
queue.put(l)
processes = []
for arg in args:
proc = mp.Process(target=test, args=(queue, arg, ))
proc.start()
processes.append(proc)
for proc in processes:
proc.join()
return queue.get()
def test(queue: mp.Queue, arg: List[Entry]):
while not queue.empty():
try:
arg2: List[Entry] = queue.get()
except Empty:
continue
arg = deduplicate(arg, arg2)
queue.put(arg)
我认为这将是最好的解决方案,因为如果 possible 就不会出现不处理数据的情况,但在测试后总是 almost比第二种方法稍慢。
运行时比较
Source A 1510
Source B 1509
Source C 5000
Source D 4460
Source E 5000
Source F 2084
Deduplicating.....
SYNC - Execution time: 188.6127771000 - Count: 13540
ASYNC - Execution time: 68.249583 - Count: 13532
ASYNC2 - Execution time: 69.416046 - Count: 13532
Source A 1510
Source B 1509
Source C 11821
Source D 13871
Source E 5001
Source F 2333
Deduplicating.....
ASYNC - Execution time: 424.405793 - Count: 26229
ASYNC2 - Execution time: 522.697551 - Count: 26405
总结:我们从条目到集合定义了两个草图函数f和g
“草图”使得两个条目 e 和 e' 相似当且仅当
f(e) ∩ g(e') ≠ ∅。然后我们可以有效地识别合并(参见
算法在最后)。
其实我要定义四个sketch函数,fos,
faddr、gos和gaddr,我们从中
构造
- f(e) = {(x, y) | x ∈ fos(e), y ∈ faddr(e)}
- g(e) = {(x, y) | x ∈ gos(e), y ∈ gaddr(e)}.
fos 和 gos 是四个中较简单的一个。
fos(e) 包括
- (1, e.
os
),如果e.os
已知
- (2,), 如果 e.
os
已知
- (3,), 如果 e.
os
未知.
gos(e) 包括
- (1, e.
os
),如果e.os
已知
- (2,), 如果 e.
os
未知
- (3,).
faddr 和 gaddr 比较复杂,因为有
是优先属性,它们可以有多个值。
尽管如此,同样的技巧也可以奏效。 f地址(e)
包括
- (1,
h
) 对于 e.hostname
中的每个 h
- (2,
m
) 对于 e.mac
中的每个 m
,如果 e.hostname
是非空的
- (3,
m
) 对于 e.mac
中的每个 m
,如果 e.hostname
为空
- (4,
i
) 对于 e.ip
中的每个 i
,如果 e.hostname
和 e.mac
是
非空
- (5,
i
) 对于 e.ip
中的每个 i
,如果 e.hostname
为空并且
e.mac
是非空的
- (6,
i
) 对于 e.ip
中的每个 i
,如果 e.hostname
是非空的并且
e.mac
为空
- (7,
i
) 对于 e.ip
中的每个 i
,如果 e.hostname
和 e.mac
是
空。
g地址(e) 包括
- (1,
h
) 对于 e.hostname
中的每个 h
- (2,
m
) 对于 e.mac
中的每个 m
,如果 e.hostname
为空
- (3,
m
) 对于 e.mac
中的每个 m
- (4,
i
) 对于 e.ip
中的每个 i
,如果 e.hostname
为空且
e.mac
为空
- (5,
i
) 对于 e.ip
中的每个 i
,如果 e.mac
为空
- (6,
i
) 对于 e.ip
中的每个 i
,如果 e.hostname
为空
- (7,
i
) 对于 e.ip
. 中的每个 i
算法的其余部分如下。
初始化一个defaultdict(list)
映射草图到条目列表
标识符。
对于每个条目,对于条目的每个 f-sketches,添加条目的
defaultdict
.
中相应列表的标识符
初始化 set
个边。
对于每个条目,对于条目的每个 g 草图,查找
defaultdict
中的 g-sketch 并从条目的
列表中每个其他标识符的标识符。
现在我们有了一组边,我们 运行 进入@btilly 的问题
著名的。作为一名计算机科学家,我的第一直觉是找到连接
组件,但当然,合并两个条目可能会导致一些事件
边缘消失。相反,您可以使用边缘作为候选人
合并,并重复上述算法,直到 returns 没有边缘。
import collections
import itertools
Entry = collections.namedtuple("Entry", ("os", "hostname", "mac", "ip"))
UNKNOWN = "UNKNOWN"
WINDOWS = "WINDOWS"
LINUX = "LINUX"
def f_os(e):
if e.os != UNKNOWN:
yield (1, e.os)
if e.os != UNKNOWN:
yield (2,)
if e.os == UNKNOWN:
yield (3,)
def g_os(e):
if e.os != UNKNOWN:
yield (1, e.os)
if e.os == UNKNOWN:
yield (2,)
yield (3,)
def f_addr(e):
for h in e.hostname:
yield (1, h)
if e.hostname:
for m in e.mac:
yield (2, m)
if not e.hostname:
for m in e.mac:
yield (3, m)
if e.hostname and e.mac:
for i in e.ip:
yield (4, i)
if not e.hostname and e.mac:
for i in e.ip:
yield (5, i)
if e.hostname and not e.mac:
for i in e.ip:
yield (6, i)
if not e.hostname and not e.mac:
for i in e.ip:
yield (7, i)
def g_addr(e):
for h in e.hostname:
yield (1, h)
if not e.hostname:
for m in e.mac:
yield (2, m)
for m in e.mac:
yield (3, m)
if not e.hostname and not e.mac:
for i in e.ip:
yield (4, i)
if not e.mac:
for i in e.ip:
yield (5, i)
if not e.hostname:
for i in e.ip:
yield (6, i)
for i in e.ip:
yield (7, i)
def f(e):
return set(itertools.product(f_os(e), f_addr(e)))
def g(e):
return set(itertools.product(g_os(e), g_addr(e)))
def is_similar(e, e_prime):
return not f(e).isdisjoint(g(e_prime))
# Begin testing code for is_similar
def original_is_similar(e, e_prime):
if e.os != UNKNOWN and e_prime.os != UNKNOWN and e.os != e_prime.os:
return False
if e.hostname and e_prime.hostname:
return not set(e.hostname).isdisjoint(set(e_prime.hostname))
if e.mac and e_prime.mac:
return not set(e.mac).isdisjoint(set(e_prime.mac))
return not set(e.ip).isdisjoint(set(e_prime.ip))
import random
def random_os():
return random.choice([UNKNOWN, WINDOWS, LINUX])
def random_names(prefix):
return [
"{}{}".format(prefix, random.randrange(10)) for n in range(random.randrange(3))
]
def random_entry():
return Entry(random_os(), random_names("H"), random_names("M"), random_names("I"))
def test_is_similar():
print("Testing is_similar()")
for rep in range(100000):
e = random_entry()
e_prime = random_entry()
got = is_similar(e, e_prime)
expected = original_is_similar(e, e_prime)
if got != expected:
print(e)
print(e_prime)
print("got", got)
print("expected", expected)
break
if __name__ == "__main__":
test_is_similar()
# End testing code
def find_edges(entries):
entries = list(entries)
posting_lists = collections.defaultdict(list)
for i, e in enumerate(entries):
for sketch in f(e):
posting_lists[sketch].append(i)
edges = set()
for i, e in enumerate(entries):
for sketch in g(e):
for j in posting_lists[sketch]:
if i < j:
edges.add((i, j))
return edges
# Begin testing code for find_edges
def test_find_edges():
print("Testing find_edges()")
entries = [random_entry() for i in range(1000)]
got = find_edges(entries)
expected = {
(i, j)
for (i, e) in enumerate(entries)
for (j, e_prime) in enumerate(entries)
if i < j and is_similar(e, e_prime)
}
print(len(expected))
assert got == expected
if __name__ == "__main__":
test_find_edges()
find_edges([random_entry() for i in range(10000)])
# End testing code for find_edges
问题的高级视图
我有 X 个来源,其中包含有关我们环境中资产(hostname、IP、MAC、os 等)的信息。这些来源包含 1500 到 150k 的条目(至少我现在使用的条目)。我的脚本支持os查询它们中的每一个,收集数据,通过合并来自不同来源的相同资产的信息来删除重复数据,以及 return 所有条目的统一列表。我当前的实现确实有效,但对于更大的数据集来说速度很慢。我很好奇是否有更好的方法来完成我想做的事情。
Universal problem
Deduplication of data by merging similar entries with the caveat that merging two assets might change whether the resulting asset will be similar to the third asset that was similar to the first two before merging.
Example:
~ similarity, + merging
(before) A ~ B ~ C
(after) (A+B) ~ C or (A+B) !~ C
我试图寻找有同样问题的人,我只找到了 What is an elegant way to remove duplicate mutable objects in a list in Python?,但它不包括对我来说至关重要的数据合并。
类用过
为了便于阅读和理解而进行了简化,删除了不需要的部分 - 一般功能完好无损。
class Entry:
def __init__(self, source: List[str], mac: List[str] = [], ip: List[str] = [], hostname: List[str] = [], os: OS = OS.UNKNOWN, details: dict = {}):
# SO: Sorting and sanitization removed for simplicity
self.source = source
self.mac = mac
self.ip = ip
self.hostname = hostname
self.os = os
self.details = details
def __eq__(self, other):
if isinstance(other, Entry):
return (self.source == other.source and
self.os == other.os and
self.hostname == other.hostname and
self.mac == other.mac and
self.ip == other.ip)
return NotImplemented
def is_similar(self, other) -> bool:
def same_entry(l1: list, l2: list) -> bool:
return not set(l1).isdisjoint(l2)
if isinstance(other, Entry):
if self.os == OS.UNKNOWN or other.os == OS.UNKNOWN or self.os == other.os:
empty_hostnames = self.hostname == [] or other.hostname == []
empty_macs = self.mac == [] or other.mac == []
return (same_entry(self.hostname, other.hostname) or
(empty_hostnames and same_entry(self.mac, other.mac)) or
(empty_hostnames and empty_macs and same_entry(self.ip, other.ip)))
return False
def merge(self, other: 'Entry'):
self.source = _merge_lists(self.source, other.source)
self.hostname = _merge_lists(self.hostname, other.hostname)
self.mac = _merge_lists(self.mac, other.mac)
self.ip = _merge_lists(self.ip, other.ip)
self.os = self.os if self.os != OS.UNKNOWN else other.os
self.details = _merge_dicts(self.details, other.details)
def representation(self) -> str:
# Might be useful if anyone wishes to run the code
return f'<Entry from {self.source}: hostname={self.hostname}, MAC={self.mac}, IP={self.ip}, OS={self.os.value}, details={self.details}>'
def _merge_lists(l1: list, l2: list):
return list(set(l1) | set(l2))
def _merge_dicts(d1: dict, d2: dict):
"""
Merge two dicts without overwriting any data.
"""
# If either is empty, return the other one
if not d1:
return d2
if not d2:
return d1
if d1 == d2:
return d1
result = d1
for k, v in d2.items():
if k in result:
result[k + '_'] = v
else:
result[k] = v
return result
class OS(Enum):
'''
Enum specifying the operating system of the asset.
'''
UNKNOWN = 'Unknown'
WINDOWS = 'Windows'
LINUX = 'Linux'
MACOS = 'MacOS'
算法
每个算法都获取来自不同来源的条目列表的列表,eq:
entries = [[entries from source A], [entries from source B], ..., [entries from source Z]]
主要去重功能
这是每个算法中使用的主要功能。它获取来自 2 个不同来源的条目列表,并将其合并到包含资产的列表中,并在需要时合并信息。
这可能是我需要帮助的部分ost。这是我能想到的唯一方法。正因为如此,我专注于如何 运行 这个函数快好几倍,但就减少 运行 时间而言,让这个函数更快是最好的。
def deduplicate(en1: List[Entry], en2: List[Entry]) -> List[Entry]:
"""
Deduplicates entries from provided lists by merging similar entries.
Entries in the lists are supposed to be already deduplicated.
"""
# If either is empty, return the other one
if not en1:
return en2
if not en2:
return en1
result = []
# Iterate over longer and check for similar in shorter
if len(en2) > len(en1):
en1, en2 = en2, en1
for e in en1:
# walrus operator in Python 3.8 or newer
while (similar := next((y for y in en2 if y.is_similar(e)), None)) is not None:
e.merge(similar)
en2.remove(similar)
del similar
result.append(e)
result.extend(en2)
return result
正常重复数据删除(例如使用集合)在这里不适用的原因是因为将一个条目与另一个新条目合并可能会变得相似,例如:
In [2]: e1 = Entry(['SRC_A'], [], ['1.1.1.1'], [], OS.UNKNOWN)
In [3]: e2 = Entry(['SRC_A'], ['aa:bb:cc:dd:ee:ff'], ['1.1.1.1'], [], OS.UNKNOWN)
In [4]: e3 = Entry(['SRC_A'], ['aa:bb:cc:dd:ee:ff'], [], [], OS.UNKNOWN)
In [5]: e1.is_similar(e2)
Out[5]: True
In [6]: e1.is_similar(e3) # at first it's not similar
Out[6]: False
In [7]: e1.merge(e2)
In [8]: e1.is_similar(e3) # but after merging it is
Out[8]: True
第一种方法 - 顺序
我的第一个想法是最简单的,只是简单的递归。
def dedup_multiple(lists: List[List[Entry]]) -> List[Entry]:
"""Deduplication helper allowing for providing more than 2 sources."""
if len(lists) == 1:
return lists[0]
return deduplicate(lists[0], dedup_multiple(lists[1:]))
第二种方法 - 使用池的多线程
这就是我目前使用的方法。到目前为止,它是最快的并且相当简单。
def async_dedup(lists: List[List[Entry]]) -> List[Entry]:
"""Asynchronous deduplication helper allowing for providing more than 2 sources."""
with mp.Pool() as pool:
while len(lists) > 1:
if len(lists) % 2 == 1:
lists.append([])
data = [(lists[i], lists[i+1]) for i in range(0, len(lists), 2)]
lists = pool.map_async(_internal_deduplication, data).get()
return lists[0]
def _internal_deduplication(en):
return deduplicate(*en)
但我很快意识到,如果一项任务比其他任务花费的时间长得多(例如,因为对最大源进行重复数据删除),其他所有任务都会等待而不是工作。
第三种方法 - 使用队列和进程的多线程
当我试图加快第二种方法时,我遇到了 across
def async_dedup2(lists: List[List[Entry]]) -> List[Entry]:
tasks_number = min(os.cpu_count(), len(lists) // 2)
args = lists[:tasks_number]
with mp.Manager() as manager:
queue = manager.Queue()
for l in lists[tasks_number:]:
queue.put(l)
processes = []
for arg in args:
proc = mp.Process(target=test, args=(queue, arg, ))
proc.start()
processes.append(proc)
for proc in processes:
proc.join()
return queue.get()
def test(queue: mp.Queue, arg: List[Entry]):
while not queue.empty():
try:
arg2: List[Entry] = queue.get()
except Empty:
continue
arg = deduplicate(arg, arg2)
queue.put(arg)
我认为这将是最好的解决方案,因为如果 possible 就不会出现不处理数据的情况,但在测试后总是 almost比第二种方法稍慢。
运行时比较
Source A 1510
Source B 1509
Source C 5000
Source D 4460
Source E 5000
Source F 2084
Deduplicating.....
SYNC - Execution time: 188.6127771000 - Count: 13540
ASYNC - Execution time: 68.249583 - Count: 13532
ASYNC2 - Execution time: 69.416046 - Count: 13532
Source A 1510
Source B 1509
Source C 11821
Source D 13871
Source E 5001
Source F 2333
Deduplicating.....
ASYNC - Execution time: 424.405793 - Count: 26229
ASYNC2 - Execution time: 522.697551 - Count: 26405
总结:我们从条目到集合定义了两个草图函数f和g “草图”使得两个条目 e 和 e' 相似当且仅当 f(e) ∩ g(e') ≠ ∅。然后我们可以有效地识别合并(参见 算法在最后)。
其实我要定义四个sketch函数,fos, faddr、gos和gaddr,我们从中 构造
- f(e) = {(x, y) | x ∈ fos(e), y ∈ faddr(e)}
- g(e) = {(x, y) | x ∈ gos(e), y ∈ gaddr(e)}.
fos 和 gos 是四个中较简单的一个。 fos(e) 包括
- (1, e.
os
),如果e.os
已知 - (2,), 如果 e.
os
已知 - (3,), 如果 e.
os
未知.
gos(e) 包括
- (1, e.
os
),如果e.os
已知 - (2,), 如果 e.
os
未知 - (3,).
faddr 和 gaddr 比较复杂,因为有 是优先属性,它们可以有多个值。 尽管如此,同样的技巧也可以奏效。 f地址(e) 包括
- (1,
h
) 对于 e.hostname
中的每个 - (2,
m
) 对于 e.mac
中的每个m
,如果 e.hostname
是非空的 - (3,
m
) 对于 e.mac
中的每个m
,如果 e.hostname
为空 - (4,
i
) 对于 e.ip
中的每个i
,如果 e.hostname
和 e.mac
是 非空 - (5,
i
) 对于 e.ip
中的每个i
,如果 e.hostname
为空并且 e.mac
是非空的 - (6,
i
) 对于 e.ip
中的每个i
,如果 e.hostname
是非空的并且 e.mac
为空 - (7,
i
) 对于 e.ip
中的每个i
,如果 e.hostname
和 e.mac
是 空。
h
g地址(e) 包括
- (1,
h
) 对于 e.hostname
中的每个 - (2,
m
) 对于 e.mac
中的每个m
,如果 e.hostname
为空 - (3,
m
) 对于 e.mac
中的每个 - (4,
i
) 对于 e.ip
中的每个i
,如果 e.hostname
为空且 e.mac
为空 - (5,
i
) 对于 e.ip
中的每个i
,如果 e.mac
为空 - (6,
i
) 对于 e.ip
中的每个i
,如果 e.hostname
为空 - (7,
i
) 对于 e.ip
. 中的每个
h
m
i
算法的其余部分如下。
初始化一个
defaultdict(list)
映射草图到条目列表 标识符。对于每个条目,对于条目的每个 f-sketches,添加条目的
中相应列表的标识符defaultdict
.初始化
set
个边。对于每个条目,对于条目的每个 g 草图,查找
defaultdict
中的 g-sketch 并从条目的 列表中每个其他标识符的标识符。
现在我们有了一组边,我们 运行 进入@btilly 的问题 著名的。作为一名计算机科学家,我的第一直觉是找到连接 组件,但当然,合并两个条目可能会导致一些事件 边缘消失。相反,您可以使用边缘作为候选人 合并,并重复上述算法,直到 returns 没有边缘。
import collections
import itertools
Entry = collections.namedtuple("Entry", ("os", "hostname", "mac", "ip"))
UNKNOWN = "UNKNOWN"
WINDOWS = "WINDOWS"
LINUX = "LINUX"
def f_os(e):
if e.os != UNKNOWN:
yield (1, e.os)
if e.os != UNKNOWN:
yield (2,)
if e.os == UNKNOWN:
yield (3,)
def g_os(e):
if e.os != UNKNOWN:
yield (1, e.os)
if e.os == UNKNOWN:
yield (2,)
yield (3,)
def f_addr(e):
for h in e.hostname:
yield (1, h)
if e.hostname:
for m in e.mac:
yield (2, m)
if not e.hostname:
for m in e.mac:
yield (3, m)
if e.hostname and e.mac:
for i in e.ip:
yield (4, i)
if not e.hostname and e.mac:
for i in e.ip:
yield (5, i)
if e.hostname and not e.mac:
for i in e.ip:
yield (6, i)
if not e.hostname and not e.mac:
for i in e.ip:
yield (7, i)
def g_addr(e):
for h in e.hostname:
yield (1, h)
if not e.hostname:
for m in e.mac:
yield (2, m)
for m in e.mac:
yield (3, m)
if not e.hostname and not e.mac:
for i in e.ip:
yield (4, i)
if not e.mac:
for i in e.ip:
yield (5, i)
if not e.hostname:
for i in e.ip:
yield (6, i)
for i in e.ip:
yield (7, i)
def f(e):
return set(itertools.product(f_os(e), f_addr(e)))
def g(e):
return set(itertools.product(g_os(e), g_addr(e)))
def is_similar(e, e_prime):
return not f(e).isdisjoint(g(e_prime))
# Begin testing code for is_similar
def original_is_similar(e, e_prime):
if e.os != UNKNOWN and e_prime.os != UNKNOWN and e.os != e_prime.os:
return False
if e.hostname and e_prime.hostname:
return not set(e.hostname).isdisjoint(set(e_prime.hostname))
if e.mac and e_prime.mac:
return not set(e.mac).isdisjoint(set(e_prime.mac))
return not set(e.ip).isdisjoint(set(e_prime.ip))
import random
def random_os():
return random.choice([UNKNOWN, WINDOWS, LINUX])
def random_names(prefix):
return [
"{}{}".format(prefix, random.randrange(10)) for n in range(random.randrange(3))
]
def random_entry():
return Entry(random_os(), random_names("H"), random_names("M"), random_names("I"))
def test_is_similar():
print("Testing is_similar()")
for rep in range(100000):
e = random_entry()
e_prime = random_entry()
got = is_similar(e, e_prime)
expected = original_is_similar(e, e_prime)
if got != expected:
print(e)
print(e_prime)
print("got", got)
print("expected", expected)
break
if __name__ == "__main__":
test_is_similar()
# End testing code
def find_edges(entries):
entries = list(entries)
posting_lists = collections.defaultdict(list)
for i, e in enumerate(entries):
for sketch in f(e):
posting_lists[sketch].append(i)
edges = set()
for i, e in enumerate(entries):
for sketch in g(e):
for j in posting_lists[sketch]:
if i < j:
edges.add((i, j))
return edges
# Begin testing code for find_edges
def test_find_edges():
print("Testing find_edges()")
entries = [random_entry() for i in range(1000)]
got = find_edges(entries)
expected = {
(i, j)
for (i, e) in enumerate(entries)
for (j, e_prime) in enumerate(entries)
if i < j and is_similar(e, e_prime)
}
print(len(expected))
assert got == expected
if __name__ == "__main__":
test_find_edges()
find_edges([random_entry() for i in range(10000)])
# End testing code for find_edges