Error: Message: Too many sources provided: 15285. Limit is 10000
Error: Message: Too many sources provided: 15285. Limit is 10000
我目前正在尝试 运行 数据流(Apache Beam,Python SDK)任务以将 >100GB 的推文文件导入 BigQuery,但 运行 正在 Error: Message: Too many sources provided: 15285. Limit is 10000.
该任务获取推文 (JSON),提取 5 个相关字段,transforms/sanitizes 对它们进行一些转换,然后将这些值写入 BigQuery,用于进一步处理。
有,但这似乎是由于有很多不同的输入文件造成的,而我只有一个输入文件,所以它似乎不相关。此外,那里提到的解决方案相当神秘,我不确定 if/how 我是否可以将它们应用于我的问题。
我的猜测是 BigQuery 在持久化之前为每一行或其他内容写入临时文件,这就是 "too many sources" 的意思?
我该如何解决这个问题?
[编辑]
代码:
import argparse
import json
import logging
import apache_beam as beam
class JsonCoder(object):
"""A JSON coder interpreting each line as a JSON string."""
def encode(self, x):
return json.dumps(x)
def decode(self, x):
return json.loads(x)
def filter_by_nonempty_county(record):
if 'county_fips' in record and record['county_fips'] is not None:
yield record
def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('--input',
default='...',
help=('Input twitter json file specified as: '
'gs://path/to/tweets.json'))
parser.add_argument(
'--output',
required=True,
help=
('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
'or DATASET.TABLE.'))
known_args, pipeline_args = parser.parse_known_args(argv)
p = beam.Pipeline(argv=pipeline_args)
# read text file
#Read all tweets from given source file
read_tweets = "Read Tweet File" >> beam.io.ReadFromText(known_args.input, coder=JsonCoder())
#Extract the relevant fields of the source file
extract_fields = "Project relevant fields" >> beam.Map(lambda row: {'text': row['text'],
'user_id': row['user']['id'],
'location': row['user']['location'] if 'location' in row['user'] else None,
'geo':row['geo'] if 'geo' in row else None,
'tweet_id': row['id'],
'time': row['created_at']})
#check what type of geo-location the user has
has_geo_location_or_not = "partition by has geo or not" >> beam.Partition(lambda element, partitions: 0 if element['geo'] is None else 1, 2)
check_county_not_empty = lambda element, partitions: 1 if 'county_fips' in element and element['county_fips'] is not None else 0
#tweet has coordinates partition or not
coordinate_partition = (p
| read_tweets
| extract_fields
| beam.ParDo(TimeConversion())
| has_geo_location_or_not)
#lookup by coordinates
geo_lookup = (coordinate_partition[1] | "geo coordinates mapping" >> beam.ParDo(BeamGeoLocator())
| "filter successful geo coords" >> beam.Partition(check_county_not_empty, 2))
#lookup by profile
profile_lookup = ((coordinate_partition[0], geo_lookup[0])
| "join streams" >> beam.Flatten()
| "Lookup from profile location" >> beam.ParDo(ComputeLocationFromProfile())
)
bigquery_output = "write output to BigQuery" >> beam.io.Write(
beam.io.BigQuerySink(known_args.output,
schema='text:STRING, user_id:INTEGER, county_fips:STRING, tweet_id:INTEGER, time:TIMESTAMP, county_source:STRING',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
#file_output = "write output" >> beam.io.WriteToText(known_args.output, coder=JsonCoder())
output = ((profile_lookup, geo_lookup[1]) | "merge streams" >> beam.Flatten()
| "Filter entries without location" >> beam.FlatMap(filter_by_nonempty_county)
| "project relevant fields" >> beam.Map(lambda row: {'text': row['text'],
'user_id': row['user_id'],
'county_fips': row['county_fips'],
'tweet_id': row['tweet_id'],
'time': row['time'],
'county_source': row['county_source']})
| bigquery_output)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
run()
有点复杂,如果直接在bigquery里做,估计会比较费时间。代码读取推文 json,根据是否带有地理标记来拆分 PCollection,如果没有,它会尝试通过配置文件位置进行查找,将位置映射到与我们的 GIS 分析相关的位置,然后将其写入 BigQuery。
文件数量对应于处理元素的分片数量。
减少这种情况的一个技巧是生成一些随机键,并在写出之前根据这些键对元素进行分组。
例如,您可以在管道中使用以下 DoFn
和 PTransform
:
class _RoundRobinKeyFn(beam.DoFn):
def __init__(self, count):
self.count = count
def start_bundle(self):
self.counter = random.randint(0, self.count - 1)
def process(self, element):
self.counter += 1
if self.counter >= self.count:
self.counter -= self.count
yield self.counter, element
class LimitBundles(beam.PTransform):
def __init__(self, count):
self.count = count
def expand(self, input):
return input
| beam.ParDo(_RoundRobinKeyFn(self.count))
| beam.GroupByKey()
| beam.FlatMap(lambda kv: kv[1])
您只需在 bigquery_output 之前使用它:
output = (# ...
| LimitBundles(10000)
| bigquery_output)
(请注意,我只是在没有测试的情况下输入它,所以可能有一些 Python 错别字。)
我目前正在尝试 运行 数据流(Apache Beam,Python SDK)任务以将 >100GB 的推文文件导入 BigQuery,但 运行 正在 Error: Message: Too many sources provided: 15285. Limit is 10000.
该任务获取推文 (JSON),提取 5 个相关字段,transforms/sanitizes 对它们进行一些转换,然后将这些值写入 BigQuery,用于进一步处理。
有
我的猜测是 BigQuery 在持久化之前为每一行或其他内容写入临时文件,这就是 "too many sources" 的意思?
我该如何解决这个问题?
[编辑]
代码:
import argparse
import json
import logging
import apache_beam as beam
class JsonCoder(object):
"""A JSON coder interpreting each line as a JSON string."""
def encode(self, x):
return json.dumps(x)
def decode(self, x):
return json.loads(x)
def filter_by_nonempty_county(record):
if 'county_fips' in record and record['county_fips'] is not None:
yield record
def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('--input',
default='...',
help=('Input twitter json file specified as: '
'gs://path/to/tweets.json'))
parser.add_argument(
'--output',
required=True,
help=
('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
'or DATASET.TABLE.'))
known_args, pipeline_args = parser.parse_known_args(argv)
p = beam.Pipeline(argv=pipeline_args)
# read text file
#Read all tweets from given source file
read_tweets = "Read Tweet File" >> beam.io.ReadFromText(known_args.input, coder=JsonCoder())
#Extract the relevant fields of the source file
extract_fields = "Project relevant fields" >> beam.Map(lambda row: {'text': row['text'],
'user_id': row['user']['id'],
'location': row['user']['location'] if 'location' in row['user'] else None,
'geo':row['geo'] if 'geo' in row else None,
'tweet_id': row['id'],
'time': row['created_at']})
#check what type of geo-location the user has
has_geo_location_or_not = "partition by has geo or not" >> beam.Partition(lambda element, partitions: 0 if element['geo'] is None else 1, 2)
check_county_not_empty = lambda element, partitions: 1 if 'county_fips' in element and element['county_fips'] is not None else 0
#tweet has coordinates partition or not
coordinate_partition = (p
| read_tweets
| extract_fields
| beam.ParDo(TimeConversion())
| has_geo_location_or_not)
#lookup by coordinates
geo_lookup = (coordinate_partition[1] | "geo coordinates mapping" >> beam.ParDo(BeamGeoLocator())
| "filter successful geo coords" >> beam.Partition(check_county_not_empty, 2))
#lookup by profile
profile_lookup = ((coordinate_partition[0], geo_lookup[0])
| "join streams" >> beam.Flatten()
| "Lookup from profile location" >> beam.ParDo(ComputeLocationFromProfile())
)
bigquery_output = "write output to BigQuery" >> beam.io.Write(
beam.io.BigQuerySink(known_args.output,
schema='text:STRING, user_id:INTEGER, county_fips:STRING, tweet_id:INTEGER, time:TIMESTAMP, county_source:STRING',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
#file_output = "write output" >> beam.io.WriteToText(known_args.output, coder=JsonCoder())
output = ((profile_lookup, geo_lookup[1]) | "merge streams" >> beam.Flatten()
| "Filter entries without location" >> beam.FlatMap(filter_by_nonempty_county)
| "project relevant fields" >> beam.Map(lambda row: {'text': row['text'],
'user_id': row['user_id'],
'county_fips': row['county_fips'],
'tweet_id': row['tweet_id'],
'time': row['time'],
'county_source': row['county_source']})
| bigquery_output)
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
run()
有点复杂,如果直接在bigquery里做,估计会比较费时间。代码读取推文 json,根据是否带有地理标记来拆分 PCollection,如果没有,它会尝试通过配置文件位置进行查找,将位置映射到与我们的 GIS 分析相关的位置,然后将其写入 BigQuery。
文件数量对应于处理元素的分片数量。
减少这种情况的一个技巧是生成一些随机键,并在写出之前根据这些键对元素进行分组。
例如,您可以在管道中使用以下 DoFn
和 PTransform
:
class _RoundRobinKeyFn(beam.DoFn):
def __init__(self, count):
self.count = count
def start_bundle(self):
self.counter = random.randint(0, self.count - 1)
def process(self, element):
self.counter += 1
if self.counter >= self.count:
self.counter -= self.count
yield self.counter, element
class LimitBundles(beam.PTransform):
def __init__(self, count):
self.count = count
def expand(self, input):
return input
| beam.ParDo(_RoundRobinKeyFn(self.count))
| beam.GroupByKey()
| beam.FlatMap(lambda kv: kv[1])
您只需在 bigquery_output 之前使用它:
output = (# ...
| LimitBundles(10000)
| bigquery_output)
(请注意,我只是在没有测试的情况下输入它,所以可能有一些 Python 错别字。)