产生所需的 luigi 输出的问题
Problems producing desired luigi output
我正在尝试创建一个接收 3 个文件的管道,从每个文件中获取 n 行(由 obs_num 表示)将文件中的每个值与 0 和之间的随机浮点数进行比较1 如果它大于随机数,则 returns 为 obs_num,否则为 false。然后我将这些值附加到列表(列表 1)
我接着看下一个第二个文件,检查obs_num的位置,如果相同位置的num returned false 在上一个文件中return false,或者再次检查 num 是否大于随机浮点数,然后我对第三个文件做同样的事情。我还将这些值附加到列表(列表 2 和 3)
然后我将这 3 个列表转换为数据框,每个列表代表一列。
但是问题是,当我 运行 我的管道输出是一个包含一个空白列的文件,而不是一个 csv 行等同于 obs_num.
这是我用于包装器的代码:
import pandas as pd
import luigi
import state_to_state_machine as ssm
class wrapper(luigi.WrapperTask):
def requires(self):
file_tag = ['Sessiontolead','leadtoopportunity','opportunitytocomplete']
size = 10
for j in range(1,int(size)):
return[ssm.state_machine(file_tag=i,size=size,obs_nums=j)for i in file_tag]
def run(self):
print('The wrapper is complete')
pd.DataFrame().to_csv('/Users/emm/Documents/AttributionData/Data/datawranglerwrapper3.csv') #never returns anything
def output(self):
return luigi.LocalTarget('/Users/emm/Documents/AttributionData/Data/datawranglerwrapper3.csv')
if __name__ == '__main__':
luigi.build([wrapper()],workers=8,local_scheduler=True)
状态机:
import pandas as pd
import get_samples as gs
import luigi
import random
class state_machine(luigi.Task):
file_tag = luigi.Parameter()
obs_nums = luigi.Parameter() #directly get element - don't write to file
size = luigi.Parameter()
def run(self):
path = '/Users/emm/Documents/AttributionData/Data/Probabilities/'
file = path+self.file_tag+'sampleprobs.csv'
def generic_state_machine(tag,file=file,obs_nums=self.obs_nums):
if file.split('/')[7][:4] == tag:
state_machine = pd.read_csv(file)
return state_machine.ix[:,1][obs_nums] if s.ix[:,1][obs_nums] > random.uniform(0,1) else False
session_to_leads = []
lead_to_opps = []
opps_to_comp = []
session_to_leads.append(generic_state_machine(tag='Sessiontoload+sampleprobabs',file=file,obs_nums=self.obs_nums))
lead_to_opps.append(generic_state_machine(tag='leadtoopportunity+sampleprobabs',file=file,obs_nums=self.obs_nums)) if session_to_leads[self.obs_nums-1] != False else lead_to_opps.append(False)
opps_to_comp.append(generic_state_machine(tag='opportunitytocomplete+sampleprobabs',file=file,obs_nums=self.obs_nums)) if lead_to_opps[self.obs_nums-1] != False else opps_to_comps.append(False)
df = pd.DataFrame(zip(session_to_leads,lead_to_opps,opps_to_comp),columns=['session_to_leads','lead_to_opps','oops_to_comp'])
with self.output().open('w') as out_csv:
out_csv.write(df.to_csv('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+str(self.size)+'statemachine.csv'))
def output(self):
return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+str(self.size)+'statemachine.csv')
我问过这个问题的类似版本,但每次都改变了,我已经设法解决了大部分最初的问题 - 所以这不是对以前问题的重复
所以根据我的理解,在这种情况下,这应该产生 3 个状态机文件,每个状态机文件有 10 行,用于每次观察和进行的比较。
这三个文件实际上是具有 2 列的文件,第一列是索引,第二列是 0 到 1 之间的概率
我不确定这是我的代码逻辑问题还是我使用 Luigi 的方式问题
检查你的格式。在您的状态机文件中,由于某种原因,您的 with
语句处于 class 级别,而 output
方法处于命名空间级别。
我正在尝试创建一个接收 3 个文件的管道,从每个文件中获取 n 行(由 obs_num 表示)将文件中的每个值与 0 和之间的随机浮点数进行比较1 如果它大于随机数,则 returns 为 obs_num,否则为 false。然后我将这些值附加到列表(列表 1)
我接着看下一个第二个文件,检查obs_num的位置,如果相同位置的num returned false 在上一个文件中return false,或者再次检查 num 是否大于随机浮点数,然后我对第三个文件做同样的事情。我还将这些值附加到列表(列表 2 和 3)
然后我将这 3 个列表转换为数据框,每个列表代表一列。
但是问题是,当我 运行 我的管道输出是一个包含一个空白列的文件,而不是一个 csv 行等同于 obs_num.
这是我用于包装器的代码:
import pandas as pd
import luigi
import state_to_state_machine as ssm
class wrapper(luigi.WrapperTask):
def requires(self):
file_tag = ['Sessiontolead','leadtoopportunity','opportunitytocomplete']
size = 10
for j in range(1,int(size)):
return[ssm.state_machine(file_tag=i,size=size,obs_nums=j)for i in file_tag]
def run(self):
print('The wrapper is complete')
pd.DataFrame().to_csv('/Users/emm/Documents/AttributionData/Data/datawranglerwrapper3.csv') #never returns anything
def output(self):
return luigi.LocalTarget('/Users/emm/Documents/AttributionData/Data/datawranglerwrapper3.csv')
if __name__ == '__main__':
luigi.build([wrapper()],workers=8,local_scheduler=True)
状态机:
import pandas as pd
import get_samples as gs
import luigi
import random
class state_machine(luigi.Task):
file_tag = luigi.Parameter()
obs_nums = luigi.Parameter() #directly get element - don't write to file
size = luigi.Parameter()
def run(self):
path = '/Users/emm/Documents/AttributionData/Data/Probabilities/'
file = path+self.file_tag+'sampleprobs.csv'
def generic_state_machine(tag,file=file,obs_nums=self.obs_nums):
if file.split('/')[7][:4] == tag:
state_machine = pd.read_csv(file)
return state_machine.ix[:,1][obs_nums] if s.ix[:,1][obs_nums] > random.uniform(0,1) else False
session_to_leads = []
lead_to_opps = []
opps_to_comp = []
session_to_leads.append(generic_state_machine(tag='Sessiontoload+sampleprobabs',file=file,obs_nums=self.obs_nums))
lead_to_opps.append(generic_state_machine(tag='leadtoopportunity+sampleprobabs',file=file,obs_nums=self.obs_nums)) if session_to_leads[self.obs_nums-1] != False else lead_to_opps.append(False)
opps_to_comp.append(generic_state_machine(tag='opportunitytocomplete+sampleprobabs',file=file,obs_nums=self.obs_nums)) if lead_to_opps[self.obs_nums-1] != False else opps_to_comps.append(False)
df = pd.DataFrame(zip(session_to_leads,lead_to_opps,opps_to_comp),columns=['session_to_leads','lead_to_opps','oops_to_comp'])
with self.output().open('w') as out_csv:
out_csv.write(df.to_csv('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+str(self.size)+'statemachine.csv'))
def output(self):
return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+str(self.size)+'statemachine.csv')
我问过这个问题的类似版本,但每次都改变了,我已经设法解决了大部分最初的问题 - 所以这不是对以前问题的重复
所以根据我的理解,在这种情况下,这应该产生 3 个状态机文件,每个状态机文件有 10 行,用于每次观察和进行的比较。
这三个文件实际上是具有 2 列的文件,第一列是索引,第二列是 0 到 1 之间的概率
我不确定这是我的代码逻辑问题还是我使用 Luigi 的方式问题
检查你的格式。在您的状态机文件中,由于某种原因,您的 with
语句处于 class 级别,而 output
方法处于命名空间级别。