Python 中的 Deduplication/merging 个可变数据

Deduplication/merging of mutable data in Python

问题的高级视图

我有 X 个来源,其中包含有关我们环境中资产(hostname、IP、MAC、os 等)的信息。这些来源包含 1500 到 150k 的条目(至少我现在使用的条目)。我的脚本支持os查询它们中的每一个,收集数据,通过合并来自不同来源的相同资产的信息来删除重复数据,以及 return 所有条目的统一列表。我当前的实现确实有效,但对于更大的数据集来说速度很慢。我很好奇是否有更好的方法来完成我想做的事情。

Universal problem
Deduplication of data by merging similar entries with the caveat that merging two assets might change whether the resulting asset will be similar to the third asset that was similar to the first two before merging.
Example:
~ similarity, + merging
(before) A ~ B ~ C
(after) (A+B) ~ C or (A+B) !~ C

我试图寻找有同样问题的人,我只找到了 What is an elegant way to remove duplicate mutable objects in a list in Python?,但它不包括对我来说至关重要的数据合并。

类用过

为了便于阅读和理解而进行了简化,删除了不需要的部分 - 一般功能完好无损。

class Entry:

    def __init__(self, source: List[str], mac: List[str] = [], ip: List[str] = [], hostname: List[str] = [], os: OS = OS.UNKNOWN, details: dict = {}):
        # SO: Sorting and sanitization removed for simplicity
        self.source = source
        self.mac = mac
        self.ip = ip
        self.hostname = hostname
        self.os = os
        self.details = details

    def __eq__(self, other):
        if isinstance(other, Entry):
            return (self.source == other.source and
                    self.os == other.os and
                    self.hostname == other.hostname and
                    self.mac == other.mac and
                    self.ip == other.ip)
        return NotImplemented

    def is_similar(self, other) -> bool:
        def same_entry(l1: list, l2: list) -> bool:
            return not set(l1).isdisjoint(l2)

        if isinstance(other, Entry):
            if self.os == OS.UNKNOWN or other.os == OS.UNKNOWN or self.os == other.os:
                empty_hostnames = self.hostname == [] or other.hostname == []
                empty_macs = self.mac == [] or other.mac == []

                return (same_entry(self.hostname, other.hostname) or
                        (empty_hostnames and same_entry(self.mac, other.mac)) or
                        (empty_hostnames and empty_macs and same_entry(self.ip, other.ip)))

        return False

    def merge(self, other: 'Entry'):
        self.source = _merge_lists(self.source, other.source)
        self.hostname = _merge_lists(self.hostname, other.hostname)
        self.mac = _merge_lists(self.mac, other.mac)
        self.ip = _merge_lists(self.ip, other.ip)
        self.os = self.os if self.os != OS.UNKNOWN else other.os
        self.details = _merge_dicts(self.details, other.details)

    def representation(self) -> str:
        # Might be useful if anyone wishes to run the code
        return f'<Entry from {self.source}: hostname={self.hostname}, MAC={self.mac}, IP={self.ip}, OS={self.os.value}, details={self.details}>'

def _merge_lists(l1: list, l2: list):
    return list(set(l1) | set(l2))


def _merge_dicts(d1: dict, d2: dict):
    """
    Merge two dicts without overwriting any data.
    """
    # If either is empty, return the other one
    if not d1:
        return d2
    if not d2:
        return d1
    if d1 == d2:
        return d1

    result = d1
    for k, v in d2.items():
        if k in result:
            result[k + '_'] = v
        else:
            result[k] = v

    return result
class OS(Enum):
    '''
    Enum specifying the operating system of the asset.
    '''
    UNKNOWN = 'Unknown'
    WINDOWS = 'Windows'
    LINUX = 'Linux'
    MACOS = 'MacOS'

算法

每个算法都获取来自不同来源的条目列表的列表,eq: entries = [[entries from source A], [entries from source B], ..., [entries from source Z]]

主要去重功能

