How to make Cloud Logging work from Python while copying files to a GCP bucket
I wrote a Python script to copy files from a local machine to a GCP bucket and to capture log information. The gsutil rsync command works fine and the files are copied to the corresponding target folders. However, the log messages do not show up in the GCP Logs Viewer. A sample script is given below. Please advise.
## python3 /home/sant/multiprocessing_gs.py
from multiprocessing import Pool
from subprocess import Popen, PIPE, TimeoutExpired, run, CalledProcessError
import os
import sys
import logging as lg
import google.cloud.logging as gcl
from google.cloud.logging.handlers import CloudLoggingHandler

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/home/sant/key.json"
ftp_path1 = "/home/sant"
GCS_DATA_INGEST_BUCKET_URL = "dev2-ingest-manual"


class GcsMover:
    def __init__(self):
        self.folder_list = ["raw_amr", "osr_data"]
        self.logger = self.create_logger()

    def create_logger(self, log_name="Root_Logger", log_level=lg.INFO):
        try:
            log_format = lg.Formatter("%(levelname)s %(asctime)s - %(message)s")
            client = gcl.Client()
            log_handler = CloudLoggingHandler(client)
            log_handler.setFormatter(log_format)
            logger = lg.getLogger(log_name)
            logger.setLevel(log_level)
            logger.addHandler(log_handler)
            return logger
        except Exception as e:
            sys.exit("WARNING - Invalid cloud logging")

    def execute_jobs(self, cmd):
        try:
            gs_sp = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True)
            print(f"starting process with Pid {str(gs_sp.pid)} for command {cmd}")
            self.logger.info(f"starting process with Pid {str(gs_sp.pid)} for command {cmd}")
            sp_out, sp_err = gs_sp.communicate(timeout=int(3600))
        except OSError:
            self.logger.error(f"Processing aborted for Pid {str(gs_sp.pid)}")
        except TimeoutExpired:
            gs_sp.kill()
            self.logger.error(f"Processing aborted for Pid {str(gs_sp.pid)}")
        else:
            if gs_sp.returncode:
                self.logger.error(f"Failure due to {sp_err} for Pid {str(gs_sp.pid)} and command {cmd}")
            else:
                print(f"Loading successful for Pid {str(gs_sp.pid)}")
                self.logger.info(f"Loading successful for Pid {str(gs_sp.pid)}")

    def move_files(self):
        command_list = []
        for folder in self.folder_list:
            gs_command = f"gsutil -m rsync -r {ftp_path1}/{folder} gs://{GCS_DATA_INGEST_BUCKET_URL}/{folder}"
            command_list.append(gs_command)
        pool = Pool(processes=2, maxtasksperchild=1)
        pool.map(self.execute_jobs, iterable=command_list)
        pool.close()
        pool.join()


def main():
    gsu = GcsMover()
    gsu.move_files()


if __name__ == "__main__":
    main()
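One thing that may be worth checking (a minimal sketch, not part of the script above): CloudLoggingHandler hands entries to a background transport that batches API calls, and the client and handler here are created in the parent process and then used inside forked pool workers, which gRPC-based clients often handle poorly; a short-lived worker can also exit before the batch is flushed, depending on the library version. Writing an entry synchronously with the client at least confirms that the credentials and the Logging API itself work; the log name below is just an example.

import google.cloud.logging as gcl

# Synchronous write, bypassing the handler's background batching.
client = gcl.Client()
client.logger("gsutil_rsync_debug").log_text("test entry from multiprocessing_gs.py")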
The documentation explains how to log activity on a GCS bucket with Cloud Functions using a storage trigger.
I have tested it and it works for me, using the same code that is provided in the documentation:
def hello_gcs(event, context):
    """Background Cloud Function to be triggered by Cloud Storage.
    This generic function logs relevant data when a file is changed.
    Args:
        event (dict): The dictionary with data specific to this type of event.
                      The `data` field contains a description of the event in
                      the Cloud Storage `object` format described here:
                      https://cloud.google.com/storage/docs/json_api/v1/objects#resource
        context (google.cloud.functions.Context): Metadata of triggering event.
    Returns:
        None; the output is written to Stackdriver Logging
    """
    print('Event ID: {}'.format(context.event_id))
    print('Event type: {}'.format(context.event_type))
    print('Bucket: {}'.format(event['bucket']))
    print('File: {}'.format(event['name']))
    print('Metageneration: {}'.format(event['metageneration']))
    print('Created: {}'.format(event['timeCreated']))
    print('Updated: {}'.format(event['updated']))
To deploy it, I used this command:
gcloud functions deploy hello_gcs \
--runtime python37 \
--trigger-resource YOUR_TRIGGER_BUCKET_NAME \
--trigger-event google.storage.object.finalize
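Once deployed, the entries written by the function's print() calls can also be read from the command line (standard gcloud usage):

gcloud functions logs read hello_gcs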
Google Cloud Storage can log the operations performed on objects in your project, as described in the documentation. You might need to activate audit logs.
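As a rough sketch of what that activation can look like (the project ID and file path are placeholders), Data Access audit logs for Cloud Storage are turned on through the auditConfigs section of the project's IAM policy:

gcloud projects get-iam-policy YOUR_PROJECT_ID --format=yaml > /tmp/policy.yaml
# Add to /tmp/policy.yaml:
#   auditConfigs:
#   - service: storage.googleapis.com
#     auditLogConfigs:
#     - logType: DATA_READ
#     - logType: DATA_WRITE
gcloud projects set-iam-policy YOUR_PROJECT_ID /tmp/policy.yaml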
Since your script uses rsync, a few operations are performed on GCS (see the code of the command for details). As an overview: it checks whether the object already exists in the bucket (by listing the bucket); if it does, it compares the hash of the local file with the hash of the remote one; and if the file has changed or did not exist before, it uploads it.
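For illustration only, the same exists/compare-hash/upload logic could be written with the google-cloud-storage client as follows (bucket and file names are placeholders; this is not the actual gsutil implementation):

import base64
import hashlib

from google.cloud import storage


def sync_one_file(local_path, bucket_name, blob_name):
    """Upload local_path only if the object is missing or its MD5 differs."""
    client = storage.Client()
    blob = client.bucket(bucket_name).blob(blob_name)

    # GCS stores the MD5 of an object as a base64-encoded digest.
    with open(local_path, "rb") as f:
        local_md5 = base64.b64encode(hashlib.md5(f.read()).digest()).decode()

    if blob.exists():
        blob.reload()  # populate metadata such as md5_hash
        if blob.md5_hash == local_md5:
            return  # unchanged, nothing to upload

    blob.upload_from_filename(local_path)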
All of these GCS operations are recorded in the Data Access logs, which you can access from the console.

If you also want to keep a local log (in case a local error is not recorded in the cloud), you can change the executed command by appending a redirect to a log file:
gsutil -m rsync -r /source/path gs://bucket/folder &> /path/to/log
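If you also want to keep seeing the output in the terminal while writing the file, a plain tee works as well (generic shell usage, nothing gsutil-specific):

gsutil -m rsync -r /source/path gs://bucket/folder 2>&1 | tee /path/to/log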