
Actix Websockets with Protobuf
Actix is a great Rust framework with many helpful modules for building web applications, including support for websockets. Actix has many good examples, including one for an Actix-based websocket echo server and a matching client. Here we will create a simple Actix websocket project that uses the Prost protocol buffers library to encode the messages sent over the websocket.
(In effect, we combine the Actix websocket example with the Prost example.)
Project Setup
The first thing we do is create a new cargo project:
$ cargo new --bin actix-ws-prost
Go into the project and open the new Cargo.toml file. Add the following to it:
[[bin]]
name = "server"
path = "src/server.rs"

[[bin]]
name = "client"
path = "src/main.rs"

[build-dependencies]
prost-build = { version = "0.5" }

[dependencies]
bytes = "0.4"
prost = "0.5"

actix = "0.8.3"
actix-codec = "0.1.2"
actix-web = "1.0.8"
actix-web-actors = "1.0.2"
actix-files = "0.1.6"
awc = "0.2.7"
env_logger = "0.7"
futures = "0.1"
In this project we will create two binaries: server and client. The server is a simple websocket echo server that sends whatever it receives back to the client. You can copy the server code from the Actix websocket example almost verbatim. From the Actix example websocket project, copy the examples/websocket/static folder into your project. Then copy the contents of the example's examples/websocket/src/main.rs into a new file called src/server.rs in your project.
Prost Setup
Now that the server part is complete, we can start building our client. First we set up Prost. You will have noticed in the Cargo.toml that we use both prost-build and prost itself. The prost-build dependency takes care of compiling our proto files into Rust code. Let's first create a proto file containing our protobuf message. Create a file called src/items.proto with the following contents:
syntax = "proto3";

package snazzy.items;

// A snazzy new shirt!
message Shirt {
    enum Size {
        SMALL = 0;
        MEDIUM = 1;
        LARGE = 2;
    }

    string color = 1;
    Size size = 2;
}
Now that we have our proto file, we need to tell Cargo to compile it. For this we create a build.rs file in the root of the project with the following contents:
fn main() {
    // Compile src/items.proto, resolving any imports relative to src/
    prost_build::compile_protos(&["src/items.proto"], &["src/"]).unwrap();
}
This instructs the prost-build library to compile our src/items.proto into Rust code when we build the project.
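To get a feel for what the build step produces, here is a hand-written, simplified approximation of the generated types. It is only a sketch: the real output carries Prost derive attributes and places the enum in a nested shirt module, and the exact shape depends on the Prost version. The point it illustrates is that the enum field is stored as a plain i32, which is why the models code later goes through set_size instead of assigning a Size directly.

```rust
// Hand-written approximation of the code generated from items.proto.
// Assumption: simplified for illustration, Prost derive attributes omitted.

/// Mirrors `message Shirt`.
#[derive(Clone, Debug, PartialEq, Default)]
pub struct Shirt {
    pub color: String,
    /// The enum field is kept as its raw numeric value.
    pub size: i32,
}

/// Mirrors the nested `enum Size`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Size {
    Small = 0,
    Medium = 1,
    Large = 2,
}

impl Size {
    /// Map a raw value back to a known variant, if there is one.
    pub fn from_i32(value: i32) -> Option<Size> {
        match value {
            0 => Some(Size::Small),
            1 => Some(Size::Medium),
            2 => Some(Size::Large),
            _ => None,
        }
    }
}

impl Shirt {
    /// Typed getter: unknown raw values fall back to the zero variant.
    pub fn size(&self) -> Size {
        Size::from_i32(self.size).unwrap_or(Size::Small)
    }

    /// Typed setter: stores the numeric value of the variant.
    pub fn set_size(&mut self, value: Size) {
        self.size = value as i32;
    }
}
```

With these definitions, calling set_size(Size::Large) on a default Shirt leaves the raw size field at 2, and the size() getter returns Size::Large again.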
Models Module
With Prost set up properly and our proto file in place, let's create a models module containing our Shirt model. Create src/models.rs with the following contents:
// Bring the Message trait into scope for encode, decode and encoded_len.
use prost::Message;

// Include the `items` module, which is generated from items.proto.
pub mod items {
    include!(concat!(env!("OUT_DIR"), "/snazzy.items.rs"));
}
Next, let’s implement some functions to create a Shirt, serialize it, and then deserialize it. In the same file, add the following:
use std::io::Cursor;

pub fn create_large_shirt(color: String) -> items::Shirt {
    let mut shirt = items::Shirt::default();
    shirt.color = color;
    shirt.set_size(items::shirt::Size::Large);
    shirt
}

pub fn serialize_shirt(shirt: &items::Shirt) -> Vec<u8> {
    // Allocate exactly the space the encoded message needs
    let mut buf = Vec::with_capacity(shirt.encoded_len());
    shirt.encode(&mut buf).unwrap();
    buf
}

pub fn deserialize_shirt(buf: &[u8]) -> Result<items::Shirt, prost::DecodeError> {
    items::Shirt::decode(&mut Cursor::new(buf))
}
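It can help to see what the serialized bytes look like. The following is a minimal sketch that encodes the two Shirt fields by hand according to the protocol buffers wire format, using only the standard library. The function names put_varint and encode_shirt_by_hand are made up for this illustration and are not part of Prost; for this message, serialize_shirt should produce the same bytes.

```rust
/// Append `value` as a base-128 varint: 7 bits per byte, least significant
/// group first, with the high bit set on every byte except the last.
pub fn put_varint(mut value: u64, out: &mut Vec<u8>) {
    while value >= 0x80 {
        out.push(((value as u8) & 0x7F) | 0x80);
        value >>= 7;
    }
    out.push(value as u8);
}

/// Encode a shirt by hand (hypothetical helper, for illustration only).
/// Each field starts with a key byte: (field_number << 3) | wire_type.
pub fn encode_shirt_by_hand(color: &str, size: u32) -> Vec<u8> {
    let mut out = Vec::new();
    // proto3 leaves out fields that hold their default value.
    if !color.is_empty() {
        out.push(0x0A); // field 1, wire type 2 (length-delimited)
        put_varint(color.len() as u64, &mut out);
        out.extend_from_slice(color.as_bytes());
    }
    if size != 0 {
        out.push(0x10); // field 2, wire type 0 (varint)
        put_varint(u64::from(size), &mut out);
    }
    out
}
```

A large red shirt therefore takes seven bytes: the key 0x0A, the length 3, the three bytes of "Red", the key 0x10 and the value 2 for LARGE.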
To test our code out, we can add some unit tests in the same file:
#[cfg(test)]
mod tests {
    use crate::models::*;

    #[test]
    fn create_shirt() {
        let shirt = create_large_shirt("white".to_string());
        println!("shirt is {:?}", &shirt);
        assert_eq!(shirt.color, "white");
    }

    #[test]
    fn serde_shirt() {
        let shirt = create_large_shirt("white".to_string());
        // Round trip: serialize, then deserialize again
        let serded = deserialize_shirt(&serialize_shirt(&shirt))
            .expect("A shirt!");
        println!("Serded {:?}", serded);
        assert_eq!(serded, shirt);
    }
}
With our models module ready, we can add it to the src/main.rs file by adding the following to the top of it:
pub mod models;
We can check everything is working by running the unit tests:
$ cargo test
Client Module
With our models module ready, we can start creating our websocket client to communicate with the echo server. Open up the src/main.rs file again and edit the top part so that it looks as follows:
pub mod models;
pub mod client;
Now create a file called src/client.rs. With this we are creating a client module. Add the following contents to this file:
use std::time::Duration;

use actix::*;
use actix::io::SinkWrite;
use actix_codec::{AsyncRead, AsyncWrite, Framed};
use awc::{
    error::WsProtocolError,
    ws::{Codec, Frame, Message},
};
use futures::stream::SplitSink;
use bytes::Bytes;

pub struct WsClient<T>(pub SinkWrite<SplitSink<Framed<T, Codec>>>)
where
    T: AsyncRead + AsyncWrite;

impl<T: 'static> Actor for WsClient<T>
where
    T: AsyncRead + AsyncWrite,
{
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Context<Self>) {
        // start heartbeats
        self.hb(ctx)
    }

    fn stopped(&mut self, _: &mut Context<Self>) {
        println!("Disconnected");

        // Stop application on disconnect
        System::current().stop();
    }
}

impl<T: 'static> WsClient<T>
where
    T: AsyncRead + AsyncWrite,
{
    fn hb(&self, ctx: &mut Context<Self>) {
        // Send a ping after one second, then schedule the next heartbeat
        ctx.run_later(Duration::new(1, 0), |act, ctx| {
            act.0.write(Message::Ping(String::new())).unwrap();
            act.hb(ctx);
        });
    }
}
We created our WsClient struct and implemented an Actix Actor for it with a basic heartbeat. Next we will add the implementation that will receive the shirt colour and send it over to the websocket server:
#[derive(Message)]
pub struct ClientCommand(pub String);

impl<T: 'static> Handler<ClientCommand> for WsClient<T>
where
    T: AsyncRead + AsyncWrite,
{
    type Result = ();

    fn handle(&mut self, msg: ClientCommand, _ctx: &mut Context<Self>) {
        // Build a large shirt in the requested colour and send it as a binary frame
        let shirt = super::models::create_large_shirt(msg.0);
        let bytes = super::models::serialize_shirt(&shirt);
        self.0.write(Message::Binary(Bytes::from(bytes))).unwrap();
    }
}
We define a message struct containing the shirt colour, which will be sent to our WsClient actor. We then implement a handler that takes this message and sends the new shirt as a binary message to the echo websocket server.
Next we implement a stream handler for receiving messages from the websocket server:
impl<T: 'static> StreamHandler<Frame, WsProtocolError> for WsClient<T>
where
    T: AsyncRead + AsyncWrite,
{
    fn handle(&mut self, msg: Frame, _ctx: &mut Context<Self>) {
        match msg {
            Frame::Text(txt) => println!("Server text: {:?}", txt),
            Frame::Binary(bin) => {
                let bytes = bin.unwrap().to_vec();
                let shirt = super::models::deserialize_shirt(&bytes);
                println!("Server binary: {:?}", shirt);
            }
            _ => (),
        }
    }

    fn started(&mut self, _ctx: &mut Context<Self>) {
        println!("Connected");
    }

    fn finished(&mut self, ctx: &mut Context<Self>) {
        println!("Server disconnected");
        ctx.stop()
    }
}
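If we receive a text message from the websocket server, we just print it to the console. If we receive a binary message, we assume it is a shirt and try to deserialize it as such. Because the result of the deserialization is printed as is, a payload that is not a valid shirt shows up as an Err value.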
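To illustrate why decoding returns a Result, here is a rough standard-library sketch of a decoder for the two Shirt fields. It is not how Prost is implemented; the names WireError, read_varint and decode_shirt_by_hand are invented for this example, and only the two wire types that Shirt uses are handled.

```rust
/// Reasons a payload can fail to decode (hypothetical, for illustration).
#[derive(Debug, PartialEq)]
pub enum WireError {
    Truncated,
    BadUtf8,
    VarintTooLong,
    UnsupportedWireType(u8),
}

/// Read one base-128 varint starting at `*pos`, advancing `*pos` past it.
fn read_varint(buf: &[u8], pos: &mut usize) -> Result<u64, WireError> {
    let mut value = 0u64;
    for shift in (0..64).step_by(7) {
        let byte = *buf.get(*pos).ok_or(WireError::Truncated)?;
        *pos += 1;
        value |= u64::from(byte & 0x7F) << shift;
        if byte & 0x80 == 0 {
            return Ok(value);
        }
    }
    Err(WireError::VarintTooLong)
}

/// Decode (color, size) from a serialized Shirt, skipping unknown fields.
pub fn decode_shirt_by_hand(buf: &[u8]) -> Result<(String, u64), WireError> {
    let (mut color, mut size, mut pos) = (String::new(), 0u64, 0usize);
    while pos < buf.len() {
        let key = read_varint(buf, &mut pos)?;
        let (field, wire_type) = (key >> 3, (key & 0x7) as u8);
        match wire_type {
            // Varint field: `size` is field 2
            0 => {
                let value = read_varint(buf, &mut pos)?;
                if field == 2 {
                    size = value;
                }
            }
            // Length-delimited field: `color` is field 1
            2 => {
                let len = read_varint(buf, &mut pos)? as usize;
                let end = pos.checked_add(len).ok_or(WireError::Truncated)?;
                let bytes = buf.get(pos..end).ok_or(WireError::Truncated)?;
                if field == 1 {
                    color = std::str::from_utf8(bytes)
                        .map_err(|_| WireError::BadUtf8)?
                        .to_owned();
                }
                pos = end;
            }
            other => return Err(WireError::UnsupportedWireType(other)),
        }
    }
    Ok((color, size))
}
```

A well-formed payload yields the colour and the numeric size, while a payload that announces more bytes than it contains is rejected with an error instead of panicking.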
Lastly we implement a write handler for the websocket sink on our WsClient:
// The default WriteHandler methods are sufficient here
impl<T: 'static> actix::io::WriteHandler<WsProtocolError> for WsClient<T>
where
    T: AsyncRead + AsyncWrite,
{
}
Main Method
With our modules done, we can implement the main function that will launch our websocket client, connect to the echo websocket server, and send a shirt to it. Add the following to the src/main.rs file:
use actix::*;
use actix::io::SinkWrite;
use awc::Client;
use futures::{
    lazy,
    stream::Stream,
    Future,
};
use std::{io, thread};

fn main() {
    println!("hello!");
    ::std::env::set_var("RUST_LOG", "actix_web=info");
    env_logger::init();
    let sys = actix::System::new("ws-client");

    Arbiter::spawn(lazy(|| {
        Client::new()
            .ws("ws://localhost:8080/echo/")
            .connect()
            .map_err(|e| {
                println!("Error: {}", e);
            })
            .map(|(response, framed)| {
                println!("{:?}", response);
                let (sink, stream) = framed.split();
                let addr = client::WsClient::create(|ctx| {
                    client::WsClient::add_stream(stream, ctx);
                    client::WsClient(SinkWrite::new(sink, ctx))
                });

                // start console loop
                thread::spawn(move || loop {
                    let mut cmd = String::new();
                    match io::stdin().read_line(&mut cmd) {
                        // End of input: stop reading instead of looping forever
                        Ok(0) => return,
                        Ok(_) => {}
                        Err(_) => {
                            println!("error");
                            return;
                        }
                    }
                    addr.do_send(client::ClientCommand(cmd.trim().to_string()));
                });
            })
    }));

    let _ = sys.run();
}
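The console loop above runs on its own thread and hands each typed colour to the actor through its address. The same hand-off pattern can be sketched with the standard library alone, with a channel standing in for the actor address. This is only an analogy under that assumption, not what Actix does internally: spawn_console_loop is a made-up helper, and the ClientCommand here is a plain struct rather than an Actix message.

```rust
use std::io::BufRead;
use std::sync::mpsc::Sender;
use std::thread;

/// Plain stand-in for the actor message carrying the shirt colour.
pub struct ClientCommand(pub String);

/// Read lines from `input` on a separate thread and forward each non-empty,
/// trimmed line through `tx`. The thread ends at end of input, on a read
/// error, or once the receiving side has gone away.
pub fn spawn_console_loop<R>(input: R, tx: Sender<ClientCommand>) -> thread::JoinHandle<()>
where
    R: BufRead + Send + 'static,
{
    thread::spawn(move || {
        for line in input.lines() {
            let line = match line {
                Ok(line) => line,
                Err(_) => return,
            };
            let colour = line.trim();
            if colour.is_empty() {
                continue;
            }
            if tx.send(ClientCommand(colour.to_string())).is_err() {
                return;
            }
        }
    })
}
```

In a real program the input would be a buffered reader over standard input. Any other BufRead source works as well, which makes the loop easy to exercise without a terminal.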
With everything now in place, we can start the server first:
$ cargo run --bin server
The server should now be running. In another console, start the client:
$ cargo run --bin client
The client should now be running as well. Type in a shirt colour to send a shirt, and receive it back from the server, e.g.:
$ cargo run --bin client
Compiling actix-ws-prost v0.1.0 (/Users/jdoe/actix-ws-prost)
Finished dev [unoptimized + debuginfo] target(s) in 7.96s
Running `target/debug/client`
hello!
ClientResponse HTTP/1.1 101 Switching Protocols
headers:
"date": "Sun, 27 Oct 2019 04:45:43 GMT"
"upgrade": "websocket"
"connection": "upgrade"
"sec-websocket-accept": "+q3Jhh4sJDwpydraj7rPlIT1XEk="
"transfer-encoding": "chunked"
Connected
Red
Server binary: Ok(Shirt { color: "Red", size: Large })
This test project is also on GitHub: actix-ws-prost.