GCP Composer - Airflow webserver constantly shuts down

I am using GCP Composer with the latest image version, composer-1.16.1-airflow-1.10.15.

My webserver sometimes dies because some cache file is missing:

{cli.py:1050} ERROR - [Errno 2] No such file or directory

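Does anyone know how to fix this?


Additional info:

Workers: node count 3, disk size 20 GB, machine type n1-standard-1

Webserver configuration: machine type composer-n1-webserver-8 (8 vCPUs, 7.6 GB memory)

Configuration overrides: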


Update 27.04.2021

I have managed to find the code responsible for killing the webserver:

https://github.com/apache/airflow/blob/4aec433e48dcc66c9c7b74947c499260ab6be9e9/airflow/bin/cli.py#L1032-L1138

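GCP Composer uses the Celery Executor under the hood - during this check, is it trying to read some cache files that the workers have already deleted?

I found it! And I will report this bug to the GCP Composer team.

If webserver.reload_on_plugin_change=True is configured, the cli enters this section: https://github.com/apache/airflow/blob/4aec433e48dcc66c9c7b74947c499260ab6be9e9/airflow/bin/cli.py#L1118-L1138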

    # if we should check the directory with the plugin,
    if self.reload_on_plugin_change:
        # compare the previous and current contents of the directory
        new_state = self._generate_plugin_state()
        # If changed, wait until its content is fully saved.
        if new_state != self._last_plugin_state:
            self.log.debug(
                '[%d / %d] Plugins folder changed. The gunicorn will be restarted the next time the '
                'plugin directory is checked, if there is no change in it.',
                num_ready_workers_running, num_workers_running
            )
            self._restart_on_next_plugin_check = True
            self._last_plugin_state = new_state
        elif self._restart_on_next_plugin_check:
            self.log.debug(
                '[%d / %d] Starts reloading the gunicorn configuration.',
                num_ready_workers_running, num_workers_running
            )
            self._restart_on_next_plugin_check = False
            self._last_refresh_time = time.time()
            self._reload_gunicorn()

def _generate_plugin_state(self):
    """
    Generate dict of filenames and last modification time of all files in settings.PLUGINS_FOLDER
    directory.
    """
    if not settings.PLUGINS_FOLDER:
        return {}
    all_filenames = []
    for (root, _, filenames) in os.walk(settings.PLUGINS_FOLDER):
        all_filenames.extend(os.path.join(root, f) for f in filenames)
    plugin_state = {f: self._get_file_hash(f) for f in sorted(all_filenames)}
    return plugin_state

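It builds the list of files to check by calling os.walk(settings.PLUGINS_FOLDER), and then hashes each file in that list.

Meanwhile gcsfuse decides to delete some of those files, so the hashing step fails with the error above - file not found.

So disabling webserver.reload_on_plugin_change works - but this option is really handy, so I will create a bug ticket for Google.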
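
The race described above can be sketched outside Airflow. This is only an illustration, not the Airflow code and not a fix confirmed by the Composer team: the names file_hash, list_files, tolerant_state and demo are hypothetical, and the deletion done by gcsfuse is simulated with os.remove on a temporary directory. It shows that hashing a path that vanished after the listing raises Errno 2, and that skipping such paths avoids the crash.

```python
import hashlib
import os
import tempfile


def file_hash(path):
    """MD5 of a file's contents, read in chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            digest.update(chunk)
    return digest.hexdigest()


def list_files(folder):
    """Collect every file path under folder, as os.walk sees it right now."""
    paths = []
    for root, _, filenames in os.walk(folder):
        paths.extend(os.path.join(root, name) for name in filenames)
    return sorted(paths)


def tolerant_state(paths):
    """Hash each path, skipping files that vanished after they were listed."""
    state = {}
    for path in paths:
        try:
            state[path] = file_hash(path)
        except FileNotFoundError:
            # The file was removed between listing and hashing; leave it out.
            continue
    return state


def demo():
    """Return (errno of the naive attempt, file names kept by the tolerant one)."""
    with tempfile.TemporaryDirectory() as folder:
        for name in ("a.py", "b.py"):
            with open(os.path.join(folder, name), "w") as f:
                f.write("# plugin\n")
        paths = list_files(folder)
        # Simulate the file disappearing after the listing was taken
        # (assumption: this stands in for what gcsfuse does).
        os.remove(os.path.join(folder, "b.py"))
        try:
            {p: file_hash(p) for p in paths}
            naive_error = None
        except FileNotFoundError as exc:
            naive_error = exc.errno
        kept = [os.path.basename(p) for p in tolerant_state(paths)]
        return naive_error, kept
```

Calling demo() runs both variants against the same stale listing. With the tolerant variant a vanished file simply shows up as a change in the plugin state on the next check instead of taking the webserver down.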