如何 运行 在自己的线程中扭曲 webapps 而不是使用 twistd
How to run twisted webapps within their own threads and not using twistd
我有一个有点丰富的 python 应用程序,不想为了暴露一些 http 端点而重构它。我只是想通过一组线程公开它们,这些线程通过这些端口上的 http 侦听。
有几个相关的问题:但它们很旧,并不是我要找的:
- How to start twisted's reactor from ipython
- Twisted application without twistd
什么是 "modern" 方法,这将涉及简单地启动线程 - 或者甚至更好的 threadpool
条目 - 以将 twisted
合并到现有应用程序中?
见https://github.com/itamarst/crochet
这是文档中的示例:
#!/usr/bin/python
"""
Do a DNS lookup using Twisted's APIs.
"""
from __future__ import print_function
# The Twisted code we'll be using:
from twisted.names import client
from crochet import setup, wait_for
setup()
# Crochet layer, wrapping Twisted's DNS library in a blocking call.
@wait_for(timeout=5.0)
def gethostbyname(name):
"""Lookup the IP of a given hostname.
Unlike socket.gethostbyname() which can take an arbitrary amount of time
to finish, this function will raise crochet.TimeoutError if more than 5
seconds elapse without an answer being received.
"""
d = client.lookupAddress(name)
d.addCallback(lambda result: result[0][0].payload.dottedQuad())
return d
if __name__ == '__main__':
# Application code using the public API - notice it works in a normal
# blocking manner, with no event loop visible:
import sys
name = sys.argv[1]
ip = gethostbyname(name)
print(name, "->", ip)
我有一个有点丰富的 python 应用程序,不想为了暴露一些 http 端点而重构它。我只是想通过一组线程公开它们,这些线程通过这些端口上的 http 侦听。
有几个相关的问题:但它们很旧,并不是我要找的:
- How to start twisted's reactor from ipython
- Twisted application without twistd
什么是 "modern" 方法,这将涉及简单地启动线程 - 或者甚至更好的 threadpool
条目 - 以将 twisted
合并到现有应用程序中?
见https://github.com/itamarst/crochet
这是文档中的示例:
#!/usr/bin/python
"""
Do a DNS lookup using Twisted's APIs.
"""
from __future__ import print_function
# The Twisted code we'll be using:
from twisted.names import client
from crochet import setup, wait_for
setup()
# Crochet layer, wrapping Twisted's DNS library in a blocking call.
@wait_for(timeout=5.0)
def gethostbyname(name):
"""Lookup the IP of a given hostname.
Unlike socket.gethostbyname() which can take an arbitrary amount of time
to finish, this function will raise crochet.TimeoutError if more than 5
seconds elapse without an answer being received.
"""
d = client.lookupAddress(name)
d.addCallback(lambda result: result[0][0].payload.dottedQuad())
return d
if __name__ == '__main__':
# Application code using the public API - notice it works in a normal
# blocking manner, with no event loop visible:
import sys
name = sys.argv[1]
ip = gethostbyname(name)
print(name, "->", ip)