How would I go about converting a .csv to an .arrow file without loading it all into memory?
I found a similar question here: Read CSV with PyArrow
In that answer it references sys.stdin.buffer and sys.stdout.buffer, but I am not exactly sure how that would be used to write the .arrow file, or how to name it.
I can't seem to find the exact information I am looking for in the pyarrow docs. My file will not have any nans, but it will have a timestamped index. The file is about 100 GB, so loading it all into memory simply isn't an option. I tried changing the code, but as I suspected, the code ends up overwriting the previous file on every loop.
***This is my first post. I would like to thank all the contributors who answered 99.9% of my other questions before I had even asked them.
import sys
import pandas as pd
import pyarrow as pa

SPLIT_ROWS = 1           ### used one line chunks for a small test

def main():
    writer = None
    for split in pd.read_csv(sys.stdin.buffer, chunksize=SPLIT_ROWS):
        table = pa.Table.from_pandas(split)
        # Write out to file
        # (re-opened in 'wb' mode on every iteration, so each chunk overwrites the file)
        with pa.OSFile('test.arrow', 'wb') as sink:     ### no append mode yet
            with pa.RecordBatchFileWriter(sink, table.schema) as writer:
                writer.write_table(table)
    writer.close()

if __name__ == "__main__":
    main()
Here is what I used on the command line:
>cat data.csv | python test.py
As @Pace suggested, you should consider moving the creation of the output file outside of the read loop. Something like this:
import sys
import pandas as pd
import pyarrow as pa

SPLIT_ROWS = 1           ### used one line chunks for a small test

def main():
    # Determine the schema once, up front, so the writer can be created
    # before the read loop (here by reading just the first couple of rows).
    schema = pa.Table.from_pandas(pd.read_csv('data.csv', nrows=2)).schema

    # Write out to file
    with pa.OSFile('test.arrow', 'wb') as sink:     ### no append mode yet
        with pa.RecordBatchFileWriter(sink, schema) as writer:
            for split in pd.read_csv('data.csv', chunksize=SPLIT_ROWS):
                table = pa.Table.from_pandas(split)
                writer.write_table(table)

if __name__ == "__main__":
    main()
You also don't have to use sys.stdin.buffer if you'd prefer to specify particular input and output files. You could then run the script as:
python test.py
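If you want the file names to be configurable rather than hard-coded, one small sketch (the argument handling here is just an illustration, not part of the original answer) is to take them from sys.argv and pass them to pd.read_csv() and pa.OSFile():

import sys

# Hypothetical argument handling: fall back to the hard-coded names used above.
csv_path = sys.argv[1] if len(sys.argv) > 1 else 'data.csv'
arrow_path = sys.argv[2] if len(sys.argv) > 2 else 'test.arrow'

You could then run it as, for example, python test.py data.csv test.arrow.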
By using the with statements, both writer and sink are closed automatically afterwards (in this case when main() returns). This means it is not necessary to include an explicit close() call.
Solution adapted from @Martin-Evans' code:
Closed the file after the for loop, as suggested by @Pace.
import sys
import pandas as pd
import pyarrow as pa

SPLIT_ROWS = 1000000

def main():
    schema = pa.Table.from_pandas(pd.read_csv('Data.csv', nrows=2)).schema
    ### reads first two lines to define schema

    with pa.OSFile('test.arrow', 'wb') as sink:
        with pa.RecordBatchFileWriter(sink, schema) as writer:
            for split in pd.read_csv('Data.csv', chunksize=SPLIT_ROWS):
                table = pa.Table.from_pandas(split)
                writer.write_table(table)
            writer.close()   # redundant here: the with block closes the writer anyway

if __name__ == "__main__":
    main()
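To sanity-check the result without pulling it all back into memory, one option (a sketch, not part of the answers above; it assumes the file was written with RecordBatchFileWriter as shown) is to memory-map the Arrow file and read one record batch at a time:

import pyarrow as pa

# Memory-map the file so data is only materialised as batches are requested.
with pa.memory_map('test.arrow', 'r') as source:
    reader = pa.ipc.open_file(source)
    print(reader.schema)
    print(reader.num_record_batches)
    first_batch = reader.get_batch(0)   # reads only the first record batch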