python;异步 handle_read;我需要一个单独的线程吗?

python; asyncore handle_read; do I need a seperate thread?

来自 asyncore 的文档:https://docs.python.org/2/library/asyncore.html

import asyncore, socket

class HTTPClient(asyncore.dispatcher):

  def __init__(self, host, path):
      asyncore.dispatcher.__init__(self)
      self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
      self.connect( (host, 80) )
      self.buffer = 'GET %s HTTP/1.0\r\n\r\n' % path

  def handle_connect(self):
      pass

  def handle_close(self):
      self.close()

  def handle_read(self):
      print self.recv(8192)

  def writable(self):
      return (len(self.buffer) > 0)

  def handle_write(self):
      sent = self.send(self.buffer)
      self.buffer = self.buffer[sent:]

  client = HTTPClient('www.python.org', '/')
  asyncore.loop()

现在假设我们有:

def handle_read(self):
    data = self.recv(8192)
    //SOME REALLY LONG AND COMPLICATED THING

由于 asyncore 的 polling/select 方法,这是在 Asyncore 本身处理的,还是我需要做的:

def handle_read(self):
    data = self.recv(8192)
    h = Handler(data)
    h.start()

class Handler(threading.Thread):
    def __init__(self, data):
        threading.Thread.__init__(self)
        self.data = data
    def run():
        //LONG AND COMPLICATED THING WITH DATA

如果我确实需要一个线程,我想要 h.join()start 之后吗?它似乎有效,但由于 join 块,我不确定为什么。

这与我之前提出的问题相同 here

如果您有一个 LONG AND COMPLICATED THING WITH DATA 需要实现,在事件循环中执行它会阻止事件循环执行任何其他操作,直到您的任务完成。

如果您生成一个线程然后 join() 它也是如此(join 只是阻止执行直到连接的线程完成);但是,如果您生成一个工作线程并让它 运行 自行完成,那么在您的长任务并行完成时,事件循环可以自由地继续处理。

简答

Is this handled in Asyncore itself due to asyncore's polling/select methodlogy?

不,asyncore 不能自己处理 handle_read() 中的长时间阻塞任务,因为只有一个线程。线程正在做一些长时间的工作,不能被同一个线程中断。

但是,这样的阻塞实现是有道理的。唯一的问题是网络传输速度较慢。例如,如果长任务需要 1 秒,则最大数据传输速率为每秒 8192 字节。尽管数据速率较慢,但网络连接稳定并且按预期工作。这是由操作系统内核中的 TCP 协议实现处理的。

...or do I need to do...? If I do need a thread, do I want h.join() after start?

None 上面的线程用法是有道理的。但是,仍然可以使用辅助线程以最大速率下载数据并并行处理该数据,请参阅下面的解释。


TCP 协议

TCP 提供可靠、有序且经过错误检查的流传输。

Data transfer:

Flow control — limits the rate a sender transfers data to guarantee reliable delivery. The receiver continually hints the sender on how much data can be received (controlled by the sliding window). When the receiving host's buffer fills, the next acknowledgment contains a 0 in the window size, to stop transfer and allow the data in the buffer to be processed.

...

When a receiver advertises a window size of 0, the sender stops sending data and starts the persist timer. The persist timer is used to protect TCP from a deadlock situation that could arise if a subsequent window size update from the receiver is lost, and the sender cannot send more data until receiving a new window size update from the receiver. When the persist timer expires, the TCP sender attempts recovery by sending a small packet so that the receiver responds by sending another acknowledgement containing the new window size.

因此,当由于 handle_read() 中的长任务而未从套接字读取数据时,套接字缓冲区已满。 TCP 连接挂起,不再接收任何新的数据包。在recv()可以接收到新数据后,所以TCP ACK数据包被发送给发送方以更新TCPwindow大小。

当数据传输速率受设置限制时,可以在文件下载器应用程序中观察到类似行为。例如,如果限制设置为 1Kb/s,则下载器可能每秒调用一次 recv(1000)。即使物理网络连接能够发送 1Mb/s,也只能接收到 1Kb/s。在那种情况下,可以通过 tcpdumpWireshark TCP 零 Window 数据包和 TCP Window更新个数据包。


