MySQL Luigi 工作流程中的目标
MySQL Targets in Luigi workflow
我的 TaskB 需要 TaskA,完成后 TaskA 写入 MySQL table,然后 TaskB 将此输出作为其输入 table。
我似乎不知道如何在 Luigi 中执行此操作。有人可以给我举个例子或给我一个简单的例子吗?
luigi 中现有的 MySqlTarget 使用单独的标记 table 来指示任务何时完成。这是我会采取的粗略方法......但是你的问题非常抽象,所以在现实中它可能会更复杂。
import luigi
from datetime import datetime
from luigi.contrib.mysqldb import MySqlTarget
class TaskA(luigi.Task):
rundate = luigi.DateParameter(default=datetime.now().date())
target_table = "table_to_update"
host = "localhost:3306"
db = "db_to_use"
user = "user_to_use"
pw = "pw_to_use"
def get_target(self):
return MySqlTarget(host=self.host, database=self.db, user=self.user, password=self.pw, table=self.target_table,
update_id=str(self.rundate))
def requires(self):
return []
def output(self):
return self.get_target()
def run(self):
#update table
self.get_target().touch()
class TaskB(luigi.Task):
def requires(self):
return [TaskA()]
def run(self):
# reading from target_table
我的 TaskB 需要 TaskA,完成后 TaskA 写入 MySQL table,然后 TaskB 将此输出作为其输入 table。
我似乎不知道如何在 Luigi 中执行此操作。有人可以给我举个例子或给我一个简单的例子吗?
luigi 中现有的 MySqlTarget 使用单独的标记 table 来指示任务何时完成。这是我会采取的粗略方法......但是你的问题非常抽象,所以在现实中它可能会更复杂。
import luigi
from datetime import datetime
from luigi.contrib.mysqldb import MySqlTarget
class TaskA(luigi.Task):
rundate = luigi.DateParameter(default=datetime.now().date())
target_table = "table_to_update"
host = "localhost:3306"
db = "db_to_use"
user = "user_to_use"
pw = "pw_to_use"
def get_target(self):
return MySqlTarget(host=self.host, database=self.db, user=self.user, password=self.pw, table=self.target_table,
update_id=str(self.rundate))
def requires(self):
return []
def output(self):
return self.get_target()
def run(self):
#update table
self.get_target().touch()
class TaskB(luigi.Task):
def requires(self):
return [TaskA()]
def run(self):
# reading from target_table