将从 POST 请求收到的文件保存在 Python 中

Save file received from POST request in Python

我正在尝试实现基本 http.server Python 模块的上传功能。

到目前为止,我创建了一个名为 SimpleHTTPRequestHandlerWithUpload 的新 class,它继承自 SimpleHTTPRequestHandler,并向 list_directory() 添加了一个上传部分。下一步将创建一个 do_POST() 方法,该方法处理请求并将文件保存在当前工作目录中。但是,我不知道该怎么做。我查看了 UniIsland's code on GitHub but I can't understand what he did and the code is very old. I also read this question 并尝试在我的代码中实现它。

有点效果,但文件“乱七八糟”headers。这不会对 txt 文件造成大问题,但会破坏所有其他文件扩展名。

我想知道如何删除headers,将上传的文件以其原始名称保存在当前工作目录中,并检查上传是否成功。

这是我的代码:

__version__ = '0.1'

import http.server
import html
import io
import os
import socket  # For gethostbyaddr()
import sys
import urllib.parse
import contextlib

from http import HTTPStatus


class SimpleHTTPRequestHandlerWithUpload(http.server.SimpleHTTPRequestHandler):

    server_version = 'SimpleHTTPWithUpload/' + __version__

    def do_POST(self):
        """Serve a POST request."""
        data = self.rfile.read(int(self.headers['content-length']))
        with open('file.txt', 'wb') as file:
            file.write(data)
        r = []
        enc = sys.getfilesystemencoding()
        r.append('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">')
        r.append("<html>\n<title>Upload Result Page</title>\n")
        r.append("<body>\n<h2>Upload Result Page</h2>\n")
        r.append("</body>\n</html>")
        encoded = '\n'.join(r).encode(enc, 'surrogateescape')
        f = io.BytesIO()
        f.write(encoded)
        f.seek(0)
        self.send_response(HTTPStatus.OK)
        self.send_header("Content-type", "text/html")
        self.send_header("Content-Length", str(len(encoded)))
        self.end_headers()
        if f:
            self.copyfile(f, self.wfile)
            f.close()

    def list_directory(self, path):
        """Helper to produce a directory listing (absent index.html).
        Return value is either a file object, or None (indicating an
        error).  In either case, the headers are sent, making the
        interface the same as for send_head().
        """
        try:
            list = os.listdir(path)
        except OSError:
            self.send_error(
                HTTPStatus.NOT_FOUND,
                'No permission to list directory')
            return None
        list.sort(key=lambda a: a.lower())
        r = []
        try:
            displaypath = urllib.parse.unquote(self.path,
                                               errors='surrogatepass')
        except UnicodeDecodeError:
            displaypath = urllib.parse.unquote(path)
        displaypath = html.escape(displaypath, quote=False)
        enc = sys.getfilesystemencoding()
        title = 'Directory listing for %s' % displaypath
        r.append('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
                 '"http://www.w3.org/TR/html4/strict.dtd">')
        r.append('<html>\n<head>')
        r.append('<meta http-equiv="Content-Type" '
                 'content="text/html; charset=%s">' % enc)
        r.append('<title>%s</title>\n</head>' % title)
        r.append('<body>\n<h1>%s</h1>' % title)
        r.append('<hr>\n<ul>')
        for name in list:
            fullname = os.path.join(path, name)
            displayname = linkname = name
            # Append / for directories or @ for symbolic links
            if os.path.isdir(fullname):
                displayname = name + '/'
                linkname = name + '/'
            if os.path.islink(fullname):
                displayname = name + '@'
                # Note: a link to a directory displays with @ and links with /
            r.append('<li><a href="%s">%s</a></li>' % (urllib.parse.quote(linkname, errors='surrogatepass'),
                                                       html.escape(displayname, quote=False)))
        r.append('</ul>\n<hr>\n')
        r.append('<form id="upload" enctype="multipart/form-data" method="post" action="#">\n'
                 '<input id="fileupload" name="file" type="file" />\n'
                 '<input type="submit" value="Submit" id="submit" />\n'
                 '</form>')
        r.append('\n<hr>\n</body>\n</html>\n')
        encoded = '\n'.join(r).encode(enc, 'surrogateescape')
        f = io.BytesIO()
        f.write(encoded)
        f.seek(0)
        self.send_response(HTTPStatus.OK)
        self.send_header('Content-type', 'text/html; charset=%s' % enc)
        self.send_header('Content-Length', str(len(encoded)))
        self.end_headers()
        return f


if __name__ == '__main__':
    class DualStackServer(http.server.ThreadingHTTPServer):
        def server_bind(self):
            # suppress exception when protocol is IPv4
            with contextlib.suppress(Exception):
                self.socket.setsockopt(
                    socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
            return super().server_bind()

    http.server.test(
        HandlerClass=SimpleHTTPRequestHandlerWithUpload,
        ServerClass=DualStackServer
    )

如果你想测试它,只需 运行 你机器上的脚本,在另一台机器上打开网络浏览器并在地址栏中输入 <IP_ADDRESS_1>:8000 其中 IP_ADDRESS_1 是您运行正在运行代码的机器的 IP。

请告诉我除了 do_POST() 方法之外还有什么问题。我是一名新的 Python 程序员,我正在努力提高我的软件设计技能。谢谢!


编辑:我想出了如何删除 headers 并使用其原始名称保存文件。但是,脚本一直挂在 data = self.rfile.readlines() 上,直到我关闭浏览器选项卡然后运行良好。我不知道该怎么办。看来我必须发送某种 EOF 来通知 readlines() 我已完成文件发送,但我不知道如何发送。我也不知道如何检查文件是否已成功上传。感谢您的帮助!

更新了 do_POST() 方法:

def do_POST(self):
    """Serve a POST request."""
    data = self.rfile.readlines()
    filename = re.findall(r'Content-Disposition.*name="file"; filename="(.*)"', str(data[1]))
    if len(filename) == 1:
        filename = ''.join(filename)
    else:
        return
    data = data[4:-2]
    data = b''.join(data)
    with open(filename, 'wb') as file:
        file.write(data)
    r = []
    enc = sys.getfilesystemencoding()
    r.append('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
             '"http://www.w3.org/TR/html4/strict.dtd">')
    r.append('<html>\n<title>Upload result page</title>\n')
    r.append('<body>\n<h2>Upload result page</h2>\n')
    r.append('</body>\n</html>')
    encoded = '\n'.join(r).encode(enc, 'surrogateescape')
    f = io.BytesIO()
    f.write(encoded)
    f.seek(0)
    self.send_response(HTTPStatus.OK)
    self.send_header('Content-type', 'text/html')
    self.send_header('Content-Length', str(len(encoded)))
    self.end_headers()
    if f:
        self.copyfile(f, self.wfile)
        f.close()

我设法解决了所有问题。我在 GitHub 上发布了我的 code,供任何感兴趣的人使用。