查找最佳子串匹配

Find best substring match

我正在寻找一个库或一种使用现有库 (difflib, fuzzywuzzy, python-levenshtein) 来查找文本 (corpus) 中字符串 (query) 的最接近匹配项的方法

我开发了一种基于 difflib 的方法,我将 corpus 分成大小为 n 的 ngram(query 的长度)。

import difflib
from nltk.util import ngrams

def get_best_match(query, corpus):
    ngs = ngrams( list(corpus), len(query) )
    ngrams_text = [''.join(x) for x in ngs]
    return difflib.get_close_matches(query, ngrams_text, n=1, cutoff=0)

当查询和匹配的字符串之间的差异只是字符替换时,它可以按我想要的方式工作。

query = "ipsum dolor"
corpus = "lorem 1psum d0l0r sit amet"

match = get_best_match(query, corpus)
# match = "1psum d0l0r"

但是当区别是字符删除时,则不是。

query = "ipsum dolor"
corpus = "lorem 1psum dlr sit amet"

match = get_best_match(query, corpus)
# match = "psum dlr si"
# expected_match = "1psum dlr"

有没有办法获得更灵活的结果大小(至于 expected_match)?

编辑 1:

编辑 2:

我现在使用的解决方案是用(n-k)-grams for k = {1,2,3}扩展ngrams以防止3个删除。它比第一个版本好得多,但在速度方面效率不高,因为我们要检查的 ngram 数量是原来的 3 倍多。这也是一个不可推广的解决方案。

此函数找到可变长度的最佳匹配子串

该实现将语料库视为一个长字符串,因此避免了您对空格和未分隔单词的担忧。

代码摘要: 1. 以大小 step 的步长扫描语料库中的匹配值,以找到最高匹配值的大致位置 pos2.找到匹配值最高的pos附近的子串,调整子串的left/right位置

from difflib import SequenceMatcher

