Learning Rust Crates
libraries to investigate
rayon
install the crate, then use rayon::prelude::*
instantly speed up iterators with rayon by using par_iter() instead of iter()
serde
use #[derive(Serialize, Deserialize)]
enable structs to be enriched by serde
use serde_json:to_string
to serialize struct into json
tokio
tokio basics
tokio is a toolkit for async runtimes and networking applications
use spawn thread around socket process to create multithreading responses
use tokio::net::*;
use mini_redis::{Connection, Frame};
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("localhost:6379").await.unwrap();
println!("mini_redis listening on port {:?}", listener.local_addr());
loop {
let (socket, _) = listener.accept().await.unwrap();
tokio::spawn(async move {
process(socket).await;
});
}
}
async fn process(socket: TcpStream) {
let mut connection = Connection::new(socket);
if let Some(frame) = connection.read_frame().await.unwrap() {
println!("{:?}", frame);
let response = Frame::Error("unimplemented".to_string());
connection.write_frame(&response).await.unwrap();
}
}
when passing process functions to tokio’s spawn, under the hood “tasks” are created.
tasks are very lightweight (64bytes of mem)
tasks will be scheduled on the same or other thread
feel free to spawn thousands / millions of tasks
tasks should not reference memory with a shorter lifetime than static
use move in the async block to make passed values get moved
tokio shared state with sync mutex
if you want to use data amongst threads, use synchronisation such as Arc
using a synchronous mutex from within asynchronous code is fine as long as “contention” remains low
contention in this case means different threads and tasks fighting to access the same (locked) value
mutex guards should die before calling new asynchronous await functions. We can achieve this by creating blocks for the mutex guard:
async fn increment_and_do_stuff(mutex: &Mutex<i32>) {
{
let mut lock: MutexGuard<i32> = mutex.lock().unwrap();
*lock += 1;
} // lock goes out of scope here
do_something_async().await;
}
the safest way to do something similar is to create a struct and implement a non async function that mutates the value:
tokio channels (message passing)
if we want to share sync resources amongst tokio tasks, we use a channel to manage exclusive access
its like a mini queueing system
variants of the channels:
mpsc: multi-producer, single-consumer
oneshot: single-producer, single-consumer
broadcast: multi-producer, multi-consumer (each receiver sees every value)
watch: multi-producer, multi-consumer (no history is kept, receivers only see most recent)
create a channel that can hold 32 messages with let (tx, mut rx) = mpsc::channel(32)
tokio stream piping
io::copy can be used to copy input from one stream into the other stream
the tcplistener implements both reader and writer, but we can’t borrow the reference to the listener twice.
We can use socket.split to separate the reader and writer and use io::copy to copy the stream to the output
we can also copy by hand, using socket.read()
and socket.write_all
don’t forget to handle EOF
tokio framing
framing is taking a byte stream and converting it into a stream of frames
if you are writing an exact data packet, you can use a write frame function on a struct that matches the frame and writes to a stream byte by byte
tokio timeouts
you can specify timeouts on tokio calls to avoid waiting to long for http requests for instance
log facade and env_logger logging
use facade log to log
log::info!("opened wordlist file: {:?}", wordlist_file);
use implementation env_logger
using env_logger::init();
use structured_logging
crate for structured logging, e.g. adding parseable values