Message Passing¶
Sending and receiving messages is a fundamental concept in CAF. Actors communicate by sending messages to each other. A message is a tuple of values that can be delivered to local or remote actors (network transparency).
The class message holds a sequence of values of arbitrary types. Users can create messages directly by calling make_message(...) or by using a message_builder. The latter allows users to build messages incrementally.
However, users rarely interact with messages directly. Instead, they define message handlers that process incoming messages and create messages implicitly when using the message passing API.
Copy on Write¶
A message in CAF is a copy-on-write (COW) type (see Copy-on-Write Types). This means that copying a message is cheap because it only copies the reference to the message content. The actual copying of the content only happens when one of the copies is modified.
This allows sending the same message to multiple receivers without copying overhead, as long as all receivers only read the content of the message.
An actor copies the content of a message only if other actors also hold references to it and at least one argument of the matching message handler takes a mutable reference. Again, this is transparent to the user most of the time. However, knowing the COW semantics remains important for understanding the performance characteristics of an actor system.
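The copy-on-write idea can be illustrated with a minimal sketch that uses only the C++ standard library. The class cow_string and the helper original_survives_mutation are hypothetical names for this illustration and are not part of CAF; the real message class is considerably more general.

```cpp
#include <memory>
#include <string>
#include <utility>

// Toy copy-on-write wrapper. A sketch of the idea, not CAF's message class.
class cow_string {
public:
  explicit cow_string(std::string value)
    : data_(std::make_shared<std::string>(std::move(value))) {
  }

  // Read-only access never copies the content.
  const std::string& get() const {
    return *data_;
  }

  // Mutable access detaches first if another object shares the content.
  std::string& unshared() {
    if (data_.use_count() > 1)
      data_ = std::make_shared<std::string>(*data_);
    return *data_;
  }

  // Returns whether both objects point to the same content.
  bool shares_content_with(const cow_string& other) const {
    return data_ == other.data_;
  }

private:
  std::shared_ptr<std::string> data_;
};

// Copies a value, mutates the copy and checks that the original is intact.
inline bool original_survives_mutation() {
  cow_string a{"hello"};
  cow_string b = a; // cheap: copies only the pointer
  bool shared_before = a.shares_content_with(b);
  b.unshared() += " world"; // the deep copy happens here
  return shared_before && !a.shares_content_with(b) && a.get() == "hello"
         && b.get() == "hello world";
}
```

As long as every holder only calls get(), all copies keep sharing one content object, which mirrors sending one message to many read-only receivers.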
Sending Messages: The Mail API¶
Sending messages in CAF starts by calling the member function mail or the free function anon_mail. The arguments of these functions are the contents of the message. Then, CAF returns a builder object that allows users to specify additional information about the message, such as the priority or whether the message shall be sent after some delay:
Calling urgent() on the builder object sets the priority of the message to message_priority::high. This causes the receiver to process the message before regular messages.
Calling schedule(actor_clock::time_point timeout) on the builder object instructs CAF to send the message at a specific point in time.
Calling delay(actor_clock::duration timeout) on the builder object instructs CAF to send the message after a relative timeout has passed.
In all cases, CAF returns a new builder object that allows users to chain multiple calls. The final call to the builder object is one of:
send(Handle receiver) to send the message to a specific actor as an asynchronous message (fire-and-forget).
delegate(Handle receiver) to send the message to a specific actor and transfer the responsibility for responding to the original sender (see Delegating Messages).
request(Handle receiver, timespan timeout) to send the message to a specific actor as a request message. CAF will raise an error if the receiver does not respond within the specified timeout (passing infinite disables the timeout).
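The chaining style described above, where every option returns a new builder and a final call completes the operation, can be sketched without CAF as follows. All names in this block (mail_sketch, outgoing, priority) are hypothetical stand-ins, and the sketch makes no claim about which option combinations the real Mail API accepts.

```cpp
#include <chrono>
#include <string>
#include <utility>

// Hypothetical stand-ins; none of these types are part of CAF.
enum class priority { normal, high };

// Describes what the final call would hand over to the runtime.
struct outgoing {
  std::string content;
  priority prio;
  std::chrono::milliseconds delay;
  std::string receiver;
};

class mail_sketch {
public:
  explicit mail_sketch(std::string content) : content_(std::move(content)) {
  }

  // Returns a new builder that carries high priority.
  mail_sketch urgent() const {
    auto next = *this;
    next.prio_ = priority::high;
    return next;
  }

  // Returns a new builder that carries a relative delay.
  mail_sketch delay(std::chrono::milliseconds d) const {
    auto next = *this;
    next.delay_ = d;
    return next;
  }

  // Final call: bundles all collected options with the receiver.
  outgoing send(std::string receiver) const {
    return {content_, prio_, delay_, std::move(receiver)};
  }

private:
  std::string content_;
  priority prio_ = priority::normal;
  std::chrono::milliseconds delay_{0};
};
```

Because each option returns a fresh builder, the original builder stays unchanged and calls can be chained in a single expression.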
When sending a delayed or scheduled message, these member functions have two additional, optional parameters: a RefTag and a SelfRefTag. These tags configure what kind of reference CAF holds to the actors while the message is pending. The RefTag specifies the kind of reference CAF holds to the receiver of the message. By default, CAF uses strong_ref, which instructs CAF to store a strong reference to the receiver. Passing weak_ref causes CAF to store a weak reference instead. Likewise, SelfRefTag specifies the kind of reference CAF holds to the sender of the message. By default, CAF uses strong_self_ref, which instructs CAF to store a strong reference to the sender. Passing weak_self_ref causes CAF to store a weak reference instead.
When using weak references, CAF will try to convert them to strong references before sending the message. If the conversion fails, CAF will drop the message.
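The upgrade step for weak references behaves much like std::weak_ptr::lock. The following sketch models it with standard smart pointers; receiver, pending_message and dispatch are hypothetical names, and CAF's own actor handles are not std::shared_ptr objects.

```cpp
#include <memory>
#include <string>
#include <vector>

// Hypothetical receiver type with a plain vector as mailbox.
struct receiver {
  std::vector<std::string> mailbox;
};

// A pending message that only holds a weak reference to its receiver.
struct pending_message {
  std::weak_ptr<receiver> target;
  std::string content;
};

// Tries to upgrade the weak reference: delivers on success, drops otherwise.
inline bool dispatch(const pending_message& msg) {
  if (auto strong = msg.target.lock()) {
    strong->mailbox.push_back(msg.content);
    return true;
  }
  return false; // the receiver is gone, so the message is dropped
}

// The receiver is still alive when the timeout fires.
inline bool delivered_while_alive() {
  auto r = std::make_shared<receiver>();
  pending_message msg{r, "tick"};
  return dispatch(msg) && r->mailbox.size() == 1;
}

// The last strong reference disappears before the timeout fires.
inline bool dropped_after_release() {
  auto r = std::make_shared<receiver>();
  pending_message msg{r, "tick"};
  r.reset();
  return !dispatch(msg);
}
```

With a strong reference, the pending message itself would keep the receiver alive until delivery, which is the trade-off the two tags control.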
When calling send or delegate, the result is either void (immediate send) or a disposable (delayed or scheduled send). The latter allows users to cancel a send operation while it is still pending.
When calling request, CAF will return a handle for the response message. This handle offers either the then and await member functions when using an event-based actor or receive when using a blocking actor (see Requests).
Note: the builder object from anon_mail only supports send.
Requirements for Message Types¶
Message types in CAF must meet the following requirements:
1. Inspectable (see Type Inspection)
2. Default constructible
3. Copy constructible
A type T is inspectable if it provides a free function inspect(Inspector&, T&) or specializes inspector_access.
Requirement 2 is a consequence of requirement 1, because CAF needs to be able to create an object for T when deserializing incoming messages. Requirement 3 allows CAF to implement Copy on Write (see Copy on Write).
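To see why inspection implies default construction, consider the following sketch. It keeps the inspect(Inspector&, T&) shape named above, but the writer and reader inspectors with their apply member are toy assumptions for this illustration; CAF's actual inspector interface is richer and differs in its details.

```cpp
#include <cstddef>
#include <cstdint>
#include <type_traits>
#include <vector>

// A user-defined message type (hypothetical).
struct point {
  int32_t x = 0;
  int32_t y = 0;
};

// Toy inspector that saves fields into a buffer.
struct writer {
  std::vector<int32_t> out;
  bool apply(int32_t& value) {
    out.push_back(value);
    return true;
  }
};

// Toy inspector that loads fields from a buffer.
struct reader {
  std::vector<int32_t> in;
  std::size_t pos = 0;
  bool apply(int32_t& value) {
    if (pos >= in.size())
      return false;
    value = in[pos++];
    return true;
  }
};

// One free function serves both directions.
template <class Inspector>
bool inspect(Inspector& f, point& p) {
  return f.apply(p.x) && f.apply(p.y);
}

// Deserializing needs an object to fill, hence default constructible.
static_assert(std::is_default_constructible<point>::value,
              "point must be default constructible");
// Copy-on-write needs to duplicate content, hence copy constructible.
static_assert(std::is_copy_constructible<point>::value,
              "point must be copy constructible");

// Serializes a point and restores it into a fresh object.
inline point round_trip(point original) {
  writer w;
  inspect(w, original);
  reader r{w.out};
  point restored; // created first, then filled by the inspector
  inspect(r, restored);
  return restored;
}
```

The reader has nothing to work with until an empty point exists, which is exactly the situation on the receiving side of a network connection.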
Matching Messages¶
The receiver of a message processes incoming messages by applying a matching callback from their behavior (see Message Handlers). CAF will automatically move the message content into the message handler if possible. This allows users to write message handlers that take arguments by value. If there are multiple references to the message content, CAF will copy the values instead.
When taking arguments by const reference, CAF will never cause a copy of the message content.
For example, the following actor will process get and put messages for key-value pairs:
behavior kvp_actor_impl() {
  return {
    [](caf::put_atom, std::string key, std::string val) {
      // ...
    },
    [](caf::get_atom, const std::string& key) {
      // ...
    },
  };
}
When receiving a put message, CAF checks whether the message has a reference count of exactly one. If this is the case, CAF will move the content of the message into the lambda, i.e., key and val will be moved. Otherwise, both strings will be copied.
When receiving a get message, CAF will simply pass a reference for key from the message content. Since key asks for read-only access, CAF can safely pass a reference to the message content.
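The decision between moving and copying can be modeled with a reference-counted pointer. In this sketch, tracked, take_by_value and copies_needed are hypothetical helpers that count copy constructions; they only mimic the rule stated above and do not reflect how CAF implements it internally.

```cpp
#include <memory>
#include <string>
#include <utility>

// Value type that counts how often it gets copied.
struct tracked {
  std::string value;
  int* copies;

  tracked(std::string v, int* counter) : value(std::move(v)), copies(counter) {
  }

  tracked(const tracked& other) : value(other.value), copies(other.copies) {
    ++*copies;
  }

  tracked(tracked&& other) noexcept
    : value(std::move(other.value)), copies(other.copies) {
  }
};

// Hands the content to a by-value handler argument: moves when this is the
// only reference, copies when the content is shared.
inline tracked take_by_value(std::shared_ptr<tracked>& content) {
  if (content.use_count() == 1)
    return std::move(*content); // sole owner: stealing is safe
  return *content;              // shared: other holders keep their data
}

// Returns the number of copies made for a unique or a shared content.
inline int copies_needed(bool shared) {
  int copies = 0;
  auto content = std::make_shared<tracked>("key", &copies);
  std::shared_ptr<tracked> other_holder;
  if (shared)
    other_holder = content; // a second reference to the same content
  tracked arg = take_by_value(content);
  (void) arg;
  return copies;
}
```

A handler that takes const references would skip take_by_value entirely and read the shared content in place.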
Requests¶
A main feature of CAF is its ability to couple input and output types via the type system. For example, a typed_actor<result<int32_t>(int32_t)> essentially behaves like a function. It receives a single int32_t as input and responds with another int32_t. CAF embraces this functional take on actors by simply creating response messages from the result of message handlers. This allows CAF to match request to response messages and to provide a convenient API for this style of communication.
Sending Requests and Handling Responses¶
Actors send request messages by calling mail(content...).request(receiver, timeout). This function returns an intermediate object that allows an actor to set a one-shot handler for the response message. Event-based actors can use either request(...).then or request(...).await. The former multiplexes the one-shot handler with the regular actor behavior and handles responses as they arrive. The latter suspends the regular actor behavior until all awaited responses arrive and handles responses in LIFO order. Blocking actors always use request(...).receive, which blocks until the one-shot handler was called. Actors receive a sec::request_timeout (see Default Error Codes) error message (see Error Messages) if a timeout occurs. Users can set the timeout to infinite for unbounded operations. This is only recommended if the receiver is known to run locally.
In the following example, we use the simple cell actor shown below as a communication endpoint.
struct cell_trait {
  using signatures
    = type_list<result<void>(put_atom, int32_t), // 'put' writes to the cell
                result<int32_t>(get_atom)>;      // 'get' reads from the cell
};
using cell = typed_actor<cell_trait>;
struct cell_state {
  static constexpr inline const char* name = "cell";
  cell::pointer self;
  int32_t value;
  cell_state(cell::pointer ptr, int32_t val) : self(ptr), value(val) {
    // nop
  }
  cell::behavior_type make_behavior() {
    return {
      [this](put_atom, int32_t val) { value = val; },
      [this](get_atom) { return value; },
    };
  }
};
To showcase the slight differences in API and processing order, we implement three testee actors that all operate on a list of cell actors.
void waiting_testee(event_based_actor* self, vector<cell> cells) {
  for (auto& x : cells)
    self->mail(get_atom_v).request(x, 1s).await([self, x](int32_t y) {
      self->println("cell #{} -> {}", x.id(), y);
    });
}
void multiplexed_testee(event_based_actor* self, vector<cell> cells) {
  for (auto& x : cells)
    self->mail(get_atom_v).request(x, 1s).then([self, x](int32_t y) {
      self->println("cell #{} -> {}", x.id(), y);
    });
}
void blocking_testee(scoped_actor& self, vector<cell> cells) {
  for (auto& x : cells)
    self->mail(get_atom_v)
      .request(x, 1s)
      .receive([&](int32_t y) { self->println("cell #{} -> {}", x.id(), y); },
               [&](error& err) {
                 self->println("cell #{} -> {}", x.id(), err);
               });
}
Our caf_main for the examples spawns five cells and assigns the initial values 0, 1, 4, 9, and 16. Then it spawns one instance of each of our testee implementations, and we can observe the different outputs.
Our waiting_testee actor will always print:
cell #9 -> 16
cell #8 -> 9
cell #7 -> 4
cell #6 -> 1
cell #5 -> 0
This is because await puts the one-shot handlers onto a stack and enforces LIFO order by re-ordering incoming response messages as necessary.
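The re-ordering can be pictured as a stack of awaited request IDs plus a buffer for responses that arrive too early. The sketch below is a simplified model under that assumption; await_stack and handled_order are hypothetical names, IDs 0 to 4 stand in for the five cells, and each cell's value is modeled as the square of its ID.

```cpp
#include <map>
#include <vector>

// Toy model of awaited responses. Request IDs and values are plain ints.
class await_stack {
public:
  // Registers a one-shot handler for the given request ID.
  void await(int request_id) {
    pending_.push_back(request_id);
  }

  // Accepts a response in arrival order and handles whatever is allowed now.
  void on_response(int request_id, int value) {
    buffered_[request_id] = value;
    // Only the most recently awaited request may be handled next.
    while (!pending_.empty()) {
      auto it = buffered_.find(pending_.back());
      if (it == buffered_.end())
        break; // top of the stack has no response yet: keep waiting
      handled_.push_back(it->second);
      buffered_.erase(it);
      pending_.pop_back();
    }
  }

  // Returns the values in the order the handlers ran.
  const std::vector<int>& handled() const {
    return handled_;
  }

private:
  std::vector<int> pending_;    // stack of awaited request IDs
  std::map<int, int> buffered_; // responses that arrived too early
  std::vector<int> handled_;
};

// Awaits requests 0..4, then feeds responses in the given arrival order.
inline std::vector<int> handled_order(const std::vector<int>& arrival) {
  await_stack stack;
  for (int id = 0; id < 5; ++id)
    stack.await(id);
  for (int id : arrival)
    stack.on_response(id, id * id);
  return stack.handled();
}
```

Whatever the arrival order, the handlers in this model run for the values 16, 9, 4, 1, 0, matching the output of waiting_testee.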
The multiplexed_testee implementation does not print its results in a predictable order. Response messages arrive in arbitrary order and are handled immediately.
Finally, the blocking_testee has a deterministic output again. This is because it blocks on each request until receiving the result before making the next request.
cell #5 -> 0
cell #6 -> 1
cell #7 -> 4
cell #8 -> 9
cell #9 -> 16
Both event-based approaches send all requests, install a series of one-shot handlers, and then return from the implementing function. In contrast, the blocking function waits for a response before sending another request.
Error Handling in Requests¶
Requests allow CAF to unambiguously correlate request and response messages. This is also true if the response is an error message. Hence, CAF allows adding an error handler as an optional second parameter to then and await (this parameter is mandatory for receive). If no such handler is defined, the default error handler (see Error Messages) is used as a fallback in scheduled actors.
As an example, we consider a simple divider that returns an error on a division by zero. This example uses a custom error category (see Errors).
struct divider_trait {
  using signatures = type_list<result<double>(div_atom, double, double)>;
};
using divider = typed_actor<divider_trait>;
divider::behavior_type divider_impl() {
  return {
    [](div_atom, double x, double y) -> result<double> {
      if (y == 0.0)
        return math_error::division_by_zero;
      return x / y;
    },
  };
}
When sending requests to the divider, we can use a custom error handler to report errors to the user like this:
auto div = system.spawn(divider_impl);
scoped_actor self{system};
self->mail(div_atom_v, x, y)
  .request(div, 10s)
  .receive([&](double z) { self->println("{} / {} = {}", x, y, z); },
           [&](const error& err) {
             self->println("*** cannot compute {} / {} => {}", x, y, err);
           });
Delegating Messages¶
Actors can transfer responsibility for a request by using delegate. This enables the receiver of the delegated message to reply as usual, simply by returning a value from its message handler, and the original sender of the message will receive the response. The following diagram illustrates request delegation from actor B to actor C.
A                  B                  C
|                  |                  |
| ---(request)---> |                  |
|                  | ---(delegate)--> |
|                  X                  |---\
|                                     |   | compute
|                                     |   | result
|                                     |<--/
| <-------------(reply)-------------- |
|                                     X
X
Returning the result of delegate(...) from a message handler, as shown in the example below, suppresses the implicit response message and allows the compiler to check the result type when using statically typed actors.
struct adder_trait {
  using signatures = type_list<result<int32_t>(add_atom, int32_t, int32_t)>;
};
using adder_actor = typed_actor<adder_trait>;
adder_actor::behavior_type worker_impl() {
  return {
    [](add_atom, int32_t x, int32_t y) { return x + y; },
  };
}
adder_actor::behavior_type server_impl(adder_actor::pointer self,
                                       adder_actor worker) {
  return {
    [self, worker](add_atom add, int32_t x, int32_t y) {
      return self->mail(add, x, y).delegate(worker);
    },
  };
}
void client_impl(event_based_actor* self, adder_actor adder, int32_t x,
                 int32_t y) {
  self->mail(add_atom_v, x, y).request(adder, 10s).then([=](int32_t result) {
    self->println("{} + {} = {}", x, y, result);
  });
}
void caf_main(actor_system& sys) {
  auto worker = sys.spawn(worker_impl);
  // Pass the worker spawned above instead of spawning a second, unused one.
  auto server = sys.spawn(server_impl, worker);
  sys.spawn(client_impl, server, 1, 2);
}
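The essence of delegation is that the route back to the original sender travels with the request. The following library-free sketch models this with a callback stored in the request; the request struct and the free functions worker, server and who_answered are hypothetical and deliberately ignore mailboxes and scheduling.

```cpp
#include <functional>
#include <string>

// A request remembers who is waiting for the response.
struct request {
  int x;
  int y;
  // Called with the result and the name of the responder.
  std::function<void(int, const std::string&)> reply;
};

// The worker computes the result and replies to whoever the request names.
inline void worker(const request& req) {
  req.reply(req.x + req.y, "worker");
}

// The server delegates: it forwards the request as-is and never replies.
inline void server(const request& req) {
  worker(req);
}

// The client sends to the server and records who answered with what.
inline std::string who_answered(int x, int y, int& result) {
  std::string responder;
  request req{x, y, [&](int r, const std::string& from) {
                result = r;
                responder = from;
              }};
  server(req);
  return responder;
}
```

In this model the client addresses the server, yet the response comes straight from the worker, which corresponds to the reply arrow from C to A in the diagram.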
Response Promises¶
Response promises allow an actor to send and receive other messages prior to replying to a particular request. Actors create a response promise using self->make_response_promise<Ts...>(), where Ts is a template parameter pack describing the promised return type. Dynamically typed actors simply call self->make_response_promise(). After retrieving a promise, an actor can fulfill it by calling the member function deliver(...), as shown in the following example.
struct adder_trait {
  using signatures
    = caf::type_list<result<int32_t>(add_atom, int32_t, int32_t)>;
};
using adder_actor = typed_actor<adder_trait>;
adder_actor::behavior_type worker_impl() {
  return {
    [](add_atom, int32_t x, int32_t y) { return x + y; },
  };
}
adder_actor::behavior_type server_impl(adder_actor::pointer self,
                                       adder_actor worker) {
  return {
    [=](add_atom, int32_t y, int32_t z) {
      auto rp = self->make_response_promise<int32_t>();
      self->mail(add_atom_v, y, z)
        .request(worker, infinite)
        .then([rp](int32_t result) mutable { rp.deliver(result); },
              [rp](error& err) mutable { rp.deliver(std::move(err)); });
      return rp;
    },
  };
}
void client_impl(event_based_actor* self, adder_actor adder, int32_t x,
                 int32_t y) {
  self->mail(add_atom_v, x, y).request(adder, 10s).then([=](int32_t result) {
    self->println("{} + {} = {}", x, y, result);
  });
}
void caf_main(actor_system& sys) {
  auto worker = sys.spawn(worker_impl);
  // Pass the worker spawned above instead of spawning a second, unused one.
  auto server = sys.spawn(server_impl, worker);
  sys.spawn(client_impl, server, 1, 2);
}
This example is almost identical to the example for delegating messages. However, there is a big difference in the flow of messages. In our first version, the worker (C) directly responded to the client (A). This time, the worker sends the result to the server (B), which then fulfills the promise and thereby sends the result to the client:
A                  B                  C
|                  |                  |
| ---(request)---> |                  |
|                  | ---(request)---> |
|                  |                  |---\
|                  |                  |   | compute
|                  |                  |   | result
|                  |                  |<--/
|                  | <----(reply)---- |
|                  |                  X
| <----(reply)---- |
|                  X
X
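The deferred reply in this flow can be sketched without CAF as an object that stores the route back to the client until a result exists. The names promise_sketch and server_sketch are hypothetical, the sketch handles only int results, and it assumes the worker answers requests in the order they were forwarded.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Minimal promise: holds the route back to the requester until fulfilled.
class promise_sketch {
public:
  explicit promise_sketch(std::function<void(int)> reply)
    : reply_(std::move(reply)) {
  }

  // Sends the result to the requester. Later calls have no effect.
  void deliver(int value) {
    if (!reply_)
      return;
    auto reply = std::move(reply_);
    reply_ = nullptr;
    reply(value);
  }

private:
  std::function<void(int)> reply_;
};

// Toy server that answers its clients only after hearing from a worker.
class server_sketch {
public:
  // Handles a client request: stores a promise and returns without a result.
  void on_add(std::function<void(int)> client) {
    open_.emplace_back(std::move(client));
  }

  // Handles a worker response: fulfills the oldest open promise, if any.
  bool on_worker_response(int result) {
    if (open_.empty())
      return false;
    open_.front().deliver(result);
    open_.pop_front();
    return true;
  }

  std::size_t open_promises() const {
    return open_.size();
  }

private:
  std::deque<promise_sketch> open_;
};
```

Between on_add and on_worker_response the server is free to process other messages, which is the point of using a promise instead of replying from the handler directly.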
Special Message Types¶
CAF has a few system-level message types such as exit_msg and error that have a special meaning in the actor system and have default handlers in all actors. These messages are not part of the user-level API and are not visible to users unless they explicitly handle them.
Exit Messages¶
Bidirectional monitoring with a strong lifetime coupling is established by calling self->link_to(other). This will cause the runtime to send an exit_msg if either this or other dies. By default, actors terminate after receiving an exit_msg unless the exit reason is exit_reason::normal. This mechanism propagates failure states in an actor system. Linked actors form a subsystem in which an error causes all actors to fail collectively. Actors can override the default by providing a handler for exit_msg.
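How failure spreads through links can be modeled as a graph traversal. In the sketch below, linked_system and exit_kind are hypothetical names, every actor uses the default behavior described above, and custom exit_msg handlers are left out.

```cpp
#include <cstddef>
#include <set>
#include <vector>

// Simplified exit reasons: normal shutdown versus any kind of failure.
enum class exit_kind { normal, failure };

// Toy model of linked actors, identified by their index.
class linked_system {
public:
  explicit linked_system(std::size_t num_actors)
    : alive_(num_actors, true), links_(num_actors) {
  }

  // Links are bidirectional.
  void link(std::size_t a, std::size_t b) {
    links_[a].insert(b);
    links_[b].insert(a);
  }

  // Terminates an actor and notifies all of its linked peers.
  void terminate(std::size_t id, exit_kind reason) {
    if (!alive_[id])
      return;
    alive_[id] = false;
    for (auto peer : links_[id]) {
      // Default handling: ignore normal exits, fail on anything else.
      if (reason != exit_kind::normal)
        terminate(peer, reason);
    }
  }

  std::size_t alive_count() const {
    std::size_t result = 0;
    for (bool flag : alive_)
      if (flag)
        ++result;
    return result;
  }

private:
  std::vector<bool> alive_;
  std::vector<std::set<std::size_t>> links_;
};

// Four actors: 0-1 and 1-2 are linked, 3 stands alone.
inline std::size_t survivors_after(exit_kind reason) {
  linked_system sys{4};
  sys.link(0, 1);
  sys.link(1, 2);
  sys.terminate(0, reason);
  return sys.alive_count();
}
```

A failure of actor 0 takes down the whole linked group but leaves the unlinked actor untouched, whereas a normal exit affects only actor 0.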
Error Messages¶
Actors send error messages to others by returning an error (see Errors) from a message handler. Similar to exit messages, error messages usually cause the receiving actor to terminate, unless the behavior includes a handler for error. The default handler for errors in actors will terminate the actor.
Idle Timeouts¶
Event-based actors can set an idle timeout to wake up after a certain period of not receiving any messages. This is useful for actors that observe external events and need to perform some cleanup or error handling if no events arrive for a while.
To set a timeout, actors call self->set_idle_timeout(duration, ref_type, repeat_policy, callback), where:
duration is the amount of time to wait before the timeout triggers. Whenever the actor handles a message, the timeout resets.
ref_type specifies whether CAF should hold a strong or weak reference to the actor while it is idle. This parameter must be either strong_ref or weak_ref. When in doubt, use strong_ref.
repeat_policy specifies whether the timeout should trigger only once or repeatedly. This parameter must be either once or repeat.
callback is a function object taking no arguments. CAF calls this function whenever the timeout triggers.
The messages that trigger the timeout are handled transparently by CAF and do use the same message handler as regular messages.
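The interplay of duration, repeat policy and message handling can be sketched with a simulated clock. The idle_timer class, its integer time units and the count_timeouts helper are assumptions for illustration; in particular, the sketch assumes that a timeout with the once policy stays disarmed after it has fired, which the text above does not spell out.

```cpp
#include <functional>
#include <utility>

// Mirrors the two repeat policies named above.
enum class repeat_kind { once, repeat };

// Toy idle timer driven by a simulated clock that starts at time 0.
class idle_timer {
public:
  idle_timer(int duration, repeat_kind policy, std::function<void()> callback)
    : duration_(duration),
      policy_(policy),
      callback_(std::move(callback)),
      deadline_(duration) {
  }

  // Handling a message at time `now` restarts the idle period.
  void on_message(int now) {
    if (armed_)
      deadline_ = now + duration_;
  }

  // Advances the clock and fires every timeout that became due.
  void advance_to(int now) {
    while (armed_ && deadline_ <= now) {
      callback_();
      if (policy_ == repeat_kind::repeat)
        deadline_ += duration_;
      else
        armed_ = false; // assumption: `once` never re-arms
    }
  }

private:
  int duration_;
  repeat_kind policy_;
  std::function<void()> callback_;
  int deadline_;
  bool armed_ = true;
};

// Counts timeouts for a 10-unit idle period with one message at time 5.
inline int count_timeouts(repeat_kind policy) {
  int fired = 0;
  idle_timer timer{10, policy, [&fired] { ++fired; }};
  timer.on_message(5);  // activity moves the deadline from 10 to 15
  timer.advance_to(14); // idle period not over yet
  timer.advance_to(40); // the actor stayed idle until time 40
  return fired;
}
```

With the repeat policy the callback runs at times 15, 25 and 35 in this model, while the once policy yields a single call at time 15.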