如何优化此气流操作员代码以在 celery worker 上使用最少的 RAM?
How to optimize this airflow operator code to use minimal RAM on the celery worker?
我正在使用 Airflow(Astronomer.io 部署),此 DAG 代码在 Celery 部署中。
这个DAG从数据库(SQL服务器)获取数据,然后对记录列表进行如下操作。下面是一个片段,因为这是 Airflow,所以使用 SQLAlchemy 获取数据,然后我将其转换为列表。
# Perform query and convert returned tuple to list
logging.info(f"QUERY: {query}")
results = list(hook.get_records(query))
logging.info("Successfully performed MSSQL query:")
# logging.info(f"{results}")
results = [dict([k.lower(), str(v)] if v is not None else [k, v]
for k, v in i.items()) for i in results]
results = '\n'.join([json.dumps(i) for i in results])
results = results.encode('utf-8')
logging.info("Uploading!")
fout.write(results)
这段代码最终会导致 celery worker 循环,直到它在日志消息“成功执行 MSSQL 查询”的步骤之后超时。
似乎列表推导在 200 万行以上时表现不佳。
我对列表推导还很陌生,所以我正在寻找一种优化它的方法,以免在我的 celery worker 中使用太多资源 (RAM)。
您的问题是 get_records
将整个结果集读入内存。
您想在任何给定时间限制内存中的行数。
你要的是发电机。像这样:
class MyOp(BaseOperator):
BATCH_SIZE = 10000
def get_rows(self):
with self.hook.get_conn() as cnx:
cur = cnx.cursor()
cur.execute(query)
while True:
curr_batch = cur.fetchmany(self.BATCH_SIZE)
if not curr_batch:
break
yield from curr_batch
def execute(self, context=None):
rows = self.get_rows()
self.do_something_with_rows(rows)
在这里,您一次检索 10k 行,并在执行过程中生成它们。
或者,您可以在该挂钩上使用另一种方法 get_pandas_df
,并向其传递参数 chunksize
,它将生成大小为 chunksize
的列表,您可以将其处理为你走类似的路。
我留给您实施的方法 do_something_with_rows
,但通常您所做的是写入一个临时的本地临时文件,然后上传到某个服务。有关示例,请参阅 tempfile
库及其在气流中的用法。
我正在使用 Airflow(Astronomer.io 部署),此 DAG 代码在 Celery 部署中。
这个DAG从数据库(SQL服务器)获取数据,然后对记录列表进行如下操作。下面是一个片段,因为这是 Airflow,所以使用 SQLAlchemy 获取数据,然后我将其转换为列表。
# Perform query and convert returned tuple to list
logging.info(f"QUERY: {query}")
results = list(hook.get_records(query))
logging.info("Successfully performed MSSQL query:")
# logging.info(f"{results}")
results = [dict([k.lower(), str(v)] if v is not None else [k, v]
for k, v in i.items()) for i in results]
results = '\n'.join([json.dumps(i) for i in results])
results = results.encode('utf-8')
logging.info("Uploading!")
fout.write(results)
这段代码最终会导致 celery worker 循环,直到它在日志消息“成功执行 MSSQL 查询”的步骤之后超时。
似乎列表推导在 200 万行以上时表现不佳。
我对列表推导还很陌生,所以我正在寻找一种优化它的方法,以免在我的 celery worker 中使用太多资源 (RAM)。
您的问题是 get_records
将整个结果集读入内存。
您想在任何给定时间限制内存中的行数。
你要的是发电机。像这样:
class MyOp(BaseOperator):
BATCH_SIZE = 10000
def get_rows(self):
with self.hook.get_conn() as cnx:
cur = cnx.cursor()
cur.execute(query)
while True:
curr_batch = cur.fetchmany(self.BATCH_SIZE)
if not curr_batch:
break
yield from curr_batch
def execute(self, context=None):
rows = self.get_rows()
self.do_something_with_rows(rows)
在这里,您一次检索 10k 行,并在执行过程中生成它们。
或者,您可以在该挂钩上使用另一种方法 get_pandas_df
,并向其传递参数 chunksize
,它将生成大小为 chunksize
的列表,您可以将其处理为你走类似的路。
我留给您实施的方法 do_something_with_rows
,但通常您所做的是写入一个临时的本地临时文件,然后上传到某个服务。有关示例,请参阅 tempfile
库及其在气流中的用法。