如何使用 Mongodb 中的 "Where" 条件到 bigquery 数据流模板?
How to use "Where" condition in Mongodb to bigquery dataflow template?
我已经为 mongodb 编写了 python 代码,以使用 apache beam (Dataflow Runner) 进行 bigquery 数据管道。
Mongodb 有简单的 mysql 像 table 有 2 列(id 和名称)并且没有复杂的 structure.My 代码如下。
#########################################
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.mongodbio import ReadFromMongoDB
import json
options = PipelineOptions()
################################
def parse_json(line):
new_line=str(line)
record = new_line.split(',')
key0, value0 = record[0].strip().split(":", 1)
key1, value1 = record[1].strip().split(":", 1)
json_data = {"_id":value0.replace('"','').replace('ObjectId(','').replace(')','').replace("'","").strip(),
"name":value1.replace('"','').replace("'","").strip()
}
return json_data
#################################
p = beam.Pipeline(options=options)
p | ReadFromMongoDB(uri='mongodb://mongo_ip:mongo_port',db="db
_name",coll="collection_name") | beam.Map(parse_json) | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('bq_project_id.bq_dataset_id.bq_table_name',write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
p.run()
###############################################
此代码运行良好。它从 mongodb 集合中提取所有文档并插入 bigquery。
但我想使用 where 条件只处理具有特定 id 的几行。
如何在 ReadFromMongoDB() 中指定 where 条件?
您可以在 ReadFromMongoDB 中使用过滤器参数。
我已经为 mongodb 编写了 python 代码,以使用 apache beam (Dataflow Runner) 进行 bigquery 数据管道。
Mongodb 有简单的 mysql 像 table 有 2 列(id 和名称)并且没有复杂的 structure.My 代码如下。
#########################################
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.mongodbio import ReadFromMongoDB
import json
options = PipelineOptions()
################################
def parse_json(line):
new_line=str(line)
record = new_line.split(',')
key0, value0 = record[0].strip().split(":", 1)
key1, value1 = record[1].strip().split(":", 1)
json_data = {"_id":value0.replace('"','').replace('ObjectId(','').replace(')','').replace("'","").strip(),
"name":value1.replace('"','').replace("'","").strip()
}
return json_data
#################################
p = beam.Pipeline(options=options)
p | ReadFromMongoDB(uri='mongodb://mongo_ip:mongo_port',db="db
_name",coll="collection_name") | beam.Map(parse_json) | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('bq_project_id.bq_dataset_id.bq_table_name',write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
p.run()
###############################################
此代码运行良好。它从 mongodb 集合中提取所有文档并插入 bigquery。
但我想使用 where 条件只处理具有特定 id 的几行。
如何在 ReadFromMongoDB() 中指定 where 条件?
您可以在 ReadFromMongoDB 中使用过滤器参数。