使用 Luigi 的循环机器学习 ETL
Recurrent machine learning ETL using Luigi
今天,运行 我编写的机器学习作业是手工完成的。我下载所需的输入文件,学习和预测事物,输出一个 .csv 文件,然后将其复制到数据库中。
但是,由于要投入生产,我需要将所有这些过程自动化。所需的输入文件将每月(最终更频繁地)从提供程序到达 S3 存储桶。
现在我打算用 Luigi 来解决这个问题。这是理想的过程:
- 我需要我的程序每周(或每天,或每小时,我觉得更好)查看 S3 存储桶中的新文件
- 当文件到达时,我的机器学习管道被触发,并吐出一些 pandas 数据帧。
- 之后,我需要我的程序将这些结果写入不同的数据库
问题是,我不知道如何使用 Luigi 来实现自动化:
- 正在看文件
- 安排任务(例如每个月)
- 部署它(以可重现的方式)
今天,这是我想到的管道框架:
import luigi
from mylib import ml_algorithm
from mytools import read_s3, write_hdfs, read_hdfs, write_db, new_files, mark_as_done
class Extract(luigi.Task):
date = luigi.DateParameter()
s3_path = luigi.Parameter()
filename = luigi.Parameter()
def requires(self):
pass
def output(self, filename):
luigi.hdfs.HdfsTarget(self.date.strftime('data/%Y_%m_%d' + self.filename)
def run(self):
data = read_s3(s3_path + '/' + file)
with self.output.open('w') as hdfs_file:
write_hdfs(hdfs_file, data)
class Transform(luigi.Task):
date = luigi.DateParameter()
s3_path = luigi.Parameter()
filename = luigi.Parameter()
def requires(self):
return Extract(self.date, self.s3_path, self.filename)
def output(self, filename):
luigi.hdfs.HdfsTarget(self.date.strftime('data/results/%Y_%m_%d_' + filename)
def run(self):
with self.input().open('r') as inputfile:
data = read_hdfs(inputfile)
result = ml_algorithm(data)
with self.output().open('w') as outputfile:
write_hdfs(outputfile, result)
mark_as_done(filename)
class Load(luigi.Task):
date = luigi.DateParameter()
s3_path = luigi.Parameter()
def requires(self):
return [Transform(self.date, self.s3_path, filename) for filename in new_files(self.s3_path)]
def output(self):
# Fake DB target, just for illustrative purpose
luigi.hdfs.DBTarget('...')
def run(self):
for input in self.input():
with input.open('r') as inputfile:
result = read_hdfs(inputfile)
# again, just for didatic purposes
db = self.output().connection
write_db(db, result)
然后我会将其添加到 crontab 中并简单地包装到 Docker 容器中。
问题:
- 这是人们用来执行此操作的正确模式吗?有更好的方法吗?
- 如果我有
Transform1
(取决于输入数据)和 Transform2
(取决于 Transform1
结果)并且想将这两个结果保存到不同的数据库中,怎么可能一个人使用 Luigi 管道实现这个(也是在观看文件的上下文中)?
- 人们为此使用了与 cron 不同的东西吗?
- 如何正确地将其容器化?
您的模式看起来基本正确。我将从使用 cron 作业调用触发 Load
任务管道的脚本开始。看起来这个 Load
任务已经验证了 S3 存储桶中是否存在新文件,但是您必须将输出更改为也是有条件的,如果无事可做,则可能是状态文件或其他内容.您也可以在更高级别 WrapperTask
(无输出)中执行此操作,仅当有新文件时才需要 Load
任务。然后你可以使用这个 WrapperTask
来要求两个不同的加载任务,这将分别需要你的 Transform1
和 Transform2
。
添加容器...我的 cron 真正调用的是一个脚本,它从 git 中提取我的最新代码,必要时构建一个新容器,然后调用 docker 运行.我有另一个容器,它总是 运行ning luigid
。每天docker运行在容器中执行一个shell脚本使用CMD
调用luigi任务,当天需要的参数。
今天,运行 我编写的机器学习作业是手工完成的。我下载所需的输入文件,学习和预测事物,输出一个 .csv 文件,然后将其复制到数据库中。
但是,由于要投入生产,我需要将所有这些过程自动化。所需的输入文件将每月(最终更频繁地)从提供程序到达 S3 存储桶。
现在我打算用 Luigi 来解决这个问题。这是理想的过程:
- 我需要我的程序每周(或每天,或每小时,我觉得更好)查看 S3 存储桶中的新文件
- 当文件到达时,我的机器学习管道被触发,并吐出一些 pandas 数据帧。
- 之后,我需要我的程序将这些结果写入不同的数据库
问题是,我不知道如何使用 Luigi 来实现自动化:
- 正在看文件
- 安排任务(例如每个月)
- 部署它(以可重现的方式)
今天,这是我想到的管道框架:
import luigi
from mylib import ml_algorithm
from mytools import read_s3, write_hdfs, read_hdfs, write_db, new_files, mark_as_done
class Extract(luigi.Task):
date = luigi.DateParameter()
s3_path = luigi.Parameter()
filename = luigi.Parameter()
def requires(self):
pass
def output(self, filename):
luigi.hdfs.HdfsTarget(self.date.strftime('data/%Y_%m_%d' + self.filename)
def run(self):
data = read_s3(s3_path + '/' + file)
with self.output.open('w') as hdfs_file:
write_hdfs(hdfs_file, data)
class Transform(luigi.Task):
date = luigi.DateParameter()
s3_path = luigi.Parameter()
filename = luigi.Parameter()
def requires(self):
return Extract(self.date, self.s3_path, self.filename)
def output(self, filename):
luigi.hdfs.HdfsTarget(self.date.strftime('data/results/%Y_%m_%d_' + filename)
def run(self):
with self.input().open('r') as inputfile:
data = read_hdfs(inputfile)
result = ml_algorithm(data)
with self.output().open('w') as outputfile:
write_hdfs(outputfile, result)
mark_as_done(filename)
class Load(luigi.Task):
date = luigi.DateParameter()
s3_path = luigi.Parameter()
def requires(self):
return [Transform(self.date, self.s3_path, filename) for filename in new_files(self.s3_path)]
def output(self):
# Fake DB target, just for illustrative purpose
luigi.hdfs.DBTarget('...')
def run(self):
for input in self.input():
with input.open('r') as inputfile:
result = read_hdfs(inputfile)
# again, just for didatic purposes
db = self.output().connection
write_db(db, result)
然后我会将其添加到 crontab 中并简单地包装到 Docker 容器中。
问题:
- 这是人们用来执行此操作的正确模式吗?有更好的方法吗?
- 如果我有
Transform1
(取决于输入数据)和Transform2
(取决于Transform1
结果)并且想将这两个结果保存到不同的数据库中,怎么可能一个人使用 Luigi 管道实现这个(也是在观看文件的上下文中)? - 人们为此使用了与 cron 不同的东西吗?
- 如何正确地将其容器化?
您的模式看起来基本正确。我将从使用 cron 作业调用触发 Load
任务管道的脚本开始。看起来这个 Load
任务已经验证了 S3 存储桶中是否存在新文件,但是您必须将输出更改为也是有条件的,如果无事可做,则可能是状态文件或其他内容.您也可以在更高级别 WrapperTask
(无输出)中执行此操作,仅当有新文件时才需要 Load
任务。然后你可以使用这个 WrapperTask
来要求两个不同的加载任务,这将分别需要你的 Transform1
和 Transform2
。
添加容器...我的 cron 真正调用的是一个脚本,它从 git 中提取我的最新代码,必要时构建一个新容器,然后调用 docker 运行.我有另一个容器,它总是 运行ning luigid
。每天docker运行在容器中执行一个shell脚本使用CMD
调用luigi任务,当天需要的参数。