Google Cloud Dataflow with Apache Beam does not display logs
My problem is that the logs on Dataflow do not show anything (the Monitoring API is enabled), and I don't know why.
I'm using the following Apache Beam code (taken from https://cloud.google.com/dataflow/docs/guides/logging):
import argparse
import logging
import re

from apache_beam import FlatMap, Map, Pipeline
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input",
        dest="input",
        default="gs://dataflow-samples/shakespeare/kinglear.txt",
        help="Input file to process.",
    )
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with Pipeline(options=pipeline_options) as p:
        filtered_words = (
            p
            | "Read" >> ReadFromText(known_args.input)
            | "Split" >> FlatMap(lambda x: re.findall(r"[A-Za-z\']+", x))
            | "Log" >> Map(lambda x: logging.info(f"x: {x}"))
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()
Running it locally with the DirectRunner yields
...
INFO:root:x: his
INFO:root:x: enemy
INFO:root:x: king
INFO:root:x: and
INFO:root:x: did
INFO:root:x: him
...
whereas running it on Google Cloud Dataflow yields nothing.
Here are the dependencies:
python = "^3.8"
apache-beam = {extras = ["gcp"], version = "^2.28.0"}
It turns out that the default sink in the Logs Router does not include Dataflow logs.
Creating a new sink in the Logs Router with an inclusion filter of resource.type="dataflow_step" solved the problem.
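For reference, a sketch of creating such a sink with the gcloud CLI (the sink name dataflow-logs is arbitrary, PROJECT_ID is a placeholder, and routing to the _Default log bucket is an assumption; the same sink can also be created in the Console under Logging > Log Router):

```shell
# Create a Log Router sink that routes Dataflow worker logs
# to the project's _Default log bucket.
gcloud logging sinks create dataflow-logs \
    logging.googleapis.com/projects/PROJECT_ID/locations/global/buckets/_Default \
    --log-filter='resource.type="dataflow_step"'
```

After the sink exists, logs emitted by the pipeline's workers should appear in the Dataflow job's Logs panel and in Logs Explorer under resource.type="dataflow_step".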