Gunicorn 未在 Flask 应用程序的表单提交上重新加载
Gunicorn not reloading on form submit of flask app
我是 flask 和 gunicorn 的新手。我正在使用 gunicorn 进行生产环境部署。我有一个提交按钮,它将表单数据存储在数据库中;我希望从数据库中获取新存储的数据,并在重新加载页面时显示出来。我观察到的是,Gunicorn 在重新启动之前不会重新加载任何内容。谁能帮我在不停止服务器的情况下重新加载数据?
- 我试过在启动 gunicorn 时使用 --reload,但没有用。我的应用程序在 Flask 的开发服务器上一直按预期工作。所以基本上我想要 gunicorn 提供类似这样的功能:app.run(host=so.gethostbyname(so.gethostname()), port=v_port, debug=True, use_reloader=True)。提前致谢。
############################ This is main_app.py #######################
from flask import Flask, render_template, request, redirect, send_file, send_from_directory, current_app, flash, abort, jsonify, session, url_for
import os as os
import sys as sys
from ldap3 import Server, Connection, NTLM
import ldap3
import socket as so
import subprocess
from subprocess import Popen, PIPE
from subprocess import check_output
import ssl
# Port used by the Flask development server started in the __main__ guard.
v_port = 2001

# Directory that holds this file.  When the file itself lives in a "scripts"
# directory, the project root is its parent.  Only a trailing path component
# is stripped: the previous `.replace('\scripts', '')` relied on an invalid
# escape sequence, matched the Windows separator only, and removed the text
# wherever it occurred in the path.
_here = os.path.dirname(os.path.abspath(__file__))
if os.path.basename(_here) == 'scripts':
    v_project_path = os.path.dirname(_here)
else:
    v_project_path = _here
v_scripts_path = v_project_path + os.sep + 'scripts' + os.sep
# Make the helper modules kept under scripts/ importable.
sys.path.insert(0, v_scripts_path)
app = Flask(__name__)
import get_status as gs
#import save_cmnt as cmt
import base64
import struct
import cx_Oracle
from Crypto.Cipher import AES
SECRET_KEY = b"xyz"
class AESCipher(object):
    """Decrypt values stored as base64(IV + 4-byte length + AES-CBC ciphertext)."""

    def __init__(self, key):
        """Remember the key and the AES block size.

        :param key: key bytes, passed unchanged to ``AES.new``.
        """
        self.bs = AES.block_size
        self.key = key

    def _pad(self, s):
        """Return ``s`` extended with ``'0'`` characters to a multiple of ``self.bs``.

        A whole extra block is appended when ``s`` is already aligned.
        """
        return s + (self.bs - len(s) % self.bs) * '0'

    def decrypt(self, enc):
        """Decode and decrypt ``enc``; return the plaintext as ``str``.

        After base64 decoding the layout is: IV (``self.bs`` bytes), a
        little-endian signed 32-bit plaintext length, then the ciphertext.

        :param enc: base64-encoded payload.
        :return: the first ``length`` decrypted bytes decoded as UTF-8.
        """
        enc = base64.b64decode(enc)
        iv = enc[:self.bs]
        raw_size = struct.unpack('<i', enc[self.bs:self.bs + 4])[0]
        cipher = AES.new(self.key, AES.MODE_CBC, iv)
        raw_bytes = cipher.decrypt(enc[self.bs + 4:])
        # The decrypted bytes are a secret (this class is used for the
        # database password), so they are no longer printed to stdout.
        return raw_bytes[:raw_size].decode('utf_8')
# Read the encrypted database password.  The context manager closes the file
# handle, which used to stay open for the life of the process.
# Unix deployment path: /apps/axiom_app/flask/python_db_pass.txt
with open(r'C:\unix_report_status\python_db_pass.txt', "rb") as pass_f:
    enc_db_pass = pass_f.read()
c = AESCipher(SECRET_KEY)
db_pass = c.decrypt(enc_db_pass)
# Module-level connection and cursor; the login view queries through ora_cur.
ora_con = cx_Oracle.connect('axiom_data2/'+db_pass+'@RRANYT02')
ora_cur = ora_con.cursor()
def validate_credentials(connection):
    """Return True only when ``connection.bind()`` succeeds.

    :param connection: object exposing ``bind()`` (an ldap3 ``Connection``
        in this file).
    :return: ``True`` when ``bind()`` returns a truthy value, ``False`` when
        it returns a falsy value or raises.
    """
    try:
        # The result used to be ignored, so a connection configured to report
        # failure through the return value (rather than by raising) was
        # treated as authenticated.
        return bool(connection.bind())
    except Exception:
        # Best effort by design: any bind error means "not authenticated".
        return False
@app.route('/')
def index():
    """Greet a logged-in user with a logout link; otherwise show the login form."""
    if 'username' not in session:
        return render_template("login.htm")
    username = session['username']
    logout_link = "<b><a href = '/logout'>click here to log out</a></b>"
    return 'Logged in as ' + username + '<br>' + logout_link
@app.route("/login", methods=[ "GET","POST"])
def login():
    """Authenticate against LDAP, then authorise against ``rs_user_access``.

    GET renders the login form.  POST binds to the directory with the
    submitted credentials; on failure ``wrong_cred.htm`` is rendered, when the
    user has no row in ``rs_user_access`` ``unauthorised.htm`` is rendered,
    otherwise the username is stored in the session and the browser is
    redirected to the dashboard.
    """
    print('inside login')
    if request.method != 'POST':
        return render_template("login.htm")

    print('in post')
    username = request.form['username']
    password = request.form['password']
    print('login Name ' + username)
    server = Server(host='host',
                    port=636,
                    get_info=ldap3.ALL,
                    use_ssl=True,
                    allowed_referral_hosts='*',
                    mode='IP_V6_PREFERRED')
    connection = Connection(server,
                            auto_bind=False,
                            receive_timeout=1000,
                            client_strategy=ldap3.SYNC,
                            raise_exceptions=True,
                            version=3, user=r"intra\{0}".format(username),
                            password=password,
                            authentication=NTLM,
                            check_names=True,
                            read_only=True, auto_referrals=True)
    connection.start_tls()
    User_return_value = validate_credentials(connection)
    print('----- ', User_return_value)
    if not User_return_value:
        return render_template("wrong_cred.htm")

    # Check that the user has access.  The username is raw form input, so it
    # is passed as a bind variable instead of being concatenated into the SQL.
    ora_cur.execute(
        "select user_role from rs_user_access where username = :username",
        {'username': username})
    if not ora_cur.fetchall():
        return render_template("unauthorised.htm")

    session['username'] = username
    session.permanent = True
    return redirect(url_for('dashboard'))
@app.route('/logout', methods=["GET", "POST"])
def logout():
    """Forget the logged-in user and show the login form again."""
    # pop() with a default does nothing when no user is stored in the session.
    session.pop('username', None)
    login_page = render_template("login.htm")
    return login_page
@app.route('/report_calender', methods=["GET", "POST"])
def report_calender():
    """Render ``dashboard.htm`` as it is, without regenerating it first."""
    page = render_template("dashboard.htm")
    return page
@app.route('/admin', methods=["GET", "POST"])
def admin():
    """Show the user-administration page and apply add/remove requests.

    The posted ``admin_flag`` selects the action: ``'1'`` adds the role given
    by ``user_name``/``user_role``, ``'2'`` removes ``rm_user_name``; any
    other value (or none, as on a plain GET) only displays the page.
    """
    if 'username' not in session:
        # Same page the dashboard shows to anonymous visitors; indexing the
        # session directly used to raise KeyError here.
        return render_template("wrong_cred.htm")

    if request.method == 'POST':
        print('~~~~~~~~', 'adm post')
    else:
        print('~~~~~~~~', 'adm not post')

    # .get(): a plain GET carries no form data, and request.form[...] aborts
    # the request with 400 Bad Request when the key is missing.
    admin_flag = request.form.get('admin_flag')
    print('~~~~~~~~~~~~~~', admin_flag)

    # NOTE(review): the caller's role is not checked here before roles are
    # changed -- confirm whether that is enforced somewhere else.
    if admin_flag == '1':
        user_name = request.form['user_name']
        user_role = request.form['user_role']
        gs.add_user_role(user_name, user_role)
    elif admin_flag == '2':
        rm_user_name = request.form['rm_user_name']
        gs.remove_user_role(rm_user_name)
        print('~~~~ removing', rm_user_name)

    # Regenerate admin_page.htm once, after any change has been applied.
    gs.get_admin_page(session['username'])
    return render_template("admin_page.htm")
@app.route("/dashboard", methods=["GET", "POST"])
def dashboard():
    """Handle dashboard actions, regenerate the page and render it.

    Anonymous visitors get ``wrong_cred.htm``.  A POST whose
    ``dashboard_flag`` is not ``'0'`` either stores the posted e-mail body
    (``email_flag == '1'``) or saves a delay/comment for ``job_name``.  In
    every logged-in case ``gs.get_status`` is called before
    ``dashboard.htm`` is rendered.
    """
    if 'username' not in session:
        return render_template("wrong_cred.htm")

    username = session['username']
    if request.method == 'POST' and request.form['dashboard_flag'] != '0':
        print('in post')
        job_name = request.form['job_name']
        email_flag = request.form['email_flag']
        if email_flag == '1':
            print('in email')
            recipients = request.form['email_id']
            email_content = request.form['email_content']
            print(recipients)
            # "with" closes the file even when the write fails.
            with open(r'C:\unix_report_status\templates\email.txt', "w") as email_file:
                email_file.write(email_content)
        elif job_name:
            print('in delay')
            job_delay = request.form['job_delay']
            job_cmt = request.form['job_cmt']
            gs.submit_cmt(job_name, job_cmt, job_delay, username)
    else:
        print('normal refresh')

    # Every branch of the original ended with this call; it now happens once.
    gs.get_status(username)
    return render_template("dashboard.htm")