这是每个算法中使用的主要功能。它获取来自 2 个不同来源的条目列表,并将其合并到包含资产的列表中,并在需要时合并信息。


这可能是我需要帮助的部分ost。这是我能想到的唯一方法。正因为如此,我专注于如何 运行 这个函数快好几倍,但就减少 运行 时间而言,让这个函数更快是最好的。


def deduplicate(en1: List[Entry], en2: List[Entry]) -> List[Entry]:
    """
    Deduplicates entries from provided lists by merging similar entries.
    Entries in the lists are supposed to be already deduplicated.
    """
    # If either is empty, return the other one
    if not en1:
        return en2
    if not en2:
        return en1

    result = []

    # Iterate over longer and check for similar in shorter
    if len(en2) > len(en1):
        en1, en2 = en2, en1

    for e in en1:
        # walrus operator in Python 3.8 or newer
        while (similar := next((y for y in en2 if y.is_similar(e)), None)) is not None:
            e.merge(similar)
            en2.remove(similar)
            del similar
        result.append(e)
    result.extend(en2)

    return result

正常重复数据删除(例如使用集合)在这里不适用的原因是因为将一个条目与另一个新条目合并可能会变得相似,例如:

In [2]: e1 = Entry(['SRC_A'], [], ['1.1.1.1'], [], OS.UNKNOWN)
In [3]: e2 = Entry(['SRC_A'], ['aa:bb:cc:dd:ee:ff'], ['1.1.1.1'], [], OS.UNKNOWN)
In [4]: e3 = Entry(['SRC_A'], ['aa:bb:cc:dd:ee:ff'], [], [], OS.UNKNOWN)
In [5]: e1.is_similar(e2)
Out[5]: True
In [6]: e1.is_similar(e3) # at first it's not similar
Out[6]: False
In [7]: e1.merge(e2)
In [8]: e1.is_similar(e3) # but after merging it is
Out[8]: True

第一种方法 - 顺序

我的第一个想法是最简单的,只是简单的递归。

def dedup_multiple(lists: List[List[Entry]]) -> List[Entry]:
    """Deduplication helper allowing for providing more than 2 sources."""
    if len(lists) == 1:
        return lists[0]

    return deduplicate(lists[0], dedup_multiple(lists[1:]))

第二种方法 - 使用池的多线程

这就是我目前使用的方法。到目前为止,它是最快的并且相当简单。

def async_dedup(lists: List[List[Entry]]) -> List[Entry]:
    """Asynchronous deduplication helper allowing for providing more than 2 sources."""
    with mp.Pool() as pool:
        while len(lists) > 1:
            if len(lists) % 2 == 1:
                lists.append([])
            data = [(lists[i], lists[i+1]) for i in range(0, len(lists), 2)]
            lists = pool.map_async(_internal_deduplication, data).get()
        return lists[0]

def _internal_deduplication(en):
    return deduplicate(*en)

但我很快意识到,如果一项任务比其他任务花费的时间长得多(例如,因为对最大源进行重复数据删除),其他所有任务都会等待而不是工作。

第三种方法 - 使用队列和进程的多线程

当我试图加快第二种方法时,我遇到了 across and Filling a queue and managing multiprocessing in python,我想出了以下解决方案。

