Luigi - 在 运行 时间未完成 %s

Luigi - Unfulfilled %s at run time

我正在尝试以一种非常简单的方式学习 luigi 的工作原理。作为一个新手,我想到了这个代码

import luigi

class class1(luigi.Task):

  def requires(self):
     return class2()

  def output(self):
    return luigi.LocalTarget('class1.txt')

 def run(self):
    print 'IN class A'


class class2(luigi.Task): 

  def requires(self):
     return []

  def output(self):
     return luigi.LocalTarget('class2.txt')


if __name__ == '__main__':
  luigi.run()

运行 这在命令提示符中给出错误提示

raise RuntimeError('Unfulfilled %s at run time: %s' % (deps, ',', '.join(missing)))      

即:

RuntimeError: Unfulfilled dependency at run time: class2__99914b932b  
   

发生这种情况是因为您为 class2 定义了输出但从未创建它。

让我们分解一下...

当运行宁

python file.py class2 --local-scheduler

luigi 会问:

  • class2 的输出已经在磁盘上了吗?没有
  • 检查 class2 的依赖项:NONE
  • 执行run方法(默认为空方法pass
  • 运行 方法没有 return 错误,因此作业成功完成。

但是,当运行宁

python file.py class1 --local-scheduler

路易吉将:

  • class1 的输出已经在磁盘上了吗?没有
  • 检查任务依赖性:是:class2
  • 暂停检查 class2 的状态
    • 磁盘上是class2的输出吗?没有
    • 运行 class2 -> 运行ning -> 没有错误
    • 磁盘上是class2的输出吗?否 -> 引发错误

luigi 永远不会 运行 一个任务,除非它以前的所有依赖项都得到满足。 (即它们的输出在文件系统上)

我也是luigi的初学者。感谢您指出此类错误。

接下来,我设法解决了之前的答案添加到 class2

def run(self):
     _out = self.output().open('w')
     _out.write(u"Hello World!\n")
     _out.close()
     print('in class B')

出现此错误是因为如果您获得永远不会创建的输出。 前任。如果输出文件夹按时间戳创建。 时间戳每秒都在变化,因此它永远不会相同。 所以可能会出现错误。

我也有同样的错误,但我还是没有找到

class data_ingestion(luigi.Task):

def run(self):
    data = pd.read_csv('F:\Mega\MEGAsync\VS Code\winequality-red.csv', sep=';')
    data.to_csv(self.output().path, index=False)

def output(self):
    return luigi.LocalTarget('WineQuality.csv')

class data_prep(luigi.Task):

def requires(self):
    return data_ingestion()

def output(self):
    return [luigi.LocalTarget('Train.csv'), luigi.LocalTarget('Val.csv')]

def run(self):
    data = pd.read_csv('WineQuality.csv') # Lendo de um csv
    logger.info('\n Leitura rápida nos dados')
    data.head()

    column_target = 'quality' # Variável que se deseja prever
    columns_features = data.drop([column_target], axis=1)

    logger.info(f'=== Variável a ser predita: {column_target}')
    logger.info(f'=== Características disponíveis: {columns_features}')

    logger.info('Divisão do dataset em TREINO e TESTE (Validação)')
    data_train, data_val = train_test_split(data, test_size=0.2, stratify=data[column_target], random_state=1)
    
    logger.info(f"Salvando Train File")
    data_train.to_csv(self.output()[0].path, index=False)

    logger.info(f"Salvando Val File")
    data_val.to_csv(self.output()[1].path, index=False)
    

class 培训(luigi.Task):

def requires(self):
    return data_prep()

def output(self):
    return luigi.LocalTarget('joblibe_file') 

def run(self):

    data_train = pd.read_csv(self.input()[0].path)

    column_target = 'quality' # Variável que se deseja prever
    data_features = data_train.drop([column_target], axis=1)
    columns_features = data_features.columns.to_list()

    X_train = data_train[columns_features].values
    Y_train = data_train[column_target].values

    model = DecisionTreeRegressor() # Não implementei nenhum parâmetro pois preciso estudar certinho isso
    model.fit(X_train, Y_train)

    # Salvando o arquivo em um diretório de trabalho
    joblib_file = "joblib_model.pkl"
    joblib.dump(model, joblib_file)

    

class 验证(luigi.Task):

def requires(self):
    return training()

def output(self):
    return luigi.LocalTarget('Metrics.csv')

def run(self):
    data_val = pd.read_csv(self.input()[1].path)
    
    column_target = 'quality' # Variável que se deseja prever
    data_features = data_val.drop([column_target], axis=1)
    columns_features = data_features.columns.to_list()

    X_val = data_val[columns_features].values
    Y_val = data_val[column_target].values
    
    # Importando o modelo salvo no treinamento
    joblib_model = joblib.load(self.input()[0].path) 

    y_val_predict = joblib_model.predict(X_val)
    score = joblib_model.score(X_val, Y_val)

    logger.info('=== Variáveis Preditas')
    logger.info(y_val_predict)
    logger.info('=== Acurácia')
    logger.info('{:.2f} %'.format(score))


    dict = {'Predições': [y_val_predict],
          'score': [score]} 

    df = pd.DataFrame(dict)

    logger.info(f"Salvando Em arquivo CSV para TESTE")
    df.to_csv(self.output()[0].path, index=False)
   
    # salvar várias métricas em um df e exportar

if name == 'main': luigi.run()