How to write to Cloud Logging from Python while copying files to a GCP bucket

I wrote a Python script that copies files from a local machine to a GCP bucket and captures logging information.

The gsutil rsync command works fine and the files are copied to the corresponding target folders.

However, the log messages do not show up in the GCP Logs Viewer. A sample script is given below. Please advise.

## python3 /home/sant/multiprocessing_gs.py
from multiprocessing import Pool
from subprocess import Popen, PIPE, TimeoutExpired, run, CalledProcessError
import os
import sys
import logging as lg
import google.cloud.logging as gcl
from google.cloud.logging.handlers import CloudLoggingHandler

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/home/sant/key.json"
ftp_path1 = "/home/sant"
GCS_DATA_INGEST_BUCKET_URL = "dev2-ingest-manual"


class GcsMover:
    def __init__(self):
        self.folder_list = ["raw_amr", "osr_data"]
        self.logger = self.create_logger()

    def create_logger(self, log_name="Root_Logger", log_level=lg.INFO):
        try:
            log_format = lg.Formatter("%(levelname)s %(asctime)s - %(message)s")
            client = gcl.Client()
            log_handler = CloudLoggingHandler(client)
            log_handler.setFormatter(log_format)
            logger = lg.getLogger(log_name)
            logger.setLevel(log_level)
            logger.addHandler(log_handler)
            return logger
        except Exception as e:
            sys.exit("WARNING - Invalid cloud logging")

    def execute_jobs(self, cmd):
        try:
            gs_sp = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True)
            print(f"starting process with Pid {str(gs_sp.pid)} for command {cmd}")
            self.logger.info(f"starting process with Pid {str(gs_sp.pid)} for command {cmd}")
            sp_out, sp_err = gs_sp.communicate(timeout=int(3600))
        except OSError:
            # Popen itself may have failed, in which case gs_sp does not exist; log the command instead
            self.logger.error(f"Processing aborted for command {cmd}")
        except TimeoutExpired:
            gs_sp.kill()
            self.logger.error(f"Processing aborted for Pid {str(gs_sp.pid)}")
        else:
            if gs_sp.returncode:
                self.logger.error(f"Failure due to {sp_err} for Pid {str(gs_sp.pid)} and command {cmd}")
            else:
                print(f"Loading successful for Pid {str(gs_sp.pid)}")
                self.logger.info(f"Loading successful for Pid {str(gs_sp.pid)}")

    def move_files(self):
        command_list = []
        for folder in self.folder_list:
            gs_command = f"gsutil -m rsync -r {ftp_path1}/{folder} gs://{GCS_DATA_INGEST_BUCKET_URL}/{folder}"
            command_list.append(gs_command)
        pool = Pool(processes=2, maxtasksperchild=1)
        pool.map(self.execute_jobs, iterable=command_list)
        pool.close()
        pool.join()


def main():
    gsu = GcsMover()
    gsu.move_files()


if __name__ == "__main__":
    main()

The documentation explains how to log activity on a GCS bucket with Cloud Functions by using a storage trigger. I have tested it and it works for me, using the same code provided in the documentation:

def hello_gcs(event, context):
    """Background Cloud Function to be triggered by Cloud Storage.
       This generic function logs relevant data when a file is changed.

    Args:
        event (dict):  The dictionary with data specific to this type of event.
                       The `data` field contains a description of the event in
                       the Cloud Storage `object` format described here:
                       https://cloud.google.com/storage/docs/json_api/v1/objects#resource
        context (google.cloud.functions.Context): Metadata of triggering event.
    Returns:
        None; the output is written to Stackdriver Logging
    """

    print('Event ID: {}'.format(context.event_id))
    print('Event type: {}'.format(context.event_type))
    print('Bucket: {}'.format(event['bucket']))
    print('File: {}'.format(event['name']))
    print('Metageneration: {}'.format(event['metageneration']))
    print('Created: {}'.format(event['timeCreated']))
    print('Updated: {}'.format(event['updated']))

To deploy it, I used the command:

gcloud functions deploy hello_gcs \
--runtime python37 \
--trigger-resource YOUR_TRIGGER_BUCKET_NAME \
--trigger-event google.storage.object.finalize

Google Cloud Storage can log the operations performed on the objects in your project, as described in the documentation. You might need to activate audit logs.

Since your script uses rsync, it performs several operations on GCS (detailed in the code of the command). As an overview: it checks whether the object already exists in the bucket (by listing the bucket); if it does, it compares the hash of the local file against the hash of the remote one, and if the file has changed or did not exist before, it uploads it.
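
For illustration, that same check-then-upload logic could be sketched with the google-cloud-storage Python client as follows. This is only a minimal sketch, not what gsutil actually runs; the helper name sync_folder_to_bucket and its arguments are placeholders:

import base64
import hashlib
import os

from google.cloud import storage


def sync_folder_to_bucket(local_dir, bucket_name, prefix):
    """Roughly mimic gsutil rsync: list existing objects, compare hashes, upload changes."""
    client = storage.Client()
    bucket = client.bucket(bucket_name)

    # List the objects already present under the destination "folder".
    existing = {blob.name: blob for blob in client.list_blobs(bucket, prefix=prefix)}

    for file_name in os.listdir(local_dir):
        local_path = os.path.join(local_dir, file_name)
        if not os.path.isfile(local_path):
            continue
        object_name = f"{prefix}/{file_name}"

        # GCS exposes an object's MD5 base64-encoded; compute the same for the local file.
        with open(local_path, "rb") as fh:
            local_md5 = base64.b64encode(hashlib.md5(fh.read()).digest()).decode()

        blob = existing.get(object_name)
        if blob is not None and blob.md5_hash == local_md5:
            continue  # unchanged, skip

        # New or changed file: upload it (each of these calls shows up in the Data Access logs).
        bucket.blob(object_name).upload_from_filename(local_path)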

All of these operations are recorded in the Data Access audit logs, which you can access from the console.
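
If you prefer to read those audit entries programmatically instead of through the console, the Cloud Logging client you already import can list them. A minimal sketch, assuming Data Access logging is enabled for Cloud Storage and that the filter below matches the audit log name and resource type in your project:

import google.cloud.logging as gcl

client = gcl.Client()
# Filter for Cloud Storage data-access audit entries; adjust logName/resource.type to your setup.
audit_filter = (
    'logName:"cloudaudit.googleapis.com%2Fdata_access" '
    'AND resource.type="gcs_bucket"'
)
for entry in client.list_entries(filter_=audit_filter, page_size=50):
    print(entry.timestamp, entry.log_name, entry.payload)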

If you also want to keep a local log (in case a local error is not recorded in the cloud), you can change the command you run by appending a redirection to a log file:

gsutil -m rsync -r /source/path gs://bucket/folder &> /path/to/log
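
Another option, if you want the Python logger itself to keep a local copy, is to attach a standard FileHandler next to the CloudLoggingHandler created in the question's create_logger method. The log file path below is just an example:

import logging as lg

local_handler = lg.FileHandler("/home/sant/gcs_mover.log")  # example path, adjust as needed
local_handler.setFormatter(lg.Formatter("%(levelname)s %(asctime)s - %(message)s"))

logger = lg.getLogger("Root_Logger")
logger.addHandler(local_handler)  # local records survive even if cloud delivery fails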