@app.after_request
def set_response_headers(response):
    """Stamp the no-cache headers onto ``response`` and hand it back."""
    no_cache_headers = (
        ('Cache-Control', 'no-cache, no-store, must-revalidate'),
        ('Pragma', 'no-cache'),
        ('Expires', '0'),
    )
    for header_name, header_value in no_cache_headers:
        response.headers[header_name] = header_value
    return response
# Sessions need a secret key in every process that imports this module.  The
# key used to be set only under ``__main__``, which a WSGI server such as
# gunicorn never executes, and a per-process random key cannot be shared by
# several workers.  Prefer a key supplied through the environment and fall
# back to a random one for local runs.
app.secret_key = os.environ.get('FLASK_SECRET_KEY') or os.urandom(12)

if __name__ == '__main__':
    # Development server only.  The gevent WSGIServer lines that used to
    # follow were dropped: they were reached only after app.run() returned,
    # and built the server from a bare host string with no port and no
    # application.
    app.run(host=so.gethostbyname(so.gethostname()), port=v_port, debug=True, use_reloader=True)
############################ This is get_status.py ######################
import cx_Oracle
from datetime import datetime, time, timedelta
import pytz
#import sqlite3
import base64
import struct
#from Crypto import Random
from Crypto.Cipher import AES
SECRET_KEY = b"xyz"
class AESCipher(object):
    """Decrypt values stored as base64(IV + 4-byte length + AES-CBC ciphertext)."""

    def __init__(self, key):
        """Remember the key and the AES block size.

        :param key: key bytes, passed unchanged to ``AES.new``.
        """
        self.bs = AES.block_size
        self.key = key

    def _pad(self, s):
        """Return ``s`` extended with ``'0'`` characters to a multiple of ``self.bs``.

        A whole extra block is appended when ``s`` is already aligned.
        """
        return s + (self.bs - len(s) % self.bs) * '0'

    def decrypt(self, enc):
        """Decode and decrypt ``enc``; return the plaintext as ``str``.

        After base64 decoding the layout is: IV (``self.bs`` bytes), a
        little-endian signed 32-bit plaintext length, then the ciphertext.

        :param enc: base64-encoded payload.
        :return: the first ``length`` decrypted bytes decoded as UTF-8.
        """
        enc = base64.b64decode(enc)
        iv = enc[:self.bs]
        raw_size = struct.unpack('<i', enc[self.bs:self.bs + 4])[0]
        cipher = AES.new(self.key, AES.MODE_CBC, iv)
        raw_bytes = cipher.decrypt(enc[self.bs + 4:])
        # The decrypted bytes are a secret (this class is used for the
        # database password), so they are no longer printed to stdout.
        return raw_bytes[:raw_size].decode('utf_8')
# Read the encrypted database password.  The context manager closes the file
# handle, which used to stay open for the life of the process.
# Unix deployment path: /apps/axiom_app/flask/python_db_pass.txt
with open(r'C:\unix_report_status\python_db_pass.txt', "rb") as pass_f:
    enc_db_pass = pass_f.read()
