Luigi - 运行时出现 "Unfulfilled %s at run time" 错误
Luigi - Unfulfilled %s at run time
我正在尝试以一种非常简单的方式学习 luigi 的工作原理。作为一个新手,我写了下面这段代码:
import luigi
class class1(luigi.Task):
    """Toy task that requires class2 and declares class1.txt as its output."""

    def requires(self):
        """Return the single upstream task, class2."""
        return class2()

    def output(self):
        """Declare class1.txt as this task's output target."""
        return luigi.LocalTarget('class1.txt')

    def run(self):
        """Print a marker line; nothing is written to the declared output."""
        # Call form of print: valid on both Python 2 and Python 3.
        print('IN class A')
class class2(luigi.Task):
    """Toy task with no dependencies that declares class2.txt as its output.

    NOTE: this class defines no run() method, so nothing here ever writes
    class2.txt.
    """

    def requires(self):
        """Return an empty list: this task has no upstream dependencies."""
        return []

    def output(self):
        """Declare class2.txt as this task's output target."""
        return luigi.LocalTarget('class2.txt')
if __name__ == '__main__':
    # Start luigi only when the file is executed as a script.
    luigi.run()
在命令提示符中运行这段代码时,会给出如下错误提示:
raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
即:
RuntimeError: Unfulfilled dependency at run time: class2__99914b932b
发生这种情况是因为您为 class2
定义了输出但从未创建它。
让我们分解一下...
当运行
python file.py class2 --local-scheduler
luigi 会问:
class2
的输出已经在磁盘上了吗?没有
- 检查
class2
的依赖项:NONE
- 执行
run
方法(默认是只含 pass 的空方法)
- run 方法没有返回错误,因此作业成功完成。
但是,当运行
python file.py class1 --local-scheduler
luigi 会:
class1
的输出已经在磁盘上了吗?没有
- 检查任务依赖性:是:
class2
- 暂停检查 class2 的状态
- 磁盘上是
class2
的输出吗?没有
- 运行
class2
-> 运行中 -> 没有错误
- 磁盘上是
class2
的输出吗?否 -> 引发错误
luigi 永远不会 运行 一个任务,除非它以前的所有依赖项都得到满足。 (即它们的输出在文件系统上)
我也是luigi的初学者。感谢您指出此类错误。
随后,我按照前面的回答,在 class2 中添加了以下方法,解决了这个问题:
def run(self):
    """Write a greeting to this task's output target, then print a marker.

    The target returned by ``self.output()`` is opened for writing, the text
    ``Hello World!`` plus a newline is written, and the handle is closed.
    """
    # The context manager closes the handle even if the write raises.
    with self.output().open('w') as out:
        out.write(u"Hello World!\n")
    print('in class B')
出现此错误是因为您声明的输出永远不会被创建。
例如,输出文件夹按时间戳命名时:
时间戳每秒都在变化,因此路径永远不会相同,
所以可能会出现这个错误。
我也遇到了同样的错误,但我还是没有找到原因:
class data_ingestion(luigi.Task):
    """Copy the raw semicolon-separated wine-quality CSV into WineQuality.csv."""

    def run(self):
        """Read the source CSV and re-save it at this task's output path."""
        # Raw string: the backslashes in the Windows path must stay literal
        # instead of being parsed as (invalid) escape sequences.
        data = pd.read_csv(r'F:\Mega\MEGAsync\VS Code\winequality-red.csv', sep=';')
        data.to_csv(self.output().path, index=False)

    def output(self):
        """Declare WineQuality.csv as this task's output target."""
        return luigi.LocalTarget('WineQuality.csv')
