Flask 应用程序在使用多线程时被阻塞

Flask Application gets blocked on multithreading

我正在尝试使用多线程,并让各个线程通过同一个会话(session)存储结果。这在大多数情况下都工作正常,但在少数情况下我的整个应用程序会被阻塞,我无法找出原因。

我的应用程序阻塞在 __filter_notifications_by_status_and_request_type 方法中的 notification.save() 调用处。notification.save() 负责将数据保存到数据库中。

我无法弄清楚这是数据库问题还是线程或锁定问题。

我使用的是 Flask 应用程序,并通过 Apache 上的 passenger_wsgi 访问它。应用程序被阻塞之后,服务器就不再接受后续请求。

Python 使用的数据库库:SQLAlchemy

class Inference:
    ##
    # @brief Decides which notifications apply to an IRIS request, fetches
    #        them in worker threads and filters them by status and fetch type.
    #
    # NOTE(review): __get_message_from_template is called by this class but is
    # not defined in the visible part of it - confirm it exists elsewhere.

    ##
    # @brief initializer of the Inference Handler
    #
    # @param kwargs keyword arguments; 'IRISRequest', when present, is stored
    #               as self.iris_request
    #
    # @return None
    def __init__(self, **kwargs):
        """Store the request (when supplied) and create the per-instance lock."""
        # The attribute is only assigned when the caller passes the request,
        # exactly as before.
        if 'IRISRequest' in kwargs:
            self.iris_request = kwargs['IRISRequest']

        self.racerx_inference_config = Config.get_racerx_inference_config()
        # Shared by the worker threads started in get_notifications() to
        # serialize notification.save().
        self.thread_lock = threading.Lock()

    ##
    # @brief Case-insensitive "does text contain any of these fragments" test
    #
    # @param text      string to search in; lower-cased once per call
    # @param fragments candidate substrings, compared as given
    #
    # @return True when at least one fragment occurs in the lower-cased text
    def _contains_any(self, text, fragments):
        # Nothing to compare against: skip touching `text` at all.
        if not fragments:
            return False
        lowered = text.lower()
        return any(fragment in lowered for fragment in fragments)

    ##
    # @brief Call the Inference
    #
    # For the "athena" system every notification id from the URL map is
    # flagged by URL match; ids that are still unmatched are then checked
    # against the request title, when the context carries one.
    #
    # @return dict of the form {'inference': {notification_id: bool}}
    def get_inference_object(self):

        log.info("get_inference_object is called")
        matches = {}
        inference_map = {'inference': matches}

        if self.iris_request.system == "athena":
            url_to_notification_map = Config.get_url_to_notification_map()
            for notification_id, urls in url_to_notification_map.items():
                matches[notification_id] = self._contains_any(self.iris_request.url, urls)

            title_to_notification_map = Config.get_title_to_notification_map()
            if 'title' in self.iris_request.context:
                for notification_id, titles in title_to_notification_map.items():
                    # A title match can only turn a miss into a hit, never
                    # override an existing URL match.
                    if not matches.get(notification_id, False):
                        matches[notification_id] = self._contains_any(self.iris_request.context['title'], titles)

        return inference_map

    ##
    # @brief Names of the notifications the inference flagged as applicable
    #
    # @return the list of the notification required from the inference
    def get_notification_name_list(self):
        inference = self.get_inference_object()['inference']
        return [name for name in inference if inference[name]]

    ##
    # @brief Translate every entry that carries a 'string_id'
    #
    # @param entries iterable of template maps
    #
    # @return dict mapping string_id to the translated message
    def _translate_string_entries(self, entries):
        translated = {}
        for entry in entries:
            if 'string_id' in entry:
                translated[entry['string_id']] = self.__get_message_from_template(entry)
        return translated

    ##
    # @brief Build the translated strings of one notification type
    #
    # @param notification_name name used to look up the notification config
    #
    # @return dict with the message, subject, short message, impact summary,
    #         action summary and help content strings
    def _build_string_translation(self, notification_name):
        config = Config.get_config(notification_name)
        nt = {}

        nt['message'] = self.__get_message_from_template(config.message_map)
        nt['subject'] = self.__get_message_from_template(config.subject_map)
        nt['short_message'] = self.__get_message_from_template(config.short_message_map)
        nt['impact_summary'] = self.__get_message_from_template(config.impact_summary_map)
        nt['action_summary_list'] = self._translate_string_entries(config.action_summary_map)
        nt['help_content_strings'] = self._translate_string_entries(config.help_content)

        return nt

    ##
    # @brief collect notifications from the various sources
    #
    # Uses the notification names supplied by the client, or falls back to
    # the inference result, fetches each notification type in a thread pool
    # and commits the request's DB session once all results are collected.
    #
    # @return dict mapping notification name to its non-empty list of
    #         notification objects
    #
    # @throws Exception when fetching any notification type failed; the
    #         session is not committed in that case
    def get_notifications(self):
        requested_names = self.iris_request.notification_name_list
        if requested_names:
            # List of notification names is provided by the client
            self.notification_name_list = requested_names
        else:
            # Get notification name list from the inference
            self.notification_name_list = self.get_notification_name_list()

        string_translations = {}
        for notification_name in self.notification_name_list:
            string_translations[notification_name] = self._build_string_translation(notification_name)

        log.info("starting the thread pool for getting the notifications data")

        # Leaving the with-block shuts the executor down and waits for every
        # submitted task, so all futures are finished afterwards.
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            future_to_notification_name = dict(
                (executor.submit(self.fetch_notifications_by_notification_name,
                                 notification_name, string_translations),
                 notification_name)
                for notification_name in self.notification_name_list)

        log.info("end of threadpool")

        log.info("start processing the data produced by the thread pool")

        notifications_map = {}
        for future in concurrent.futures.as_completed(future_to_notification_name):
            notification_name = future_to_notification_name[future]
            error = future.exception()
            if error is not None:
                raise Exception("Error occured while fetching the data for notification: "+notification_name+", error: "+str(error))

            fetched = future.result()
            if fetched:
                notifications_map[notification_name] = fetched

        log.info("end processing the data produced by the thread pool")
        self.iris_request.session.commit()
        log.info("Commited the DB session for the notifications")

        return notifications_map

    ###
    # @brief This function collects the notifications for the specified
    #        notification type, by making an object model call
    #
    # Runs inside a worker thread of get_notifications().
    #
    # @input notification_name : Type of the notification to be fetched
    # @input string_translations : Map of notification name to its translated strings
    #
    # @return list of notifications that passed the filter (possibly empty)
    def fetch_notifications_by_notification_name (self, notification_name, string_translations):
        log.info("fetch_notifications_by_notification_name is called")
        object_model = ObjectModel(IRISRequest = self.iris_request, NotificationName = notification_name, StringMap = string_translations[notification_name])
        notifications = object_model.get_iris_notification_objects()

        # NOTE(review): only notification.save() is serialized by the lock; if
        # get_iris_notification_objects() also uses the shared session from
        # these threads it is not covered - confirm.
        return self.__filter_notifications_by_status_and_request_type(notifications)

    ##
    # @brief Decide whether a saved notification should be returned
    #
    # @param notification notification object to inspect
    #
    # @return False for dismissed, expired or snoozed notifications and for
    #         those whose read state does not match the requested fetch type;
    #         True otherwise
    def _should_keep(self, notification):
        # Inactive, i.e. dismissed notifications. The explicit comparison
        # keeps a None value distinct from False.
        if notification.is_active == False:
            return False

        # Expired notifications; validity is compared in days and -1 means
        # the notification never expires.
        if notification.validity != -1 and (datetime.date.today() - notification.creation_date).days > notification.validity:
            return False

        # Snoozed notifications; snooze_duration is compared in days.
        if notification.snooze_date is not None and (datetime.datetime.today() - notification.snooze_date).days <= notification.snooze_duration:
            return False

        # The attribute name is spelled as on the request object.
        fetch_type = self.iris_request.notifcation_fetch_type

        # Unread notification while only read ones were requested
        if fetch_type == Constants.FETCH_TYPE_READ and notification.is_read == False:
            return False

        # Read notification while only unread ones were requested
        if fetch_type == Constants.FETCH_TYPE_UNREAD and notification.is_read == True:
            return False

        return True

    ###
    # @brief This function filters the notifications based on status, i.e.
    #        whether a notification is expired, snoozed or dismissed, and
    #        also based on request type
    #
    # Every notification is saved first, whether or not it is kept.
    #
    # @input notifications: List of notifications
    #
    # @return Filtered notification list
    def __filter_notifications_by_status_and_request_type(self, notifications):
        log.info("__filter_notifications_by_status_and_request_type is called")
        filtered_notifications = []
        for notification in notifications:
            # Extracting read status of notifications and storing new notifications
            log.info("Acquiring the lock on thread, to save the data into DB")
            # The with-statement releases the lock even when save() raises;
            # otherwise one failed save would leave the lock held and block
            # every other worker on acquire().
            with self.thread_lock:
                notification.save()
            log.info("Releasing the lock after saving the data into DB")

            if self._should_keep(notification):
                filtered_notifications.append(notification)

        return filtered_notifications

我是按如下方式使用锁的:

# Lock usage as quoted from the class: release() runs only when
# notification.save() returns normally.
self.thread_lock.acquire()
notification.save() 
self.thread_lock.release()

当 notification.save() 抛出异常时,release() 不会被执行,锁将一直无法释放,其他线程会永远阻塞在 acquire() 上。

通过适当的错误处理,确保无论是否发生异常都会释放锁,即可轻松修复:

# The with-statement acquires the lock and always releases it on exit,
# whether or not save() raised.
with self.thread_lock:
    try:
        notification.save()
    except Exception:
        # The failure is logged and not re-raised.
        log.error("unable to store info in DB")