c = AESCipher(SECRET_KEY)
db_pass = c.decrypt(enc_db_pass)
# Module-level connection and cursor; add_user_role and remove_user_role
# execute through ora_cur.
ora_con = cx_Oracle.connect('axiom_data2/'+db_pass+'@RRANYT02')
ora_cur = ora_con.cursor()
# Environment name taken from the first row of rs_env_details.
ora_cur.execute("""select env from rs_env_details""")
env_details = ora_cur.fetchall()
env = (env_details[0][0])
# Regenerate templates\dashboard.htm for ``username`` and return 'refresh'.
# Opens its own Oracle connection, reads the user's role, the environment name
# and the per-job status rows, builds an HTML status table plus an e-mail
# variant of it, and overwrites dashboard.htm on disk.
# NOTE(review): indentation was lost in this listing, so the nesting of the
# statements below cannot be confirmed from here.
# NOTE(review): username and est_date are concatenated into the SQL text;
# bind variables would avoid injection.
# NOTE(review): every call rewrites the same template file, so concurrent
# requests can overwrite each other's output.
def get_status(username):
ora_con = cx_Oracle.connect('axiom_data2/'+db_pass+'@RRANYT02')
ora_cur = ora_con.cursor()
# Current New York wall-clock time, truncated to the minute as a naive datetime.
tz_NY = pytz.timezone('America/New_York')
datetime_NY = datetime.now(tz_NY)
#print("NY time:", datetime_NY.strftime("%H:%M:%S"))
est_time= datetime_NY.strftime("%H:%M:%S")
est_date= datetime_NY.strftime("%d-%m-%y")
cur_time = datetime.strptime(est_date+" "+est_time,'%d-%m-%y %H:%M:%S')
cur_time = cur_time.strftime('%d-%m-%y %H:%M')
cur_time = datetime.strptime(cur_time,'%d-%m-%y %H:%M')
print(cur_time)
# c = AESCipher(SECRET_KEY)
# db_pass = c.decrypt(enc_db_pass)
# print(db_pass)
# Role of the requesting user; users without a row are treated as 'view'.
ora_cur.execute("""select user_role from rs_user_access where username='"""+username+"""'""")
user_role_details = ora_cur.fetchall()
if len(user_role_details)==0:
user_role='view'
else:
user_role=user_role_details[0][0]
print(user_role)
# Environment name, substituted for {ENV_HERE} in the page header further down.
ora_cur.execute("""select env from rs_env_details""")
env_details = ora_cur.fetchall()
env = (env_details[0][0])
# ora_cur.execute("""select BD from from axiom_job_da467762 where job_name='axm_c_rpt_tic_b_bl1_bd'""")
# bd_details = ora_cur.fetchall()
# bd = (bd_details[0][0])
# One row per job: master data, current status, and the newest delay/comment
# entry recorded for today's run date.
ora_cur.execute("""select t1.job_name,report_name,t1.report_type,report_sla,avg_run_time,avg_start_time,
status,start_time,end_time,delay_hrs,nvl(comments,'-') comments from rs_job_master t1
right join rs_job_status t2 on
t1.job_name=t2.job_name
left join (select run_date,job_name,delay_hrs,comments
from rs_job_delay where run_date='"""+est_date+"""' and id in (select max(id) from rs_job_delay group by JOB_NAME)) t3
on t1.job_name=t3.job_name
left join axiom_job_d1108822 t4
on t1.job_name=t4.job_name where run_flag='Y' and t1.report_type!='MTY'
order by t1.report_type""")
# Page banner and table header for the dashboard ("ME BD" is hard-coded to '7').
html_s="""</br><div class="row" id="div_generate" >
<text for="cob_date"> <b>Run Date</b> : """+datetime_NY.strftime("%d-%b-%y")+"""</text>
<text for="cob_date"> <b>ME BD </b>: """+'7'+""" </text>
<text for="est_time"> <b>EST Time</b> : """+(datetime.strftime(cur_time,'%d-%b-%y %H:%M'))+"""</text>
</div>
<table class="table1"><tr class="row2"><th align="left" bgcolor="b3d9ff" width="3%" width='90%'>Type</th>
<th align="left" bgcolor="b3d9ff" width="10%" >Report Name</th>
<th align="left" bgcolor="b3d9ff" width="7%">Status</th>
<th align="left" bgcolor="b3d9ff" width="10%">Start Time</th>
<th align="left" bgcolor="b3d9ff" width="10%">End Time</th>
<th align="left" bgcolor="b3d9ff"width="5%">Breach</th>
<th align="left" bgcolor="b3d9ff"width="5%">SLA</th>
<th align="left" bgcolor="b3d9ff"width="5%">ETA</th>
<th align="left" bgcolor="b3d9ff"width="3%">Delay (hrs)</th>
<th align="left" bgcolor="b3d9ff"width="20%">Comments</th>
<th align="left" bgcolor="b3d9ff"width="3%"> </th></tr>"""
# Table header of the e-mail variant (no trailing action column).
email_content="""<table class="table1">
<tr class="row2"><th align="left" bgcolor="b3d9ff" width="3%" >Type</th>
<th align="left" bgcolor="b3d9ff" width="10%" >Report Name</th>
<th align="left" bgcolor="b3d9ff" width="7%">Status</th>
<th align="left" bgcolor="b3d9ff" width="10%">Start Time</th>
<th align="left" bgcolor="b3d9ff" width="10%">End Time</th>
<th align="left" bgcolor="b3d9ff"width="5%">Breach</th>
<th align="left" bgcolor="b3d9ff"width="5%">SLA</th>
<th align="left" bgcolor="b3d9ff"width="5%">ETA</th>
<th align="left" bgcolor="b3d9ff"width="3%">Delay (hrs)</th>
<th align="left" bgcolor="b3d9ff"width="20%">Comments</th></tr>"""
rows = ora_cur.fetchall()
# NOTE(review): table_data/email_data are only assigned when rows is non-empty
# but are used again near the end -- confirm how far this "if" extends.
if rows:
table_data=""
email_data=""
for row in rows:
# Column order matches the select list of the query above.
job_name=row[0]
report_name=str(row[1])
report_type=row[2]
report_sla=row[3]
avg_run_time=row[4]
avg_start_time=row[5]
status=row[6]
start_time=row[7]
end_time=row[8]
delay_hrs=row[9]
print("delay_hrs-",delay_hrs)
comments=row[10]
print("comments-",comments)
# print(job_name)
# print(end_time)
# Defaults shown when neither value can be computed for the job's status.
sla_breach="-"
eta='-'
# print(start_time)
### ETA and SLA CALCULATION
# report_sla is parsed as 'HH:MM' on today's New York date.
sla_time = est_date+' '+report_sla
sla_time = datetime.strptime(sla_time,'%d-%m-%y %H:%M')
sla_time = sla_time.strftime('%d-%m-%y %H:%M')
sla_time = datetime.strptime(sla_time,'%d-%m-%y %H:%M')
# print(job_name)
# Not started yet, with a recorded delay: ETA = average start + delay + average run time (hours).
if status=='ACTIVATED' and delay_hrs is not None:
print("here- ",delay_hrs)
avg_start_time=est_date+" "+avg_start_time
avg_start_time=datetime.strptime(avg_start_time,'%d-%m-%y %H:%M')
total_delay=delay_hrs+avg_run_time
new_eta = avg_start_time + timedelta(hours=total_delay)
new_eta=new_eta.strftime('%H:%M')
eta = new_eta
# The ETA is re-anchored on today's date before it is compared with the SLA.
b_eta=est_date+' '+eta
b_eta=datetime.strptime(b_eta,'%d-%m-%y %H:%M')
print(b_eta, sla_time)
if sla_time<b_eta:
sla_breach='YES'
# print('Breaching')
else:
sla_breach='NO'
# print('Not Breaching')
# Not started yet, no delay recorded: ETA = average start + average run time.
elif status=='ACTIVATED' and delay_hrs is None:
avg_start_time=est_date+" "+avg_start_time
avg_start_time=datetime.strptime(avg_start_time,'%d-%m-%y %H:%M')
new_eta = avg_start_time + timedelta(hours=avg_run_time)
new_eta=new_eta.strftime('%H:%M')
eta = new_eta
# print(report_name)
# print(avg_start_time,sla_time,avg_start_time + timedelta(hours=avg_run_time))
if (avg_start_time + timedelta(hours=avg_run_time))>sla_time:
sla_breach='YES'
# print('Breaching')
else:
sla_breach='NO'
# print('Not Breaching')
# Finished: breach is decided by the actual end time (truncated to the minute).
elif status=='SUCCESS':
eta = 'NA'
end_time=datetime.strptime(end_time, "%m/%d/%Y %H:%M:%S")
end_time = end_time.strftime('%d-%m-%y %H:%M')
end_time=datetime.strptime(end_time,'%d-%m-%y %H:%M')
start_time=datetime.strptime(start_time,"%m/%d/%Y %H:%M:%S")
start_time = start_time.strftime('%d-%m-%y %H:%M')
start_time=datetime.strptime(start_time,'%d-%m-%y %H:%M')
# start_time='Not Started'
# print(end_time, sla_time)
if end_time>sla_time:
sla_breach='YES'
# print('Breached')
else:
sla_breach='NO'
# print('Not Breached')
# In progress: ETA = actual start + average run time.
elif status=='RUNNING':
# start_time=est_date+' '+start_time
start_time=datetime.strptime(start_time,"%m/%d/%Y %H:%M:%S")
start_time = start_time.strftime('%d-%m-%y %H:%M')
start_time=datetime.strptime(start_time,'%d-%m-%y %H:%M')
new_eta=start_time + timedelta(hours=avg_run_time)
new_eta = new_eta.strftime('%H:%M')
eta = new_eta
if (start_time+ timedelta(hours=avg_run_time))>sla_time:
sla_breach='YES'
# print('Breaching')
else:
sla_breach='NO'
# print('Not Breaching')
# Any other status: only normalise the timestamps; '-' marks a missing value.
else:
if start_time != '-':
start_time=datetime.strptime(start_time,"%m/%d/%Y %H:%M:%S")
start_time = start_time.strftime('%d-%m-%y %H:%M')
start_time=datetime.strptime(start_time,'%d-%m-%y %H:%M')
if end_time != '-':
end_time=datetime.strptime(end_time, "%m/%d/%Y %H:%M:%S")
end_time = end_time.strftime('%d-%m-%y %H:%M')
end_time=datetime.strptime(end_time,'%d-%m-%y %H:%M')
eta = '-'
### ETA and SLACALCULATION ENDS
# Convert the values to the display strings used in the table cells.
print("start_time:-",start_time)
if start_time == '-':
start_time='Not Started'
else:
start_time = start_time.strftime('%H:%M')
start_time=str(start_time)
if end_time == '-':
end_time='-'
print('1')
else:
# end_time=datetime.strptime(end_time, "%m/%d/%Y %H:%M:%S")
# end_time = end_time.strftime('%d-%m-%y %H:%M')
# end_time=datetime.strptime(end_time,'%d-%m-%y %H:%M')
end_time = end_time.strftime('%H:%M')
end_time=str(end_time)
# print('2')
if delay_hrs == None:
delay_hrs = '-'
else:
delay_hrs = str(delay_hrs)
# print(start_time)
# print(type(status))
print(report_name,start_time,end_time,report_sla)
# Dashboard row.  Database values are inserted into the HTML unescaped.
table_data=table_data+"""<tr>
<td>"""+report_type+"""</td>
<td>"""+report_name+"""</td>
<td>"""+status+"""</td>
<td>"""+start_time+"""</td>
<td>"""+end_time+"""</td>"""
# Breach cell: red for YES, green for NO, plain otherwise.
if sla_breach=='YES':
table_data=table_data+"""<td bgcolor="red"><p style="color:FFFFFF">"""+sla_breach+"""</p></td>"""
elif sla_breach=='NO':
table_data=table_data+"""<td bgcolor="green"><p style="color:FFFFFF">"""+sla_breach+"""</p></td>"""
else:
table_data=table_data+"""<td>"""+sla_breach+"""</td>"""
table_data=table_data+"""<td>"""+report_sla+"""</td>
<td>"""+eta+"""</td>
<td><input type='text' height='3' style='border: 1px solid #ccc; border-radius: 4px; box-sizing: border-box;' size="1" id='"""+job_name+"""_d' value='"""+delay_hrs+"""'></td>
<td><textarea style='border: 1px solid #ccc; border-radius: 4px; ' rows='2' cols='50' name='"""+job_name+"""_c' id='"""+job_name+"""_c'>"""+comments+"""</textarea></td>"""
# Only admin/superuser rows get a Save button.
if user_role=='admin' or user_role=='superuser':
table_data=table_data+"""<td><button name='"""+job_name+"""_b' type='' value='' onclick="btn_sv('"""+job_name+"""')">Save</button></td>"""
else:
table_data=table_data+"""<td></td>"""
table_data=table_data+"""</tr>"""
# The same row for the e-mail body, with read-only delay and comment cells.
email_data=email_data+"""
<tr><td>"""+report_type+"""</td>
<td>"""+report_name+"""</td>
<td>"""+status+"""</td>
<td>"""+start_time+"""</td>
<td>"""+end_time+"""</td>
"""
if sla_breach=='YES':
email_data=email_data+"""<td bgcolor="red"><p style="color:black">"""+sla_breach+"""</p></td>"""
elif sla_breach=='NO':
email_data=email_data+"""<td bgcolor="green"><p style="color:black">"""+sla_breach+"""</p></td>"""
else:
email_data=email_data+"""<td>"""+sla_breach+"""</td>"""
email_data=email_data+"""<td>"""+report_sla+"""</td>
<td>"""+eta+"""</td>
<td>"""+delay_hrs+"""</td>
<td>"""+comments+"""</td></tr>
"""
# Footer of the e-mail body.
email_e="""</table>
<p>
For further queries, please contact <a href="mailto:saket.parab@barclays.com">Saket Parab.</a>
<p>
Regards,
<br>
<b>Axiom Dev</b>
</p>
</body></html>"""
# Assemble the e-mail: header file + table header + rows + footer, then fill
# in the date/time placeholders.
# NOTE(review): this file handle is never closed.
e = open(r'C:\unix_report_status\templates\email_1.txt', "r")
# e = open(r'/apps/axiom_app/flask/templates/email_1.txt', "r")
email_head=e.read()
email_data=email_data+email_e
email_content=email_head+email_content+email_data
email_content=email_content.replace('{DATE_HERE}',datetime_NY.strftime("%d-%b-%y"))
email_content=email_content.replace('{TIME_HERE}',est_time)
# print(email_content)
html_e="""</table>
</br>"""
# Admin/superuser pages also get the hidden form used for saving a comment and
# for submitting the pre-rendered e-mail body.
if user_role=='admin' or user_role=='superuser':
html_e=html_e+"""
<form id='cmt_form' method='POST' action = '/dashboard' enctype='multipart/form-data'>
<input class='add_input' type='hidden' name='job_name' id='job_name' placeholder=''>
<input class='add_input' type='hidden' name='job_delay' id='job_delay' placeholder=''>
<input class='add_input' type='hidden' name='job_cmt' id='job_cmt' placeholder=''>
<input class="add_input" type="hidden" name="dashboard_flag" id="dashboard_flag" >
<textarea name='email_content' style="display:none" id='email_content' >"""+email_content+"""</textarea>
<input class='add_input' type='hidden' name='email_flag' id='email_flag' placeholder=''>
<input class='add_input' type='' name='email_id' id='email_id' placeholder=''>
<button name='email_b' type='' value='' onclick='btn_em()'>Email</button>
</form>"""
html_e=html_e+"""</div></body></html>"""
# print(table_data)
final_html=html_s+table_data+html_e
# Page header template with the environment and username placeholders filled in.
# NOTE(review): this file handle is never closed.
f = open(r'C:\unix_report_status\templates\dashboard_1.htm', "r")
# f = open(r'/apps/axiom_app/flask/templates/dashboard_1.htm', "r")
html_head=f.read()
html_head=html_head.replace('{ENV_HERE}','('+env+')')
html_head=html_head.replace('{USERNAME_HERE}',username)
#print(f.read())
# Overwrite the template that the /dashboard view renders.
html_file = open(r'C:\unix_report_status\templates\dashboard.htm', "w")
# html_file = open(r'/apps/axiom_app/flask/templates/dashboard.htm', "w")
html_file.write(html_head+' '+final_html)
html_file.close()
print('done')
# Only the cursor is closed; ora_con itself is not closed in this function.
ora_cur.close()
return 'refresh'
def submit_cmt(job_name,job_cmt,job_delay,username):
    """Insert a delay/comment row for a job into ``rs_job_delay`` and commit.

    The row gets the next id (``max(id) + 1``) and today's New York date in
    ``%d-%m-%y`` form as its run date.

    :param job_name: job the comment belongs to.
    :param job_cmt: comment text.
    :param job_delay: delay in hours as submitted by the form.
    :param username: user saving the comment.
    :return: the string ``'cmt saved'``.
    """
    est_date = datetime.now(pytz.timezone('America/New_York')).strftime("%d-%m-%y")
    ora_con = cx_Oracle.connect('axiom_data2/'+db_pass+'@RRANYT02')
    try:
        ora_cur = ora_con.cursor()
        # All values come from the request, so they are passed as bind
        # variables instead of being concatenated into the statement.
        # NOTE(review): job_delay used to be inlined as a numeric literal; it
        # is now bound as received and left to Oracle's implicit conversion
        # -- confirm against the column type.
        ora_cur.execute(
            "insert into rs_job_delay values("
            "(select nvl(max(id),0)+1 from rs_job_delay),"
            ":run_date,:job_name,:job_delay,:job_cmt,:username)",
            {
                'run_date': est_date,
                'job_name': job_name,
                'job_delay': job_delay,
                'job_cmt': job_cmt,
                'username': username,
            })
        ora_con.commit()
        ora_cur.close()
    finally:
        # The connection opened for this call used to be left open.
        ora_con.close()
    return 'cmt saved'
