Building Robust Servers with Async Rust and Tokio
Applying some thoughts and things I learned about writing servers to Rust applications
Building multi-threaded servers
Open what feels like any tutorial on building a network server with Tokio and you will probably see #[tokio::main] right there at the top. This is nice for getting started, but for building servers you should avoid it. Instead, we should decide what we need threads for and what they need to do in our server. Thinking about how threads are used is a core part of designing a server and should be part of the initial work. These are some ideas based on what I learned working with servers over the last few years. This post walks through coding a simple server, and at the end we will have a running service.
What makes #[tokio::main] nice is its simplicity: you type the tiny macro and all of your async code runs with minimal input from you. The major drawback is that all of your code runs in the single thread pool it creates. This is where we need to think about everything a server will actually do.
It is easy to oversimplify this. We can use Memcached as an example: it is fast, stable and multi-threaded, so it should be a good fit. A simple way of thinking about Memcached is that it just handles requests, checks its internal hashmap for data and then responds. Designing this, we could do something simple like Redis and say "there isn't much work being done, let's use one thread for everything" (that clearly works, Redis is solid, but it sidesteps multithreading in exchange for other tradeoffs). Or we can go one step further, make it multithreaded and have a bunch of threads do all of this work. That would be the typical #[tokio::main] approach. Memcached has a document that explains what all of its threads actually do.
Designing a server
We can start with some requirements for a server:
It needs to be high performance. That means consistent, low-latency response times
It needs to be reliable and available
We will handle multiple persistent connections
It needs to be "production" ready. It meets some standard of operational requirements and polish
This example server will provide some kind of API that reads from some external data source, does something with the data, then responds.
Now we can list out everything a server will really need to do in detail:
Accept connections
It’s 2022, likely we need to establish a TLS connection
Maintain open connections
Parse the requests
Handle the request
Reach out to data stores
Process data and prepare it for the response
Respond to the client, write the data back
Log
Provide stats and metrics
Handle on the fly admin commands (optional but we'll include this)
Our list of responsibilities is long now, and in each bullet point something can go wrong. Having one big thread pool handle everything means the blast radius is the entire application.
Some Scenarios and Failures
We have 1000 established and busy connections. Suddenly one of our API users needs to reconnect, all of the threads get used up accepting connections, latency goes up for the other users and now they are unhappy.
The backend data store this server relies on has some issue and slows down. Threads are busy trying to get data to respond. Our external stat aggregator, which works by polling the application, can't get a response in time and we lose visibility into our metrics.
Someone needs to use the admin UI. Their computationally expensive work causes a slowdown, and now request latency has gone over the acceptable limit.
We can dream up more scenarios, but hopefully by now it's clear that one thread pool won't work, which means simply using #[tokio::main] won't meet our requirements for reliability and performance. I think there is a middle ground between using threads everywhere and using the nicer abstractions that an async runtime provides.
Before starting to code we need to think about where we can fail, what is acceptable and how we want to utilize threads in our server. There is no right or wrong answer here!
Starting to write code
We can start with three thread pools, similar to the Memcached document mentioned at the beginning. One accepts connections and does the TLS handshake. One handles requests: getting the data from the backing store and responding. The last is for lower-priority miscellaneous work (stats aggregation, logging, and the embedded HTTP server for the admin functions from the list above).
First we can just get a basic program up and running:
fn start_server() {
    println!("start server");
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // do stuff like parse cli args
    start_server();
    Ok(())
}
In start_server we will get the server going, starting by creating all of our thread pools. Tokio calls these runtimes. We will use runtime::Builder to make them instead of the macro. This is part of what #[tokio::main] does for you.
// needs `use tokio::runtime::Builder;` at the top of the file

// build runtimes
let acceptor_runtime = Builder::new_multi_thread()
    .worker_threads(1)
    .thread_name("acceptor-pool")
    .thread_stack_size(3 * 1024 * 1024)
    .enable_time()
    .enable_io()
    .build()
    .unwrap();

let request_runtime = Builder::new_multi_thread()
    .worker_threads(2)
    .thread_name("request-pool")
    .thread_stack_size(3 * 1024 * 1024)
    .enable_time()
    .enable_io()
    .build()
    .unwrap();

let utility_runtime = Builder::new_multi_thread()
    .worker_threads(1)
    .thread_name("utility-pool")
    .thread_stack_size(3 * 1024 * 1024)
    .enable_time()
    .enable_io()
    .build()
    .unwrap();
To work with a Runtime we can use its handle function, which returns a Handle that can be cloned and passed around easily wherever it is needed.
Next we need to bind, listen and loop on accept, then pass the established connections off to the request_runtime.
We also add a new function, stream_handler. This is where the established socket is handed off to. It waits for data on the socket, handles it and responds.
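use std::str::from_utf8;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

async fn stream_handler(mut t: TcpStream) {
    let mut buf = [0; 1024];
    loop {
        // read() already waits until the socket is readable
        let n = match t.read(&mut buf).await {
            // socket closed
            Ok(0) => return,
            Ok(n) => n,
            Err(e) => {
                eprintln!("failed to read from socket; err = {:?}", e);
                return;
            }
        };
        // drop the connection on bad input instead of panicking the task
        let answer = match from_utf8(&buf[0..n]) {
            Ok(s) => s,
            Err(e) => {
                eprintln!("request was not valid utf8; err = {:?}", e);
                return;
            }
        };
        // write_all keeps writing until the whole response is sent;
        // try_write may write only part of it and its result was ignored
        if let Err(e) = t.write_all(format!("Back to you! {}", answer).as_bytes()).await {
            eprintln!("failed to write to socket; err = {:?}", e);
            return;
        }
    }
}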
use tokio::net::TcpListener;

fn start_server() {
    //... below runtime code
    acceptor_runtime.block_on(async {
        let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
        loop {
            let (socket, _) = listener.accept().await.unwrap();
            let _g = request_runtime.enter();
            request_runtime.spawn(stream_handler(socket));
        }
    })
}
Important Note:
One unintuitive line here is let _g = request_runtime.enter(); When some Tokio types are created, part of the setup is registering with the current runtime. We want the socket to be associated with the runtime that will be handling it, not the acceptor runtime, so we use enter to set which runtime counts as the current one. I think the code is here where it uses the current runtime
The server at this point can be run and will respond to requests. We can see all of our thread pools if we use a tool like pidstat.
$ pidstat -t -p $(ps aux | grep debug/server | head -1| awk '{print $2}')
Linux 5.4.0-131-generic (matthew-XPS-13-9365) 11/17/2022 _x86_64_ (4 CPU)
11:07:19 AM UID TGID TID %usr %system %guest %wait %CPU CPU Command
11:07:20 AM 1000 572076 - 0.00 0.00 0.00 0.00 0.00 0 server
11:07:20 AM 1000 - 572076 0.00 0.00 0.00 0.00 0.00 0 |__server
11:07:20 AM 1000 - 572077 0.00 0.00 0.00 0.00 0.00 0 |__acceptor-pool
11:07:20 AM 1000 - 572078 0.00 0.00 0.00 0.00 0.00 2 |__request-pool
11:07:20 AM 1000 - 572079 0.00 0.00 0.00 0.00 0.00 2 |__request-pool
11:07:20 AM 1000 - 572080 0.00 0.00 0.00 0.00 0.00 2 |__utility-pool
Then if you send some traffic over a persistent connection, you can see that only the two request-pool threads are utilized.
Average: UID TGID TID %usr %system %guest %wait %CPU CPU Command
Average: 1000 581044 - 86.35 36.85 0.00 0.00 123.20 - server
Average: 1000 - 581044 0.00 0.00 0.00 0.00 0.00 - |__server
Average: 1000 - 581045 0.00 0.00 0.00 0.00 0.00 - |__acceptor-pool
Average: 1000 - 581046 43.17 18.20 0.00 1.55 61.38 - |__request-pool
Average: 1000 - 581047 43.06 18.65 0.00 1.55 61.71 - |__request-pool
Average: 1000 - 581048 0.00 0.00 0.00 0.00 0.00 - |__utility-pool
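Traffic like this can come from any client that keeps its connection open. The following is a minimal sketch using only the standard library; send_persistent, the "ping" payload and the request count are my own choices for illustration, not the script used for the numbers above.

```rust
use std::io::{Read, Write};
use std::net::TcpStream;

// Hypothetical load script: sends `count` messages over ONE connection
// and collects the replies.
fn send_persistent(addr: &str, count: usize) -> std::io::Result<Vec<String>> {
    let mut stream = TcpStream::connect(addr)?;
    let mut buf = [0u8; 1024];
    let mut replies = Vec::with_capacity(count);
    for i in 0..count {
        stream.write_all(format!("ping {}", i).as_bytes())?;
        // Assumes each small reply arrives in a single read, which is
        // good enough for a local load test but not a real protocol.
        let n = stream.read(&mut buf)?;
        if n == 0 {
            break; // server closed the connection
        }
        replies.push(String::from_utf8_lossy(&buf[..n]).into_owned());
    }
    Ok(replies)
}

fn main() -> std::io::Result<()> {
    // Address of the server from this post.
    let replies = send_persistent("127.0.0.1:8080", 100_000)?;
    println!("received {} replies", replies.len());
    Ok(())
}
```

Running a couple of these at once against the server keeps both request-pool threads busy while the acceptor only does work at connect time.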
We are going to embed our admin HTTP server using Rocket. This is where things get kind of hard, because everything seems to assume you are using a simple setup.
This is the server setup. We make one route, /admin. We also take a runtime handle to spawn the server into. If you dig through the docs a bit you can find that a Rocket server started with launch returns a future that can be spawned onto a runtime.
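use rocket::{get, routes};
use tokio::runtime::Handle;

#[get("/admin")]
fn http_admin() -> &'static str {
    "<html><h1>Hello from admin!</h1></html>"
}

fn start_http(rt: &Handle) {
    let figment = rocket::Config::figment()
        .merge(("port", 9999))
        .merge(("shutdown.ctrlc", false));
    let rocket = rocket::custom(figment).mount("/", routes![http_admin]);
    // launch() only builds the future; the server runs once it is spawned
    let http_server = rocket.launch();
    rt.spawn(async move {
        // log a failed launch instead of silently dropping the error
        if let Err(e) = http_server.await {
            eprintln!("admin http server failed; err = {:?}", e);
        }
    });
}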
Then in the start_server function add the following line before the `block_on` call.
start_http(utility_runtime.handle());
We now have a separate HTTP server running alongside the main server, with its CPU resources isolated from the rest.
$ curl localhost:9999/admin
<html><h1>Hello from admin!</h1></html>
We can make another script that makes HTTP requests in a loop and see only one thread increases in cpu use. Everything else is idle.
Average: UID TGID TID %usr %system %guest %wait %CPU CPU Command
Average: 1000 581950 - 17.50 2.80 0.00 0.00 20.30 - server
Average: 1000 - 581950 0.00 0.00 0.00 0.00 0.00 - |__server
Average: 1000 - 581951 0.00 0.00 0.00 0.00 0.00 - |__acceptor-pool
Average: 1000 - 581952 0.00 0.00 0.00 0.00 0.00 - |__request-pool
Average: 1000 - 581953 0.00 0.00 0.00 0.00 0.00 - |__request-pool
Average: 1000 - 581954 17.50 2.80 0.00 0.20 20.30 - |__utility-pool
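A request loop like that does not need anything fancy. Here is a minimal standard-library sketch; get_status and the request count are hypothetical, and the hand-written HTTP/1.1 request is just enough for a local test against the /admin route.

```rust
use std::io::{Read, Write};
use std::net::TcpStream;

// Hypothetical helper: performs one GET and returns the status line.
fn get_status(addr: &str, path: &str) -> std::io::Result<String> {
    let mut stream = TcpStream::connect(addr)?;
    // "Connection: close" asks the server to close the socket after the
    // response, so we can simply read until EOF.
    let request = format!(
        "GET {} HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n",
        path, addr
    );
    stream.write_all(request.as_bytes())?;
    let mut response = Vec::new();
    stream.read_to_end(&mut response)?;
    let text = String::from_utf8_lossy(&response);
    Ok(text.lines().next().unwrap_or("").to_string())
}

fn main() -> std::io::Result<()> {
    // Port 9999 matches the Rocket config in this post.
    for i in 0..10_000 {
        let status = get_status("127.0.0.1:9999", "/admin")?;
        if i % 1000 == 0 {
            println!("request {}: {}", i, status);
        }
    }
    Ok(())
}
```

Each iteration opens a fresh connection, so this exercises the admin server's accept path as well as its request handling, all on the utility pool.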
I hope this was interesting and informative. I would love to hear feedback too! My email is matthew.tejo@gmail.com