如何使用 pandas 不断获取数据的文件读取并推送到 SQL 数据库

How to read and push to an SQL database using pandas a file that is constantly getting data

我有一个 CSV 文件,它不断地从另一个程序中获取新数据(新行附加在末尾)。

我的目标是在 python 中读取此文件并将传入数据在线推送到 SQL 数据库中。我不想在推送数据之前等待(或者更确切地说,我负担不起)文件完成。文件打开后,程序应不断等待新行推送到数据库。

Pandas 嵌入了一个非常有用的方法 to_sql,我目前正在使用该方法将数据推送到数据库,这就是为什么我愿意继续使用 pandas。

Pandas' read_csv 方法接受我尝试使用的 chunk_size 参数。这种行为很有趣,但是每次查询块时文件 不会 重新加载,因此没有解决我的问题。在这个例子中

df = pd.read_csv('filename.csv', chunksize=1)
time.sleep(10)
df.get_chunk(5)

如果文件中的数据在休眠期间被修改,查询chunks时不会被捕获

有人知道我该怎么做吗?

提前致谢

总的来说:就是不要。如果您正在读取由另一个进程打开的文件,则无法确定您正在读取有效的 CSV(因为写入过程可能在该行的中途)。

如果您坚持,有几种方法可以做到。第一个是使用 watchdog。这个 Python 库可以“监听”文件更改。一旦文件发生变化(因为另一个进程写入了新行),您可以再次调用 pandas.read_csv

另一种方法是定期(例如每 10 秒)检查文件的内容。这是此类程序的示例:

import time
import pandas


def check_updates(n_rows_read: int) -> int:
    """ Check for updates in a file, starting from a specific row number.

    Args:
        n_rows_read (int): The number of rows in the file that are already read

    Returns:
        int: The total number of lines that are now processed in the file
    """

    # Read the file, starting from where we left off earlier
    df = pandas.read_csv("./test.csv", skiprows=n_rows_read)

    # TODO: Implement your logic here, and push to your SQL database
    # df.to_sql(...)

    return df.shape[0] + n_rows_read


if __name__ == "__main__":

    # In the begining we haven't read any lines yet
    n_rows_read = 0

    # Start an infinite loop
    while True:

        # Get updates, and keep track of how many rows of the file are processed
        n_rows_read = check_updates(n_rows_read)

        # Wait for a bit
        time.sleep(10)

注意:您需要根据自己的目的修改此代码