基于 Flask Mega 教程的 Tweet Scheduler:获取 flask-sqlalchemy 数据库字段的问题

Tweet Scheduler based on Flask Megatutorial: Issues with getting fields of flask-sqlalchemy databse

我正在修改按照 Miguel Grinberg 的 Flask Mega 教程创建的 Flask 应用程序,以便可以 post 推文。我导入了用于访问 twitter api 的 tweepy 并修改了数据库以保存推文的预定时间。 我希望在当前时间与预定时间匹配时迭代 current_user 的 post 和 SQLAlchemy 数据库中的相应时间以及 post 。

model.py中的数据库模型修改如下:

class Post(db.Model):
id = db.Column(db.Integer, primary_key = True)
body = db.Column(db.String(140))
timestamp = db.Column(db.DateTime, index=True, default=datetime.utcnow)
socialnetwork = db.Column(db.String(40))
user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
#This is the stuff for scheduling, just date
hour = db.Column(db.Integer)
minute = db.Column(db.Integer)
day = db.Column(db.Integer)
month = db.Column(db.Integer)
year = db.Column(db.Integer)
ampm = db.Column(db.String(2))

作为测试,我想遍历当前用户的 post 并使用 tweepy 向他们发送推文:

@app.before_first_request
def activate_job():
    def run_job():
        posts = current_user.followed_posts().filter_by(socialnetwork ='Twitter')
        for post in posts:
            tweepy_api.update_status(message)

            time.sleep(30)
    thread = threading.Thread(target=run_job)
    thread.start()

但是,这返回了错误:

AttributeError: 'NoneType' object has no attribute 'followed_posts'

在终端上。这让我感到困惑,因为我在同一个文件中多次使用 current_user 来通过社交网络过滤 post。

如下例routes.py

@app.route('/<username>')
@login_required
def user(username):
    user = User.query.filter_by(username = username).first_or_404()
    socialnetwork = request.args.get("socialnetwork")

    if socialnetwork == 'Facebook':

    posts = current_user.followed_posts().filter_by(socialnetwork = 'Facebook')

    elif socialnetwork == 'Twitter':
        posts = current_user.followed_posts().filter_by(socialnetwork = 'Twitter')
    else:
        posts = current_user.followed_posts()


    return render_template('user.html', user = user, posts = posts, form = socialnetwork)

上面的代码没有错误并且工作完美。

如果有人能指出我做错了什么,我将不胜感激。

您可能 运行 遇到问题,因为您正试图在不同的线程上获取 current_user(有关详细信息,请参阅 Flask docs)。您在没有任何当前用户的不同上下文中调用 run_job()(因为没有活动请求)。

我会对其进行修改,以便您在主线程中获取当前用户的帖子(即在 activate_job() 中,然后将帖子列表传递给后台进程进行处理。

类似于:

def activate_job():
    posts = current_user.followed_posts().filter_by(socialnetwork ='Twitter')
    def run_job(posts):
        for post in posts:
            tweepy_api.update_status(message)

            time.sleep(30)
    thread = threading.Thread(target=run_job, args=[posts])
    thread.start()

还值得注意的是,您可能需要重新考虑您的整体方法。与其检查每个请求是否有任何预定的推文要发送,不如使用某种独立于 Web 进程运行的后台任务队列。这样,您就不会对每个请求进行冗余检查,并且您不依赖于用户在预定时间发出请求。

参见 The Flask Mega-Tutorial Part XXII: Background Jobs for more details, and look into Celery