def async_dedup2(lists: List[List[Entry]]) -> List[Entry]:
    tasks_number = min(os.cpu_count(), len(lists) // 2)
    args = lists[:tasks_number]

    with mp.Manager() as manager:
        queue = manager.Queue()

        for l in lists[tasks_number:]:
            queue.put(l)

        processes = []
        for arg in args:
            proc = mp.Process(target=test, args=(queue, arg, ))
            proc.start()
            processes.append(proc)

        for proc in processes:
            proc.join()

        return queue.get()


def test(queue: mp.Queue, arg: List[Entry]):
    while not queue.empty():
        try:
            arg2: List[Entry] = queue.get()
        except Empty:
            continue
        arg = deduplicate(arg, arg2)

    queue.put(arg)

我认为这将是最好的解决方案,因为如果 possible 就不会出现不处理数据的情况,但在测试后总是 almost比第二种方法稍慢。

运行时比较

Source A    1510
Source B    1509
Source C    5000
Source D    4460
Source E    5000
Source F    2084

Deduplicating.....
SYNC   - Execution time: 188.6127771000 - Count: 13540
ASYNC  - Execution time: 68.249583 - Count: 13532
ASYNC2 - Execution time: 69.416046 - Count: 13532
Source A    1510
Source B    1509
Source C    11821
Source D    13871
Source E    5001
Source F    2333

Deduplicating.....
ASYNC  - Execution time: 424.405793 - Count: 26229
ASYNC2 - Execution time: 522.697551 - Count: 26405

总结:我们从条目到集合定义了两个草图函数f和g “草图”使得两个条目 e 和 e' 相似当且仅当 f(e) ∩ g(e') ≠ ∅。然后我们可以有效地识别合并(参见 算法在最后)。

其实我要定义四个sketch函数,fos, faddr、gos和gaddr,我们从中 构造

  • f(e) = {(x, y) | x ∈ fos(e), y ∈ faddr(e)}
  • g(e) = {(x, y) | x ∈ gos(e), y ∈ gaddr(e)}.

fos 和 gos 是四个中较简单的一个。 fos(e) 包括

  • (1, e.os),如果e.os已知
  • (2,), 如果 e.os 已知
  • (3,), 如果 e.os 未知.

gos(e) 包括

  • (1, e.os),如果e.os已知
  • (2,), 如果 e.os 未知
  • (3,).

faddr 和 gaddr 比较复杂,因为有 是优先属性,它们可以有多个值。 尽管如此,同样的技巧也可以奏效。 f地址(e) 包括

  • (1, h) 对于 e.hostname
  • 中的每个 h
  • (2, m) 对于 e.mac 中的每个 m,如果 e.hostname 是非空的
  • (3, m) 对于 e.mac 中的每个 m,如果 e.hostname 为空
  • (4, i) 对于 e.ip 中的每个 i,如果 e.hostname 和 e.mac 是 非空
  • (5, i) 对于 e.ip 中的每个 i,如果 e.hostname 为空并且 e.mac 是非空的
  • (6, i) 对于 e.ip 中的每个 i,如果 e.hostname 是非空的并且 e.mac为空
  • (7, i) 对于 e.ip 中的每个 i,如果 e.hostname 和 e.mac 是 空。

g地址(e) 包括

  • (1, h) 对于 e.hostname
  • 中的每个 h
  • (2, m) 对于 e.mac 中的每个 m,如果 e.hostname 为空
  • (3, m) 对于 e.mac
  • 中的每个 m
  • (4, i) 对于 e.ip 中的每个 i,如果 e.hostname 为空且 e.mac为空
  • (5, i) 对于 e.ip 中的每个 i,如果 e.mac 为空
  • (6, i) 对于 e.ip 中的每个 i,如果 e.hostname 为空
  • (7, i) 对于 e.ip.
  • 中的每个 i

算法的其余部分如下。

  • 初始化一个defaultdict(list)映射草图到条目列表 标识符。

  • 对于每个条目,对于条目的每个 f-sketches,添加条目的 defaultdict.

    中相应列表的标识符
  • 初始化 set 个边。

  • 对于每个条目,对于条目的每个 g 草图,查找 defaultdict 中的 g-sketch 并从条目的 列表中每个其他标识符的标识符。

现在我们有了一组边,我们 运行 进入@btilly 的问题 著名的。作为一名计算机科学家,我的第一直觉是找到连接 组件,但当然,合并两个条目可能会导致一些事件 边缘消失。相反,您可以使用边缘作为候选人 合并,并重复上述算法,直到 returns 没有边缘。

import collections
import itertools

Entry = collections.namedtuple("Entry", ("os", "hostname", "mac", "ip"))