class data_prep(luigi.Task):
    """Split the ingested dataset into stratified train and validation CSVs."""

    def requires(self):
        """Return the upstream ingestion task."""
        return data_ingestion()

    def output(self):
        """Declare the two outputs: index 0 is Train.csv, index 1 is Val.csv."""
        return [luigi.LocalTarget('Train.csv'), luigi.LocalTarget('Val.csv')]

    def run(self):
        """Read the ingested CSV, split it 80/20 and save both parts."""
        # Read from the upstream target instead of repeating its file name.
        data = pd.read_csv(self.input().path)
        logger.info('\n Leitura rápida nos dados')
        # Log the preview; a bare data.head() would discard its result.
        logger.info(data.head())
        column_target = 'quality'  # Variable to be predicted
        # Keep only the feature names, not the whole feature DataFrame.
        columns_features = data.drop([column_target], axis=1).columns.to_list()
        logger.info(f'=== Variável a ser predita: {column_target}')
        logger.info(f'=== Características disponíveis: {columns_features}')
        logger.info('Divisão do dataset em TREINO e TESTE (Validação)')
        data_train, data_val = train_test_split(
            data, test_size=0.2, stratify=data[column_target], random_state=1
        )
        train_target, val_target = self.output()
        logger.info("Salvando Train File")
        data_train.to_csv(train_target.path, index=False)
        logger.info("Salvando Val File")
        data_val.to_csv(val_target.path, index=False)
class training(luigi.Task):
    """Fit a DecisionTreeRegressor on the training split and save it with joblib.

    The class is named ``training`` because that is the name the downstream
    task calls in its ``requires``.
    """

    def requires(self):
        """Return the upstream task that produces the train/validation CSVs."""
        return data_prep()

    def output(self):
        """Declare the file that holds the serialized model."""
        return luigi.LocalTarget('joblibe_file')

    def run(self):
        """Train the model and write it to the declared output target."""
        # data_prep outputs [train, val]; index 0 is the training split.
        data_train = pd.read_csv(self.input()[0].path)
        column_target = 'quality'  # Variable to be predicted
        data_features = data_train.drop([column_target], axis=1)
        columns_features = data_features.columns.to_list()
        X_train = data_train[columns_features].values
        Y_train = data_train[column_target].values
        model = DecisionTreeRegressor()  # No hyper-parameters set yet; still to be studied
        model.fit(X_train, Y_train)
        # Dump to the path declared in output(); writing to any other file
        # name would leave the declared output missing after run() finishes.
        joblib.dump(model, self.output().path)
class 验证(luigi.Task):
    """Score the trained model on the validation split and write Metrics.csv."""

    def requires(self):
        """Return the model task and the data task, keyed by role.

        The validation split comes from data_prep, so it is declared here
        alongside the training task instead of being indexed out of the
        training task's single output.
        """
        return {'model': training(), 'data': data_prep()}

    def output(self):
        """Declare Metrics.csv as this task's output target."""
        return luigi.LocalTarget('Metrics.csv')

    def run(self):
        """Load the model, predict on the validation split and save the score."""
        inputs = self.input()
        # data_prep outputs [train, val]; index 1 is the validation split.
        data_val = pd.read_csv(inputs['data'][1].path)
        column_target = 'quality'  # Variable to be predicted
        data_features = data_val.drop([column_target], axis=1)
        columns_features = data_features.columns.to_list()
        X_val = data_val[columns_features].values
        Y_val = data_val[column_target].values
        # Load the model saved by the training task
        joblib_model = joblib.load(inputs['model'].path)
        y_val_predict = joblib_model.predict(X_val)
        score = joblib_model.score(X_val, Y_val)
        logger.info('=== Variáveis Preditas')
        logger.info(y_val_predict)
        logger.info('=== Acurácia')
        logger.info('{:.2f} %'.format(score))
        # Named `metrics` so the builtin `dict` is not shadowed.
        metrics = {'Predições': [y_val_predict],
                   'score': [score]}
        df = pd.DataFrame(metrics)
        logger.info("Salvando Em arquivo CSV para TESTE")
        # output() is a single target, so it is used directly, not indexed.
        df.to_csv(self.output().path, index=False)
        # TODO: save several metrics in a DataFrame and export them
if __name__ == '__main__':
    # Start luigi only when the file is executed as a script.
    luigi.run()
