将 url 读入 luigi 参数

Reading url into luigi parameter

我正在尝试使用 Luigi 包读取本地驱动器中的 csv 文件,特别是 luigi.Parameter() 作为文件名,然后使用 pd.read_csv 将其读入 pandas 数据帧并进行一些数据整理。

这是我为此任务编写的代码:

import luigi
import pandas as pd
class read_blog(luigi.Task):
        fileName = luigi.Parameter()
        def run(self):
                full_file = pd.read_csv(fileName)
                read_blog = full_file[full_file['properties__url'].string.contain$
                        regex=False)]
                blog_readers = read_blog[['anonymous_id','channel',
                        'context__campaign__content','context__campaign__medium',
                        'context__campaign__name','context__campaign_source',
                        'context__campaign__term','timestamp','user_id',
                        'context__page__url','properties__url',
                        'properties__search','context__page__title',
                        'properties__path','context__user_agent',
                        'properties__referrer','rank']]
                blog_readers.to_csv('blog_readers.csv')
if __name__ == '__main__':
        luigi.run()

然后 运行 在终端上使用这个:

python cleanup.py read_blog --local-scheduler --fileName '/Users/emmanuels/Desktop/attribute.csv'

这应该根据我的理解 运行 read_blog class in cleanup.py 并给 fileName 变量一个参数,它是我的 link csv 文件。

然后我的代码应该将 csv 读取为 pandas 数据帧,但是这并没有发生,这是我收到的完整错误消息:

===== Luigi Execution Summary =====

/Users/emmanuels/anaconda3/lib/python3.7/site-packages/luigi/configuration.py:54:UserWarning: LUIGI_CONFIG_PATH points to a file which does not exist. Invalid file: /Users/emmanuels/luigi_tutorial/luigi/luigi.conf
  warnings.warn("LUIGI_CONFIG_PATH points to a file which does not exist. Invalidfile: {path}".format(path=config_file))
DEBUG: Checking if read_blog(fileName=/Users/emmanuels/Desktop/attributiondata.csv) is complete
/Users/emmanuels/anaconda3/lib/python3.7/site-packages/luigi/worker.py:328: UserWarning: Task read_blog(fileName=/Users/emmanuels/Desktop/attributiondata.csv) without outputs has no custom complete() method
  is_complete = task.complete()
INFO: Informed scheduler that task   read_blog__Users_emmanuels_23aa7e1a57   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 94938] Worker Worker(salt=156803262, workers=1, host=Emmanuels-MacBook-Pro.local, username=emmanuels, pid=94938) running   read_blog(fileName=/Users/emmanuels/Desktop/attributiondata.csv)
ERROR: [pid 94938] Worker Worker(salt=156803262, workers=1, host=Emmanuels-MacBook-Pro.local, username=emmanuels, pid=94938) failed    read_blog(fileName=/Users/emmanuels/Desktop/attributiondata.csv)
Traceback (most recent call last):
  File "/Users/emmanuels/anaconda3/lib/python3.7/site-packages/luigi/worker.py", line 191, in run
    new_deps = self._run_get_new_deps()
  File "/Users/emmanuels/anaconda3/lib/python3.7/site-packages/luigi/worker.py", line 129, in _run_get_new_deps
    task_gen = self.task.run()
  File "cleanup.py", line 8, in run
    full_file = pd.read_csv(fileName)
NameError: name 'fileName' is not defined
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   read_blog__Users_emmanuels_23aa7e1a57   has status   FAILED
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 1 pending tasks possibly being run by other workers
DEBUG: There are 1 pending tasks unique to this worker
DEBUG: There are 1 pending tasks last scheduled by this worker
INFO: Worker Worker(salt=156803262, workers=1, host=Emmanuels-MacBook-Pro.local, username=emmanuels, pid=94938) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 failed:
    - 1 read_blog(fileName=/Users/emmanuels/Desktop/attributiondata.csv)

This progress looks :( because there were failed tasks

fileName 是 class read_blog 的属性,因此通过 self!

访问 fileName

full_file = pd.read_csv(self.fileName)