Luigi tasks and wrapper failing

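I asked a similar question before, but decided to break my pipeline into fewer steps to get a better understanding of where I am going wrong and to make debugging as simple as possible.

In my first class, I take a large CSV and split it into multiple CSVs based on the user's current state. I then created another task that checks whether a given user moved from one state to the next, returning 1 if they did and 0 if they did not.

Then I have a wrapper class that should dynamically assign parameter values to the previous class. However, my pipeline does not seem to run, and I am not sure what I am doing wrong.

Here is what I have: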

separate_csv.py:

import luigi
import pandas as pd
class data_filter(luigi.Task):
    file = luigi.Parameter()
    def run(self):
        for current in actions:
            file_pd = pd.read_csv(self.file)
            actions = file_pd.state.unique()
            filter_file = file_pd.loc[file_pd.state.str.contains(current,na=False)]
            filter_file.to_csv('/Users/emm/Documents/AttributionData/Data/'+str(current)+'.csv')
    def requires(self):
        return []
    def output(self):
        return luigi.LocalTarget('/Users/emm/Documents/AttributionData/Data/complete.csv')

state_to_state_transitions.py:

import luigi
import pandas as pd
import separate_csv as sep
class state_to_state(luigi.Task):
    first_file = luigi.Parameter()
    second_file = luigi.Parameter()
    def run(self):
        #iterate through states and find probability of anonymous id existing in next state
        path = '/Users/emm/Documents/AttributionData/Data/Probabilities/'
        first = pd.read_csv(path+self.first_file)
        second = pd.read_csv(path+self.second_file)
        first['probability'] = first.anonymous_id.isin(second.anonymous_id).astype(int)
        #save anonymous id along with probability (1,0) of whether or not it exists in the next state
        with self.output().open('w') as out_csv:
            out_csv.write(first[['anonymous_id','probability']].to_csv('/Users/emm/Documents/AttributionData/Data/Probabilities/'+str(self.first_file[:-4]+'to'+self.second_file)))
    def requires(self):
        files_two = [sep.data_filter(file='/Users/emm/Desktop/Attribution/finalcleanattributiondata.csv')]
        return files_two
    def output(self):
        return luigi.LocalTarget('/Users/emm/Documents/AttributionData/Data/Probabilities/'+str(self.first_file[:-4]+'to'+self.second_file))

wrapper.py:

import state_to_state_transitions2 as sst
import pandas as pd
import luigi
class wrapper(luigi.WrapperTask):
    def requires(self):
        files = ['Session.csv', 'lead.csv', 'opportunity.csv', 'complete.csv']
        task_list = []
        for i in range(1, len(files)):
            task_list.append(sst.state_to_state(first_file=files[i-1],second_file=files[i]))
        return task_list
    def run(self):
        print('Wrapper ran')
        pd.DataFrame().to_csv('/Users/emmanuels/Documents/AttributionData/Data/wrangler1.csv')
    def output(self):
        return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/wrangler1.csv')
if __name__ == '__main__':
    luigi.build([wrapper()],workers=8,local_scheduler=True)

Here is part of my error message:

      File "/Users/emm/Documents/GitHub/AttributionModel/Capstone/state_to_state_transitions2.py", line 15, in run
    out_csv.write(first[['anonymous_id','probability']].to_csv('/Users/emm/Documents/AttributionData/Data/Probabilities/'+str(self.first_file[:-4]+'to'+self.second_file)))
TypeError: write() argument must be str, not None
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   state_to_state_lead_csv_opportunity_csv_b31ac9d110   has status   FAILED
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 4 pending tasks possibly being run by other workers
DEBUG: There are 4 pending tasks unique to this worker
DEBUG: There are 4 pending tasks last scheduled by this worker
INFO: Worker Worker(salt=152474850, workers=1, host=Emms-MacBook-Pro.local, username=***, pid=***) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 5 tasks of which:
* 1 present dependencies were encountered:
    - 1 data_filter(file=/Users/emm/Desktop/Attribution/finalcleanattributiondata.csv)
* 3 failed:
    - 3 state_to_state(first_file=Session.csv, second_file=lead.csv) ...
* 1 were left pending, among these:
    * 1 had failed dependencies:
        - 1 wrapper()

This progress looks :( because there were failed tasks

Your problem is here:

with self.output().open('w') as out_csv:
    out_csv.write(first[['anonymous_id','probability']].to_csv('/Users/emm/Documents/AttributionData/Data/Probabilities/'+str(self.first_file[:-4]+'to'+self.second_file)))

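The call first[['anonymous_id','probability']].to_csv(file_name) writes the data to the file file_name and returns None, so out_csv.write() receives None and raises the TypeError shown in your traceback. What you need instead is first[['anonymous_id','probability']].to_csv() with no path argument, which returns a string containing the CSV data.

Overall, you should have: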

with self.output().open('w') as out_csv:
    out_csv.write(first[['anonymous_id','probability']].to_csv())
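To see the difference between the two call forms outside of Luigi, here is a minimal sketch. The sample rows are made up for illustration, and io.StringIO is only an assumed stand-in for the file handle that self.output().open('w') would give you; neither comes from the original pipeline.

```python
import io

import pandas as pd

# Hypothetical sample data; only the column names mirror the question.
first = pd.DataFrame(
    {"anonymous_id": ["a", "b", "c"], "probability": [1, 0, 1]}
)
subset = first[["anonymous_id", "probability"]]

# Given a path or buffer, to_csv writes to it and returns None.
# Passing that None to write() is what triggers the TypeError.
buffer = io.StringIO()
result_with_target = subset.to_csv(buffer)

# Given no target, to_csv returns the CSV text as a str.
csv_text = subset.to_csv()

# Stand-in for: with self.output().open('w') as out_csv:
out_csv = io.StringIO()
out_csv.write(csv_text)
```

Here result_with_target is None while csv_text is a str, which is why only the second form can be passed to out_csv.write(). Note that the default to_csv() also writes the DataFrame index as an unnamed first column; passing index=False would drop it if you do not want it in the output.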