def user_login_auth(user_window_id):
    """Check whether exactly one active row exists for a Windows user id.

    :param user_window_id: value matched against ``user_windows_id`` in
        ``DS_REPORT_S1060297`` (rows with ``user_active='Y'`` only).
    :return: ``0`` when the count is exactly 1, otherwise ``1``.
    """
    ora_con = cx_Oracle.connect('axiom_data2/'+db_pass+'@RRANYT02')
    try:
        ora_cur = ora_con.cursor()
        # Bind variable instead of concatenating the user id into the SQL.
        ora_cur.execute(
            "select count(1) as cnt from DS_REPORT_S1060297 "
            "where user_windows_id = :user_id and user_active='Y'",
            {'user_id': user_window_id})
        rows = ora_cur.fetchall()
    finally:
        # The connection opened for this call used to be left open.
        ora_con.close()
    counts = [row[0] for row in rows]
    return 0 if counts == [1] else 1
def get_admin_page(username):
    """Regenerate ``admin_page.htm`` with the contents of ``rs_user_access``.

    The page is the ``dashboard_1.htm`` header followed by a table of every
    user and role.  When the requesting user's role is ``superuser`` or
    ``admin`` an "Add User" form is included, and every non-superuser row
    gets a Remove button.  Users without a row are treated as ``view``.

    :param username: user the page is generated for.
    """
    import html
    import json

    ora_con = cx_Oracle.connect('axiom_data2/'+db_pass+'@RRANYT02')
    try:
        ora_cur = ora_con.cursor()
        # Bind variable instead of concatenating the username into the SQL.
        ora_cur.execute(
            "select user_role from rs_user_access where username = :username",
            {'username': username})
        user_role_details = ora_cur.fetchall()
        ora_cur.execute('select username,user_role from rs_user_access')
        rows = ora_cur.fetchall()
    finally:
        # The connection opened for this call used to be left open.
        ora_con.close()

    login_user_role = user_role_details[0][0] if user_role_details else 'view'
    can_edit = login_user_role in ('superuser', 'admin')

    with open(r'C:\unix_report_status\templates\dashboard_1.htm', "r") as f:
        html_head = f.read()

    parts = ['</br><center><div style="max-width: 98%;">']
    if can_edit:
        parts.append(
            '<label for="fname">Windows User Name </label>\n'
            '<input type="text" id="username" name="username" value="">\n'
            '<select name="userrole" id="userrole">\n'
            '<option value="superuser">Superuser</option>\n'
            '<option value="admin">Admin</option>\n'
            '<option value="view">View</option>\n'
            '</select>\n'
            '<button name=\'add_role\' type=\'\' value=\'\' '
            'onclick="btn_add_role(\'add_role\')">Add User</button>\n'
            '</br>')
    parts.append(
        '</br><table class="table1"><tr class="row2">'
        '<th align="left" bgcolor="b3d9ff" width="">User Name</th>\n'
        '<th align="left" bgcolor="b3d9ff" width="10%" >User Role</th>\n'
        '<th align="left" width="10%" > </th></tr>')

    for user_name, user_role in rows:
        # Values from the database are escaped before being placed in HTML.
        safe_name = html.escape(user_name)
        parts.append('<tr><td>' + safe_name + '</td> <td>'
                     + html.escape(user_role) + '</td>')
        if can_edit and user_role != 'superuser':
            # JSON-encode for the JavaScript string literal, then escape for
            # the surrounding HTML attribute.
            js_arg = html.escape(json.dumps(user_name))
            parts.append('<td><button name="' + safe_name
                         + '_b" onclick=\'btn_remove_role(' + js_arg
                         + ')\'>Remove</button></td>')
        else:
            parts.append('<td> </td>')
        # Every row is closed now; only the Remove-button branch used to
        # emit </tr>.
        parts.append('</tr>')
    parts.append('</table>')

    # The trailing debug print of user_role was dropped: it raised NameError
    # whenever the table was empty.
    with open(r'C:\unix_report_status\templates\admin_page.htm', "w") as html_file:
        html_file.write(html_head + ' ' + ''.join(parts)
                        + ' </div></center></div></body></html>')
def add_user_role(user_name,user_role):
    """Insert a ``(user_name, user_role)`` row into ``rs_user_access`` and commit.

    Uses the module-level connection and cursor.

    :param user_name: user to grant the role to.
    :param user_role: role to store for that user.
    """
    # Both values come from a submitted form, so they are bound rather than
    # concatenated into the statement.
    ora_cur.execute(
        "insert into rs_user_access values(:user_name,:user_role)",
        {'user_name': user_name, 'user_role': user_role})
    ora_con.commit()
def remove_user_role(user_name):
    """Delete every ``rs_user_access`` row for ``user_name`` and commit.

    Uses the module-level connection and cursor.

    :param user_name: user whose access rows are removed.
    """
    # The name comes from a submitted form, so it is bound rather than
    # concatenated into the statement.
    ora_cur.execute(
        "delete from rs_user_access where username = :user_name",
        {'user_name': user_name})
    ora_con.commit()
# Generates dashboard.htm with an empty username.
# NOTE(review): indentation was lost in this listing; this call is either a
# module-level statement executed on import or the last statement of
# remove_user_role -- confirm against the real file.
get_status('')
这看起来需要重新设计。我会尝试解释这个问题...
您在整个代码的不同位置写入模板文件。
例如在 get_status
函数中你做的:
html_file = open(r'C:\unix_report_status\templates\dashboard.htm', "w")
然后你写一些HTML到那个文件。
然而,您随后会在 Flask 视图函数的不同位置调用此函数:
@app.route("/dashboard", methods=["GET", "POST"])
def dashboard():
# ...
# some if statements ...
gs.get_status(session['username'])
然后接着 return 渲染后的模板,也就是调用 gs.get_status 函数时刚刚写入磁盘的那个模板文件:
return render_template("dashboard.htm")
这个问题可能不会立即显现出来,一旦来自不同用户的多个请求开始命中此应用程序,他们就会争先恐后地覆盖模板文件,从而导致随机错误。此应用程序根本无法扩展到多个用户。
我认为您需要回过头来重新设计它,这样就不必一直手动写入模板文件。请阅读 Flask 模板(支持 Jinja 语法)的文档,并用它来实现您目前手动完成的操作。
这将使您在磁盘上拥有一个永不更改的模板文件,并且可以包含 Jinja 语法,例如:
dashboard.html
<p>Show me {{ some_variable }}.</p>
和一些相应的代码来填充它,将参数传递给 render_template
:
def get_status():
# Any old logic
return 'success'
@app.route('/dashboard')
def dashboard():
result = get_status()
return render_template('dashboard.html', some_variable = result)
呈现为:
<p>Show me success.</p>
使用这种方法,您可以自动支持许多请求,因为模板是使用动态数据按需呈现的。
也许这不是您想要的答案,但最好重新设计此应用程序以使用 Jinja2 模板引擎,并在此过程中一并解决数据库数据不刷新的问题。试图用 --reload 标志之类的变通办法让 gunicorn 表现得像开发服务器,可能会带来更多无法预见的问题。
我是 flask 和 gunicorn 的新手。我正在使用 gunicorn 进行生产环境部署。我有一个提交按钮,它将表单数据存储在数据库中;我希望从数据库中获取新存储的数据,并在重新加载页面时显示出来。我观察到的是,Gunicorn 在重新启动之前不会重新加载任何内容。谁能帮我在不停止服务器的情况下重新加载数据?
- 我试过在启动 gunicorn 时使用 --reload,但没有用。我的应用程序在 Flask 的开发服务器上一直按预期工作。所以基本上我想要 gunicorn 提供类似这样的功能:app.run(host=so.gethostbyname(so.gethostname()), port=v_port, debug=True, use_reloader=True)。提前致谢。
############################ This is main_app.py #######################
from flask import Flask, render_template, request, redirect, send_file, send_from_directory, current_app, flash, abort, jsonify, session, url_for
import os as os
import sys as sys
from ldap3 import Server, Connection, NTLM
import ldap3
import socket as so
import subprocess
from subprocess import Popen, PIPE
from subprocess import check_output
import ssl
# Port used by the Flask development server started in the __main__ guard.
v_port = 2001

