Luigi Pipeline beginning in S3
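My initial files are in AWS S3. Could someone point me to how I need to set this up in a Luigi Task?
I reviewed the documentation and found luigi.S3, but it is not clear to me what to do with it. I then searched the web and only found links from mortar-luigi and implementations built on top of luigi.
UPDATE
After following the example provided by @matagus (I also created the ~/.boto file as suggested):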
# coding: utf-8
import luigi
from luigi.s3 import S3Target, S3Client

class MyS3File(luigi.ExternalTask):
    def output(self):
        return S3Target('s3://my-bucket/19170205.txt')

class ProcessS3File(luigi.Task):
    def requieres(self):
        return MyS3File()

    def output(self):
        return luigi.LocalTarget('/tmp/resultado.txt')

    def run(self):
        result = None
        for input in self.input():
            print("Doing something ...")
            with input.open('r') as f:
                for line in f:
                    result = 'This is a line'
        if result:
            out_file = self.output().open('w')
            out_file.write(result)
When I execute it, nothing happens:
DEBUG: Checking if ProcessS3File() is complete
INFO: Informed scheduler that task ProcessS3File() has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21171] Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) running ProcessS3File()
INFO: [pid 21171] Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) done ProcessS3File()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task ProcessS3File() has status DONE
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) was stopped. Shutting down Keep-Alive thread
As you can see, the message Doing something ... is never printed. What is wrong?
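The key here is to define an external task that has no inputs and whose output is the file you already have in S3. The Luigi documentation mentions this in Requiring another Task:
Note that requires() can not return a Target object. If you have a simple Target object that is created externally you can wrap it in a Task class
So, basically you end up with something like this: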
import luigi
from luigi.s3 import S3Target
from somewhere import do_something_with

class MyS3File(luigi.ExternalTask):
    def output(self):
        return S3Target('s3://my-bucket/path/to/file')

class ProcessS3File(luigi.Task):
    def requires(self):
        return MyS3File()

    def output(self):
        return S3Target('s3://my-bucket/path/to/output-file')

    def run(self):
        result = None
        # this opens a file stream that reads the file from your AWS S3 bucket
        with self.input().open('r') as f:
            result = do_something_with(f)
        # and then you write the result to the output target; leaving the
        # with block closes the file, which is what finalizes the write
        with self.output().open('w') as out_file:
            # it'd be better to serialize this result before writing it to a
            # file, but this is a pretty simple example
            out_file.write(result)
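One thing worth noting about the updated code in the question: the method there is spelled requieres, not requires. Luigi only looks for requires(), so the task appears to have no dependencies, self.input() comes back empty, and the for loop body never runs. The sketch below illustrates that effect with a tiny stand-in class. The Task class here is a simplified assumption written for this example, not Luigi's actual implementation, and the names MisspelledTask and CorrectTask are hypothetical.

```python
class Task:
    """Simplified stand-in for luigi.Task (an assumption for illustration only)."""

    def requires(self):
        # Default behaviour: a task has no dependencies unless it overrides this.
        return []

    def output(self):
        return []

    def input(self):
        # Map whatever requires() returns onto the outputs of those tasks.
        required = self.requires()
        if isinstance(required, list):
            return [task.output() for task in required]
        return required.output()


class MyS3File(Task):
    def output(self):
        # A plain string stands in for an S3Target here.
        return "s3://my-bucket/19170205.txt"


class MisspelledTask(Task):
    def requieres(self):  # typo: nothing ever calls this method
        return MyS3File()


class CorrectTask(Task):
    def requires(self):
        return MyS3File()


misspelled_input = MisspelledTask().input()
correct_input = CorrectTask().input()

print(misspelled_input)
print(correct_input)
```

With the spelling fixed, requires() returns a single task, so self.input() is a single target rather than a list. The question's for loop over self.input() would then most likely need to be replaced by a direct self.input().open('r'), as in the example above.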
UPDATE:
Luigi uses boto to read files from and/or write them to AWS S3, so in order to make this code work, you'll need to provide your credentials in your boto config file ~/.boto (the boto documentation lists other possible config file locations):
[Credentials]
aws_access_key_id = <your_access_key_here>
aws_secret_access_key = <your_secret_key_here>
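If the task still cannot reach S3, it may help to confirm that the config file really contains a [Credentials] section with both keys. The following is a minimal sketch that uses only Python's standard configparser module. The function name read_boto_credentials and the sample values are hypothetical, and this only checks the file's shape; it does not reproduce how boto itself loads its configuration.

```python
import configparser


def read_boto_credentials(text):
    """Return (access_key_id, secret_key) from boto-style config text, or None."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    if not parser.has_section("Credentials"):
        return None
    section = parser["Credentials"]
    key_id = section.get("aws_access_key_id")
    secret = section.get("aws_secret_access_key")
    if not key_id or not secret:
        return None
    return key_id, secret


# Placeholder values, not real credentials.
sample = """
[Credentials]
aws_access_key_id = EXAMPLE_KEY_ID
aws_secret_access_key = EXAMPLE_SECRET
"""

creds = read_boto_credentials(sample)
print(creds is not None)
```

To check a real file, the text could be read with open(os.path.expanduser('~/.boto')).read() and passed to the same function.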