spacy-io 如何在没有 GIL 的情况下使用多线程?
how is spacy-io using multi threading without GIL?
参考 谈到那个,
还有这里
来自 https://spacy.io/
from spacy.attrs import *
# All strings mapped to integers, for easy export to numpy
np_array = doc.to_array([LOWER, POS, ENT_TYPE, IS_ALPHA])
from reddit_corpus import RedditComments
reddit = RedditComments('/path/to/reddit/corpus')
# Parse a stream of documents, with multi-threading (no GIL!)
# Processes over 100,000 tokens per second.
for doc in nlp.pipe(reddit.texts, batch_size=10000, n_threads=4):
# Multi-word expressions, such as names, dates etc
# can be merged into single tokens
for ent in doc.ents:
ent.merge(ent.root.tag_, ent.text, ent.ent_type_)
# Efficient, lossless serialization --- all annotations
# saved, same size as uncompressed text
byte_string = doc.to_bytes()
推测它在 C 级别而不是 python 级别进行解析。一旦你进入 C,如果不需要访问任何 python 对象,你可以安全地释放 GIL。在最底层的读写上,CPython也发布了GIL。原因是,如果还有其他线程 运行 并且我们要调用一个阻塞的 C 函数,那么我们应该在函数调用期间释放 GIL。
您可以在 CPython 的最低实现中看到这一点 write。
if (gil_held) {
do {
Py_BEGIN_ALLOW_THREADS
errno = 0;
#ifdef MS_WINDOWS
n = write(fd, buf, (int)count);
#else
n = write(fd, buf, count);
#endif
/* save/restore errno because PyErr_CheckSignals()
* and PyErr_SetFromErrno() can modify it */
err = errno;
Py_END_ALLOW_THREADS
} while (n < 0 && err == EINTR &&
!(async_err = PyErr_CheckSignals()));
我需要为此写一篇适当的博客 post。 tl;dr 是 spaCy 是在 Cython 中实现的,Cython 是一种类似于 Python 的语言,可以转换为 C 或 C++,并最终生成 Python 扩展。您可以在此处阅读有关使用 Cython 发布 GIL 的更多信息:
http://docs.cython.org/src/userguide/parallelism.html
下面是 .pipe 方法在 spaCy 中的实现:
https://github.com/spacy-io/spaCy/blob/master/spacy/syntax/parser.pyx#L135
def pipe(self, stream, int batch_size=1000, int n_threads=2):
cdef Pool mem = Pool()
cdef TokenC** doc_ptr = <TokenC**>mem.alloc(batch_size, sizeof(TokenC*))
cdef int* lengths = <int*>mem.alloc(batch_size, sizeof(int))
cdef Doc doc
cdef int i
cdef int nr_class = self.moves.n_moves
cdef int nr_feat = self.model.nr_feat
cdef int status
queue = []
for doc in stream:
doc_ptr[len(queue)] = doc.c
lengths[len(queue)] = doc.length
queue.append(doc)
if len(queue) == batch_size:
with nogil:
for i in cython.parallel.prange(batch_size, num_threads=n_threads):
status = self.parseC(doc_ptr[i], lengths[i], nr_feat, nr_class)
if status != 0:
with gil:
sent_str = queue[i].text
raise ValueError("Error parsing doc: %s" % sent_str)
PyErr_CheckSignals()
for doc in queue:
self.moves.finalize_doc(doc)
yield doc
queue = []
batch_size = len(queue)
with nogil:
for i in cython.parallel.prange(batch_size, num_threads=n_threads):
status = self.parseC(doc_ptr[i], lengths[i], nr_feat, nr_class)
if status != 0:
with gil:
sent_str = queue[i].text
raise ValueError("Error parsing doc: %s" % sent_str)
PyErr_CheckSignals()
for doc in queue:
self.moves.finalize_doc(doc)
yield doc
多线程的实际机制非常简单,因为 NLP(通常)是令人尴尬的并行 --- 每个文档都是独立解析的,所以我们只需要对文本流进行 prange 循环。
不过,以多线程方式实现解析器非常困难。要有效地使用多线程,您需要释放 GIL,而不是重新获取它。这意味着不使用 Python 对象,不引发异常等。
当您创建一个 Python 对象 --- 比方说一个列表 --- 您需要增加它的引用计数,它是全局存储的。这意味着获得 GIL。没有办法解决这个问题。但是,如果您在 C 扩展中,并且只想将整数放入堆栈,或者调用 malloc 或 free,则不需要获取 GIL。因此,如果您在该级别编写程序,仅使用 C 和 C++ 构造,则可以发布 GIL。
几年来我一直在用 Cython 编写统计解析器。 (在 spaCy 之前,我有一个用于我的学术研究的实现。)在没有 GIL 的情况下编写整个解析循环是很困难的。到 2015 年底,我有了机器学习、哈希 table、外部解析循环和大部分特征提取作为 nogil 代码。但是状态对象有一个复杂的接口,并且被实现为一个 cdef class。如果不获取 GIL,我无法创建此对象或将其存储在容器中。
当我想出一种在 Cython 中编写 C++ class 的未记录的方法时,突破就来了。这使我能够挖空控制解析器状态的现有 cdef class。我将它的接口代理到内部 C++ class,一种方法一种方法。这样我就可以保持代码正常工作,并确保我没有在特征计算中引入任何细微的错误。
您可以在此处查看内部 class:https://github.com/spacy-io/spaCy/blob/master/spacy/syntax/_state.pxd
如果您浏览此文件的 git 历史,您可以看到我实现 .pipe 方法的补丁。
参考
还有这里 来自 https://spacy.io/
from spacy.attrs import *
# All strings mapped to integers, for easy export to numpy
np_array = doc.to_array([LOWER, POS, ENT_TYPE, IS_ALPHA])
from reddit_corpus import RedditComments
reddit = RedditComments('/path/to/reddit/corpus')
# Parse a stream of documents, with multi-threading (no GIL!)
# Processes over 100,000 tokens per second.
for doc in nlp.pipe(reddit.texts, batch_size=10000, n_threads=4):
# Multi-word expressions, such as names, dates etc
# can be merged into single tokens
for ent in doc.ents:
ent.merge(ent.root.tag_, ent.text, ent.ent_type_)
# Efficient, lossless serialization --- all annotations
# saved, same size as uncompressed text
byte_string = doc.to_bytes()
推测它在 C 级别而不是 python 级别进行解析。一旦你进入 C,如果不需要访问任何 python 对象,你可以安全地释放 GIL。在最底层的读写上,CPython也发布了GIL。原因是,如果还有其他线程 运行 并且我们要调用一个阻塞的 C 函数,那么我们应该在函数调用期间释放 GIL。
您可以在 CPython 的最低实现中看到这一点 write。
if (gil_held) {
do {
Py_BEGIN_ALLOW_THREADS
errno = 0;
#ifdef MS_WINDOWS
n = write(fd, buf, (int)count);
#else
n = write(fd, buf, count);
#endif
/* save/restore errno because PyErr_CheckSignals()
* and PyErr_SetFromErrno() can modify it */
err = errno;
Py_END_ALLOW_THREADS
} while (n < 0 && err == EINTR &&
!(async_err = PyErr_CheckSignals()));
我需要为此写一篇适当的博客 post。 tl;dr 是 spaCy 是在 Cython 中实现的,Cython 是一种类似于 Python 的语言,可以转换为 C 或 C++,并最终生成 Python 扩展。您可以在此处阅读有关使用 Cython 发布 GIL 的更多信息:
http://docs.cython.org/src/userguide/parallelism.html
下面是 .pipe 方法在 spaCy 中的实现:
https://github.com/spacy-io/spaCy/blob/master/spacy/syntax/parser.pyx#L135
def pipe(self, stream, int batch_size=1000, int n_threads=2):
cdef Pool mem = Pool()
cdef TokenC** doc_ptr = <TokenC**>mem.alloc(batch_size, sizeof(TokenC*))
cdef int* lengths = <int*>mem.alloc(batch_size, sizeof(int))
cdef Doc doc
cdef int i
cdef int nr_class = self.moves.n_moves
cdef int nr_feat = self.model.nr_feat
cdef int status
queue = []
for doc in stream:
doc_ptr[len(queue)] = doc.c
lengths[len(queue)] = doc.length
queue.append(doc)
if len(queue) == batch_size:
with nogil:
for i in cython.parallel.prange(batch_size, num_threads=n_threads):
status = self.parseC(doc_ptr[i], lengths[i], nr_feat, nr_class)
if status != 0:
with gil:
sent_str = queue[i].text
raise ValueError("Error parsing doc: %s" % sent_str)
PyErr_CheckSignals()
for doc in queue:
self.moves.finalize_doc(doc)
yield doc
queue = []
batch_size = len(queue)
with nogil:
for i in cython.parallel.prange(batch_size, num_threads=n_threads):
status = self.parseC(doc_ptr[i], lengths[i], nr_feat, nr_class)
if status != 0:
with gil:
sent_str = queue[i].text
raise ValueError("Error parsing doc: %s" % sent_str)
PyErr_CheckSignals()
for doc in queue:
self.moves.finalize_doc(doc)
yield doc
多线程的实际机制非常简单,因为 NLP(通常)是令人尴尬的并行 --- 每个文档都是独立解析的,所以我们只需要对文本流进行 prange 循环。
不过,以多线程方式实现解析器非常困难。要有效地使用多线程,您需要释放 GIL,而不是重新获取它。这意味着不使用 Python 对象,不引发异常等。
当您创建一个 Python 对象 --- 比方说一个列表 --- 您需要增加它的引用计数,它是全局存储的。这意味着获得 GIL。没有办法解决这个问题。但是,如果您在 C 扩展中,并且只想将整数放入堆栈,或者调用 malloc 或 free,则不需要获取 GIL。因此,如果您在该级别编写程序,仅使用 C 和 C++ 构造,则可以发布 GIL。
几年来我一直在用 Cython 编写统计解析器。 (在 spaCy 之前,我有一个用于我的学术研究的实现。)在没有 GIL 的情况下编写整个解析循环是很困难的。到 2015 年底,我有了机器学习、哈希 table、外部解析循环和大部分特征提取作为 nogil 代码。但是状态对象有一个复杂的接口,并且被实现为一个 cdef class。如果不获取 GIL,我无法创建此对象或将其存储在容器中。
当我想出一种在 Cython 中编写 C++ class 的未记录的方法时,突破就来了。这使我能够挖空控制解析器状态的现有 cdef class。我将它的接口代理到内部 C++ class,一种方法一种方法。这样我就可以保持代码正常工作,并确保我没有在特征计算中引入任何细微的错误。
您可以在此处查看内部 class:https://github.com/spacy-io/spaCy/blob/master/spacy/syntax/_state.pxd
如果您浏览此文件的 git 历史,您可以看到我实现 .pipe 方法的补丁。