使用 App Engine Python 将 BIG CSV 中的内容上传到 CloudSQL
Upload content from a BIG CSV to CloudSQL using App Engine Python
我是 Google App Engine 的新手。
我需要做的是将一个相当大的 CSV 文件上传到 CloudSQL。
我有一个 HTML 页面,其中有一个文件上传模块,当上传到 Blobstore 时。
之后,我使用 Blob reader 打开 CSV,并使用 cursor.execute("insert into table values") 将每一行执行到 CloudSQL。这里的问题是我只能执行 HTTP 请求一分钟,并不是所有数据都在那么短的时间内插入。它还使屏幕保持加载状态,如果可能的话,我想通过在后端制作代码 运行 来避免这种情况?
我也尝试过 "LOAD DATA LOCAL INFILE" 方式。
当我通过终端连接到 CloudSQL 时,"LOAD DATA LOCAL INFILE" 在我的本地计算机上工作。而且它非常快。
我将如何在 App Engine 中使用它?
或者是否有更好的方法在从 HTML 上传 CSV 后直接通过 Blobstore 或 Google Cloud Storage 将大型 CSV 导入 CloudSQL?
另外,是否可以将 Task Queues 与 Blob Store 一起使用,然后将数据插入后端的 CloudSQL 中?
我对 Datastore 而不是 CloudSQL 使用了类似的方法,但同样的方法可以应用于您的场景。
设置应用程序的非默认模块(以前称为后端,现在已弃用)
发送将通过任务队列触发模块端点的 http 请求(以避免 60 秒截止日期)
使用 mapreduce 和 CSV 作为输入,并在 map 函数中对 csv 的每一行进行操作(以避免内存错误并在操作过程中出现任何错误时从它离开的地方恢复管道)
编辑:根据 OP 请求详细说明 map reduce,并取消使用 taskqueue
- 从找到的文档中阅读 mapreduce 基础知识 here
下载 mapreduce 工作的依赖文件夹 (simplejson, graphy, mapreduce)
下载 this 文件到您的项目文件夹并另存为 "custom_input_reader.py"
现在将下面的代码复制到您的 main_app.py 文件中。
main_app.py
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from custom_input_reader import GoogleStorageLineInputReader
def testMapperFunc(row):
# do process with csv row
return
class TestGCSReaderPipeline(base_handler.PipelineBase):
def run(self):
yield mapreduce_pipeline.MapPipeline(
"gcs_csv_reader_job",
"main_app.testMapperFunc",
"custom_input_reader.GoogleStorageLineInputReader",
params={
"input_reader": {
"file_paths": ['/' + bucketname + '/' + filename]
}
})
- 创建一个将启动地图作业的 http 处理程序
main_app.py
class BeginUpload(webapp2.RequestHandler):
# do whatever you want
upload_task = TestGCSReaderPipeline()
upload_task.start()
# do whatever you want
- 如果要传递任何参数,请在"run"方法中添加参数并在创建管道对象时提供值
您可以尝试通过云控制台导入 CSV 数据:
https://cloud.google.com/sql/docs/import-export?hl=en#import-csv
我是 Google App Engine 的新手。
我需要做的是将一个相当大的 CSV 文件上传到 CloudSQL。 我有一个 HTML 页面,其中有一个文件上传模块,当上传到 Blobstore 时。
之后,我使用 Blob reader 打开 CSV,并使用 cursor.execute("insert into table values") 将每一行执行到 CloudSQL。这里的问题是我只能执行 HTTP 请求一分钟,并不是所有数据都在那么短的时间内插入。它还使屏幕保持加载状态,如果可能的话,我想通过在后端制作代码 运行 来避免这种情况?
我也尝试过 "LOAD DATA LOCAL INFILE" 方式。
当我通过终端连接到 CloudSQL 时,"LOAD DATA LOCAL INFILE" 在我的本地计算机上工作。而且它非常快。 我将如何在 App Engine 中使用它?
或者是否有更好的方法在从 HTML 上传 CSV 后直接通过 Blobstore 或 Google Cloud Storage 将大型 CSV 导入 CloudSQL? 另外,是否可以将 Task Queues 与 Blob Store 一起使用,然后将数据插入后端的 CloudSQL 中?
我对 Datastore 而不是 CloudSQL 使用了类似的方法,但同样的方法可以应用于您的场景。
设置应用程序的非默认模块(以前称为后端,现在已弃用)
发送将通过任务队列触发模块端点的 http 请求(以避免 60 秒截止日期)
使用 mapreduce 和 CSV 作为输入,并在 map 函数中对 csv 的每一行进行操作(以避免内存错误并在操作过程中出现任何错误时从它离开的地方恢复管道)
编辑:根据 OP 请求详细说明 map reduce,并取消使用 taskqueue
- 从找到的文档中阅读 mapreduce 基础知识 here
下载 mapreduce 工作的依赖文件夹 (simplejson, graphy, mapreduce)
下载 this 文件到您的项目文件夹并另存为 "custom_input_reader.py"
现在将下面的代码复制到您的 main_app.py 文件中。
main_app.py
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from custom_input_reader import GoogleStorageLineInputReader
def testMapperFunc(row):
# do process with csv row
return
class TestGCSReaderPipeline(base_handler.PipelineBase):
def run(self):
yield mapreduce_pipeline.MapPipeline(
"gcs_csv_reader_job",
"main_app.testMapperFunc",
"custom_input_reader.GoogleStorageLineInputReader",
params={
"input_reader": {
"file_paths": ['/' + bucketname + '/' + filename]
}
})
- 创建一个将启动地图作业的 http 处理程序
main_app.py
class BeginUpload(webapp2.RequestHandler):
# do whatever you want
upload_task = TestGCSReaderPipeline()
upload_task.start()
# do whatever you want
- 如果要传递任何参数,请在"run"方法中添加参数并在创建管道对象时提供值
您可以尝试通过云控制台导入 CSV 数据:
https://cloud.google.com/sql/docs/import-export?hl=en#import-csv