Schedule python script with crontab
I have a bash script that I'm trying to run with a cron job on my Ubuntu server. I want it to run every day at 8:00 UTC. The bash script activates a conda Python virtual environment and runs a Python script. The script is supposed to pull data and load it into a MySQL database. I also have logging throughout the Python script. Last night no new data showed up in the database and no new logs were created. Below I show what is in the crontab and what is in the stocks_etl.sh script. Does anyone see what the problem might be, and how to fix it?
sudo crontab -e
The crontab shows:
0 8 * * * /mnt/data/sda/user_storage/stocks_etl.sh
stocks_etl.sh
#!/bin/bash
source activate py36
python /mnt/data/sda/user_storage/stocks_etl.py
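Since the job produced no data and no logs at all, a useful first debugging step is to capture cron's own stdout/stderr. A minimal sketch of the crontab entry with output redirection (the log path here is illustrative, not part of the original setup):

0 8 * * * /mnt/data/sda/user_storage/stocks_etl.sh >> /tmp/stocks_etl_cron.log 2>&1

The >> ... 2>&1 appends both stdout and stderr to the file, so an error that kills the script before the Python logging is even configured still leaves a trace.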
Update #3:
When I run this command from the command line on my Ubuntu server, it works fine:
bash ~/etl_scripts/stocks_etl.bashrc
When I run it from crontab as the same user, it throws the following error.
Error:
Started stocks_etl.bash
Thu Feb 25 05:20:01 UTC 2021
/home/user/etl_scripts/stocks_etl.bashrc: line 5: activate: No such file or directory
Traceback (most recent call last):
File "/home/user/etl_scripts/stocks_etl.py", line 4, in <module>
import numpy as np
ImportError: No module named numpy
Here is the bashrc file:
#!/bin/bash -l
echo 'Started stocks_etl.bash'
date +'%a %b %e %H:%M:%S %Z %Y'
source activate py36
python ~/etl_scripts/stocks_etl.py
It's like when I run it from crontab it can't find conda, and it just runs the script with the base Python install, which doesn't have numpy installed. Does anyone see what the problem might be, and can you suggest how to fix it?
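One way to verify that cron simply can't see the conda installation is to log the environment the script runs under. A hypothetical debug version of the top of the script (the echo/which lines are added for diagnosis only):

#!/bin/bash -l
# show which interpreter and PATH cron actually provides
echo "PATH=$PATH"
which python || echo 'python not found on PATH'

Cron jobs start with a minimal PATH (typically /usr/bin:/bin), so anything installed under ~/anaconda3 is invisible unless the script extends PATH or sources the right profile files.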
Update #2:
Now that I've run chmod 777 on the files, I get the following error when crontab executes. It's like the conda virtual environment is not being activated and it just tries to run the script with the base Python install.
Error:
/mnt/data/sda/user_storage/etl_scripts/stocks_etl.sh: line 2: activate: No such file or directory
Traceback (most recent call last):
File "/mnt/data/sda/user_storage/etl_scripts/stocks_etl.py", line 1, in <module>
import numpy as np
ImportError: No module named numpy
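source activate py36 relies on conda's shell setup, which cron does not perform. A common workaround, assuming a standard Anaconda install at /home/user/anaconda3, is to source conda's shell hook explicitly before activating:

#!/bin/bash
# load conda's shell functions first (path assumes a default Anaconda location)
source /home/user/anaconda3/etc/profile.d/conda.sh
conda activate py36
python /mnt/data/sda/user_storage/stocks_etl.py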
Update:
stocks_etl.py
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from yahoofinancials import YahooFinancials
import pymysql
import datetime
import logging
import time
import glob
from sqlalchemy import create_engine
import os
import datetime

# helper functions

# function for creating error logs
# Note: function not currently working, doesn't recognize logger
def error_logger(path):
    # adding a timestamp to logname
    ts = str(datetime.datetime.now().isoformat())
    # logging.basicConfig(filename='example.log',level=logging.DEBUG)
    logging.basicConfig(filename=path + ts + '.log', level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s %(name)s %(message)s')
    logger = logging.getLogger(__name__)

# function to query mysql db and return dataframe of results
def mysql_query(user, password, database, host, query):
    connection = pymysql.connect(user=user, password=password, database=database, host=host)
    try:
        with connection.cursor() as cursor:
            query = query
            df = pd.read_sql(query, connection)
            logging.info('query succeeded: ' + query)
        # finally:
        connection.close()
        logging.info('close connection mysql')
    except Exception as err:
        logger.error('query failed: ' + query + ' got error: ' + str(err))
    return df
    pass

# function to download OHLC stock data
def download_stocks(Ticker_list, start_date, end_date, time_interval, path):
    # get data for stocks in Ticker_list and save as csv
    failed_list = []
    passed_list = []
    Ticker_list = Ticker_list
    for x in range(len(Ticker_list)):
        try:
            yahoo_financials = YahooFinancials(Ticker_list[x])
            # data = yahoo_financials.get_historical_price_data('2019-01-01', '2019-09-30', time_interval='daily')
            data = yahoo_financials.get_historical_price_data(start_date, end_date, time_interval=time_interval)
            prices_df = pd.DataFrame(data[Ticker_list[x]]['prices'])
            prices_df = prices_df[['adjclose', 'close', 'formatted_date', 'high', 'low', 'open',
                                   'volume']]
            prices_df['date'] = prices_df['formatted_date']
            prices_df = prices_df[['date', 'adjclose', 'close', 'high', 'low', 'open',
                                   'volume']]
            prices_df['Ticker'] = Ticker_list[x]
            prices_df.to_csv(path + Ticker_list[x] + '.csv')
            passed_list.append(Ticker_list[x])
            logging.info('downloaded: ' + Ticker_list[x])
            time.sleep(1)
        except Exception as err:
            failed_list.append(Ticker_list[x])
            logger.error('tried download: ' + Ticker_list[x] + ' got error: ' + str(err))
            pass

# function read csv in and append to one dataframe
def stock_dataframe(path):
    try:
        path = path
        all_files = glob.glob(path + "/*.csv")
        li = []
        for filename in all_files:
            df = pd.read_csv(filename, index_col=None, header=0)
            li.append(df)
        frame = pd.concat(li, axis=0, ignore_index=True)
        frame = frame[['date', 'adjclose', 'close', 'high', 'low', 'open',
                       'volume', 'Ticker']]
        return frame
        logging.info('created stock dataframe')
    except Exception as err:
        logger.error('stock dataframe create failed got error: ' + str(err))
        pass

# write dataframe to mysql db
def write_dataframe(username, password, host, schema, dataframe, table, if_exists, index):
    try:
        from sqlalchemy import create_engine
        # connection = pymysql.connect(user='user', password='psswd', database='sandbox', host='xxxxx')
        engine = create_engine("mysql+pymysql://" + str(username) + ":" + str(password) + "@" + str(host) + "/" + str(schema))
        # engine = create_engine("mysql+mysqldb://user:"+'psswd'+"@xxxxx/sandbox")
        dataframe.to_sql(con=engine, name=table, if_exists=if_exists, index=index)
        logging.info('write_dataframe succeeded')
    except Exception as err:
        logger.error('write_dataframe failed got error: ' + str(err))
        pass

# to do
# - create directory with datetime prefix as part of path
# - add step that checks max date in current table
# - only pull data later than max date in current table
# - check max date in current derived table
# - only pull data later than current date from source table

def etl_pipeline(table_var):
    i = table_var
    max_date_query = """select max(date) as max_date from """ + i + """"""
    try:
        max_date_df = mysql_query(user='user',
                                  password='psswd',
                                  database='stocks',
                                  host='xxxxx',
                                  query=max_date_query)
        logging.info('max_date succeeded: ' + i)
    except Exception as err:
        logger.error('max_date failed: ' + i)
        pass

    # In[8]:
    try:
        # get max date
        max_date = max_date_df.astype(str)['max_date'][0]
        # create directory
        base_path = '/mnt/data/sda/user_storage/stock_data_downloads/'
        # get current_date
        current_date = datetime.datetime.today().strftime('%Y-%m-%d')
        directory_path = base_path + i + '/' + current_date
        # create directory for downloading new stocks in to
        os.mkdir(directory_path)
        logging.info('create directory succeeded: ' + i)
    except Exception as err:
        logger.error('create directory failed: ' + i)
        pass

    # In[9]:
    # getting ticker symbols
    ticker_query = """select distinct ticker as ticker from """ + i + """"""
    try:
        tickers_df = mysql_query(user='user',
                                 password='psswd',
                                 database='stocks',
                                 host='xxxxx',
                                 query=ticker_query)
        logging.info('get tickers succeeded: ' + i)
    except Exception as err:
        logger.error('get tickers failed: ' + i)
        pass

    # In[12]:
    # get ticker symbols
    stocks = tickers_df.ticker.tolist()

    # download stocks
    # Note: must add '/' to end of path
    # '2019-01-01', '2021-01-01', time_interval='daily'
    download_stocks(Ticker_list=stocks,
                    start_date=max_date,
                    end_date=current_date,
                    time_interval='daily',
                    path=directory_path + '/')

    # In[70]:
    # directory_path

    # In[13]:
    # create dataframe
    stocks_df = stock_dataframe(path=directory_path)
    # trav_stocks_df.head()

    # In[14]:
    # create mysql table
    write_dataframe(username='user',
                    password='psswd',
                    host='xxxxx',
                    schema='stocks',
                    dataframe=stocks_df,
                    table=i,
                    if_exists='append',
                    index=False)

    # In[15]:
    # creating additional avg annual returns
    try:
        query = """select ticker, avg(annual_returns) as avg_annual_returns from (
        select ticker, date, ( -1 +
            a.adjclose / max(a.adjclose) over (partition by ticker
                                               order by date
                                               range between interval 365 day preceding and interval 365 day preceding
                                              )
            ) as annual_returns
        from """ + i + """ a
        ) b where annual_returns is not null
        group by ticker"""
        df = mysql_query(user='user', password='psswd', database='stocks', host='xxxxx', query=query)
        logging.info('etl succeeded: ' + i + '_returns')
    except Exception as err:
        logger.error('etl failed: ' + i + '_returns')
        pass

    # In[16]:
    # adding additional avg annual returns to table
    # create mysql table
    write_dataframe(username='user',
                    password='psswd',
                    host='xxxxx',
                    schema='stocks',
                    dataframe=df,
                    table=i + '_returns',
                    if_exists='replace',
                    index=False)

# start logging
# adding a timestamp to logname
ts = str(datetime.datetime.now().isoformat())
# logging.basicConfig(filename='example.log',level=logging.DEBUG)
logging.basicConfig(filename='/mnt/data/sda/user_storage/logs/etl_scripts/' + ts + '.log', level=logging.DEBUG,
                    format='%(asctime)s %(levelname)s %(name)s %(message)s')
logger = logging.getLogger(__name__)

table_list = ['trav_stocks', 's_and_p', 'american_mutual_funds']

for j in table_list:
    try:
        etl_pipeline(j)
        logging.info('etl_pipeline succeeded: ' + j)
    except Exception as err:
        logger.error('etl_pipeline failed: ' + j)
        pass
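As the comment at the top of the script notes, error_logger does not work: the logger it configures is bound to a local variable and discarded when the function returns. A minimal sketch of a fix is to return the configured logger and assign it at module level (same names as in the original script):

def error_logger(path):
    # adding a timestamp to logname
    ts = str(datetime.datetime.now().isoformat())
    logging.basicConfig(filename=path + ts + '.log', level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s %(name)s %(message)s')
    # return the logger so callers can actually use it
    return logging.getLogger(__name__)

logger = error_logger('/mnt/data/sda/user_storage/logs/etl_scripts/')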
Update:
I changed the file to a .bash file and changed the code in it to:
#!/bin/bash -l
echo ''
'Started stocks_etl.bash'
date +'%a %b %e %H:%M:%S %Z %Y'
source /home/user/anaconda3/envs/py36/bin/activate
conda activate py36
python ~/etl_scripts/stocks_etl.py
Now when it runs from crontab I get the following error.
Error:
/home/user/etl_scripts/stocks_etl.bash: line 3: Started stocks_etl.bash: command not found
Fri Feb 26 16:28:01 UTC 2021
/home/user/etl_scripts/stocks_etl.bash: line 7: /home/user/anaconda3/envs/py36/bin/activate: No such file or directory
/home/user/etl_scripts/stocks_etl.bash: line 8: conda: command not found
Traceback (most recent call last):
File "/home/user/etl_scripts/stocks_etl.py", line 4, in <module>
import numpy as np
ImportError: No module named numpy
Update:
Code:
#!/bin/bash
echo ''
'Started stocks_etl.bash'
date +'%a %b %e %H:%M:%S %Z %Y'
/home/user/anaconda3 run -n py36 python ~/user/etl_scripts/stocks_etl.py
Error:
/home/user/etl_scripts/stocks_etl.bash: line 3: Started stocks_etl.bash: command not found
Fri Feb 26 16:43:01 UTC 2021
/home/user/etl_scripts/stocks_etl.bash: line 7: /home/user/anaconda3: Is a directory
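Two separate problems are visible in this last attempt: the string 'Started stocks_etl.bash' sits on its own line after echo '', so the shell tries to execute it as a command, and /home/user/anaconda3 is the install directory rather than the conda executable. A corrected sketch, assuming a default Anaconda layout and the script path used earlier:

#!/bin/bash
echo 'Started stocks_etl.bash'
date +'%a %b %e %H:%M:%S %Z %Y'
# the conda entry point lives inside the install directory, e.g. condabin/conda
/home/user/anaconda3/condabin/conda run -n py36 python /home/user/etl_scripts/stocks_etl.py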
Have you checked whether your bash file is executable?
If not, you should change its mode:
chmod 755 /mnt/data/sda/user_storage/stocks_etl.sh
or execute it explicitly with bash:
0 8 * * * bash /mnt/data/sda/user_storage/stocks_etl.sh
For me it's just:
crontab -e
then I enter my execution line:
0 8 * * * python3 script.py&
and save.
Putting the '&' at the end tells it to run in the background. I'm using an AWS Ubuntu server, so everything needs python3.
First off, the `source activate` syntax was deprecated years ago (how old is your Conda instance?); you should be using `conda activate`. Second, Conda shell commands are loaded into the shell as part of sourcing `.bashrc` or `.bash_profile`. So at the very least, you need to include the `-l` in the shebang, i.e.,
#!/bin/bash -l
conda activate py36
python /mnt/data/sda/user_storage/stocks_etl.py
You may need to do extra things to ensure `.bashrc` gets sourced correctly (e.g., which user is it running as?).
Note that Conda also has the `conda run` command for executing commands inside envs, which I think should be preferred:
#!/bin/bash -l
conda run -n py36 python /mnt/data/sda/user_storage/stocks_etl.py
The latter form should also work without any Conda initialization, as long as you provide the full path to the `conda` entry point:
#!/bin/bash
# change to match your `conda` location
/home/user/anaconda3/condabin/conda run -n py36 python /mnt/data/sda/user_storage/stocks_etl.py
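With the full-path form, the wrapper script could even be dropped and the command placed directly in the crontab, with output redirected so failures are captured (paths assumed from the question):

0 8 * * * /home/user/anaconda3/condabin/conda run -n py36 python /mnt/data/sda/user_storage/stocks_etl.py >> /tmp/stocks_etl_cron.log 2>&1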