产生所需的 luigi 输出的问题

Problems producing desired luigi output

我正在尝试创建一个接收 3 个文件的管道,从每个文件中获取 n 行(由 obs_num 表示)将文件中的每个值与 0 和之间的随机浮点数进行比较1 如果它大于随机数,则 returns 为 obs_num,否则为 false。然后我将这些值附加到列表(列表 1)

我接着看下一个第二个文件,检查obs_num的位置,如果相同位置的num returned false 在上一个文件中return false,或者再次检查 num 是否大于随机浮点数,然后我对第三个文件做同样的事情。我还将这些值附加到列表(列表 2 和 3)

然后我将这 3 个列表转换为数据框,每个列表代表一列。

但是问题是,当我 运行 我的管道输出是一个包含一个空白列的文件,而不是一个 csv 行等同于 obs_num.

这是我用于包装器的代码:

import pandas as pd
import luigi
import state_to_state_machine as ssm
class wrapper(luigi.WrapperTask):
    def requires(self):
        file_tag = ['Sessiontolead','leadtoopportunity','opportunitytocomplete']
        size = 10
        for j in range(1,int(size)):
            return[ssm.state_machine(file_tag=i,size=size,obs_nums=j)for i in file_tag]
    def run(self):
        print('The wrapper is complete')
        pd.DataFrame().to_csv('/Users/emm/Documents/AttributionData/Data/datawranglerwrapper3.csv') #never returns anything
    def output(self):
        return luigi.LocalTarget('/Users/emm/Documents/AttributionData/Data/datawranglerwrapper3.csv')
if __name__ == '__main__':
    luigi.build([wrapper()],workers=8,local_scheduler=True)

状态机:

import pandas as pd
import get_samples as gs
import luigi
import random
class state_machine(luigi.Task):
    file_tag = luigi.Parameter()
    obs_nums = luigi.Parameter() #directly get element - don't write to file
    size = luigi.Parameter()
    def run(self):
        path = '/Users/emm/Documents/AttributionData/Data/Probabilities/'
        file = path+self.file_tag+'sampleprobs.csv'
        def generic_state_machine(tag,file=file,obs_nums=self.obs_nums):
            if file.split('/')[7][:4] == tag:
                state_machine = pd.read_csv(file)
                return state_machine.ix[:,1][obs_nums] if s.ix[:,1][obs_nums] > random.uniform(0,1) else False
        session_to_leads = []
        lead_to_opps = []
        opps_to_comp = []
        session_to_leads.append(generic_state_machine(tag='Sessiontoload+sampleprobabs',file=file,obs_nums=self.obs_nums))
        lead_to_opps.append(generic_state_machine(tag='leadtoopportunity+sampleprobabs',file=file,obs_nums=self.obs_nums)) if session_to_leads[self.obs_nums-1] != False else lead_to_opps.append(False)
        opps_to_comp.append(generic_state_machine(tag='opportunitytocomplete+sampleprobabs',file=file,obs_nums=self.obs_nums)) if lead_to_opps[self.obs_nums-1] != False else opps_to_comps.append(False)
        df = pd.DataFrame(zip(session_to_leads,lead_to_opps,opps_to_comp),columns=['session_to_leads','lead_to_opps','oops_to_comp'])
    with self.output().open('w') as out_csv:
        out_csv.write(df.to_csv('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+str(self.size)+'statemachine.csv'))
def output(self):
    return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+str(self.size)+'statemachine.csv')

我问过这个问题的类似版本,但每次都改变了,我已经设法解决了大部分最初的问题 - 所以这不是对以前问题的重复

所以根据我的理解,在这种情况下,这应该产生 3 个状态机文件,每个状态机文件有 10 行,用于每次观察和进行的比较。

这三个文件实际上是具有 2 列的文件,第一列是索引,第二列是 0 到 1 之间的概率

我不确定这是我的代码逻辑问题还是我使用 Luigi 的方式问题

检查你的格式。在您的状态机文件中,由于某种原因,您的 with 语句处于 class 级别,而 output 方法处于命名空间级别。