虽然应用程序会处理长阻塞任务,但网络连接通常被认为是瓶颈。所以,还是尽快放网比较好。

如果长任务需要更长的时间,那么数据下载最简单的解决方案是下载所有内容,然后才处理下载的数据。然而,如果数据下载的时间与数据处理任务的时间相称,则可能是不可接受的。例如1小时下载+2小时处理,如果处理与下载并行,2小时就可以完成。


每个数据块的线程

如果在 handle_read() 中创建了一个新线程并且主线程不等待辅助线程完成(没有 join()),应用程序可能会创建大量线程。请注意,handle_read() 可能每秒被调用数千次,如果每个长任务花费的时间超过秒,应用程序可能会创建数百个线程,最后可能会被异常杀死。这种解决方案没有意义,因为无法控制线程数量,而且这些线程处理的数据块也是随机的。函数 recv(8192) 最多接收 8192 个字节,但它也可能接收较小的数据块。


每个数据块的线程并与主线程连接

创建一个线程并立即阻止主线程执行join()没有任何意义,因为这样的解决方案并不比没有任何线程的初始解决方案更好。

一些辅助线程和后来的 join() 可用于并行执行某些操作。例如:

# Start detached thread
h.start()
# Do something in parallel to that thread
# ...
# Wait the thread to finish
h.join()

然而,这里不是这样。


持久工作线程和生产者-消费者数据交换

可以创建一个持久工作线程(或多个以使用所有 CPU 核心)来负责数据处理。应该在asyncore.loop()之前启动,例如:

handler = Handler()
asyncore.loop()

现在,一旦处理程序线程准备就绪,它就可以处理所有下载的数据,同时主线程可以继续下载数据。当处理程序线程忙时,下载程序将数据附加到其数据缓冲区。需要注意线程之间的正确同步:

  • 如果下载的数据附加到下载器 buffer,处理程序线程应该等待才能访问该 buffer
  • 如果处理程序正在读取 buffer,下载程序应该等待,然后才能附加到 buffer
  • 如果处理程序无事可做且 buffer 为空,则应 冻结 并等待新的下载数据。

这可以使用 threading condition object 和生产者-消费者示例来实现:

# create a new condition variable on __init__
cv = threading.Condition()

# Consume one item by Handler
cv.acquire()
while not an_item_is_available():
    cv.wait()
get_an_available_item()
cv.release()
# DO SOME REALLY LONG AND COMPLICATED THING

# Produce one item by Downloader
cv.acquire()
make_an_item_available()
cv.notify()
cv.release()

此处make_an_item_available()可能与将下载的数据附加到buffer和/或设置一些其他共享状态变量有关(例如在handle_close()中)。处理程序线程应在 cv.release() 之后执行其长任务,因此在该长任务期间,下载程序能够获取锁并将新数据附加到 buffer.

我发布我自己的答案是因为它受到了 Orest Hera 的答案的启发,但因为我了解我的工作量,所以它略有不同。

我的工作负载使得请求可以突发到达,但这些突发是零星的(非固定的)。此外,它们需要按接收顺序进行处理。所以,这就是我所做的:

#! /usr/bin/env python3

import asyncore #https://docs.python.org/2/library/asyncore.html
import socket
import threading    
import queue
import time

fqueue = queue.Queue()

class Handler(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.keep_reading = True

    def run(self):
        while self.keep_reading:
            if fqueue.empty():
                time.sleep(1)
            else: 
                #PROCESS
    def stop(self):
        self.keep_reading = False


class Listener(asyncore.dispatcher): #http://effbot.org/librarybook/asyncore.htm
    def __init__(self, host, port):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.connect((host, port))


    def handle_read(self):
        data = self.recv(40) #pretend it always waits for 40 bytes
        fqueue.put(data)

    def start(self):
        try:
            h = Handler()
            h.start()
            asyncore.loop()
        except KeyboardInterrupt:
            pass
        finally:
            h.stop()