如何将 PySpark / AWS Glue 中 RDD 的所有行连接/连接/合并为一条长线?
How to join / concatenate / merge all rows of an RDD in PySpark / AWS Glue into one single long line?
我有一个协议需要接收许多(读取数百万)条记录。该协议要求所有数据都是单行输入 (InfluxDB / QuestDB)。使用 InfluxDB 客户端目前不是一个选项,因此我需要通过套接字执行此操作。
我正处于 ETL 过程的末尾,我现在只需要获取我创建的最终 RDD 并获取所有这些行并将它们转置为一行但无法弄清楚如何执行此操作(以及如何正确地做到这一点!)
在 PySpark / AWS Glue 中我目前有:
def write_to_quest(df, measurement, table, timestamp_field, args):
HOST = args['questdb_host']
PORT = int(args['questdb_port'])
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((HOST, PORT))
rows = df.rdd.map(lambda row: row.asDict(True))
new_rdd = rows.map(lambda row:
_row_to_line_protocol(row, measurement, table, timestamp_field)).glom()
#transform new_rdd to single_line_rdd here
sock.sendall((single_line_rdd).encode())
except socket.error as e:
print("Got error: %s" % (e))
呼叫者:
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
allDaily = glueContext.create_dynamic_frame.from_catalog(database=args['db_name'],
table_name="daily",
transformation_ctx="allDaily",
push_down_predicate="(date_str='20040302' and meter_id='NEM1206106')"
# for faster testing
)
# TODO: Handle entire DF as a single payload
df = allDaily.toDF()
tdf = df.withColumn('reading_date_time', F.to_timestamp(df['reading_date_time'], '%Y-%m-%dT%H:%M:%S.%f'))
tdf = tdf.drop(*["ingestion_date", "period_start", "period_end", "quality_method",
"event", "import_reactive_total", "export_reactive_total"])
write_to_quest(df=tdf, measurement="meter_id", table="daily", timestamp_field="reading_date_time", args=args)
new_rdd 的形状是一组字符串列表:
RDD[
['string here,to,join','another string,to,join'...x70]
['string here,to,join','another string,to,join'...x70]
['string here,to,join','another string,to,join'...x70]
x200
]
我如何得到这个,所以我只有一行,所有内容都由“\n”(换行符)连接?
例如:
'string here,to,join\nanother string,to,join\n....'
到目前为止,我已经尝试了几种 foreach 组合,例如:
foreach(lambda x: ("\n".join(x)))
但绝对无济于事,我也担心这方面的可扩展性 - 例如,我很确定在数百万行上使用 .collect()
会杀死东西
那么解决这最后一步的最佳方法是什么?
接受答案后编辑
我实施的 Werners 回答的具体解决方案是这样的(我删除了 Glob 以获得每行一个列表项,然后删除了空格(因为 Influx / Quest 对空格敏感)
def write_to_quest(df, measurement, table, timestamp_field, args):
"""
Open a socket and write the row directly into Quest
:param df_row:
:param measurement:
:param table:
:param timestamp_field:
:param args:
:return:
"""
HOST = args['questdb_host']
PORT = int(args['questdb_port'])
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((HOST, PORT))
rows = df.rdd.map(lambda row: row.asDict(True))
new_rdd = rows.map(lambda row:
_row_to_line_protocol(row, measurement, table, timestamp_field))
result = new_rdd.map(lambda r: "".join(r) + "\n") \
.aggregate("", lambda a, b: a + b, lambda a, b: a + b)
sock.sendall((result.encode()))
except socket.error as e:
print("Got error: %s" % (e))
sock.close()
rdd 的每一行都可以使用 map and then the result of the map call can be aggregated 映射到一个大字符串中:
result = rdd.map(lambda r: " ".join(r) + "\n")\
.aggregate("", lambda a,b: a+b, lambda a,b: a+b)
如果目标是拥有一个大字符串,则至少在最后一步必须将所有数据移动到一个地方。在这里使用 aggregate
比收集所有行并在驱动程序上连接字符串 稍微 更好,因为 aggregate
可以在大多数时间分布式和并行地做事情。然而,在单个节点上仍然需要足够的内存来存储整个最终字符串。
我有一个协议需要接收许多(读取数百万)条记录。该协议要求所有数据都是单行输入 (InfluxDB / QuestDB)。使用 InfluxDB 客户端目前不是一个选项,因此我需要通过套接字执行此操作。
我正处于 ETL 过程的末尾,我现在只需要获取我创建的最终 RDD 并获取所有这些行并将它们转置为一行但无法弄清楚如何执行此操作(以及如何正确地做到这一点!)
在 PySpark / AWS Glue 中我目前有:
def write_to_quest(df, measurement, table, timestamp_field, args):
HOST = args['questdb_host']
PORT = int(args['questdb_port'])
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((HOST, PORT))
rows = df.rdd.map(lambda row: row.asDict(True))
new_rdd = rows.map(lambda row:
_row_to_line_protocol(row, measurement, table, timestamp_field)).glom()
#transform new_rdd to single_line_rdd here
sock.sendall((single_line_rdd).encode())
except socket.error as e:
print("Got error: %s" % (e))
呼叫者:
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
allDaily = glueContext.create_dynamic_frame.from_catalog(database=args['db_name'],
table_name="daily",
transformation_ctx="allDaily",
push_down_predicate="(date_str='20040302' and meter_id='NEM1206106')"
# for faster testing
)
# TODO: Handle entire DF as a single payload
df = allDaily.toDF()
tdf = df.withColumn('reading_date_time', F.to_timestamp(df['reading_date_time'], '%Y-%m-%dT%H:%M:%S.%f'))
tdf = tdf.drop(*["ingestion_date", "period_start", "period_end", "quality_method",
"event", "import_reactive_total", "export_reactive_total"])
write_to_quest(df=tdf, measurement="meter_id", table="daily", timestamp_field="reading_date_time", args=args)
new_rdd 的形状是一组字符串列表:
RDD[
['string here,to,join','another string,to,join'...x70]
['string here,to,join','another string,to,join'...x70]
['string here,to,join','another string,to,join'...x70]
x200
]
我如何得到这个,所以我只有一行,所有内容都由“\n”(换行符)连接?
例如:
'string here,to,join\nanother string,to,join\n....'
到目前为止,我已经尝试了几种 foreach 组合,例如:
foreach(lambda x: ("\n".join(x)))
但绝对无济于事,我也担心这方面的可扩展性 - 例如,我很确定在数百万行上使用 .collect()
会杀死东西
那么解决这最后一步的最佳方法是什么?
接受答案后编辑
我实施的 Werners 回答的具体解决方案是这样的(我删除了 Glob 以获得每行一个列表项,然后删除了空格(因为 Influx / Quest 对空格敏感)
def write_to_quest(df, measurement, table, timestamp_field, args):
"""
Open a socket and write the row directly into Quest
:param df_row:
:param measurement:
:param table:
:param timestamp_field:
:param args:
:return:
"""
HOST = args['questdb_host']
PORT = int(args['questdb_port'])
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect((HOST, PORT))
rows = df.rdd.map(lambda row: row.asDict(True))
new_rdd = rows.map(lambda row:
_row_to_line_protocol(row, measurement, table, timestamp_field))
result = new_rdd.map(lambda r: "".join(r) + "\n") \
.aggregate("", lambda a, b: a + b, lambda a, b: a + b)
sock.sendall((result.encode()))
except socket.error as e:
print("Got error: %s" % (e))
sock.close()
rdd 的每一行都可以使用 map and then the result of the map call can be aggregated 映射到一个大字符串中:
result = rdd.map(lambda r: " ".join(r) + "\n")\
.aggregate("", lambda a,b: a+b, lambda a,b: a+b)
如果目标是拥有一个大字符串,则至少在最后一步必须将所有数据移动到一个地方。在这里使用 aggregate
比收集所有行并在驱动程序上连接字符串 稍微 更好,因为 aggregate
可以在大多数时间分布式和并行地做事情。然而,在单个节点上仍然需要足够的内存来存储整个最终字符串。