# Directory that holds this file.  When the file itself lives in a "scripts"
# directory, the project root is its parent.  Only a trailing path component
# is stripped: the previous `.replace('\scripts', '')` relied on an invalid
# escape sequence, matched the Windows separator only, and removed the text
# wherever it occurred in the path.
_here = os.path.dirname(os.path.abspath(__file__))
if os.path.basename(_here) == 'scripts':
    v_project_path = os.path.dirname(_here)
else:
    v_project_path = _here
v_scripts_path = v_project_path + os.sep + 'scripts' + os.sep
# Make the helper modules kept under scripts/ importable.
sys.path.insert(0, v_scripts_path)
app = Flask(__name__)
import get_status as gs
#import save_cmnt as cmt
import base64
import struct
import cx_Oracle
from Crypto.Cipher import AES
SECRET_KEY = b"xyz"
class AESCipher(object):
    """Decrypt values stored as base64(IV + 4-byte length + AES-CBC ciphertext)."""

    def __init__(self, key):
        """Remember the key and the AES block size.

        :param key: key bytes, passed unchanged to ``AES.new``.
        """
        self.bs = AES.block_size
        self.key = key

    def _pad(self, s):
        """Return ``s`` extended with ``'0'`` characters to a multiple of ``self.bs``.

        A whole extra block is appended when ``s`` is already aligned.
        """
        return s + (self.bs - len(s) % self.bs) * '0'

    def decrypt(self, enc):
        """Decode and decrypt ``enc``; return the plaintext as ``str``.

        After base64 decoding the layout is: IV (``self.bs`` bytes), a
        little-endian signed 32-bit plaintext length, then the ciphertext.

        :param enc: base64-encoded payload.
        :return: the first ``length`` decrypted bytes decoded as UTF-8.
        """
        enc = base64.b64decode(enc)
        iv = enc[:self.bs]
        raw_size = struct.unpack('<i', enc[self.bs:self.bs + 4])[0]
        cipher = AES.new(self.key, AES.MODE_CBC, iv)
        raw_bytes = cipher.decrypt(enc[self.bs + 4:])
        # The decrypted bytes are a secret (this class is used for the
        # database password), so they are no longer printed to stdout.
        return raw_bytes[:raw_size].decode('utf_8')
# Read the encrypted database password.  The context manager closes the file
# handle, which used to stay open for the life of the process.
# Unix deployment path: /apps/axiom_app/flask/python_db_pass.txt
with open(r'C:\unix_report_status\python_db_pass.txt', "rb") as pass_f:
    enc_db_pass = pass_f.read()
c = AESCipher(SECRET_KEY)
db_pass = c.decrypt(enc_db_pass)
# Module-level connection and cursor; the login view queries through ora_cur.
ora_con = cx_Oracle.connect('axiom_data2/'+db_pass+'@RRANYT02')
ora_cur = ora_con.cursor()
def validate_credentials(connection):
    """Return True only when ``connection.bind()`` succeeds.

    :param connection: object exposing ``bind()`` (an ldap3 ``Connection``
        in this file).
    :return: ``True`` when ``bind()`` returns a truthy value, ``False`` when
        it returns a falsy value or raises.
    """
    try:
        # The result used to be ignored, so a connection configured to report
        # failure through the return value (rather than by raising) was
        # treated as authenticated.
        return bool(connection.bind())
    except Exception:
        # Best effort by design: any bind error means "not authenticated".
        return False
@app.route('/')
def index():
    """Greet a logged-in user with a logout link; otherwise show the login form."""
    if 'username' not in session:
        return render_template("login.htm")
    username = session['username']
    logout_link = "<b><a href = '/logout'>click here to log out</a></b>"
    return 'Logged in as ' + username + '<br>' + logout_link
@app.route("/login", methods=[ "GET","POST"])
def login():
    """Authenticate against LDAP, then authorise against ``rs_user_access``.

    GET renders the login form.  POST binds to the directory with the
    submitted credentials; on failure ``wrong_cred.htm`` is rendered, when the
    user has no row in ``rs_user_access`` ``unauthorised.htm`` is rendered,
    otherwise the username is stored in the session and the browser is
    redirected to the dashboard.
    """
    print('inside login')
    if request.method != 'POST':
        return render_template("login.htm")

    print('in post')
    username = request.form['username']
    password = request.form['password']
    print('login Name ' + username)
    server = Server(host='host',
                    port=636,
                    get_info=ldap3.ALL,
                    use_ssl=True,
                    allowed_referral_hosts='*',
                    mode='IP_V6_PREFERRED')
    connection = Connection(server,
                            auto_bind=False,
                            receive_timeout=1000,
                            client_strategy=ldap3.SYNC,
                            raise_exceptions=True,
                            version=3, user=r"intra\{0}".format(username),
                            password=password,
                            authentication=NTLM,
                            check_names=True,
                            read_only=True, auto_referrals=True)
    connection.start_tls()
    User_return_value = validate_credentials(connection)
    print('----- ', User_return_value)
    if not User_return_value:
        return render_template("wrong_cred.htm")

    # Check that the user has access.  The username is raw form input, so it
    # is passed as a bind variable instead of being concatenated into the SQL.
    ora_cur.execute(
        "select user_role from rs_user_access where username = :username",
        {'username': username})
    if not ora_cur.fetchall():
        return render_template("unauthorised.htm")

    session['username'] = username
    session.permanent = True
    return redirect(url_for('dashboard'))
@app.route('/logout', methods=["GET", "POST"])
def logout():
    """Forget the logged-in user and show the login form again."""
    # pop() with a default does nothing when no user is stored in the session.
    session.pop('username', None)
    login_page = render_template("login.htm")
    return login_page
@app.route('/report_calender', methods=["GET", "POST"])
def report_calender():
    """Render ``dashboard.htm`` as it is, without regenerating it first."""
    page = render_template("dashboard.htm")
    return page
@app.route('/admin', methods=["GET", "POST"])
def admin():
    """Show the user-administration page and apply add/remove requests.

    The posted ``admin_flag`` selects the action: ``'1'`` adds the role given
    by ``user_name``/``user_role``, ``'2'`` removes ``rm_user_name``; any
    other value (or none, as on a plain GET) only displays the page.
    """
    if 'username' not in session:
        # Same page the dashboard shows to anonymous visitors; indexing the
        # session directly used to raise KeyError here.
        return render_template("wrong_cred.htm")

    if request.method == 'POST':
        print('~~~~~~~~', 'adm post')
    else:
        print('~~~~~~~~', 'adm not post')

    # .get(): a plain GET carries no form data, and request.form[...] aborts
    # the request with 400 Bad Request when the key is missing.
    admin_flag = request.form.get('admin_flag')
    print('~~~~~~~~~~~~~~', admin_flag)

    # NOTE(review): the caller's role is not checked here before roles are
    # changed -- confirm whether that is enforced somewhere else.
    if admin_flag == '1':
        user_name = request.form['user_name']
        user_role = request.form['user_role']
        gs.add_user_role(user_name, user_role)
    elif admin_flag == '2':
        rm_user_name = request.form['rm_user_name']
        gs.remove_user_role(rm_user_name)
        print('~~~~ removing', rm_user_name)

    # Regenerate admin_page.htm once, after any change has been applied.
    gs.get_admin_page(session['username'])
    return render_template("admin_page.htm")
@app.route("/dashboard", methods=["GET", "POST"])
def dashboard():
    """Handle dashboard actions, regenerate the page and render it.

    Anonymous visitors get ``wrong_cred.htm``.  A POST whose
    ``dashboard_flag`` is not ``'0'`` either stores the posted e-mail body
    (``email_flag == '1'``) or saves a delay/comment for ``job_name``.  In
    every logged-in case ``gs.get_status`` is called before
    ``dashboard.htm`` is rendered.
    """
    if 'username' not in session:
        return render_template("wrong_cred.htm")

    username = session['username']
    if request.method == 'POST' and request.form['dashboard_flag'] != '0':
        print('in post')
        job_name = request.form['job_name']
        email_flag = request.form['email_flag']
        if email_flag == '1':
            print('in email')
            recipients = request.form['email_id']
            email_content = request.form['email_content']
            print(recipients)
            # "with" closes the file even when the write fails.
            with open(r'C:\unix_report_status\templates\email.txt', "w") as email_file:
                email_file.write(email_content)
        elif job_name:
            print('in delay')
            job_delay = request.form['job_delay']
            job_cmt = request.form['job_cmt']
            gs.submit_cmt(job_name, job_cmt, job_delay, username)
    else:
        print('normal refresh')

    # Every branch of the original ended with this call; it now happens once.
    gs.get_status(username)
    return render_template("dashboard.htm")
@app.after_request
def set_response_headers(response):
    """Stamp the no-cache headers onto ``response`` and hand it back."""
    no_cache_headers = (
        ('Cache-Control', 'no-cache, no-store, must-revalidate'),
        ('Pragma', 'no-cache'),
        ('Expires', '0'),
    )
    for header_name, header_value in no_cache_headers:
        response.headers[header_name] = header_value
    return response
