WebSocket (experimental)
Servers
There are two ways to implement a WebSocket server with CAF. The first option is
to create a standalone server that only accepts incoming WebSocket clients. The
second option is to start a regular HTTP server and then use the
caf::net::web_socket::switch_protocol factory to create a route for WebSocket
clients.
In both cases, the server runs in the background and communicates with the application logic over an asynchronous buffer resource. Usually, this resource is used to create an observable on some user-defined actor that implements the server logic. The server passes connection events over this buffer. Each event consists of two new buffer resources: one for receiving messages from the WebSocket client and one for sending messages to it.
Note
Closing either of the asynchronous resources will close the WebSocket
connection. When only reading from a WebSocket connection, we can subscribe
the output channel to a never observable. Likewise, we can pass std::ignore
to the input observable for applications that are only interested in writing
to the WebSocket connection.
Standalone Server
Note
For this API, include caf/net/web_socket/with.hpp.
Starting a WebSocket server has three distinct steps.
In the first step, we bind the server to an actor system and optionally
configure SSL. The entry point is always the free function
caf::net::web_socket::with, which takes an actor_system as argument.
The WebSocket API in CAF produces flows that operate on
caf::net::web_socket::frame objects (see Frames).
On the resulting factory object, we can optionally call context to set an SSL
context. Once we call accept, we enter the second phase of the setup.
The accept function has multiple overloads:
- accept(uint16_t port, std::string bind_address = "") for opening a new port. The bind_address optionally restricts which IP addresses may connect to the server. Passing an empty string (default) allows any client.
- accept(tcp_accept_socket fd) for running the server on an already configured TCP server socket.
- accept(ssl::tcp_acceptor acc) for running the server on an already configured TCP server socket with an SSL context. Using this overload after configuring an SSL context is an error, since CAF cannot use two SSL contexts on one server.
After calling accept, we enter the second step of configuring the server.
Here, we can (optionally) fine-tune the server with these member functions:
- do_on_error(F callback) installs a callback function that CAF calls if an error occurs while starting the server.
- max_connections(size_t value) configures how many clients the server may allow to connect before blocking further connection attempts.
- reuse_address(bool value) configures whether we create the server socket with SO_REUSEADDR. Has no effect if we have passed a socket or SSL acceptor to accept.
The second step is completed when calling on_request. This function requires
one argument: a function object that takes a
net::web_socket::acceptor<Ts...>& argument, where the template parameter
pack Ts... is freely chosen and may be empty. The types we select here allow
us to pass any number of arguments to the connection event later on.
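To make the role of Ts... more concrete, the following standalone sketch models the idea with toy types. It does not use CAF: toy_acceptor and handle_request are hypothetical names introduced only for illustration, and the real acceptor additionally creates the buffer resources for the connection. The sketch only shows that the arguments passed to accept must match Ts... and that a request is rejected unless accept is called.

```cpp
#include <optional>
#include <string>
#include <tuple>
#include <utility>

// Toy stand-in for an acceptor (NOT the CAF class). It records either the
// values passed to accept() or the reason passed to reject().
template <class... Ts>
class toy_acceptor {
public:
  explicit toy_acceptor(std::string path) : path_(std::move(path)) {}

  const std::string& path() const noexcept { return path_; }

  // The argument types are fixed by the template parameter pack.
  void accept(Ts... xs) {
    accepted_.emplace(std::move(xs)...);
    reject_reason_.reset();
  }

  void reject(std::string reason) {
    reject_reason_ = std::move(reason);
    accepted_.reset();
  }

  // Holds the forwarded values only if the request was accepted. A handler
  // that calls neither accept() nor reject() leaves this empty.
  const std::optional<std::tuple<Ts...>>& accepted() const noexcept {
    return accepted_;
  }

  const std::optional<std::string>& reject_reason() const noexcept {
    return reject_reason_;
  }

private:
  std::string path_;
  std::optional<std::tuple<Ts...>> accepted_;
  std::optional<std::string> reject_reason_;
};

// Hypothetical handler: accepts "/chat/<room>" and forwards the room name.
inline void handle_request(toy_acceptor<std::string>& acc) {
  const std::string prefix = "/chat/";
  const auto& path = acc.path();
  if (path.size() > prefix.size()
      && path.compare(0, prefix.size(), prefix) == 0)
    acc.accept(path.substr(prefix.size()));
  else
    acc.reject("unrecognized path");
}
```

With an empty pack (toy_acceptor<>), accept takes no arguments, which mirrors the acc.accept() call of a handler that forwards nothing.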
The third and final step is to call start with a function object that takes
an asynchronous resource for processing connect events. The type is usually
obtained from the configured trait class via Trait::acceptor_resource<Ts...>,
where Ts... must be the same set of types we have used previously for the
on_request handler.
After calling start, CAF returns an expected<disposable>. On success, the
disposable is a handle to the launched server and calling dispose on it stops
the server. On error, the result contains a human-readable description of what
went wrong.
This example illustrates how the API looks when putting all the pieces together:
namespace {
std::atomic<bool> shutdown_flag;
void set_shutdown_flag(int) {
  shutdown_flag = true;
}
} // namespace
int caf_main(caf::actor_system& sys, const config& cfg) {
  namespace http = caf::net::http;
  namespace ssl = caf::net::ssl;
  namespace ws = caf::net::web_socket;
  // Do a regular shutdown for CTRL+C and SIGTERM.
  signal(SIGTERM, set_shutdown_flag);
  signal(SIGINT, set_shutdown_flag);
  // Read the configuration.
  auto port = caf::get_or(cfg, "port", default_port);
  auto pem = ssl::format::pem;
  auto key_file = caf::get_as<std::string>(cfg, "tls.key-file");
  auto cert_file = caf::get_as<std::string>(cfg, "tls.cert-file");
  auto max_connections = caf::get_or(cfg, "max-connections",
                                     default_max_connections);
  if (!key_file != !cert_file) {
    sys.println("*** inconsistent TLS config: declare neither file or both");
    return EXIT_FAILURE;
  }
  // Open up a TCP port for incoming connections and start the server.
  auto server
    = ws::with(sys)
        // Optionally enable TLS.
        .context(ssl::context::enable(key_file && cert_file)
                   .and_then(ssl::emplace_server(ssl::tls::v1_2))
                   .and_then(ssl::use_private_key_file(key_file, pem))
                   .and_then(ssl::use_certificate_file(cert_file, pem)))
        // Bind to the user-defined port.
        .accept(port)
        // Limit how many clients may be connected at any given time.
        .max_connections(max_connections)
        // Accept only requests for path "/".
        .on_request([&sys](ws::acceptor<>& acc) {
          // The header parameter contains fields from the WebSocket handshake
          // such as the path and HTTP header fields.
          auto path = acc.header().path();
          sys.println("*** new client request for path {}", path);
          // Accept the WebSocket connection only if the path is "/".
          if (path == "/") {
            // Calling `accept` causes the server to acknowledge the client and
            // creates input and output buffers that go to the worker actor.
            acc.accept();
          } else {
            // Calling `reject` aborts the connection with HTTP status code 400
            // (Bad Request). The error gets converted to a string and sent to
            // the client to give some indication to the client why the request
            // was rejected.
            auto err = caf::make_error(caf::sec::invalid_argument,
                                       "unrecognized path, try '/'");
            acc.reject(std::move(err));
          }
          // Note: calling nothing on `acc` also rejects the connection.
        })
        // When started, run our worker actor to handle incoming connections.
        .start([&sys](auto events) {
          sys.spawn([events](caf::event_based_actor* self) {
            // For each buffer pair, we create a new flow ...
            self->make_observable()
              .from_resource(events) //
              .for_each([self](const auto& ev) {
                // ... that simply pushes data back to the sender.
                auto [pull, push] = ev.data();
                pull.observe_on(self)
                  .do_on_error([self](const caf::error& what) { //
                    self->println("*** connection closed: {}", what);
                  })
                  .do_on_complete([self] { //
                    self->println("*** connection closed");
                  })
                  .do_on_next([self](const ws::frame& x) {
                    if (x.is_binary()) {
                      self->println(
                        "*** received a binary WebSocket frame of size {}",
                        x.size());
                    } else {
                      self->println(
                        "*** received a text WebSocket frame of size {}",
                        x.size());
                    }
                  })
                  .subscribe(push);
              });
          });
        });
  // Report any error to the user.
  if (!server) {
    sys.println("*** unable to run at port {}: {}", port, server.error());
    return EXIT_FAILURE;
  }
  // Wait for CTRL+C or SIGTERM.
  while (!shutdown_flag)
    std::this_thread::sleep_for(250ms);
  sys.println("*** shutting down");
  server->dispose();
  return EXIT_SUCCESS;
}
Note
For details on ssl::context::enable, please see SSL.
HTTP Server with a WebSocket Route
Note
For this API, include caf/net/web_socket/switch_protocol.hpp.
Sometimes, we want a server to accept WebSocket connections only on a specific
path and otherwise act as a regular HTTP server. This use case is covered by
caf::net::web_socket::switch_protocol. The function is similar to with
in that it takes a template parameter for the trait, but it requires no
arguments. In this version, we skip the accept step (since the HTTP server
takes care of accepting connections) and call on_request as the first step.
Then, we call on_start (instead of start) to add the WebSocket server
logic when starting up the parent server.
The following snippet showcases the setup for adding a WebSocket route to an
HTTP server (you can find the full example under
examples/web_socket/quote-server.cpp):
// On "/ws/quotes/<arg>", we switch the protocol to WebSocket.
.route("/ws/quotes/<arg>", http::method::get,
       ws::switch_protocol()
         // Check that the client asks for a known philosopher.
         .on_request(
           [](ws::acceptor<caf::cow_string>& acc, std::string name) {
             auto quotes = quotes_by_name(name);
             if (quotes.empty()) {
               auto err = make_error(caf::sec::invalid_argument,
                                     not_found_str(name));
               acc.reject(std::move(err));
             } else {
               // Forward the name to the WebSocket worker.
               acc.accept(caf::cow_string{std::move(name)});
             }
           })
         // Spawn a worker for the WebSocket clients.
         .on_start([&sys](auto events) {
           // Spawn a worker that reads from `events`.
           sys.spawn([events](caf::event_based_actor* self) {
             // Each WS connection has a pull/push buffer pair.
             self->make_observable()
               .from_resource(events) //
               .for_each([self](const auto& ev) mutable {
                 // Forward the quotes to the client.
                 auto [pull, push, name] = ev.data();
                 auto quotes = quotes_by_name(name);
                 self->make_observable()
                   .from_container(quotes)
                   .map([](std::string_view quote) {
                     return ws::frame{quote};
                   })
                   .subscribe(push);
                 // We ignore whatever the client may send to us.
                 pull.observe_on(self).subscribe(std::ignore);
               });
           });
         }))
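The route in the snippet above captures the last path segment through the <arg> placeholder and hands it to the on_request handler as a std::string. As a rough illustration of that idea only, the following standalone sketch matches a path against a pattern that ends in a single <arg> placeholder. The helper match_trailing_arg is hypothetical and not part of CAF, and treating the placeholder as exactly one non-empty segment is an assumption of this sketch. The actual HTTP router in CAF is more general.

```cpp
#include <optional>
#include <string>
#include <string_view>

// Hypothetical helper (not a CAF function): returns the text matched by a
// trailing "<arg>" placeholder, or std::nullopt if the path does not match.
inline std::optional<std::string> match_trailing_arg(std::string_view pattern,
                                                     std::string_view path) {
  constexpr std::string_view placeholder = "<arg>";
  // This sketch only supports patterns that end in "<arg>".
  if (pattern.size() < placeholder.size()
      || pattern.substr(pattern.size() - placeholder.size()) != placeholder)
    return std::nullopt;
  auto prefix = pattern.substr(0, pattern.size() - placeholder.size());
  // The path must start with the fixed prefix and contain something after it.
  if (path.size() <= prefix.size() || path.substr(0, prefix.size()) != prefix)
    return std::nullopt;
  auto arg = path.substr(prefix.size());
  // Assumption of this sketch: the placeholder covers exactly one segment.
  if (arg.find('/') != std::string_view::npos)
    return std::nullopt;
  return std::string{arg};
}
```

In this model, a request for /ws/quotes/seneca yields the argument seneca, while /ws/quotes/ and /status do not match the route at all.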
Clients
Clients use the same entry point as standalone servers, i.e.,
caf::net::web_socket::with. However, instead of calling accept, we call
connect. Like accept, CAF offers multiple overloads for this function:
- connect(std::string host, uint16_t port) for connecting to the server at host on the given port.
- connect(uri endpoint) for connecting to the given server. The URI scheme must be either ws or wss.
- connect(expected<uri> endpoint) like the previous overload. Forwards the error in case endpoint does not represent an actual value. Useful for passing the result of make_uri to connect directly without having to check the result first.
- connect(stream_socket fd) for establishing a WebSocket connection on an already connected TCP socket.
- connect(ssl::connection conn) for establishing a WebSocket connection on an already established SSL connection.
Note
When configuring an SSL context prior to calling connect, the URI scheme
must be wss. When not configuring an SSL context prior to calling connect
and passing a URI with scheme wss, CAF will automatically create an SSL
context for the connection.
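The rules from this note can be summarized as a small decision function. The following standalone sketch only restates the note and the constraint that the scheme must be ws or wss. The names transport and pick_transport are hypothetical and this is not how CAF implements the check internally.

```cpp
#include <optional>
#include <string_view>

// Possible outcomes in this sketch: plain TCP, TLS with the context the user
// configured, or TLS with a context that gets created on demand.
enum class transport { plain, tls_user_context, tls_default_context };

// Returns std::nullopt for combinations that the note rules out.
inline std::optional<transport> pick_transport(std::string_view scheme,
                                               bool has_user_context) {
  if (scheme == "wss")
    return has_user_context ? transport::tls_user_context
                            : transport::tls_default_context;
  if (scheme == "ws" && !has_user_context)
    return transport::plain;
  // Either an unknown scheme or "ws" combined with an SSL context.
  return std::nullopt;
}
```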
After calling connect, we can configure various parameters:
- do_on_error(F callback) installs a callback function that CAF calls if an error occurs while starting the client.
- retry_delay(timespan) to set the delay between connection retries when a connection attempt fails.
- connection_timeout(timespan) to set the maximum amount of time to wait for a connection attempt to succeed before giving up.
- max_retry_count(size_t) to set the maximum number of times to retry a connection attempt before giving up.
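To illustrate how a retry delay and a retry count interact, here is a minimal standalone sketch of a retry loop. It is not CAF code: connect_with_retries and connect_result are hypothetical names, and the connection timeout is not modeled. The sketch assumes that the retry count excludes the initial attempt, which matches the comment in the client example that pairs "try up to 10 times" with max_retry_count(9).

```cpp
#include <chrono>
#include <cstddef>
#include <functional>
#include <thread>

// Result of the hypothetical retry loop.
struct connect_result {
  bool connected;
  std::size_t attempts; // total number of attempts, including the first one
};

// Calls try_connect once and then up to max_retry_count more times, sleeping
// for retry_delay between two attempts.
inline connect_result
connect_with_retries(const std::function<bool()>& try_connect,
                     std::size_t max_retry_count,
                     std::chrono::milliseconds retry_delay) {
  std::size_t attempts = 0;
  for (;;) {
    ++attempts;
    if (try_connect())
      return {true, attempts};
    // Stop once the initial attempt and all retries have been used up.
    if (attempts > max_retry_count)
      return {false, attempts};
    std::this_thread::sleep_for(retry_delay);
  }
}
```

Under this assumption, a retry count of 9 means at most 10 calls to try_connect, and a retry count of 0 means a single attempt without any retry.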
Finally, we call start to launch the client. The function takes a function
object that must take two parameters: the input and output resources for reading
from and writing to the WebSocket connection. The types may be obtained from the
trait class (caf::net::web_socket::default_trait unless passing a different
type to with) via input_resource and output_resource. However, the
function object (lambda) may also take the two parameters as auto for
brevity, as shown in the example below.
int caf_main(caf::actor_system& sys, const config& cfg) {
  namespace ws = caf::net::web_socket;
  // Sanity checking.
  auto server = caf::get_as<caf::uri>(cfg, "server");
  if (!server) {
    sys.println("*** mandatory argument 'server' missing or invalid");
    return EXIT_FAILURE;
  }
  // Ask the user for the hello message.
  std::string hello;
  std::cout << "Please enter a hello message for the server: " << std::flush;
  std::getline(std::cin, hello);
  // Try to establish a connection to the server and send the hello message.
  auto conn
    = ws::with(sys)
        // Connect to the given URI.
        .connect(server)
        // If we don't succeed at first, try up to 10 times with 1s delay.
        .retry_delay(1s)
        .max_retry_count(9)
        // On success, spin up a worker to manage the connection.
        .start([&sys, hello](auto pull, auto push) {
          sys.spawn([hello, pull, push](caf::event_based_actor* self) {
            // Open the pull handle.
            pull
              .observe_on(self)
              // Print errors to stderr.
              .do_on_error([self](const caf::error& what) {
                self->println("*** error while reading from the WebSocket: {}",
                              what);
              })
              // Restrict how many messages we receive if the user configured
              // a limit.
              .compose([self](auto in) {
                if (auto limit = caf::get_as<size_t>(self->config(), "max")) {
                  return std::move(in).take(*limit).as_observable();
                } else {
                  return std::move(in).as_observable();
                }
              })
              // Print a bye-bye message if the server closes the connection.
              .do_on_complete([self] { //
                self->println("Server has closed the connection");
              })
              // Print everything from the server to stdout.
              .for_each([self](const ws::frame& msg) {
                if (msg.is_text()) {
                  self->println("Server: {}", msg.as_text());
                } else if (msg.is_binary()) {
                  self->println("Server: [binary message of size {}]",
                                msg.as_binary().size());
                }
              });
            // Send our hello message and wait until the server closes the
            // socket. We keep the connection alive by never closing the write
            // channel.
            self->make_observable()
              .just(ws::frame{hello})
              .concat(self->make_observable().never<ws::frame>())
              .subscribe(push);
          });
        });
  if (!conn) {
    sys.println("*** unable to connect to {}: {}", server->str(), conn.error());
    return EXIT_FAILURE;
  }
  // Note: the actor system will keep the application running for as long as the
  // workers are still alive.
  return EXIT_SUCCESS;
}
Frames
The WebSocket protocol operates on so-called frames. A frame contains a single
text or binary message. The class caf::net::web_socket::frame is an
implicitly-shared handle type that represents a single WebSocket frame (binary
or text).
The most commonly used member functions are as follows:
- size_t size() const noexcept returns the size of the frame in bytes.
- bool empty() const noexcept queries whether the frame has a size of 0.
- bool is_binary() const noexcept queries whether the frame contains raw bytes.
- bool is_text() const noexcept queries whether the frame contains a text message.
- const_byte_span as_binary() const noexcept accesses the bytes of a binary message.
- std::string_view as_text() const noexcept accesses the characters of a text message.
For the full class interface, please refer to the Doxygen documentation.
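As a mental model for this interface, the following standalone sketch implements a toy frame type with the same member function names. It is not the CAF class: toy_frame is a hypothetical name, the payload representation (a shared, immutable variant) is an assumption chosen to mimic an implicitly-shared handle, and as_binary returns a vector reference here instead of a const_byte_span. How CAF behaves when calling an accessor on the wrong kind of frame is not modeled. The sketch simply asserts.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <string_view>
#include <utility>
#include <variant>
#include <vector>

// Toy model of a WebSocket frame (NOT caf::net::web_socket::frame). Copies
// of a toy_frame share one immutable payload.
class toy_frame {
public:
  using bytes = std::vector<std::byte>;

  explicit toy_frame(std::string_view text)
    : data_(std::make_shared<payload>(std::string{text})) {}

  explicit toy_frame(bytes raw)
    : data_(std::make_shared<payload>(std::move(raw))) {}

  bool is_text() const noexcept {
    return std::holds_alternative<std::string>(*data_);
  }

  bool is_binary() const noexcept { return !is_text(); }

  // Size of the payload in bytes, regardless of the frame type.
  std::size_t size() const noexcept {
    return is_text() ? std::get<std::string>(*data_).size()
                     : std::get<bytes>(*data_).size();
  }

  bool empty() const noexcept { return size() == 0; }

  std::string_view as_text() const noexcept {
    assert(is_text()); // only valid for text frames in this sketch
    return std::get<std::string>(*data_);
  }

  const bytes& as_binary() const noexcept {
    assert(is_binary()); // only valid for binary frames in this sketch
    return std::get<bytes>(*data_);
  }

  // True if both handles point to the same underlying payload.
  bool shares_payload_with(const toy_frame& other) const noexcept {
    return data_ == other.data_;
  }

private:
  using payload = std::variant<std::string, bytes>;
  std::shared_ptr<const payload> data_;
};
```

Copying a toy_frame only copies the shared pointer, so passing frames by value through a flow stays cheap in this model.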