使用数据流读取 CSV 文件,但在将行提取到 Google BigQuery 之前添加两个额外的列 op_type 和 op_time
Read CSV file with dataflow but add two additional columns op_type and op_time before ingesting rows into Google BigQuery
我有一个数据流代码,它从 gs:// 中的存储桶读取 CSV 文件并将该 CSV 文件提取到 BigQuery table。 BigQuery table 已创建。下面的代码工作正常。
class DataIngestion:
"""A helper class which contains the logic to translate the source csv file into a format BigQuery will accept."""
def parse_method(self, string_input):
values = re.split(",",re.sub('\r\n', '', re.sub(u'"', '', string_input)))
row = dict(
zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING', 'SMALL_VC', 'PADDING'),
values))
return row
def run(argv=None):
data_ingestion = DataIngestion()
p = beam.Pipeline(options=PipelineOptions())
(p
| 'Create PCollection' >> beam.Create(source_file)
| 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1) ## ignore the csv header
| 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s)) # s is each of the String elements read in the beam.io.ReadAllFromText transform, and we apply a lambda
| 'Write to BigQuery' >> beam.io.Write(
beam.io.WriteToBigQuery(
'DUMMY',
dataset='test',
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
result = p.run()
result.wait_until_finish()
但是,我需要为 CSV 文件中的每一行提取两个额外的列;即 op_type 和 op_time。这些来自 BigQuery table 定义如下。
Field name
Type
Mode
Policy tags
Description
ID FLOAT REQUIRED
CLUSTERED FLOAT NULLABLE
SCATTERED FLOAT NULLABLE
RANDOMISED FLOAT NULLABLE
RANDOM_STRING STRING NULLABLE
SMALL_VC STRING NULLABLE
PADDING STRING NULLABLE
op_type INTEGER REQUIRED
op_time TIMESTAMP REQUIRED
在 PySpark 中,我可以通过向数据框添加两列来实现此目的,如下所示:
df= self.spark.createDataFrame(rdd, schema = Schema)
df = df. \
withColumn("op_type", lit(1)). \
withColumn("op_time", current_timestamp())
所以 op_type 设置为 1,表示插入,op_time 需要是 current_timestamp()
如何使用数据流实现这一点?这两列是添加的列,所以 'String To BigQuery Row' 应该反映出来?
谢谢
返回前更新字典。
def parse_method(self, string_input):
values = re.split(",",re.sub('\r\n', '', re.sub(u'"', '', string_input)))
row = dict(
zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING', 'SMALL_VC', 'PADDING'),
values))
static_cols = {'op_time':'some_time','Op_type':'som_type'}
row.update(static_cols)
return row
这有效
from datetime import datetime
def parse_method(self, string_input):
values = re.split(",",re.sub('\r\n', '', re.sub(u'"', '', string_input)))
row = dict(
zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING', 'SMALL_VC', 'PADDING'),
values))
Timestamp = datetime.now()
static_cols = {'op_type': 1, 'op_time': Timestamp}
row.update(static_cols)
return row
注意Python中Timestamp的定义 -> Timestamp = datetime.now() 正确映射到BigQuery中TIMESTAMP类型的列
我有一个数据流代码,它从 gs:// 中的存储桶读取 CSV 文件并将该 CSV 文件提取到 BigQuery table。 BigQuery table 已创建。下面的代码工作正常。
class DataIngestion:
"""A helper class which contains the logic to translate the source csv file into a format BigQuery will accept."""
def parse_method(self, string_input):
values = re.split(",",re.sub('\r\n', '', re.sub(u'"', '', string_input)))
row = dict(
zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING', 'SMALL_VC', 'PADDING'),
values))
return row
def run(argv=None):
data_ingestion = DataIngestion()
p = beam.Pipeline(options=PipelineOptions())
(p
| 'Create PCollection' >> beam.Create(source_file)
| 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1) ## ignore the csv header
| 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s)) # s is each of the String elements read in the beam.io.ReadAllFromText transform, and we apply a lambda
| 'Write to BigQuery' >> beam.io.Write(
beam.io.WriteToBigQuery(
'DUMMY',
dataset='test',
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
result = p.run()
result.wait_until_finish()
但是,我需要为 CSV 文件中的每一行提取两个额外的列;即 op_type 和 op_time。这些来自 BigQuery table 定义如下。
Field name
Type
Mode
Policy tags
Description
ID FLOAT REQUIRED
CLUSTERED FLOAT NULLABLE
SCATTERED FLOAT NULLABLE
RANDOMISED FLOAT NULLABLE
RANDOM_STRING STRING NULLABLE
SMALL_VC STRING NULLABLE
PADDING STRING NULLABLE
op_type INTEGER REQUIRED
op_time TIMESTAMP REQUIRED
在 PySpark 中,我可以通过向数据框添加两列来实现此目的,如下所示:
df= self.spark.createDataFrame(rdd, schema = Schema)
df = df. \
withColumn("op_type", lit(1)). \
withColumn("op_time", current_timestamp())
所以 op_type 设置为 1,表示插入,op_time 需要是 current_timestamp()
如何使用数据流实现这一点?这两列是添加的列,所以 'String To BigQuery Row' 应该反映出来?
谢谢
返回前更新字典。
def parse_method(self, string_input):
values = re.split(",",re.sub('\r\n', '', re.sub(u'"', '', string_input)))
row = dict(
zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING', 'SMALL_VC', 'PADDING'),
values))
static_cols = {'op_time':'some_time','Op_type':'som_type'}
row.update(static_cols)
return row
这有效
from datetime import datetime
def parse_method(self, string_input):
values = re.split(",",re.sub('\r\n', '', re.sub(u'"', '', string_input)))
row = dict(
zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING', 'SMALL_VC', 'PADDING'),
values))
Timestamp = datetime.now()
static_cols = {'op_type': 1, 'op_time': Timestamp}
row.update(static_cols)
return row
注意Python中Timestamp的定义 -> Timestamp = datetime.now() 正确映射到BigQuery中TIMESTAMP类型的列