# Sessions need a secret key in every process that imports this module.  The
# key used to be set only under ``__main__``, which a WSGI server such as
# gunicorn never executes, and a per-process random key cannot be shared by
# several workers.  Prefer a key supplied through the environment and fall
# back to a random one for local runs.
app.secret_key = os.environ.get('FLASK_SECRET_KEY') or os.urandom(12)

if __name__ == '__main__':
    # Development server only.  The gevent WSGIServer lines that used to
    # follow were dropped: they were reached only after app.run() returned,
    # and built the server from a bare host string with no port and no
    # application.
    app.run(host=so.gethostbyname(so.gethostname()), port=v_port, debug=True, use_reloader=True)
############################ This is get_status.py ######################
import cx_Oracle
from datetime import datetime, time, timedelta
import pytz
#import sqlite3
import base64
import struct
#from Crypto import Random
from Crypto.Cipher import AES
SECRET_KEY = b"xyz"
class AESCipher(object):
    """Decrypt values stored as base64(IV + 4-byte length + AES-CBC ciphertext)."""

    def __init__(self, key):
        """Remember the key and the AES block size.

        :param key: key bytes, passed unchanged to ``AES.new``.
        """
        self.bs = AES.block_size
        self.key = key

    def _pad(self, s):
        """Return ``s`` extended with ``'0'`` characters to a multiple of ``self.bs``.

        A whole extra block is appended when ``s`` is already aligned.
        """
        return s + (self.bs - len(s) % self.bs) * '0'

    def decrypt(self, enc):
        """Decode and decrypt ``enc``; return the plaintext as ``str``.

        After base64 decoding the layout is: IV (``self.bs`` bytes), a
        little-endian signed 32-bit plaintext length, then the ciphertext.

        :param enc: base64-encoded payload.
        :return: the first ``length`` decrypted bytes decoded as UTF-8.
        """
        enc = base64.b64decode(enc)
        iv = enc[:self.bs]
        raw_size = struct.unpack('<i', enc[self.bs:self.bs + 4])[0]
        cipher = AES.new(self.key, AES.MODE_CBC, iv)
        raw_bytes = cipher.decrypt(enc[self.bs + 4:])
        # The decrypted bytes are a secret (this class is used for the
        # database password), so they are no longer printed to stdout.
        return raw_bytes[:raw_size].decode('utf_8')
# Read the encrypted database password.  The context manager closes the file
# handle, which used to stay open for the life of the process.
# Unix deployment path: /apps/axiom_app/flask/python_db_pass.txt
with open(r'C:\unix_report_status\python_db_pass.txt', "rb") as pass_f:
    enc_db_pass = pass_f.read()