def get_best_match(query, corpus, step=4, flex=3, case_sensitive=False, verbose=False):
    """Return best matching substring of corpus.

    Parameters
    ----------
    query : str
    corpus : str
    step : int
        Step size of first match-value scan through corpus. Can be thought of
        as a sort of "scan resolution". Should not exceed length of query.
    flex : int
        Max. left/right substring position adjustment value. Should not
        exceed length of query / 2.

    Outputs
    -------
    output0 : str
        Best matching substring.
    output1 : float
        Match ratio of best matching substring. 1 is perfect match.
    """

    def _match(a, b):
        """Compact alias for SequenceMatcher."""
        return SequenceMatcher(None, a, b).ratio()

    def scan_corpus(step):
        """Return list of match values from corpus-wide scan."""
        match_values = []

        m = 0
        while m + qlen - step <= len(corpus):
            match_values.append(_match(query, corpus[m : m-1+qlen]))
            if verbose:
                print(query, "-", corpus[m: m + qlen], _match(query, corpus[m: m + qlen]))
            m += step

        return match_values

    def index_max(v):
        """Return index of max value."""
        return max(range(len(v)), key=v.__getitem__)

    def adjust_left_right_positions():
        """Return left/right positions for best string match."""
        # bp_* is synonym for 'Best Position Left/Right' and are adjusted 
        # to optimize bmv_*
        p_l, bp_l = [pos] * 2
        p_r, bp_r = [pos + qlen] * 2

        # bmv_* are declared here in case they are untouched in optimization
        bmv_l = match_values[p_l // step]
        bmv_r = match_values[p_l // step]

        for f in range(flex):
            ll = _match(query, corpus[p_l - f: p_r])
            if ll > bmv_l:
                bmv_l = ll
                bp_l = p_l - f

            lr = _match(query, corpus[p_l + f: p_r])
            if lr > bmv_l:
                bmv_l = lr
                bp_l = p_l + f

            rl = _match(query, corpus[p_l: p_r - f])
            if rl > bmv_r:
                bmv_r = rl
                bp_r = p_r - f

            rr = _match(query, corpus[p_l: p_r + f])
            if rr > bmv_r:
                bmv_r = rr
                bp_r = p_r + f

            if verbose:
                print("\n" + str(f))
                print("ll: -- value: %f -- snippet: %s" % (ll, corpus[p_l - f: p_r]))
                print("lr: -- value: %f -- snippet: %s" % (lr, corpus[p_l + f: p_r]))
                print("rl: -- value: %f -- snippet: %s" % (rl, corpus[p_l: p_r - f]))
                print("rr: -- value: %f -- snippet: %s" % (rl, corpus[p_l: p_r + f]))

        return bp_l, bp_r, _match(query, corpus[bp_l : bp_r])

    if not case_sensitive:
        query = query.lower()
        corpus = corpus.lower()

    qlen = len(query)

    if flex >= qlen/2:
        print("Warning: flex exceeds length of query / 2. Setting to default.")
        flex = 3

    match_values = scan_corpus(step)
    pos = index_max(match_values) * step

    pos_left, pos_right, match_value = adjust_left_right_positions()

    return corpus[pos_left: pos_right].strip(), match_value

示例:

query = "ipsum dolor"
corpus = "lorem i psum d0l0r sit amet"
match = get_best_match(query, corpus, step=2, flex=4)
print(match)
('i psum d0l0r', 0.782608695652174)

一些很好的启发式建议是始终保持 step < len(query) * 3/4flex < len(query) / 3。我还添加了区分大小写,以防万一。当您开始使用 step 和 flex 值时,它工作得很好。较小的步长值可提供更好的结果,但计算时间更长。 flex 控制结果子字符串长度的灵活性。

重要提示:这只会找到第一个最佳匹配项,因此如果有多个同样好的匹配项,则只会return编辑第一个。要允许多个匹配项,请将 index_max() 更改为 return 输入列表中 n 最高值的索引列表,并循环 adjust_left_right_positions() 以获取该列表中的值。

我会尝试从查询字符串构建正则表达式模板。然后可以使用该模板在语料库中搜索可能与查询匹配的子字符串。然后使用 difflib 或 fuzzywuzzy 检查子字符串是否与查询匹配。

例如,一个可能的模板是匹配查询的前两个字母中的至少一个,至少匹配查询的最后两个字母中的一个,并且中间的字母数量大致正确:

import re

query = "ipsum dolor"
corpus = ["lorem 1psum d0l0r sit amet",
          "lorem 1psum dlr sit amet",
          "lorem ixxxxxxxr sit amet"]

first_letter, second_letter = query[:2]
minimum_gap, maximum_gap = len(query) - 6, len(query) - 3
penultimate_letter, ultimate_letter = query[-2:]

fmt = '(?:{}.|.{}).{{{},{}}}(?:{}.|.{})'.format
pattern = fmt(first_letter, second_letter,
              minimum_gap, maximum_gap,
              penultimate_letter, ultimate_letter)

#print(pattern) # for debugging pattern

m = difflib.SequenceMatcher(None, "", query, False)

for c in corpus:
    for match in re.finditer(pattern1, c, re.IGNORECASE):
        substring = match.group()
        m.set_seq1(substring)
        ops = m.get_opcodes()

        # EDIT fixed calculation of the number of edits
        #num_edits = sum(1 for t,_,_,_,_ in ops if t != 'equal')
        num_edits = sum(max(i2-i1, j2-j1) for op,i1,i2,j1,j2 in ops if op != 'equal' )
        print(num_edits, substring)

输出:

3 1psum d0l0r
3 1psum dlr
9 ixxxxxxxr

另一个想法是在构建正则表达式时使用ocr的特性。例如,如果 ocr 总是得到正确的某些字母,那么当这些字母中的任何一个在查询中时,在正则表达式中使用其中的几个。或者,如果 ocr 混淆了“1”、“!”、'l' 和 'i',但从不替换其他内容,那么如果其中一个字母在查询中,请使用 [1!il]在正则表达式中。

解决方案的主要路径使用某种有限状态自动机 (FSA)。如果您想要该主题的详细摘要,请查看此 dissertation (PDF link)。基于错误的模型(包括 Levenshtein 自动机和转换器,Sergei 提到的前者)是解决此问题的有效方法。然而,随机模型,包括与 FSA 集成的各种类型的机器学习方法,目前非常流行。

由于我们正在查看编辑距离(有效拼写错误的单词),Levenshtein 方法很好而且相对简单。 This paper(以及论文;还有 PDF)给出了基本思想的一个体面的概述,它还明确提到了 OCR 任务的应用。但是,我将在下面回顾一些要点。

基本思想是您想要构建一个 FSA 来计算有效字符串以及所有字符串直到某个错误距离 (k)。在一般情况下,这个 k 可能是无限的或文本的大小,但这与 OCR 几乎无关(如果你的 OCR 甚至可能 return bl*h 其中 * 是整个文本的其余部分,我建议寻找更好的 OCR 系统)。因此,我们可以从搜索字符串 blah 的有效答案集中限制正则表达式 bl*h。一般的,简单直观的 k 对于你的上下文来说大概是字符串的长度 (w) 减去 2。这允许 b--h成为 blah 的有效字符串。它还允许 bla--h,但没关系。另外,请记住,错误可以是您指定的任何字符,包括 spaces(因此 'multiword' 输入是可解决的)。

下一个基本任务是设置一个简单的加权传感器。任何 OpenFST Python 端口都可以做到这一点 (here's one)。逻辑很简单:插入和删除会增加权重,而相等会增加输入字符串中的索引。您也可以像 Sergei 评论 link 中的那个人那样手动编码。

一旦你有了权重和权重的相关索引,你只需排序和return。计算复杂度应该是 O(n(w+k)),因为我们将在最坏情况下为文本中的每个字符 (n) 向前看 w+k 个字符。

在这里,您可以做各种各样的事情。您可以将传感器转换为 DFA。您可以通过将文本分解为 w+k-grams 来并行化系统,这些 w+k-grams 被发送到不同的进程。您可以开发一个语言模型或混淆矩阵来定义输入集中每个字母存在的常见错误(从而限制有效转换的 space 和相关 FSA 的复杂性)。文献数量庞大且仍在增长,因此修改的数量可能与解决方案的数量一样多(如果不是更多的话)。

希望在不提供任何代码的情况下回答您的一些问题。