Google Cloud Dataflow 访问云存储上的 .txt 文件
Google Cloud Dataflow access .txt file on cloud storage
如果我在 GCS 上存储了一个 .txt 文件,其中包含将用作 beam.Filter
一部分的单词列表,可以在我的 apache Beam 管道中动态访问此列表吗?我知道我可以将此列表定义为管道中的全局变量,但我不确定如何将整个文件读入列表以及是否有任何光束技巧可以实现此目的。有什么建议么?这是我当前无法正常工作的实现..
def boolean_terms(word, term_list):
if word in term_list:
return (word, 1)
else:
return (word, 0)
# side table
filter_terms = p | beam.io.ReadFromText(path_to_gcs_txt_file)
words = ...
filtered_words = words | beam.FlatMap(lambda x:
[boolean_terms(word, filter_terms) for word in x])
我收到以下错误 "TypeError: argument of type '_InvalidUnpickledPCollection' is not iterable"
您可以作为 side input 访问单词列表。我相信 beam.Filter
转换支持使用过滤器函数的侧输入,其方式与示例中的 FlatMap
和 ParDo
完全相同 link.
类似于:
words | beam.Filter(lambda x, filter_terms: word in filter_terms,
filter_terms=pvalue.AsList(p | beam.io.ReadFromText(path)))
如果我在 GCS 上存储了一个 .txt 文件,其中包含将用作 beam.Filter
一部分的单词列表,可以在我的 apache Beam 管道中动态访问此列表吗?我知道我可以将此列表定义为管道中的全局变量,但我不确定如何将整个文件读入列表以及是否有任何光束技巧可以实现此目的。有什么建议么?这是我当前无法正常工作的实现..
def boolean_terms(word, term_list):
if word in term_list:
return (word, 1)
else:
return (word, 0)
# side table
filter_terms = p | beam.io.ReadFromText(path_to_gcs_txt_file)
words = ...
filtered_words = words | beam.FlatMap(lambda x:
[boolean_terms(word, filter_terms) for word in x])
我收到以下错误 "TypeError: argument of type '_InvalidUnpickledPCollection' is not iterable"
您可以作为 side input 访问单词列表。我相信 beam.Filter
转换支持使用过滤器函数的侧输入,其方式与示例中的 FlatMap
和 ParDo
完全相同 link.
类似于:
words | beam.Filter(lambda x, filter_terms: word in filter_terms,
filter_terms=pvalue.AsList(p | beam.io.ReadFromText(path)))