c = AESCipher(SECRET_KEY)
db_pass = c.decrypt(enc_db_pass)
# Module-level connection and cursor.
ora_con = cx_Oracle.connect('axiom_data2/'+db_pass+'@RRANYT02')
ora_cur = ora_con.cursor()
# Environment name taken from the first row of rs_env_details.
ora_cur.execute("""select env from rs_env_details""")
env_details = ora_cur.fetchall()
env = (env_details[0][0])
def get_status(username):
# Build the job-status dashboard for `username`, write it to
# templates\dashboard.htm and return the string 'refresh'.
# The page contains one table row per job (status, start/end time, SLA breach
# flag, ETA, delay, comments). For the 'admin' and 'superuser' roles an e-mail
# version of the same table is embedded in a hidden form field.
# NOTE(review): indentation was lost in this listing, so the nesting of the
# statements below (in particular what sits under `if rows:`) cannot be
# confirmed from here.
ora_con = cx_Oracle.connect('axiom_data2/'+db_pass+'@RRANYT02')
ora_cur = ora_con.cursor()
# Current time in America/New_York; est_time (HH:MM:SS) and est_date (dd-mm-yy)
# are the string forms used throughout the function.
tz_NY = pytz.timezone('America/New_York')
datetime_NY = datetime.now(tz_NY)
#print("NY time:", datetime_NY.strftime("%H:%M:%S"))
est_time= datetime_NY.strftime("%H:%M:%S")
est_date= datetime_NY.strftime("%d-%m-%y")
# cur_time ends up as a naive datetime truncated to the minute (the
# format/parse round trip drops the seconds).
cur_time = datetime.strptime(est_date+" "+est_time,'%d-%m-%y %H:%M:%S')
cur_time = cur_time.strftime('%d-%m-%y %H:%M')
cur_time = datetime.strptime(cur_time,'%d-%m-%y %H:%M')
print(cur_time)
# c = AESCipher(SECRET_KEY)
# db_pass = c.decrypt(enc_db_pass)
# print(db_pass)
# Look up the caller's role; a user with no row in rs_user_access gets 'view'.
# NOTE(review): username is concatenated into the SQL text; a bind variable
# would avoid SQL injection.
ora_cur.execute("""select user_role from rs_user_access where username='"""+username+"""'""")
user_role_details = ora_cur.fetchall()
if len(user_role_details)==0:
user_role='view'
else:
user_role=user_role_details[0][0]
print(user_role)
# env is substituted for {ENV_HERE} in the page header further down.
ora_cur.execute("""select env from rs_env_details""")
env_details = ora_cur.fetchall()
env = (env_details[0][0])
# ora_cur.execute("""select BD from from axiom_job_da467762 where job_name='axm_c_rpt_tic_b_bl1_bd'""")
# bd_details = ora_cur.fetchall()
# bd = (bd_details[0][0])
# Main query: job master data joined to job status, to today's delay/comment
# entry (restricted to the highest-id rs_job_delay row per job) and to
# axiom_job_d1108822; filtered on run_flag='Y' and report_type != 'MTY'.
# The column order here fixes the row[...] indexes used in the loop below.
ora_cur.execute("""select t1.job_name,report_name,t1.report_type,report_sla,avg_run_time,avg_start_time,
status,start_time,end_time,delay_hrs,nvl(comments,'-') comments from rs_job_master t1
right join rs_job_status t2 on
t1.job_name=t2.job_name
left join (select run_date,job_name,delay_hrs,comments
from rs_job_delay where run_date='"""+est_date+"""' and id in (select max(id) from rs_job_delay group by JOB_NAME)) t3
on t1.job_name=t3.job_name
left join axiom_job_d1108822 t4
on t1.job_name=t4.job_name where run_flag='Y' and t1.report_type!='MTY'
order by t1.report_type""")
# Opening HTML of the page body: run date, "ME BD" (hard-coded to '7'),
# current EST time, then the header row of the on-screen table.
html_s="""</br><div class="row" id="div_generate" >
<text for="cob_date"> <b>Run Date</b> : """+datetime_NY.strftime("%d-%b-%y")+"""</text>
<text for="cob_date"> <b>ME BD </b>: """+'7'+""" </text>
<text for="est_time"> <b>EST Time</b> : """+(datetime.strftime(cur_time,'%d-%b-%y %H:%M'))+"""</text>
</div>
<table class="table1"><tr class="row2"><th align="left" bgcolor="b3d9ff" width="3%" width='90%'>Type</th>
<th align="left" bgcolor="b3d9ff" width="10%" >Report Name</th>
<th align="left" bgcolor="b3d9ff" width="7%">Status</th>
<th align="left" bgcolor="b3d9ff" width="10%">Start Time</th>
<th align="left" bgcolor="b3d9ff" width="10%">End Time</th>
<th align="left" bgcolor="b3d9ff"width="5%">Breach</th>
<th align="left" bgcolor="b3d9ff"width="5%">SLA</th>
<th align="left" bgcolor="b3d9ff"width="5%">ETA</th>
<th align="left" bgcolor="b3d9ff"width="3%">Delay (hrs)</th>
<th align="left" bgcolor="b3d9ff"width="20%">Comments</th>
<th align="left" bgcolor="b3d9ff"width="3%"> </th></tr>"""
# Header row of the e-mail version of the table (no trailing button column).
email_content="""<table class="table1">
<tr class="row2"><th align="left" bgcolor="b3d9ff" width="3%" >Type</th>
<th align="left" bgcolor="b3d9ff" width="10%" >Report Name</th>
<th align="left" bgcolor="b3d9ff" width="7%">Status</th>
<th align="left" bgcolor="b3d9ff" width="10%">Start Time</th>
<th align="left" bgcolor="b3d9ff" width="10%">End Time</th>
<th align="left" bgcolor="b3d9ff"width="5%">Breach</th>
<th align="left" bgcolor="b3d9ff"width="5%">SLA</th>
<th align="left" bgcolor="b3d9ff"width="5%">ETA</th>
<th align="left" bgcolor="b3d9ff"width="3%">Delay (hrs)</th>
<th align="left" bgcolor="b3d9ff"width="20%">Comments</th></tr>"""
rows = ora_cur.fetchall()
# NOTE(review): table_data and email_data are only initialised under this
# `if`; they are used again after the loop, so an empty result set needs care.
if rows:
table_data=""
email_data=""
# One iteration per job: unpack the columns, work out ETA and SLA breach for
# the job's status, then append a row to both the page and the e-mail table.
for row in rows:
job_name=row[0]
report_name=str(row[1])
report_type=row[2]
report_sla=row[3]
avg_run_time=row[4]
avg_start_time=row[5]
status=row[6]
start_time=row[7]
end_time=row[8]
delay_hrs=row[9]
print("delay_hrs-",delay_hrs)
comments=row[10]
print("comments-",comments)
# print(job_name)
# print(end_time)
# Defaults shown when no breach verdict / ETA is computed for the row.
sla_breach="-"
eta='-'
# print(start_time)
### ETA and SLA CALCULATION
# SLA deadline for today as a naive datetime; the strptime format means
# report_sla must be an 'HH:MM' string.
sla_time = est_date+' '+report_sla
sla_time = datetime.strptime(sla_time,'%d-%m-%y %H:%M')
sla_time = sla_time.strftime('%d-%m-%y %H:%M')
sla_time = datetime.strptime(sla_time,'%d-%m-%y %H:%M')
# print(job_name)
# ACTIVATED with a recorded delay:
# ETA = average start time + (delay_hrs + avg_run_time) hours.
if status=='ACTIVATED' and delay_hrs is not None:
print("here- ",delay_hrs)
avg_start_time=est_date+" "+avg_start_time
avg_start_time=datetime.strptime(avg_start_time,'%d-%m-%y %H:%M')
total_delay=delay_hrs+avg_run_time
new_eta = avg_start_time + timedelta(hours=total_delay)
new_eta=new_eta.strftime('%H:%M')
eta = new_eta
# The ETA is reduced to HH:MM and re-attached to today's date before the
# comparison, so any roll-over past midnight is dropped here.
b_eta=est_date+' '+eta
b_eta=datetime.strptime(b_eta,'%d-%m-%y %H:%M')
print(b_eta, sla_time)
if sla_time<b_eta:
sla_breach='YES'
# print('Breaching')
else:
sla_breach='NO'
# print('Not Breaching')
# ACTIVATED without a delay: ETA = average start time + avg_run_time hours.
elif status=='ACTIVATED' and delay_hrs is None:
avg_start_time=est_date+" "+avg_start_time
avg_start_time=datetime.strptime(avg_start_time,'%d-%m-%y %H:%M')
new_eta = avg_start_time + timedelta(hours=avg_run_time)
new_eta=new_eta.strftime('%H:%M')
eta = new_eta
# print(report_name)
# print(avg_start_time,sla_time,avg_start_time + timedelta(hours=avg_run_time))
if (avg_start_time + timedelta(hours=avg_run_time))>sla_time:
sla_breach='YES'
# print('Breaching')
else:
sla_breach='NO'
# print('Not Breaching')
# SUCCESS: ETA is 'NA'; start/end are parsed from 'mm/dd/YYYY HH:MM:SS' and
# truncated to the minute, and the end time is compared with the SLA.
elif status=='SUCCESS':
eta = 'NA'
end_time=datetime.strptime(end_time, "%m/%d/%Y %H:%M:%S")
end_time = end_time.strftime('%d-%m-%y %H:%M')
end_time=datetime.strptime(end_time,'%d-%m-%y %H:%M')
start_time=datetime.strptime(start_time,"%m/%d/%Y %H:%M:%S")
start_time = start_time.strftime('%d-%m-%y %H:%M')
start_time=datetime.strptime(start_time,'%d-%m-%y %H:%M')
# start_time='Not Started'
# print(end_time, sla_time)
if end_time>sla_time:
sla_breach='YES'
# print('Breached')
else:
sla_breach='NO'
# print('Not Breached')
# RUNNING: ETA = actual start time + avg_run_time hours.
elif status=='RUNNING':
# start_time=est_date+' '+start_time
start_time=datetime.strptime(start_time,"%m/%d/%Y %H:%M:%S")
start_time = start_time.strftime('%d-%m-%y %H:%M')
start_time=datetime.strptime(start_time,'%d-%m-%y %H:%M')
new_eta=start_time + timedelta(hours=avg_run_time)
new_eta = new_eta.strftime('%H:%M')
eta = new_eta
if (start_time+ timedelta(hours=avg_run_time))>sla_time:
sla_breach='YES'
# print('Breaching')
else:
sla_breach='NO'
# print('Not Breaching')
# Any other status: parse whichever of start/end is present ('-' is treated
# as "no value"); no breach verdict and no ETA.
else:
if start_time != '-':
start_time=datetime.strptime(start_time,"%m/%d/%Y %H:%M:%S")
start_time = start_time.strftime('%d-%m-%y %H:%M')
start_time=datetime.strptime(start_time,'%d-%m-%y %H:%M')
if end_time != '-':
end_time=datetime.strptime(end_time, "%m/%d/%Y %H:%M:%S")
end_time = end_time.strftime('%d-%m-%y %H:%M')
end_time=datetime.strptime(end_time,'%d-%m-%y %H:%M')
eta = '-'
### ETA and SLACALCULATION ENDS
print("start_time:-",start_time)
# Convert start/end/delay to the strings shown in the table.
if start_time == '-':
start_time='Not Started'
else:
start_time = start_time.strftime('%H:%M')
start_time=str(start_time)
if end_time == '-':
end_time='-'
print('1')
else:
# end_time=datetime.strptime(end_time, "%m/%d/%Y %H:%M:%S")
# end_time = end_time.strftime('%d-%m-%y %H:%M')
# end_time=datetime.strptime(end_time,'%d-%m-%y %H:%M')
end_time = end_time.strftime('%H:%M')
end_time=str(end_time)
# print('2')
if delay_hrs == None:
delay_hrs = '-'
else:
delay_hrs = str(delay_hrs)
# print(start_time)
# print(type(status))
print(report_name,start_time,end_time,report_sla)
# Append this job's row to the on-screen table.
# NOTE(review): database values are inserted into the HTML unescaped.
table_data=table_data+"""<tr>
<td>"""+report_type+"""</td>
<td>"""+report_name+"""</td>
<td>"""+status+"""</td>
<td>"""+start_time+"""</td>
<td>"""+end_time+"""</td>"""
# Breach cell: red for YES, green for NO, plain otherwise.
if sla_breach=='YES':
table_data=table_data+"""<td bgcolor="red"><p style="color:FFFFFF">"""+sla_breach+"""</p></td>"""
elif sla_breach=='NO':
table_data=table_data+"""<td bgcolor="green"><p style="color:FFFFFF">"""+sla_breach+"""</p></td>"""
else:
table_data=table_data+"""<td>"""+sla_breach+"""</td>"""
# Delay and comment are editable inputs whose ids are derived from job_name
# (<job_name>_d and <job_name>_c).
table_data=table_data+"""<td>"""+report_sla+"""</td>
<td>"""+eta+"""</td>
<td><input type='text' height='3' style='border: 1px solid #ccc; border-radius: 4px; box-sizing: border-box;' size="1" id='"""+job_name+"""_d' value='"""+delay_hrs+"""'></td>
<td><textarea style='border: 1px solid #ccc; border-radius: 4px; ' rows='2' cols='50' name='"""+job_name+"""_c' id='"""+job_name+"""_c'>"""+comments+"""</textarea></td>"""
# Only admin/superuser get a Save button; other roles get an empty cell.
if user_role=='admin' or user_role=='superuser':
table_data=table_data+"""<td><button name='"""+job_name+"""_b' type='' value='' onclick="btn_sv('"""+job_name+"""')">Save</button></td>"""
else:
table_data=table_data+"""<td></td>"""
table_data=table_data+"""</tr>"""
# Same row for the e-mail table, with delay and comments as plain text.
email_data=email_data+"""
<tr><td>"""+report_type+"""</td>
<td>"""+report_name+"""</td>
<td>"""+status+"""</td>
<td>"""+start_time+"""</td>
<td>"""+end_time+"""</td>
"""
if sla_breach=='YES':
email_data=email_data+"""<td bgcolor="red"><p style="color:black">"""+sla_breach+"""</p></td>"""
elif sla_breach=='NO':
email_data=email_data+"""<td bgcolor="green"><p style="color:black">"""+sla_breach+"""</p></td>"""
else:
email_data=email_data+"""<td>"""+sla_breach+"""</td>"""
email_data=email_data+"""<td>"""+report_sla+"""</td>
<td>"""+eta+"""</td>
<td>"""+delay_hrs+"""</td>
<td>"""+comments+"""</td></tr>
"""
# Closing part of the e-mail body.
email_e="""</table>
<p>
For further queries, please contact <a href="mailto:saket.parab@barclays.com">Saket Parab.</a>
<p>
Regards,
<br>
<b>Axiom Dev</b>
</p>
</body></html>"""
# Assemble the e-mail: header template + table header + rows + footer, then
# fill the {DATE_HERE} and {TIME_HERE} placeholders.
# NOTE(review): the handle `e` is never closed.
e = open(r'C:\unix_report_status\templates\email_1.txt', "r")
# e = open(r'/apps/axiom_app/flask/templates/email_1.txt', "r")
email_head=e.read()
email_data=email_data+email_e
email_content=email_head+email_content+email_data
email_content=email_content.replace('{DATE_HERE}',datetime_NY.strftime("%d-%b-%y"))
email_content=email_content.replace('{TIME_HERE}',est_time)
# print(email_content)
# Closing part of the page; admin/superuser additionally get the form that
# posts comments and the e-mail content to /dashboard.
html_e="""</table>
</br>"""
if user_role=='admin' or user_role=='superuser':
html_e=html_e+"""
<form id='cmt_form' method='POST' action = '/dashboard' enctype='multipart/form-data'>
<input class='add_input' type='hidden' name='job_name' id='job_name' placeholder=''>
<input class='add_input' type='hidden' name='job_delay' id='job_delay' placeholder=''>
<input class='add_input' type='hidden' name='job_cmt' id='job_cmt' placeholder=''>
<input class="add_input" type="hidden" name="dashboard_flag" id="dashboard_flag" >
<textarea name='email_content' style="display:none" id='email_content' >"""+email_content+"""</textarea>
<input class='add_input' type='hidden' name='email_flag' id='email_flag' placeholder=''>
<input class='add_input' type='' name='email_id' id='email_id' placeholder=''>
<button name='email_b' type='' value='' onclick='btn_em()'>Email</button>
</form>"""
html_e=html_e+"""</div></body></html>"""
# print(table_data)
final_html=html_s+table_data+html_e
# Read the page header template and substitute environment and username.
# NOTE(review): the handle `f` is never closed.
f = open(r'C:\unix_report_status\templates\dashboard_1.htm', "r")
# f = open(r'/apps/axiom_app/flask/templates/dashboard_1.htm', "r")
html_head=f.read()
html_head=html_head.replace('{ENV_HERE}','('+env+')')
html_head=html_head.replace('{USERNAME_HERE}',username)
#print(f.read())
# Overwrite the dashboard template on disk with the freshly built page.
# NOTE(review): every call rewrites this one shared file.
html_file = open(r'C:\unix_report_status\templates\dashboard.htm', "w")
# html_file = open(r'/apps/axiom_app/flask/templates/dashboard.htm', "w")
html_file.write(html_head+' '+final_html)
html_file.close()
print('done')
# NOTE(review): only the cursor is closed; ora_con is not closed here.
ora_cur.close()
return 'refresh'
def submit_cmt(job_name,job_cmt,job_delay,username):
    """Insert a delay/comment record for a job into rs_job_delay.

    The row is stamped with today's date in America/New_York (dd-mm-yy) and
    gets the next id (current max id + 1, or 1 when the table is empty).

    Args:
        job_name: Job the record refers to.
        job_cmt: Free-text comment.
        job_delay: Delay in hours, as a number or numeric string.
        username: User saving the record.

    Returns:
        The string 'cmt saved'.

    Raises:
        ValueError: If job_delay is not numeric.
    """
    est_date = datetime.now(pytz.timezone('America/New_York')).strftime("%d-%m-%y")
    # The delay used to be spliced into the statement as a bare SQL literal;
    # convert it here so it can be bound as a number.
    delay_hrs = float(job_delay)
    ora_con = cx_Oracle.connect('axiom_data2/'+db_pass+'@RRANYT02')
    try:
        ora_cur = ora_con.cursor()
        try:
            # Bind variables instead of string concatenation: form input can no
            # longer change the statement, and a quote inside a comment no
            # longer breaks the insert.
            ora_cur.execute(
                "insert into rs_job_delay values("
                "(select nvl(max(id),0)+1 from rs_job_delay),"
                ":run_date,:job_name,:delay_hrs,:job_cmt,:username)",
                {
                    "run_date": est_date,
                    "job_name": job_name,
                    "delay_hrs": delay_hrs,
                    "job_cmt": job_cmt,
                    "username": username,
                },
            )
            ora_con.commit()
        finally:
            ora_cur.close()
    finally:
        # Release the connection as well; closing only the cursor leaks it.
        ora_con.close()
    return 'cmt saved'
