CAF User Manual

C++ Actor Framework version 0.19.3.

Introduction

Before diving into the API of CAF, we discuss the concepts behind it and explain the terminology used in this manual.

Actor Model

The actor model describes concurrent entities—actors—that do not share state and communicate only via asynchronous message passing. Decoupling concurrently running software components via message passing avoids race conditions by design. Actors can create—spawn—new actors and monitor each other to build fault-tolerant, hierarchical systems. Since message passing is network transparent, the actor model applies to both concurrency and distribution.

Implementing applications on top of low-level primitives such as mutexes and semaphores has proven challenging and error-prone, in particular when trying to implement applications that scale up to many CPU cores. Queueing, starvation, priority inversion, and false sharing are only a few of the issues that can decrease performance significantly in mutex-based concurrency models. In the extreme, an application written with the standard toolkit can run slower when adding more cores.

The actor model has gained momentum over the last decade due to its high level of abstraction and its ability to scale dynamically from one core to many cores and from one node to many nodes. However, the actor model has not yet been widely adopted in the native programming domain. With CAF, we contribute a library for actor programming in C++ as open-source software to ease native development of concurrent as well as distributed systems. In this regard, CAF follows the C++ philosophy of building the highest abstraction possible without sacrificing performance.

Terminology

CAF is inspired by other implementations based on the actor model such as Erlang or Akka. It aims to provide a modern C++ API allowing for type-safe as well as dynamically typed messaging. While there are similarities to other implementations, we made many different design decisions that lead to slight differences when comparing CAF to other actor frameworks.

Dynamically Typed Actor

A dynamically typed actor accepts any kind of message and dispatches on its content dynamically at the receiver. This is the traditional messaging style found in implementations like Erlang or Akka. The upside of this approach is (usually) faster prototyping and less code. This comes at the cost of requiring extensive testing.

Statically Typed Actor

CAF achieves static type-checking for actors by defining abstract messaging interfaces. Since interfaces define both input and output types, CAF is able to verify messaging protocols statically. The upside of this approach is much higher robustness to code changes and fewer possible runtime errors. This comes at an increase in required source code, as developers have to define and use messaging interfaces.

Actor References

CAF uses reference counting for actors. The three ways to store a reference to an actor are addresses, handles, and pointers. Note that address does not refer to a memory region in this context.

Address

Each actor has a (network-wide) unique logical address. This identifier is represented by actor_addr, which allows identifying and monitoring an actor. Unlike other actor frameworks, CAF does not allow users to send messages to addresses. This limitation is due to the fact that the address does not contain any type information. Hence, it would not be safe to send it a message, because the receiving actor might use a statically typed interface that does not accept the given message. Because an actor_addr fills the role of an identifier, it has weak reference semantics (see Reference Counting).

Handle

An actor handle contains the address of an actor along with its type information and is required for sending messages to actors. The distinction between handles and addresses—which is unique to CAF when comparing it to other actor systems—is a consequence of the design decision to enforce static type checking for all messages. Dynamically typed actors use actor handles, while statically typed actors use typed_actor<...> handles. Both types have strong reference semantics (see Reference Counting).

Pointer

In a few instances, CAF uses strong_actor_ptr to refer to an actor using strong reference semantics (see Reference Counting) without knowing the proper handle type. Pointers must be converted to a handle via actor_cast (see Converting Actor References with actor_cast) prior to sending messages. A strong_actor_ptr can be null.
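
The difference between weak and strong reference semantics can be illustrated with standard smart pointers. The sketch below is an analogy in plain C++, not CAF code: fake_actor, strong_ref, weak_ref, and upgrade are hypothetical names. Here, std::shared_ptr stands in for a handle or strong_actor_ptr and std::weak_ptr stands in for an actor_addr.

```cpp
#include <cassert>
#include <memory>
#include <string>

// Hypothetical stand-in for an actor; not a CAF type.
struct fake_actor {
  std::string name;
};

// Strong reference: keeps the object alive (analogous to a handle or
// strong_actor_ptr).
using strong_ref = std::shared_ptr<fake_actor>;

// Weak reference: identifies the object without keeping it alive
// (analogous to actor_addr).
using weak_ref = std::weak_ptr<fake_actor>;

// Tries to turn a weak reference into a strong one. Returns a null pointer
// if the object no longer exists.
inline strong_ref upgrade(const weak_ref& addr) {
  return addr.lock();
}
```

In this model, an address obtained from a handle stops being upgradable once the last strong reference is gone, which is why code holding only an address must always check the result of the conversion.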

Spawning

Spawning an actor means to create and run a new actor.

Monitor

A monitored actor sends a down message (see Down Handler) to all actors monitoring it as part of its termination. This allows actors to supervise other actors and to take actions when one of the supervised actors fails, i.e., terminates with a non-normal exit reason.

Experimental Features

Sections that discuss experimental features are highlighted with experimental. The API of such features is not stable. This means even minor updates to CAF can come with breaking changes to the API or even remove a feature completely. However, we encourage developers to extensively test such features and to start discussions to uncover flaws, report bugs, or tweak the API in order to improve a feature or streamline it to cover certain use cases.

Overview

Compiling CAF requires CMake and a recent C++ compiler. To get and compile the sources on UNIX-like systems, type the following in a terminal:

git clone https://github.com/actor-framework/actor-framework
cd actor-framework
./configure
make -C build
make -C build install [as root, optional]

Running configure is not a mandatory step. The script merely automates the CMake setup and makes setting build options slightly more convenient. On Windows, use CMake directly to generate an MSVC project file.

Features

  • Lightweight, fast and efficient actor implementations
  • Network transparent messaging
  • Error handling based on Erlang’s failure model
  • Pattern matching for messages as internal DSL to ease development
  • Thread-mapped actors for soft migration of existing applications
  • Publish/subscribe group communication

Supported Operating Systems

  • Linux
  • Windows
  • macOS
  • FreeBSD

Hello World Example

#include "caf/actor_ostream.hpp"
#include "caf/actor_system.hpp"
#include "caf/caf_main.hpp"
#include "caf/event_based_actor.hpp"

#include <chrono>
#include <iostream>
#include <string>

using namespace caf;

behavior mirror(event_based_actor* self) {
  // return the (initial) actor behavior
  return {
    // a handler for messages containing a single string
    // that replies with a string
    [=](const std::string& what) -> std::string {
      // prints "Hello World!" via aout (thread-safe cout wrapper)
      aout(self) << what << std::endl;
      // reply "!dlroW olleH"
      return std::string{what.rbegin(), what.rend()};
    },
  };
}

void hello_world(event_based_actor* self, const actor& buddy) {
  // send "Hello World!" to our buddy ...
  self->request(buddy, std::chrono::seconds(10), "Hello World!")
    .then(
      // ... wait up to 10s for a response ...
      [=](const std::string& what) {
        // ... and print it
        aout(self) << what << std::endl;
      });
}

void caf_main(actor_system& sys) {
  // create a new actor that calls 'mirror()'
  auto mirror_actor = sys.spawn(mirror);
  // create another actor that calls 'hello_world(mirror_actor)';
  sys.spawn(hello_world, mirror_actor);
  // the system will wait until both actors are done before exiting the program
}

// creates a main function for us that calls our caf_main
CAF_MAIN()

Type Inspection

We designed CAF with distributed systems in mind. Hence, all message types must be serializable. Using a message type that is not serializable causes a compiler error unless explicitly listed as unsafe message type by the user (see Unsafe Message Types). Any unsafe message type may be used only for messages that remain local, i.e., never cross the wire.

Data Model

Type inspection in CAF uses a hierarchical data model with the following building blocks:

built-in types
  • Signed and unsigned integer types for 8, 16, 32 and 64 bit
  • The floating point types float, double and long double
  • Bytes, booleans, and strings
lists
Dynamically-sized container types such as std::vector.
tuples
Fixed-sized container types such as std::tuple or std::array as well as built-in C array types.
maps
Dynamically-sized container types with key/value pairs such as std::map.
objects
User-defined types. An object has one or more fields. Fields have a name and may be optional. Further, fields may take on a fixed number of different types.

To see how this maps to C++ types, consider the following type definition:

struct test {
  variant<string, double> x1;
  optional<tuple<double, double>> x2;
  vector<string> x3;
};

Here, field x1 is either a string or a double at runtime. The field x2 is optional and may contain a fixed-size tuple with two elements (built-in types). Lastly, field x3 contains any number of string values at runtime.

Inspecting Objects

The inspection API allows CAF to deconstruct C++ objects. Users can either provide free functions named inspect that CAF picks up via ADL or specialize caf::inspector_access.

In both cases, users call members and member functions on an Inspector that provides a domain-specific language (DSL) for describing the structure of a C++ object.

After listing a custom type T in a type ID block and either providing a free inspect function overload or specializing inspector_access, CAF is able to:

  • Serialize and deserialize objects of type T to/from byte sequences.
  • Render objects of type T as a human-readable string via caf::deep_to_string.
  • Read objects of type T from a configuration file.

In the remainder of this section, we use the following Plain Old Data (POD) type point_3d in our code examples. Since all member variables of POD types are public, writing custom inspection code is straightforward and we can focus on the inspection API.

struct point_3d {
  int32_t x;
  int32_t y;
  int32_t z;
};

Note

We strongly recommend using the fixed-width integer types in all user-defined messaging types. Consistently using these types over short, int, long, etc. avoids bugs in heterogeneous environments that are hard to debug.

Writing inspect Overloads

Adding overloads for inspect generally provides the simplest way to teach CAF how to serialize and deserialize custom data types. We recommend this way of adding inspection support whenever possible, since it adds the least amount of boilerplate code.

For our POD type point_3d, we simply pass all member variables as fields to the inspector:

template <class Inspector>
bool inspect(Inspector& f, point_3d& x) {
  return f.object(x).fields(f.field("x", x.x),
                            f.field("y", x.y),
                            f.field("z", x.z));
}

As mentioned in the section on the data model, objects are containers for fields that in turn contain values. When providing an inspect overload, CAF recursively traverses all fields.

Not every type needs to expose itself as object, though. For example, consider the following ID type that simply wraps a string:

struct id { std::string value; };

template <class Inspector>
bool inspect(Inspector& f, id& x) {
  return f.object(x).fields(f.field("value", x.value));
}

The type id is basically a strong typedef to improve type safety when writing code. To a type inspector, id objects look as follows:

object(type: "id") {
  field(name: "value") {
    value(type: "string") {
      ...
    }
  }
}

Now, this type has little use on its own. Usually, we would use such a type to compose other types such as the following type person:

struct person { std::string name; id key; };

template <class Inspector>
bool inspect(Inspector& f, person& x) {
  return f.object(x).fields(f.field("name", x.name), f.field("key", x.key));
}

By providing the inspect overload for id, inspectors can recursively visit an id as an object. Hence, the above implementations work as expected. When using person in human-readable data formats such as CAF configurations, however, allowing CAF to look “inside” a strong typedef can simplify working with such types.

With the current implementation, we could read the key manager.ceo from a configuration file with this content:

manager {
  ceo {
    name = "Bob"
    key = {
      value = "TWFuIGlz"
    }
  }
}

This clearly appears more verbose than it needs to be. Users generally need not care about internal types such as id that only exist as a safeguard during programming.

Hence, we generally recommend making such types transparent to CAF inspectors. For our id type, the inspect overload may instead look as follows:

template <class Inspector>
bool inspect(Inspector& f, id& x) {
  return f.apply(x.value);
}

In contrast to the previous implementation, inspectors now simply read or write the strings as values whenever they encounter an id. This simplifies our config file from before and thus gives a much cleaner interface to users:

manager {
  ceo {
    name = "Bob"
    key = "TWFuIGlz"
  }
}

Specializing inspector_access

Working with 3rd party libraries usually rules out adding free functions for existing classes, because the namespace belongs to another project. Hence, CAF also allows specializing inspector_access instead. This requires writing more boilerplate code but allows customizing every step of the inspection process.

The full interface of inspector_access looks as follows:

template <class T>
struct inspector_access {
  template <class Inspector>
  static bool apply(Inspector& f, T& x);

  template <class Inspector>
  static bool save_field(Inspector& f, string_view field_name, T& x);

  template <class Inspector, class IsPresent, class Get>
  static bool save_field(Inspector& f, string_view field_name,
                         IsPresent& is_present, Get& get);

  template <class Inspector, class IsValid, class SyncValue>
  static bool load_field(Inspector& f, string_view field_name, T& x,
                         IsValid& is_valid, SyncValue& sync_value);

  template <class Inspector, class IsValid, class SyncValue, class SetFallback>
  static bool load_field(Inspector& f, string_view field_name, T& x,
                         IsValid& is_valid, SyncValue& sync_value,
                         SetFallback& set_fallback);
};

The static member function apply has the same role as the free inspect function. For most types, we can implement only apply and use a default implementation for the other member functions. For example, specializing inspector_access for our point_3d would look as follows:

namespace caf {

template <>
struct inspector_access<point_3d> : inspector_access_base<point_3d> {
  template <class Inspector>
  static bool apply(Inspector& f, point_3d& x) {
    return f.object(x).fields(f.field("x", x.x),
                              f.field("y", x.y),
                              f.field("z", x.z));
  }
};

} // namespace caf

By inheriting from inspector_access_base, we use the default implementations for save_field and load_field. Customizing this set of functions only becomes necessary when integrating custom types that have semantics similar to tuple, variant, or optional.

Note

Please refer to the Doxygen documentation for more details on save_field and load_field.

Types with Getter and Setter Access

Types that declare their fields private and only grant access via getter and setter cannot pass references to the member variables to the inspector. Instead, they can pass a pair of function objects to the inspector to read and write the field.

Consider the following non-POD type foobar:

class foobar {
public:
  const std::string& foo() const {
    return foo_;
  }

  void foo(std::string value) {
    foo_ = std::move(value);
  }

  const std::string& bar() const {
    return bar_;
  }

  void bar(std::string value) {
    bar_ = std::move(value);
  }

private:
  std::string foo_;
  std::string bar_;
};

Since foo_ and bar_ are not accessible from outside the class, the inspector has to use the getter and setter functions. However, C++ has no formalized API for getters and setters. Moreover, not all setters are as trivial as in the example above. Setters may enforce invariants, for example, and thus may fail.

In order to work with any flavor of getter and setter functions, CAF requires users to wrap these member function calls into two function objects. The first one wraps the getter, takes no arguments, and returns the underlying value (either by reference or by value). The second one wraps the setter, takes exactly one argument (the new value), and returns a bool that indicates whether the operation succeeded (by returning true) or failed (by returning false).

The example below shows a possible inspect implementation for the foobar class shown before:

template <class Inspector>
bool inspect(Inspector& f, foobar& x) {
  auto get_foo = [&x]() -> decltype(auto) { return x.foo(); };
  auto set_foo = [&x](std::string value) {
    x.foo(std::move(value));
    return true;
  };
  auto get_bar = [&x]() -> decltype(auto) { return x.bar(); };
  auto set_bar = [&x](std::string value) {
    x.bar(std::move(value));
    return true;
  };
  return f.object(x).fields(f.field("foo", get_foo, set_foo),
                            f.field("bar", get_bar, set_bar));
}

Note

For classes that lie in the responsibility of the same developers that implement the inspect function, implementing inspect as a friend function inside the class usually can avoid going through the getter and setter functions.

Fallbacks and Invariants

For each field, we may provide a fallback value for optional fields or a predicate that checks invariants on the data (or both). For example, consider the following class duration and its implementation for inspect:

struct duration {
  string unit;
  double count;
};

bool valid_time_unit(const string& unit) {
  return unit == "seconds" || unit == "minutes";
}

template <class Inspector>
bool inspect(Inspector& f, duration& x) {
  return f.object(x).fields(
    f.field("unit", x.unit).fallback("seconds").invariant(valid_time_unit),
    f.field("count", x.count));
}

In “real code”, we probably would not use a string to store the time unit. However, with the fallback, we have enabled CAF to use "seconds" whenever the input contains no value for the unit field. Further, the invariant makes sure that we verify our input before accepting it.

With this implementation for inspect, we could use duration in a configuration file as follows (assuming a parameter named example-app.request-timeout):

# example 1: ok, falls back to "seconds"
example-app {
  request-timeout {
    count = 1.3
  }
}

# example 2: ok, explicit definition of the time unit
example-app {
  request-timeout {
    count = 1.3
    unit = "minutes"
  }
}

# example 3: error, "parsecs" is not a time unit (invariant does not hold)
example-app {
  request-timeout {
    count = 12
    unit = "parsecs"
  }
}
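
The outcome of the three configuration examples can be reproduced with a small model of the "unit" field in plain C++. This is only a sketch of the semantics described above, not CAF's implementation; load_unit is a hypothetical helper that receives the raw input (if any) and applies first the fallback and then the invariant.

```cpp
#include <cassert>
#include <optional>
#include <string>

// Same predicate as in the example above.
inline bool valid_time_unit(const std::string& unit) {
  return unit == "seconds" || unit == "minutes";
}

// Hypothetical helper that mimics loading the "unit" field:
// - missing input: use the fallback "seconds"
// - present input: accept it only if the invariant holds
// Returns std::nullopt to signal a failed load.
inline std::optional<std::string>
load_unit(const std::optional<std::string>& input) {
  if (!input)
    return std::string{"seconds"};
  if (!valid_time_unit(*input))
    return std::nullopt;
  return *input;
}
```

Calling load_unit without a value corresponds to example 1, passing "minutes" corresponds to example 2, and passing "parsecs" corresponds to example 3, where the load fails.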

Splitting Save and Load

When writing custom inspect functions, providing a single overload for all inspectors may result in undesired tradeoffs or convoluted code. Sometimes, inspection code can benefit from splitting it into a save and a load function. For this reason, all inspectors provide a static constant called is_loading. This allows delegating to custom functions via enable_if or if constexpr:

template <class Inspector>
bool inspect(Inspector& f, my_class& x) {
  if constexpr (Inspector::is_loading)
    return load(f, x);
  else
    return save(f, x);
}
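
To show the compile-time dispatch in isolation, the following sketch replaces CAF's inspectors with two toy types. Everything except the is_loading constant and the shape of inspect is an assumption made for illustration: toy_writer, toy_reader, and the stream-based save and load functions are hypothetical and do not exist in CAF.

```cpp
#include <cassert>
#include <cstdint>
#include <sstream>
#include <string>

// Hypothetical toy inspectors; they only mimic the is_loading constant.
struct toy_writer {
  static constexpr bool is_loading = false;
  std::ostringstream out;
};

struct toy_reader {
  static constexpr bool is_loading = true;
  std::istringstream in;
};

struct my_class {
  std::int32_t value = 0;
};

// Only called for saving inspectors.
inline bool save(toy_writer& f, const my_class& x) {
  f.out << x.value;
  return !f.out.fail();
}

// Only called for loading inspectors.
inline bool load(toy_reader& f, my_class& x) {
  f.in >> x.value;
  return !f.in.fail();
}

// Single entry point; the branch not taken is discarded at compile time,
// so save never has to compile for toy_reader and vice versa.
template <class Inspector>
bool inspect(Inspector& f, my_class& x) {
  if constexpr (Inspector::is_loading)
    return load(f, x);
  else
    return save(f, x);
}
```

Because the discarded branch is never instantiated, save and load can take different, inspector-specific parameter types without any overload tricks.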

Specializing on the Data Format

Much like is_loading allows client code to dispatch based on the mode of an inspector, the member function has_human_readable_format() allows client code to dispatch based on the data format.

The canonical example for choosing a different data representation for human-readable input and output is the enum type. When generating data for machine-to-machine communication, using the underlying integer representation gives the best performance. However, using the constant names results in a much better user experience in all other cases.

The following code illustrates how to provide a string representation for inspectors that operate on human-readable data representations while operating directly on the underlying type of the enum class otherwise.

enum class weekday : uint8_t {
  monday,
  tuesday,
  wednesday,
  thursday,
  friday,
  saturday,
  sunday,
};

std::string to_string(weekday);

bool parse(std::string_view input, weekday& dest);

template <class Inspector>
bool inspect(Inspector& f, weekday& x) {
  if (f.has_human_readable_format()) {
    auto get = [&x] { return to_string(x); };
    auto set = [&x](std::string str) { return parse(str, x); };
    return f.apply(get, set);
  } else {
    auto get = [&x] { return static_cast<uint8_t>(x); };
    auto set = [&x](uint8_t val) {
      if (val < 7) {
        x = static_cast<weekday>(val);
        return true;
      } else {
        return false;
      }
    };
    return f.apply(get, set);
  }
}

When inspecting an object of type weekday, we treat it as if it were a string for inspectors with human-readable data formats. Otherwise, we treat the weekday as if it were an integer between 0 and 6.
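
The example only declares to_string and parse. One possible way to define them, using nothing but the standard library, is sketched below. The lookup table weekday_names and the "invalid" result for out-of-range values are assumptions made for this sketch.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <string_view>

enum class weekday : std::uint8_t {
  monday,
  tuesday,
  wednesday,
  thursday,
  friday,
  saturday,
  sunday,
};

// Names in the same order as the enumerators (assumed helper table).
inline constexpr std::array<std::string_view, 7> weekday_names{{
  "monday", "tuesday", "wednesday", "thursday",
  "friday", "saturday", "sunday",
}};

// Maps an enumerator to its name; out-of-range values yield "invalid".
inline std::string to_string(weekday x) {
  auto index = static_cast<std::size_t>(x);
  if (index < weekday_names.size())
    return std::string{weekday_names[index]};
  return "invalid";
}

// Maps a name back to its enumerator. Leaves dest untouched and returns
// false if the input matches no name.
inline bool parse(std::string_view input, weekday& dest) {
  for (std::size_t i = 0; i < weekday_names.size(); ++i) {
    if (weekday_names[i] == input) {
      dest = static_cast<weekday>(i);
      return true;
    }
  }
  return false;
}
```

Returning false from parse for unknown names lets the setter in the human-readable branch report the failure to the inspector.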

Unsafe Message Types

Message types that do not provide serialization code cause compile time errors when used in actor communication. When using CAF for concurrency only, these errors can be suppressed by explicitly allowing types via CAF_ALLOW_UNSAFE_MESSAGE_TYPE. The macro is defined as follows.

#define CAF_ALLOW_UNSAFE_MESSAGE_TYPE(type_name)                             \
  namespace caf {                                                            \
  template <>                                                                \
  struct allowed_unsafe_message_type<type_name> : std::true_type {};         \
  }

Keep in mind that unsafe means that your program runs into undefined behavior (or segfaults) when you break your promise and try to serialize messages that contain unsafe message types.

Note

Even unsafe message types still require a type ID.

Message Handlers

Actors can store a set of callbacks—usually implemented as lambda expressions—using either behavior or message_handler. The former stores an optional timeout, while the latter is composable.

Definition and Composition

As the name implies, a behavior defines the response of an actor to messages it receives. The optional timeout allows an actor to dynamically change its behavior when not receiving a message within a certain amount of time.

message_handler x1{
  [](int32_t i) { /*...*/ },
  [](double db) { /*...*/ },
  [](int32_t a, int32_t b, int32_t c) { /*...*/ }
};

In our first example, x1 models a behavior accepting messages that consist of either exactly one int, or one double, or three int values. Any other message is not matched and gets forwarded to the default handler (see Default Handler).

message_handler x2{
  [](double db) { /*...*/ },
  [](double db) { /* - unreachable - */ }
};

Our second example illustrates an important characteristic of the matching mechanism. Each message is matched against the callbacks in the order they are defined. The algorithm stops at the first match. Hence, the second callback in x2 is unreachable.

message_handler x3 = x1.or_else(x2);
message_handler x4 = x2.or_else(x1);

Message handlers can be combined using or_else. This composition is not commutative, as our third example illustrates. The resulting message handler will first try to handle a message using the left-hand operand and will fall back to the right-hand operand if the former did not match. Thus, x3 behaves exactly like x1. This is because the second callback in x1 will consume any message with a single double and both callbacks in x2 are thus unreachable. The handler x4 will consume messages with a single double using the first callback in x2, essentially overriding the second callback in x1.
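
The first-match rule and the non-commutative composition can be modeled in a few lines of plain C++. The sketch below is not how CAF implements message_handler; toy_message, toy_callback, toy_handler, match_type, dispatch, and the free function or_else are hypothetical names used only to make the matching order observable.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <optional>
#include <string>
#include <utility>
#include <variant>
#include <vector>

// Toy message: holds either one int32_t or one double.
using toy_message = std::variant<std::int32_t, double>;

// A callback returns its label on a match and std::nullopt otherwise.
using toy_callback
  = std::function<std::optional<std::string>(const toy_message&)>;

// A handler is an ordered list of callbacks.
using toy_handler = std::vector<toy_callback>;

// Creates a callback that matches messages holding a value of type T.
template <class T>
toy_callback match_type(std::string label) {
  return [label = std::move(label)](
           const toy_message& msg) -> std::optional<std::string> {
    if (std::holds_alternative<T>(msg))
      return label;
    return std::nullopt;
  };
}

// Tries the callbacks in definition order and stops at the first match.
inline std::optional<std::string> dispatch(const toy_handler& handler,
                                           const toy_message& msg) {
  for (const auto& callback : handler)
    if (auto result = callback(msg))
      return result;
  return std::nullopt;
}

// Left-hand operand first, right-hand operand as fallback.
inline toy_handler or_else(toy_handler lhs, const toy_handler& rhs) {
  lhs.insert(lhs.end(), rhs.begin(), rhs.end());
  return lhs;
}
```

With x1 built from an int32_t and a double callback and x2 built from two double callbacks, dispatching a double through or_else(x1, x2) reaches the callback from x1, whereas or_else(x2, x1) reaches the first callback from x2.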

Atoms

Defining message handlers in terms of callbacks is convenient, but requires a simple way to annotate messages with meta data. Imagine an actor that provides a mathematical service for integers. It receives two integers, performs a user-defined operation and returns the result. Without additional context, the actor cannot decide whether it should multiply or add the integers. Thus, the operation must be encoded into the message. The Erlang programming language introduced an approach to use non-numerical constants, so-called atoms, which have an unambiguous, special-purpose type and do not have the runtime overhead of string constants.

Atoms in CAF are tag types, i.e., usually defined as an empty struct. These types carry no data on their own and only exist to annotate messages. For example, we could create the two tag types add_atom and multiply_atom for implementing a simple math actor like this:

CAF_BEGIN_TYPE_ID_BLOCK(my_project, caf::first_custom_type_id)

  CAF_ADD_ATOM(my_project, add_atom)
  CAF_ADD_ATOM(my_project, multiply_atom)

CAF_END_TYPE_ID_BLOCK(my_project)

behavior do_math{
  [](add_atom, int32_t a, int32_t b) {
    return a + b;
  },
  [](multiply_atom, int32_t a, int32_t b) {
    return a * b;
  }
};

// caller side: send(math_actor, add_atom_v, int32_t{1}, int32_t{2})

The macro CAF_ADD_ATOM defines an empty struct with the given name as well as a constexpr variable for conveniently creating a value of that type that uses the type name plus a _v suffix. In the example above, add_atom is a type name and add_atom_v is the corresponding constant.
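
The tag-type idea itself does not depend on CAF. The sketch below writes the two atoms by hand in plain C++ and uses ordinary function overloading in place of a behavior. It leaves out the type ID registration that the type ID block provides, and the free function do_math is a hypothetical stand-in for the message handlers.

```cpp
#include <cassert>
#include <cstdint>

// Hand-written tag types: empty structs that only annotate a call.
struct add_atom {};
struct multiply_atom {};

// Constants with a _v suffix for conveniently creating tag values.
inline constexpr add_atom add_atom_v{};
inline constexpr multiply_atom multiply_atom_v{};

// The tag type selects the overload, so the operation is encoded in the
// argument list without any runtime string comparison.
constexpr std::int32_t do_math(add_atom, std::int32_t a, std::int32_t b) {
  return a + b;
}

constexpr std::int32_t do_math(multiply_atom, std::int32_t a, std::int32_t b) {
  return a * b;
}
```

For example, do_math(add_atom_v, 1, 2) selects the first overload and do_math(multiply_atom_v, 3, 4) selects the second.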

Actors

Actors in CAF are a lightweight abstraction for units of computation. They are active objects in the sense that they own their state and do not allow others to access it. The only way to modify the state of an actor is sending messages to it.

CAF provides several actor implementations, each covering a particular use case. The available implementations differ in three characteristics: (1) dynamically or statically typed, (2) class-based or function-based, and (3) using asynchronous event handlers or blocking receives. These three characteristics can be combined freely, with one exception: statically typed actors are always event-based. For example, an actor can have dynamically typed messaging, implement a class, and use blocking receives. The common base class for all user-defined actors is called local_actor.

Dynamically typed actors are more familiar to developers coming from Erlang or Akka. They (usually) enable faster prototyping but require extensive unit testing. Statically typed actors require more source code but enable the compiler to verify communication between actors. Since CAF supports both, developers can freely mix both kinds of actors to get the best of both worlds. A good rule of thumb is to make use of static type checking for actors that are visible across multiple translation units.

Actors that utilize the blocking receive API always require an exclusive thread of execution. Event-based actors, on the other hand, are usually scheduled cooperatively and are very lightweight with a memory footprint of only a few hundred bytes. Developers can exclude (detach) event-based actors that potentially starve others from the cooperative scheduling while spawning them. A detached actor lives in its own thread of execution.

Environment / Actor Systems

All actors live in an actor_system representing an actor environment including Scheduler, Registry, and optional components such as a Middleman. A single process can have multiple actor_system instances, but this is usually not recommended (a use case for multiple systems is to strictly separate two or more sets of actors by running them in different schedulers). For configuration and fine-tuning options of actor systems see Configuring Actor Applications. A distributed CAF application consists of two or more connected actor systems. We also refer to interconnected actor_system instances as a distributed actor system.

Common Actor Base Types

The following pseudo-UML depicts the class diagram for actors in CAF. Irrelevant member functions and classes as well as mixins are omitted for brevity. Selected individual classes are presented in more detail in the following sections.

Actor Types in CAF

Class local_actor

The class local_actor is the root type for all user-defined actors in CAF. It defines all common operations. However, users of the library usually do not inherit from this class directly. Proper base classes for user-defined actors are event_based_actor or blocking_actor. The following table also includes member functions inherited from monitorable_actor and abstract_actor.

Types
  mailbox_type                      A concurrent, many-writers-single-reader queue type.

Constructors
  (actor_config&)                   Constructs the actor using a config.

Observers
  actor_addr address()              Returns the address of this actor.
  actor_system& system()            Returns context()->system().
  actor_system& home_system()       Returns the system that spawned this actor.
  execution_unit* context()         Returns underlying thread or current scheduler worker.

Customization Points
  on_exit()                         Can be overridden to perform cleanup code.
  const char* name()                Returns a debug name for this actor type.

Actor Management
  link_to(other)                    Links to other (see Link).
  unlink_from(other)                Removes the link to other.
  monitor(other)                    Adds a monitor to other (see Monitor).
  demonitor(other)                  Removes a monitor from other.
  spawn(F fun, xs...)               Spawns a new actor from fun.
  spawn<T>(xs...)                   Spawns a new actor of type T.

Message Processing
  T make_response_promise<Ts...>()  Allows an actor to delay its response message.
  T response(xs...)                 Convenience function for creating fulfilled promises.

Class scheduled_actor

All scheduled actors inherit from scheduled_actor. This includes statically and dynamically typed event-based actors as well as brokers (see Network I/O with Brokers).

Types
  pointer                     scheduled_actor*
  exception_handler           function<error (pointer, std::exception_ptr&)>
  default_handler             function<result<message> (pointer, message_view&)>
  error_handler               function<void (pointer, error&)>
  down_handler                function<void (pointer, down_msg&)>
  exit_handler                function<void (pointer, exit_msg&)>

Constructors
  (actor_config&)             Constructs the actor using a config.

Termination
  quit()                      Stops this actor with normal exit reason.
  quit(error x)               Stops this actor with error x.

Special-purpose Handlers
  set_exception_handler(F f)  Installs f for converting exceptions to errors (see Errors).
  set_down_handler(F f)       Installs f to handle down messages (see Down Handler).
  set_exit_handler(F f)       Installs f to handle exit messages (see Exit Handler).
  set_error_handler(F f)      Installs f to handle error messages (see Error Handler).
  set_default_handler(F f)    Installs f as fallback message handler (see Default Handler).

Class blocking_actor

A blocking actor always lives in its own thread of execution. Blocking actors are not as lightweight as event-based actors and thus do not scale up to large numbers. The primary use case for blocking actors is to use a scoped_actor for ad-hoc communication to selected actors. Unlike for scheduled actors, CAF does not dispatch system messages to special-purpose handlers. A blocking actor receives all messages regularly through its mailbox. A blocking actor is considered done only after it returned from act (or from the implementation in function-based actors). A scoped_actor sends its exit messages as part of its destruction.

Constructors  
(actor_config&) Constructs the actor using a config.
   
Customization Points  
void act() Implements the behavior of the actor.
   
Termination  
const error& fail_state() Returns the current exit reason.
fail_state(error x) Sets the current exit reason.
   
Actor Management  
wait_for(Ts... xs) Blocks until all actors xs... are done.
await_all_other_actors_done() Blocks until all other actors are done.
   
Message Handling  
receive(Ts... xs) Receives a message using the callbacks xs....
receive_for(T& begin, T end) See Receive Loops.
receive_while(F stmt) See Receive Loops.
do_receive(Ts... xs) See Receive Loops.

Messaging Interfaces

Statically typed actors require abstract messaging interfaces to allow the compiler to type-check actor communication. Interfaces in CAF are defined using the variadic template typed_actor<...>, which defines the proper actor handle at the same time. Each template parameter defines one input/output pair via function signature syntax with the return type wrapped in a result. For example, typed_actor<result<string, string>(double)>. Also, the arguments must not use any cv-qualifiers.

In the same way functions cannot be overloaded only by their return type, interfaces cannot accept one input twice (possibly mapping it to different outputs). The example below defines a messaging interface for a simple calculator.

using calculator_actor
  = typed_actor<result<int32_t>(add_atom, int32_t, int32_t),
                result<int32_t>(sub_atom, int32_t, int32_t)>;

It is not required to create a type alias such as calculator_actor, but it makes dealing with statically typed actors much easier. Also, a central alias definition eases refactoring later on.

Interfaces have set semantics. This means the following two type aliases i1 and i2 are considered equal by CAF:

using i1 = typed_actor<result<B>(A), result<D>(C)>;
using i2 = typed_actor<result<D>(C), result<B>(A)>;

Further, actor handles of type A are assignable to handles of type B as long as B is a subset of A.
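The set semantics and the subset rule for handle assignment can be modeled at compile time. The following sketch is not how CAF implements these checks: type_list, contains, is_subset, and same_set are hypothetical names introduced for illustration, and plain function types stand in for message signatures. It assumes a C++17 compiler.

```cpp
#include <type_traits>

// Hypothetical stand-in for the list of signatures of an interface.
template <class... Ts>
struct type_list {};

// True if T occurs in the list.
template <class T, class List>
struct contains;

template <class T, class... Ts>
struct contains<T, type_list<Ts...>>
  : std::bool_constant<(std::is_same_v<T, Ts> || ...)> {};

// True if every element of Sub also occurs in Super.
template <class Sub, class Super>
struct is_subset;

template <class... Ts, class Super>
struct is_subset<type_list<Ts...>, Super>
  : std::bool_constant<(contains<Ts, Super>::value && ...)> {};

// Two interfaces are equal if each one is a subset of the other.
template <class A, class B>
inline constexpr bool same_set
  = is_subset<A, B>::value && is_subset<B, A>::value;

// Example signatures (assumptions for this sketch): int(double), long(char).
using iface1 = type_list<int(double), long(char)>;
using iface2 = type_list<long(char), int(double)>;
using narrow = type_list<int(double)>;

// The order of signatures does not matter.
static_assert(same_set<iface1, iface2>);
// A handle with interface iface1 could be assigned to a narrow handle ...
static_assert(is_subset<narrow, iface1>::value);
// ... but not the other way around, because narrow lacks long(char).
static_assert(!is_subset<iface1, narrow>::value);
```

The direction of the check mirrors the rule in the text: the target interface must be a subset of the source interface, so assignment can only drop signatures, never add them.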

For convenience, the class typed_actor<...> defines the member types shown below to grant access to derived types.

Types  
behavior_type A statically typed set of message handlers.
base Base type for actors, i.e., typed_event_based_actor<...>.
pointer A pointer of type base*.
stateful_impl<T> See Stateful Actors.
stateful_pointer<T> A pointer of type stateful_impl<T>*.
extend<Ts...> Extend this typed actor with Ts....
extend_with<Other> Extend this typed actor with all cases from Other.

Spawning Actors

Both statically and dynamically typed actors are spawned from an actor_system using the member function spawn. The function either takes a function as first argument or a class as first template parameter. For example, the following functions and classes represent actors.

behavior calculator_fun(event_based_actor* self);
void blocking_calculator_fun(blocking_actor* self);
calculator_actor::behavior_type typed_calculator_fun();
class calculator;
class blocking_calculator;
class typed_calculator;

Spawning an actor for each implementation is illustrated below.

  auto a1 = sys.spawn(blocking_calculator_fun);
  auto a2 = sys.spawn(calculator_fun);
  auto a3 = sys.spawn(typed_calculator_fun);
  auto a4 = sys.spawn<blocking_calculator>();
  auto a5 = sys.spawn<calculator>();
  auto a6 = sys.spawn<typed_calculator>();

Additional arguments to spawn are passed to the constructor of a class or used as additional function arguments, respectively. In the example above, none of the three functions takes any argument other than the implicit but optional self pointer.

Function-based Actors

When using a function or function object to implement an actor, the first argument can be used to capture a pointer to the actor itself. The type of this pointer is usually event_based_actor* or blocking_actor*. The proper pointer type for any typed_actor handle T can be obtained via T::pointer.

Blocking actors simply implement their behavior in the function body. The actor is done once it returns from that function.

Event-based actors can either return a behavior (see Message Handlers) that is used to initialize the actor or explicitly set the initial behavior by calling self->become(...). Due to the asynchronous, event-based nature of this kind of actor, the function usually returns immediately after setting a behavior (message handler) for the next incoming message. Hence, variables on the stack will be out of scope once a message arrives. Managing state in function-based actors can be done either via rebinding state with become, using heap-located data referenced via std::shared_ptr or by using the stateful actor abstraction (see Stateful Actors).

The following three functions implement the prototypes shown in Spawning Actors and illustrate one blocking actor and two event-based actors (statically and dynamically typed).

// function-based, dynamically typed, event-based API
behavior calculator_fun(event_based_actor*) {
  return {
    [](add_atom, int32_t a, int32_t b) { return a + b; },
    [](sub_atom, int32_t a, int32_t b) { return a - b; },
  };
}

// function-based, dynamically typed, blocking API
void blocking_calculator_fun(blocking_actor* self) {
  bool running = true;
  self->receive_while(running)( //
    [](add_atom, int32_t a, int32_t b) { return a + b; },
    [](sub_atom, int32_t a, int32_t b) { return a - b; },
    [&](exit_msg& em) {
      if (em.reason) {
        self->fail_state(std::move(em.reason));
        running = false;
      }
    });
}

// function-based, statically typed, event-based API
calculator_actor::behavior_type typed_calculator_fun() {
  return {
    [](add_atom, int32_t a, int32_t b) { return a + b; },
    [](sub_atom, int32_t a, int32_t b) { return a - b; },
  };
}

Class-based Actors

Implementing an actor using a class requires the following:

  • Provide a constructor taking a reference of type actor_config& as first argument, which is forwarded to the base class. The config is passed implicitly to the constructor when calling spawn, which also forwards any number of additional arguments to the constructor.
  • Override make_behavior for event-based actors and act for blocking actors.

Implementing actors with classes works for all kinds of actors and allows simple management of state via member variables. However, composing states via inheritance can get quite tedious. For dynamically typed actors, composing states is particularly hard, because the compiler cannot provide much help.

The following three classes implement the prototypes shown in Spawning Actors by delegating to the function-based implementations we have seen before:

// class-based, dynamically typed, event-based API
class calculator : public event_based_actor {
public:
  calculator(actor_config& cfg) : event_based_actor(cfg) {
    // nop
  }

  behavior make_behavior() override {
    return calculator_fun(this);
  }
};

// class-based, dynamically typed, blocking API
class blocking_calculator : public blocking_actor {
public:
  blocking_calculator(actor_config& cfg) : blocking_actor(cfg) {
    // nop
  }

  void act() override {
    blocking_calculator_fun(this);
  }
};

// class-based, statically typed, event-based API
class typed_calculator : public calculator_actor::base {
public:
  typed_calculator(actor_config& cfg) : calculator_actor::base(cfg) {
    // nop
  }

  behavior_type make_behavior() override {
    return typed_calculator_fun();
  }
};

Stateful Actors

The stateful actor API makes it easy to maintain state in function-based actors. It is also safer than putting state in member variables, because the state ceases to exist after an actor is done and is not delayed until the destructor runs. For example, if two actors hold a reference to each other via member variables, they produce a cycle and neither will get destroyed. Using stateful actors instead breaks the cycle, because references are destroyed when an actor calls self->quit() (or is killed externally). The following example illustrates how to implement stateful actors with static typing as well as with dynamic typing.

using cell = typed_actor<
  // 'put' updates the value of the cell.
  result<void>(put_atom, int32_t),
  // 'get' queries the value of the cell.
  result<int32_t>(get_atom)>;

struct cell_state {
  int32_t value = 0;
  static inline const char* name = "example.cell";
};

cell::behavior_type type_checked_cell(cell::stateful_pointer<cell_state> self) {
  return {
    [=](put_atom, int32_t val) { self->state.value = val; },
    [=](get_atom) { return self->state.value; },
  };
}

behavior unchecked_cell(stateful_actor<cell_state>* self) {
  return {
    [=](put_atom, int32_t val) { self->state.value = val; },
    [=](get_atom) { return self->state.value; },
  };
}

Stateful actors are spawned in the same way as any other function-based actor (see Function-based Actors).

  // Create one cell for each implementation.
  auto cell1 = system.spawn(type_checked_cell);
  auto cell2 = system.spawn(unchecked_cell);

Attaching Cleanup Code to Actors

Users can attach cleanup code to actors. This code is executed immediately if the actor has already exited. Otherwise, the actor will execute it as part of its termination. The following example attaches a function object to actors for printing a custom string on exit.

// Utility function to print an exit message with custom name.
void print_on_exit(const actor& hdl, const std::string& name) {
  hdl->attach_functor([=](const error& reason) {
    cout << name << " exited: " << to_string(reason) << endl;
  });
}

It is possible to attach code to remote actors. However, the cleanup code will run on the local machine.

Blocking Actors

Blocking actors always run in a separate thread and are not scheduled by CAF. Unlike event-based actors, blocking actors have explicit, blocking receive functions. Further, blocking actors do not handle system messages automatically via special-purpose callbacks (see Default and System Message Handlers). This gives users full control over the behavior of blocking actors. However, blocking actors still should follow conventions of the actor system. For example, actors should unconditionally terminate after receiving an exit_msg with reason exit_reason::kill.

Receiving Messages

The function receive sequentially iterates over all elements in the mailbox beginning with the first. It takes a message handler that is applied to the elements in the mailbox until an element was matched by the handler. An actor calling receive is blocked until it successfully dequeued a message from its mailbox or an optional timeout occurs. Messages that are not matched by the behavior are automatically skipped and remain in the mailbox.

self->receive (
  [](int x) { /* ... */ }
);
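The skipping behavior described above can be illustrated without CAF. The sketch below is a simplified model, not the CAF mailbox: toy_msg, toy_mailbox, try_receive, and receive_int_example are hypothetical names, and try_receive returns false where a real blocking receive would wait for further messages or a timeout.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <variant>

// Hypothetical message type for this sketch: either an int or a string.
using toy_msg = std::variant<int, std::string>;

// Toy mailbox that scans its elements front to back and removes the first
// element accepted by the handler. Unmatched elements stay in place.
class toy_mailbox {
public:
  void push(toy_msg x) {
    queue_.push_back(std::move(x));
  }

  // Returns true if the handler matched an element. A real blocking receive
  // would wait for new messages or a timeout instead of returning false.
  bool try_receive(const std::function<bool(const toy_msg&)>& handler) {
    for (auto i = queue_.begin(); i != queue_.end(); ++i) {
      if (handler(*i)) {
        queue_.erase(i);
        return true;
      }
    }
    return false;
  }

  std::size_t size() const {
    return queue_.size();
  }

private:
  std::deque<toy_msg> queue_;
};

// Example: a handler for int skips the string at the front of the mailbox.
inline int receive_int_example() {
  toy_mailbox mbox;
  mbox.push(std::string{"hello"});
  mbox.push(42);
  int received = 0;
  bool matched = mbox.try_receive([&](const toy_msg& x) {
    if (auto ptr = std::get_if<int>(&x)) {
      received = *ptr;
      return true;
    }
    return false;
  });
  assert(matched);
  assert(mbox.size() == 1); // the string is still in the mailbox
  return received;
}
```

In this model, the string stays at the front of the queue and would be delivered by a later receive whose handler accepts strings.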

Catch-all Receive Statements

Blocking actors can use inline catch-all callbacks instead of setting a default handler (see Default Handler). A catch-all case must be the last callback before the optional timeout, as shown in the example below.

self->receive(
  [&](float x) {
    // ...
  },
  [&](const down_msg& x) {
    // ...
  },
  [&](const exit_msg& x) {
    // ...
  },
  others >> [](message& x) -> skippable_result {
    // report unexpected message back to client
    return sec::unexpected_message;
  }
);

Receive Loops

Message handlers passed to receive are temporary objects at runtime. Hence, calling receive inside a loop creates an unnecessarily large number of short-lived objects. CAF provides predefined receive loops to allow for more efficient code.

// BAD
std::vector<int> results;
for (size_t i = 0; i < 10; ++i)
  receive (
    [&](int value) {
      results.push_back(value);
    }
  );

// GOOD
std::vector<int> results;
size_t i = 0;
receive_for(i, 10) (
  [&](int value) {
    results.push_back(value);
  }
);

// BAD
size_t received = 0;
while (received < 10) {
  receive (
    [&](int) {
      ++received;
    }
  );
}

// GOOD
size_t received = 0;
receive_while([&] { return received < 10; }) (
  [&](int) {
    ++received;
  }
);

// BAD
size_t received = 0;
do {
  receive (
    [&](int) {
      ++received;
    }
  );
} while (received < 10);

// GOOD
size_t received = 0;
do_receive (
  [&](int) {
    ++received;
  }
).until([&] { return received >= 10; });

The examples above illustrate the correct usage of the three loops receive_for, receive_while and do_receive(...).until. It is possible to nest receives and receive loops.

bool running = true;
self->receive_while([&] { return running; }) (
  [&](int value1) {
    self->receive (
      [&](float value2) {
        aout(self) << value1 << " => " << value2 << endl;
      }
    );
  },
  // ...
);

Scoped Actors

The class scoped_actor offers a simple way of communicating with CAF actors from non-actor contexts. It overloads operator-> to return a blocking_actor*. Hence, it behaves like the implicit self pointer in function-based actors, except that it ceases to exist at scope end.

void test(actor_system& system) {
  scoped_actor self{system};
  // spawn some actor
  auto aut = self->spawn(my_actor_impl);
  self->send(aut, "hi there");
  // self will be destroyed automatically here; any
  // actor monitoring it will receive down messages etc.
}

Message Passing

The messaging layer of CAF has three primitives for sending messages: send, request, and delegate. The first simply enqueues a message to the mailbox of the receiver. The latter two are discussed in more detail in Requests and Delegating Messages. Before we go into the details of the message passing API itself, we first discuss the building blocks that enable message passing in the first place.

Structure of Mailbox Elements

When enqueuing a message to the mailbox of an actor, CAF wraps the content of the message into a mailbox_element (shown below) to add meta data and processing paths.

[Figure: UML class diagram for mailbox_element]

The sender is stored as a strong_actor_ptr (see Pointer) and denotes the origin of the message. The message ID is either 0—invalid—or a positive integer value that allows the sender to match a response to its request. The stages vector stores the path of the message. Response messages, i.e., the returned values of a message handler, are sent to stages.back() after calling stages.pop_back(). This allows CAF to build pipelines of arbitrary size. If no more stage is left, the response reaches the sender. Finally, payload is the actual content of the message.
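The routing rule for responses can be sketched in a few lines. This is a simplified model under stated assumptions, not the CAF data structure: toy_element, next_receiver, and response_path_example are hypothetical names, and actors are identified by strings instead of strong_actor_ptr.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical, reduced stand-in for the routing data of a mailbox element.
struct toy_element {
  std::string sender;
  std::vector<std::string> stages;
};

// Returns the receiver of the next response: the last stage, which is removed
// from the path, or the original sender if no stage is left.
inline std::string next_receiver(toy_element& x) {
  if (x.stages.empty())
    return x.sender;
  auto receiver = x.stages.back();
  x.stages.pop_back();
  return receiver;
}

// Example: a response travels through two stages before reaching the sender.
inline std::vector<std::string> response_path_example() {
  toy_element x{"client", {"stage_b", "stage_a"}};
  std::vector<std::string> path;
  for (;;) {
    bool last_hop = x.stages.empty();
    path.push_back(next_receiver(x));
    if (last_hop)
      break;
  }
  assert(path.back() == "client");
  return path;
}
```

In this model, each stage that produces a result hands it to the next receiver, which is how a pipeline of arbitrary length eventually delivers its final result to the original sender.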

Mailbox elements are created by CAF automatically and are usually invisible to the programmer. However, understanding how messages are processed internally helps in understanding the behavior of the message passing layer.

Copy on Write

CAF allows multiple actors to implicitly share message contents, as long as no actor performs writes. This allows sending the same message to multiple receivers without copying overhead, as long as all receivers only read the content of the message.

An actor copies message contents only if other actors hold references to them and at least one argument of its message handler takes a mutable reference.
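The copy-on-write idea can be demonstrated with std::shared_ptr. The sketch below is a single-threaded illustration only, not the CAF message class: cow_value, get_mutable, and shares_with are hypothetical names, and checking use_count like this is not a safe detach strategy for concurrent code.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical copy-on-write wrapper for illustration.
template <class T>
class cow_value {
public:
  explicit cow_value(T value)
    : ptr_(std::make_shared<T>(std::move(value))) {
    // nop
  }

  // Read access never copies.
  const T& get() const {
    return *ptr_;
  }

  // Write access detaches first if the content is shared.
  T& get_mutable() {
    if (ptr_.use_count() > 1)
      ptr_ = std::make_shared<T>(*ptr_);
    assert(ptr_.use_count() == 1); // holds in single-threaded code only
    return *ptr_;
  }

  // Returns true if both wrappers point to the same content.
  bool shares_with(const cow_value& other) const {
    return ptr_ == other.ptr_;
  }

private:
  std::shared_ptr<T> ptr_;
};
```

Copying a cow_value only copies the pointer, which corresponds to sending one message to several receivers. The content is duplicated only when a holder asks for mutable access while another holder still references it.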

Requirements for Message Types

Message types in CAF must meet the following requirements:

  1. Inspectable (see Type Inspection)
  2. Default constructible
  3. Copy constructible

A type T is inspectable if it provides a free function inspect(Inspector&, T&) or specializes inspector_access. Requirement 2 is a consequence of requirement 1, because CAF needs to be able to create an object for T when deserializing incoming messages. Requirement 3 allows CAF to implement Copy on Write (see Copy on Write).
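Requirements 2 and 3 can be checked at compile time with standard type traits. The following sketch does not check inspectability (requirement 1), since that depends on CAF itself. The names meets_basic_message_requirements, point, move_only, and no_default are hypothetical and exist only for this example.

```cpp
#include <type_traits>

// Hypothetical helper that checks requirements 2 and 3 only.
template <class T>
inline constexpr bool meets_basic_message_requirements
  = std::is_default_constructible_v<T> && std::is_copy_constructible_v<T>;

// Default constructible and copy constructible.
struct point {
  int x = 0;
  int y = 0;
};

// Declaring move operations deletes the implicit copy constructor.
struct move_only {
  move_only() = default;
  move_only(move_only&&) = default;
  move_only& operator=(move_only&&) = default;
};

// Has no default constructor.
struct no_default {
  explicit no_default(int) {
    // nop
  }
};

static_assert(meets_basic_message_requirements<point>);
static_assert(!meets_basic_message_requirements<move_only>);
static_assert(!meets_basic_message_requirements<no_default>);
```

A check like this can catch a missing default or copy constructor early, before a type is used in a message.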

Default and System Message Handlers

CAF has three system-level message types (down_msg, exit_msg, and error) that all actors should handle regardless of their current state. Consequently, event-based actors handle such messages in special-purpose message handlers. Additionally, event-based actors have a fallback handler for unmatched messages. Note that blocking actors have neither of those special-purpose handlers (see Blocking Actors).

Down Handler

Actors can monitor the lifetime of other actors by calling self->monitor(other). This will cause the runtime system of CAF to send a down_msg for other if it dies. Actors drop down messages unless they provide a custom handler via set_down_handler(f), where f is a function object with signature void (down_msg&) or void (scheduled_actor*, down_msg&). The latter signature allows users to implement down message handlers as free functions.

Exit Handler

Bidirectional monitoring with a strong lifetime coupling is established by calling self->link_to(other). This will cause the runtime to send an exit_msg if either this or other dies. By default, actors terminate after receiving an exit_msg unless the exit reason is exit_reason::normal. This mechanism propagates failure states in an actor system. Linked actors form a subsystem in which an error causes all actors to fail collectively. Actors can override the default handler via set_exit_handler(f), where f is a function object with signature void (exit_msg&) or void (scheduled_actor*, exit_msg&).

Error Handler

Actors send error messages to others by returning an error (see Errors) from a message handler. Similar to exit messages, error messages usually cause the receiving actor to terminate, unless a custom handler was installed via set_error_handler(f), where f is a function object with signature void (error&) or void (scheduled_actor*, error&). Additionally, request accepts an error handler as second argument to handle errors for a particular request (see Error Handling in Requests). The default handler is used as fallback if request is used without error handler.

Default Handler

The default handler is called whenever the behavior of an actor did not match the input. Actors can change the default handler by calling set_default_handler. The expected signature of the function object is result<message> (scheduled_actor*, message_view&), where the self pointer can again be omitted. The default handler can return a response message or cause the runtime to skip the input message to allow an actor to handle it in a later state. CAF provides the following built-in implementations: reflect, reflect_and_quit, print_and_drop, drop, and skip. The first two are meant for debugging and testing purposes and allow an actor to simply return an input. The next two functions drop unexpected messages with or without printing a warning beforehand. Finally, skip leaves the input message in the mailbox. The default is print_and_drop.

Note: print_and_drop and drop return an error message that is delivered to the sender of the unexpected message. If that actor does not have an explicit handler for error messages it will terminate.

Requests

A main feature of CAF is its ability to couple input and output types via the type system. For example, a typed_actor<result<int32_t>(int32_t)> essentially behaves like a function. It receives a single int32_t as input and responds with another int32_t. CAF embraces this functional take on actors by simply creating response messages from the result of message handlers. This allows CAF to match request to response messages and to provide a convenient API for this style of communication.

Sending Requests and Handling Responses

Actors send request messages by calling request(receiver, timeout, content...). This function returns an intermediate object that allows an actor to set a one-shot handler for the response message. Event-based actors can use either request(...).then or request(...).await. The former multiplexes the one-shot handler with the regular actor behavior and handles responses as they arrive. The latter suspends the regular actor behavior until all awaited responses arrive and handles responses in LIFO order. Blocking actors always use request(...).receive, which blocks until the one-shot handler was called. Actors receive a sec::request_timeout (see Default Error Codes) error message (see Error Handler) if a timeout occurs. Users can set the timeout to infinite for unbounded operations. This is only recommended if the receiver is known to run locally.

In our following example, we use the simple cell actor shown below as communication endpoint.

using cell
  = typed_actor<result<void>(put_atom, int32_t), // 'put' writes to the cell
                result<int32_t>(get_atom)>;      // 'get' reads from the cell

struct cell_state {
  static constexpr inline const char* name = "cell";

  cell::pointer self;

  int32_t value;

  cell_state(cell::pointer ptr, int32_t val) : self(ptr), value(val) {
    // nop
  }

  cell_state(const cell_state&) = delete;

  cell_state& operator=(const cell_state&) = delete;

  cell::behavior_type make_behavior() {
    return {
      [this](put_atom, int32_t val) { value = val; },
      [this](get_atom) { return value; },
    };
  }
};

using cell_impl = cell::stateful_impl<cell_state>;

To showcase the slight differences in API and processing order, we implement three testee actors that all operate on a list of cell actors.

void waiting_testee(event_based_actor* self, vector<cell> cells) {
  for (auto& x : cells)
    self->request(x, seconds(1), get_atom_v).await([self, x](int32_t y) {
      aout(self) << "cell #" << x.id() << " -> " << y << endl;
    });
}

void multiplexed_testee(event_based_actor* self, vector<cell> cells) {
  for (auto& x : cells)
    self->request(x, seconds(1), get_atom_v).then([self, x](int32_t y) {
      aout(self) << "cell #" << x.id() << " -> " << y << endl;
    });
}

void blocking_testee(blocking_actor* self, vector<cell> cells) {
  for (auto& x : cells)
    self->request(x, seconds(1), get_atom_v)
      .receive(
        [&](int32_t y) {
          aout(self) << "cell #" << x.id() << " -> " << y << endl;
        },
        [&](error& err) {
          aout(self) << "cell #" << x.id() << " -> " << to_string(err) << endl;
        });
}

Our caf_main for the examples spawns five cells and assigns the initial values 0, 1, 4, 9, and 16. Then it spawns one instance for each of our testee implementations and we can observe the different outputs.

Our waiting_testee actor will always print:

cell #9 -> 16
cell #8 -> 9
cell #7 -> 4
cell #6 -> 1
cell #5 -> 0

This is because await puts the one-shot handlers onto a stack and enforces LIFO order by re-ordering incoming response messages as necessary.
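The re-ordering can be modeled with a stack of awaited request IDs and a buffer for responses that arrive early. This is a toy model written for this explanation, not CAF's implementation: await_model and its member functions are hypothetical, request IDs are plain integers, and handling a response merely records its ID.

```cpp
#include <cassert>
#include <set>
#include <vector>

// Toy model of awaited responses.
class await_model {
public:
  // Registers a one-shot handler for request `id` on top of the stack.
  void await(int id) {
    pending_.push_back(id);
  }

  // Delivers the response for request `id`. Responses that do not belong to
  // the top of the stack are buffered until their turn comes.
  void deliver(int id) {
    assert(arrived_.count(id) == 0); // each response arrives exactly once
    arrived_.insert(id);
    while (!pending_.empty() && arrived_.count(pending_.back()) > 0) {
      handled_.push_back(pending_.back());
      pending_.pop_back();
    }
  }

  // Returns the IDs in the order in which their handlers ran.
  const std::vector<int>& handled() const {
    return handled_;
  }

private:
  std::vector<int> pending_; // stack of awaited request IDs
  std::set<int> arrived_;    // IDs of all responses received so far
  std::vector<int> handled_; // processing order
};
```

Awaiting the IDs 5 through 9 in that order and then delivering the responses in any order yields the processing order 9, 8, 7, 6, 5 in this model, which matches the output of waiting_testee.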

The multiplexed_testee implementation does not print its results in a predictable order. Response messages arrive in arbitrary order and are handled immediately.

Finally, the blocking_testee has a deterministic output again. This is because it blocks on each request until receiving the result before making the next request.

cell #5 -> 0
cell #6 -> 1
cell #7 -> 4
cell #8 -> 9
cell #9 -> 16

Both event-based approaches send all requests, install a series of one-shot handlers, and then return from the implementing function. In contrast, the blocking function waits for a response before sending another request.

Sending Multiple Requests

Sending the same message to a group of workers is a common workflow in actor applications. Usually, a manager maintains a set of workers. On request, the manager fans out the request to all of its workers and then collects the results. The function fan_out_request combined with the merge policy select_all streamlines this exact use case.

In the following snippet, we have a matrix actor self that stores worker actors for each cell (each simply storing an integer). For computing the average over a column, we use fan_out_request. The result handler passed to then now gets called only once with a vector holding all collected results. Using a response promise (see Response Promises) further allows us to delay responding to the client until we have collected all worker results.

    [=](get_atom get, average_atom, column_atom, int column) {
      assert(column < columns);
      std::vector<cell> columns;
      columns.reserve(rows);
      auto& rows_vec = self->state.rows;
      for (int row = 0; row < rows; ++row)
        columns.emplace_back(rows_vec[row][column]);
      auto rp = self->make_response_promise<double>();
      self->fan_out_request<policy::select_all>(columns, infinite, get)
        .then(
          [=](std::vector<int> xs) mutable {
            assert(xs.size() == static_cast<size_t>(rows));
            rp.deliver(std::accumulate(xs.begin(), xs.end(), 0.0) / rows);
          },
          [=](error& err) mutable { rp.deliver(std::move(err)); });
      return rp;
    },

The policy select_any models a second common use case: sending a request to multiple receivers but only caring for the first arriving response.

Error Handling in Requests

Requests allow CAF to unambiguously correlate request and response messages. This is also true if the response is an error message. Hence, CAF allows adding an error handler as an optional second parameter to then and await (this parameter is mandatory for receive). If no such handler is defined, the default error handler (see Error Handler) is used as a fallback in scheduled actors.

As an example, we consider a simple divider that returns an error on a division by zero. This example uses a custom error category (see Errors).

using divider = typed_actor<result<double>(div_atom, double, double)>;

divider::behavior_type divider_impl() {
  return {
    [](div_atom, double x, double y) -> result<double> {
      if (y == 0.0)
        return math_error::division_by_zero;
      return x / y;
    },
  };
}

When sending requests to the divider, we can use a custom error handler to report errors to the user like this:

  auto div = system.spawn(divider_impl);
  scoped_actor self{system};
  self->request(div, std::chrono::seconds(10), div_atom_v, x, y)
    .receive(
      [&](double z) { aout(self) << x << " / " << y << " = " << z << endl; },
      [&](const error& err) {
        aout(self) << "*** cannot compute " << x << " / " << y << " => "
                   << to_string(err) << endl;
      });

Delaying Messages

Messages can be delayed by using the function delayed_send, as illustrated in the following time-based loop example.

// uses a message-based loop to iterate over all animation steps
behavior dancing_kirby(event_based_actor* self) {
  using namespace std::literals::chrono_literals;
  // let's get it started
  self->send(self, update_atom_v, size_t{0});
  return {
    [=](update_atom, size_t step) {
      if (step == sizeof(animation_step)) {
        // we've printed all animation steps (done)
        cout << endl;
        self->quit();
        return;
      }
      // print given step
      draw_kirby(animation_steps[step]);
      // schedule next animation step
      self->delayed_send(self, 150ms, update_atom_v, step + 1);
    },
  };
}

Delayed send schedules messages based on relative timeouts. For absolute timeouts, use scheduled_send instead.

Delegating Messages

Actors can transfer responsibility for a request by using delegate. This enables the receiver of the delegated message to reply as usual—simply by returning a value from its message handler—and the original sender of the message will receive the response. The following diagram illustrates request delegation from actor B to actor C.

A                  B                  C
|                  |                  |
| ---(request)---> |                  |
|                  | ---(delegate)--> |
|                  X                  |---\
|                                     |   | compute
|                                     |   | result
|                                     |<--/
| <-------------(reply)-------------- |
|                                     X
X

Returning the result of delegate(...) from a message handler, as shown in the example below, suppresses the implicit response message and allows the compiler to check the result type when using statically typed actors.

using adder_actor = typed_actor<result<int32_t>(add_atom, int32_t, int32_t)>;

adder_actor::behavior_type worker_impl() {
  return {
    [](add_atom, int32_t x, int32_t y) { return x + y; },
  };
}
adder_actor::behavior_type server_impl(adder_actor::pointer self,
                                       adder_actor worker) {
  return {
    [=](add_atom add, int32_t x, int32_t y) {
      return self->delegate(worker, add, x, y);
    },
  };
}

void client_impl(event_based_actor* self, adder_actor adder, int32_t x,
                 int32_t y) {
  using namespace std::literals::chrono_literals;
  self->request(adder, 10s, add_atom_v, x, y).then([=](int32_t result) {
    aout(self) << x << " + " << y << " = " << result << std::endl;
  });
}

void caf_main(actor_system& sys) {
  auto worker = sys.spawn(worker_impl);
  auto server = sys.spawn(server_impl, worker);
  sys.spawn(client_impl, server, 1, 2);
}

Response Promises

Response promises allow an actor to send and receive other messages prior to replying to a particular request. Actors create a response promise using self->make_response_promise<Ts...>(), where Ts is a template parameter pack describing the promised return type. Dynamically typed actors simply call self->make_response_promise(). After retrieving a promise, an actor can fulfill it by calling the member function deliver(...), as shown in the following example.

using adder_actor = typed_actor<result<int32_t>(add_atom, int32_t, int32_t)>;

adder_actor::behavior_type worker_impl() {
  return {
    [](add_atom, int32_t x, int32_t y) { return x + y; },
  };
}
adder_actor::behavior_type server_impl(adder_actor::pointer self,
                                       adder_actor worker) {
  return {
    [=](add_atom, int32_t y, int32_t z) {
      auto rp = self->make_response_promise<int32_t>();
      self->request(worker, infinite, add_atom_v, y, z)
        .then([rp](int32_t result) mutable { rp.deliver(result); },
              [rp](error& err) mutable { rp.deliver(std::move(err)); });
      return rp;
    },
  };
}

void client_impl(event_based_actor* self, adder_actor adder, int32_t x,
                 int32_t y) {
  using namespace std::literals::chrono_literals;
  self->request(adder, 10s, add_atom_v, x, y).then([=](int32_t result) {
    aout(self) << x << " + " << y << " = " << result << std::endl;
  });
}

void caf_main(actor_system& sys) {
  auto worker = sys.spawn(worker_impl);
  auto server = sys.spawn(server_impl, worker);
  sys.spawn(client_impl, server, 1, 2);
}

This example is almost identical to the example for delegating messages. However, there is a big difference in the flow of messages. In our first version, the worker (C) directly responded to the client (A). This time, the worker sends the result to the server (B), which then fulfills the promise and thereby sends the result to the client:

A                  B                  C
|                  |                  |
| ---(request)---> |                  |
|                  | ---(request)---> |
|                  |                  |---\
|                  |                  |   | compute
|                  |                  |   | result
|                  |                  |<--/
|                  | <----(reply)---- |
|                  |                  X
| <----(reply)---- |
|                  X
X

Message Priorities

By default, all messages have the default priority, i.e., message_priority::normal. Actors can send urgent messages by setting the priority explicitly: send<message_priority::high>(dst, ...). Urgent messages are put into a different queue of the receiver’s mailbox. Hence, long wait delays can be avoided for urgent communication.
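The effect of a separate queue for urgent messages can be sketched as follows. This model assumes a strict preference for the urgent queue, which is a simplification made for this example: the actual CAF mailbox may interleave its queues differently. The names toy_priority, toy_priority_mailbox, and drain are hypothetical.

```cpp
#include <cassert>
#include <optional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

enum class toy_priority { high, normal };

// Toy mailbox with one queue per priority level.
class toy_priority_mailbox {
public:
  void enqueue(std::string msg, toy_priority prio = toy_priority::normal) {
    (prio == toy_priority::high ? urgent_ : normal_).push(std::move(msg));
  }

  // Returns the next message, always preferring the urgent queue
  // (an assumption of this sketch).
  std::optional<std::string> dequeue() {
    auto& queue = !urgent_.empty() ? urgent_ : normal_;
    if (queue.empty())
      return std::nullopt;
    auto result = std::move(queue.front());
    queue.pop();
    return result;
  }

  bool empty() const {
    return urgent_.empty() && normal_.empty();
  }

private:
  std::queue<std::string> urgent_;
  std::queue<std::string> normal_;
};

// Drains the mailbox and returns the messages in processing order.
inline std::vector<std::string> drain(toy_priority_mailbox& mbox) {
  std::vector<std::string> result;
  while (auto msg = mbox.dequeue())
    result.push_back(std::move(*msg));
  assert(mbox.empty());
  return result;
}
```

An urgent message that is enqueued after several normal messages is still dequeued first in this model, which is why urgent communication avoids long wait times behind a backlog of regular messages.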

Scheduler

The CAF runtime maps N actors to M threads on the local machine. Applications built with CAF scale by decomposing tasks into many independent steps that are spawned as actors. In this way, sequential computations performed by individual actors are small compared to the total runtime of the application, and the attainable speedup on multi-core hardware is maximized in agreement with Amdahl’s law.

Decomposing tasks implies that actors are often short-lived. Assigning a dedicated thread to each actor would not scale well. Instead, CAF includes a scheduler that dynamically assigns actors to a pre-dimensioned set of worker threads. Actors are modeled as lightweight state machines. Whenever a waiting actor receives a message, it changes its state to ready and is scheduled for execution. CAF cannot interrupt running actors because it is implemented in user space. Consequently, actors that use blocking system calls such as I/O functions can suspend threads and create an imbalance or lead to starvation. Such “uncooperative” actors can be explicitly detached by the programmer by using the detached spawn option, e.g., system.spawn<detached>(my_actor_fun).

The performance of actor-based applications depends on the scheduling algorithm in use and its configuration. Different application scenarios require different trade-offs. For example, interactive applications such as shells or GUIs want to stay responsive to user input at all times, while batch processing applications demand only to perform a given task in the shortest possible time.

Aside from managing actors, the scheduler bridges actor and non-actor code. For this reason, the scheduler distinguishes between external and internal events. An external event occurs whenever an actor is spawned from a non-actor context or an actor receives a message from a thread that is not under the control of the scheduler. Internal events are send and spawn operations from scheduled actors.

Policies

The scheduler consists of a single coordinator and a set of workers. The coordinator is needed by the public API to bridge actor and non-actor contexts, but is not necessarily an active software entity.

The scheduler of CAF is fully customizable by using a policy-based design. The following listing shows a concept class that lists all required member types and member functions. A policy provides the two data structures coordinator_data and worker_data that add additional data members to the coordinator and its workers respectively, e.g., work queues. This grants developers full control over the state of the scheduler.

struct scheduler_policy {
  struct coordinator_data;
  struct worker_data;
  void central_enqueue(Coordinator* self, resumable* job);
  void external_enqueue(Worker* self, resumable* job);
  void internal_enqueue(Worker* self, resumable* job);
  void resume_job_later(Worker* self, resumable* job);
  resumable* dequeue(Worker* self);
  void before_resume(Worker* self, resumable* job);
  void after_resume(Worker* self, resumable* job);
  void after_completion(Worker* self, resumable* job);
};

Whenever a new work item is scheduled (usually by sending a message to an idle actor), one of the functions central_enqueue, external_enqueue, and internal_enqueue is called. The first function is called whenever non-actor code interacts with the actor system, for example when spawning an actor from main. Its first argument is a pointer to the coordinator singleton and the second argument is the new work item, usually an actor that became ready. The function external_enqueue is never called directly by CAF. It models the transfer of a task to a worker by the coordinator or another worker. Its first argument is the worker receiving the new task referenced in the second argument. The third function, internal_enqueue, is called whenever an actor interacts with other actors in the system. Its first argument is the current worker and the second argument is the new work item.

Actors reaching the maximum number of messages per run are re-scheduled with resume_job_later and workers acquire new work by calling dequeue. The two functions before_resume and after_resume allow programmers to measure individual actor runtime, while after_completion allows executing custom code whenever a work item has finished execution by changing its state to done, but before it is destroyed. In this way, the last three functions enable developers to gain fine-grained insight into the scheduling order and individual execution times.
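As a rough illustration of how the two profiling hooks could be used, the sketch below records the runtime of each resume call in per-worker state. It is not a complete policy and does not use CAF headers: resumable, toy_worker, and timing_hooks are hypothetical stand-ins, and the way the worker exposes its data is an assumption made only for this example.

```cpp
#include <chrono>
#include <vector>

// Hypothetical stand-in for the work item type named in the concept class.
struct resumable {};

// Hypothetical worker that carries the state a policy would contribute
// through its worker_data member type.
struct toy_worker {
  struct data_type {
    std::chrono::steady_clock::time_point started;
    std::vector<std::chrono::steady_clock::duration> runtimes;
  } data;
};

// Only the two measurement hooks of a policy, nothing else.
struct timing_hooks {
  // Remembers when the worker started to run the job.
  template <class Worker>
  void before_resume(Worker* self, resumable*) {
    self->data.started = std::chrono::steady_clock::now();
  }

  // Stores how long the job ran since the matching before_resume call.
  template <class Worker>
  void after_resume(Worker* self, resumable*) {
    auto elapsed = std::chrono::steady_clock::now() - self->data.started;
    self->data.runtimes.push_back(elapsed);
  }
};
```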

Work Stealing

The default policy in CAF is work stealing. The key idea of this algorithm is to remove the bottleneck of a single, global work queue. The original algorithm was developed for fully strict computations by Blumofe et al. in 1994. It schedules any number of tasks to P workers, where P is the number of processors available.

Stealing of work items

Each worker dequeues work items from an individual queue until it is drained. Once this happens, the worker becomes a thief. It picks one of the other workers, usually at random, as a victim and tries to steal a work item. As a consequence, tasks (actors) are bound to workers by default and only migrate between threads as a result of stealing. This strategy minimizes communication between threads and maximizes cache locality. Work stealing has become the algorithm of choice for many frameworks, for example Java’s Fork-Join (which is used by Akka), Intel’s Threading Building Blocks, and several OpenMP implementations.
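The stealing step can be sketched in a few lines. The following single-threaded simulation deliberately leaves out all synchronization and is not CAF's implementation: job_id, toy_worker, and next_job are hypothetical names, and taking stolen items from the opposite end of the victim's queue is the classic scheme, assumed here for illustration.

```cpp
#include <cstddef>
#include <deque>
#include <optional>
#include <vector>

using job_id = int; // hypothetical stand-in for a work item

// Each worker owns a private double-ended queue.
struct toy_worker {
  std::deque<job_id> queue;
};

// Returns the next job for worker `self`: first from its own queue, otherwise
// by trying to steal one item from `victim`.
std::optional<job_id> next_job(std::vector<toy_worker>& workers,
                               std::size_t self, std::size_t victim) {
  auto& own = workers[self].queue;
  if (!own.empty()) {
    job_id job = own.front(); // the owner takes from the front
    own.pop_front();
    return job;
  }
  if (victim != self) {
    auto& other = workers[victim].queue;
    if (!other.empty()) {
      job_id job = other.back(); // the thief takes from the back
      other.pop_back();
      return job;
    }
  }
  return std::nullopt; // nothing to do and nothing to steal
}
```

A real scheduler would pick the victim at random and protect both ends of the queue against concurrent access.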

CAF uses a double-ended queue for its workers, which is synchronized with two spinlocks. One downside of a decentralized algorithm such as work stealing is that idle states are hard to detect. Did only one worker run out of work items or all? Since each worker has only local knowledge, it cannot decide when it could safely suspend itself. Likewise, a suspended worker cannot resume on its own when new work items arrive at one or more other workers. For this reason, CAF uses three polling intervals. Once a worker runs out of work items, it tries to steal items from others. First, it uses the aggressive polling interval. It falls back to a moderate interval after a predefined number of trials. After another predefined number of trials, it will finally use a relaxed interval.

By default, the aggressive strategy performs 100 steal attempts with no sleep interval in between. The moderate strategy tries to steal 500 times with 50 microseconds sleep between two steal attempts. Finally, the relaxed strategy runs indefinitely but sleeps for 10 milliseconds between two attempts. These defaults can be overridden via system config at startup (see Configuring Actor Applications).
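The default schedule described above can be written down as a small lookup function. This is a sketch that hard-codes the quoted defaults; the names aggressive_attempts, moderate_attempts, and sleep_after_attempt are hypothetical and the function does not read any CAF configuration.

```cpp
#include <chrono>
#include <cstddef>

// Default attempt counts quoted in the text.
constexpr std::size_t aggressive_attempts = 100;
constexpr std::size_t moderate_attempts = 500;

// Returns how long a worker sleeps after its n-th (0-based) unsuccessful
// steal attempt in a row.
constexpr std::chrono::microseconds sleep_after_attempt(std::size_t n) {
  if (n < aggressive_attempts)
    return std::chrono::microseconds{0}; // aggressive: no sleep
  if (n < aggressive_attempts + moderate_attempts)
    return std::chrono::microseconds{50}; // moderate: 50 microseconds
  return std::chrono::milliseconds{10}; // relaxed: 10 milliseconds
}
```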

Work Sharing

Work sharing is an alternative scheduler policy in CAF that uses a single, global work queue. This policy uses a mutex and a condition variable on the central queue. Thus, the policy supports only limited concurrency but does not need to poll. Using this policy can be a good fit for low-end devices where power consumption is an important metric.
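A central queue guarded by a mutex and a condition variable can be sketched as follows. The class shared_queue is a hypothetical, simplified stand-in rather than CAF's implementation. It shows why no polling is needed: idle workers block in dequeue until enqueue wakes one of them.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Minimal blocking queue shared by all workers.
template <class Job>
class shared_queue {
public:
  // Adds a job and wakes up one waiting worker.
  void enqueue(Job job) {
    {
      std::lock_guard<std::mutex> guard{mtx_};
      jobs_.push_back(std::move(job));
    }
    cv_.notify_one();
  }

  // Blocks the calling worker until a job becomes available.
  Job dequeue() {
    std::unique_lock<std::mutex> guard{mtx_};
    cv_.wait(guard, [this] { return !jobs_.empty(); });
    Job job = std::move(jobs_.front());
    jobs_.pop_front();
    return job;
  }

private:
  std::mutex mtx_;
  std::condition_variable cv_;
  std::deque<Job> jobs_;
};
```

Since every enqueue and dequeue takes the same lock, all workers contend on one mutex, which is the limited concurrency mentioned above.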

Registry

The actor registry in CAF keeps track of the number of running actors and allows mapping actors to their ID or a custom atom (see Atoms) representing a name. The registry does not contain all actors. Actors have to be stored in the registry explicitly. Users can access the registry through an actor system by calling system.registry(). The registry stores actors using strong_actor_ptr (see Pointer).

Users can use the registry to make actors system-wide available by name. The Middleman uses the registry to keep track of all actors known to remote nodes in order to serialize and deserialize them. Actors are removed automatically when they terminate.

It is worth mentioning that the registry is not synchronized between connected actor systems. Each actor system has its own, local registry in a distributed setting.

Types  
name_map unordered_map<atom_value, strong_actor_ptr>
   
Observers  
strong_actor_ptr get(actor_id) Returns the actor associated to given ID.
strong_actor_ptr get(atom_value) Returns the actor associated to given name.
name_map named_actors() Returns all name mappings.
size_t running() Returns the number of currently running actors.
   
Modifiers  
void put(actor_id, strong_actor_ptr) Maps an actor to its ID.
void erase(actor_id) Removes an ID mapping from the registry.
void put(atom_value, strong_actor_ptr) Maps an actor to a name.
void erase(atom_value) Removes a name mapping from the registry.

Reference Counting

Actor systems can span complex communication graphs that make it hard to decide when actors are no longer needed. As a result, manually managing the lifetime of actors is nearly impossible. For this reason, CAF implements a garbage collection strategy for actors based on weak and strong reference counts.

Shared Ownership in C++

The C++ standard library already offers shared_ptr and weak_ptr to manage objects with complex shared ownership. The standard implementation is a solid general purpose design that covers most use cases. Weak and strong references to an object are stored in a control block. However, CAF uses a slightly different design. The reason for this is twofold. First, we need the control block to store the identity of an actor. Second, we wanted a design that requires fewer indirections, because actor handles are copied extensively for messaging, and this overhead adds up.

Before discussing the approach to shared ownership in CAF, we look at the design of shared pointers in the C++ standard library.

Shared pointer design in the C++ standard library

The figure above depicts the default memory layout when using shared pointers. The control block is allocated separately from the data and thus stores a pointer to the data. This is the layout for manually allocated objects, for example shared_ptr<int> iptr{new int}. The benefit of this design is that one can destroy T independently from its control block. While irrelevant for small objects, keeping the memory of a large object alive until its control block goes away can become an issue. Notably, the shared pointer stores two pointers internally. Otherwise, dereferencing it would require fetching the data location from the control block first.

Memory layout when using std::make_shared

When using make_shared or allocate_shared, the standard library can store reference count and data in a single memory block as shown above. However, shared_ptr still has to store two pointers, because it is unaware where the data is allocated.

Memory layout with std::enable_shared_from_this

Finally, the design of the standard library becomes convoluted when an object should be able to hand out a shared_ptr to itself. Classes must inherit from std::enable_shared_from_this to navigate from an object to its control block. This additional navigation path is required, because std::shared_ptr needs two pointers: one to the data and one to the control block. Programmers can still use make_shared for such objects, in which case the object is again stored along with the control block.
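The properties discussed in this section can be observed with a few lines of standard C++. In the sketch below, node and shared_ptr_is_fat are hypothetical names. The size comparison reflects how mainstream standard library implementations lay out shared_ptr; the C++ standard itself does not guarantee it.

```cpp
#include <memory>

// A type that can hand out a shared_ptr to itself.
struct node : std::enable_shared_from_this<node> {
  std::shared_ptr<node> self() {
    // Navigates from the object back to its control block.
    return shared_from_this();
  }
};

// True if shared_ptr occupies more space than a single raw pointer, which is
// the case when it stores a data pointer plus a control block pointer.
constexpr bool shared_ptr_is_fat = sizeof(std::shared_ptr<int>) > sizeof(int*);
```

Calling self() on an object owned by a shared_ptr yields a second owner that shares the same control block, so use_count() reports 2 for both pointers.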

Smart Pointers to Actors

In CAF, we use a different approach than the standard library because (1) we always allocate actors along with their control block, (2) we need additional information in the control block, and (3) we can store only a single raw pointer internally instead of the two raw pointers std::shared_ptr needs. The following figure summarizes the design of smart pointers to actors.

Shared pointer design in CAF

CAF uses strong_actor_ptr instead of std::shared_ptr<...> and weak_actor_ptr instead of std::weak_ptr<...>. Unlike the counterparts from the standard library, both smart pointer types only store a single pointer.

Also, the control block in CAF is not a template and stores the identity of an actor (actor_id plus node_id). This allows CAF to access this information even after an actor died. The control block fits exactly into a single cache line (64 Bytes). This makes sure no false sharing occurs between an actor and other actors that have references to it. Since the size of the control block is fixed and CAF guarantees the memory layout enforced by actor_storage, CAF can compute the address of an actor from the pointer to its control block by offsetting it by 64 Bytes. Likewise, an actor can compute the address of its control block.

The smart pointer design in CAF relies on a few assumptions about actor types. Most notably, the actor object is placed 64 Bytes after the control block. This starting address is cast to abstract_actor*. Hence, T* must be convertible to abstract_actor* via reinterpret_cast. In practice, this means actor subclasses must not use virtual inheritance, which is enforced in CAF with a static_assert.
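The fixed-offset layout can be illustrated with a toy version of the storage. This is a sketch under the assumptions stated in the text (a 64-byte control block followed directly by the object). The types toy_control_block and toy_storage and their fields are hypothetical and much simpler than CAF's actual control block and actor_storage.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>

constexpr std::size_t cache_line = 64; // cache line size assumed in the text

// Toy control block, padded to exactly one cache line.
struct alignas(cache_line) toy_control_block {
  std::atomic<std::size_t> strong_refs{1};
  std::atomic<std::size_t> weak_refs{1};
  std::uint64_t actor_id = 0;
};

static_assert(sizeof(toy_control_block) == cache_line,
              "control block must fill one cache line");

// Toy storage: control block first, data on the next cache line.
template <class T>
struct toy_storage {
  toy_control_block ctrl;
  alignas(cache_line) T data;
};

// Computes the data address from the control block address.
template <class T>
T* data_from_ctrl(toy_control_block* ctrl) {
  return reinterpret_cast<T*>(reinterpret_cast<char*>(ctrl) + cache_line);
}

// Computes the control block address from the data address.
template <class T>
toy_control_block* ctrl_from_data(T* data) {
  return reinterpret_cast<toy_control_block*>(reinterpret_cast<char*>(data)
                                              - cache_line);
}
```

Because the offset is a compile-time constant, a single raw pointer is enough to reach both the control block and the object. The address arithmetic is only valid for pointers that actually point into such a storage object.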

Strong and Weak References

A strong reference manipulates the strong refs counter as shown above. An actor is destroyed if there are zero strong references to it. If two actors keep strong references to each other via member variable, neither actor can ever be destroyed because they produce a cycle (see Breaking Cycles Manually). Strong references are formed by strong_actor_ptr, actor, and typed_actor<...> (see Actor References).

A weak reference manipulates the weak refs counter. This counter keeps track of how many references to the control block exist. The control block is destroyed if there are zero weak references to an actor (which cannot occur before strong refs reached zero as well). No cycle occurs if two actors keep weak references to each other, because the actor objects themselves can get destroyed independently from their control block. A weak reference is only formed by actor_addr (see Address).
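The difference between strong and weak references in a cycle can be reproduced with the standard library smart pointers as stand-ins for CAF's handle types. In this sketch, peer and destroyed_after_scope are hypothetical names. The function counts how many objects are destroyed once the local handles go out of scope.

```cpp
#include <memory>
#include <utility>

// Object that can reference another peer either strongly or weakly.
struct peer {
  explicit peer(int& destroyed) : destroyed_count(destroyed) {}
  ~peer() { ++destroyed_count; }
  std::shared_ptr<peer> strong_other; // keeps the other peer alive
  std::weak_ptr<peer> weak_other;     // only observes the other peer
  int& destroyed_count;
};

// Lets two peers reference each other, drops the local handles, and returns
// how many peers were destroyed at that point.
int destroyed_after_scope(bool use_weak) {
  int destroyed = 0;
  peer* stuck = nullptr;
  {
    auto a = std::make_shared<peer>(destroyed);
    auto b = std::make_shared<peer>(destroyed);
    if (use_weak) {
      a->weak_other = b;
      b->weak_other = a;
    } else {
      a->strong_other = b;
      b->strong_other = a;
      stuck = a.get(); // still alive after the scope because of the cycle
    }
  }
  int result = destroyed;
  if (stuck != nullptr) {
    // Break the cycle by hand so that the example itself does not leak.
    auto tmp = std::move(stuck->strong_other);
  }
  return result;
}
```

With weak references, both objects are destroyed as soon as the local handles are gone. With strong references, none is destroyed until the cycle is broken explicitly.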

Converting Actor References with actor_cast

The function actor_cast converts between actor pointers and handles. The first common use case is to convert a strong_actor_ptr to either actor or typed_actor<...> before being able to send messages to an actor. The second common use case is to convert actor_addr to strong_actor_ptr to upgrade a weak reference to a strong reference. Note that casting actor_addr to a strong actor pointer or handle can result in invalid handles. The syntax for actor_cast resembles built-in C++ casts. For example, actor_cast<actor>(x) converts x to a handle of type actor.

Breaking Cycles Manually

Cycles can occur only with class-based actors that store references to other actors in member variables. Stateful actors (see Stateful Actors) break cycles by destroying the state when an actor terminates, before the destructor of the actor itself runs. This means an actor releases all references to others automatically after calling quit. However, class-based actors have to break cycles manually, because references to others are not released until the destructor of an actor runs. Two actors storing references to each other via member variables produce a cycle and neither destructor can ever be called.

Class-based actors can break cycles manually by overriding on_exit() and calling destroy(x) on each handle (see Handle). Using a handle after destroying it is undefined behavior, but it is safe to assign a new value to the handle.
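The idea of releasing handles in on_exit can be shown with a standard library analogy. The class toy_actor below is hypothetical and uses std::shared_ptr in place of an actor handle. Resetting the member plays the role that destroy(x) plays for CAF handles.

```cpp
#include <memory>
#include <utility>

// Toy stand-in for a class-based actor that stores a handle to a peer.
class toy_actor {
public:
  explicit toy_actor(int& destroyed) : destroyed_(destroyed) {}
  ~toy_actor() { ++destroyed_; }

  // Stores a strong reference to another actor.
  void set_peer(std::shared_ptr<toy_actor> other) {
    peer_ = std::move(other);
  }

  // Called when the actor terminates: releases the stored reference so that
  // it no longer keeps the peer alive.
  void on_exit() {
    peer_.reset();
  }

private:
  std::shared_ptr<toy_actor> peer_;
  int& destroyed_;
};
```

If both actors of a cycle release their peer this way, both objects are destroyed once the last external handle goes away.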

Errors

Errors in CAF have a code and a category, similar to std::error_code and std::error_condition. Unlike its counterparts from the C++ standard library, error is platform-neutral and serializable.

Class Interface

Constructors  
(Enum code) Constructs an error with given error code.
(Enum code, message context) Constructs an error with given error code and additional context.
   
Observers  
uint8_t code() Returns the error code as 8-bit integer.
type_id_t category() Returns the type ID of the Enum type used to construct this error.
message context() Returns additional context information.
explicit operator bool() Returns code() != 0.

Add Custom Error Categories

Adding custom error categories requires these steps:

  • Declare an enum class of type uint8_t with error codes starting at 1. CAF always interprets the value 0 as no error.
  • Assign a type ID to your enum type.
  • Specialize caf::is_error_code_enum for your enum type. For this step, CAF offers the macro CAF_ERROR_CODE_ENUM to generate the boilerplate code necessary.

The following example illustrates all these steps for a custom error code enum called math_error.

enum class math_error : uint8_t {
  division_by_zero = 1,
};

std::string to_string(math_error x) {
  switch (x) {
    case math_error::division_by_zero:
      return "division_by_zero";
    default:
      return "-unknown-error-";
  }
}

bool from_string(std::string_view in, math_error& out) {
  if (in == "division_by_zero") {
    out = math_error::division_by_zero;
    return true;
  } else {
    return false;
  }
}

bool from_integer(uint8_t in, math_error& out) {
  if (in == 1) {
    out = math_error::division_by_zero;
    return true;
  } else {
    return false;
  }
}

template <class Inspector>
bool inspect(Inspector& f, math_error& x) {
  return caf::default_enum_inspect(f, x);
}

CAF_BEGIN_TYPE_ID_BLOCK(divider, first_custom_type_id)

  CAF_ADD_TYPE_ID(divider, (math_error))

CAF_END_TYPE_ID_BLOCK(divider)

CAF_ERROR_CODE_ENUM(math_error)

Default Error Codes

The enum type sec (for System Error Code) provides many error codes for common failures in actor systems:

/// SEC stands for "System Error Code". This enum contains error codes for
/// ::actor_system and its modules.
enum class sec : uint8_t {
  /// No error.
  none = 0,
  /// Indicates that an actor dropped an unexpected message.
  unexpected_message = 1,
  /// Indicates that a response message did not match the provided handler.
  unexpected_response,
  /// Indicates that the receiver of a request is no longer alive.
  request_receiver_down,
  /// Indicates that a request message timed out.
  request_timeout,
  /// Indicates that requested group module does not exist.
  no_such_group_module = 5,
  /// Unpublishing or connecting failed: no actor bound to given port.
  no_actor_published_at_port,
  /// Connecting failed because a remote actor had an unexpected interface.
  unexpected_actor_messaging_interface,
  /// Migration failed because the state of an actor is not serializable.
  state_not_serializable,
  /// An actor received an unsupported key for `('sys', 'get', key)` messages.
  unsupported_sys_key,
  /// An actor received an unsupported system message.
  unsupported_sys_message = 10,
  /// A remote node disconnected during CAF handshake.
  disconnect_during_handshake,
  /// Tried to forward a message via BASP to an invalid actor handle.
  cannot_forward_to_invalid_actor,
  /// Tried to forward a message via BASP to an unknown node ID.
  no_route_to_receiving_node,
  /// Middleman could not assign a connection handle to a broker.
  failed_to_assign_scribe_from_handle,
  /// Middleman could not assign an acceptor handle to a broker.
  failed_to_assign_doorman_from_handle = 15,
  /// User requested to close port 0 or to close a port not managed by CAF.
  cannot_close_invalid_port,
  /// Middleman could not connect to a remote node.
  cannot_connect_to_node,
  /// Middleman could not open requested port.
  cannot_open_port,
  /// A C system call in the middleman failed.
  network_syscall_failed,
  /// A function received one or more invalid arguments.
  invalid_argument = 20,
  /// A network socket reported an invalid network protocol family.
  invalid_protocol_family,
  /// Middleman could not publish an actor because it was invalid.
  cannot_publish_invalid_actor,
  /// A remote spawn failed because the provided types did not match.
  cannot_spawn_actor_from_arguments,
  /// Serialization failed because there was not enough data to read.
  end_of_stream,
  /// Serialization failed because no CAF context is available.
  no_context = 25,
  /// Serialization failed because CAF misses run-time type information.
  unknown_type,
  /// Serialization of actors failed because no proxy registry is available.
  no_proxy_registry,
  /// An exception was thrown during message handling.
  runtime_error,
  /// Linking to a remote actor failed because actor no longer exists.
  remote_linking_failed,
  /// Subscribing to a stream failed because it was invalid.
  invalid_stream = 30,
  /// Subscribing to a stream failed because it can only be subscribed to once.
  cannot_resubscribe_stream,
  /// A function view was called without assigning an actor first.
  bad_function_call = 40,
  /// Feature is disabled in the actor system config.
  feature_disabled,
  /// Failed to open file.
  cannot_open_file,
  /// A socket descriptor argument is invalid.
  socket_invalid,
  /// A socket became disconnected from the remote host (hang up).
  socket_disconnected,
  /// An operation on a socket (e.g. `poll`) failed.
  socket_operation_failed = 45,
  /// A resource is temporarily unavailable or would block.
  unavailable_or_would_block,
  /// Connection refused because of incompatible CAF versions.
  incompatible_versions,
  /// Connection refused because of incompatible application IDs.
  incompatible_application_ids,
  /// Received a malformed message from another node.
  malformed_message,
  /// The middleman closed a connection because it failed to serialize or
  /// deserialize a payload.
  serializing_basp_payload_failed = 50,
  /// The middleman closed a connection to itself or an already connected node.
  redundant_connection,
  /// Resolving a path on a remote node failed.
  remote_lookup_failed,
  /// Serialization failed because actor_system::tracing_context is null.
  no_tracing_context,
  /// No request produced a valid result.
  all_requests_failed,
  /// Deserialization failed because an invariant got violated after reading
  /// the content of a field.
  field_invariant_check_failed = 55,
  /// Deserialization failed because a setter rejected the input.
  field_value_synchronization_failed,
  /// Deserialization failed because the source announced an invalid type.
  invalid_field_type,
  /// Serialization failed because a type was flagged as unsafe message type.
  unsafe_type,
  /// Serialization failed because a save callback returned `false`.
  save_callback_failed,
  /// Deserialization failed because a load callback returned `false`.
  load_callback_failed = 60,
  /// Converting between two types failed.
  conversion_failed,
  /// A network connection was closed by the remote side.
  connection_closed,
  /// An operation failed because run-time type information diverged from the
  /// expected type.
  type_clash,
  /// An operation failed because the callee does not implement this
  /// functionality.
  unsupported_operation,
  /// A key lookup failed.
  no_such_key = 65,
  /// An actor destroyed a response promise without calling deliver or delegate on it.
  broken_promise,
  /// Disconnected from a BASP node after reaching the connection timeout.
  connection_timeout,
  /// Signals that an actor fell behind a periodic action trigger. After raising
  /// this error, an @ref actor_clock stops scheduling the action.
  action_reschedule_failed,
  /// Attaching to an observable failed because the target is invalid.
  invalid_observable,
  /// Attaching to an observable failed because the target already reached its
  /// maximum observer count.
  too_many_observers = 70,
  /// Signals that an operation failed because the target has been disposed.
  disposed,
  /// Failed to open a resource.
  cannot_open_resource,
  /// Received malformed data.
  protocol_error,
  /// Encountered faulty logic in the program.
  logic_error,
};

Default Exit Reasons

Exit reasons of actors are a special kind of error code. These errors are usually fail states set by the actor system itself. The two exceptions are exit_reason::user_shutdown and exit_reason::kill. The former is used in CAF to signal orderly, user-requested shutdown and can be used by programmers in the same way. The latter terminates an actor unconditionally when used in send_exit, even for actors that override the default handler (see Exit Handler).

/// This error category represents fail conditions for actors.
enum class exit_reason : uint8_t {
  /// Indicates that an actor finished execution without error.
  normal = 0,
  /// Indicates that the exit reason for this actor is unknown, i.e.,
  /// the actor has been terminated and no longer exists.
  unknown,
  /// Indicates that an actor pool unexpectedly ran out of workers.
  out_of_workers,
  /// Indicates that an actor was forced to shutdown by a user-generated event.
  user_shutdown,
  /// Indicates that an actor was killed unconditionally.
  kill,
  /// Indicates that an actor finished execution because a connection
  /// to a remote link was closed unexpectedly.
  remote_link_unreachable,
  /// Indicates that an actor was killed because it became unreachable.
  unreachable
};

Configuring Actor Applications

CAF configures applications at startup using an actor_system_config or a user-defined subclass of that type. The config objects allow users to add custom types, to load modules, and to fine-tune the behavior of loaded modules with command line options or configuration files (see Program Options).

The following code example is a minimal CAF application with a Middleman but without any custom configuration options.

void caf_main(actor_system& system) {
  // ...
}
CAF_MAIN(io::middleman)

The compiler expands this example code to the following.

void caf_main(actor_system& system) {
  // ...
}
int main(int argc, char** argv) {
  return exec_main<io::middleman>(caf_main, argc, argv);
}

The function exec_main performs several steps:

  1. Initialize all meta objects for the type ID blocks listed in CAF_MAIN.
  2. Create a config object. If caf_main has two arguments, then CAF assumes that the second argument is the configuration and the type gets derived from that argument. Otherwise, CAF uses actor_system_config.
  3. Parse command line arguments and configuration file.
  4. Load all modules requested in CAF_MAIN.
  5. Create an actor system.
  6. Call caf_main with the actor system and optionally with config.

When implementing the steps performed by CAF_MAIN by hand, the main function would resemble the following (pseudo) code:

int main(int argc, char** argv) {
  // Initialize the global type information before anything else.
  init_global_meta_objects<...>();
  core::init_global_meta_objects();
  // Create the config.
  actor_system_config cfg;
  // Read CLI options.
  cfg.parse(argc, argv);
  // Return immediately if a help text was printed.
  if (cfg.cli_helptext_printed)
    return 0;
  // Load modules.
  cfg.load<...>();
  // Create the actor system.
  actor_system sys{cfg};
  // Run user-defined code.
  caf_main(sys, cfg);
}

Using CAF_MAIN simply automates that boilerplate code. A minimal example with a custom type ID block as well as a custom configuration class with the I/O module loaded looks as follows:

CAF_BEGIN_TYPE_ID_BLOCK(my, first_custom_type_id)

  // ...

CAF_END_TYPE_ID_BLOCK(my)


class my_config : public actor_system_config {
public:
  my_config() {
    // ...
  }
};

void caf_main(actor_system& system, const my_config& cfg) {
  // ...
}

CAF_MAIN(id_block::my, io::middleman)

Loading Modules

The simplest way to load modules is to use the macro CAF_MAIN and to pass a list of all requested modules, as shown below.

void caf_main(actor_system& system) {
  // ...
}
CAF_MAIN(mod1, mod2, ...)

Alternatively, users can load modules in user-defined config classes.

class my_config : public actor_system_config {
public:
  my_config() {
    load<mod1>();
    load<mod2>();
    // ...
  }
};

The third option is to simply call x.load<mod1>() on a config object before initializing an actor system with it.

Program Options

CAF organizes program options in categories and parses CLI arguments as well as configuration files. CLI arguments override values in the configuration file, which in turn override hard-coded defaults. Users can add any number of custom program options by implementing a subtype of actor_system_config. The example below adds three options to the global category.

class config : public actor_system_config {
public:
  uint16_t port = 0;
  std::string host = "localhost";
  bool server_mode = false;

  config() {
    opt_group{custom_options_, "global"}
      .add(port, "port,p", "set port")
      .add(host, "host,H", "set host (ignored in server mode)")
      .add(server_mode, "server-mode,s", "enable server mode");
  }
};

We create a new global category in custom_options_. Each following call to add then appends individual options to the category. The first argument to add is the associated variable. The second argument is the name for the parameter, optionally suffixed with a comma-separated single-character short name. The short name is only considered for CLI parsing and allows users to abbreviate commonly used option names. The third and final argument to add is a help text.

The custom config class allows end users to set the port for the application to 42 with either -p 42 (short name) or --port=42 (long name). The long option name is prefixed by the category when using a different category than global. For example, adding the port option to the category foo means end users have to type --foo.port=42 when using the long name. Short names are unaffected by the category, but have to be unique.
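The naming rule for long options can be summarized in a small helper. The function long_option_name is hypothetical and only mirrors the rule described above. It is not part of the CAF API.

```cpp
#include <string>

// Builds the long CLI name of an option. The category becomes a prefix unless
// the option lives in the "global" category.
std::string long_option_name(const std::string& category,
                             const std::string& name) {
  if (category == "global")
    return "--" + name;
  return "--" + category + "." + name;
}
```

For the port option, this yields --port in the global category and --foo.port in a category named foo.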

Boolean options do not require arguments. The member variable server_mode is set to true if the command line contains either --server-mode or -s.

The example uses member variables for capturing user-provided settings for simplicity. However, this is not required. For example, add<bool>(...) allows omitting the first argument entirely. All values of the configuration are accessible with get_or. Note that all global options can omit the "global." prefix.

CAF adds the program options help (with short names -h and -?) as well as long-help to the global category.

Configuration Files

The default name for the configuration file is caf-application.conf. Users can change the file path by passing --config-file=<path> on the command line.

The syntax for the configuration files provides a clean JSON-like grammar that is similar to other commonly used configuration formats. In a nutshell, instead of writing:

{
  "my-category" : {
    "first" : 1,
    "second" : 2
  }
}

you can reduce the noise by writing:

my-category {
  first = 1
  second = 2
}

Note

CAF will accept both of the examples above and will produce the same result. We recommend using the second style, mostly because it reduces syntax noise.

Unlike regular JSON, CAF’s configuration format supports a couple of additional syntax elements such as comments (comments start with # and end at the end of the line) and, most notably, does not accept null.

The parser uses the following syntax for writing key-value pairs:

key=true is a boolean
key=1 is an integer
key=1.0 is a floating point number
key=1ms is a timespan
key='foo' is a string
key="foo" is a string
key=[0, 1, ...] is a list
key={a=1, b=2, ...} is a dictionary (map)

The following example configuration file lists all standard options in CAF and their default value. Note that some options such as scheduler.max-threads are usually detected at runtime and thus have no hard-coded default.

# This file shows all possible parameters with defaults. For some values, CAF
# computes a value at runtime if the configuration does not provide a value. For
# example, "caf.scheduler.max-threads" has no hard-coded default and instead
# adjusts to the number of cores available.
caf {
  # Parameters selecting a default scheduler.
  scheduler {
    # Use the work stealing implementation. Accepted alternative: "sharing".
    policy = "stealing"
    # Maximum number of messages actors can consume in single run (int64 max).
    max-throughput = 9223372036854775807
    # # Maximum number of threads for the scheduler. No hardcoded default.
    # max-threads = ... (detected at runtime)
  }
  # Parameters for the work stealing scheduler. Only takes effect if
  # caf.scheduler.policy is set to "stealing".
  work-stealing {
    # Number of zero-sleep-interval polling attempts.
    aggressive-poll-attempts = 100
    # Frequency of steal attempts during aggressive polling.
    aggressive-steal-interval = 10
    # Number of moderately aggressive polling attempts.
    moderate-poll-attempts = 500
    # Frequency of steal attempts during moderate polling.
    moderate-steal-interval = 5
    # Sleep interval between poll attempts.
    moderate-sleep-duration = 50us
    # Frequency of steal attempts during relaxed polling.
    relaxed-steal-interval = 1
    # Sleep interval between poll attempts.
    relaxed-sleep-duration = 10ms
  }
  # Parameters for the I/O module.
  middleman {
    # Configures whether MMs try to span a full mesh.
    enable-automatic-connections = false
    # Application identifiers of this node, prevents connection to other CAF
    # instances with incompatible identifiers.
    app-identifiers = ["generic-caf-app"]
    # Maximum number of consecutive I/O reads per broker.
    max-consecutive-reads = 50
    # Heartbeat message interval in ms (0 disables heartbeating).
    heartbeat-interval = 0ms
    # Configures whether the MM attaches its internal utility actors to the
    # scheduler instead of dedicating individual threads (needed only for
    # deterministic testing).
    attach-utility-actors = false
    # Configures whether the MM starts a background thread for I/O activity.
    # Setting this to true allows fully deterministic execution in unit tests and
    # requires the user to trigger I/O manually.
    manual-multiplexing = false
    # # Configures how many background workers are spawned for deserialization.
    # # No hardcoded default.
    # workers = ... (detected at runtime)
  }
  # Parameters for logging.
  logger {
    # # Note: File logging is disabled unless a 'file' section exists that
    # # contains a setting for 'verbosity'.
    # file {
    #   # File name template for output log files.
    #   path = "actor_log_[PID]_[TIMESTAMP]_[NODE].log"
    #   # Format for rendering individual log file entries.
    #   format = "%r %c %p %a %t %C %M %F:%L %m%n"
    #   # Minimum severity of messages that are written to the log. One of:
    #   # 'quiet', 'error', 'warning', 'info', 'debug', or 'trace'.
    #   verbosity = "trace"
    #   # A list of components to exclude in file output.
    #   excluded-components = []
    # }
    # # Note: Console output is disabled unless a 'console' section exists that
    # # contains a setting for 'verbosity'.
    # console {
    #   # Enables colored output when writing to a TTY if set to true.
    #   colored = true
    #   # Format for printing log lines (implicit newline at the end).
    #   format = "[%c:%p] %d %m"
    #   # Minimum severity of messages that are written to the console. One of:
    #   # 'quiet', 'error', 'warning', 'info', 'debug', or 'trace'.
    #   verbosity = "trace"
    #   # A list of components to exclude in console output.
    #   excluded-components = []
    # }
  }
}

Adding Custom Message Types

CAF requires serialization support for all of its message types (see Type Inspection). However, CAF also needs a mapping of unique type IDs to user-defined types at runtime. This is required to deserialize arbitrary messages from the network.

The type IDs are assigned by listing all custom types in a type ID block. CAF assigns ascending IDs to the types in the block and also stores each type name. In the following example, we forward-declare the types foo and foo2 and register them, along with std::pair<int32_t, int32_t>, in a type ID block. The name of the type ID block is arbitrary, but it must be a valid C++ identifier.

struct foo;
struct foo2;

CAF_BEGIN_TYPE_ID_BLOCK(custom_types_1, first_custom_type_id)

  CAF_ADD_TYPE_ID(custom_types_1, (foo))
  CAF_ADD_TYPE_ID(custom_types_1, (foo2))
  CAF_ADD_TYPE_ID(custom_types_1, (std::pair<int32_t, int32_t>) )

CAF_END_TYPE_ID_BLOCK(custom_types_1)

Aside from a type ID, CAF also requires an inspect overload in order to be able to serialize objects. As an introductory example, we (again) use the following simple struct foo.

struct foo {
  std::vector<int> a;
  int b;
};

template <class Inspector>
bool inspect(Inspector& f, foo& x) {
  return f.object(x).fields(f.field("a", x.a), f.field("b", x.b));
}
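To illustrate what an inspect overload enables, the sketch below feeds the same function to a toy inspector built with the standard library only. The class debug_printer and its helpers field_ref and object_scope are hypothetical and only mimic the f.object(x).fields(f.field(...)) call shape shown above; the inspectors that ship with CAF support many more types and operations.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical toy inspector (not part of CAF): renders fields as text.
class debug_printer {
public:
  // Pairs a field name with a pointer to the field.
  template <class T>
  struct field_ref {
    std::string_view name;
    T* value;
  };

  struct object_scope {
    debug_printer* self;
    // Visits all fields in order; returns true to signal success.
    template <class... Ts>
    bool fields(field_ref<Ts>... xs) {
      self->out_ << '{';
      bool first = true;
      (self->emit(first, xs.name, *xs.value), ...);
      self->out_ << '}';
      return true;
    }
  };

  template <class T>
  object_scope object(T&) {
    return {this};
  }

  template <class T>
  field_ref<T> field(std::string_view name, T& x) {
    return {name, &x};
  }

  std::string str() const {
    return out_.str();
  }

private:
  void write(int x) {
    out_ << x;
  }

  void write(const std::vector<int>& xs) {
    out_ << '[';
    for (std::size_t i = 0; i < xs.size(); ++i)
      out_ << (i > 0 ? ", " : "") << xs[i];
    out_ << ']';
  }

  template <class T>
  void emit(bool& first, std::string_view name, const T& x) {
    out_ << (first ? "" : ", ") << name << " = ";
    write(x);
    first = false;
  }

  std::ostringstream out_;
};

struct foo {
  std::vector<int> a;
  int b;
};

// Same shape as the inspect overload shown above.
template <class Inspector>
bool inspect(Inspector& f, foo& x) {
  return f.object(x).fields(f.field("a", x.a), f.field("b", x.b));
}
```

Calling inspect(p, x) with a debug_printer p and a foo that holds a = {1, 2, 3} and b = 4 leaves the text {a = [1, 2, 3], b = 4} in p.str(). Because inspect is a template, one overload can serve writers, readers and printers alike.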

By assigning type IDs and providing inspect overloads, we provide static and compile-time information for all our types. However, CAF also needs some information at run-time for deserializing received data. The function init_global_meta_objects takes care of registering all the state we need at run-time. This function usually gets called by CAF_MAIN automatically. When not using this macro, users must call init_global_meta_objects before any other CAF function.

Adding Custom Error Types

Adding a custom error type to the system is a convenience feature that improves the string representation of errors. Error types can be added by implementing a render function and passing it to add_error_category, as shown in Add Custom Error Categories.

Adding Custom Actor Types (experimental)

Adding actor types to the configuration allows users to spawn actors by their name. In particular, this enables spawning of actors on a different node (see Remote Spawning of Actors). For our example configuration, we consider the following simple calculator actor.

using calculator
  = caf::typed_actor<caf::result<int32_t>(caf::add_atom, int32_t, int32_t),
                     caf::result<int32_t>(caf::sub_atom, int32_t, int32_t)>;

Adding the calculator actor type to our config is achieved by calling add_actor_type. After calling this in our config, we can spawn the calculator anywhere in the distributed actor system (assuming all nodes use the same config). Note that the handle type still requires a type ID (see Adding Custom Message Types).

Our final example illustrates how to spawn a calculator locally by using its type name. Because the dynamic type name lookup can fail and the construction arguments passed as message can mismatch, this version of spawn returns expected<T>.

auto x = system.spawn<calculator>("calculator", make_message());
if (! x) {
  std::cerr << "*** unable to spawn calculator: " << to_string(x.error())
            << std::endl;
  return;
}
calculator c = std::move(*x);

Adding dynamically typed actors to the config is achieved in the same way. When spawning a dynamically typed actor in this way, the template parameter is simply actor. For example, an actor “foo” that requires one string is spawned with:

auto worker = system.spawn<actor>("foo", make_message("bar"));

Because constructor (or function) arguments for spawning the actor are stored in a message, only actors with appropriate input types are allowed. For example, pointer types are illegal. Hence users need to replace C-strings with std::string.

Log Output

Logging is disabled in CAF by default. It can be enabled by setting the --with-log-level= option of the configure script to one of error, warning, info, debug, or trace (from least output to most). Alternatively, setting the CMake variable CAF_LOG_LEVEL to one of these values has the same effect.

All logger-related configuration options listed here and in system-config-options are silently ignored if logging is disabled.

File

File output is disabled by default. Setting caf.logger.file.verbosity to a valid severity level causes CAF to print log events to the file specified in caf.logger.file.path.

The caf.logger.file.path may contain one or more of the following placeholders:

Variable     Output
[PID]        The OS-specific process ID.
[TIMESTAMP]  The UNIX timestamp on startup.
[NODE]       The node ID of the CAF system.

Console

Console output is disabled by default. Setting caf.logger.console.verbosity to a valid severity level causes CAF to print log events to std::clog.

Format Strings

CAF uses log4j-like format strings for configuring printing of individual events via caf.logger.file.format and caf.logger.console.format. Note that format modifiers are not supported at the moment. The recognized field identifiers are:

Character  Output
c          The category/component.
C          The full qualifier of the current function. For example, the qualifier of void ns::foo::bar() is printed as ns.foo.
d          The date in ISO 8601 format, i.e., "YYYY-MM-DDThh:mm:ss".
F          The file name.
L          The line number.
m          The user-defined log message.
M          The name of the current function. For example, the name of void ns::foo::bar() is printed as bar.
n          A newline.
p          The priority (severity level).
r          Elapsed time since starting the application in milliseconds.
t          ID of the current thread.
a          ID of the current actor (or actor0 when not logging inside an actor).
%          A single percent sign.
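The following standard-library sketch shows how such a format string could be expanded for a single event. It covers only a subset of the identifiers listed above. The struct log_event, the function render, and the choice to keep unknown identifiers verbatim are assumptions made for illustration and do not reflect CAF's internal logger.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>

// Hypothetical event record; the member names are illustrative only.
struct log_event {
  std::string component; // rendered by %c
  std::string priority;  // rendered by %p
  std::string message;   // rendered by %m
  std::string file;      // rendered by %F
  int line;              // rendered by %L
};

// Expands a subset of the field identifiers in a log4j-like format string.
std::string render(std::string_view format, const log_event& ev) {
  std::string out;
  for (std::size_t i = 0; i < format.size(); ++i) {
    // Copy regular characters (and a trailing '%') verbatim.
    if (format[i] != '%' || i + 1 == format.size()) {
      out += format[i];
      continue;
    }
    switch (format[++i]) {
      case 'c': out += ev.component; break;
      case 'p': out += ev.priority; break;
      case 'm': out += ev.message; break;
      case 'F': out += ev.file; break;
      case 'L': out += std::to_string(ev.line); break;
      case 'n': out += '\n'; break;
      case '%': out += '%'; break;
      default:
        // Assumption of this sketch: keep unknown identifiers as-is.
        out += '%';
        out += format[i];
    }
  }
  return out;
}
```

With an event for component app, priority INFO, message hello, file main.cpp and line 42, the format "[%c:%p] %F:%L %m%n" expands to "[app:INFO] main.cpp:42 hello" followed by a newline.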

Filtering

The two configuration options caf.logger.file.excluded-components and caf.logger.console.excluded-components reduce the amount of generated log events in addition to the minimum severity level. These parameters are lists of component names that shall be excluded from any output.
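As a sketch of how the logger options combine, the following configuration fragment enables file output at debug verbosity and console output at warning verbosity. All keys come from the parameter listing earlier in this chapter; the component name "my-noisy-component" is a hypothetical placeholder.

```
caf {
  logger {
    file {
      # Placeholders are replaced when CAF opens the log file.
      path = "actor_log_[PID]_[TIMESTAMP]_[NODE].log"
      verbosity = "debug"
      # Hypothetical component name, shown for illustration only.
      excluded-components = ["my-noisy-component"]
    }
    console {
      colored = true
      format = "[%c:%p] %d %m"
      verbosity = "warning"
    }
  }
}
```

These settings only take effect if CAF was built with logging enabled.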

Managing Groups of Workers (experimental)

When managing a set of workers, a central actor often dispatches requests to a set of workers. For this purpose, the class actor_pool implements a lightweight abstraction for managing a set of workers using a dispatching policy. Unlike groups, pools usually own their workers.

Pools are created using the static member function make, which takes either one argument (the policy) or three (number of workers, factory function for workers, and dispatching policy). After construction, one can add new workers via messages of the form ('SYS', 'PUT', worker), remove workers with ('SYS', 'DELETE', worker), and retrieve the set of workers as vector<actor> via ('SYS', 'GET').

An actor pool takes ownership of its workers. When forced to quit, it sends an exit message to all of its workers, forcing them to quit as well. The pool also monitors all of its workers.

Pools do not cache messages, but enqueue them directly in a worker's mailbox. Consequently, a terminating worker loses all unprocessed messages. For more advanced caching strategies, such as reliable message delivery, users can implement their own dispatching policies.

Dispatching Policies

A dispatching policy is a functor with the following signature:

using uplock = upgrade_lock<detail::shared_spinlock>;
using policy = std::function<void (actor_system& sys,
                                   uplock& guard,
                                   const actor_vec& workers,
                                   mailbox_element_ptr& ptr,
                                   execution_unit* host)>;

The argument guard is a shared lock that can be upgraded for unique access if the policy includes a critical section. The argument workers is a vector containing all workers managed by the pool. The argument ptr contains the full message as received by the pool. Finally, host is the current scheduler context that can be used to enqueue workers into the corresponding job queue.

The actor pool class comes with a set of predefined policies, accessible via factory functions, for convenience.

actor_pool::policy actor_pool::round_robin();

This policy forwards incoming requests in a round-robin manner to workers. There is no guarantee that messages are consumed, i.e., work items are lost if the worker exits before processing all of its messages.
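The round-robin idea can be sketched with the standard library alone. The types worker and round_robin_dispatcher below are hypothetical stand-ins that model mailboxes as plain queues; they do not use the actor_pool API or the policy signature shown above.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a worker: just a mailbox of pending messages.
struct worker {
  std::deque<std::string> mailbox;
};

// Forwards each message to the next worker in turn, wrapping around.
class round_robin_dispatcher {
public:
  void dispatch(std::vector<worker>& workers, std::string msg) {
    assert(!workers.empty());
    auto index = next_++ % workers.size();
    // The message goes straight into the mailbox; nothing is cached here.
    workers[index].mailbox.push_back(std::move(msg));
  }

private:
  std::size_t next_ = 0;
};
```

Dispatching five messages to three workers leaves two messages in the first two mailboxes and one in the third. Removing a worker from the vector drops whatever is still in its mailbox, which mirrors the lack of delivery guarantees described above.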

actor_pool::policy actor_pool::broadcast();

This policy forwards each message to all workers. Synchronous messages to the pool will be received by all workers, but the client will only recognize the first arriving response message—or error—and discard subsequent messages. Note that this is not caused by the policy itself, but a consequence of forwarding synchronous messages to more than one actor.

actor_pool::policy actor_pool::random();

This policy forwards incoming requests to one worker from the pool chosen uniformly at random. Analogous to round_robin, this policy does not cache or redispatch messages.

using join = function<void (T&, message&)>;
using split = function<void (vector<pair<actor, message>>&, message&)>;
template <class T>
static policy split_join(join jf, split sf = ..., T init = T());

This policy models split/join or scatter/gather workflows, where a work item is split into as many tasks as workers are available and the individual results are then joined together before sending the full result back to the client.

The join function is responsible for “gluing” all result messages together to create a single result. The function is called with the result object (initialized using init) and the current result message from a worker.

The first argument of a split function is a mapping from actors (workers) to tasks (messages). The second argument is the input message. The default split function broadcasts the input, sending each worker the original request.
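To make the roles of split and join more tangible, here is a synchronous sketch that uses only the standard library. Everything in namespace sketch is hypothetical: workers are plain functions, tasks are integer vectors, and the split function partitions the input into chunks instead of broadcasting it as the default split function does.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <numeric>
#include <utility>
#include <vector>

namespace sketch {

using task = std::vector<int>;

// Hypothetical worker: computes a partial result for its task.
using worker_fn = std::function<int(const task&)>;

// Example worker that sums up all values of its task.
inline int sum_of(const task& xs) {
  return std::accumulate(xs.begin(), xs.end(), 0);
}

// Split: maps each worker (by index) to a contiguous chunk of the input.
inline std::vector<std::pair<std::size_t, task>>
split(std::size_t num_workers, const task& input) {
  assert(num_workers > 0);
  std::vector<std::pair<std::size_t, task>> tasks;
  auto chunk = (input.size() + num_workers - 1) / num_workers;
  for (std::size_t w = 0; w < num_workers; ++w) {
    auto first = std::min(w * chunk, input.size());
    auto last = std::min(first + chunk, input.size());
    tasks.emplace_back(
      w, task(input.begin() + static_cast<std::ptrdiff_t>(first),
              input.begin() + static_cast<std::ptrdiff_t>(last)));
  }
  return tasks;
}

// Join: folds one partial result into the accumulated result.
inline void join(int& acc, int partial) {
  acc += partial;
}

// Scatters the input to all workers and gathers the joined result.
inline int scatter_gather(const std::vector<worker_fn>& workers,
                          const task& input, int init = 0) {
  assert(!workers.empty());
  int acc = init;
  for (auto& [index, chunk] : split(workers.size(), input))
    join(acc, workers[index](chunk));
  return acc;
}

} // namespace sketch
```

With three sum_of workers and the input 1 to 10, the chunks hold four, four and two values, and the joined result is 55. In an actor pool the workers run concurrently and results arrive as messages, so the join function is invoked once per response rather than in a loop.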

Data Flows

Data flows, or streams, are potentially unbounded sequences of values. The flow API in CAF makes it easy to generate, transform, and consume observable sequences with a ReactiveX-style interface.

Data flows in CAF use backpressure to make sure that fast senders cannot overwhelm receivers.

We do not assume prior experience with ReactiveX for this chapter and there are some key differences to ReactiveX implementations that we point out in Key Differences to ReactiveX.

Introduction

The fundamental building blocks of the flow API are observable, observer and subscription.

observable
Emits a potentially unbounded sequence of values to observers that have subscribed to the observable. Offers the single member function subscribe to add more observers.
observer
Subscribes to and consumes values from an observable. This interface bundles callbacks for the observable, namely on_subscribe, on_next, on_complete and on_error.
subscription
Manages the flow of items between an observable and an observer. An observer calls request to ask for more items or dispose to stop receiving data.
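The interplay of the three building blocks, including demand-driven backpressure, can be sketched in a few lines of single-threaded standard C++. The types below are hypothetical and heavily simplified (no on_error, integers only, no actors); they are not the interfaces from caf::flow.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical, simplified counterpart of a subscription.
struct subscription {
  virtual ~subscription() = default;
  virtual void request(std::size_t n) = 0; // Ask for up to n more items.
  virtual void dispose() = 0;              // Stop receiving items.
};

// Hypothetical, simplified counterpart of an observer.
struct observer {
  virtual ~observer() = default;
  virtual void on_subscribe(std::shared_ptr<subscription> sub) = 0;
  virtual void on_next(int item) = 0;
  virtual void on_complete() = 0;
};

// Emits first, first + 1, ..., last, but never more than was requested.
class iota_observable {
public:
  iota_observable(int first, int last) : first_(first), last_(last) {}

  void subscribe(observer& out) {
    out.on_subscribe(std::make_shared<sub_impl>(out, first_, last_));
  }

private:
  struct sub_impl : subscription {
    sub_impl(observer& out, int next, int last)
      : out_(out), next_(next), last_(last) {}

    void request(std::size_t n) override {
      // Emit at most n items; the observer controls the pace.
      while (!done_ && n-- > 0 && next_ <= last_)
        out_.on_next(next_++);
      if (!done_ && next_ > last_) {
        done_ = true;
        out_.on_complete();
      }
    }

    void dispose() override {
      done_ = true;
    }

    observer& out_;
    int next_;
    int last_;
    bool done_ = false;
  };

  int first_;
  int last_;
};

// Stores the subscription and collects whatever items arrive.
struct collector : observer {
  void on_subscribe(std::shared_ptr<subscription> s) override {
    sub = std::move(s);
  }
  void on_next(int item) override {
    items.push_back(item);
  }
  void on_complete() override {
    completed = true;
  }
  std::shared_ptr<subscription> sub;
  std::vector<int> items;
  bool completed = false;
};
```

After subscribing a collector to iota_observable{1, 5}, nothing is emitted until the collector calls sub->request(2), which delivers 1 and 2. A later request for more items than remain delivers 3, 4 and 5, followed by on_complete.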

When working with data flows, these interfaces usually remain hidden and applications leverage high-level operators that either generate or transform an observable. For example, the code snippet below illustrates a trivial data flow for integers inside a single actor that only uses the high-level composition API without any manual setup for observers or subscriptions:

void caf_main(caf::actor_system& sys, const config& cfg) {
  sys.spawn([n = cfg.n](caf::event_based_actor* self) {
    self
      // Get an observable factory.
      ->make_observable()
      // Produce an integer sequence starting at 1, i.e., 1, 2, 3, ...
      .iota(1)
      // Only take the requested number of items from the infinite sequence.
      .take(n)
      // Print each integer.
      .for_each([](int x) { std::cout << x << std::endl; });
  });
}

In the concurrency model of CAF, all of these building blocks describe processing steps that happen inside of an actor. The actor owns all of its observer, observable and subscription objects and they cannot outlive their parent actor. This means that an observable must not be shared with others.

To move data from one actor to another, CAF provides asynchronous buffers. The following figure depicts the general architecture when working with data flows across actor boundaries:

Figure: CAF flow running through two actors.

Here, actor A creates an observable and then applies the flat_map and filter operators to it. The resulting items then flow into an asynchronous buffer that connects to actor B. Actor B then applies the map and filter operators to the incoming data before terminating the data flow, e.g., by printing each value.

Concurrent Processing

Flows that only run inside a single actor are of limited use outside of toy examples. For running different parts of a data flow on different actors, CAF offers two APIs: one for setting up a processing chain declaratively and one for setting up processing chains dynamically.

Declarative Setup: observe_on

If the entire processing chain is known at coding time, observe_on provides the easiest way to assign work to individual actors. The following example revisits our first example, but this time generates the numbers on one actor and then prints them on another.

void caf_main(caf::actor_system& sys, const config& cfg) {
  // Create two actors without actually running them yet.
  using actor_t = caf::event_based_actor;
  auto [src, launch_src] = sys.spawn_inactive<actor_t>();
  auto [snk, launch_snk] = sys.spawn_inactive<actor_t>();
  // Define our data flow: generate data on `src` and print it on `snk`.
  src
    // Get an observable factory.
    ->make_observable()
    // Produce an integer sequence starting at 1, i.e., 1, 2, 3, ...
    .iota(1)
    // Only take the requested number of items from the infinite sequence.
    .take(cfg.n)
    // Switch to `snk` for further processing.
    .observe_on(snk)
    // Print each integer.
    .for_each([](int x) { std::cout << x << std::endl; });
  // Allow the actors to run. After this point, we may no longer dereference
  // the `src` and `snk` pointers! Calling these manually is optional. When
  // removing these two lines, CAF automatically launches the actors at scope
  // exit.
  launch_src();
  launch_snk();
}

Please note that calling observe_on requires that the target actor is inactive. Otherwise, this function call results in unsynchronized state access.

Dynamic Setup: Asynchronous Buffers

Our second option for spanning data flows across multiple actors is using SPSC (Single Producer Single Consumer) buffers. This option is more general. In fact, observe_on internally uses these buffers for connecting the actors. Further, the buffers allow bridging flows between actor and non-actor code.

While one could use an SPSC buffer directly, they usually remain hidden behind another abstraction: asynchronous resources. The resources in CAF usually come in pairs and users may create new ones by calling make_spsc_buffer_resource. This function returns a producer resource and a consumer resource. With these two resources, we can then spawn actors that open the resources for either reading or writing.

To illustrate how the API pieces fit together, we revisit our example a third time. This time, we spawn the actors individually and connect them via the buffer resources:

void caf_main(caf::actor_system& sys, const config& cfg) {
  auto [snk_res, src_res] = caf::async::make_spsc_buffer_resource<int>();
  sys.spawn(sink, std::move(snk_res));
  sys.spawn(source, std::move(src_res), cfg.n);
}

In this iteration of our example, we have moved the implementation for the source and sink actors to their own functions. The source once again creates the data, only this time we subscribe the buffer to the generated sequence:

// Simple source for generating a stream of integers from 1 to n.
void source(caf::event_based_actor* self,
            caf::async::producer_resource<int> out, size_t n) {
  self
    // Get an observable factory.
    ->make_observable()
    // Produce an integer sequence starting at 1, i.e., 1, 2, 3, ...
    .iota(1)
    // Only take the requested number of items from the infinite sequence.
    .take(n)
    // Subscribe the resource to the sequence, thereby starting the stream.
    .subscribe(out);
}

For the sink, we generate an observable from the consumer resource and then once more call for_each:

// Simple sink for consuming a stream of integers, printing it to stdout.
void sink(caf::event_based_actor* self, caf::async::consumer_resource<int> in) {
  self
    // Get an observable factory.
    ->make_observable()
    // Lift the input to an observable flow.
    .from_resource(std::move(in))
    // Print each integer.
    .for_each([](int x) { std::cout << x << std::endl; });
}

Building and Transforming Observables

When building processing pipelines, CAF fuses as many processing steps as possible into a single C++ object. Consider a source that is composed like this: self->make_observable().from_callable(...).take(...)....

The first bit, self->make_observable(), returns an observable_builder. This class implements factory functions for creating observable sequences from containers, repeated values, and so on. However, most functions do not actually return an observable. Instead, they return a generation<...> object.

The generation class is a variadic template that allows CAF to incrementally define consecutive processing steps. Calling from_callable on the builder object returns a generation<callable_source<...>>. The generation is meant as a temporary object only. Hence, most member functions may only get called on an rvalue.

After calling .take(...) on the returned generation, we get a new temporary object of type generation<callable_source<...>, limit_step<...>>.

The generation class also mimics the interface of observable. When calling a member function that requires an actual observable, CAF uses the blueprint stored in the generation to create an actual observable object and then forwards the member function call. For example, calling for_each on a generation internally constructs the observable and then calls for_each on that new object.

Users can also call as_observable on a generation explicitly to turn the blueprint into an actual observable sequence.

By delaying the construction of actual observable instances, CAF can fuse consecutive steps into single objects. This reduces the number of heap allocations and also accelerates processing, since the fused processing steps result in simple function call chains without subscriptions and backpressure between them.
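The fusing idea can be sketched with a small variadic template. The class blueprint below is a hypothetical, much simplified analog of a generation: each call to take or map returns a new rvalue-only object whose type lists one more step, and only the terminal for_each runs the chain. The names limit_step and map_step are illustrative and do not claim to match the step classes in CAF.

```cpp
#include <cassert>
#include <cstddef>
#include <tuple>
#include <utility>

// Hypothetical step: passes on at most `remaining` items, then stops.
struct limit_step {
  int remaining;
  template <class Next>
  bool operator()(int item, Next&& next) {
    if (remaining <= 0)
      return false;
    --remaining;
    return next(item) && remaining > 0;
  }
};

// Hypothetical step: applies `fn` to each item before passing it on.
template <class F>
struct map_step {
  F fn;
  template <class Next>
  bool operator()(int item, Next&& next) {
    return next(fn(item));
  }
};

// Blueprint for a pipeline over the sequence first, first + 1, ...
template <class... Steps>
class blueprint {
public:
  explicit blueprint(int first, Steps... steps)
    : first_(first), steps_(std::move(steps)...) {}

  // Appending a step yields a new blueprint type; nothing runs yet.
  template <class Step>
  blueprint<Steps..., Step> add(Step step) && {
    return std::apply(
      [&](auto&... xs) {
        return blueprint<Steps..., Step>{first_, std::move(xs)...,
                                         std::move(step)};
      },
      steps_);
  }

  auto take(int n) && {
    return std::move(*this).add(limit_step{n});
  }

  template <class F>
  auto map(F fn) && {
    return std::move(*this).add(map_step<F>{std::move(fn)});
  }

  // Terminal operation: runs all steps as one chain of function calls.
  template <class Sink>
  void for_each(Sink sink) && {
    for (int item = first_; run<0>(item, sink); ++item) {
      // nop
    }
  }

private:
  // Passes the item through step I; returns false to stop the flow.
  template <std::size_t I, class Sink>
  bool run(int item, Sink& sink) {
    if constexpr (I == sizeof...(Steps)) {
      sink(item);
      return true;
    } else {
      auto next = [&](int x) { return run<I + 1>(x, sink); };
      return std::get<I>(steps_)(item, next);
    }
  }

  int first_;
  std::tuple<Steps...> steps_;
};

// Starts a blueprint without any steps.
inline blueprint<> iota(int first) {
  return blueprint<>{first};
}
```

An expression such as iota(1).map(square).take(3).for_each(print), with user-supplied square and print callables, creates a single object that holds both steps in a tuple. No per-step heap allocation or subscription is involved, which is the effect the fused steps in CAF aim for. Note that this sketch loops forever if no take step bounds the sequence.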

Analogous to the generation class for creating new observables from inputs, CAF uses a template called transformation that represents a blueprint for applying operators to existing observables.

Operators

Most operators transform an observable by applying one or more processing steps on all observed values and then emit the result as a new observable. Since the result of a transformation is usually a new observable, these operators compose into complex data stream operations.

The operators presented here are available on the template classes observable, generation and transformation.

Concat

The concat operator takes multiple input observables and re-emits the observed items as a single sequence of items without interleaving them.

Concat Map

The concat_map operator takes a function object that converts a given input to an observable and then applies concat to all of the resulting observables.

Distinct

The distinct operator makes all items unique by filtering out all items that have been emitted in the past.

Element At

The element_at operator re-emits the item at a given index while ignoring all other items.

Filter

The filter operator re-emits items from its input observable that pass a predicate test.

First

The first operator re-emits only the first item of the input observable.

Flat Map

The flat_map operator takes a function object that converts a given input to an observable and then applies merge to all of the resulting observables.

Head and Tail

The head_and_tail operator splits an observable into its first item and an observable for the remainder.

Ignore Elements

The ignore_elements operator ignores all items of the input observable and only emits the completion (or error) signal.

Last

The last operator re-emits only the last item of the input observable.

Map

The map operator applies a unary operation to all items of the input observable and re-emits the resulting items. Similar to std::transform.

Merge

The merge operator takes multiple input observables and re-emits the observed items as a single sequence of items as soon as they appear.

Observe On

The observe_on operator pipes data from one actor to another through an asynchronous buffer. The target actor must not run at the point of calling this operator. In the image below, alice (red) and bob (blue) are two actors.

_images/observe_on.svg

Prefix and Tail

The prefix_and_tail operator splits an observable into its first n items (stored in a caf::cow_vector) and an observable for the remainder.

Reduce

The reduce operator is similar to std::accumulate, only that it operates on an observable instead of an iterator range.

Ref Count

The ref_count operator turns a connectable back to a regular observable by automatically calling connect as soon as there is an initial “reference” (subscription). After the last “reference” goes away (no more subscription), the ref_count operator unsubscribes from its source.

Skip Last

The skip_last operator re-emits all but the last n items from its input observable.

Sum

The sum operator accumulates all items and emits the result after the input observable has completed.

Take Last

The take_last operator re-emits the last n items from its input observable.

Take

The take operator re-emits the first n items from its input observable.

Take While

The take_while operator re-emits items from its input observable until its predicate returns false.

To Vector

The to_vector operator collects all items and emits a single vector containing all observed items after the source observable has completed.
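For readers who think in terms of standard algorithms, the semantics of several operators can be approximated on plain containers. The functions in namespace sketch below are hypothetical helpers that work eagerly on std::vector<int>; the flow operators in CAF apply the same logic item by item as values arrive.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <iterator>
#include <numeric>
#include <unordered_set>
#include <vector>

namespace sketch {

using ints = std::vector<int>;

// filter: keeps the items that pass the predicate.
template <class Predicate>
ints filter(const ints& in, Predicate pred) {
  ints out;
  std::copy_if(in.begin(), in.end(), std::back_inserter(out), pred);
  return out;
}

// map: applies fn to each item.
template <class F>
ints map(const ints& in, F fn) {
  ints out;
  std::transform(in.begin(), in.end(), std::back_inserter(out), fn);
  return out;
}

// take: keeps the first n items.
inline ints take(const ints& in, std::size_t n) {
  auto count = static_cast<std::ptrdiff_t>(std::min(n, in.size()));
  return ints(in.begin(), in.begin() + count);
}

// skip_last: drops the last n items.
inline ints skip_last(const ints& in, std::size_t n) {
  return take(in, in.size() - std::min(n, in.size()));
}

// distinct: drops every item that has been seen before.
inline ints distinct(const ints& in) {
  std::unordered_set<int> seen;
  ints out;
  for (int x : in)
    if (seen.insert(x).second)
      out.push_back(x);
  return out;
}

// reduce: folds all items into a single value.
template <class F>
int reduce(const ints& in, int init, F fn) {
  return std::accumulate(in.begin(), in.end(), init, fn);
}

} // namespace sketch
```

For the input 1, 2, 2, 3, 4, 4, 5, applying distinct, then a filter for odd numbers, then a map that multiplies by ten, and finally take with n = 2 yields 10 and 30.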

Notes on Performance

When working with data flows in CAF, it is important to remember that values are frequently copied and buffered. In languages with reference semantics such as Java, this is not an issue since the flows basically pass pointers down the chain. In C++ however, programmers must choose wisely what data types are used in a flow.

Passing down types such as std::string or std::vector can slow down your application considerably, because every copy duplicates the underlying data. CAF offers three classes that help mitigate this problem: caf::cow_string, caf::cow_vector and caf::cow_tuple. These types are thin wrappers around their standard library counterparts that add copy-on-write (COW) semantics. Internally, all COW types have a pointer to the actual data with a reference count. This makes them cheap to copy and safe to use in a data flow. Most of the time, data in a flow does not need to change after creation and is de facto immutable. However, the COW optimization still gives you a mutable reference if you really need it and makes a deep copy only if it must, i.e., if there are multiple references to the data.
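The copy-on-write idea itself fits into a few lines. The class cow_text below is a hypothetical, single-threaded sketch built on std::shared_ptr; it is not the implementation of caf::cow_string, and checking use_count this way is not safe for concurrent use.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical copy-on-write wrapper around std::string.
class cow_text {
public:
  explicit cow_text(std::string str)
    : data_(std::make_shared<std::string>(std::move(str))) {}

  // Read access never copies.
  const std::string& str() const {
    return *data_;
  }

  // Write access makes a deep copy only if other handles share the data.
  std::string& unshared() {
    if (data_.use_count() > 1)
      data_ = std::make_shared<std::string>(*data_);
    return *data_;
  }

  // Returns true if both handles point to the same string object.
  bool shares_with(const cow_text& other) const {
    return data_ == other.data_;
  }

private:
  std::shared_ptr<std::string> data_;
};
```

Copying a cow_text only copies a pointer and bumps a reference count. Calling unshared() on a handle that shares its data detaches it first, so other handles never observe the modification.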

Key Differences to ReactiveX

Observables are not thread-safe. They describe a flow of data within an actor and are thus considered private to an actor.

CAF is more “opinionated” than ReactiveX when it comes to concurrency and ownership. The intended way for connecting concurrent parts of the system is by creating buffer resources and turning them into observables at the observing actor.

Furthermore, CAF does not support the scheduler interface from ReactiveX. Data flows are usually managed by an actor. Hence, there is no analog for operators such as SubscribeOn. That being said, the flow API does not tie observables or observers to actor types. The interface caf::flow::coordinator manages scheduling of flow-related work and can be implemented to run CAF flows without actors, e.g., to integrate them into a custom event loop.

Testing

CAF comes with built-in support for writing unit tests in a domain-specific language (DSL). The API looks similar to well-known testing frameworks such as Boost.Test and Catch but adds CAF-specific macros for testing messaging between actors.

Our design leverages four main concepts:

  • Checks represent single boolean expressions.
  • Tests contain one or more checks.
  • Fixtures equip tests with a fixed data environment.
  • Suites group tests together.

The following code illustrates a very basic test case that captures the four main concepts described above.

// Adds all tests in this compilation unit to the suite "math".
#define CAF_SUITE math

// Pulls in all the necessary macros.
#include "caf/test/dsl.hpp"

namespace {

struct fixture {};

} // namespace

// Makes all members of `fixture` available to tests in the scope.
CAF_TEST_FIXTURE_SCOPE(math_tests, fixture)

// Implements our first test.
CAF_TEST(divide) {
  CAF_CHECK(1 / 1 == 0); // fails
  CAF_CHECK(2 / 2 == 1); // passes
  CAF_REQUIRE(3 + 3 == 5); // fails and aborts test execution
  CAF_CHECK(4 - 4 == 0); // unreachable due to previous requirement error
}

CAF_TEST_FIXTURE_SCOPE_END()

The code above highlights the two basic macros CAF_CHECK and CAF_REQUIRE. The former reports failed checks, but allows the test to continue on error. The latter stops test execution if the boolean expression evaluates to false.

The third macro worth mentioning is CAF_FAIL. It unconditionally stops test execution with an error message. This is particularly useful for stopping program execution after receiving unexpected messages, as we will see later.

Testing Actors

The following example illustrates how to add an actor system as well as a scoped actor to fixtures. This allows spawning of and interacting with actors in much the same way as regular programs would, except that we use macros such as CAF_CHECK and provide tests rather than implementing a caf_main.

namespace {

struct fixture {
  caf::actor_system_config cfg;
  caf::actor_system sys;
  caf::scoped_actor self;

  fixture() : sys(cfg), self(sys) {
    // nop
  }
};

caf::behavior adder() {
  return {
    [=](int x, int y) {
      return x + y;
    }
  };
}

} // namespace

CAF_TEST_FIXTURE_SCOPE(actor_tests, fixture)

CAF_TEST(simple actor test) {
  // Our Actor-Under-Test.
  auto aut = self->spawn(adder);
  self->request(aut, caf::infinite, 3, 4).receive(
    [=](int r) {
      CAF_CHECK(r == 7);
    },
    [&](caf::error& err) {
      // Must not happen, stop test.
      CAF_FAIL(err);
    });
}

CAF_TEST_FIXTURE_SCOPE_END()

The example above works, but suffers from several issues:

  • Significant amount of boilerplate code.
  • Using a scoped actor as illustrated above can only test one actor at a time. Messages between other actors remain invisible to us.
  • CAF runs actors in a thread pool by default. The resulting nondeterminism makes it nearly impossible to trigger a reliable ordering of messages. Further, forcing timeouts to test error handling code is even harder.

Deterministic Testing

CAF provides a scheduler implementation specifically tailored for writing unit tests called test_coordinator. It does not start any threads and instead gives unit tests full control over message dispatching and timeout management.
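The principle behind such a scheduler can be sketched without CAF. The class manual_scheduler below is hypothetical: it queues jobs and runs them only when the test asks for it, which is what makes the execution order reproducible. The member names run_once and run echo the calls used with the test coordinator later in this section, but the class is otherwise unrelated to test_coordinator.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical single-threaded scheduler for deterministic tests.
class manual_scheduler {
public:
  // Enqueues a job without running it.
  void schedule(std::function<void()> job) {
    jobs_.push_back(std::move(job));
  }

  // Runs the next pending job; returns false if there was none.
  bool run_once() {
    if (jobs_.empty())
      return false;
    auto job = std::move(jobs_.front());
    jobs_.pop_front();
    job();
    return true;
  }

  // Runs jobs, including newly scheduled ones, until the queue is empty.
  // Returns the number of executed jobs.
  std::size_t run() {
    std::size_t count = 0;
    while (run_once())
      ++count;
    return count;
  }

  bool has_job() const {
    return !jobs_.empty();
  }

private:
  std::deque<std::function<void()>> jobs_;
};
```

Since no thread ever runs a job behind the scenes, a test can inspect the state between any two steps and always observes the same interleaving.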

To reduce boilerplate code, CAF also provides a fixture template called test_coordinator_fixture that comes with a ready-to-use actor system (sys) and testing scheduler (sched). The optional template parameter allows unit tests to plug in custom actor system configuration classes.

Using this fixture unlocks three additional macros:

  • expect checks for a single message. The macro verifies the content types of the message and invokes the necessary member functions on the test coordinator. Optionally, the macro checks the receiver of the message and its content. If the expected message does not exist, the test aborts.
  • allow is similar to expect, but it does not abort the test if the expected message is missing. This macro returns true if the allowed message was delivered, false otherwise.
  • disallow aborts the test if a particular message was delivered to an actor.

The following example implements two actors, ping and pong, that exchange a configurable number of messages. The test three pings then checks the contents of each message with expect and verifies that no additional messages exist using disallow.

namespace {

behavior ping(event_based_actor* self, actor pong_actor, int n) {
  self->send(pong_actor, ping_atom_v, n);
  return {
    [=](pong_atom, int x) {
      if (x > 1)
        self->send(pong_actor, ping_atom_v, x - 1);
    },
  };
}

behavior pong() {
  return {
    [=](ping_atom, int x) { return make_result(pong_atom_v, x); },
  };
}

struct ping_pong_fixture : test_coordinator_fixture<> {
  actor pong_actor;

  ping_pong_fixture() {
    // Spawn the Pong actor.
    pong_actor = sys.spawn(pong);
    // Run initialization code for Pong.
    run();
  }
};

} // namespace

CAF_TEST_FIXTURE_SCOPE(ping_pong_tests, ping_pong_fixture)

CAF_TEST(three pings) {
  // Spawn the Ping actor and run its initialization code.
  auto ping_actor = sys.spawn(ping, pong_actor, 3);
  sched.run_once();
  // Test communication between Ping and Pong.
  expect((ping_atom, int), from(ping_actor).to(pong_actor).with(_, 3));
  expect((pong_atom, int), from(pong_actor).to(ping_actor).with(_, 3));
  expect((ping_atom, int), from(ping_actor).to(pong_actor).with(_, 2));
  expect((pong_atom, int), from(pong_actor).to(ping_actor).with(_, 2));
  expect((ping_atom, int), from(ping_actor).to(pong_actor).with(_, 1));
  expect((pong_atom, int), from(pong_actor).to(ping_actor).with(_, 1));
  // No further messages allowed.
  disallow((ping_atom, int), from(ping_actor).to(pong_actor).with(_, 1));
}

CAF_TEST_FIXTURE_SCOPE_END()

Metrics

Building and testing an application (or microservice) is merely the first step in its life cycle. Once you enter production and start deploying your software, you constantly need to monitor it. Is it still running? How many actors do we have? How many requests can our system handle? Where are potential bottlenecks? Do we have resources to spare or do we need to allocate more? Are we keeping our SLAs?

In order to answer such high-level questions, powerful tools like Prometheus have emerged. However, such monitoring systems are only as good as the data you feed them.

The metrics API in CAF enables you to instrument your code for generating performance data. The API is vendor-neutral, but borrows many concepts as well as terminology from Prometheus. Currently, CAF can only export metrics to Prometheus. However, the API allows users to collect the metrics manually for writing custom integrations.

Note

All classes for instrumenting code live in the namespace caf::telemetry.

Metric Names and Labels

Each metric is uniquely identified by:

  • A prefix. This acts as a namespace for grouping metrics together. All metrics that CAF collects by itself use the prefix caf.
  • A name. This identifies the metric within the prefix. By convention, these names are all-lowercase and hyphenated. For example, running-actors.
  • Any number of label dimensions. Labels are key-value pairs that divide a metric into useful categories. For example, a metric that counts HTTP requests could split into method=get, method=put, method=post, etc. Aggregating over all values of method would then yield the total number of requests.

Metrics that share prefix, name and label names form a metric family. This is also directly reflected in the API: the class metric_family bundles all shared attributes and stores all instances as children.

A metric family without labels always contains exactly one child. Hence, CAF calls such a metric a singleton in its API.

Note

CAF identifies metrics by prefix and name. Hence, families with the same prefix and name but different label names are prohibited.
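
The identification rule can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions, not CAF's implementation: the names sketch_family, sketch_registry and add_family are hypothetical, and reporting a conflict by returning nullptr is a choice made for this example.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a metric family: prefix, name and label names.
struct sketch_family {
  std::string prefix;
  std::string name;
  std::vector<std::string> label_names;
};

// Hypothetical registry that keys families by (prefix, name) only.
class sketch_registry {
public:
  // Returns the family for (prefix, name) and creates it on first use.
  // Returns nullptr if a family with the same prefix and name but different
  // label names already exists (assumption: the conflict is reported this way).
  sketch_family* add_family(const std::string& prefix, const std::string& name,
                            std::vector<std::string> label_names) {
    assert(!prefix.empty() && !name.empty());
    auto key = std::make_pair(prefix, name);
    auto i = families_.find(key);
    if (i == families_.end()) {
      sketch_family fam{prefix, name, std::move(label_names)};
      i = families_.emplace(key, std::move(fam)).first;
      return &i->second;
    }
    return i->second.label_names == label_names ? &i->second : nullptr;
  }

private:
  std::map<std::pair<std::string, std::string>, sketch_family> families_;
};
```

In this model, requesting http/requests with the label name method twice yields the same family object, while requesting it again with the label name status fails.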

Metric Types

CAF knows these types of metrics:

  1. Counters. A counter represents a monotonically increasing value. For example, the total number of messages received by all actors, the total number of errors since starting the system, etc.
  2. Gauges. A gauge represents a numerical value that can arbitrarily increase or decrease. For example, the current number of messages in all mailboxes, the number of running actors, etc.
  3. Histograms. A histogram observes numerical values and counts them in (configurable) buckets. For example, sampling the processing time of messages t with buckets for 0ms ≤ t ≤ 1ms, 1ms < t ≤ 10ms, 10ms < t ≤ 100ms, and so on gives information on the usual response time and outliers. Histograms internally consist of counters and provide a relatively lightweight sampling mechanism. However, providing the right boundaries for the buckets can require some experimentation or experience.

Further, CAF provides two implementations for each metric type: one using int64_t as internal representation and one using double. Both implementations use atomic operations, but the former is usually more efficient on platforms such as x86. In user code, we recommend only using these type definitions:

  • dbl_counter for monotonically increasing floating point numbers
  • int_counter for monotonically increasing 64-bit integers
  • dbl_gauge for arbitrary floating point numbers
  • int_gauge for arbitrary 64-bit integers
  • dbl_histogram for sampling floating point numbers
  • int_histogram for sampling 64-bit integers

The associated headers are:

  • caf/telemetry/counter.hpp
  • caf/telemetry/gauge.hpp
  • caf/telemetry/histogram.hpp

Counters

Counters wrap an atomic count but only allow incrementing it. The class provides the following member functions:

/// Increments the counter by 1.
void inc() noexcept;

/// Increments the counter by `amount`.
/// @pre `amount > 0`
void inc(value_type amount) noexcept;

/// Returns the current value of the counter.
value_type value() const noexcept;

/// Increments the counter by 1.
/// @note only available if value_type == int64_t
value_type operator++() noexcept;

Gauges

Like counters, gauges also wrap an atomic count. However, gauges are more permissive and allow decrementing as well.

/// Increments the gauge by 1.
void inc() noexcept;

/// Increments the gauge by `amount`.
void inc(value_type amount) noexcept;

/// Decrements the gauge by 1.
void dec() noexcept;

/// Decrements the gauge by `amount`.
void dec(value_type amount) noexcept;

/// Sets the gauge to `x`.
void value(value_type x) noexcept;

/// Increments the gauge by 1.
/// @returns The new value of the gauge.
/// @note only available if value_type == int64_t
value_type operator++() noexcept;

/// Decrements the gauge by 1.
/// @returns The new value of the gauge.
/// @note only available if value_type == int64_t
value_type operator--() noexcept;

/// Returns the current value of the gauge.
value_type value() const noexcept;

Histograms

Histograms consist of one counter per bucket as well as a gauge for the sum of all observed values (values may be negative).

/// Increments the bucket where the observed value falls into and increments
/// the sum of all observed values.
void observe(value_type value);

/// Returns the sum of all observed values.
value_type sum() const noexcept;

Metric Units and Flags

All metric types store numerical values, either as double or as int64_t. For giving this number additional semantics, CAF allows assigning units (of measurement) to metrics. The default unit is 1, which denotes dimensionless counts such as the number of messages in a mailbox.

The unit can be any string, but we recommend using only base units such as seconds or bytes to make processing of these metrics with monitoring systems easier.

Each metric also carries one flag: is-sum. Setting this to true (the default is false) indicates that this metric adds something up to a total where only the total value is of interest. For example, the total number of HTTP requests. CAF itself does not care about the flag, but it can give extra information to collectors or exporters. For example, the Prometheus exporter will add a _total suffix to the exported metric name.
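
To make the effect of the unit and the is-sum flag on an exported name more concrete, the following sketch assembles a Prometheus-style name. Only the _total suffix is taken from the description above. The function sketch_export_name is hypothetical, and both the replacement of - and . with _ and the appending of the unit are assumptions of this sketch rather than a specification of the exporter.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper that builds an exported metric name.
// Assumptions of this sketch: prefix and name are joined with '_', units other
// than "1" are appended, and '-' as well as '.' are replaced with '_'.
std::string sketch_export_name(const std::string& prefix,
                               const std::string& name,
                               const std::string& unit, bool is_sum) {
  assert(!name.empty());
  std::string result = prefix + "_" + name;
  if (unit != "1")
    result += "_" + unit;
  // Described in the text: metrics flagged as is-sum get a _total suffix.
  if (is_sum)
    result += "_total";
  for (char& c : result)
    if (c == '-' || c == '.')
      c = '_';
  return result;
}
```

Under these assumptions, a dimensionless counter http/requests with is-sum set becomes http_requests_total, while a histogram http/request-duration in seconds becomes http_request_duration_seconds.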

The Metric Registry

All metrics of an actor system are managed by a single registry to make sure only one metric instance exists per prefix and name combination. Further, keeping all metrics in one place allows collectors to iterate over them.

A minimal custom collector class requires providing operator() overloads as shown below:

class my_collector {
public:
  void operator()(const metric_family* family, const metric* instance,
                  const dbl_counter* impl);

  void operator()(const metric_family* family, const metric* instance,
                  const int_counter* impl);

  void operator()(const metric_family* family, const metric* instance,
                  const dbl_gauge* impl);

  void operator()(const metric_family* family, const metric* instance,
                  const int_gauge* impl);

  void operator()(const metric_family* family, const metric* instance,
                  const dbl_histogram* impl);

  void operator()(const metric_family* family, const metric* instance,
                  const int_histogram* impl);
};

Applying the collector to the registry looks as follows (with sys being a reference to an actor_system):

my_collector f;
sys.metrics().collect(f);

The associated header is caf/telemetry/metric_registry.hpp.

Accessing Metrics

Accessing a metric is a three-step process:

  1. Get the metric_registry from the actor system.
  2. Get the metric_family from the registry.
  3. Call get_or_add on the family to get a pointer to the counter, gauge, or histogram.

The pointer remains valid until the actor system gets destroyed. Hence, holding on to the pointer in an actor is always safe.

The registry creates metrics lazily (to be more precise, it creates families lazily that in turn create metric instances lazily). Since this requires synchronization via mutexes, we recommend accessing the registry only once per metric and then storing the pointer.

Accessing Counters and Gauges

Counters and gauges are very similar in their API. Hence, all functions that work on gauges only require replacing gauge with counter to work with counters instead.

Gauges are owned (and created) by a gauge family object. We can either get the family object explicitly by calling gauge_family, or we can use one of the two shortcut functions gauge_instance or gauge_singleton. The C++ prototypes for the registry member functions look as follows:

template <class ValueType = int64_t>
auto* gauge_family(string_view prefix, string_view name,
                   span<const string_view> labels, string_view helptext,
                   string_view unit = "1", bool is_sum = false);

template <class ValueType = int64_t>
auto* gauge_instance(string_view prefix, string_view name,
                     span<const label_view> labels, string_view helptext,
                     string_view unit = "1", bool is_sum = false);

template <class ValueType = int64_t>
auto* gauge_singleton(string_view prefix, string_view name,
                      string_view helptext, string_view unit = "1",
                      bool is_sum = false);

Note

All functions that take a span also provide an overload that accepts a std::initializer_list instead to make working with constants easier.

The function gauge_family returns a type-specific metric family object, while the other two functions return the gauge directly.

The family objects only have a single noteworthy member function, get_or_add:

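// Get (or lazily create) the family; requests are dimensionless, so the unit
// is "1" and is_sum is true because only the total is of interest.
auto fptr = registry.counter_family("http", "requests", {"method"},
                                    "Number of HTTP requests.", "1", true);
// Get (or lazily create) the counter for the label method=put.
auto count = fptr->get_or_add({{"method", "put"}});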

If we only get a single counter from the family, we can use counter_instance instead:

auto count = registry.counter_instance("http", "requests",
                                       {{"method", "put"}},
                                       "Number of HTTP requests.", "1", true);

Accessing Histograms

The member functions for accessing histogram families and histograms follow the same pattern as the member functions for counters and gauges.

template <class ValueType = int64_t>
auto* histogram_family(string_view prefix, string_view name,
                       span<const string_view> label_names,
                       span<const ValueType> default_upper_bounds,
                       string_view helptext, string_view unit = "1",
                       bool is_sum = false);

template <class ValueType = int64_t>
auto* histogram_instance(string_view prefix, string_view name,
                         span<const label_view> label_names,
                         span<const ValueType> default_upper_bounds,
                         string_view helptext, string_view unit = "1",
                         bool is_sum = false);

template <class ValueType = int64_t>
auto* histogram_singleton(string_view prefix, string_view name,
                          span<const ValueType> default_upper_bounds,
                          string_view helptext, string_view unit = "1",
                          bool is_sum = false);

Compared to the member functions for counters and gauges, histograms require one additional argument for the default bucket upper bounds.

Warning

The default_upper_bounds parameter must be sorted!

CAF automatically adds one additional bucket for observing all values between the last upper bound and infinity (double) or INT64_MAX (int64_t). For example, passing [10, 100, 1000] as upper bounds creates four buckets in total. The first bucket captures all values with x ≤ 10. The second bucket captures all values with 10 < x ≤ 100. The third bucket captures all values with 100 < x ≤ 1000. Finally, the fourth bucket (added automatically) captures all values with 1000 < x ≤ INT64_MAX.
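
The bucket layout can be modeled in a few lines of standard C++. The class sketch_histogram below is a hypothetical illustration and not CAF's implementation. It assumes inclusive upper bounds, one atomic counter per bucket, an extra catch-all bucket at the end, and an atomic sum.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Hypothetical model of an integer histogram.
class sketch_histogram {
public:
  explicit sketch_histogram(std::vector<std::int64_t> upper_bounds)
    : bounds_(std::move(upper_bounds)), counts_(bounds_.size() + 1) {
    // The binary search in bucket_index requires sorted upper bounds.
    assert(std::is_sorted(bounds_.begin(), bounds_.end()));
  }

  // Returns the index of the first bucket whose upper bound is >= value, or
  // the index of the extra last bucket if value exceeds all upper bounds.
  std::size_t bucket_index(std::int64_t value) const {
    auto i = std::lower_bound(bounds_.begin(), bounds_.end(), value);
    return static_cast<std::size_t>(i - bounds_.begin());
  }

  // Adds a sample: one atomic operation for the bucket, one for the sum.
  void observe(std::int64_t value) {
    counts_[bucket_index(value)].fetch_add(1);
    sum_.fetch_add(value);
  }

  std::size_t num_buckets() const { return counts_.size(); }
  std::int64_t count(std::size_t bucket) const { return counts_[bucket].load(); }
  std::int64_t sum() const { return sum_.load(); }

private:
  std::vector<std::int64_t> bounds_;
  std::vector<std::atomic<std::int64_t>> counts_;
  std::atomic<std::int64_t> sum_{0};
};
```

Constructing the model with the upper bounds 10, 100 and 1000 yields four buckets. Observing 10 increments the first bucket, while observing 5000 increments the automatically added last bucket.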

Configuration Parameters

Histograms use the actor system configuration to enable users to override hard-coded default bucket settings. On construction, the histogram family checks whether a key caf.metrics.${prefix}.${name}.buckets exists. Further, the metric instance also checks on construction whether a more specific bucket setting for one of its label dimensions exists.

For example, consider we add a histogram family with prefix http, name request-duration, and label dimension method to the registry. The family first tries to read caf.metrics.http.request-duration.buckets from the configuration and otherwise falls back to the hard-coded defaults. When creating a histogram instance from the family with the label method=put, the constructor first tries to read caf.metrics.http.request-duration.method=put.buckets from the configuration and otherwise uses the default for the family.

In a configuration file, users may provide bucket settings like this:

caf {
  metrics {
    http {
      # measures the duration per HTTP request in seconds
      request-duration {
        buckets = [
          0.001, # ≤   1ms
          0.01,  # ≤  10ms
          0.05,  # ≤  50ms
          0.1,   # ≤ 100ms
          0.25,  # ≤ 250ms
          0.5,   # ≤ 500ms
          0.75,  # ≤ 750ms
        ]
        # use different settings for put requests
        "method=put" {
          buckets = [
            0.007, # ≤   7ms
            0.012, # ≤  12ms
            0.025, # ≤  25ms
            0.05,  # ≤  50ms
            0.1,   # ≤ 100ms
          ]
        }
      }
    }
  }
}
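
The lookup order described in this section (instance label first, then family, then hard-coded defaults) can be sketched as follows. The function sketch_buckets is hypothetical, and representing the configuration as a flat map from fully qualified keys to bucket lists is a simplification made for this example. CAF's settings type is structured differently.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Simplifying assumption: the configuration is a flat map of dotted keys.
using sketch_config = std::map<std::string, std::vector<double>>;

// Hypothetical helper that resolves the upper bounds for one histogram
// instance with a single label such as "method=put".
std::vector<double> sketch_buckets(const sketch_config& cfg,
                                   const std::string& prefix,
                                   const std::string& name,
                                   const std::string& label,
                                   const std::vector<double>& hard_coded) {
  assert(!prefix.empty() && !name.empty());
  const std::string family_key = "caf.metrics." + prefix + "." + name;
  // 1. Most specific: a setting for the label of this instance.
  auto i = cfg.find(family_key + "." + label + ".buckets");
  if (i != cfg.end())
    return i->second;
  // 2. A setting for the entire family.
  i = cfg.find(family_key + ".buckets");
  if (i != cfg.end())
    return i->second;
  // 3. The defaults passed by the application.
  return hard_coded;
}
```

With the configuration file above, an instance with method=put would receive the five put-specific bounds in this model, while an instance with method=get would fall back to the seven bounds of the family.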

Note

Ambiguous settings for metrics with multiple label dimensions will result in CAF picking the first match from an unspecified order. Hence, prefer using only one label dimension for configuring buckets or otherwise make sure there is always exactly one match for instance labels.

Performance Considerations

Instrumenting code should affect the performance as little as possible. Keep in mind that each member function on the registry has to acquire a lock. Ideally, applications call functions such as gauge_family once during setup and then store the family pointer to create metric instances later.

Ideally, there is a single occurrence in the code for getting the family object from the registry and a single occurrence in the code for getting the gauge/counter/histogram object from the family (get_or_add also has to acquire a lock).

All operations on gauges, counters and histograms use atomic operations. Depending on the type, CAF internally uses std::atomic<int64_t> or std::atomic<double>. Adding a sample to a histogram requires two atomic operations: one for the bucket and one for the sum.

Atomic operations are reasonably fast, but we still recommend avoiding them in tight loops.

Builtin Metrics

CAF collects a set of builtin metrics in order to provide insights into the actor system and its modules. Some are always collected while others require configuration by the user.

Base Metrics

The actor system always collects this set of metrics (note that all caf.middleman metrics only appear when loading the I/O module).

caf.system.running-actors
  • Tracks the current number of running actors in the system.
  • Type: int_gauge
  • Label dimensions: none.
caf.system.processed-messages
  • Counts the total number of processed messages.
  • Type: int_counter
  • Label dimensions: none.
caf.system.rejected-messages
  • Counts the number of messages that were rejected because the target mailbox was closed or did not exist.
  • Type: int_counter
  • Label dimensions: none.
caf.middleman.inbound-messages-size
  • Samples the size of inbound messages before deserializing them.
  • Type: int_histogram
  • Unit: bytes
  • Label dimensions: none.
caf.middleman.outbound-messages-size
  • Samples the size of outbound messages after serializing them.
  • Type: int_histogram
  • Unit: bytes
  • Label dimensions: none.
caf.middleman.deserialization-time
  • Samples how long the middleman needs to deserialize inbound messages.
  • Type: dbl_histogram
  • Unit: seconds
  • Label dimensions: none.
caf.middleman.serialization-time
  • Samples how long the middleman needs to serialize outbound messages.
  • Type: dbl_histogram
  • Unit: seconds
  • Label dimensions: none.

Actor Metrics and Filters

Unlike the base metrics, actor metrics are off by default. Applications can spawn thousands of actors, with many only existing for a brief time. Hence, blindly collecting data from all actors in the system can impact the performance and also produce a lot of irrelevant noise.

To make sure CAF only collects actor metrics that are relevant to the user, the actor system configuration provides two lists: caf.metrics-filters.actors.includes and caf.metrics-filters.actors.excludes. CAF collects metrics for all actors that have names that are selected by the includes list and are not selected by the excludes list. Entries in the list can use glob-style syntax, in particular *-wildcards. For example:

caf {
  metrics-filters {
    actors {
      includes = [ "foo.*" ]
      excludes = [ "foo.bar" ]
    }
  }
}

The configuration above would select all actors with names that start with foo. except for actors named foo.bar.
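
The selection rule (included and not excluded) can be sketched with a simplified matcher. The functions sketch_glob_match and sketch_selected are hypothetical, and the matcher only understands the * wildcard. The glob engine in CAF may recognize more syntax.

```cpp
#include <cstddef>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical glob matcher that supports only '*' (any character sequence).
bool sketch_glob_match(std::string_view pattern, std::string_view str) {
  if (pattern.empty())
    return str.empty();
  if (pattern.front() == '*') {
    // Try to match the rest of the pattern at every possible position.
    for (std::size_t n = 0; n <= str.size(); ++n)
      if (sketch_glob_match(pattern.substr(1), str.substr(n)))
        return true;
    return false;
  }
  return !str.empty() && pattern.front() == str.front()
         && sketch_glob_match(pattern.substr(1), str.substr(1));
}

// Returns whether an actor name is selected: at least one include pattern
// must match and no exclude pattern may match.
bool sketch_selected(std::string_view actor_name,
                     const std::vector<std::string>& includes,
                     const std::vector<std::string>& excludes) {
  auto matches = [actor_name](const std::vector<std::string>& patterns) {
    for (const auto& pattern : patterns)
      if (sketch_glob_match(pattern, actor_name))
        return true;
    return false;
  };
  return matches(includes) && !matches(excludes);
}
```

With includes = ["foo.*"] and excludes = ["foo.bar"], this model selects foo.baz but neither foo.bar nor bar.foo.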

Note

Names belong to actor types. CAF assigns names such as user.scheduled-actor by default. To provide a custom name, either override the member function const char* name() const when implementing class-based actors or add a static member variable static inline const char* name = "..." to your state class when using stateful actors.

CAF uses a hierarchical, hyphenated naming scheme with . as the separator and all-lowercase name components. For example, caf.system.spawn-server. Users may follow this naming scheme for consistency, but CAF does not enforce any structure on the names. However, we do recommend avoiding whitespace and special characters that the glob engine recognizes, such as *, /, etc.

For all actors that are selected by the user-defined filters, CAF collects this set of metrics:

caf.actor.processing-time
  • Samples how long the actor needs to process messages.
  • Type: dbl_histogram
  • Unit: seconds
  • Label dimensions: name.
caf.actor.mailbox-time
  • Samples how long messages wait in the mailbox before being processed.
  • Type: dbl_histogram
  • Unit: seconds
  • Label dimensions: name.
caf.actor.mailbox-size
  • Counts how many messages are currently waiting in the mailbox.
  • Type: int_gauge
  • Label dimensions: name.
caf.actor.stream.processed-elements
  • Counts the total number of processed stream elements from upstream.
  • Type: int_counter
  • Label dimensions: name, type.
caf.actor.stream.input-buffer-size
  • Tracks how many stream elements from upstream are currently buffered.
  • Type: int_gauge
  • Label dimensions: name, type.
caf.stream.pushed-elements
  • Counts the total number of elements that have been pushed downstream.
  • Type: int_counter
  • Label dimensions: name, type.
caf.stream.output-buffer-size
  • Tracks how many stream elements are currently waiting in the output buffer.
  • Type: int_gauge
  • Label dimensions: name, type.

Exporting Metrics to Prometheus

The network module in CAF comes with builtin support for exporting metrics to Prometheus via HTTP. However, this feature is off by default since CAF generally avoids opening ports without explicit user input.

During startup, the middleman enables the export of metrics when the configuration provides a valid value (0 to 65535) for caf.middleman.prometheus-http.port as shown in the example config file below.

caf {
  middleman {
    prometheus-http {
      # listen for incoming HTTP requests on port 8080 (required parameter)
      port = 8080
      # the bind address (optional parameter; default is 0.0.0.0)
      address = "0.0.0.0"
    }
  }
}

Middleman

The middleman is the main component of the I/O module and enables distribution. It transparently manages proxy actor instances representing remote actors, maintains connections to other nodes, and takes care of serialization of messages. Applications install a middleman by loading caf::io::middleman as module (see Configuring Actor Applications). Users can include "caf/io/all.hpp" to get access to all public classes of the I/O module.

Class middleman

Remoting  
expected<uint16> open(uint16, const char*, bool) See Publishing and Connecting.
expected<uint16> publish(T, uint16, const char*, bool) See Publishing and Connecting.
expected<void> unpublish(T x, uint16) See Publishing and Connecting.
expected<node_id> connect(std::string host, uint16_t port) See Publishing and Connecting.
expected<T> remote_actor<T = actor>(string, uint16) See Publishing and Connecting.
expected<T> spawn_broker(F fun, ...) See Network I/O with Brokers.
expected<T> spawn_client(F, string, uint16, ...) See Network I/O with Brokers.
expected<T> spawn_server(F, uint16, ...) See Network I/O with Brokers.

Publishing and Connecting

The member function publish binds an actor to a given port, thereby allowing other nodes to access it over the network.

template <class T>
expected<uint16_t> middleman::publish(T x, uint16_t port,
                                      const char* in = nullptr,
                                      bool reuse_addr = false);

The first argument is a handle of type actor or typed_actor<...>. The second argument denotes the TCP port. The OS will pick a random high-level port when passing 0. The third parameter configures the listening address. Passing nullptr will accept all incoming connections (INADDR_ANY). Finally, the flag reuse_addr controls the behavior when binding an IP address to a port, with the same semantics as the BSD socket flag SO_REUSEADDR. For example, with reuse_addr = false, binding two sockets to 0.0.0.0:42 and 10.0.0.1:42 will fail with EADDRINUSE since 0.0.0.0 includes 10.0.0.1. With reuse_addr = true, binding would succeed because 10.0.0.1 and 0.0.0.0 are not literally equal addresses.

The member function returns the bound port on success. Otherwise, an error (see Errors) is returned.

template <class T>
expected<void> middleman::unpublish(T x, uint16_t port = 0);

The member function unpublish allows actors to close a port manually. This is performed automatically if the published actor terminates. Passing 0 as second argument closes all ports an actor is published to, otherwise only one specific port is closed.

The function returns an error (see Errors) if the actor was not bound to the given port.

template<class T = actor>
expected<T> middleman::remote_actor(std::string host, uint16_t port);

After a server has published an actor with publish, clients can connect to the published actor by calling remote_actor:

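// node A: spawn an actor and make it available on port 4242
auto ping_actor = system.spawn(ping);
system.middleman().publish(ping_actor, 4242);

// node B: obtain a handle to the actor published by node A
auto ping_actor = system.middleman().remote_actor("node A", 4242);
if (!ping_actor)
  cerr << "unable to connect to node A: " << to_string(ping_actor.error())
       << '\n';
else
  self->send(*ping_actor, ping_atom_v);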

There is no difference between server and client after the connection phase. Remote actors use the same handle types as local actors and are thus fully transparent.

The function pair open and connect allows users to connect CAF instances without remote actor setup. The function connect returns a node_id that can be used for remote spawning (see Remote Spawning of Actors).

Free Functions

The following free functions in the namespace caf::io avoid calling the middleman directly. This enables users to easily switch between communication backends as long as the interfaces have the same signatures. For example, the (experimental) OpenSSL binding of CAF implements the same functions in the namespace caf::openssl to easily switch between encrypted and unencrypted communication.

expected<uint16> open(actor_system&, uint16, const char*, bool) See Publishing and Connecting.
expected<uint16> publish(T, uint16, const char*, bool) See Publishing and Connecting.
expected<void> unpublish(T x, uint16) See Publishing and Connecting.
expected<node_id> connect(actor_system&, std::string host, uint16_t port) See Publishing and Connecting.
expected<T> remote_actor<T = actor>(actor_system&, string, uint16) See Publishing and Connecting.

Transport Protocols experimental

CAF communication uses TCP by default and thus the functions shown in the middleman API above are related to TCP. There are two alternatives to plain TCP: TLS via the OpenSSL module, briefly discussed in Free Functions, and UDP.

UDP is integrated in the default multiplexer and BASP broker. Set the flag middleman_enable_udp to true to enable it (see Configuring Actor Applications). This does not require you to disable TCP. Use publish_udp and remote_actor_udp to establish communication.

Communication via UDP is inherently unreliable and unordered. CAF reestablishes order and drops messages that arrive late. Messages that are sent via datagrams are limited to a maximum of 65,535 bytes, which CAF also uses as the receive buffer size. Note that messages that exceed the MTU are fragmented by IP and are considered lost if a single fragment is lost. Optional reliability based on retransmissions and message slicing on the application layer are planned for the future.
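
As a rough illustration of dropping late datagrams, the following sketch assumes that each datagram carries a monotonically increasing sequence number. The class sketch_ordering and its member function accept are hypothetical and not part of CAF. Sequence number wrap-around and any buffering of datagrams that arrive early are deliberately left out, so this is not a description of the actual protocol.

```cpp
#include <cstdint>

// Hypothetical receiver-side filter that only lets datagrams pass in
// increasing sequence order.
class sketch_ordering {
public:
  // Returns true if the datagram may be delivered and false if it arrived
  // late, i.e., after a datagram with a higher sequence number.
  bool accept(std::uint32_t seq) {
    if (seq < next_)
      return false; // late arrival: drop
    // Skip ahead; datagrams between next_ and seq now count as late.
    next_ = static_cast<std::uint64_t>(seq) + 1;
    return true;
  }

private:
  // Smallest sequence number that is still acceptable.
  std::uint64_t next_ = 0;
};
```

In this model, receiving the sequence 0, 2, 1, 3 delivers 0, 2 and 3 in order and drops 1.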

Network I/O with Brokers

Note

We no longer recommend the brokers from the I/O module for new code. Please consider using the networking module instead (see Overview). The new caf-net module offers higher-level abstractions that interface with flows (see Data Flows). Message-based communication is also available with an Actor Shell.

When communicating to other services in the network, sometimes low-level socket I/O is inevitable. For this reason, CAF provides brokers. A broker is an event-based actor running in the middleman that multiplexes socket I/O. It can maintain any number of acceptors and connections. Since the broker runs in the middleman, implementations should be careful to consume as little time as possible in message handlers. Brokers should outsource any considerable amount of work by spawning new actors or maintaining worker actors.

Note that all UDP-related functionality is still experimental.

Spawning Brokers

Brokers are implemented as functions and are spawned by calling one of the following three member functions of the middleman.

template <spawn_options Os = no_spawn_options,
          class F = std::function<void(broker*)>, class... Ts>
typename infer_handle_from_fun<F>::type
spawn_broker(F fun, Ts&&... xs);

template <spawn_options Os = no_spawn_options,
          class F = std::function<void(broker*)>, class... Ts>
expected<typename infer_handle_from_fun<F>::type>
spawn_client(F fun, const std::string& host, uint16_t port, Ts&&... xs);

template <spawn_options Os = no_spawn_options,
          class F = std::function<void(broker*)>, class... Ts>
expected<typename infer_handle_from_fun<F>::type>
spawn_server(F fun, uint16_t port, Ts&&... xs);

The function spawn_broker simply spawns a broker. The convenience function spawn_client tries to connect to the given host and port over TCP and returns a broker managing this connection on success. Finally, spawn_server opens a local TCP port and spawns a broker managing it on success. There are no convenience functions to spawn a UDP-based client or server.

Class broker

void configure_read(connection_handle hdl, receive_policy::config config);

Modifies the receive policy for the connection identified by hdl. This will cause the middleman to enqueue the next new_data_msg according to the given config created by receive_policy::exactly(x), receive_policy::at_most(x), or receive_policy::at_least(x) (with x denoting the number of bytes).

void write(connection_handle hdl, size_t num_bytes, const void* buf);
void write(datagram_handle hdl, size_t num_bytes, const void* buf);

Writes data to the output buffer.

void enqueue_datagram(datagram_handle hdl, std::vector<char> buf);

Enqueues a buffer to be sent as a datagram. Use of this function is encouraged over write as it allows reuse of the buffer which can be returned to the broker in a datagram_sent_msg.

void flush(connection_handle hdl);
void flush(datagram_handle hdl);

Sends the data from the output buffer.

template <class F, class... Ts>
actor fork(F fun, connection_handle hdl, Ts&&... xs);

Spawns a new broker that takes ownership of a given connection.

size_t num_connections();

Returns the number of open connections.

void close(connection_handle hdl);
void close(accept_handle hdl);
void close(datagram_handle hdl);

Closes the endpoint related to the handle.

expected<std::pair<accept_handle, uint16_t>>
add_tcp_doorman(uint16_t port = 0, const char* in = nullptr,
                bool reuse_addr = false);

Creates a new doorman that accepts incoming connections on a given port. Returns the handle to the doorman and the port in use or an error.

expected<connection_handle>
add_tcp_scribe(const std::string& host, uint16_t port);

Creates a new scribe to connect to host:port and returns a handle to it or an error.

expected<std::pair<datagram_handle, uint16_t>>
add_udp_datagram_servant(uint16_t port = 0, const char* in = nullptr,
                         bool reuse_addr = false);

Creates a datagram servant to handle incoming datagrams on a given port. Returns the handle to the servant and the port in use or an error.

expected<datagram_handle>
add_udp_datagram_servant(const std::string& host, uint16_t port);

Creates a datagram servant to send datagrams to host:port and returns a handle to it or an error.

Manually Triggering Events experimental

By default, brokers receive new events as new_connection_msg and new_data_msg as soon and as often as they occur. This means a fast peer can overwhelm a broker by sending it data faster than the broker can process it. This applies in particular if the broker outsources work items to other actors, because work items can then accumulate in the mailboxes of the workers.

Calling self->trigger(x,y), where x is a connection or acceptor handle and y is a positive integer, allows brokers to halt activities after y additional events. Once a connection or acceptor stops accepting new data or connections, the broker receives a connection_passivated_msg or acceptor_passivated_msg.

Brokers can stop activities unconditionally with self->halt(x) and resume activities unconditionally with self->trigger(x).

Remote Spawning of Actors

Remote spawning is an extension of the dynamic spawn using run-time type names (see Adding Custom Actor Types experimental). The following example assumes a typed actor handle named calculator with an actor implementing this messaging interface named “calculator”.

void client(actor_system& system, const config& cfg) {
  auto node = system.middleman().connect(cfg.host, cfg.port);
  if (!node) {
    cerr << "*** connect failed: " << to_string(node.error()) << endl;
    return;
  }
  auto type = "calculator";             // type of the actor we wish to spawn
  auto args = make_message();           // arguments to construct the actor
  auto tout = std::chrono::seconds(30); // wait no longer than 30s
  auto worker = system.middleman().remote_spawn<calculator>(*node, type, args,
                                                            tout);
  if (!worker) {
    cerr << "*** remote spawn failed: " << to_string(worker.error()) << endl;
    return;
  }
  // start using worker in main loop
  client_repl(make_function_view(*worker));
  // be a good citizen and terminate remotely spawned actor before exiting
  anon_send_exit(*worker, exit_reason::kill);
}

We first connect to a CAF node with middleman().connect(...). On success, connect returns the node ID we need for remote_spawn. This requires the server to open a port with middleman().open(...) or middleman().publish(...). Alternatively, we can obtain the node ID from an already existing remote actor handle—returned from remote_actor for example—via hdl->node(). After connecting to the server, we can use middleman().remote_spawn<...>(...) to create actors remotely.

Overview

The networking module offers a high-level, declarative DSL for individual protocols as well as low-level building blocks for implementing new protocols and assembling protocol stacks.

When using caf-net for applications, we generally recommend sticking to the declarative API.

Initializing the Module

The networking module is an optional component of CAF. To use it, users can pass caf::net::middleman to CAF_MAIN.

When not using the CAF_MAIN macro, users must initialize the library by:

  • Calling caf::net::middleman::init_global_meta_objects() before initializing the actor_system. This step adds the necessary meta objects to the global meta object registry.
  • Calling caf::net::middleman::init_host_system() before initializing the actor_system (or calling any function of the module). This step runs platform-specific initialization code (such as calling WSAStartup on Windows) by calling caf::net::this_host::startup() and initializes the SSL library by calling caf::net::ssl::startup(). The function init_host_system returns a guard object that calls caf::net::this_host::cleanup() and caf::net::ssl::cleanup() in its destructor.

Declarative High-level DSL experimental

The high-level APIs follow a factory pattern that configures each layer from the bottom up, usually starting at the actor system. For example:

namespace ws = caf::net::web_socket;
auto conn = ws::with(sys)
              .connect("localhost", 8080)
              .start([&sys](auto pull, auto push) {
                sys.spawn(my_actor, pull, push);
              });

This code snippet tries to establish a WebSocket connection on localhost:8080 and then spawns an actor that receives messages from the WebSocket by reading from pull and sends messages to the WebSocket by writing to push. Errors are also transmitted over the pull resource.

A trivial implementation for my_actor that sends all messages it receives back to the sender could look like this:

namespace ws = caf::net::web_socket;
void my_actor(caf::event_based_actor* self,
              ws::default_trait::pull_resource pull,
              ws::default_trait::push_resource push) {
  self->make_observable().from_resource(pull).subscribe(push);
}

In general, starting a client that connects to a server follows the pattern:

PROTOCOL::with(CONTEXT).connect(WHERE).start(ON_CONNECT);

Starting a server uses accept instead:

PROTOCOL::with(CONTEXT).accept(WHERE).start(ON_ACCEPT);

The ON_ACCEPT handler may be optional and some protocols do not support it at all. For example, the HTTP server accepts callbacks for individual routes and users may call start without arguments to simply launch the server and dispatch to the configured routes.

In all cases, CAF returns expected<disposable>. The disposable allows users to cancel the background activity at any time. Note: when canceling a server, this only disposes the server itself. Previously established connections remain unaffected.

For error reporting, most factories also allow setting a callback with do_on_error. This can be useful for ignoring the result value but still reporting errors.

Many protocols also accept additional configuration parameters. For example, the connect API also allows configuring multiple connection attempts.

Please refer to the sections for the individual protocols for a more detailed description of available options.

Protocol Layering

Layering plays an important role in the design of the caf.net module. When implementing a new protocol for caf.net, this protocol should integrate naturally with any number of other protocols. To enable this key property, caf.net establishes a set of conventions and abstract interfaces.

Please note that the protocol layering is meant for adding new networking capabilities to CAF, but we do not recommend using the protocol stacks directly in application code. The protocol stacks are meant as building blocks for higher-level, declarative APIs.

A protocol layer is a class that implements a single processing step in a communication pipeline. Multiple layers are organized in a protocol stack. Each layer may only communicate with its direct predecessor or successor in the stack.

At the bottom of the protocol stack is usually a transport layer. For example, an octet_stream::transport that manages a stream socket and provides access to input and output buffers to the upper layer.

At the top of the protocol stack is an application that utilizes the lower layers for communication via the network. Applications should only rely on the abstract interface type when communicating with their lower layer. For example, an application that processes a data stream should not implement against a TCP socket interface directly. By programming against the abstract interface types of caf.net, users can instantiate an application with any compatible protocol stack of their choosing. For example, a user may add extra security by using application-level data encryption or combine a custom datagram transport with protocol layers that establish ordering and reliability to emulate a stream.

By default, caf.net distinguishes between these abstract interface types:

  • datagram: A datagram interface provides access to some basic transfer units that may arrive out of order or not at all.
  • stream: An octet stream interface represents a sequence of bytes, transmitted reliably and in order.
  • message: A message interface provides access to high-level, structured data. Messages usually consist of a header and a payload. A single message may span multiple datagrams.

Note that each interface type also depends on the direction, i.e., whether talking to the upper or lower level. Incoming data always travels the protocol stack up. Outgoing data always travels the protocol stack down.

A protocol stack always lives in a socket_manager. The deepest layer in the stack is always a socket_event_layer that simply turns events on sockets (e.g., ready-to-read) into function calls. Only transport layers will implement this layer.

A transport layer then responds to socket events by reading and writing to the socket. The transport acts as the lower layer for the next layer in the processing chain. For example, the octet_stream::transport is an octet_stream::lower_layer. To interface with an octet stream, user-defined classes implement octet_stream::upper_layer.

When instantiating a protocol stack, each layer is represented by a concrete object and we build the pipeline from top to bottom, i.e., we create the highest layer first and then pass the last layer to the next lower layer until arriving at the socket manager.
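To make the layering idea more concrete, the following sketch builds a two-layer stack from the top down. It uses simplified, hypothetical interfaces that only mimic the upper/lower split described above. The names lower_layer, upper_layer, shouting_app and fake_transport are assumptions for this example and do not match the actual caf.net class interfaces.

```cpp
#include <cctype>
#include <memory>
#include <string>
#include <string_view>
#include <utility>

// Hypothetical interface of a layer as seen from above.
struct lower_layer {
  virtual ~lower_layer() = default;
  // Outgoing data travels down the stack.
  virtual void write(std::string_view bytes) = 0;
};

// Hypothetical interface of a layer as seen from below.
struct upper_layer {
  virtual ~upper_layer() = default;
  // Called once by the layer below to hand over a pointer to itself.
  virtual void start(lower_layer* down) = 0;
  // Incoming data travels up the stack.
  virtual void consume(std::string_view bytes) = 0;
};

// Top of the stack: an application that replies in upper case.
class shouting_app : public upper_layer {
public:
  void start(lower_layer* down) override {
    down_ = down;
  }

  void consume(std::string_view bytes) override {
    std::string reply(bytes);
    for (auto& c : reply)
      c = static_cast<char>(std::toupper(static_cast<unsigned char>(c)));
    down_->write(reply);
  }

private:
  lower_layer* down_ = nullptr;
};

// Bottom of the stack: records output instead of writing to a socket.
class fake_transport : public lower_layer {
public:
  // The upper layer is created first and then passed to the layer below.
  explicit fake_transport(std::unique_ptr<upper_layer> up)
    : up_(std::move(up)) {
    up_->start(this);
  }

  void write(std::string_view bytes) override {
    sent_.append(bytes);
  }

  // Stands in for a ready-to-read event on a socket.
  void on_socket_data(std::string_view bytes) {
    up_->consume(bytes);
  }

  const std::string& sent() const {
    return sent_;
  }

private:
  std::unique_ptr<upper_layer> up_;
  std::string sent_;
};
```

Because shouting_app only talks to the abstract lower_layer, the same application object could sit on top of any other layer that implements this interface.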

The layering API is generally structured into upper and lower layers. For example, the upper layer for HTTP consumes the requests while the lower layer can be used to send responses. Since the layering API is quite low level, we recommend consulting the Doxygen documentation for the class interfaces and looking at existing protocols such as the length-prefix framing as basis for implementing custom protocols. In the manual, we focus on the high-level APIs.

SSL

Protocols such as HTTP and WebSocket have a secure version that requires an SSL/TLS layer. CAF bundles functions and classes for implementing secure communication in the namespace caf::net::ssl. It is worth mentioning that classes and functions in this namespace only provide convenient access to SSL capabilities by wrapping an actual SSL implementation (usually OpenSSL).

Context

CAF users usually only need to care about the class context. This is the central state of an SSL-enabled server or client. On a server, all incoming connections will use the configuration from this context.

To create a new context, we can use one of these factory member functions:

  • enable(bool flag)
  • make(tls min_version, tls max_version = tls::any)
  • make_server(tls min_version, tls max_version = tls::any)
  • make_client(tls min_version, tls max_version = tls::any)
  • make(dtls min_version, dtls max_version = dtls::any)
  • make_server(dtls min_version, dtls max_version = dtls::any)
  • make_client(dtls min_version, dtls max_version = dtls::any)

All make* functions return a caf::expected<caf::net::ssl::context>. However, the factory function enable returns caf::expected<void> instead. This function is meant as an entry point for creating a context through a series of chained and_then invocations on the expected. Here is an example to illustrate how it works in practice:

namespace ws = caf::net::web_socket;
auto pem = ssl::format::pem;
auto key_file = caf::get_as<std::string>(cfg, "tls.key-file");
auto cert_file = caf::get_as<std::string>(cfg, "tls.cert-file");
if (!key_file != !cert_file) {
  std::cerr << "*** inconsistent TLS config: declare neither file or both\n";
  return EXIT_FAILURE;
}
auto server
  = ws::with(sys)
      .context(ssl::context::enable(key_file && cert_file)
                 .and_then(ssl::emplace_server(ssl::tls::v1_2))
                 .and_then(ssl::use_private_key_file(key_file, pem))
                 .and_then(ssl::use_certificate_file(cert_file, pem)))
      // ...

Passing false to ssl::context::enable returns an expected with a default-constructed caf::error. Since the expected contains an error, all subsequent and_then calls turn into no-ops. However, a default-constructed error means “no error”. Hence, CAF continues without an SSL context in the example and the server will use plain (unencrypted) WebSocket communication.

Passing true to ssl::context::enable returns an “empty” expected<void>. The next call to and_then will call the function object (with no arguments) and return a new expected. In the example, we use emplace_server to create a function object that will call context::make_server(min, max) when called without arguments. Hence, we convert an expected<void> to an expected<ssl::context> with this step. The functions ssl::use_private_key_file and ssl::use_certificate_file return function objects (lambdas) that take an ssl::context as argument and return expected<ssl::context> again. In this way, we can chain functions that modify a context with and_then to describe the setup for building up an SSL context in a declarative way. If all of the steps for building the SSL context succeed, the server will use WebSocket over SSL to encrypt communication to clients. If any of the steps produces an actual error (for example if the key file does not exist), CAF will produce an error and not start the server at all.
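The three possible outcomes of such a chain (TLS, plain communication, or a startup error) can be illustrated with a small stand-alone model. The sketch below does not use CAF at all: toy_expected, toy_error, unit, toy_context and the free functions are hypothetical stand-ins that only mimic the behavior described above. One simplification is that the toy make_server step receives a unit value, whereas the text describes the real function object as being called without arguments.

```cpp
#include <optional>
#include <string>
#include <utility>

// Toy stand-ins for illustration only; these are NOT the CAF classes.
struct toy_error {
  std::string what; // empty: models a default-constructed error ("no error")
};

struct unit {}; // stands in for the `void` in expected<void>

struct toy_context {
  std::string key_file;
};

template <class T>
class toy_expected {
public:
  toy_expected(T value) : value_(std::move(value)) {}
  toy_expected(toy_error err) : error_(std::move(err)) {}

  explicit operator bool() const {
    return value_.has_value();
  }

  const toy_error& error() const {
    return error_;
  }

  // Calls `f` with the stored value or forwards the stored error unchanged.
  template <class F>
  auto and_then(F f) const -> decltype(f(std::declval<const T&>())) {
    if (value_)
      return f(*value_);
    return error_;
  }

private:
  std::optional<T> value_;
  toy_error error_;
};

// Models ssl::context::enable: `false` yields "no value, but also no error".
toy_expected<unit> enable(bool flag) {
  if (flag)
    return unit{};
  return toy_error{};
}

// Models the step that turns the void-like result into a context.
toy_expected<toy_context> make_server(const unit&) {
  return toy_context{};
}

// Models a step that modifies the context and may fail with a real error.
auto use_key_file(std::string path) {
  return [path = std::move(path)](toy_context ctx) -> toy_expected<toy_context> {
    if (path.empty())
      return toy_error{"missing key file"};
    ctx.key_file = path;
    return ctx;
  };
}

// Interprets the result the way the text describes the server doing it.
std::string outcome(const toy_expected<toy_context>& ctx) {
  if (ctx)
    return "TLS";
  if (!ctx.error().what.empty())
    return "startup error";
  return "plain";
}
```

With these definitions, enable(false).and_then(make_server).and_then(use_key_file("key.pem")) skips both steps and leads to the "plain" outcome, while the same chain with enable(true) yields "TLS".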

For the full class interface, please refer to the Doxygen documentation.

Actor Shell

Actor shells allow socket managers to interface with regular actors. Much like regular actors, actor shells come in two flavors: with dynamic typing (actor_shell) or with static typing (typed_actor_shell).

Actor shells can be embedded into a protocol instance to turn messages on the network to actor messages and vice versa. The primary use case in CAF at the moment is to allow servers to send a request message to an actor and then use the response message to generate an output on the network. Please see examples/http/rest.cpp as a reference for this use case.

Unlike a “regular” actor, an actor shell has no own control loop. Users can define a behavior with set_behavior, but are responsible for embedding the shell into some sort of control loop.

For the full class interface, please refer to the Doxygen documentation.

HTTP experimental

Servers

Note

For this API, include caf/net/http/with.hpp.

HTTP is an essential protocol for the world wide web as well as for micro services. In CAF, starting an HTTP server with the declarative API uses the entry point caf::net::http::with that takes an actor_system as argument.

On the result factory object, we can optionally call context to set an SSL context for using HTTPS instead of plain HTTP connections. Once we call accept, we enter the second phase of the setup. The accept function has multiple overloads:

  • accept(uint16_t port, std::string bind_address = "") for opening a new port. The bind_address optionally restricts which IP addresses may connect to the server. Passing an empty string (default) allows any client.
  • accept(tcp_accept_socket fd) for running the server on an already configured TCP server socket.
  • accept(ssl::tcp_acceptor acc) for running the server on an already configured TCP server socket with an SSL context. Using this overload after configuring an SSL context is an error, since CAF cannot use two SSL contexts on one server.

After calling accept, we enter the second step of configuring the server. Here, we can (optionally) fine-tune the server with these member functions:

  • do_on_error(F callback) installs a callback function that CAF calls if an error occurs while starting the server.
  • max_connections(size_t value) configures how many clients the server may allow to connect before blocking further connection attempts.
  • reuse_address(bool value) configures whether we create the server socket with SO_REUSEADDR. Has no effect if we have passed a socket or SSL acceptor to accept.
  • monitor(ActorHandle hdl) configures the server to monitor an actor and automatically stop the server when the actor terminates.

At this step, we may also define routes on the HTTP server. A route binds a callback to an HTTP path on the server. On each HTTP request, the server iterates over all routes and selects the first matching route to process the request.

When defining a route, we pass an absolute path on the server, optionally the HTTP method for the route and the handler. In the path, we can use <arg> placeholders. Each argument defined in this way maps to an argument of the callback. The callback always must take http::responder& as the first argument, followed by one argument for each <arg> placeholder in the path.

For example, the following route would forward any HTTP request on /user/<arg> with the HTTP method GET to the custom handler:

.route("/user/<arg>", http::method::get, [](http::responder& res, int id) {
  // ...
})

CAF evaluates the signature of the callback to automatically deduce the argument types. On a mismatch, for example if a user accesses /user/foo, the conversion to int would fail and CAF would refuse the request with an error.
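The matching and conversion steps can be pictured with the following stand-alone sketch. It is a simplified model and not how CAF implements routing: the helper functions split_path, match_route and to_int are hypothetical, and the sketch ignores details such as HTTP methods, query strings and empty path segments.

```cpp
#include <charconv>
#include <cstddef>
#include <optional>
#include <string>
#include <string_view>
#include <system_error>
#include <vector>

// Splits "/a/b" into {"a", "b"}; empty segments are skipped.
std::vector<std::string_view> split_path(std::string_view path) {
  std::vector<std::string_view> result;
  std::size_t pos = 0;
  while (pos < path.size()) {
    if (path[pos] == '/') {
      ++pos;
      continue;
    }
    auto end = path.find('/', pos);
    if (end == std::string_view::npos)
      end = path.size();
    result.push_back(path.substr(pos, end - pos));
    pos = end;
  }
  return result;
}

// Matches `path` against `pattern`. On a match, returns the path segments
// that correspond to the <arg> placeholders, in order.
std::optional<std::vector<std::string>> match_route(std::string_view pattern,
                                                    std::string_view path) {
  auto lhs = split_path(pattern);
  auto rhs = split_path(path);
  if (lhs.size() != rhs.size())
    return std::nullopt;
  std::vector<std::string> args;
  for (std::size_t i = 0; i < lhs.size(); ++i) {
    if (lhs[i] == "<arg>")
      args.emplace_back(rhs[i]);
    else if (lhs[i] != rhs[i])
      return std::nullopt;
  }
  return args;
}

// Converts a captured segment to int; fails unless the whole segment parses.
std::optional<int> to_int(std::string_view str) {
  int value = 0;
  auto last = str.data() + str.size();
  auto [ptr, ec] = std::from_chars(str.data(), last, value);
  if (ec != std::errc{} || ptr != last)
    return std::nullopt;
  return value;
}
```

In this model, /user/42 matches the pattern /user/<arg> and converts cleanly, whereas /user/foo matches the pattern but fails in to_int, which corresponds to the case where the request is refused.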

The responder encapsulates the state for responding to an HTTP request (see Responders) and allows our handler to send a response message either immediately or at some later time.

To start an HTTP server, we have two overloads for start available.

The first start overload takes no arguments. Use this overload after configuring at least one route to start an HTTP server that only dispatches to its predefined routes, as shown in the example below. Calling this overload without defining at least one route prior is an error.

int caf_main(caf::actor_system& sys, const config& cfg) {
  namespace http = caf::net::http;
  namespace ssl = caf::net::ssl;
  // Read the configuration.
  auto port = caf::get_or(cfg, "port", default_port);
  auto pem = ssl::format::pem;
  auto key_file = caf::get_as<std::string>(cfg, "tls.key-file");
  auto cert_file = caf::get_as<std::string>(cfg, "tls.cert-file");
  auto max_connections = caf::get_or(cfg, "max-connections",
                                     default_max_connections);
  if (!key_file != !cert_file) {
    std::cerr << "*** inconsistent TLS config: declare neither file or both\n";
    return EXIT_FAILURE;
  }
  // Open up a TCP port for incoming connections and start the server.
  auto server
    = http::with(sys)
        // Optionally enable TLS.
        .context(ssl::context::enable(key_file && cert_file)
                   .and_then(ssl::emplace_server(ssl::tls::v1_2))
                   .and_then(ssl::use_private_key_file(key_file, pem))
                   .and_then(ssl::use_certificate_file(cert_file, pem)))
        // Bind to the user-defined port.
        .accept(port)
        // Limit how many clients may be connected at any given time.
        .max_connections(max_connections)
        // Provide the time at '/'.
        .route("/", http::method::get,
               [](http::responder& res) {
                 auto str = caf::deep_to_string(caf::make_timestamp());
                 res.respond(http::status::ok, "text/plain", str);
               })
        // Launch the server.
        .start();
  // Report any error to the user.
  if (!server) {
    std::cerr << "*** unable to run at port " << port << ": "
              << to_string(server.error()) << '\n';
    return EXIT_FAILURE;
  }
  // Note: the actor system will only wait for actors by default. Since we don't
  // start actors, we need to block on something else.
  std::cout << "Server is up and running. Press <enter> to shut down."
            << std::endl;
  getchar();
  std::cout << "Terminating.\n";
  return EXIT_SUCCESS;
}

Note

For details on ssl::context::enable, please see SSL.

The second start overload takes one argument: a function object that takes an asynchronous resource for processing http::request objects. This class also allows responding to HTTP requests, but is always asynchronous. Internally, the class http::request is connected to the HTTP server with a future.

After calling start, CAF returns an expected<disposable>. On success, the disposable is a handle to the launched server and calling dispose on it stops the server. On error, the result contains a human-readable description of what went wrong.

Responders

The responder holds a pointer back to the HTTP server as well as pointers to the HTTP headers and the payload.

The most commonly used member functions are as follows:

  • const request_header& header() const noexcept to access the HTTP header fields.
  • const_byte_span payload() const noexcept to access the bytes of the HTTP body (payload).
  • actor_shell* self() for allowing the responder to interface with regular actors. See Actor Shell.
  • void respond(...) (multiple overloads) for sending a response message to the client.

The class also supports converting the responder to an asynchronous http::request via to_request or creating a promise on the server via to_promise. The promise is bound to the server and may not be used in another thread. Usually, the promises are used in conjunction with self()->request(...) to generate an HTTP response from an actor’s response. Please look at the example under examples/http/rest.cpp as a reference.

For the full class interface, please refer to the Doxygen documentation.

Length-prefix Framing experimental

Length-prefix framing is a simple protocol for encoding variable-length messages on the network. Each message is preceded by a fixed-length value (32 bit) that indicates the length of the message in bytes.

When a sender wants to transmit a message, it first encodes the message into a sequence of bytes. It then calculates the length of the byte sequence and prefixes the message with the length. The receiver then reads the length prefix to determine the length of the message before reading the bytes for the message.
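The encoding and decoding steps can be sketched in a few lines of plain C++. This example is independent of CAF and only illustrates the wire format. It assumes that the 32-bit prefix is written in big-endian (network) byte order, which the text above does not specify, and the names byte_buffer, append_frame and try_read_frame are made up for this sketch.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

using byte_buffer = std::vector<std::uint8_t>;

// Appends a 32-bit big-endian length prefix followed by the payload bytes.
void append_frame(byte_buffer& out, const std::string& payload) {
  auto len = static_cast<std::uint32_t>(payload.size());
  out.push_back(static_cast<std::uint8_t>(len >> 24));
  out.push_back(static_cast<std::uint8_t>(len >> 16));
  out.push_back(static_cast<std::uint8_t>(len >> 8));
  out.push_back(static_cast<std::uint8_t>(len));
  out.insert(out.end(), payload.begin(), payload.end());
}

// Tries to extract one frame from the front of `in`. Returns std::nullopt
// and leaves `in` untouched if the buffer does not hold a complete frame
// yet. Otherwise, removes the frame from `in` and returns its payload.
std::optional<std::string> try_read_frame(byte_buffer& in) {
  constexpr std::size_t prefix_size = 4;
  if (in.size() < prefix_size)
    return std::nullopt;
  auto len = (static_cast<std::uint32_t>(in[0]) << 24)
             | (static_cast<std::uint32_t>(in[1]) << 16)
             | (static_cast<std::uint32_t>(in[2]) << 8)
             | static_cast<std::uint32_t>(in[3]);
  if (in.size() - prefix_size < len)
    return std::nullopt;
  auto first = in.begin() + prefix_size;
  std::string payload(first, first + len);
  in.erase(in.begin(), first + len);
  return payload;
}
```

A receiver would append incoming bytes to its buffer and call try_read_frame in a loop until it returns std::nullopt, which covers both several frames arriving at once and a frame arriving in multiple pieces.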

Note

For the high-level API, include caf/net/lp/with.hpp.

Servers

The simplest way to start a length-prefix framing server is by using the high-level factory DSL. The entry point for this API is calling the caf::net::lp::with function:

caf::net::lp::with(sys)

Optionally, you can also provide a custom trait type by calling caf::net::lp::with<my_trait>(sys) instead. The default trait class caf::net::lp::default_trait configures the transport to exchange caf::net::lp::frame objects with the application, which essentially wrap raw bytes.

Once you have set up the factory, you can call the accept function on it to start accepting incoming connections. The accept function has multiple overloads:

  • accept(uint16_t port, std::string bind_address = "") for opening a new port. The bind_address optionally restricts which IP addresses may connect to the server. Passing an empty string (default) allows any client.
  • accept(tcp_accept_socket fd) for running the server on an already configured TCP server socket.
  • accept(ssl::tcp_acceptor acc) for running the server on an already configured TCP server socket with an SSL context. Using this overload after configuring an SSL context is an error, since CAF cannot use two SSL contexts on one server.

After setting up the factory and calling accept, you can use the following functions on the returned object:

  • do_on_error(F callback) installs a callback function that CAF calls if an error occurs while starting the server.
  • max_connections(size_t value) configures how many clients the server may allow to connect before blocking further connection attempts.
  • reuse_address(bool value) configures whether we create the server socket with SO_REUSEADDR. Has no effect if we have passed a socket or SSL acceptor to accept.
  • start(OnStart) to initialize the server and run it in the background. The OnStart callback takes an argument of type trait::acceptor_resource. This is a consumer resource for receiving accept events. Each accept event consists of an input resource and an output resource for reading from and writing to the new connection.

After calling start, CAF returns an expected<disposable>. On success, the disposable is a handle to the launched server and calling dispose on it stops the server. On error, the result contains a human-readable description of what went wrong.

Clients

Starting a client also uses the with factory. Calling connect instead of accept creates a factory for clients. The connect function may be called with:

  • connect(std::string host, uint16_t port) for connecting to the server at host on the given port.
  • connect(stream_socket fd) for establishing a connection on an already connected TCP socket.
  • connect(ssl::connection conn) for establishing a connection on an already established SSL connection.

After calling connect, we can configure various parameters:

  • do_on_error(F callback) installs a callback function that CAF calls if an error occurs while starting the client.
  • retry_delay(timespan) to set the delay between connection retries when a connection attempt fails.
  • connection_timeout(timespan) to set the maximum amount of time to wait for a connection attempt to succeed before giving up.
  • max_retry_count(size_t) to set the maximum number of times to retry a connection attempt before giving up.

Finally, we call start to launch the client. The function expects an OnStart callback that takes two arguments: the input resource and the output resource for reading from and writing to the new connection.

Prometheus

CAF ships a Prometheus scraper implementation that allows exposing metrics from the actor system. Users can create an instance of the scraper with the factory function caf::net::prometheus::scraper (include caf/net/prometheus.hpp). The scraper is designed to work with an HTTP server.

A minimal example for starting an HTTP server (see HTTP experimental) at port 8081 that responds to GET requests on /metrics could look like this:

auto server = net::http::with(sys)
                .accept(8081)
                .route("/metrics", net::prometheus::scraper(sys))
                .start();

Note

Usually, users can simply use the configuration options of the actor system to export metrics: Exporting Metrics to Prometheus. When setting these options, CAF uses this implementation to start the Prometheus server in the background.

WebSocket experimental

Servers

There are two ways to implement a WebSocket server with CAF. The first option is to create a standalone server that only accepts incoming WebSocket clients. The second option is to start a regular HTTP server and then use the caf::net::web_socket::switch_protocol factory to create a route for WebSocket clients.

In both cases, the server runs in the background and communicates asynchronously with the application logic over an asynchronous buffer resource. Usually, this resource is used to create an observable on some user-defined actor that implements the server logic. The server passes connection events over this buffer, whereas each event then consists of two new buffer resources: one for receiving and one for sending messages from/to the WebSocket client.

Note

Closing either of the asynchronous resources will close the WebSocket connection. When only reading from a WebSocket connection, we can subscribe the output channel to a never observable. Likewise, we can pass std::ignore to the input observable for applications that are only interested in writing to the WebSocket connection.

Standalone Server

Note

For this API, include caf/net/web_socket/with.hpp.

Starting a WebSocket server has three distinct steps.

In the first step, we bind the server to an actor system and optionally configure SSL. The entry point is always calling the free function caf::net::web_socket::with that takes an actor_system as argument. Optionally, users may set the Trait template parameter (see Traits) of the function. When not setting this parameter, it defaults to caf::net::web_socket::default_trait. With this policy, the WebSocket sends and receives caf::net::web_socket::frame objects (see Frames).

On the result factory object, we can optionally call context to set an SSL context. Once we call accept, we enter the second phase of the setup. The accept function has multiple overloads:

  • accept(uint16_t port, std::string bind_address = "") for opening a new port. The bind_address optionally restricts which IP addresses may connect to the server. Passing an empty string (default) allows any client.
  • accept(tcp_accept_socket fd) for running the server on an already configured TCP server socket.
  • accept(ssl::tcp_acceptor acc) for running the server on an already configured TCP server socket with an SSL context. Using this overload after configuring an SSL context is an error, since CAF cannot use two SSL contexts on one server.

After calling accept, we enter the second step of configuring the server. Here, we can (optionally) fine-tune the server with these member functions:

  • do_on_error(F callback) installs a callback function that CAF calls if an error occurs while starting the server.
  • max_connections(size_t value) configures how many clients the server may allow to connect before blocking further connection attempts.
  • reuse_address(bool value) configures whether we create the server socket with SO_REUSEADDR. Has no effect if we have passed a socket or SSL acceptor to accept.

The second step is completed when calling on_request. This function requires one argument: a function object that takes a net::web_socket::acceptor<Ts...>& argument, whereas the template parameter pack Ts... is freely chosen and may be empty. The types we select here allow us to pass any number of arguments to the connection event later on.

The third and final step is to call start with a function object that takes an asynchronous resource for processing connect events. The type is usually obtained from the configured trait class via Trait::acceptor_resource<Ts...>, whereas Ts... must be the same set of types we have used previously for the on_request handler.

After calling start, CAF returns an expected<disposable>. On success, the disposable is a handle to the launched server and calling dispose on it stops the server. On error, the result contains a human-readable description of what went wrong.

This example illustrates how the API looks when putting all the pieces together:

int caf_main(caf::actor_system& sys, const config& cfg) {
  namespace http = caf::net::http;
  namespace ssl = caf::net::ssl;
  namespace ws = caf::net::web_socket;
  // Read the configuration.
  auto port = caf::get_or(cfg, "port", default_port);
  auto pem = ssl::format::pem;
  auto key_file = caf::get_as<std::string>(cfg, "tls.key-file");
  auto cert_file = caf::get_as<std::string>(cfg, "tls.cert-file");
  auto max_connections = caf::get_or(cfg, "max-connections",
                                     default_max_connections);
  if (!key_file != !cert_file) {
    std::cerr << "*** inconsistent TLS config: declare neither file or both\n";
    return EXIT_FAILURE;
  }
  // Open up a TCP port for incoming connections and start the server.
  using trait = ws::default_trait;
  auto server
    = ws::with(sys)
        // Optionally enable TLS.
        .context(ssl::context::enable(key_file && cert_file)
                   .and_then(ssl::emplace_server(ssl::tls::v1_2))
                   .and_then(ssl::use_private_key_file(key_file, pem))
                   .and_then(ssl::use_certificate_file(cert_file, pem)))
        // Bind to the user-defined port.
        .accept(port)
        // Limit how many clients may be connected at any given time.
        .max_connections(max_connections)
        // Accept only requests for path "/".
        .on_request([](ws::acceptor<>& acc) {
          // The header parameter contains fields from the WebSocket handshake
          // such as the path and HTTP header fields.
          auto path = acc.header().path();
          std::cout << "*** new client request for path " << path << std::endl;
          // Accept the WebSocket connection only if the path is "/".
          if (path == "/") {
            // Calling `accept` causes the server to acknowledge the client and
            // creates input and output buffers that go to the worker actor.
            acc.accept();
          } else {
            // Calling `reject` aborts the connection with HTTP status code 400
            // (Bad Request). The error gets converted to a string and sent to
            // the client to give some indication to the client why the request
            // was rejected.
            auto err = caf::make_error(caf::sec::invalid_argument,
                                       "unrecognized path, try '/'");
            acc.reject(std::move(err));
          }
          // Note: calling nothing on `acc` also rejects the connection.
        })
        // When started, run our worker actor to handle incoming connections.
        .start([&sys](trait::acceptor_resource<> events) {
          sys.spawn([events](caf::event_based_actor* self) {
            // For each buffer pair, we create a new flow ...
            self->make_observable()
              .from_resource(events) //
              .for_each([self](const trait::accept_event<>& ev) {
                // ... that simply pushes data back to the sender.
                auto [pull, push] = ev.data();
                pull.observe_on(self)
                  .do_on_error([](const caf::error& what) { //
                    std::cout << "*** connection closed: " << to_string(what)
                              << std::endl;
                  })
                  .do_on_complete([] { //
                    std::cout << "*** connection closed" << std::endl;
                  })
                  .do_on_next([](const ws::frame& x) {
                    if (x.is_binary()) {
                      std::cout
                        << "*** received a binary WebSocket frame of size "
                        << x.size() << std::endl;
                    } else {
                      std::cout
                        << "*** received a text WebSocket frame of size "
                        << x.size() << std::endl;
                    }
                  })
                  .subscribe(push);
              });
          });
        });
  // Report any error to the user.
  if (!server) {
    std::cerr << "*** unable to run at port " << port << ": "
              << to_string(server.error()) << '\n';
    return EXIT_FAILURE;
  }
  // Note: the actor system will keep the application running for as long as the
  // worker from .start() is still alive.
  return EXIT_SUCCESS;
}

Note

For details on ssl::context::enable, please see SSL.

HTTP Server with a WebSocket Route

Note

For this API, include caf/net/web_socket/switch_protocol.hpp.

Sometimes, we want a server to accept WebSocket connections only on a specific path and otherwise act as a regular HTTP server. This use case is covered by caf::net::web_socket::switch_protocol. The function is similar to with in that it takes a template parameter for the trait, but it requires no arguments. In this version, we skip the accept step (since the HTTP server takes care of accepting connections) and call on_request as the first step. Then, we call on_start (instead of start) to add the WebSocket server logic when starting up the parent server.

The following snippet showcases the setup for adding a WebSocket route to an HTTP server (you can find the full example under examples/web_socket/quote-server.cpp):

        // On "/ws/quotes/<arg>", we switch the protocol to WebSocket.
        .route("/ws/quotes/<arg>", http::method::get,
               ws::switch_protocol()
                 // Check that the client asks for a known philosopher.
                 .on_request(
                   [](ws::acceptor<caf::cow_string>& acc, std::string name) {
                     auto quotes = quotes_by_name(name);
                     if (quotes.empty()) {
                       auto err = make_error(caf::sec::invalid_argument,
                                             not_found_str(name));
                       acc.reject(std::move(err));
                     } else {
                       // Forward the name to the WebSocket worker.
                       acc.accept(caf::cow_string{std::move(name)});
                     }
                   })
                 // Spawn a worker for the WebSocket clients.
                 .on_start(
                   [&sys](trait::acceptor_resource<caf::cow_string> events) {
                     // Spawn a worker that reads from `events`.
                     using event_t = trait::accept_event<caf::cow_string>;
                     sys.spawn([events](caf::event_based_actor* self) {
                       // Each WS connection has a pull/push buffer pair.
                       self->make_observable()
                         .from_resource(events) //
                         .for_each([self](const event_t& ev) mutable {
                           // Forward the quotes to the client.
                           auto [pull, push, name] = ev.data();
                           auto quotes = quotes_by_name(name);
                           assert(!quotes.empty()); // Checked in on_request.
                           self->make_observable()
                             .from_container(quotes)
                             .map([](std::string_view quote) {
                               return ws::frame{quote};
                             })
                             .subscribe(push);
                           // We ignore whatever the client may send to us.
                           pull.observe_on(self).subscribe(std::ignore);
                         });
                     });
                   }))
        .route("/status", http::method::get,
               [](http::responder& res) {
                 res.respond(http::status::no_content);
               })

Clients

Clients use the same entry point as standalone servers, i.e., caf::net::web_socket::with. However, instead of calling accept, we call connect. Like accept, CAF offers multiple overloads for this function:

  • connect(std::string host, uint16_t port) for connecting to the server at host on the given port.
  • connect(uri endpoint) for connecting to the given server. The URI scheme must be either ws or wss.
  • connect(expected<uri> endpoint) like the previous overload. Forwards the error in case endpoint does not represent an actual value. Useful for passing the result of make_uri to connect directly without having to check the result first.
  • connect(stream_socket fd) for establishing a WebSocket connection on an already connected TCP socket.
  • connect(ssl::connection conn) for establishing a WebSocket connection on an already established SSL connection.

Note

When configuring an SSL context prior to calling connect, the URI scheme must be wss. When not configuring an SSL context prior to calling connect and passing a URI with scheme wss, CAF will automatically create an SSL context for the connection.
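The rules from this note can be written down as a small decision function. This is only a sketch outside of CAF: the enum tls_setup and the function select_tls_setup are hypothetical names, and how CAF actually reports a mismatch between scheme and SSL context is not covered here.

```cpp
#include <cassert>
#include <string_view>

// Hypothetical outcome labels for this sketch; these are not CAF types.
enum class tls_setup {
  plain,           // ws without an SSL context: unencrypted connection
  user_context,    // wss with an SSL context configured by the caller
  default_context, // wss without a context: one is created automatically
  invalid,         // SSL context configured, but the scheme is not wss
};

// Maps the URI scheme and the presence of a user-configured SSL context to
// the outcome described in the note above.
inline tls_setup select_tls_setup(std::string_view scheme,
                                  bool has_ssl_context) {
  // connect(uri) only accepts the schemes ws and wss.
  assert(scheme == "ws" || scheme == "wss");
  if (scheme == "wss")
    return has_ssl_context ? tls_setup::user_context
                           : tls_setup::default_context;
  return has_ssl_context ? tls_setup::invalid : tls_setup::plain;
}
```

In other words, only the combination of a configured SSL context and the ws scheme is contradictory; the other three combinations each lead to a usable connection setup.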

After calling connect, we can configure various parameters:

  • do_on_error(F callback) installs a callback function that CAF calls if an error occurs while starting the client.
  • retry_delay(timespan) to set the delay between connection retries when a connection attempt fails.
  • connection_timeout(timespan) to set the maximum amount of time to wait for a connection attempt to succeed before giving up.
  • max_retry_count(size_t) to set the maximum number of times to retry a connection attempt before giving up.
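To illustrate how retry_delay and max_retry_count interact, the following sketch models a retry loop outside of CAF. It assumes that the retry count excludes the initial attempt, so a count of 9 allows 10 attempts in total. The names retry_policy and connect_with_retries are hypothetical, the connection_timeout setting is not modeled, and this is not how CAF implements retries internally.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <functional>

// Hypothetical stand-in for the client settings; not a CAF type.
struct retry_policy {
  std::chrono::milliseconds retry_delay{1000};
  std::size_t max_retry_count = 0;
};

// Calls `try_connect` until it succeeds or the retries are used up. Returns
// the number of attempts made on success and 0 if every attempt failed.
inline std::size_t connect_with_retries(
  const retry_policy& policy, const std::function<bool()>& try_connect,
  const std::function<void(std::chrono::milliseconds)>& wait) {
  assert(try_connect && wait);
  // Assumption: the initial attempt does not count as a retry.
  const std::size_t max_attempts = policy.max_retry_count + 1;
  for (std::size_t attempt = 1; attempt <= max_attempts; ++attempt) {
    if (try_connect())
      return attempt;
    // Only wait if another attempt follows.
    if (attempt < max_attempts)
      wait(policy.retry_delay);
  }
  return 0;
}
```

Passing the waiting step as a callback keeps the sketch testable without actually sleeping; a real caller could pass a function that calls std::this_thread::sleep_for.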

Finally, we call start to launch the client. The function takes a function object that must take two parameters: the input and output resources for reading from and writing to the WebSocket connection. The types may be obtained from the trait class (caf::net::web_socket::default_trait unless passing a different type to with) via input_resource and output_resource. However, the function object (lambda) may also take the two parameters as auto for brevity, as shown in the example below.

int caf_main(caf::actor_system& sys, const config& cfg) {
  namespace ws = caf::net::web_socket;
  // Sanity checking.
  auto server = caf::get_as<caf::uri>(cfg, "server");
  if (!server) {
    std::cerr << "*** mandatory argument server missing or invalid\n";
    return EXIT_FAILURE;
  }
  // Ask the user for the hello message.
  std::string hello;
  std::cout << "Please enter a hello message for the server: " << std::flush;
  std::getline(std::cin, hello);
  // Try to establish a connection to the server and send the hello message.
  auto conn
    = ws::with(sys)
        // Connect to the given URI.
        .connect(server)
        // If we don't succeed at first, try up to 10 times with 1s delay.
        .retry_delay(1s)
        .max_retry_count(9)
        // On success, spin up a worker to manage the connection.
        .start([&sys, hello](auto pull, auto push) {
          sys.spawn([hello, pull, push](caf::event_based_actor* self) {
            // Open the pull handle.
            pull
              .observe_on(self)
              // Print errors to stderr.
              .do_on_error([](const caf::error& what) {
                std::cerr << "*** error while reading from the WebSocket: "
                          << to_string(what) << std::endl;
              })
              // Restrict how many messages we receive if the user configured
              // a limit.
              .compose([self](auto in) {
                if (auto limit = caf::get_as<size_t>(self->config(), "max")) {
                  return std::move(in).take(*limit).as_observable();
                } else {
                  return std::move(in).as_observable();
                }
              })
              // Print a bye-bye message if the server closes the connection.
              .do_on_complete([] { //
                std::cout << "Server has closed the connection" << std::endl;
              })
              // Print everything from the server to stdout.
              .for_each([](const ws::frame& msg) {
                if (msg.is_text()) {
                  std::cout << "Server: " << msg.as_text() << std::endl;
                } else if (msg.is_binary()) {
                  std::cout << "Server: [binary message of size "
                            << msg.as_binary().size() << "]" << std::endl;
                }
              });
            // Send our hello message and wait until the server closes the
            // socket. We keep the connection alive by never closing the write
            // channel.
            self->make_observable()
              .just(ws::frame{hello})
              .concat(self->make_observable().never<ws::frame>())
              .subscribe(push);
          });
        });
  if (!conn) {
    std::cerr << "*** unable to connect to " << server->str() << ": "
              << to_string(conn.error()) << std::endl;
    return EXIT_FAILURE;
  }
  // Note: the actor system will keep the application running for as long as the
  // workers are still alive.
  return EXIT_SUCCESS;
}

Frames

The WebSocket protocol operates on so-called frames. A frame contains a single text or binary message. The class caf::net::web_socket::frame is an implicitly-shared handle type that represents a single WebSocket frame (binary or text).

The most commonly used member functions are as follows:

  • size_t size() const noexcept returns the size of the frame in bytes.
  • bool empty() const noexcept queries whether the frame has a size of 0.
  • bool is_binary() const noexcept queries whether the frame contains raw bytes.
  • bool is_text() const noexcept queries whether the frame contains a text message.
  • const_byte_span as_binary() const noexcept accesses the bytes of a binary message.
  • std::string_view as_text() const noexcept accesses the characters of a text message.

For the full class interface, please refer to the Doxygen documentation.

Traits

A trait translates between text or binary frames on the network and the application by defining C++ types for reading and writing from/to the WebSocket connection. The trait class also binds these types to the asynchronous resources that connect the WebSocket in the background to the application logic.

The interface of the default trait looks as follows:

/// Configures a WebSocket server or client to operate on the granularity of
/// regular WebSocket frames.
class CAF_NET_EXPORT default_trait {
public:
  /// The input type of the application, i.e., what flows from the
  /// WebSocket to the application layer.
  using input_type = frame;

  /// The output type of the application, i.e., what flows from the application
  /// layer to the WebSocket.
  using output_type = frame;

  /// A resource for consuming input_type elements.
  using input_resource = async::consumer_resource<input_type>;

  /// A resource for producing output_type elements.
  using output_resource = async::producer_resource<output_type>;

  /// An accept event from the server to transmit read and write handles.
  template <class... Ts>
  using accept_event = cow_tuple<input_resource, output_resource, Ts...>;

  /// A resource for consuming accept events.
  template <class... Ts>
  using acceptor_resource = async::consumer_resource<accept_event<Ts...>>;

  /// Queries whether `x` should be serialized as binary frame (`true`) or text
  /// frame (`false`).
  bool converts_to_binary(const output_type& x);

  /// Serializes an output to raw bytes for sending a binary frame
  /// (`converts_to_binary` returned `true`).
  bool convert(const output_type& x, byte_buffer& bytes);

  /// Serializes an output to ASCII (or UTF-8) characters for sending a text
  /// frame (`converts_to_binary` returned `false`).
  bool convert(const output_type& x, std::vector<char>& text);

  /// Converts the raw bytes from a binary frame to the input type.
  bool convert(const_byte_span bytes, input_type& x);

  /// Converts the characters from a text frame to the input type.
  bool convert(std::string_view text, input_type& x);

  /// Returns a description of the error if any of the `convert` functions
  /// returned `false`.
  error last_error();
};

Users may implement custom trait types by providing the same member functions and type aliases.

Frequently Asked Questions

This section compiles the most common questions asked on GitHub, in chat, and on the mailing list.

Can I Encrypt CAF Communication?

Yes, by using the OpenSSL module (see Free Functions).

Can I Create Messages Dynamically?

Yes.

Usually, messages are created implicitly when sending messages but can also be created explicitly using make_message. In both cases, types and number of elements are known at compile time. To allow for fully dynamic message generation, CAF also offers message_builder:

message_builder mb;
// prefix message with some atom
mb.append(strings_atom_v);
// fill message with some strings
std::vector<std::string> strings{/*...*/};
for (auto& str : strings)
  mb.append(str);
// create the message
message msg = mb.to_message();

What Debugging Tools Exist?

The scripts/ directory contains some useful tools to aid in analyzing CAF log output.

Common Pitfalls

This section highlights common mistakes or C++ subtleties that can show up when programming in CAF.

Defining Message Handlers

C++ evaluates comma-separated expressions from left to right and uses only the last operand as the value and type of the whole expression. A parenthesized list of lambdas therefore collapses to its last lambda, which means that message handlers and behaviors must not be initialized like this:

message_handler wrong = (
  [](int i) { /*...*/ },
  [](float f) { /*...*/ }
);

The correct way to initialize message handlers and behaviors is to either use the constructor or the member function assign:

message_handler ok1{
  [](int i) { /*...*/ },
  [](float f) { /*...*/ }
};

message_handler ok2;
// some place later
ok2.assign(
  [](int i) { /*...*/ },
  [](float f) { /*...*/ }
);
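The comma-operator behavior behind this pitfall can be reproduced without CAF. The following sketch uses two hypothetical stand-in types, int_handler and float_handler, and shows that a parenthesized, comma-separated initializer evaluates both operands but keeps only the last one.

```cpp
#include <cassert>
#include <string>
#include <type_traits>

// Hypothetical stand-ins for two message handlers; not CAF types.
struct int_handler {
  std::string name() const { return "int handler"; }
};

struct float_handler {
  std::string name() const { return "float handler"; }
};

// Each factory bumps `calls` to make the evaluation observable.
inline int_handler make_int_handler(int& calls) {
  ++calls;
  return {};
}

inline float_handler make_float_handler(int& calls) {
  ++calls;
  return {};
}

// Mirrors the parenthesized initializer of `wrong` from above.
inline std::string surviving_handler(int& calls) {
  auto result = (make_int_handler(calls), make_float_handler(calls));
  // Both operands were evaluated, but only the right-hand one is kept.
  static_assert(std::is_same_v<decltype(result), float_handler>);
  assert(calls == 2);
  return result.name();
}
```

Applied to the wrong initializer above, this means that only the handler for float would reach the message_handler, while the handler for int would be silently discarded.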

Event-Based API

The member function become does not block, i.e., it always returns immediately. Thus, lambda expressions passed to become should always capture by value. Otherwise, captured references to stack variables dangle by the time the lambda expression is executed, which causes undefined behavior.
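The same lifetime rule applies to any callback that is stored now and invoked later. The sketch below uses std::function as a stand-in for a behavior (deferred_handler and make_greeter are hypothetical names) and shows the safe by-value capture.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Stand-in for a behavior that is installed now and invoked later.
using deferred_handler = std::function<std::string(const std::string&)>;

inline deferred_handler make_greeter(std::string greeting) {
  assert(!greeting.empty());
  // Capture by value: the handler owns its copy of `greeting`, so it stays
  // valid after this function has returned.
  return [greeting](const std::string& who) {
    return greeting + ", " + who + "!";
  };
  // With [&greeting], the handler would refer to a destroyed local variable
  // and invoking it would be undefined behavior.
}
```

Calling the returned handler after make_greeter has returned is well-defined only because the string was copied into the closure.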

Requests

A handle returned by request represents exactly one response message. It is not possible to receive more than one response message.

The handle returned by request is bound to the calling actor. It is not possible to transfer a handle to a response to another actor.

Sharing

It is strongly recommended not to share state between actors. In particular, no actor shall ever access member variables or member functions of another actor. Accessing shared memory segments concurrently can cause undefined behavior that is incredibly hard to find and debug. However, sharing data between actors is fine, as long as the data is immutable and its lifetime is guaranteed to outlive all actors. The simplest way to meet the lifetime guarantee is by storing the data in smart pointers such as std::shared_ptr. Nevertheless, the recommended way of sharing information is message passing. Sending the same message to multiple actors does not result in copying the data several times.
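A minimal sketch of the immutable-sharing pattern, written without CAF: the data sits behind a std::shared_ptr to a const object, so every holder can read it, nobody can modify it through the handle, and it lives until the last holder is gone. The type reader_state is a hypothetical stand-in for the state of an actor.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <numeric>
#include <vector>

// Read-only data with shared ownership; `const` prevents mutation through
// the handle.
using shared_samples = std::shared_ptr<const std::vector<int>>;

// Hypothetical stand-in for the state of an actor that only reads the data.
struct reader_state {
  shared_samples samples;

  int sum() const {
    return std::accumulate(samples->begin(), samples->end(), 0);
  }
};

inline std::vector<reader_state> make_readers(std::size_t count) {
  assert(count > 0);
  shared_samples samples
    = std::make_shared<std::vector<int>>(std::vector<int>{1, 2, 3, 4});
  // Every reader copies the pointer, not the vector. The local handle goes
  // out of scope on return, but the readers keep the data alive.
  return std::vector<reader_state>(count, reader_state{samples});
}
```

All readers observe the same vector instance, which is released automatically once the last reader is destroyed.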

Using aout – A Concurrency-safe Wrapper for cout

When using cout from multiple actors, output often appears interleaved. Since C++11, concurrent writes to the synchronized standard streams do not cause data races, but the standard explicitly allows the characters written by different threads to interleave. Hence, unsynchronized use of cout from multiple actors – and thus from multiple threads – should be avoided.

By replacing std::cout with caf::aout, actors can achieve a concurrency-safe text output. The header caf/all.hpp also defines overloads for std::endl and std::flush for aout, but does not support the full range of ostream operations (yet). Each write operation to aout sends a message to a “hidden” actor. This actor only prints lines, unless output is forced using flush. The example below illustrates printing of lines of text from multiple actors (in random order).

#include "caf/actor_ostream.hpp"
#include "caf/actor_system.hpp"
#include "caf/caf_main.hpp"
#include "caf/event_based_actor.hpp"

#include <chrono>
#include <cstdlib>
#include <iostream>
#include <random>

using namespace caf;

behavior printer(event_based_actor* self, int32_t num, int32_t delay) {
  aout(self) << "Hi there! This is actor nr. " << num << "!" << std::endl;
  std::chrono::milliseconds timeout{delay};
  self->delayed_send(self, timeout, timeout_atom_v);
  return {
    [=](timeout_atom) {
      aout(self) << "Actor nr. " << num << " says goodbye after waiting for "
                 << delay << "ms!" << std::endl;
    },
  };
}

void caf_main(actor_system& sys) {
  std::random_device rd;
  std::minstd_rand re(rd());
  std::uniform_int_distribution<int32_t> dis{1, 99};
  for (int32_t i = 1; i <= 50; ++i)
    sys.spawn(printer, i, dis(re));
}

CAF_MAIN()
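The line-wise printing described above, where text only becomes visible once a line is complete or flush forces it out, can be modeled with a small buffer class. This is a sketch of the idea only: line_buffer is a hypothetical type and does not reflect how CAF implements aout internally.

```cpp
#include <cassert>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical model of line-wise output; not a CAF type.
class line_buffer {
public:
  // Appends text and emits every line that got completed by a newline.
  void write(std::string_view text) {
    for (char c : text) {
      if (c == '\n') {
        lines_.push_back(pending_);
        pending_.clear();
      } else {
        pending_ += c;
      }
    }
    // Invariant: the pending part never contains a line break.
    assert(pending_.find('\n') == std::string::npos);
  }

  // Forces out an incomplete line, if any.
  void flush() {
    if (!pending_.empty()) {
      lines_.push_back(pending_);
      pending_.clear();
    }
  }

  // Returns everything that would have been printed so far.
  const std::vector<std::string>& printed() const {
    return lines_;
  }

private:
  std::string pending_;
  std::vector<std::string> lines_;
};
```

Keeping one such buffer per writer is what prevents partial lines from different writers from mixing in the output.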