如何使用 pandas 不断获取数据的文件读取并推送到 SQL 数据库
How to read and push to an SQL database using pandas a file that is constantly getting data
我有一个 CSV 文件,它不断地从另一个程序中获取新数据(新行附加在末尾)。
我的目标是在 python 中读取此文件并将传入数据在线推送到 SQL 数据库中。我不想在推送数据之前等待(或者更确切地说,我负担不起)文件完成。文件打开后,程序应不断等待新行推送到数据库。
Pandas 嵌入了一个非常有用的方法 to_sql,我目前正在使用该方法将数据推送到数据库,这就是为什么我愿意继续使用 pandas。
Pandas' read_csv 方法接受我尝试使用的 chunk_size 参数。这种行为很有趣,但是每次查询块时文件 不会 重新加载,因此没有解决我的问题。在这个例子中
df = pd.read_csv('filename.csv', chunksize=1)
time.sleep(10)
df.get_chunk(5)
如果文件中的数据在休眠期间被修改,查询chunks时不会被捕获
有人知道我该怎么做吗?
提前致谢
总的来说:就是不要。如果您正在读取由另一个进程打开的文件,则无法确定您正在读取有效的 CSV(因为写入过程可能在该行的中途)。
如果您坚持,有几种方法可以做到。第一个是使用 watchdog
。这个 Python 库可以“监听”文件更改。一旦文件发生变化(因为另一个进程写入了新行),您可以再次调用 pandas.read_csv
。
另一种方法是定期(例如每 10 秒)检查文件的内容。这是此类程序的示例:
import time
import pandas
def check_updates(n_rows_read: int) -> int:
""" Check for updates in a file, starting from a specific row number.
Args:
n_rows_read (int): The number of rows in the file that are already read
Returns:
int: The total number of lines that are now processed in the file
"""
# Read the file, starting from where we left off earlier
df = pandas.read_csv("./test.csv", skiprows=n_rows_read)
# TODO: Implement your logic here, and push to your SQL database
# df.to_sql(...)
return df.shape[0] + n_rows_read
if __name__ == "__main__":
# In the begining we haven't read any lines yet
n_rows_read = 0
# Start an infinite loop
while True:
# Get updates, and keep track of how many rows of the file are processed
n_rows_read = check_updates(n_rows_read)
# Wait for a bit
time.sleep(10)
注意:您需要根据自己的目的修改此代码
我有一个 CSV 文件,它不断地从另一个程序中获取新数据(新行附加在末尾)。
我的目标是在 python 中读取此文件并将传入数据在线推送到 SQL 数据库中。我不想在推送数据之前等待(或者更确切地说,我负担不起)文件完成。文件打开后,程序应不断等待新行推送到数据库。
Pandas 嵌入了一个非常有用的方法 to_sql,我目前正在使用该方法将数据推送到数据库,这就是为什么我愿意继续使用 pandas。
Pandas' read_csv 方法接受我尝试使用的 chunk_size 参数。这种行为很有趣,但是每次查询块时文件 不会 重新加载,因此没有解决我的问题。在这个例子中
df = pd.read_csv('filename.csv', chunksize=1)
time.sleep(10)
df.get_chunk(5)
如果文件中的数据在休眠期间被修改,查询chunks时不会被捕获
有人知道我该怎么做吗?
提前致谢
总的来说:就是不要。如果您正在读取由另一个进程打开的文件,则无法确定您正在读取有效的 CSV(因为写入过程可能在该行的中途)。
如果您坚持,有几种方法可以做到。第一个是使用 watchdog
。这个 Python 库可以“监听”文件更改。一旦文件发生变化(因为另一个进程写入了新行),您可以再次调用 pandas.read_csv
。
另一种方法是定期(例如每 10 秒)检查文件的内容。这是此类程序的示例:
import time
import pandas
def check_updates(n_rows_read: int) -> int:
""" Check for updates in a file, starting from a specific row number.
Args:
n_rows_read (int): The number of rows in the file that are already read
Returns:
int: The total number of lines that are now processed in the file
"""
# Read the file, starting from where we left off earlier
df = pandas.read_csv("./test.csv", skiprows=n_rows_read)
# TODO: Implement your logic here, and push to your SQL database
# df.to_sql(...)
return df.shape[0] + n_rows_read
if __name__ == "__main__":
# In the begining we haven't read any lines yet
n_rows_read = 0
# Start an infinite loop
while True:
# Get updates, and keep track of how many rows of the file are processed
n_rows_read = check_updates(n_rows_read)
# Wait for a bit
time.sleep(10)
注意:您需要根据自己的目的修改此代码