在 python-telegram-bot embedded HTTPServer.HTTPServer 中添加自定义请求映射

Add custom request mapping in python-telegram-bot embedded HTTPServer.HTTPServer

我刚刚用 python-telegram-bot 重写了来自 pyTelegramBotAPI 的电报机器人。有一个想法是公开监控 url,我们可以偶尔用一些应用程序 ping 一次,看看机器人是否仍然 运行(在内部,它会测试机器人的某些功能,如数据库等)。问题是,是否可以使用嵌入式 HTTPServer.HTTPServer 创建这样的请求点?到目前为止,我找不到办法做到这一点。如果我重用通用 example.com/botTOKEN 方法,我需要注意 json 有效负载,并且在失败的情况下我无法发回 HTTP 错误响应代码。

谢谢。

Update1:所以,我遵循了@Marat 提供的代码片段。这就是我获取处理程序对象的方式:

# since the webhook server is started in an extra thread, it's not available immediately
while updater.httpd is None:
    pass
handler = updater.httpd.RequestHandlerClass

是的,你可以。我希望这个例子能有所帮助:

import SimpleHTTPServer
import SocketServer


class myServer(SimpleHTTPServer.SimpleHTTPRequestHandler):

    def do_GET(self):
        """Serve a GET request."""
        # do something here
        self.finish(
            "Hello world! Request path: " + self.path
        )

    def finish(self, value, status=200, ctype="text/html"):
        try:
            self.send_response(status)
            self.send_header("Content-type", ctype)
            self.send_header("Content-Length", str(len(value)))
            self.end_headers()
            self.wfile.write(str(value))
        finally:
            self.wfile.close()


httpd = SocketServer.TCPServer(("", 80), myServer)

httpd.serve_forever()

更多信息,请查看SimpleHTTPRequestHandler

的源代码

UPD: WebhookHandler 代码可能是一个更好的例子

如果你想重用现有的实例,你可以做猴子补丁:

# wrapper for original do_GET
def patch(get_func):
    def wrapper(self):
        if self.path == '/test_url':
            # do something
            message = "Successful" # or not :(
            self.send_response(200)
            self.send_header("Content-type", 'text/plain')
            self.send_header("Content-Length", str(len(message)))
            self.end_headers()
            self.wfile.write(message)
        else:
            return get_func(self)

    return wrapper

# assume `server` is an instance of WebhookHandler
server.do_GET = patch(server.do_GET)  # monkeypatching

UPD2: 在仔细查看 BaseServer 的代码后,我发现它为每个请求都启动了一个新的请求处理程序实例。这意味着修补实例将不起作用,我们需要修补 class 本身。它是这样工作的:

# ... somewhere in the code far far away

from telegram.ext import Updater
from telegram.utils import webhookhandler as wh

# IMPORTANT: do it before making an instance of updater
# IMPORTANT #2: this is considered dirty hack. Don't repeat it at home!
if not wh.WebhookHandler._fuse:
    # note we're patching handler class itself, not an instance
    wh.WebhookHandler.do_GET = patch(wh.WebhookHandler.do_GET)  # use patch() from above
    wh.WebhookHandler._fuse = True

updater = Updater(token='TOKEN')