将从 POST 请求收到的文件保存在 Python 中
Save file received from POST request in Python
我正在尝试实现基本 http.server Python 模块的上传功能。
到目前为止,我创建了一个名为 SimpleHTTPRequestHandlerWithUpload
的新 class,它继承自 SimpleHTTPRequestHandler
,并向 list_directory()
添加了一个上传部分。下一步将创建一个 do_POST()
方法,该方法处理请求并将文件保存在当前工作目录中。但是,我不知道该怎么做。我查看了 UniIsland's code on GitHub but I can't understand what he did and the code is very old. I also read this question 并尝试在我的代码中实现它。
有点效果,但文件“乱七八糟”headers。这不会对 txt 文件造成大问题,但会破坏所有其他文件扩展名。
我想知道如何删除headers,将上传的文件以其原始名称保存在当前工作目录中,并检查上传是否成功。
这是我的代码:
__version__ = '0.1'
import http.server
import html
import io
import os
import socket # For gethostbyaddr()
import sys
import urllib.parse
import contextlib
from http import HTTPStatus
class SimpleHTTPRequestHandlerWithUpload(http.server.SimpleHTTPRequestHandler):
server_version = 'SimpleHTTPWithUpload/' + __version__
def do_POST(self):
"""Serve a POST request."""
data = self.rfile.read(int(self.headers['content-length']))
with open('file.txt', 'wb') as file:
file.write(data)
r = []
enc = sys.getfilesystemencoding()
r.append('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">')
r.append("<html>\n<title>Upload Result Page</title>\n")
r.append("<body>\n<h2>Upload Result Page</h2>\n")
r.append("</body>\n</html>")
encoded = '\n'.join(r).encode(enc, 'surrogateescape')
f = io.BytesIO()
f.write(encoded)
f.seek(0)
self.send_response(HTTPStatus.OK)
self.send_header("Content-type", "text/html")
self.send_header("Content-Length", str(len(encoded)))
self.end_headers()
if f:
self.copyfile(f, self.wfile)
f.close()
def list_directory(self, path):
"""Helper to produce a directory listing (absent index.html).
Return value is either a file object, or None (indicating an
error). In either case, the headers are sent, making the
interface the same as for send_head().
"""
try:
list = os.listdir(path)
except OSError:
self.send_error(
HTTPStatus.NOT_FOUND,
'No permission to list directory')
return None
list.sort(key=lambda a: a.lower())
r = []
try:
displaypath = urllib.parse.unquote(self.path,
errors='surrogatepass')
except UnicodeDecodeError:
displaypath = urllib.parse.unquote(path)
displaypath = html.escape(displaypath, quote=False)
enc = sys.getfilesystemencoding()
title = 'Directory listing for %s' % displaypath
r.append('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
'"http://www.w3.org/TR/html4/strict.dtd">')
r.append('<html>\n<head>')
r.append('<meta http-equiv="Content-Type" '
'content="text/html; charset=%s">' % enc)
r.append('<title>%s</title>\n</head>' % title)
r.append('<body>\n<h1>%s</h1>' % title)
r.append('<hr>\n<ul>')
for name in list:
fullname = os.path.join(path, name)
displayname = linkname = name
# Append / for directories or @ for symbolic links
if os.path.isdir(fullname):
displayname = name + '/'
linkname = name + '/'
if os.path.islink(fullname):
displayname = name + '@'
# Note: a link to a directory displays with @ and links with /
r.append('<li><a href="%s">%s</a></li>' % (urllib.parse.quote(linkname, errors='surrogatepass'),
html.escape(displayname, quote=False)))
r.append('</ul>\n<hr>\n')
r.append('<form id="upload" enctype="multipart/form-data" method="post" action="#">\n'
'<input id="fileupload" name="file" type="file" />\n'
'<input type="submit" value="Submit" id="submit" />\n'
'</form>')
r.append('\n<hr>\n</body>\n</html>\n')
encoded = '\n'.join(r).encode(enc, 'surrogateescape')
f = io.BytesIO()
f.write(encoded)
f.seek(0)
self.send_response(HTTPStatus.OK)
self.send_header('Content-type', 'text/html; charset=%s' % enc)
self.send_header('Content-Length', str(len(encoded)))
self.end_headers()
return f
if __name__ == '__main__':
class DualStackServer(http.server.ThreadingHTTPServer):
def server_bind(self):
# suppress exception when protocol is IPv4
with contextlib.suppress(Exception):
self.socket.setsockopt(
socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
return super().server_bind()
http.server.test(
HandlerClass=SimpleHTTPRequestHandlerWithUpload,
ServerClass=DualStackServer
)
如果你想测试它,只需 运行 你机器上的脚本,在另一台机器上打开网络浏览器并在地址栏中输入 <IP_ADDRESS_1>:8000
其中 IP_ADDRESS_1
是您运行正在运行代码的机器的 IP。
请告诉我除了 do_POST()
方法之外还有什么问题。我是一名新的 Python 程序员,我正在努力提高我的软件设计技能。谢谢!
编辑:我想出了如何删除 headers 并使用其原始名称保存文件。但是,脚本一直挂在 data = self.rfile.readlines()
上,直到我关闭浏览器选项卡然后运行良好。我不知道该怎么办。看来我必须发送某种 EOF 来通知 readlines()
我已完成文件发送,但我不知道如何发送。我也不知道如何检查文件是否已成功上传。感谢您的帮助!
更新了 do_POST()
方法:
def do_POST(self):
"""Serve a POST request."""
data = self.rfile.readlines()
filename = re.findall(r'Content-Disposition.*name="file"; filename="(.*)"', str(data[1]))
if len(filename) == 1:
filename = ''.join(filename)
else:
return
data = data[4:-2]
data = b''.join(data)
with open(filename, 'wb') as file:
file.write(data)
r = []
enc = sys.getfilesystemencoding()
r.append('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
'"http://www.w3.org/TR/html4/strict.dtd">')
r.append('<html>\n<title>Upload result page</title>\n')
r.append('<body>\n<h2>Upload result page</h2>\n')
r.append('</body>\n</html>')
encoded = '\n'.join(r).encode(enc, 'surrogateescape')
f = io.BytesIO()
f.write(encoded)
f.seek(0)
self.send_response(HTTPStatus.OK)
self.send_header('Content-type', 'text/html')
self.send_header('Content-Length', str(len(encoded)))
self.end_headers()
if f:
self.copyfile(f, self.wfile)
f.close()
我设法解决了所有问题。我在 GitHub 上发布了我的 code,供任何感兴趣的人使用。
我正在尝试实现基本 http.server Python 模块的上传功能。
到目前为止,我创建了一个名为 SimpleHTTPRequestHandlerWithUpload
的新 class,它继承自 SimpleHTTPRequestHandler
,并向 list_directory()
添加了一个上传部分。下一步将创建一个 do_POST()
方法,该方法处理请求并将文件保存在当前工作目录中。但是,我不知道该怎么做。我查看了 UniIsland's code on GitHub but I can't understand what he did and the code is very old. I also read this question 并尝试在我的代码中实现它。
有点效果,但文件“乱七八糟”headers。这不会对 txt 文件造成大问题,但会破坏所有其他文件扩展名。
我想知道如何删除headers,将上传的文件以其原始名称保存在当前工作目录中,并检查上传是否成功。
这是我的代码:
__version__ = '0.1'
import http.server
import html
import io
import os
import socket # For gethostbyaddr()
import sys
import urllib.parse
import contextlib
from http import HTTPStatus
class SimpleHTTPRequestHandlerWithUpload(http.server.SimpleHTTPRequestHandler):
server_version = 'SimpleHTTPWithUpload/' + __version__
def do_POST(self):
"""Serve a POST request."""
data = self.rfile.read(int(self.headers['content-length']))
with open('file.txt', 'wb') as file:
file.write(data)
r = []
enc = sys.getfilesystemencoding()
r.append('<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">')
r.append("<html>\n<title>Upload Result Page</title>\n")
r.append("<body>\n<h2>Upload Result Page</h2>\n")
r.append("</body>\n</html>")
encoded = '\n'.join(r).encode(enc, 'surrogateescape')
f = io.BytesIO()
f.write(encoded)
f.seek(0)
self.send_response(HTTPStatus.OK)
self.send_header("Content-type", "text/html")
self.send_header("Content-Length", str(len(encoded)))
self.end_headers()
if f:
self.copyfile(f, self.wfile)
f.close()
def list_directory(self, path):
"""Helper to produce a directory listing (absent index.html).
Return value is either a file object, or None (indicating an
error). In either case, the headers are sent, making the
interface the same as for send_head().
"""
try:
list = os.listdir(path)
except OSError:
self.send_error(
HTTPStatus.NOT_FOUND,
'No permission to list directory')
return None
list.sort(key=lambda a: a.lower())
r = []
try:
displaypath = urllib.parse.unquote(self.path,
errors='surrogatepass')
except UnicodeDecodeError:
displaypath = urllib.parse.unquote(path)
displaypath = html.escape(displaypath, quote=False)
enc = sys.getfilesystemencoding()
title = 'Directory listing for %s' % displaypath
r.append('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
'"http://www.w3.org/TR/html4/strict.dtd">')
r.append('<html>\n<head>')
r.append('<meta http-equiv="Content-Type" '
'content="text/html; charset=%s">' % enc)
r.append('<title>%s</title>\n</head>' % title)
r.append('<body>\n<h1>%s</h1>' % title)
r.append('<hr>\n<ul>')
for name in list:
fullname = os.path.join(path, name)
displayname = linkname = name
# Append / for directories or @ for symbolic links
if os.path.isdir(fullname):
displayname = name + '/'
linkname = name + '/'
if os.path.islink(fullname):
displayname = name + '@'
# Note: a link to a directory displays with @ and links with /
r.append('<li><a href="%s">%s</a></li>' % (urllib.parse.quote(linkname, errors='surrogatepass'),
html.escape(displayname, quote=False)))
r.append('</ul>\n<hr>\n')
r.append('<form id="upload" enctype="multipart/form-data" method="post" action="#">\n'
'<input id="fileupload" name="file" type="file" />\n'
'<input type="submit" value="Submit" id="submit" />\n'
'</form>')
r.append('\n<hr>\n</body>\n</html>\n')
encoded = '\n'.join(r).encode(enc, 'surrogateescape')
f = io.BytesIO()
f.write(encoded)
f.seek(0)
self.send_response(HTTPStatus.OK)
self.send_header('Content-type', 'text/html; charset=%s' % enc)
self.send_header('Content-Length', str(len(encoded)))
self.end_headers()
return f
if __name__ == '__main__':
class DualStackServer(http.server.ThreadingHTTPServer):
def server_bind(self):
# suppress exception when protocol is IPv4
with contextlib.suppress(Exception):
self.socket.setsockopt(
socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
return super().server_bind()
http.server.test(
HandlerClass=SimpleHTTPRequestHandlerWithUpload,
ServerClass=DualStackServer
)
如果你想测试它,只需 运行 你机器上的脚本,在另一台机器上打开网络浏览器并在地址栏中输入 <IP_ADDRESS_1>:8000
其中 IP_ADDRESS_1
是您运行正在运行代码的机器的 IP。
请告诉我除了 do_POST()
方法之外还有什么问题。我是一名新的 Python 程序员,我正在努力提高我的软件设计技能。谢谢!
编辑:我想出了如何删除 headers 并使用其原始名称保存文件。但是,脚本一直挂在 data = self.rfile.readlines()
上,直到我关闭浏览器选项卡然后运行良好。我不知道该怎么办。看来我必须发送某种 EOF 来通知 readlines()
我已完成文件发送,但我不知道如何发送。我也不知道如何检查文件是否已成功上传。感谢您的帮助!
更新了 do_POST()
方法:
def do_POST(self):
"""Serve a POST request."""
data = self.rfile.readlines()
filename = re.findall(r'Content-Disposition.*name="file"; filename="(.*)"', str(data[1]))
if len(filename) == 1:
filename = ''.join(filename)
else:
return
data = data[4:-2]
data = b''.join(data)
with open(filename, 'wb') as file:
file.write(data)
r = []
enc = sys.getfilesystemencoding()
r.append('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
'"http://www.w3.org/TR/html4/strict.dtd">')
r.append('<html>\n<title>Upload result page</title>\n')
r.append('<body>\n<h2>Upload result page</h2>\n')
r.append('</body>\n</html>')
encoded = '\n'.join(r).encode(enc, 'surrogateescape')
f = io.BytesIO()
f.write(encoded)
f.seek(0)
self.send_response(HTTPStatus.OK)
self.send_header('Content-type', 'text/html')
self.send_header('Content-Length', str(len(encoded)))
self.end_headers()
if f:
self.copyfile(f, self.wfile)
f.close()
我设法解决了所有问题。我在 GitHub 上发布了我的 code,供任何感兴趣的人使用。