python luigi localTarget pickle

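I am running Windows 7, Python 2.7 via Anaconda 4.3.17, Luigi 2.4.0, Pandas 0.18, and sklearn 0.18. Below, I am trying to make a luigi.LocalTarget output a pickle that stores several different objects (written by firstJob), and then read from that pickle in a dependent job (secondJob). firstJob completes successfully if I run the following from the command line: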

"python -m luigi --module luigiPickle firstJob --date 2017-06-07 --local-scheduler"

However, if I try running secondJob, i.e.

"python -m luigi --module luigiPickle secondJob --date 2017-06-07 --local-scheduler"

I get

Traceback (most recent call last):
  File "C:\Anaconda2\lib\site-packages\luigi-2.4.0-py2.7.egg\luigi\worker.py", line 191, in run
    new_deps = self._run_get_new_deps()
  File "C:\Anaconda2\lib\site-packages\luigi-2.4.0-py2.7.egg\luigi\worker.py", line 129, in _run_get_new_deps
    task_gen = self.task.run()
  File "luigiPickle.py", line 41, in run
    ret2 = pickle.load(inFile)
  File "C:\Anaconda2\lib\pickle.py", line 1384, in load
    return Unpickler(file).load()
  File "C:\Anaconda2\lib\pickle.py", line 864, in load
    dispatch[key](self)
  File "C:\Anaconda2\lib\pickle.py", line 1096, in load_global
    klass = self.find_class(module, name)
  File "C:\Anaconda2\lib\pickle.py", line 1130, in find_class
    __import__(module)
ImportError: No module named frame

It seems that luigi cannot read the pickle because the pandas.DataFrame() objects are not recognized (perhaps a scoping issue?).

import luigi
import pandas as pd
import pickle
from sklearn.linear_model import LinearRegression

class firstJob(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return None

    def output(self):
        return luigi.LocalTarget('%s_first.pickle' % self.date)

    def run(self):
        ret = {}
        ret['a'] = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
        ret['b'] = pd.DataFrame({'a': [3, 4], 'd': [0, 0]})
        ret['c'] = LinearRegression()
        outFile = self.output().open('wb')
        pickle.dump(ret, outFile, protocol=pickle.HIGHEST_PROTOCOL)
        outFile.close()

class secondJob(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return firstJob(self.date)

    def output(self):
        return luigi.LocalTarget('%s_second.pickle' % self.date)

    def run(self):
        inFile = self.input().open('rb')
        ret2 = pickle.load(inFile)
        inFile.close()

if __name__ == '__main__':
    luigi.run()

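Luigi's open() does not honor the b flag for binary mode: it strips it from the mode string (I don't know why). It is best to use the standard open() with the target's path attribute instead: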

open(self.input().path, 'rb') and open(self.output().path, 'wb').
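
As a minimal sketch of that suggestion, the two helpers below pickle to and from a plain path using the built-in open() in binary mode. The names dump_pickle and load_pickle, and the temporary file used for the round trip, are hypothetical and only serve the illustration; in a Luigi task the path would come from self.output().path or self.input().path.

```python
import os
import pickle
import tempfile


def dump_pickle(path, obj):
    """Pickle obj to path, opening the file in binary mode."""
    with open(path, 'wb') as out_file:
        pickle.dump(obj, out_file, protocol=pickle.HIGHEST_PROTOCOL)


def load_pickle(path):
    """Load a pickled object from path, opening the file in binary mode."""
    with open(path, 'rb') as in_file:
        return pickle.load(in_file)


# Round trip through a temporary file that stands in for a LocalTarget's path.
tmp_dir = tempfile.mkdtemp()
demo_path = os.path.join(tmp_dir, 'demo_first.pickle')
payload = {'a': [1, 2], 'b': [3, 4]}

dump_pickle(demo_path, payload)
restored = load_pickle(demo_path)
```

Inside the tasks from the question this would amount to calling dump_pickle(self.output().path, ret) in firstJob.run() and load_pickle(self.input().path) in secondJob.run(). One trade-off to be aware of: writing straight to the path bypasses the temporary-file-then-rename step that LocalTarget.open('w') performs, so a task that crashes mid-write may leave a partial file behind. Luigi also provides luigi.format.Nop, which can be passed as format= to LocalTarget to request untranslated binary I/O; whether it behaves as expected on the version in the question is an assumption worth checking.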

d6tflow solves this; see its example for pickling an sklearn model. You also don't need to write all the boilerplate code.

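import datetime

import d6tflow
import luigi

class firstJob(d6tflow.tasks.TaskPickle):
    date = luigi.DateParameter()

    def run(self):
        ret = {}  # build the objects you want to store here
        self.save(ret)  # d6tflow pickles ret to the task's output

class secondJob(d6tflow.tasks.TaskPickle):
    date = luigi.DateParameter()

    def requires(self):
        return firstJob(self.date)

    def run(self):
        ret = self.input().load()  # unpickles firstJob's output
        # use ret

# Pass task instances (with their parameters), not the bare class.
d6tflow.run([secondJob(date=datetime.date(2017, 6, 7))])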