使用数据流摄取 csv 文件列表的最佳方法
Best way to ingest a list of csv files with dataflow
我正在寻找一种方法来读取 csv 文件列表并将每一行转换为 json 格式。假设我无法事先获得 header 个名称,我必须确保每个工作人员都可以从一个 csv 文件的开头读取,否则我们不知道 header 个名称。
我的计划是使用FileIO.readMatches获取ReadableFile作为元素,对于每个元素,将第一行读取为header并将header与其他行组合成json 格式。我的问题是:
- 假设 ReadableFile 将始终包含整个文件而不是部分文件是否安全?
- 此方法是否需要工作内存大于文件大小?
- 还有其他更好的方法吗?
谢谢!
是的,ReadableFile 总是会给你一个完整的文件。
没有。当您逐行浏览文件时,首先读取一行以确定列,然后读取每一行以输出行 - 这应该可行!
这对我来说似乎是正确的方法,除非你有几个非常大的文件(GB、TB)。如果你有至少十几个或几十个文件,你应该没问题。
额外提示 - 在 CSV 解析器和下一个转换之间插入 apply(Reshuffle.viaRandomKey())
可能会很方便。这将允许您将每个文件的输出洗牌到下游的多个工作人员中 - 它会给您更多的下游并行性。
祝你好运!欢迎在评论中提出后续问题。
我正在寻找一种方法来读取 csv 文件列表并将每一行转换为 json 格式。假设我无法事先获得 header 个名称,我必须确保每个工作人员都可以从一个 csv 文件的开头读取,否则我们不知道 header 个名称。
我的计划是使用FileIO.readMatches获取ReadableFile作为元素,对于每个元素,将第一行读取为header并将header与其他行组合成json 格式。我的问题是:
- 假设 ReadableFile 将始终包含整个文件而不是部分文件是否安全?
- 此方法是否需要工作内存大于文件大小?
- 还有其他更好的方法吗?
谢谢!
是的,ReadableFile 总是会给你一个完整的文件。
没有。当您逐行浏览文件时,首先读取一行以确定列,然后读取每一行以输出行 - 这应该可行!
这对我来说似乎是正确的方法,除非你有几个非常大的文件(GB、TB)。如果你有至少十几个或几十个文件,你应该没问题。
额外提示 - 在 CSV 解析器和下一个转换之间插入 apply(Reshuffle.viaRandomKey())
可能会很方便。这将允许您将每个文件的输出洗牌到下游的多个工作人员中 - 它会给您更多的下游并行性。
祝你好运!欢迎在评论中提出后续问题。