Reading url into luigi parameter
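I am trying to use the Luigi package to read a csv file from my local drive, using luigi.Parameter() for the file name, and then read it into a pandas DataFrame with pd.read_csv and do some data wrangling.
This is the code I wrote for this task: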
    import luigi
    import pandas as pd

    class read_blog(luigi.Task):
        fileName = luigi.Parameter()

        def run(self):
            full_file = pd.read_csv(fileName)
            read_blog = full_file[full_file['properties__url'].string.contain$
            regex=False)]
            blog_readers = read_blog[['anonymous_id','channel',
                'context__campaign__content','context__campaign__medium',
                'context__campaign__name','context__campaign_source',
                'context__campaign__term','timestamp','user_id',
                'context__page__url','properties__url',
                'properties__search','context__page__title',
                'properties__path','context__user_agent',
                'properties__referrer','rank']]
            blog_readers.to_csv('blog_readers.csv')

    if __name__ == '__main__':
        luigi.run()
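I then run it from the terminal with:
python cleanup.py read_blog --local-scheduler --fileName '/Users/emmanuels/Desktop/attribute.csv'
As I understand it, this should run the read_blog class in cleanup.py and give the fileName parameter an argument, which is the path to my csv file.
My code should then read the csv into a pandas DataFrame, but that does not happen. This is the full error message I receive: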
===== Luigi Execution Summary =====
/Users/emmanuels/anaconda3/lib/python3.7/site-packages/luigi/configuration.py:54:UserWarning: LUIGI_CONFIG_PATH points to a file which does not exist. Invalid file: /Users/emmanuels/luigi_tutorial/luigi/luigi.conf
warnings.warn("LUIGI_CONFIG_PATH points to a file which does not exist. Invalidfile: {path}".format(path=config_file))
DEBUG: Checking if read_blog(fileName=/Users/emmanuels/Desktop/attributiondata.csv) is complete
/Users/emmanuels/anaconda3/lib/python3.7/site-packages/luigi/worker.py:328: UserWarning: Task read_blog(fileName=/Users/emmanuels/Desktop/attributiondata.csv) without outputs has no custom complete() method
is_complete = task.complete()
INFO: Informed scheduler that task read_blog__Users_emmanuels_23aa7e1a57 has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 94938] Worker Worker(salt=156803262, workers=1, host=Emmanuels-MacBook-Pro.local, username=emmanuels, pid=94938) running read_blog(fileName=/Users/emmanuels/Desktop/attributiondata.csv)
ERROR: [pid 94938] Worker Worker(salt=156803262, workers=1, host=Emmanuels-MacBook-Pro.local, username=emmanuels, pid=94938) failed read_blog(fileName=/Users/emmanuels/Desktop/attributiondata.csv)
Traceback (most recent call last):
File "/Users/emmanuels/anaconda3/lib/python3.7/site-packages/luigi/worker.py", line 191, in run
new_deps = self._run_get_new_deps()
File "/Users/emmanuels/anaconda3/lib/python3.7/site-packages/luigi/worker.py", line 129, in _run_get_new_deps
task_gen = self.task.run()
File "cleanup.py", line 8, in run
full_file = pd.read_csv(fileName)
NameError: name 'fileName' is not defined
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task read_blog__Users_emmanuels_23aa7e1a57 has status FAILED
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 1 pending tasks possibly being run by other workers
DEBUG: There are 1 pending tasks unique to this worker
DEBUG: There are 1 pending tasks last scheduled by this worker
INFO: Worker Worker(salt=156803262, workers=1, host=Emmanuels-MacBook-Pro.local, username=emmanuels, pid=94938) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 failed:
- 1 read_blog(fileName=/Users/emmanuels/Desktop/attributiondata.csv)
This progress looks :( because there were failed tasks
fileName is an attribute of the class read_blog, so access it through self:
    full_file = pd.read_csv(self.fileName)
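The NameError comes from Python's scoping rules rather than from Luigi itself: names bound in a class body are not visible as bare names inside that class's methods, so they have to be reached through self (or the class). Here is a minimal sketch of that rule in plain Python, without Luigi or pandas. The class names, the file_name attribute, and the "data.csv" value are all made up for illustration; in a real Luigi task, self.fileName would hold the value passed on the command line.

```python
class BareName:
    # Class attribute (hypothetical stand-in for luigi.Parameter()).
    file_name = "data.csv"

    def run(self):
        # The class body is not an enclosing scope for methods, so this
        # bare name is looked up as a local, then a global, then a builtin,
        # and none of those define it.
        return file_name


class ViaSelf:
    file_name = "data.csv"

    def run(self):
        # Attribute lookup on the instance falls back to the class.
        return self.file_name


try:
    BareName().run()
    bare_error = None
except NameError as exc:
    bare_error = str(exc)

via_self_result = ViaSelf().run()

print("bare name:", bare_error)
print("via self: ", via_self_result)
```

The first call fails the same way the task in the question does, while the second returns the attribute value.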