我正在尝试以一种非常简单的方式学习 luigi 的工作原理。作为一个新手,我写了下面这段代码:
import luigi
class class1(luigi.Task):
    """Toy task that requires class2 and declares class1.txt as its output."""

    def requires(self):
        """Return the single upstream task, class2."""
        return class2()

    def output(self):
        """Declare class1.txt as this task's output target."""
        return luigi.LocalTarget('class1.txt')

    def run(self):
        """Print a marker line; nothing is written to the declared output."""
        # Call form of print: valid on both Python 2 and Python 3.
        print('IN class A')
class class2(luigi.Task):
    """Toy task with no dependencies that declares class2.txt as its output.

    NOTE: this class defines no run() method, so nothing here ever writes
    class2.txt.
    """

    def requires(self):
        """Return an empty list: this task has no upstream dependencies."""
        return []

    def output(self):
        """Declare class2.txt as this task's output target."""
        return luigi.LocalTarget('class2.txt')
if __name__ == '__main__':
    # Start luigi only when the file is executed as a script.
    luigi.run()
在命令提示符中运行这段代码时,会给出如下错误提示:
raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ', '.join(missing)))
即:
RuntimeError: Unfulfilled dependency at run time: class2__99914b932b
发生这种情况是因为您为 class2
定义了输出但从未创建它。
让我们分解一下...
当运行
python file.py class2 --local-scheduler
luigi 会问:
class2
的输出已经在磁盘上了吗?没有- 检查
class2
的依赖项:NONE - 执行
run
方法(默认是只含 pass 的空方法)
- run 方法没有返回错误,因此作业成功完成。
但是,当运行
python file.py class1 --local-scheduler
luigi 会:
class1
的输出已经在磁盘上了吗?没有- 检查任务依赖性:是:
class2
- 暂停检查 class2 的状态
- 磁盘上是
class2
的输出吗?没有 - 运行
class2
-> 运行中 -> 没有错误 - 磁盘上是
class2
的输出吗?否 -> 引发错误
- 磁盘上是
luigi 永远不会 运行 一个任务,除非它以前的所有依赖项都得到满足。 (即它们的输出在文件系统上)
我也是luigi的初学者。感谢您指出此类错误。
随后,我按照前面的回答,在 class2 中添加了以下方法,解决了这个问题:
def run(self):
    """Write a greeting to this task's output target, then print a marker.

    The target returned by ``self.output()`` is opened for writing, the text
    ``Hello World!`` plus a newline is written, and the handle is closed.
    """
    # The context manager closes the handle even if the write raises.
    with self.output().open('w') as out:
        out.write(u"Hello World!\n")
    print('in class B')
出现此错误是因为您声明的输出永远不会被创建。 例如,输出文件夹按时间戳命名时: 时间戳每秒都在变化,因此路径永远不会相同, 所以可能会出现这个错误。
我也遇到了同样的错误,但我还是没有找到原因:
class data_ingestion(luigi.Task):
    """Copy the raw semicolon-separated wine-quality CSV into WineQuality.csv."""

    def run(self):
        """Read the source CSV and re-save it at this task's output path."""
        # Raw string: the backslashes in the Windows path must stay literal
        # instead of being parsed as (invalid) escape sequences.
        data = pd.read_csv(r'F:\Mega\MEGAsync\VS Code\winequality-red.csv', sep=';')
        data.to_csv(self.output().path, index=False)

    def output(self):
        """Declare WineQuality.csv as this task's output target."""
        return luigi.LocalTarget('WineQuality.csv')
