WebSocket experimental

Servers

There are two ways to implement a WebSocket server with CAF. The first option is to create a standalone server that only accepts incoming WebSocket clients. The second option is to start a regular HTTP server and then use the caf::net::web_socket::switch_protocol factory to create a route for WebSocket clients.

In both cases, the server runs in the background and communicates asynchronously with the application logic over an asynchronous buffer resource. Usually, this resource is used to create an observable on some user-defined actor that implements the server logic. The server passes connection events over this buffer, where each event consists of two new buffer resources: one for receiving messages from the WebSocket client and one for sending messages to it.

Note

Closing either of the asynchronous resources will close the WebSocket connection. When only reading from a WebSocket connection, we can subscribe the output channel to a never observable. Likewise, we can pass std::ignore to the input observable for applications that are only interested in writing to the WebSocket connection.

Standalone Server

Note

For this API, include caf/net/web_socket/with.hpp.

Starting a WebSocket server has three distinct steps.

In the first step, we bind the server to an actor system and optionally configure SSL. The entry point is always the free function caf::net::web_socket::with, which takes an actor_system as argument. Optionally, users may set the Trait template parameter (see Traits) of the function. When not set, this parameter defaults to caf::net::web_socket::default_trait. With this trait, the WebSocket sends and receives caf::net::web_socket::frame objects (see Frames).

On the resulting factory object, we can optionally call context to set an SSL context. Once we call accept, we enter the second phase of the setup. The accept function has multiple overloads:

  • accept(uint16_t port, std::string bind_address = "") for opening a new port. The bind_address optionally restricts which IP addresses may connect to the server. Passing an empty string (default) allows any client.
  • accept(tcp_accept_socket fd) for running the server on an already configured TCP server socket.
  • accept(ssl::tcp_acceptor acc) for running the server on an already configured TCP server socket with an SSL context. Using this overload after configuring an SSL context is an error, since CAF cannot use two SSL contexts on one server.

After calling accept, we enter the second step of configuring the server. Here, we can (optionally) fine-tune the server with these member functions:

  • do_on_error(F callback) installs a callback function that CAF calls if an error occurs while starting the server.
  • max_connections(size_t value) configures how many clients the server may allow to connect before blocking further connection attempts.
  • reuse_address(bool value) configures whether we create the server socket with SO_REUSEADDR. Has no effect if we have passed a socket or SSL acceptor to accept.

The second step is completed when calling on_request. This function requires one argument: a function object that takes a net::web_socket::acceptor<Ts...>& argument, where the template parameter pack Ts... is freely chosen and may be empty. The types we select here allow us to pass any number of arguments to the connection event later on.

The third and final step is to call start with a function object that takes an asynchronous resource for processing connect events. The type is usually obtained from the configured trait class via Trait::acceptor_resource<Ts...>, where Ts... must be the same set of types we have used previously for the on_request handler.
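To make the relationship between the Ts... of on_request and the event type more concrete, the following standalone sketch models an accept event without CAF. The names input_resource, output_resource, make_accept_event, and demo_accept_event are hypothetical stand-ins, and std::tuple takes the place of caf::cow_tuple. Only the shape of accept_event follows the trait interface documented under Traits.

```cpp
#include <cassert>
#include <string>
#include <tuple>
#include <utility>

// Hypothetical stand-ins for the trait's buffer handles (not CAF types).
struct input_resource { int conn_id; };
struct output_resource { int conn_id; };

// Same shape as default_trait::accept_event, but built on std::tuple instead
// of caf::cow_tuple so that the sketch compiles without CAF.
template <class... Ts>
using accept_event = std::tuple<input_resource, output_resource, Ts...>;

// Models what accepting a connection does conceptually: the extra arguments
// travel alongside the buffer pair for that connection.
template <class... Ts>
accept_event<Ts...> make_accept_event(int conn_id, Ts... extras) {
  return accept_event<Ts...>{input_resource{conn_id},
                             output_resource{conn_id}, std::move(extras)...};
}

// Usage: one event without extras, one event that carries a name.
inline void demo_accept_event() {
  auto plain = make_accept_event(1);
  static_assert(std::tuple_size_v<decltype(plain)> == 2);
  auto named = make_accept_event(2, std::string{"Seneca"});
  auto& [pull, push, name] = named;
  assert(pull.conn_id == 2 && push.conn_id == 2);
  assert(name == "Seneca");
}
```

An empty Ts... yields an event with just the two buffer handles. Each additional type adds one more element that the worker can unpack with structured bindings.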

After calling start, CAF returns an expected<disposable>. On success, the disposable is a handle to the launched server and calling dispose on it stops the server. On error, the result contains a human-readable description of what went wrong.

This example illustrates how the API looks when putting all the pieces together:

int caf_main(caf::actor_system& sys, const config& cfg) {
  namespace http = caf::net::http;
  namespace ssl = caf::net::ssl;
  namespace ws = caf::net::web_socket;
  // Read the configuration.
  auto port = caf::get_or(cfg, "port", default_port);
  auto pem = ssl::format::pem;
  auto key_file = caf::get_as<std::string>(cfg, "tls.key-file");
  auto cert_file = caf::get_as<std::string>(cfg, "tls.cert-file");
  auto max_connections = caf::get_or(cfg, "max-connections",
                                     default_max_connections);
  if (!key_file != !cert_file) {
    std::cerr << "*** inconsistent TLS config: declare neither file or both\n";
    return EXIT_FAILURE;
  }
  // Open up a TCP port for incoming connections and start the server.
  using trait = ws::default_trait;
  auto server
    = ws::with(sys)
        // Optionally enable TLS.
        .context(ssl::context::enable(key_file && cert_file)
                   .and_then(ssl::emplace_server(ssl::tls::v1_2))
                   .and_then(ssl::use_private_key_file(key_file, pem))
                   .and_then(ssl::use_certificate_file(cert_file, pem)))
        // Bind to the user-defined port.
        .accept(port)
        // Limit how many clients may be connected at any given time.
        .max_connections(max_connections)
        // Accept only requests for path "/".
        .on_request([](ws::acceptor<>& acc) {
          // The header parameter contains fields from the WebSocket handshake
          // such as the path and HTTP header fields.
          auto path = acc.header().path();
          std::cout << "*** new client request for path " << path << '\n';
          // Accept the WebSocket connection only if the path is "/".
          if (path == "/") {
            // Calling `accept` causes the server to acknowledge the client and
            // creates input and output buffers that go to the worker actor.
            acc.accept();
          } else {
            // Calling `reject` aborts the connection with HTTP status code 400
            // (Bad Request). The error gets converted to a string and sent to
            // the client to give some indication to the client why the request
            // was rejected.
            auto err = caf::make_error(caf::sec::invalid_argument,
                                       "unrecognized path, try '/'");
            acc.reject(std::move(err));
          }
          // Note: calling nothing on `acc` also rejects the connection.
        })
        // When started, run our worker actor to handle incoming connections.
        .start([&sys](trait::acceptor_resource<> events) {
          sys.spawn([events](caf::event_based_actor* self) {
            // For each buffer pair, we create a new flow ...
            self->make_observable()
              .from_resource(events) //
              .for_each([self](const trait::accept_event<>& ev) {
                // ... that simply pushes data back to the sender.
                auto [pull, push] = ev.data();
                pull.observe_on(self)
                  .do_on_next([](const ws::frame& x) {
                    if (x.is_binary()) {
                      std::cout
                        << "*** received a binary WebSocket frame of size "
                        << x.size() << '\n';
                    } else {
                      std::cout
                        << "*** received a text WebSocket frame of size "
                        << x.size() << '\n';
                    }
                  })
                  .subscribe(push);
              });
          });
        });
  // Report any error to the user.
  if (!server) {
    std::cerr << "*** unable to run at port " << port << ": "
              << to_string(server.error()) << '\n';
    return EXIT_FAILURE;
  }
  // Note: the actor system will keep the application running for as long as the
  // workers are still alive.
  return EXIT_SUCCESS;
}

Note

For details on ssl::context::enable, please see SSL.

HTTP Server with a WebSocket Route

Note

For this API, include caf/net/web_socket/switch_protocol.hpp.

Sometimes, we want a server to accept WebSocket connections only on a specific path and otherwise act as a regular HTTP server. This use case is covered by caf::net::web_socket::switch_protocol. The function is similar to with in that it takes a template parameter for the trait, but it requires no arguments. In this version, we skip the accept step (since the HTTP server takes care of accepting connections) and call on_request as the first step. Then, we call on_start (instead of start) to add the WebSocket server logic when starting up the parent server.

The following snippet showcases the setup for adding a WebSocket route to an HTTP server (you can find the full example under examples/web_socket/quote-server.cpp):

        // On "/ws/quotes/<arg>", we switch the protocol to WebSocket.
        .route("/ws/quotes/<arg>", http::method::get,
               ws::switch_protocol()
                 // Check that the client asks for a known philosopher.
                 .on_request(
                   [](ws::acceptor<caf::cow_string>& acc, std::string name) {
                     auto quotes = quotes_by_name(name);
                     if (quotes.empty()) {
                       auto err = make_error(caf::sec::invalid_argument,
                                             not_found_str(name));
                       acc.reject(std::move(err));
                     } else {
                       // Forward the name to the WebSocket worker.
                       acc.accept(caf::cow_string{std::move(name)});
                     }
                   })
                 // Spawn a worker for the WebSocket clients.
                 .on_start(
                   [&sys](trait::acceptor_resource<caf::cow_string> events) {
                     // Spawn a worker that reads from `events`.
                     using event_t = trait::accept_event<caf::cow_string>;
                     sys.spawn([events](caf::event_based_actor* self) {
                       // Each WS connection has a pull/push buffer pair.
                       self->make_observable()
                         .from_resource(events) //
                         .for_each([self](const event_t& ev) mutable {
                           // Forward the quotes to the client.
                           auto [pull, push, name] = ev.data();
                           auto quotes = quotes_by_name(name);
                           assert(!quotes.empty()); // Checked in on_request.
                           self->make_observable()
                             .from_container(quotes)
                             .map([](std::string_view quote) {
                               return ws::frame{quote};
                             })
                             .subscribe(push);
                           // We ignore whatever the client may send to us.
                           pull.observe_on(self).subscribe(std::ignore);
                         });
                     });
                   }))
        .route("/status", http::method::get,
               [](http::responder& res) {
                 res.respond(http::status::no_content);
               })

Clients

Clients use the same entry point as standalone servers, i.e., caf::net::web_socket::with. However, instead of calling accept, we call connect. Like accept, CAF offers multiple overloads for this function:

  • connect(std::string host, uint16_t port) for connecting to the server at host on the given port.
  • connect(uri endpoint) for connecting to the given server. The URI scheme must be either ws or wss.
  • connect(expected<uri> endpoint) like the previous overload. Forwards the error in case endpoint does not represent an actual value. Useful for passing the result of make_uri to connect directly without having to check the result first.
  • connect(stream_socket fd) for establishing a WebSocket connection on an already connected TCP socket.
  • connect(ssl::connection conn) for establishing a WebSocket connection on an already established SSL connection.

Note

When configuring an SSL context prior to calling connect, the URI scheme must be wss. When not configuring an SSL context prior to calling connect and passing a URI with scheme wss, CAF will automatically create an SSL context for the connection.

After calling connect, we can configure various parameters:

  • do_on_error(F callback) installs a callback function that CAF calls if an error occurs while starting the client.
  • retry_delay(timespan) to set the delay between connection retries when a connection attempt fails.
  • connection_timeout(timespan) to set the maximum amount of time to wait for a connection attempt to succeed before giving up.
  • max_retry_count(size_t) to set the maximum number of times to retry a connection attempt before giving up.
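As a rough aid for choosing these values, the following standalone sketch estimates how long a client may keep trying before giving up. It assumes that the first attempt does not count as a retry, that attempts run one after another, and that each attempt takes at most the connection timeout. The helper names total_attempts and worst_case_wait are hypothetical and not part of CAF, and the actual timing may differ.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>

using ms = std::chrono::milliseconds;

// Number of connection attempts, assuming the initial attempt is not counted
// as a retry.
constexpr std::size_t total_attempts(std::size_t max_retry_count) {
  return max_retry_count + 1;
}

// Rough upper bound for the time spent connecting, assuming sequential
// attempts that each take at most `connection_timeout`, with one
// `retry_delay` between consecutive attempts.
constexpr ms worst_case_wait(std::size_t max_retry_count, ms retry_delay,
                             ms connection_timeout) {
  auto attempts = static_cast<ms::rep>(total_attempts(max_retry_count));
  auto retries = static_cast<ms::rep>(max_retry_count);
  return attempts * connection_timeout + retries * retry_delay;
}

// Usage: nine retries with a one-second delay and a 500ms timeout.
inline void demo_retry_budget() {
  static_assert(total_attempts(9) == 10);
  static_assert(worst_case_wait(9, ms{1000}, ms{500}) == ms{14000});
  assert(worst_case_wait(0, ms{1000}, ms{500}) == ms{500});
}
```

Under these assumptions, setting max_retry_count to 9 results in up to 10 connection attempts in total.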

Finally, we call start to launch the client. The function takes a function object that must take two parameters: the input and output resources for reading from and writing to the WebSocket connection. The types may be obtained from the trait class (caf::net::web_socket::default_trait unless passing a different type to with) via input_resource and output_resource. However, the function object (lambda) may also take the two parameters as auto for brevity, as shown in the example below.

int caf_main(caf::actor_system& sys, const config& cfg) {
  namespace ws = caf::net::web_socket;
  // Sanity checking.
  auto server = caf::get_as<caf::uri>(cfg, "server");
  if (!server) {
    std::cerr << "*** mandatory argument server missing or invalid\n";
    return EXIT_FAILURE;
  }
  // Ask the user for the hello message.
  std::string hello;
  std::cout << "Please enter a hello message for the server: " << std::flush;
  std::getline(std::cin, hello);
  // Try to establish a connection to the server and send the hello message.
  auto conn
    = ws::with(sys)
        // Connect to the given URI.
        .connect(server)
        // If we don't succeed at first, try up to 10 times with 1s delay.
        .retry_delay(1s)
        .max_retry_count(9)
        // On success, spin up a worker to manage the connection.
        .start([&sys, hello](auto pull, auto push) {
          sys.spawn([hello, pull, push](caf::event_based_actor* self) {
            // Open the pull handle.
            pull
              .observe_on(self)
              // Print errors to stderr.
              .do_on_error([](const caf::error& what) {
                std::cerr << "*** error while reading from the WebSocket: "
                          << to_string(what) << '\n';
              })
              // Restrict how many messages we receive if the user configured
              // a limit.
              .compose([self](auto in) {
                if (auto limit = caf::get_as<size_t>(self->config(), "max")) {
                  return std::move(in).take(*limit).as_observable();
                } else {
                  return std::move(in).as_observable();
                }
              })
              // Print a bye-bye message if the server closes the connection.
              .do_on_complete([] { //
                std::cout << "Server has closed the connection\n";
              })
              // Print everything from the server to stdout.
              .for_each([](const ws::frame& msg) {
                if (msg.is_text()) {
                  std::cout << "Server: " << msg.as_text() << '\n';
                } else if (msg.is_binary()) {
                  std::cout << "Server: [binary message of size "
                            << msg.as_binary().size() << "]\n";
                }
              });
            // Send our hello message and wait until the server closes the
            // socket. We keep the connection alive by never closing the write
            // channel.
            self->make_observable()
              .just(ws::frame{hello})
              .concat(self->make_observable().never<ws::frame>())
              .subscribe(push);
          });
        });
  if (!conn) {
    std::cerr << "*** unable to connect to " << server->str() << ": "
              << to_string(conn.error()) << '\n';
    return EXIT_FAILURE;
  }
  // Note: the actor system will keep the application running for as long as the
  // workers are still alive.
  return EXIT_SUCCESS;
}

Frames

The WebSocket protocol operates on so-called frames. A frame contains a single text or binary message. The class caf::net::web_socket::frame is an implicitly-shared handle type that represents a single WebSocket frame (binary or text).

The most commonly used member functions are as follows:

  • size_t size() const noexcept returns the size of the frame in bytes.
  • bool empty() const noexcept queries whether the frame has a size of 0.
  • bool is_binary() const noexcept queries whether the frame contains raw bytes.
  • bool is_text() const noexcept queries whether the frame contains a text message.
  • const_byte_span as_binary() const noexcept accesses the bytes of a binary message.
  • std::string_view as_text() const noexcept accesses the characters of a text message.

For the full class interface, please refer to the Doxygen documentation.
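The following sketch shows one way to branch on the frame type using only the member functions listed above. To keep it compilable without CAF, describe is written as a template and exercised with fake_frame, a hypothetical stand-in that is not part of CAF. With CAF available, the same function template should accept a caf::net::web_socket::frame, provided the signatures match the list above.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>
#include <vector>

// Renders a one-line summary for any frame-like type that offers is_text,
// as_text, is_binary and as_binary.
template <class Frame>
std::string describe(const Frame& msg) {
  if (msg.is_text())
    return "text: " + std::string{msg.as_text()};
  if (msg.is_binary())
    return "binary: " + std::to_string(msg.as_binary().size()) + " bytes";
  return "neither text nor binary";
}

// Hypothetical stand-in for ws::frame (assumption: NOT a CAF type).
struct fake_frame {
  bool binary = false;
  std::string text;
  std::vector<std::byte> bytes;
  bool is_text() const noexcept { return !binary; }
  bool is_binary() const noexcept { return binary; }
  std::string_view as_text() const noexcept { return text; }
  const std::vector<std::byte>& as_binary() const noexcept { return bytes; }
};

// Usage: summarize one text frame and one binary frame.
inline void demo_describe() {
  fake_frame hello{false, "hello", {}};
  fake_frame blob{true, "", std::vector<std::byte>(3)};
  assert(describe(hello) == "text: hello");
  assert(describe(blob) == "binary: 3 bytes");
}
```

Checking is_text and is_binary separately, instead of treating "not text" as binary, mirrors the client example above and leaves room for a frame that is neither.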

Traits

A trait translates between text or binary frames on the network and the application by defining C++ types for reading and writing from/to the WebSocket connection. The trait class also binds these types to the asynchronous resources that connect the WebSocket in the background to the application logic.

The interface of the default trait looks as follows:

/// Configures a WebSocket server or client to operate on the granularity of
/// regular WebSocket frames.
class CAF_NET_EXPORT default_trait {
public:
  /// The input type of the application, i.e., what flows from the
  /// WebSocket to the application layer.
  using input_type = frame;

  /// The output type of the application, i.e., what flows from the application
  /// layer to the WebSocket.
  using output_type = frame;

  /// A resource for consuming input_type elements.
  using input_resource = async::consumer_resource<input_type>;

  /// A resource for producing output_type elements.
  using output_resource = async::producer_resource<output_type>;

  /// An accept event from the server to transmit read and write handles.
  template <class... Ts>
  using accept_event = cow_tuple<input_resource, output_resource, Ts...>;

  /// A resource for consuming accept events.
  template <class... Ts>
  using acceptor_resource = async::consumer_resource<accept_event<Ts...>>;

  /// Queries whether `x` should be serialized as binary frame (`true`) or text
  /// frame (`false`).
  bool converts_to_binary(const output_type& x);

  /// Serializes an output to raw bytes for sending a binary frame
  /// (`converts_to_binary` returned `true`).
  bool convert(const output_type& x, byte_buffer& bytes);

  /// Serializes an output to ASCII (or UTF-8) characters for sending a text
  /// frame (`converts_to_binary` returned `false`).
  bool convert(const output_type& x, std::vector<char>& text);

  /// Converts the raw bytes from a binary frame to the input type.
  bool convert(const_byte_span bytes, input_type& x);

  /// Converts the characters from a text frame to the input type.
  bool convert(std::string_view text, input_type& x);

  /// Returns a description of the error if any of the `convert` functions
  /// returned `false`.
  error last_error();
};

Users may implement custom trait types by providing the same member functions and type aliases.
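As an illustration, the following sketch outlines a trait that exchanges plain strings and only uses text frames. It is a standalone model rather than a drop-in CAF trait: byte_buffer, const_byte_span, and error are simplified stand-ins for the CAF types of the same name, the resource aliases are left out because they require CAF, and text_only_trait is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>
#include <vector>

// Simplified stand-ins so the sketch compiles without CAF (assumptions).
using byte_buffer = std::vector<std::byte>;
struct const_byte_span { const std::byte* ptr; std::size_t len; };
using error = std::string;

// Hypothetical trait: the application reads and writes std::string values.
class text_only_trait {
public:
  using input_type = std::string;
  using output_type = std::string;
  // Omitted: input_resource, output_resource, accept_event and
  // acceptor_resource, which a real trait defines like default_trait does.

  // Every output goes out as a text frame.
  bool converts_to_binary(const output_type&) { return false; }

  // Never called for this trait in practice; fails for completeness.
  bool convert(const output_type&, byte_buffer&) {
    err_ = "text_only_trait does not produce binary frames";
    return false;
  }

  // Copies the string into the text buffer of the outgoing frame.
  bool convert(const output_type& x, std::vector<char>& text) {
    text.assign(x.begin(), x.end());
    return true;
  }

  // Rejects incoming binary frames.
  bool convert(const_byte_span, input_type&) {
    err_ = "text_only_trait does not accept binary frames";
    return false;
  }

  // Copies the characters of an incoming text frame into the input value.
  bool convert(std::string_view text, input_type& x) {
    x.assign(text.begin(), text.end());
    return true;
  }

  error last_error() { return err_; }

private:
  error err_;
};

// Usage: round-trip a string through the text conversions.
inline void demo_text_only_trait() {
  text_only_trait trait;
  std::vector<char> wire;
  assert(trait.convert(std::string{"hello"}, wire));
  std::string received;
  assert(trait.convert(std::string_view{wire.data(), wire.size()}, received));
  assert(received == "hello");
}
```

Recording a message before returning false keeps last_error meaningful, matching the contract described in the comments of default_trait.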