Rust 反序列化 JSON
Rust Deserializing JSON
我在反序列化从我的客户端发送的 json 数据时遇到问题。
server.rs
use std::collections::HashMap;
use std::sync::{Arc,Mutex};
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncWriteExt, AsyncReadExt};
use serde_json::{ Value};
/*
The type Arc<T> provides shared ownership of a value of type T, allocated in the heap. Invoking clone on Arc produces a new Arc instance, which points to the same allocation on the heap as the source Arc, while increasing a reference count. When the last Arc pointer to a given allocation is destroyed, the value stored in that allocation (often referred to as “inner value”) is also dropped.
*/
/// Shared connection registry: a `HashMap` from a `String` key to a
/// `TcpStream`, guarded by a `Mutex` and wrapped in an `Arc` so that clones of
/// the handle all refer to the same map.
// NOTE(review): presumably the key is a user name — confirm once the map is populated.
type UserToSocket = Arc<Mutex<HashMap<String,TcpStream>>>;
/// Server entry point.
///
/// Binds a TCP listener on 127.0.0.1:9090 and accepts connections forever,
/// spawning one task running `handler` per accepted connection. Every task
/// receives its own `Arc` handle to the same registry.
///
/// # Panics
/// Panics if the listener cannot be bound or if accepting a connection fails.
#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:9090")
        .await
        .unwrap_or_else(|_| panic!("ERROR OCCURED"));

    // A single mutex-protected map shared by all connection tasks.
    let local_db: UserToSocket = Arc::new(Mutex::new(HashMap::new()));
    println!("[+] Listener has been started");

    loop {
        println!("[+] Listening for connection");
        let (socket, addr) = listener.accept().await.unwrap();
        println!("[+] A connection accepted from {:?}, spawwning a new task for it", addr);

        // Arc::clone only bumps the reference count; the map itself is not copied.
        tokio::spawn(handler(socket, Arc::clone(&local_db)));
    }
}
/// Serves a single client connection.
///
/// Sends a greeting, then loops: read from the socket into a fixed 1024-byte
/// buffer, decode the buffer as UTF-8, parse it as JSON, print the result and
/// send an acknowledgement. Returns when the client closes the connection or a
/// read fails. `_db` is accepted but not used yet.
///
/// NOTE: each iteration hands the *whole* buffer to the decoder and the JSON
/// parser, not only the bytes that were just read. Whatever follows the
/// received data in the buffer (the initial zero fill, or leftovers of an
/// earlier, longer read) is therefore parsed as well.
///
/// # Panics
/// Panics if a write to the socket fails or if the buffer contents are not
/// valid JSON.
async fn handler(mut socket: TcpStream, _db: UserToSocket) {
    socket
        .write_all(b"[+] Hello Friend, Welcome to my program\r\n")
        .await
        .unwrap();

    let mut buf = vec![0; 1024];
    loop {
        // The number of bytes read is only used to detect end-of-stream.
        match socket.read(&mut buf).await {
            Err(_) => {
                println!("Unhandeled error occured");
                return;
            }
            Ok(0) => {
                println!("Client Closed connection");
                return;
            }
            Ok(_) => {}
        }

        // String::from_utf8 takes ownership, so it works on a copy of the buffer.
        let text = match String::from_utf8(buf.clone()) {
            Ok(text) => text,
            Err(err) => {
                println!("ERROR Could not convert to string {:?}",err);
                continue;
            }
        };

        println!("[+] So the parsed value is {}",text);
        let parsed: Value = serde_json::from_str(&text).unwrap();
        println!("{:?}",parsed);
        socket
            .write_all(b"So yeah thanks for sending this\r\n")
            .await
            .unwrap();
    }
}
client.rs
use tokio::net::{TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::{thread,time};
/// Demo client.
///
/// Connects to 127.0.0.1:9090 and then loops forever: wait two seconds, read
/// one chunk from the server and print it, then send a fixed JSON document
/// terminated by `\r\n`.
///
/// Exits with status -1 if the connection cannot be established and with
/// status 1 when the server closes the connection.
///
/// # Panics
/// Panics if reading from or writing to the socket fails.
#[tokio::main]
async fn main() {
    let sleep_time = time::Duration::from_secs(2);
    let mut socket = match TcpStream::connect("127.0.0.1:9090").await {
        Ok(v) => {
            println!("[+] Successfully connected");
            v
        }
        Err(_) => {
            println!("ERROR could not connect to the server");
            std::process::exit(-1);
        }
    };
    let mut buf = vec![0; 1024];
    loop {
        // Blocking sleep: it stalls the runtime thread, which is tolerable only
        // because this function spawns no other tasks.
        thread::sleep(sleep_time);
        match socket.read(&mut buf).await {
            Ok(0) => {
                println!("[+] Connection with server has been closed");
                std::process::exit(1);
            }
            Ok(n) => {
                // Only the first `n` bytes belong to this read; the rest of the
                // buffer is zero fill or data left over from an earlier read.
                // Lossy decoding keeps a non-UTF-8 reply from crashing the client.
                let res = String::from_utf8_lossy(&buf[..n]);
                println!("[+] Server responded with {}", res);
            }
            Err(err) => {
                panic!("[-] Some fatal error occured: {:?}", err);
            }
        }
        println!("You want to say: ");
        // A fixed payload stands in for user input.
        let val = "{\"name\": \"John Doe\",\"age\": 43,\"phones\": [\"+44 1234567\",\"+44 2345678\"]}\r\n";
        // write_all keeps writing until the whole payload is sent; a plain
        // write may stop after only part of it.
        socket.write_all(val.as_bytes()).await.unwrap();
    }
}
当我向服务器发送 json 数据时,我收到一个错误。
thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: Error("trailing characters", line: 2, column: 1)', src\bin\simple_server.rs:79:71
当我直接对这个 json 字符串进行反序列化时,不会出现此错误;只有当我通过网络发送数据时才会出现。
因为你的 JSON 是以换行符结尾的,你应该使用类似 read_line() 的方法按行读取它,这样交给解析器的只有实际收到的那一行,而不是整个 1024 字节的缓冲区。(另外,你永远不应该发送带格式(pretty-printed)的 JSON,因为它内部会包含换行符——不过 serde_json 默认生成的就是不带格式的 JSON。)
例如,下面的代码可以编译,并且应该能按预期工作:
use serde_json::Value;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufStream};
use tokio::net::{TcpListener, TcpStream};
/// Shared connection registry: a `HashMap` from a `String` key to a
/// `TcpStream`, guarded by a `Mutex` and wrapped in an `Arc` so that clones of
/// the handle all refer to the same map.
type UserToSocket = Arc<Mutex<HashMap<String, TcpStream>>>;
// ... main unchanged from your implementation ...
async fn handler(socket: TcpStream, _db: UserToSocket) {
let mut socket = BufStream::new(socket);
socket
.write_all(b"[+] Hello Friend, Welcome to my program\r\n")
.await
.unwrap();
socket.flush().await.unwrap();
let mut line = vec![];
loop {
line.clear();
if let Err(e) = socket.read_until(b'\n', &mut line).await {
println!("Unhandled error occured: {}", e);
return;
}
if line.is_empty() {
println!("Client Closed connection");
return;
}
println!(
"[+] So the received value is {}",
String::from_utf8_lossy(&line)
);
let parsed: Value = serde_json::from_slice(&line).unwrap();
println!("{:?}", parsed);
socket
.write_all(b"So yeah thanks for sending this\r\n")
.await
.unwrap();
socket.flush().await.unwrap();
continue;
}
}
我在反序列化从我的客户端发送的 json 数据时遇到问题。
server.rs
use std::collections::HashMap;
use std::sync::{Arc,Mutex};
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncWriteExt, AsyncReadExt};
use serde_json::{ Value};
/*
The type Arc<T> provides shared ownership of a value of type T, allocated in the heap. Invoking clone on Arc produces a new Arc instance, which points to the same allocation on the heap as the source Arc, while increasing a reference count. When the last Arc pointer to a given allocation is destroyed, the value stored in that allocation (often referred to as “inner value”) is also dropped.
*/
/// Shared connection registry: a `HashMap` from a `String` key to a
/// `TcpStream`, guarded by a `Mutex` and wrapped in an `Arc` so that clones of
/// the handle all refer to the same map.
// NOTE(review): presumably the key is a user name — confirm once the map is populated.
type UserToSocket = Arc<Mutex<HashMap<String,TcpStream>>>;
/// Server entry point.
///
/// Binds a TCP listener on 127.0.0.1:9090 and accepts connections forever,
/// spawning one task running `handler` per accepted connection. Every task
/// receives its own `Arc` handle to the same registry.
///
/// # Panics
/// Panics if the listener cannot be bound or if accepting a connection fails.
#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:9090")
        .await
        .unwrap_or_else(|_| panic!("ERROR OCCURED"));

    // A single mutex-protected map shared by all connection tasks.
    let local_db: UserToSocket = Arc::new(Mutex::new(HashMap::new()));
    println!("[+] Listener has been started");

    loop {
        println!("[+] Listening for connection");
        let (socket, addr) = listener.accept().await.unwrap();
        println!("[+] A connection accepted from {:?}, spawwning a new task for it", addr);

        // Arc::clone only bumps the reference count; the map itself is not copied.
        tokio::spawn(handler(socket, Arc::clone(&local_db)));
    }
}
/// Serves a single client connection.
///
/// Sends a greeting, then loops: read from the socket into a fixed 1024-byte
/// buffer, decode the buffer as UTF-8, parse it as JSON, print the result and
/// send an acknowledgement. Returns when the client closes the connection or a
/// read fails. `_db` is accepted but not used yet.
///
/// NOTE: each iteration hands the *whole* buffer to the decoder and the JSON
/// parser, not only the bytes that were just read. Whatever follows the
/// received data in the buffer (the initial zero fill, or leftovers of an
/// earlier, longer read) is therefore parsed as well.
///
/// # Panics
/// Panics if a write to the socket fails or if the buffer contents are not
/// valid JSON.
async fn handler(mut socket: TcpStream, _db: UserToSocket) {
    socket
        .write_all(b"[+] Hello Friend, Welcome to my program\r\n")
        .await
        .unwrap();

    let mut buf = vec![0; 1024];
    loop {
        // The number of bytes read is only used to detect end-of-stream.
        match socket.read(&mut buf).await {
            Err(_) => {
                println!("Unhandeled error occured");
                return;
            }
            Ok(0) => {
                println!("Client Closed connection");
                return;
            }
            Ok(_) => {}
        }

        // String::from_utf8 takes ownership, so it works on a copy of the buffer.
        let text = match String::from_utf8(buf.clone()) {
            Ok(text) => text,
            Err(err) => {
                println!("ERROR Could not convert to string {:?}",err);
                continue;
            }
        };

        println!("[+] So the parsed value is {}",text);
        let parsed: Value = serde_json::from_str(&text).unwrap();
        println!("{:?}",parsed);
        socket
            .write_all(b"So yeah thanks for sending this\r\n")
            .await
            .unwrap();
    }
}
client.rs
use tokio::net::{TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::{thread,time};
/// Demo client.
///
/// Connects to 127.0.0.1:9090 and then loops forever: wait two seconds, read
/// one chunk from the server and print it, then send a fixed JSON document
/// terminated by `\r\n`.
///
/// Exits with status -1 if the connection cannot be established and with
/// status 1 when the server closes the connection.
///
/// # Panics
/// Panics if reading from or writing to the socket fails.
#[tokio::main]
async fn main() {
    let sleep_time = time::Duration::from_secs(2);
    let mut socket = match TcpStream::connect("127.0.0.1:9090").await {
        Ok(v) => {
            println!("[+] Successfully connected");
            v
        }
        Err(_) => {
            println!("ERROR could not connect to the server");
            std::process::exit(-1);
        }
    };
    let mut buf = vec![0; 1024];
    loop {
        // Blocking sleep: it stalls the runtime thread, which is tolerable only
        // because this function spawns no other tasks.
        thread::sleep(sleep_time);
        match socket.read(&mut buf).await {
            Ok(0) => {
                println!("[+] Connection with server has been closed");
                std::process::exit(1);
            }
            Ok(n) => {
                // Only the first `n` bytes belong to this read; the rest of the
                // buffer is zero fill or data left over from an earlier read.
                // Lossy decoding keeps a non-UTF-8 reply from crashing the client.
                let res = String::from_utf8_lossy(&buf[..n]);
                println!("[+] Server responded with {}", res);
            }
            Err(err) => {
                panic!("[-] Some fatal error occured: {:?}", err);
            }
        }
        println!("You want to say: ");
        // A fixed payload stands in for user input.
        let val = "{\"name\": \"John Doe\",\"age\": 43,\"phones\": [\"+44 1234567\",\"+44 2345678\"]}\r\n";
        // write_all keeps writing until the whole payload is sent; a plain
        // write may stop after only part of it.
        socket.write_all(val.as_bytes()).await.unwrap();
    }
}
当我向服务器发送 json 数据时,我收到一个错误。
thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: Error("trailing characters", line: 2, column: 1)', src\bin\simple_server.rs:79:71
当我直接对这个 json 字符串进行反序列化时,不会出现此错误;只有当我通过网络发送数据时才会出现。
因为你的 JSON 是以换行符结尾的,你应该使用类似 read_line() 的方法按行读取它,这样交给解析器的只有实际收到的那一行,而不是整个 1024 字节的缓冲区。(另外,你永远不应该发送带格式(pretty-printed)的 JSON,因为它内部会包含换行符——不过 serde_json 默认生成的就是不带格式的 JSON。)
例如,下面的代码可以编译,并且应该能按预期工作:
use serde_json::Value;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufStream};
use tokio::net::{TcpListener, TcpStream};
/// Shared connection registry: a `HashMap` from a `String` key to a
/// `TcpStream`, guarded by a `Mutex` and wrapped in an `Arc` so that clones of
/// the handle all refer to the same map.
type UserToSocket = Arc<Mutex<HashMap<String, TcpStream>>>;
// ... main unchanged from your implementation ...
async fn handler(socket: TcpStream, _db: UserToSocket) {
let mut socket = BufStream::new(socket);
socket
.write_all(b"[+] Hello Friend, Welcome to my program\r\n")
.await
.unwrap();
socket.flush().await.unwrap();
let mut line = vec![];
loop {
line.clear();
if let Err(e) = socket.read_until(b'\n', &mut line).await {
println!("Unhandled error occured: {}", e);
return;
}
if line.is_empty() {
println!("Client Closed connection");
return;
}
println!(
"[+] So the received value is {}",
String::from_utf8_lossy(&line)
);
let parsed: Value = serde_json::from_slice(&line).unwrap();
println!("{:?}", parsed);
socket
.write_all(b"So yeah thanks for sending this\r\n")
.await
.unwrap();
socket.flush().await.unwrap();
continue;
}
}