class data_prep(luigi.Task):
    """Split the ingested dataset into stratified train and validation CSVs."""

    def requires(self):
        """Return the upstream ingestion task."""
        return data_ingestion()

    def output(self):
        """Declare the two outputs: index 0 is Train.csv, index 1 is Val.csv."""
        return [luigi.LocalTarget('Train.csv'), luigi.LocalTarget('Val.csv')]

    def run(self):
        """Read the ingested CSV, split it 80/20 and save both parts."""
        # Read from the upstream target instead of repeating its file name.
        data = pd.read_csv(self.input().path)
        logger.info('\n Leitura rápida nos dados')
        # Log the preview; a bare data.head() would discard its result.
        logger.info(data.head())
        column_target = 'quality'  # Variable to be predicted
        # Keep only the feature names, not the whole feature DataFrame.
        columns_features = data.drop([column_target], axis=1).columns.to_list()
        logger.info(f'=== Variável a ser predita: {column_target}')
        logger.info(f'=== Características disponíveis: {columns_features}')
        logger.info('Divisão do dataset em TREINO e TESTE (Validação)')
        data_train, data_val = train_test_split(
            data, test_size=0.2, stratify=data[column_target], random_state=1
        )
        train_target, val_target = self.output()
        logger.info("Salvando Train File")
        data_train.to_csv(train_target.path, index=False)
        logger.info("Salvando Val File")
        data_val.to_csv(val_target.path, index=False)
class training(luigi.Task):
    """Fit a DecisionTreeRegressor on the training split and save it with joblib.

    The class is named ``training`` because that is the name the downstream
    task calls in its ``requires``.
    """

    def requires(self):
        """Return the upstream task that produces the train/validation CSVs."""
        return data_prep()

    def output(self):
        """Declare the file that holds the serialized model."""
        return luigi.LocalTarget('joblibe_file')

    def run(self):
        """Train the model and write it to the declared output target."""
        # data_prep outputs [train, val]; index 0 is the training split.
        data_train = pd.read_csv(self.input()[0].path)
        column_target = 'quality'  # Variable to be predicted
        data_features = data_train.drop([column_target], axis=1)
        columns_features = data_features.columns.to_list()
        X_train = data_train[columns_features].values
        Y_train = data_train[column_target].values
        model = DecisionTreeRegressor()  # No hyper-parameters set yet; still to be studied
        model.fit(X_train, Y_train)
        # Dump to the path declared in output(); writing to any other file
        # name would leave the declared output missing after run() finishes.
        joblib.dump(model, self.output().path)
class 验证(luigi.Task):
    """Score the trained model on the validation split and write Metrics.csv."""

    def requires(self):
        """Return the model task and the data task, keyed by role.

        The validation split comes from data_prep, so it is declared here
        alongside the training task instead of being indexed out of the
        training task's single output.
        """
        return {'model': training(), 'data': data_prep()}

    def output(self):
        """Declare Metrics.csv as this task's output target."""
        return luigi.LocalTarget('Metrics.csv')

    def run(self):
        """Load the model, predict on the validation split and save the score."""
        inputs = self.input()
        # data_prep outputs [train, val]; index 1 is the validation split.
        data_val = pd.read_csv(inputs['data'][1].path)
        column_target = 'quality'  # Variable to be predicted
        data_features = data_val.drop([column_target], axis=1)
        columns_features = data_features.columns.to_list()
        X_val = data_val[columns_features].values
        Y_val = data_val[column_target].values
        # Load the model saved by the training task
        joblib_model = joblib.load(inputs['model'].path)
        y_val_predict = joblib_model.predict(X_val)
        score = joblib_model.score(X_val, Y_val)
        logger.info('=== Variáveis Preditas')
        logger.info(y_val_predict)
        logger.info('=== Acurácia')
        logger.info('{:.2f} %'.format(score))
        # Named `metrics` so the builtin `dict` is not shadowed.
        metrics = {'Predições': [y_val_predict],
                   'score': [score]}
        df = pd.DataFrame(metrics)
        logger.info("Salvando Em arquivo CSV para TESTE")
        # output() is a single target, so it is used directly, not indexed.
        df.to_csv(self.output().path, index=False)
        # TODO: save several metrics in a DataFrame and export them
if __name__ == '__main__':
    # Start luigi only when the file is executed as a script.
    luigi.run()