Learning Rust Crates
libraries to investigate
rayon
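- add rayon as a dependency (e.g. cargo add rayon), then import its prelude:
use rayon::prelude::*;
- swap iter() for par_iter() to run an iterator chain on rayon's thread pool; this pays off for CPU-bound work on large collections, while for small inputs the coordination overhead can make it slower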
serde
- add
#[derive(Serialize, Deserialize)]
to a struct so that serde can serialize and deserialize it (this needs serde's derive feature)
- use
serde_json::to_string
to serialize the struct into a JSON string
tokio
tokio basics
- tokio is an asynchronous runtime for Rust that also ships async networking, timers and synchronisation primitives
- wrap the per-socket processing in tokio::spawn so that each connection is handled concurrently as its own task:
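use mini_redis::{Connection, Frame};
use tokio::net::{TcpListener, TcpStream};

#[tokio::main]
async fn main() {
    // Bind the listener to the default Redis port.
    let listener = TcpListener::bind("localhost:6379").await.unwrap();
    println!("mini_redis listening on {}", listener.local_addr().unwrap());

    loop {
        // The second tuple item is the peer address, unused here.
        let (socket, _) = listener.accept().await.unwrap();
        // One task per connection, so a slow client cannot block the accept loop.
        tokio::spawn(async move {
            process(socket).await;
        });
    }
}

async fn process(socket: TcpStream) {
    // Connection reads and writes redis frames instead of raw bytes.
    let mut connection = Connection::new(socket);

    if let Some(frame) = connection.read_frame().await.unwrap() {
        println!("{:?}", frame);
        // Reply with an error: commands are not implemented yet.
        let response = Frame::Error("unimplemented".to_string());
        connection.write_frame(&response).await.unwrap();
    }
}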
- when an async block is passed to tokio::spawn, tokio creates a “task” under the hood
- tasks are very lightweight: a task needs only a single allocation and about 64 bytes of memory
- a task may run on the thread that spawned it or on another thread, and it can move between threads after being spawned
- feel free to spawn thousands or even millions of tasks
- a spawned task must be 'static: it must not hold references to data owned outside the task
- use move on the async block so that the values it uses are moved into the task instead of borrowed
tokio shared state with sync mutex
- to share data between tasks and threads, wrap it in an Arc for shared ownership and a Mutex for synchronised mutation, i.e. Arc<Mutex<T>>
- using a synchronous (std::sync) mutex from within asynchronous code is fine as long as “contention” remains low and the lock is never held across an .await
- contention in this case means different threads and tasks competing to access the same (locked) value
- a mutex guard must be dropped before the next .await: std's MutexGuard is not Send, so a future that holds one across an .await cannot be passed to tokio::spawn. Scoping the guard in its own block achieves this:
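use std::sync::{Mutex, MutexGuard};

// do_something_async is assumed to be an async fn defined elsewhere.
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
    {
        let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
        *lock += 1;
    } // lock goes out of scope here

    do_something_async().await;
}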
- a more robust way to get the same effect is to wrap the mutex in a struct and lock it only inside non-async methods of that struct, so a guard can never be alive across an .await.
tokio channels (message passing)
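- if several tokio tasks need the same resource, an alternative to a mutex is to let one dedicated task own the resource and have the other tasks send it requests over a channel, which gives exclusive access without locking
- it's like a small queueing system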
- variants of the channels:
- mpsc: multi-producer, single-consumer (many values can be sent)
- oneshot: single-producer, single-consumer (a single value can be sent)
- broadcast: multi-producer, multi-consumer (each receiver sees every value)
- watch: multi-producer, multi-consumer (no history is kept, receivers only see the most recent value)
- create a channel that can hold 32 messages with
let (tx, mut rx) = mpsc::channel(32);
tokio stream piping
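- io::copy can be used to copy everything from an async reader into an async writer
- a TcpStream implements both AsyncRead and AsyncWrite, but the same socket cannot be passed to io::copy as reader and writer at once, because that would need two mutable borrows of it
- use socket.split() to get separate reader and writer halves, then pass both to io::copy to echo the input back to the output
- we can also copy by hand, using
socket.read()
and socket.write_all()
- don’t forget to handle EOF: read() returning Ok(0) means the peer closed the connection, and a loop that ignores it will spin forever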
tokio framing
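- framing is taking a byte stream and converting it into a stream of frames, where a frame is one unit of data exchanged between two peers
- to write a frame, implement a write_frame function on the connection struct that matches on the frame type and encodes it onto the stream piece by piece; buffering the writes avoids one syscall per small write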
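The read side of framing can be sketched with the standard library alone. The wire format here (one length byte followed by the payload) is invented for illustration and is not the Redis protocol used by mini_redis. In a tokio program the buffer would be filled from the socket instead of from hard-coded chunks.

```rust
/// Hypothetical wire format: a 1-byte length prefix, then that many
/// payload bytes.
///
/// Returns the next complete frame and removes it from `buffer`, or
/// `None` if more bytes have to arrive first.
fn try_parse_frame(buffer: &mut Vec<u8>) -> Option<Vec<u8>> {
    let len = *buffer.first()? as usize;
    if buffer.len() < 1 + len {
        return None; // partial frame: keep the bytes and wait for more
    }
    let frame = buffer[1..1 + len].to_vec();
    buffer.drain(..1 + len);
    Some(frame)
}

fn main() {
    let mut buffer = Vec::new();
    let mut frames = Vec::new();

    // Simulated byte stream: chunk boundaries do not match frame boundaries.
    let chunks: [&[u8]; 3] = [&[2, b'h'], &[b'i', 3, b'y'], &[b'o', b'u']];
    for chunk in chunks {
        buffer.extend_from_slice(chunk);
        // One chunk may complete zero, one or several frames.
        while let Some(frame) = try_parse_frame(&mut buffer) {
            frames.push(String::from_utf8(frame).unwrap());
        }
    }

    assert_eq!(frames, ["hi", "you"]);
    println!("{frames:?}");
}
```

The key design point is that parsing never consumes a partial frame: incomplete data stays in the buffer until the next read completes it.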
tokio timeouts
- wrap a future in tokio::time::timeout to stop waiting after a given duration, for instance to avoid waiting too long for an HTTP request
log facade and env_logger logging
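- use the log facade crate to emit log messages
log::info!("opened wordlist file: {:?}", wordlist_file);
- use an implementation such as
env_logger
to actually output them; initialise it once at startup using env_logger::init(); and select the level through the RUST_LOG environment variable
- use the
structured_logging
crate for structured logging, e.g. attaching parseable key-value fields to a message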