WebSocket experimental

Servers

There are two ways to implement a WebSocket server with CAF. The first option is to create a standalone server that only accepts incoming WebSocket clients. The second option is to start a regular HTTP server and then use the caf::net::web_socket::switch_protocol factory to create a route for WebSocket clients.

In both cases, the server runs in the background and communicates asynchronously with the application logic over an asynchronous buffer resource. Usually, this resource is used to create an observable on some user-defined actor that implements the server logic. The server passes connection events over this buffer, where each event consists of two new buffer resources: one for receiving messages from the WebSocket client and one for sending messages to it.

Note

Closing either of the asynchronous resources will close the WebSocket connection. When only reading from a WebSocket connection, we can subscribe the output channel to a never observable. Likewise, we can pass std::ignore to the input observable for applications that are only interested in writing to the WebSocket connection.

Standalone Server

Note

For this API, include caf/net/web_socket/with.hpp.

Starting a WebSocket server has three distinct steps.

In the first step, we bind the server to an actor system and optionally configure SSL. The entry point is always the free function caf::net::web_socket::with, which takes an actor_system as argument. The WebSocket API in CAF produces flows that operate on caf::net::web_socket::frame objects (see Frames).

On the resulting factory object, we can optionally call context to set an SSL context. Once we call accept, we enter the second phase of the setup. The accept function has multiple overloads:

  • accept(uint16_t port, std::string bind_address = "") for opening a new port. The bind_address optionally restricts which IP addresses may connect to the server. Passing an empty string (default) allows any client.

  • accept(tcp_accept_socket fd) for running the server on an already configured TCP server socket.

  • accept(ssl::tcp_acceptor acc) for running the server on an already configured TCP server socket with an SSL context. Using this overload after configuring an SSL context is an error, since CAF cannot use two SSL contexts on one server.

After calling accept, we enter the second step of configuring the server. Here, we can (optionally) fine-tune the server with these member functions:

  • do_on_error(F callback) installs a callback function that CAF calls if an error occurs while starting the server.

  • max_connections(size_t value) configures how many clients the server may allow to connect before blocking further connection attempts.

  • reuse_address(bool value) configures whether we create the server socket with SO_REUSEADDR. Has no effect if we have passed a socket or SSL acceptor to accept.

The second step is completed by calling on_request. This function requires one argument: a function object that takes a net::web_socket::acceptor<Ts...>& argument, where the template parameter pack Ts... is freely chosen and may be empty. The types we select here allow us to pass any number of arguments to the connection event later on.

The third and final step is to call start with a function object that takes an asynchronous resource for processing connect events. The type is usually obtained from the configured trait class via Trait::acceptor_resource<Ts...>, where Ts... must be the same set of types we have used previously for the on_request handler.

The call to start returns an expected<disposable>. On success, the disposable is a handle to the launched server and calling dispose on it stops the server. On error, the result contains a human-readable description of what went wrong.

This example illustrates how the API looks when putting all the pieces together:

namespace {

std::atomic<bool> shutdown_flag;

void set_shutdown_flag(int) {
  shutdown_flag = true;
}

} // namespace

int caf_main(caf::actor_system& sys, const config& cfg) {
  namespace http = caf::net::http;
  namespace ssl = caf::net::ssl;
  namespace ws = caf::net::web_socket;
  // Do a regular shutdown for CTRL+C and SIGTERM.
  signal(SIGTERM, set_shutdown_flag);
  signal(SIGINT, set_shutdown_flag);
  // Read the configuration.
  auto port = caf::get_or(cfg, "port", default_port);
  auto pem = ssl::format::pem;
  auto key_file = caf::get_as<std::string>(cfg, "tls.key-file");
  auto cert_file = caf::get_as<std::string>(cfg, "tls.cert-file");
  auto max_connections = caf::get_or(cfg, "max-connections",
                                     default_max_connections);
  if (!key_file != !cert_file) {
    sys.println("*** inconsistent TLS config: declare neither file or both");
    return EXIT_FAILURE;
  }
  // Open up a TCP port for incoming connections and start the server.
  auto server
    = ws::with(sys)
        // Optionally enable TLS.
        .context(ssl::context::enable(key_file && cert_file)
                   .and_then(ssl::emplace_server(ssl::tls::v1_2))
                   .and_then(ssl::use_private_key_file(key_file, pem))
                   .and_then(ssl::use_certificate_file(cert_file, pem)))
        // Bind to the user-defined port.
        .accept(port)
        // Limit how many clients may be connected at any given time.
        .max_connections(max_connections)
        // Accept only requests for path "/".
        .on_request([&sys](ws::acceptor<>& acc) {
          // The header parameter contains fields from the WebSocket handshake
          // such as the path and HTTP header fields.
          auto path = acc.header().path();
          sys.println("*** new client request for path {}", path);
          // Accept the WebSocket connection only if the path is "/".
          if (path == "/") {
            // Calling `accept` causes the server to acknowledge the client and
            // creates input and output buffers that go to the worker actor.
            acc.accept();
          } else {
            // Calling `reject` aborts the connection with HTTP status code 400
            // (Bad Request). The error gets converted to a string and sent to
            // the client to give some indication to the client why the request
            // was rejected.
            auto err = caf::make_error(caf::sec::invalid_argument,
                                       "unrecognized path, try '/'");
            acc.reject(std::move(err));
          }
          // Note: calling nothing on `acc` also rejects the connection.
        })
        // When started, run our worker actor to handle incoming connections.
        .start([&sys](auto events) {
          sys.spawn([events](caf::event_based_actor* self) {
            // For each buffer pair, we create a new flow ...
            self->make_observable()
              .from_resource(events) //
              .for_each([self](const auto& ev) {
                // ... that simply pushes data back to the sender.
                auto [pull, push] = ev.data();
                pull.observe_on(self)
                  .do_on_error([self](const caf::error& what) { //
                    self->println("*** connection closed: {}", what);
                  })
                  .do_on_complete([self] { //
                    self->println("*** connection closed");
                  })
                  .do_on_next([self](const ws::frame& x) {
                    if (x.is_binary()) {
                      self->println(
                        "*** received a binary WebSocket frame of size {}",
                        x.size());
                    } else {
                      self->println(
                        "*** received a text WebSocket frame of size {}",
                        x.size());
                    }
                  })
                  .subscribe(push);
              });
          });
        });
  // Report any error to the user.
  if (!server) {
    sys.println("*** unable to run at port {}: {}", port, server.error());
    return EXIT_FAILURE;
  }
  // Wait for CTRL+C or SIGTERM.
  while (!shutdown_flag)
    std::this_thread::sleep_for(250ms);
  sys.println("*** shutting down");
  server->dispose();
  return EXIT_SUCCESS;
}
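The check `!key_file != !cert_file` in the example above rejects configurations where exactly one of the two TLS files is set. The following minimal sketch isolates that logic. It uses std::optional as a stand-in for the result of caf::get_as, and both helper names are hypothetical and not part of CAF.

```cpp
#include <optional>
#include <string>

// Hypothetical helper (not part of CAF): the TLS settings are consistent if
// either both files are configured or neither is. This is the negation of
// the `!key_file != !cert_file` test in the example.
bool tls_config_consistent(const std::optional<std::string>& key_file,
                           const std::optional<std::string>& cert_file) {
  return key_file.has_value() == cert_file.has_value();
}

// Hypothetical helper: mirrors the `key_file && cert_file` argument that the
// example passes to ssl::context::enable.
bool tls_enabled(const std::optional<std::string>& key_file,
                 const std::optional<std::string>& cert_file) {
  return key_file.has_value() && cert_file.has_value();
}
```

With neither file configured, the settings are consistent and TLS stays disabled. With only one file configured, the settings are inconsistent and the example exits with an error.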

Note

For details on ssl::context::enable, please see SSL.

HTTP Server with a WebSocket Route

Note

For this API, include caf/net/web_socket/switch_protocol.hpp.

Sometimes, we want a server to accept WebSocket connections only on a specific path and otherwise act as a regular HTTP server. This use case is covered by caf::net::web_socket::switch_protocol. The function is similar to with in that it takes a template parameter for the trait, but it requires no arguments. In this version, we skip the accept step (since the HTTP server takes care of accepting connections) and call on_request as the first step. Then, we call on_start (instead of start) to add the WebSocket server logic when starting up the parent server.

The following snippet showcases the setup for adding a WebSocket route to an HTTP server (you can find the full example under examples/web_socket/quote-server.cpp):

        // On "/ws/quotes/<arg>", we switch the protocol to WebSocket.
        .route("/ws/quotes/<arg>", http::method::get,
               ws::switch_protocol()
                 // Check that the client asks for a known philosopher.
                 .on_request(
                   [](ws::acceptor<caf::cow_string>& acc, std::string name) {
                     auto quotes = quotes_by_name(name);
                     if (quotes.empty()) {
                       auto err = make_error(caf::sec::invalid_argument,
                                             not_found_str(name));
                       acc.reject(std::move(err));
                     } else {
                       // Forward the name to the WebSocket worker.
                       acc.accept(caf::cow_string{std::move(name)});
                     }
                   })
                 // Spawn a worker for the WebSocket clients.
                 .on_start([&sys](auto events) {
                   // Spawn a worker that reads from `events`.
                   sys.spawn([events](caf::event_based_actor* self) {
                     // Each WS connection has a pull/push buffer pair.
                     self->make_observable()
                       .from_resource(events) //
                       .for_each([self](const auto& ev) mutable {
                         // Forward the quotes to the client.
                         auto [pull, push, name] = ev.data();
                         auto quotes = quotes_by_name(name);
                         self->make_observable()
                           .from_container(quotes)
                           .map([](std::string_view quote) {
                             return ws::frame{quote};
                           })
                           .subscribe(push);
                         // We ignore whatever the client may send to us.
                         pull.observe_on(self).subscribe(std::ignore);
                       });
                   });
                 }))
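In the snippet above, the `<arg>` placeholder in the route becomes the `name` argument of the on_request handler. To illustrate the idea, here is a minimal sketch of segment-wise route matching using only the standard library. It is not CAF's router: split_path and match_route are hypothetical names, and the sketch assumes that each `<arg>` captures exactly one path segment.

```cpp
#include <cstddef>
#include <optional>
#include <string>
#include <string_view>
#include <vector>

// Splits a path such as "/ws/quotes/plato" into {"ws", "quotes", "plato"}.
std::vector<std::string_view> split_path(std::string_view path) {
  std::vector<std::string_view> segments;
  std::size_t pos = 0;
  while (pos < path.size()) {
    if (path[pos] == '/') {
      ++pos; // Skip separators.
      continue;
    }
    auto end = path.find('/', pos);
    if (end == std::string_view::npos)
      end = path.size();
    segments.push_back(path.substr(pos, end - pos));
    pos = end;
  }
  return segments;
}

// Hypothetical matcher: returns the values captured by "<arg>" placeholders,
// or std::nullopt if the path does not match the pattern.
std::optional<std::vector<std::string>> match_route(std::string_view pattern,
                                                    std::string_view path) {
  auto expected = split_path(pattern);
  auto actual = split_path(path);
  if (expected.size() != actual.size())
    return std::nullopt;
  std::vector<std::string> captures;
  for (std::size_t i = 0; i < expected.size(); ++i) {
    if (expected[i] == "<arg>")
      captures.emplace_back(actual[i]); // Placeholder: capture the segment.
    else if (expected[i] != actual[i])
      return std::nullopt; // Literal segment mismatch.
  }
  return captures;
}
```

Under these assumptions, matching "/ws/quotes/plato" (a made-up request path) against "/ws/quotes/<arg>" yields a single capture, "plato", which corresponds to the value the handler would receive as `name`.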

Clients

Clients use the same entry point as standalone servers, i.e., caf::net::web_socket::with. However, instead of calling accept, we call connect. Like accept, CAF offers multiple overloads for this function:

  • connect(std::string host, uint16_t port) for connecting to the server at host on the given port.

  • connect(uri endpoint) for connecting to the given server. The URI scheme must be either ws or wss.

  • connect(expected<uri> endpoint) like the previous overload. Forwards the error in case endpoint does not represent an actual value. Useful for passing the result of make_uri to connect directly without having to check the result first.

  • connect(stream_socket fd) for establishing a WebSocket connection on an already connected TCP socket.

  • connect(ssl::connection conn) for establishing a WebSocket connection on an already established SSL connection.

Note

When configuring an SSL context prior to calling connect, the URI scheme must be wss. When not configuring an SSL context prior to calling connect and passing a URI with scheme wss, CAF will automatically create an SSL context for the connection.
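The rules for combining URI schemes with SSL contexts can be summarized as a small decision function. This is only a model of the rules stated in this section, not CAF's implementation. The enum and the function name are hypothetical.

```cpp
#include <string_view>

// Possible outcomes under the rules stated above (hypothetical names).
enum class transport {
  plain,            // "ws" without an SSL context.
  tls_user_context, // "wss" with the SSL context configured by the user.
  tls_auto_context, // "wss" without a context: one is created automatically.
  invalid,          // Unsupported scheme, or "ws" combined with a context.
};

// Maps a URI scheme plus "has the user configured an SSL context?" to the
// outcome described in the text.
transport select_transport(std::string_view scheme, bool has_ssl_context) {
  if (scheme == "wss")
    return has_ssl_context ? transport::tls_user_context
                           : transport::tls_auto_context;
  if (scheme == "ws")
    return has_ssl_context ? transport::invalid : transport::plain;
  return transport::invalid; // Only "ws" and "wss" are accepted.
}
```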

After calling connect, we can configure various parameters:

  • do_on_error(F callback) installs a callback function that CAF calls if an error occurs while starting the client.

  • retry_delay(timespan) to set the delay between connection retries when a connection attempt fails.

  • connection_timeout(timespan) to set the maximum amount of time to wait for a connection attempt to succeed before giving up.

  • max_retry_count(size_t) to set the maximum number of times to retry a connection attempt before giving up.
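The retry parameters interact in a simple way. The sketch below assumes that the initial connection attempt does not count as a retry, which matches the client example in this section (max_retry_count(9) for "up to 10 times"). Both function names are hypothetical, and the delay estimate ignores the time spent inside each attempt, which connection_timeout bounds separately.

```cpp
#include <chrono>
#include <cstddef>

// Assumption: one initial attempt plus up to `max_retry_count` retries.
constexpr std::size_t total_attempts(std::size_t max_retry_count) {
  return max_retry_count + 1;
}

// Total time spent waiting between attempts if every attempt fails. Does not
// include the duration of the attempts themselves.
constexpr std::chrono::milliseconds
total_retry_delay(std::size_t max_retry_count,
                  std::chrono::milliseconds retry_delay) {
  using rep = std::chrono::milliseconds::rep;
  return retry_delay * static_cast<rep>(max_retry_count);
}
```

For example, a retry count of 9 with a delay of one second gives 10 attempts and, under the stated assumption, 9 seconds of waiting between them before the client gives up.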

Finally, we call start to launch the client. The function takes a function object that must take two parameters: the input and output resources for reading from and writing to the WebSocket connection. The types may be obtained from the trait class (caf::net::web_socket::default_trait unless passing a different type to with) via input_resource and output_resource. However, the function object (lambda) may also take the two parameters as auto for brevity, as shown in the example below.

int caf_main(caf::actor_system& sys, const config& cfg) {
  namespace ws = caf::net::web_socket;
  // Sanity checking.
  auto server = caf::get_as<caf::uri>(cfg, "server");
  if (!server) {
    sys.println("*** mandatory argument 'server' missing or invalid");
    return EXIT_FAILURE;
  }
  // Ask the user for the hello message.
  std::string hello;
  std::cout << "Please enter a hello message for the server: " << std::flush;
  std::getline(std::cin, hello);
  // Try to establish a connection to the server and send the hello message.
  auto conn
    = ws::with(sys)
        // Connect to the given URI.
        .connect(server)
        // If we don't succeed at first, try up to 10 times with 1s delay.
        .retry_delay(1s)
        .max_retry_count(9)
        // On success, spin up a worker to manage the connection.
        .start([&sys, hello](auto pull, auto push) {
          sys.spawn([hello, pull, push](caf::event_based_actor* self) {
            // Open the pull handle.
            pull
              .observe_on(self)
              // Print errors to stderr.
              .do_on_error([self](const caf::error& what) {
                self->println("*** error while reading from the WebSocket: {}",
                              what);
              })
              // Restrict how many messages we receive if the user configured
              // a limit.
              .compose([self](auto in) {
                if (auto limit = caf::get_as<size_t>(self->config(), "max")) {
                  return std::move(in).take(*limit).as_observable();
                } else {
                  return std::move(in).as_observable();
                }
              })
              // Print a bye-bye message if the server closes the connection.
              .do_on_complete([self] { //
                self->println("Server has closed the connection");
              })
              // Print everything from the server to stdout.
              .for_each([self](const ws::frame& msg) {
                if (msg.is_text()) {
                  self->println("Server: {}", msg.as_text());
                } else if (msg.is_binary()) {
                  self->println("Server: [binary message of size {}]",
                                msg.as_binary().size());
                }
              });
            // Send our hello message and wait until the server closes the
            // socket. We keep the connection alive by never closing the write
            // channel.
            self->make_observable()
              .just(ws::frame{hello})
              .concat(self->make_observable().never<ws::frame>())
              .subscribe(push);
          });
        });
  if (!conn) {
    sys.println("*** unable to connect to {}: {}", server->str(), conn.error());
    return EXIT_FAILURE;
  }
  // Note: the actor system will keep the application running for as long as the
  // workers are still alive.
  return EXIT_SUCCESS;
}

Frames

The WebSocket protocol operates on so-called frames. A frame contains a single text or binary message. The class caf::net::web_socket::frame is an implicitly-shared handle type that represents a single WebSocket frame (binary or text).

The most commonly used member functions are as follows:

  • size_t size() const noexcept returns the size of the frame in bytes.

  • bool empty() const noexcept queries whether the frame has a size of 0.

  • bool is_binary() const noexcept queries whether the frame contains raw bytes.

  • bool is_text() const noexcept queries whether the frame contains a text message.

  • const_byte_span as_binary() const noexcept accesses the bytes of a binary message.

  • std::string_view as_text() const noexcept accesses the characters of a text message.
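Code that consumes frames typically branches on the frame type first and only then accesses the payload. The following sketch shows this pattern with a hypothetical stand-in type, toy_frame, built on std::variant. It only mirrors the query functions listed above and says nothing about how caf::net::web_socket::frame is implemented.

```cpp
#include <cstddef>
#include <string>
#include <variant>
#include <vector>

// Hypothetical stand-in for a frame: holds either text or raw bytes.
struct toy_frame {
  std::variant<std::string, std::vector<std::byte>> payload;

  bool is_text() const {
    return std::holds_alternative<std::string>(payload);
  }

  bool is_binary() const {
    return !is_text();
  }

  // Size of the payload in bytes, regardless of the frame type.
  std::size_t size() const {
    return std::visit([](const auto& x) { return x.size(); }, payload);
  }

  bool empty() const {
    return size() == 0;
  }
};

// Check the type first, then access the matching representation.
std::string describe(const toy_frame& f) {
  if (f.is_text())
    return "text frame: " + std::get<std::string>(f.payload);
  return "binary frame of size " + std::to_string(f.size());
}
```

With the real class, the same structure applies: call is_text or is_binary first, then as_text or as_binary accordingly, as the client example does.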

For the full class interface, please refer to the Doxygen documentation.