def user_login_auth(user_window_id):
    """Check whether a Windows id is an active user in DS_REPORT_S1060297.

    Args:
        user_window_id: Windows id to look up.

    Returns:
        0 when exactly one row with user_active='Y' matches the id,
        otherwise 1.
    """
    ora_con = cx_Oracle.connect('axiom_data2/'+db_pass+'@RRANYT02')
    try:
        ora_cur = ora_con.cursor()
        try:
            # The id is passed as a bind variable so it cannot alter the query.
            ora_cur.execute(
                "select count(1) as cnt from DS_REPORT_S1060297 "
                "where user_windows_id = :user_id and user_active='Y'",
                {"user_id": user_window_id},
            )
            rows = ora_cur.fetchall()
        finally:
            ora_cur.close()
    finally:
        ora_con.close()
    return 0 if [row[0] for row in rows] == [1] else 1
def get_admin_page(username):
    """Build the user-administration page and write it to admin_page.htm.

    The page lists every row of rs_user_access. When the caller's role is
    'superuser' or 'admin' it also contains the "Add User" controls and a
    "Remove" button for every listed user that is not a superuser. A caller
    with no row in rs_user_access is treated as 'view'.

    Args:
        username: Name of the logged-in user whose role decides what is shown.

    Returns:
        None. The result is the file written to the templates directory.
    """
    ora_con = cx_Oracle.connect('axiom_data2/'+db_pass+'@RRANYT02')
    try:
        ora_cur = ora_con.cursor()
        try:
            # Bind variable instead of concatenating username into the SQL.
            ora_cur.execute(
                "select user_role from rs_user_access where username=:username",
                {"username": username},
            )
            user_role_details = ora_cur.fetchall()
            ora_cur.execute('select username,user_role from rs_user_access')
            rows = ora_cur.fetchall()
        finally:
            ora_cur.close()
    finally:
        ora_con.close()

    login_user_role = user_role_details[0][0] if user_role_details else 'view'
    can_manage = login_user_role in ('superuser', 'admin')

    with open(r'C:\unix_report_status\templates\dashboard_1.htm', "r") as f:
        # f = open(r'/apps/axiom_app/flask/templates/dashboard_1.htm', "r")
        html_head = f.read()

    html_body='</br><center><div style="max-width: 98%;">'
    if can_manage:
        html_body = html_body+'''<label for="fname">Windows User Name </label>
<input type="text" id="username" name="username" value="">
<select name="userrole" id="userrole">
<option value="superuser">Superuser</option>
<option value="admin">Admin</option>
<option value="view">View</option>
</select>
<button name='add_role' type='' value='' onclick="btn_add_role('add_role')">Add User</button>
</br>'''
    html_body = html_body+'''</br><table class="table1"><tr class="row2"><th align="left" bgcolor="b3d9ff" width="">User Name</th>
<th align="left" bgcolor="b3d9ff" width="10%" >User Role</th>
<th align="left" width="10%" > </th></tr>'''

    # Pre-set so the trailing debug print cannot hit an unbound name when
    # rs_user_access is empty.
    user_role = ''
    for user_name, user_role in rows:
        html_body=html_body+'''<tr><td>'''+user_name+'''</td> <td>'''+user_role+'''</td>'''
        if can_manage and user_role != 'superuser':
            html_body=html_body+'''<td><button name="'''+user_name+'''_b" onclick='btn_remove_role("'''+user_name+'''")'>Remove</button></td>'''
        else:
            html_body=html_body+'''<td> </td>'''
        # Close the row in every branch; only the Remove-button branch used to
        # emit the closing tag.
        html_body=html_body+'''</tr>'''
    html_body=html_body+'</table>'

    # Open the output only once the page is complete, so a failure above does
    # not leave a truncated admin_page.htm behind.
    with open(r'C:\unix_report_status\templates\admin_page.htm', "w") as html_file:
        html_file.write(html_head+' '+html_body+' </div></center></div></body></html>')
    print(user_role)
def add_user_role(user_name,user_role):
    """Insert a (user_name, user_role) row into rs_user_access and commit.

    Runs on the module-level cursor ``ora_cur``.

    Args:
        user_name: Windows user name to register.
        user_role: Role to assign to that user.
    """
    # Bind variables keep user-supplied text out of the SQL statement itself.
    ora_cur.execute(
        "insert into rs_user_access values(:user_name,:user_role)",
        {"user_name": user_name, "user_role": user_role},
    )
    ora_cur.connection.commit()
def remove_user_role(user_name):
    """Delete the rs_user_access rows for user_name and commit.

    Runs on the module-level cursor ``ora_cur``.

    Args:
        user_name: User whose access rows are removed.
    """
    # Bind variable keeps user-supplied text out of the SQL statement itself.
    ora_cur.execute(
        "delete from rs_user_access where username=:user_name",
        {"user_name": user_name},
    )
    ora_cur.connection.commit()
# Module-level call: builds and writes dashboard.htm once with an empty
# username (which get_status maps to the 'view' role unless rs_user_access
# has a matching row).
get_status('')
这看起来需要重新设计。我会尝试解释这个问题...
您在整个代码的不同位置写入模板文件。
例如,在 get_status 函数中,您执行了:
html_file = open(r'C:\unix_report_status\templates\dashboard.htm', "w")
然后你写一些HTML到那个文件。
然而,您随后会在 Flask 视图函数的不同位置调用此函数:
@app.route("/dashboard", methods=["GET", "POST"])
def dashboard():
    # Excerpt of the view: the elided branches are omitted; the call below
    # rewrites dashboard.htm on disk on every request.
    # ...
    # some if statements ...
    gs.get_status(session['username'])
然后紧接着 return,渲染的正是刚才调用 gs.get_status 函数时写入磁盘的模板:
return render_template("dashboard.htm")
这个问题可能不会立即显现出来,但一旦来自不同用户的多个请求同时到达此应用程序,它们就会竞相覆盖同一个模板文件,从而导致随机错误。这种设计根本无法扩展到多个用户。
我认为您需要回过头来重新设计它,不要再手动写入模板文件。请阅读 Flask 模板(Flask Templates)的文档,它支持 Jinja 语法;用它来实现您目前手动完成的工作。
这样,磁盘上的模板文件就永远不会被改动,并且其中可以包含 Jinja 语法,例如:
dashboard.html
<p>Show me {{ some_variable }}.</p>
再配上相应的代码,通过向 render_template 传递参数来填充它:
def get_status():
    """Stand-in for the real status logic; returns the value to display."""
    # Any old logic
    return 'success'
@app.route('/dashboard')
def dashboard():
    """Render dashboard.html, passing the status in as a template variable."""
    # The template file on disk stays unchanged; the data is supplied per
    # request through render_template's keyword arguments.
    result = get_status()
    return render_template('dashboard.html', some_variable = result)
呈现为:
<p>Show me success.</p>
使用这种方法,您可以自动支持许多请求,因为模板是使用动态数据按需呈现的。
也许这不是您想要的答案,但最好重新设计此应用程序以使用 Jinja2 模板引擎,并在此过程中顺便解决数据库数据不刷新的问题。寻找像 --reload 标志这样的变通办法,让 gunicorn 表现得像开发服务器一样,很可能会引发更多无法预见的问题。