Fast way to do word co_occurrence count in python on big corpus

I am trying to get co-occurrence counting to work fast in Python.

I have a list of context windows. These are lists of words that are connected in some way via dependency labels after parsing the corpus with a transition-based parser. The average length of a context window is 3, so a context window is not just the 10 words before and after the focus word. That is why I need a list of lists and why I cannot simply use a flat list of words.

I also have a dictionary in which every distinct word of the corpus has a distinct index as its value, but I don't think using it speeds things up much. At the moment I am only using a small test corpus with 50,000 words and about 300,000 context windows. For that my code is fast enough, but I need it to be faster, because in the end it has to work on a much bigger corpus.

import time
from collections import Counter

def current_milli_time():
    # timing helper: current time in milliseconds
    return round(time.time() * 1000)

class CoOc:
    def __init__(self):
        self.co_occurrence = Counter()

    def add_count(self, token_index, context, token_to_index):
        try:
            # some words are not in the token dictionary if they don't appear frequently enough
            context_index = token_to_index[context]
        except KeyError:
            context_index = -1
        if context_index != -1:
            self.co_occurrence[(token_index, context_index)] += 1

    def count_co_occurrence(self, context_chains, token_index_dic):
        token_index_list = []
        for key in token_index_dic:
            token_index_list.append([key, token_index_dic[key]])
        time1 = current_milli_time()
        for token_key in token_index_list[0:100]:
            token = token_key[0]
            token_index = token_key[1]
            for context_chain in context_chains:
                if token in context_chain:
                    for context_word in context_chain:
                        if token != context_word:
                            self.add_count(token_index, context_word, token_index_dic)
        time2 = current_milli_time()
        print(time2-time1)

So in the count function, out of all the tokens stored in the token_index dictionary, I build a list that contains only a small fraction of them to check how long it takes.

I tried iterating over the list of all context windows once, before iterating over the tokens, to create a list of context windows that contain only indices, so that I would not even need the add_count function and could replace the line where I call add_count with the last line of add_count, but that did not speed the process up.
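Roughly, what I tried looks like this (just a sketch of the pre-conversion; it did not make things faster):

# sketch: convert every context window to a list of vocabulary indices once,
# dropping words that are not in the dictionary, so the counting loop only
# compares integers
def chains_to_index_chains(context_chains, token_to_index):
    index_chains = []
    for chain in context_chains:
        index_chains.append([token_to_index[word] for word in chain if word in token_to_index])
    return index_chains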

This way it takes 3 seconds for just 100 focus words. I need it to be much faster for the corpora I want to use it on. Is that even possible in Python, or do I need to implement it in another language?

The first 10 context windows look like this:

['Gerücht', 'drücken', 'Kurs']
['Konkursgerüchte', 'drücken', 'Kurs']
['Kurs', 'Aktie']
['Kurs', 'Amazon-Aktie']
['der', 'Amazon-Aktie']
['der', 'Aktie']
['Begleitet', 'Gerücht', 'Konkurs']
['Begleitet', 'Marktgerüchten', 'Konkurs']
['begleiten', 'Gerücht', 'Konkurs']
['begleiten', 'setzt fort', 'Aktie', 'Talfahrt']

Every context word appears in token_index_dic with some index, unless it does not occur frequently enough in the corpus.

The first 10 elements of token_index_list look like this:

['Browser-gestützte', 0]
['Akzeptanzstellen', 1]
['Nachschubplanung', 2]
['persönlichen', 3]
['Unionsfraktion', 4]
['Wired', 21122]
['Hauptfigur', 6]
['Strafgesetz', 7]
['Computer-Hersteller', 8]
['Rückschläge', 9]

And when I print self.co_occurrence, it looks like this:

# (focus_word_index, context_word_index): count
Counter({(1, 9479): 11, (1, 20316): 7, (2, 1722): 7, (2, 20217): 7, (2, 19842): 7, (2, 2934): 7, (3, 11959): 7, (3, 2404): 7, (3, 1105): 7, (4, 12047): 7, (4, 19262): 7, (0, 13585): 4, (1, 18525): 4, (1, 1538): 4, (1, 9230): 4, (1, 20606): 4, (1, 1486): 4, (2, 13166): 4, (2, 6948): 4, (2, 23028): 4, (2, 14051): 4, (3, 3854): 4, (3, 7908): 4, (3, 4902): 4, (3, 13222): 4, (4, 23737): 4, (4, 6785): 4, (4, 18718): 4, (5, 15424): 4, (5, 4394): 4, (5, 21534): 4, (5, 5829): 4, (5, 6513): 4, (6, 23331): 4, (6, 7234): 4, (6, 20606): 4, (6, 22951): 4, (6, 7318): 4, (6, 15332): 4, (9, 21183): 4, (9, 23028): 4, (9, 1572): 4, (1, 25031): 3, (1, 5829): 3, (1, 14458): 3, (3, 14387): 3, (3, 9574): 3, (3, 21061): 3, (4, 18143): 3, (4, 3306): 3, (4, 17798): 3, (4, 2250): 3, (5, 9982): 3, (5, 5999): 3, (6, 15727): 3, (0, 16008): 2, (0, 1304): 2, (0, 5210): 2, (0, 17798): 2, (1, 20000): 2, (1, 19326): 2, (1, 3820): 2, (1, 25173): 2, (1, 21843): 2, (2, 20662): 2, (3, 7521): 2, (3, 14479): 2, (3, 8109): 2, (3, 18311): 2, (4, 2556): 2, (5, 23940): 2, (5, 1823): 2, (5, 18733): 2, (6, 3131): 2, (6, 947): 2, (6, 18540): 2, (6, 4756): 2, (6, 2743): 2, (6, 7692): 2, (6, 20263): 2, (6, 8670): 2, (6, 2674): 2, (6, 20050): 2, (6, 13274): 2, (6, 17876): 2, (6, 7538): 2, (6, 11098): 2, (6, 4296): 2, (6, 2417): 2, (6, 21421): 2, (6, 19256): 2, (6, 16739): 2, (7, 10908): 2, (7, 4439): 2, (7, 9492): 2, (8, 7027): 2, (8, 599): 2, (8, 4439): 2, (9, 16030): 2, (9, 6856): 2, (9, 24320): 2, (9, 15978): 2, (1, 6454): 1, (1, 14482): 1, (1, 2643): 1, (1, 7338): 1, (2, 21061): 1, (2, 1486): 1, (4, 4296): 1, (4, 23940): 1, (4, 5775): 1, (5, 24133): 1, (5, 2743): 1, (5, 11472): 1, (5, 19336): 1, (5, 20606): 1, (5, 2740): 1, (5, 9479): 1, (5, 14200): 1, (6, 22175): 1, (6, 13104): 1, (6, 10435): 1, (6, 1891): 1, (6, 22353): 1, (6, 4852): 1, (6, 20943): 1, (6, 23965): 1, (6, 13494): 1, (7, 1300): 1, (7, 12497): 1, (7, 2788): 1, (8, 13879): 1, (8, 2404): 1})

First, let's define a decorator so that we can time a whole function:

import time

def measured_duration(func):  # decorator to return the call duration and the result
    def wrapper(*args, **kwargs):
        time_before = time.time()
        result = func(*args, **kwargs)
        time_after = time.time()
        return (time_after - time_before) * 1000, result
    return wrapper

It can be used like this:

import os
duration, result = measured_duration(os.listdir)(".")
print(duration, result)
# 1.5974044799804688e-05 ['venv', 'test.csv', '__pycache__', ...]

Then let's define some types (it does not speed anything up, but it helped me understand the problem):

import itertools
from collections import Counter
from typing import Tuple, Dict, Sequence


ContextWindow = Tuple[str, ...]
TokensIndexes = Dict[str, int]
Cooccurrences = Counter[Tuple[int, int]]
#                  token index ^    ^ index in context window

def count_cooccurrences(windows: Sequence[ContextWindow], index_by_token: TokensIndexes) -> Cooccurrences:
    ...

(Yes, I could have used NewType, but I didn't want to.)
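For the record, a NewType variant might look roughly like this (the TokenIndex/WindowIndex names are just illustrative):

from typing import Counter, Dict, NewType, Tuple

# illustrative NewType version of the aliases above (names are made up here)
TokenIndex = NewType("TokenIndex", int)      # index of a token in the vocabulary
WindowIndex = NewType("WindowIndex", int)    # position inside a context window
TokensIndexes = Dict[str, TokenIndex]
Cooccurrences = Counter[Tuple[TokenIndex, WindowIndex]]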

So we can use it like this:

def main():
    # the example data
    windows = [
        ['Gerücht', 'drücken', 'Kurs'],
        ['Konkursgerüchte', 'drücken', 'Kurs'],
        ['Kurs', 'Aktie'],
        ['Kurs', 'Amazon-Aktie'],
        ['der', 'Amazon-Aktie'],
        ['der', 'Aktie', 'Begleitet'],  # <------ changed
        ['Begleitet', 'Gerücht', 'Konkurs'],
        ['Begleitet', 'Marktgerüchten', 'Konkurs'],
        ['begleiten', 'Gerücht', 'Konkurs'],
        ['begleiten', 'setzt fort', 'Aktie', 'Talfahrt'],
    ]
    token_index_list = {
        'Begleitet': 0,  # <------- modified
        # 'Browser-gestützte': 0,  # -----
        'Akzeptanzstellen': 1,
        'Nachschubplanung': 2,
        'persönlichen': 3,
        'Unionsfraktion': 4,
        'Wired': 21122,
        'Hauptfigur': 6,
        'Strafgesetz': 7,
        'Computer-Hersteller': 8,
        'Rückschläge': 9,
    }

    duration, cooccurrences = measured_duration(count_cooccurrences)(windows, token_index_list)
    print(duration, cooccurrences)

main()

I changed the data you provided slightly so that there are some matches.

Now the function:

def count_cooccurrences(windows: Sequence[ContextWindow], index_by_token: TokensIndexes) -> Cooccurrences:
    return Counter(
        (token_index, context_index)
        for (token, token_index), window_tokens in itertools.product(
            index_by_token.items(), windows
        )
        for context_index in (
            index_in_window
            for index_in_window, window_token in enumerate(window_tokens)
            if token == window_token
        )
        if context_index != -1
    )
# output: 5.269050598144531e-05 Counter({(0, 0): 2, (0, 2): 1})
# on my computer, I always get ~0.055 ms

I directly return a Counter object, initialized from an iterator that yields pairs of (token_index_in_corpus, token_index_in_context_window). Sadly, we only want to count matches and there is no find method on tuple/list (it only exists on str), so we have a full inner for loop over context_index that usually iterates 0 times and sometimes 1 time. But it does support a token occurring several times in a context window, should that be the case.
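For example (a made-up window and index, not part of the benchmark above), a token that occurs twice in the same window is counted at both positions:

# made-up example: 'Kurs' (arbitrary index 5) occurs twice in one window,
# so both of its positions are counted
demo = count_cooccurrences([('Kurs', 'Aktie', 'Kurs')], {'Kurs': 5})
print(demo)
# Counter({(5, 0): 1, (5, 2): 1})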

Let me know if this runs fast enough for you. The next step would be to parallelize it with multiple processes (since it is a CPU-bound task).

EDIT

To multiprocess the data, let's first define a helper function:

from itertools import zip_longest
################################################################################
# copied from https://docs.python.org/3/library/itertools.html#itertools-recipes
def grouper(iterable, n, *, incomplete='fill', fillvalue=None):
    "Collect data into non-overlapping fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, fillvalue='x') --> ABC DEF Gxx
    # grouper('ABCDEFG', 3, incomplete='strict') --> ABC DEF ValueError
    # grouper('ABCDEFG', 3, incomplete='ignore') --> ABC DEF
    args = [iter(iterable)] * n
    if incomplete == 'ignore':
        return zip(*args)
    if incomplete == 'fill':
        return zip_longest(*args, fillvalue=fillvalue)
    if incomplete == 'strict':
        return zip(*args, strict=True)
    else:
        raise ValueError('Expected fill, strict, or ignore')
################################################################################

Then the parallel execution:

import concurrent.futures
from functools import reduce


WINDOW_CHUNK_SIZE = 5
CORPUS_TOKENS_CHUNK_SIZE = 5
# means that each task will consist of searching for CORPUS_TOKENS_CHUNK_SIZE tokens in WINDOW_CHUNK_SIZE windows
# you can fiddle with these to find an optimum, here I used 5 and 5 for demo purposes


def unpack_count_cooccurrences(args):
    # each task receives its (windows_chunk, index_chunk) arguments bundled
    # as a single tuple, so we unpack it here
    return count_cooccurrences(args[0], args[1])


def count_cooccurrences_parallel(windows: Sequence[ContextWindow], index_by_token: TokensIndexes) -> Cooccurrences:
    with concurrent.futures.ProcessPoolExecutor(max_workers=None) as executor:
        all_params = (  # generator
            (windows_chunk, dict(index_by_token__chunk))
            for windows_chunk, index_by_token__chunk in itertools.product(
                grouper(windows, WINDOW_CHUNK_SIZE, incomplete="ignore"),
                grouper(index_by_token.items(), CORPUS_TOKENS_CHUNK_SIZE, incomplete="ignore"),
            )
        )
        tasks = (executor.submit(unpack_count_cooccurrences, params) for params in all_params)  # generator
        return reduce(lambda counter, future_counter: counter + future_counter.result(),  # could not use operator.add
                      concurrent.futures.as_completed(tasks),  # to reduce progressively
                      Counter())  # starting from an initial value

It can be called like the first one:

    duration, cooccurrences = measured_duration(count_cooccurrences_parallel)(windows, token_index_list)
    print(duration, cooccurrences)
# output: 13.42153549194336 Counter({(0, 0): 2, (0, 2): 1})

The result is much slower (13 ms vs. 0.05 ms) because of the overhead. For such a small dataset, multiprocessing is simply overkill, but on a bigger dataset it should give good performance.

Again, let me know how long it takes to run for you.

Using multiprocessing and Cython. The first thing that has to be done is to change the list of context chains into a list of index lists. Every index list has to have the same length X. Every index is the index of a word in the vocabulary, so for every distinct word in the context chains there is a distinct index in the vocabulary. For context chains shorter than X, extra indices have to be added; these extra indices should all be the same and larger than the largest index in the vocabulary. Once that is done, Cython and multiprocessing can be used. For this demo I just generate a random list of context chains and assume a vocabulary size of 32,000. My machine has 8 cores, so I split the vocabulary into 8 slices, but more slices would also work.
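The conversion itself is not shown in the demo below (it just generates random data), but a sketch of it could look like this, assuming the context chains and token_index_dic from the question, a fixed length of 10 and 50000 as the padding index:

# sketch of the conversion described above (not part of the demo below):
# turn each context chain of words into a fixed-length list of vocabulary
# indices, padded with an index larger than any vocabulary index
X = 10             # fixed chain length
PAD_INDEX = 50000  # padding index, larger than the largest vocabulary index

def chains_to_padded_index_lists(context_chains, token_index_dic):
    padded = []
    for chain in context_chains:
        indices = [token_index_dic[w] for w in chain if w in token_index_dic][:X]
        padded.append(indices + [PAD_INDEX] * (X - len(indices)))
    return padded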

On the Python side:

This generates the context chains and the vocabulary index-list slices and calls the function dosomething() in parallel:

import concurrent.futures
import random

import helper  # module providing current_milli_time()


def runcoocinparallel():
    # build 300,000 random context chains of length 10:
    # 3 real indices (vocabulary size 32,000) followed by 7 padding indices (50000)
    index_lists = []
    context_list = []
    for ab in range(300000):
        list2 = []
        for ac in range(3):
            list2.append(random.randint(0, 32000-1))
        for ac in range(7):
            list2.append(50000)
        context_list.append(list2)
    # split the vocabulary indices 0..31999 into 8 slices of 4,000
    for j in range(8):
        indexlist = []
        for i in range(4000):
            indexlist.append(j*4000+i)
        index_lists.append(indexlist)
    time1 = helper.current_milli_time()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        a = executor.map(dosomething, index_lists, [context_list]*8)
    print(helper.current_milli_time()-time1)

In this function the Cython makecount function is called. (This wrapper could be eliminated.)

import array_cy  # the compiled Cython module defined below

def dosomething(index_list, context_list):
    firstindex = index_list[0]
    index_list_len = len(index_list)
    array_cy.makecount(firstindex, index_list_len, 8*index_list_len, context_list)

On the Cython side:

cimport cython
from libc.stdlib cimport malloc, free

import helper  # module providing current_milli_time()

# turning off bounds checking and negative-index wrapping is supposed to speed up the indexing
@cython.boundscheck(False) # turn off bounds-checking for entire function
@cython.wraparound(False)  # turn off negative index wrapping for entire function
cpdef makecount(int first_index, int indizes_slice_len, int indizes_len, context_list):
    # type definitions so cython does not check the type on each loop iteration
    cdef int context_list_len = len(context_list)
    cdef int i
    cdef int j
    cdef int k
    cdef int context_index
    # allocating memory for the arrays: a flat copy of the context chains and
    # the array where the count is stored for each possible combination
    cdef int *context_list_cy = <int *> malloc(10 * context_list_len * sizeof(int))
    cdef int *combinations = <int *> malloc(indizes_slice_len * indizes_len * sizeof(int))
    # setting the counts to zero
    for i in range(indizes_slice_len*indizes_len):
        combinations[i] = 0
    # creating a C 1D array from the list of context chains
    for i in range(context_list_len):
        for j in range(10):
            context_list_cy[i*10+j] = context_list[i][j]
    # this is where all the time is spent
    time1 = helper.current_milli_time()
    for i in range(first_index, first_index+indizes_slice_len):
        for j in range(context_list_len):
            # checking if focus index i is inside this context chain
            if i == context_list_cy[j * 10] or i == context_list_cy[j * 10 + 1] or i == context_list_cy[j * 10 + 2] or \
               i == context_list_cy[j * 10 + 3] or i == context_list_cy[j * 10 + 4] or \
               i == context_list_cy[j * 10 + 5] or i == context_list_cy[j * 10 + 6] or \
               i == context_list_cy[j * 10 + 7] or i == context_list_cy[j * 10 + 8] or i == context_list_cy[j * 10 + 9]:
                # if so, increase the count of every valid context_index
                for k in range(10):
                    context_index = context_list_cy[j*10+k]
                    if not (i == context_index or context_index == 50000):
                        combinations[(i-first_index)*indizes_len+context_index] += 1
    print(helper.current_milli_time()-time1)
    # the demo never reads the counts back, so just release the memory
    free(context_list_cy)
    free(combinations)

Note: for bigger context_chain_lists, RAM size becomes a problem.
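As a rough estimate (using the sizes from the demo above and 4 bytes per C int):

# rough per-process memory estimate for the arrays allocated in makecount,
# using the demo sizes above; the Python context list is additionally
# pickled and copied to every worker process
n_chains = 300000          # number of context chains
chain_len = 10             # fixed (padded) chain length
indizes_slice_len = 4000   # vocabulary slice per process
indizes_len = 32000        # full vocabulary size

print(n_chains * chain_len * 4 / 1024**2, "MiB for context_list_cy")          # ~11.4 MiB
print(indizes_slice_len * indizes_len * 4 / 1024**2, "MiB for combinations")  # ~488.3 MiB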