Find out how similar a set is compared to all other sets in a collection of sets

I am trying to work out how similar a set is to every other set in a collection by counting the number of matching elements. Once I have the counts, I want to perform further operations on each set together with its top X (currently 100) most similar sets (the sets with the highest counts). I have provided a sample input and an output that shows the number of matching elements for each pair of sets:

Input

{
  "list1": [
    "label1",
    "label2",
    "label3"
  ],
  "list2": [
    "label2",
    "label3",
    "label4"
  ],
  "list3": [
    "label3",
    "label4",
    "label5"
  ],
  "list4": [
    "label4",
    "label5",
    "label6"
  ]
}

Output

{
  "list1": {
    "list1": 3,
    "list2": 2,
    "list3": 1,
    "list4": 0
  },
  "list2": {
    "list1": 2,
    "list2": 3,
    "list3": 2,
    "list4": 1
  },
  "list3": {
    "list1": 1,
    "list2": 2,
    "list3": 3,
    "list4": 2
  },
  "list4": {
    "list1": 0,
    "list2": 1,
    "list3": 2,
    "list4": 3
  }
}

I came up with the code below, but with an input of around 200,000 sets it takes several hours. The number of elements/labels per set varies, but on average each set has about 10 elements. The total number of unique label values is about 300.

    import json

    input = {}
    input['list1'] = ['label1', 'label2', 'label3']
    input['list2'] = ['label2', 'label3', 'label4']
    input['list3'] = ['label3', 'label4', 'label5']
    input['list4'] = ['label4', 'label5', 'label6']
    print(json.dumps(input, indent=2))

    # convert each list to a set for O(1) membership tests
    input = {key: set(value) for key, value in input.items()}
    output = {key1: {key2: 0 for key2 in input.keys()} for key1 in input.keys()}

    # count shared labels for every pair of sets (quadratic in the number of sets)
    for key1, value1 in input.items():
        for key2, value2 in input.items():
            for element in value1:
                if element in value2:
                    count = output[key1][key2]
                    output[key1][key2] = count + 1

    print(json.dumps(output, indent=2))

Does anyone know how to improve the execution time of the code above when the number of sets is large?

Thanks for any suggestions!

Use an inverted index to avoid computing intersections with the sets whose intersection cardinality is 0:

from collections import defaultdict, Counter
from itertools import chain
from pprint import pprint

data = {
    "list1": ["label1", "label2", "label3"],
    "list2": ["label2", "label3", "label4"],
    "list3": ["label3", "label4", "label5"],
    "list4": ["label4", "label5", "label6"]
}

# inverted index: label -> keys of the sets that contain that label
index = defaultdict(list)
for key, values in data.items():
    for value in values:
        index[value].append(key)

# for each set, count how often every other set shares one of its labels
result = {
    key: Counter(chain.from_iterable(index[label] for label in labels))
    for key, labels in data.items()
}
pprint(result)

Output

{'list1': Counter({'list1': 3, 'list2': 2, 'list3': 1}),
 'list2': Counter({'list2': 3, 'list1': 2, 'list3': 2, 'list4': 1}),
 'list3': Counter({'list3': 3, 'list2': 2, 'list4': 2, 'list1': 1}),
 'list4': Counter({'list4': 3, 'list3': 2, 'list2': 1})}
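As an aside (not part of this answer), the question mentions keeping only the top X (currently 100) most similar sets for each set. With the Counter-based result above, that can be read off directly with most_common; a minimal sketch, assuming X = 100:

top_x = 100  # the X from the question, chosen here only for illustration
top_similar = {key: counter.most_common(top_x) for key, counter in result.items()}
pprint(top_similar)

Note that each Counter also contains the set itself with the full count, so you may want to drop that entry before taking the top X.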

If you really need to, you can include the sets with an intersection cardinality of 0 like this:

result = {key: {k: value.get(k, 0) for k in data} for key, value in result.items()}
pprint(result)

Output

{'list1': {'list1': 3, 'list2': 2, 'list3': 1, 'list4': 0},
 'list2': {'list1': 2, 'list2': 3, 'list3': 2, 'list4': 1},
 'list3': {'list1': 1, 'list2': 2, 'list3': 3, 'list4': 2},
 'list4': {'list1': 0, 'list2': 1, 'list3': 2, 'list4': 3}}

A second option follows from the observation that most of the time is spent finding the intersections of the sets, so a faster data structure such as a roaring bitmap helps:

from collections import defaultdict
from pprint import pprint
from pyroaring import BitMap

data = {
    "list1": ["label1", "label2", "label3"],
    "list2": ["label2", "label3", "label4"],
    "list3": ["label3", "label4", "label5"],
    "list4": ["label4", "label5", "label6"]
}

# all unique labels
labels = set().union(*data.values())

# lookup mapping each label to an integer id
lookup = {key: value for value, key in enumerate(labels)}

# encode each set of labels as a roaring bitmap of integer ids
roaring_data = {key: BitMap(lookup[v] for v in value) for key, value in data.items()}

# intersection cardinality for every pair of bitmaps
result = defaultdict(dict)
for k_out, outer in roaring_data.items():
    for k_in, inner in roaring_data.items():
        result[k_out][k_in] = len(outer & inner)

pprint(result)

Output

defaultdict(<class 'dict'>,
            {'list1': {'list1': 3, 'list2': 2, 'list3': 1, 'list4': 0},
             'list2': {'list1': 2, 'list2': 3, 'list3': 2, 'list4': 1},
             'list3': {'list1': 1, 'list2': 2, 'list3': 3, 'list4': 2},
             'list4': {'list1': 0, 'list2': 1, 'list3': 2, 'list4': 3}})
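Since the intersection count is symmetric, a possible refinement (not part of the answer above, just a sketch that builds on roaring_data and defaultdict from the previous block) is to compute each pair only once:

from itertools import combinations_with_replacement

# visit each unordered pair (including a set with itself) exactly once
result = defaultdict(dict)
for k_out, k_in in combinations_with_replacement(roaring_data, 2):
    count = len(roaring_data[k_out] & roaring_data[k_in])
    result[k_out][k_in] = count
    result[k_in][k_out] = count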

Performance analysis

The plot above shows the performance for a dictionary data whose length is given by the value on the x axis, where each value of the dictionary is a list of 10 labels drawn at random from a population of 100. Counter-intuitively, the roaring bitmap performs worse than your solution, while the inverted index takes less than half the time (roughly 40%). The code to reproduce the results above can be found here.
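The linked benchmark code is not reproduced here; as a rough, minimal sketch of a comparable setup (assuming random data with 10 labels per set drawn from 100 unique labels, and comparing a simplified version of the original nested-loop approach against the inverted index), the two could be timed like this:

import random
import timeit
from collections import defaultdict, Counter
from itertools import chain

def make_data(n_sets, labels_per_set=10, n_labels=100):
    # random data shaped like the benchmark described above
    population = [f"label{i}" for i in range(n_labels)]
    return {f"list{i}": random.sample(population, labels_per_set) for i in range(n_sets)}

def nested_loops(data):
    # simplified version of the original quadratic approach
    sets = {key: set(value) for key, value in data.items()}
    return {k1: {k2: len(v1 & v2) for k2, v2 in sets.items()} for k1, v1 in sets.items()}

def inverted_index(data):
    # the inverted-index approach from the first snippet above
    index = defaultdict(list)
    for key, values in data.items():
        for value in values:
            index[value].append(key)
    return {key: Counter(chain.from_iterable(index[label] for label in labels))
            for key, labels in data.items()}

data = make_data(2000)
print("nested loops:  ", timeit.timeit(lambda: nested_loops(data), number=1))
print("inverted index:", timeit.timeit(lambda: inverted_index(data), number=1))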

Assuming that most pairs of lists have no intersection, the code below should be faster. If it is not fast enough and approximate results are acceptable, you can try min-hashing (set k to a lower value for more speed, or a higher value for better recall).

import collections
import hashlib

input = {
    "list1": ["label1", "label2", "label3"],
    "list2": ["label2", "label3", "label4"],
    "list3": ["label3", "label4", "label5"],
    "list4": ["label4", "label5", "label6"],
}


def optional_min_hash(values, k=None):
    # with k=None the raw labels are used as bucket keys;
    # with an integer k, only the k smallest hashes of the labels are used (min-hash)
    return (
        values
        if k is None
        else sorted(
            hashlib.sha256(str(value).encode("utf8")).digest() for value in values
        )[:k]
    )


# bucket key -> names of the lists that produced that key
buckets = collections.defaultdict(list)
for key, values in input.items():
    for value in optional_min_hash(values):
        buckets[value].append(key)

# only compare pairs of lists that share at least one bucket
output = collections.defaultdict(dict)
for key1, key2 in {
    (key1, key2)
    for bucket in buckets.values()
    for key1 in bucket
    for key2 in bucket
    if key1 <= key2
}:
    count = len(set(input[key1]) & set(input[key2]))
    output[key1][key2] = count
    output[key2][key1] = count
print(output)

Example output:

defaultdict(<class 'dict'>, {'list2': {'list4': 1, 'list1': 2, 'list2': 3, 'list3': 2}, 'list4': {'list2': 1, 'list4': 3, 'list3': 2}, 'list1': {'list2': 2, 'list3': 1, 'list1': 3}, 'list3': {'list1': 1, 'list2': 2, 'list3': 3, 'list4': 2}})
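Note that the call above uses the default k=None, so no min-hashing actually happens and pairs are bucketed on the raw labels. To enable the approximate min-hash variant mentioned above, pass k explicitly when building the buckets (k=2 here is only an illustrative value; lower k means fewer candidate pairs and more speed, higher k means better recall):

buckets = collections.defaultdict(list)
for key, values in input.items():
    # bucket on the k smallest label hashes instead of the labels themselves
    for value in optional_min_hash(values, k=2):
        buckets[value].append(key)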