UNKNOWN = "UNKNOWN"
WINDOWS = "WINDOWS"
LINUX = "LINUX"


def f_os(e):
    if e.os != UNKNOWN:
        yield (1, e.os)
    if e.os != UNKNOWN:
        yield (2,)
    if e.os == UNKNOWN:
        yield (3,)


def g_os(e):
    if e.os != UNKNOWN:
        yield (1, e.os)
    if e.os == UNKNOWN:
        yield (2,)
    yield (3,)


def f_addr(e):
    for h in e.hostname:
        yield (1, h)
    if e.hostname:
        for m in e.mac:
            yield (2, m)
    if not e.hostname:
        for m in e.mac:
            yield (3, m)
    if e.hostname and e.mac:
        for i in e.ip:
            yield (4, i)
    if not e.hostname and e.mac:
        for i in e.ip:
            yield (5, i)
    if e.hostname and not e.mac:
        for i in e.ip:
            yield (6, i)
    if not e.hostname and not e.mac:
        for i in e.ip:
            yield (7, i)


def g_addr(e):
    for h in e.hostname:
        yield (1, h)
    if not e.hostname:
        for m in e.mac:
            yield (2, m)
    for m in e.mac:
        yield (3, m)
    if not e.hostname and not e.mac:
        for i in e.ip:
            yield (4, i)
    if not e.mac:
        for i in e.ip:
            yield (5, i)
    if not e.hostname:
        for i in e.ip:
            yield (6, i)
    for i in e.ip:
        yield (7, i)


def f(e):
    return set(itertools.product(f_os(e), f_addr(e)))


def g(e):
    return set(itertools.product(g_os(e), g_addr(e)))


def is_similar(e, e_prime):
    return not f(e).isdisjoint(g(e_prime))


# Begin testing code for is_similar


def original_is_similar(e, e_prime):
    if e.os != UNKNOWN and e_prime.os != UNKNOWN and e.os != e_prime.os:
        return False
    if e.hostname and e_prime.hostname:
        return not set(e.hostname).isdisjoint(set(e_prime.hostname))
    if e.mac and e_prime.mac:
        return not set(e.mac).isdisjoint(set(e_prime.mac))
    return not set(e.ip).isdisjoint(set(e_prime.ip))


import random


def random_os():
    return random.choice([UNKNOWN, WINDOWS, LINUX])


def random_names(prefix):
    return [
        "{}{}".format(prefix, random.randrange(10)) for n in range(random.randrange(3))
    ]


def random_entry():
    return Entry(random_os(), random_names("H"), random_names("M"), random_names("I"))


def test_is_similar():
    print("Testing is_similar()")
    for rep in range(100000):
        e = random_entry()
        e_prime = random_entry()
        got = is_similar(e, e_prime)
        expected = original_is_similar(e, e_prime)
        if got != expected:
            print(e)
            print(e_prime)
            print("got", got)
            print("expected", expected)
            break


if __name__ == "__main__":
    test_is_similar()


# End testing code


def find_edges(entries):
    entries = list(entries)
    posting_lists = collections.defaultdict(list)
    for i, e in enumerate(entries):
        for sketch in f(e):
            posting_lists[sketch].append(i)
    edges = set()
    for i, e in enumerate(entries):
        for sketch in g(e):
            for j in posting_lists[sketch]:
                if i < j:
                    edges.add((i, j))
    return edges


# Begin testing code for find_edges


def test_find_edges():
    print("Testing find_edges()")
    entries = [random_entry() for i in range(1000)]
    got = find_edges(entries)
    expected = {
        (i, j)
        for (i, e) in enumerate(entries)
        for (j, e_prime) in enumerate(entries)
        if i < j and is_similar(e, e_prime)
    }
    print(len(expected))
    assert got == expected


if __name__ == "__main__":
    test_find_edges()
    find_edges([random_entry() for i in range(10000)])

# End testing code for find_edges