如何使用 serde_json 从 JSON 数组中流式传输元素?
How can I stream elements from inside a JSON array using serde_json?
我有一个 5GB JSON 文件,它是一个具有固定结构的对象数组:
[
{
"first": "John",
"last": "Doe",
"email": "john.doe@yahoo.com"
},
{
"first": "Anne",
"last": "Ortha",
"email": "anne.ortha@hotmail.com"
},
....
]
我知道我可以尝试使用 :
中显示的代码解析此文件
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
struct User {
first: String,
last: String,
email: String,
}
let users: Vec<User> = serde_json::from_str(file)?;
存在多个问题:
- 先作为一个字符串整体读取
- 读取为字符串后,它将其转换为
User
结构的向量(我不想要那样)
我试过 但它在打印任何内容之前读取整个文件并在循环内一次打印整个结构。我在循环中一次期待一个对象:
理想情况下,(已解析的)用户对象的解析和处理应该在两个单独的 threads/tasks/routines 中或通过使用通道同时发生。
此 is not directly possible 自 serde_json 1.0.66 起。
一个 workaround suggested is to implement your own Visitor
使用频道。随着数组反序列化的进行,每个元素都被推入通道。然后通道的接收端可以抓取每个元素并进行处理,释放 space 用于反序列化以推入另一个值。
流式传输来自 JSON 数组的元素是可能的,但需要一些跑腿工作。您必须自己跳过前导 [
和断断续续的 ,
,并检测最后的 ]
。要解析单个数组元素,您需要使用 StreamDeserializer
并从中提取单个项目(这样您就可以删除它并重新获得对 IO reader 的控制)。例如:
use serde::de::DeserializeOwned;
use serde_json::{self, Deserializer};
use std::io::{self, Read};
fn read_skipping_ws(mut reader: impl Read) -> io::Result<u8> {
loop {
let mut byte = 0u8;
reader.read_exact(std::slice::from_mut(&mut byte))?;
if !byte.is_ascii_whitespace() {
return Ok(byte);
}
}
}
fn invalid_data(msg: &str) -> io::Error {
io::Error::new(io::ErrorKind::InvalidData, msg)
}
fn deserialize_single<T: DeserializeOwned, R: Read>(reader: R) -> io::Result<T> {
let next_obj = Deserializer::from_reader(reader).into_iter::<T>().next();
match next_obj {
Some(result) => result.map_err(Into::into),
None => Err(invalid_data("premature EOF")),
}
}
fn yield_next_obj<T: DeserializeOwned, R: Read>(
mut reader: R,
at_start: &mut bool,
) -> io::Result<Option<T>> {
if !*at_start {
*at_start = true;
if read_skipping_ws(&mut reader)? == b'[' {
// read the next char to see if the array is empty
let peek = read_skipping_ws(&mut reader)?;
if peek == b']' {
Ok(None)
} else {
deserialize_single(io::Cursor::new([peek]).chain(reader)).map(Some)
}
} else {
Err(invalid_data("`[` not found"))
}
} else {
match read_skipping_ws(&mut reader)? {
b',' => deserialize_single(reader).map(Some),
b']' => Ok(None),
_ => Err(invalid_data("`,` or `]` not found")),
}
}
}
pub fn iter_json_array<T: DeserializeOwned, R: Read>(
mut reader: R,
) -> impl Iterator<Item = Result<T, io::Error>> {
let mut at_start = false;
std::iter::from_fn(move || yield_next_obj(&mut reader, &mut at_start).transpose())
}
用法示例:
fn main() {
let data = r#"[
{
"first": "John",
"last": "Doe",
"email": "john.doe@yahoo.com"
},
{
"first": "Anne",
"last": "Ortha",
"email": "anne.ortha@hotmail.com"
}
]"#;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
struct User {
first: String,
last: String,
email: String,
}
for user in iter_json_array(io::Cursor::new(&data)) {
let user: User = user.unwrap();
println!("{:?}", user);
}
}
在生产中使用它时,您可以将其作为 File
打开,而不是将其读取为字符串。与往常一样,不要忘记将 File
包裹在 BufReader
.
中
我有一个 5GB JSON 文件,它是一个具有固定结构的对象数组:
[
{
"first": "John",
"last": "Doe",
"email": "john.doe@yahoo.com"
},
{
"first": "Anne",
"last": "Ortha",
"email": "anne.ortha@hotmail.com"
},
....
]
我知道我可以尝试使用
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
struct User {
first: String,
last: String,
email: String,
}
let users: Vec<User> = serde_json::from_str(file)?;
存在多个问题:
- 先作为一个字符串整体读取
- 读取为字符串后,它将其转换为
User
结构的向量(我不想要那样)
我试过
理想情况下,(已解析的)用户对象的解析和处理应该在两个单独的 threads/tasks/routines 中或通过使用通道同时发生。
此 is not directly possible 自 serde_json 1.0.66 起。
一个 workaround suggested is to implement your own Visitor
使用频道。随着数组反序列化的进行,每个元素都被推入通道。然后通道的接收端可以抓取每个元素并进行处理,释放 space 用于反序列化以推入另一个值。
流式传输来自 JSON 数组的元素是可能的,但需要一些跑腿工作。您必须自己跳过前导 [
和断断续续的 ,
,并检测最后的 ]
。要解析单个数组元素,您需要使用 StreamDeserializer
并从中提取单个项目(这样您就可以删除它并重新获得对 IO reader 的控制)。例如:
use serde::de::DeserializeOwned;
use serde_json::{self, Deserializer};
use std::io::{self, Read};
fn read_skipping_ws(mut reader: impl Read) -> io::Result<u8> {
loop {
let mut byte = 0u8;
reader.read_exact(std::slice::from_mut(&mut byte))?;
if !byte.is_ascii_whitespace() {
return Ok(byte);
}
}
}
fn invalid_data(msg: &str) -> io::Error {
io::Error::new(io::ErrorKind::InvalidData, msg)
}
fn deserialize_single<T: DeserializeOwned, R: Read>(reader: R) -> io::Result<T> {
let next_obj = Deserializer::from_reader(reader).into_iter::<T>().next();
match next_obj {
Some(result) => result.map_err(Into::into),
None => Err(invalid_data("premature EOF")),
}
}
fn yield_next_obj<T: DeserializeOwned, R: Read>(
mut reader: R,
at_start: &mut bool,
) -> io::Result<Option<T>> {
if !*at_start {
*at_start = true;
if read_skipping_ws(&mut reader)? == b'[' {
// read the next char to see if the array is empty
let peek = read_skipping_ws(&mut reader)?;
if peek == b']' {
Ok(None)
} else {
deserialize_single(io::Cursor::new([peek]).chain(reader)).map(Some)
}
} else {
Err(invalid_data("`[` not found"))
}
} else {
match read_skipping_ws(&mut reader)? {
b',' => deserialize_single(reader).map(Some),
b']' => Ok(None),
_ => Err(invalid_data("`,` or `]` not found")),
}
}
}
pub fn iter_json_array<T: DeserializeOwned, R: Read>(
mut reader: R,
) -> impl Iterator<Item = Result<T, io::Error>> {
let mut at_start = false;
std::iter::from_fn(move || yield_next_obj(&mut reader, &mut at_start).transpose())
}
用法示例:
fn main() {
let data = r#"[
{
"first": "John",
"last": "Doe",
"email": "john.doe@yahoo.com"
},
{
"first": "Anne",
"last": "Ortha",
"email": "anne.ortha@hotmail.com"
}
]"#;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
struct User {
first: String,
last: String,
email: String,
}
for user in iter_json_array(io::Cursor::new(&data)) {
let user: User = user.unwrap();
println!("{:?}", user);
}
}
在生产中使用它时,您可以将其作为 File
打开,而不是将其读取为字符串。与往常一样,不要忘记将 File
包裹在 BufReader
.