Data Flows¶
Data flows, or streams, are potentially unbounded sequences of values. The flow API in CAF makes it easy to generate, transform, and consume observable sequences with a ReactiveX-style interface.
Data flows in CAF use backpressure to make sure that fast senders cannot overwhelm receivers.
This chapter does not assume prior experience with ReactiveX. CAF differs from ReactiveX implementations in a few important ways, which we point out in Key Differences to ReactiveX.
Introduction¶
The fundamental building blocks of the flow API are observable, observer and subscription.
observable
Emits a potentially unbounded sequence of values to observers that have subscribed to the observable. Offers the single member function subscribe to add more observers.
observer
Subscribes to and consumes values from an observable. This interface bundles callbacks for the observable, namely on_subscribe, on_next, on_complete and on_error.
subscription
Manages the flow of items between an observable and an observer. An observer calls request to ask for more items or dispose to stop receiving data.
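To make the division of labor concrete, the following sketch models the request/dispose protocol in plain standard C++. It is only an illustration under simplifying assumptions: the types toy_observer and toy_subscription are hypothetical, they are not CAF classes, and the observable is reduced to a fixed integer range.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical, simplified stand-ins for illustration; not CAF's classes.
struct toy_observer {
  std::vector<int> received;
  bool completed = false;
  void on_next(int x) { received.push_back(x); }
  void on_complete() { completed = true; }
};

// Combines the observable (the integers first..last) with its subscription.
class toy_subscription {
public:
  toy_subscription(int first, int last, toy_observer& out)
    : next_(first), last_(last), out_(&out) {}

  // Emits at most `n` more items, so the observer controls the pace.
  void request(std::size_t n) {
    for (; n > 0 && !done_ && next_ <= last_; --n)
      out_->on_next(next_++);
    if (!done_ && next_ > last_) {
      done_ = true;
      out_->on_complete();
    }
  }

  // Stops the flow; later calls to request have no effect.
  void dispose() { done_ = true; }

private:
  int next_;
  int last_;
  toy_observer* out_;
  bool done_ = false;
};
```

Because items only move when the observer asks for them, a slow observer automatically slows down the source. This is the essence of backpressure.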
When working with data flows, these interfaces usually remain hidden and applications leverage high-level operators that either generate or transform an observable. For example, the code snippet below illustrates a trivial data flow for integers inside a single actor that only uses the high-level composition API without any manual setup for observers or subscriptions:
void caf_main(caf::actor_system& sys, const config& cfg) {
  auto n = get_or(cfg, "num-values", default_num_values);
  sys.spawn([n](caf::event_based_actor* self) {
    self
      // Get an observable factory.
      ->make_observable()
      // Produce an integer sequence starting at 1, i.e., 1, 2, 3, ...
      .iota(1)
      // Only take the requested number of items from the infinite sequence.
      .take(n)
      // Print each integer.
      .for_each([self](int x) { self->println("{}", x); });
  });
}
In the concurrency model of CAF, all of these building blocks describe processing steps that happen inside of an actor. The actor owns all of its observer, observable and subscription objects and they cannot outlive their parent actor. This means that an observable must not be shared with others.
To move data from one actor to another, CAF provides asynchronous buffers. The following figure depicts the general architecture when working with data flows across actor boundaries:
Here, actor A creates an observable and then applies the flat_map and filter operators to it. The resulting items then flow into an asynchronous buffer that connects to actor B. Actor B then applies the map and filter operators to the incoming data before terminating the data flow, e.g., by printing each value.
Concurrent Processing¶
Flows that only run inside a single actor are of limited use outside of toy examples. For running different parts of a data flow on different actors, CAF offers two APIs: one for setting up a processing chain declaratively and one for setting up processing chains dynamically.
Declarative Setup: observe_on¶
If the entire processing chain is known at coding time, observe_on provides the easiest way to assign work to individual actors. The following example revisits our first example, but this time generates the numbers on one actor and then prints them on another.
void caf_main(caf::actor_system& sys, const config& cfg) {
  // Create two actors without actually running them yet.
  auto n = get_or(cfg, "num-values", default_num_values);
  auto [src, launch_src] = sys.spawn_inactive();
  auto [snk, launch_snk] = sys.spawn_inactive();
  // Define our data flow: generate data on `src` and print it on `snk`.
  src
    // Get an observable factory.
    ->make_observable()
    // Produce an integer sequence starting at 1, i.e., 1, 2, 3, ...
    .iota(1)
    // Only take the requested number of items from the infinite sequence.
    .take(n)
    // Switch to `snk` for further processing.
    .observe_on(snk)
    // Print each integer.
    .for_each([snk = snk](int x) { snk->println("{}", x); });
  // Allow the actors to run. After this point, we may no longer dereference
  // the `src` and `snk` pointers! Calling these manually is optional. When
  // removing these two lines, CAF automatically launches the actors at scope
  // exit.
  launch_src();
  launch_snk();
}
Please note that calling observe_on requires that the target actor is inactive. Otherwise, this function call results in unsynchronized state access.
Dynamic Setup: Asynchronous Buffers¶
Our second option for spanning data flows across multiple actors is using SPSC (Single Producer Single Consumer) buffers. This option is more general. In fact, observe_on internally uses these buffers for connecting the actors. Further, the buffers allow bridging flows between actor and non-actor code.
While one could use an SPSC buffer directly, they usually remain hidden behind another abstraction: asynchronous resources. The resources in CAF usually come in pairs and users may create new ones by calling make_spsc_buffer_resource. This function returns a consumer resource and a producer resource. With these two resources, we can then spawn actors that open the resources for either reading or writing.
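As a rough mental model of what sits between the two resources, the sketch below shows a bounded buffer in plain standard C++. It is a simplification under stated assumptions: toy_spsc_buffer and its member functions are hypothetical names, the locking scheme is chosen for brevity, and CAF's actual buffer signals demand between both sides instead of returning false.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <optional>

// Hypothetical bounded buffer between one producer and one consumer.
class toy_spsc_buffer {
public:
  explicit toy_spsc_buffer(std::size_t capacity) : capacity_(capacity) {}

  // Producer side: returns false when the buffer is full, which tells the
  // producer to hold back until the consumer has caught up.
  bool try_push(int x) {
    std::lock_guard<std::mutex> guard{mtx_};
    if (items_.size() >= capacity_)
      return false;
    items_.push_back(x);
    return true;
  }

  // Consumer side: returns the oldest item or nothing if the buffer is empty.
  std::optional<int> try_pull() {
    std::lock_guard<std::mutex> guard{mtx_};
    if (items_.empty())
      return std::nullopt;
    int x = items_.front();
    items_.pop_front();
    return x;
  }

private:
  std::mutex mtx_;
  std::size_t capacity_;
  std::deque<int> items_;
};
```

The fixed capacity is what bounds memory usage: once the buffer is full, the producer cannot add items until the consumer has removed some.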
To illustrate how the API pieces fit together, we revisit our example a third time. This time, we spawn the actors individually and connect them via the buffer resources:
void caf_main(caf::actor_system& sys, const config& cfg) {
  auto [pull, push] = caf::async::make_spsc_buffer_resource<int>();
  auto n = get_or(cfg, "num-values", default_num_values);
  sys.spawn(sink, std::move(pull));
  sys.spawn(source, std::move(push), n);
}
In this iteration of our example, we have moved the implementation for the source and sink actors to their own functions. The source once again creates the data, only this time we subscribe the buffer to the generated sequence:
// Simple source for generating a stream of integers from 1 to n.
void source(caf::event_based_actor* self,
            caf::async::producer_resource<int> out, size_t n) {
  self
    // Get an observable factory.
    ->make_observable()
    // Produce an integer sequence starting at 1, i.e., 1, 2, 3, ...
    .iota(1)
    // Only take the requested number of items from the infinite sequence.
    .take(n)
    // Subscribe the resource to the sequence, thereby starting the stream.
    .subscribe(out);
}
For the sink, we generate an observable from the consumer resource and then once more call for_each:
// Simple sink for consuming a stream of integers, printing it to stdout.
void sink(caf::event_based_actor* self, caf::async::consumer_resource<int> in) {
  self
    // Get an observable factory.
    ->make_observable()
    // Lift the input to an observable flow.
    .from_resource(std::move(in))
    // Print each integer.
    .for_each([self](int x) { self->println("{}", x); });
}
Building and Transforming Observables¶
When building processing pipelines, CAF fuses as many processing steps as possible into a single C++ object. Consider a source that is composed like this:
self->make_observable().from_callable(...).take(...)...
The first bit, self->make_observable(), returns an observable_builder. This class implements factory functions for creating observable sequences from containers, repeated values, and so on. However, most functions do not actually return an observable. Instead, they return a generation<...> object. The generation class is a variadic template that allows CAF to incrementally define consecutive processing steps. In this chain, we call from_callable on the builder object, which returns a generation<callable_source<...>>. The generation is meant as a temporary object only. Hence, most member functions may only get called on an rvalue.
After calling .take(...) on the returned generation, we get a new temporary object of type generation<callable_source<...>, limit_step<...>>.
The generation class also mimics the interface of observable. When calling a member function that requires an actual observable, CAF uses the blueprint stored in the generation to create an actual observable object and then forwards the member function call. For example, calling for_each on a generation internally constructs the observable and then calls for_each on that new object.
Users can also call as_observable on a generation explicitly to turn the blueprint into an actual observable sequence.
By delaying the construction of actual observable instances, CAF can fuse consecutive steps into single objects. This reduces the number of heap allocations and also accelerates processing, since the fused processing steps result in simple function call chains without subscriptions and backpressure between them.
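The idea of growing a variadic template one step at a time can be sketched in a few lines of C++17. The following is a loose illustration only: toy_blueprint, then, run and make_blueprint are hypothetical names, and the step signature bool(int&) is an assumption made for brevity. CAF's generation class is considerably more elaborate.

```cpp
#include <tuple>
#include <utility>
#include <vector>

// Hypothetical stand-in for a fused blueprint; CAF's `generation` differs.
// Each step has the signature bool(int&): it may modify the item and returns
// false to drop it.
template <class... Steps>
class toy_blueprint {
public:
  explicit toy_blueprint(std::tuple<Steps...> steps)
    : steps_(std::move(steps)) {}

  // Appends a step, yielding a new blueprint type. Only callable on rvalues,
  // since the blueprint is meant as a temporary.
  template <class Step>
  auto then(Step step) && {
    return toy_blueprint<Steps..., Step>{
      std::tuple_cat(std::move(steps_), std::make_tuple(std::move(step)))};
  }

  // Runs every item through all steps as one plain call chain.
  std::vector<int> run(const std::vector<int>& inputs) {
    std::vector<int> out;
    for (int x : inputs) {
      bool keep = true;
      std::apply(
        [&](auto&... step) { ((keep = keep && step(x)), ...); }, steps_);
      if (keep)
        out.push_back(x);
    }
    return out;
  }

private:
  std::tuple<Steps...> steps_;
};

// Creates an empty blueprint to start a chain.
inline toy_blueprint<> make_blueprint() {
  return toy_blueprint<>{std::tuple<>{}};
}
```

A chain such as make_blueprint().then(double_it).then(keep_large).run(values) holds both steps in a single object and applies them with direct function calls, which mirrors why fused steps need no subscriptions between them.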
Analogous to the generation class for creating new observables from inputs, CAF uses a template called transformation that represents a blueprint for applying operators to existing observables.
Operators¶
Most operators transform an observable by applying one or more processing steps to all observed values and then emit the result as a new observable. Since the result of a transformation is usually a new observable, these operators compose into complex data stream operations.
The operators presented here are available on the template classes observable, generation and transformation.
Buffer¶
Emits items in buffers of configurable size. Each buffer is emitted as a copy-on-write vector (caf::cow_vector). The operator will emit “partial” buffers if the source observable completes before the buffer is full.
The operator also takes an optional second argument that specifies the maximum time period between two items before the buffer is emitted regardless of its size.
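The count-based behavior, including the trailing partial buffer, can be modeled on a finite input with plain standard C++. This sketch assumes a positive buffer size and ignores the optional time period. The function buffer_model is a hypothetical helper, not part of CAF.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical model: splits `items` into chunks of `count` elements. The
// last chunk may be smaller if the input ends early. Assumes count > 0.
std::vector<std::vector<int>> buffer_model(const std::vector<int>& items,
                                           std::size_t count) {
  std::vector<std::vector<int>> result;
  std::vector<int> current;
  for (int x : items) {
    current.push_back(x);
    if (current.size() == count) {
      result.push_back(std::move(current));
      current.clear(); // Reset the moved-from vector to a known empty state.
    }
  }
  // The input "completed": flush the partial buffer, if any.
  if (!current.empty())
    result.push_back(std::move(current));
  return result;
}
```

For five input items and a buffer size of two, this model yields two full buffers followed by a partial buffer that holds the last item.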
Concat¶
The concat operator takes multiple input observables and re-emits the observed items as a single sequence of items without interleaving them.
Concat Map¶
The concat_map operator takes a function object that converts each input item to an observable and then applies concat to the resulting observables.
Distinct¶
The distinct operator makes all items unique by filtering out all items that have been emitted in the past.
Element At¶
The element_at operator re-emits the item at a given index while ignoring all other items.
Filter¶
The filter operator re-emits items from its input observable that pass a predicate test.
First¶
The first operator re-emits only the first item of the input observable.
Flat Map¶
The flat_map operator takes a function object that converts each input item to an observable and then applies merge to the resulting observables.
Head and Tail¶
The head_and_tail operator splits an observable into its first item and an observable for the remainder.
Ignore Elements¶
The ignore_elements operator ignores all items of the input observable and only emits the completion (or error) signal.
Last¶
The last operator re-emits only the last item of the input observable.
Map¶
The map operator applies a unary operation to all items of the input observable and re-emits the resulting items. Similar to std::transform.
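For readers less familiar with the standard algorithm, the analogy looks roughly as follows on a finite container. The helper map_model is hypothetical and only mirrors the semantics. It does not use the flow API.

```cpp
#include <algorithm>
#include <iterator>
#include <vector>

// Hypothetical model of `map` on a finite input: applies `f` to each item
// and collects the results in order.
template <class F>
std::vector<int> map_model(const std::vector<int>& in, F f) {
  std::vector<int> out;
  out.reserve(in.size());
  std::transform(in.begin(), in.end(), std::back_inserter(out), f);
  return out;
}
```

The main difference in a flow is that items arrive over time and the sequence may never end, so the operator processes each item as it is observed instead of iterating a range.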
Merge¶
The merge operator takes multiple input observables and re-emits the observed items as a single sequence of items as soon as they appear.
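The difference between merge and concat is easiest to see with explicit arrival times. The sketch below is a plain C++ model under the assumption that every item carries a timestamp. The names timed_item, timed_input, concat_model and merge_model are hypothetical.

```cpp
#include <algorithm>
#include <vector>

// Hypothetical item type: `time` is the moment the input emits `value`.
struct timed_item {
  int time;
  int value;
};

using timed_input = std::vector<timed_item>;

// concat: drains the inputs one after another, ignoring arrival times.
std::vector<int> concat_model(const std::vector<timed_input>& inputs) {
  std::vector<int> out;
  for (const auto& input : inputs)
    for (const auto& item : input)
      out.push_back(item.value);
  return out;
}

// merge: forwards items in order of arrival, interleaving the inputs.
std::vector<int> merge_model(const std::vector<timed_input>& inputs) {
  std::vector<timed_item> all;
  for (const auto& input : inputs)
    all.insert(all.end(), input.begin(), input.end());
  std::stable_sort(all.begin(), all.end(),
                   [](const timed_item& a, const timed_item& b) {
                     return a.time < b.time;
                   });
  std::vector<int> out;
  for (const auto& item : all)
    out.push_back(item.value);
  return out;
}
```

With one input emitting 1 and 2 at times 0 and 2, and another emitting 10 and 20 at times 1 and 3, the concat model produces 1, 2, 10, 20 while the merge model produces 1, 10, 2, 20.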
Observe On¶
The observe_on operator pipes data from one actor to another through an asynchronous buffer. The target actor must not run at the point of calling this operator. In the image below, alice (red) and bob (blue) are two actors.
On Backpressure Buffer¶
This operator controls the behavior of a flow when the producer emits items faster than the consumer can process them. By default, the backpressure mechanism in CAF will slow down the producer, eventually bringing it to a halt. When connecting multiple consumers, a single slow consumer can block the entire flow. Especially when connecting external consumers, e.g., via the network, applications may want to alter this behavior.
The on_backpressure_buffer operator allows users to have consumers buffer up to a fixed number of items before responding to the overload condition.
The operator lets users select one of three strategies:
drop_oldest: Discards the oldest item in the buffer when it is full.
drop_newest: Discards the newest item in the buffer when it is full.
fail: Aborts the flow with an error message when the buffer is full.
The default strategy (when calling on_backpressure_buffer with a single argument) is fail.
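The three strategies can be compared with a small bounded buffer written in plain standard C++. This is a sketch under stated assumptions: toy_strategy and bounded_buffer are hypothetical names, an exception stands in for aborting the flow, and drop_newest is interpreted literally as replacing the most recently buffered item with the incoming one. CAF's actual behavior may differ in these details.

```cpp
#include <cstddef>
#include <deque>
#include <stdexcept>

// Hypothetical names that mirror the strategies described above.
enum class toy_strategy { drop_oldest, drop_newest, fail };

class bounded_buffer {
public:
  bounded_buffer(std::size_t capacity, toy_strategy strategy)
    : capacity_(capacity), strategy_(strategy) {
    if (capacity_ == 0)
      throw std::invalid_argument("capacity must be positive");
  }

  // Stores `x`, applying the overflow strategy if the buffer is full.
  void push(int x) {
    if (items_.size() < capacity_) {
      items_.push_back(x);
      return;
    }
    switch (strategy_) {
      case toy_strategy::drop_oldest:
        items_.pop_front();
        items_.push_back(x);
        break;
      case toy_strategy::drop_newest:
        // Assumption: the most recently buffered item makes room for `x`.
        items_.back() = x;
        break;
      case toy_strategy::fail:
        // Stand-in for aborting the flow with an error.
        throw std::overflow_error("buffer full");
    }
  }

  const std::deque<int>& items() const { return items_; }

private:
  std::size_t capacity_;
  toy_strategy strategy_;
  std::deque<int> items_;
};
```

Pushing 1, 2 and 3 into a buffer of capacity two leaves 2 and 3 with drop_oldest, leaves 1 and 3 with drop_newest under the interpretation above, and raises an error with fail.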
On Error Complete¶
The on_error_complete operator catches an error event and completes the observable instead of propagating the error to the observer. In other words, the operator will call on_complete on the observer instead of on_error in case of an error, effectively ignoring the error.
On Error Return¶
Takes an error handler function object with the signature expected<T>(const error&) and replaces an error event with the result of the handler. If the handler returns a value, the observable will emit this value instead of the error. If the handler returns an error, the observable will emit this error instead of the original error.
On Error Return Item¶
Takes a value and replaces an error event with this value. If the observable encounters an error, it will emit this value instead of the error.
Prefix and Tail¶
The prefix_and_tail operator splits an observable into its first n items (stored in a caf::cow_vector) and an observable for the remainder.
Reduce¶
The reduce operator is similar to std::accumulate, only that it operates on an observable instead of an iterator range.
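On a finite container, the analogy can be written down directly. The helper reduce_model below is hypothetical and only mirrors the semantics with an initial value and a binary function. It does not use the flow API.

```cpp
#include <numeric>
#include <vector>

// Hypothetical model of `reduce` on a finite input: folds all items into a
// single result, starting from `init`.
template <class F>
int reduce_model(const std::vector<int>& in, int init, F f) {
  return std::accumulate(in.begin(), in.end(), init, f);
}
```

In a flow, the result can only become available once the input observable has completed, because any later item could still change it.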
Ref Count¶
The ref_count operator turns a connectable back to a regular observable by automatically calling connect as soon as there is an initial “reference” (subscription). After the last “reference” goes away (no more subscription), the ref_count operator unsubscribes from its source.
Sample¶
The sample operator emits the most recent item emitted by the source observable within periodic time intervals.
Skip Last¶
The skip_last operator re-emits all but the last n items from its input observable.
Skip¶
The skip operator re-emits all but the first n items from its input observable.
Start With¶
The start_with operator emits a given value (or observable) before the first item of the input observable.
Sum¶
The sum operator accumulates all items and emits the result after the input observable has completed.
Take Last¶
The take_last operator re-emits the last n items from its input observable.
Take¶
The take operator re-emits the first n items from its input observable.
Take While¶
The take_while operator re-emits items from its input observable until its predicate returns false.
To Vector¶
The to_vector operator collects all items and emits a single vector containing all observed items after the source observable has completed.
Bridging between Flows and Other Input Sources¶
For pushing data into flows from other sources, CAF provides the caf::flow::multicaster class. As the name suggests, this class multi-casts data to multiple observers. If no observer is connected, the multicaster will discard the data.
A common use case for the multicaster is to have an actor receive asynchronous messages and then push them into a flow. Please look at examples/flow/multicaster.cpp in the CAF repository for a demonstration.
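The multicasting behavior described above, including the silent discard without observers, can be modeled in plain standard C++. This sketch is not the interface of caf::flow::multicaster: the class toy_multicaster and its members are hypothetical, and observers are reduced to simple callbacks without backpressure.

```cpp
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical model: forwards each pushed item to all registered callbacks.
class toy_multicaster {
public:
  void subscribe(std::function<void(int)> observer) {
    observers_.push_back(std::move(observer));
  }

  // Returns the number of observers that received `x`. A result of zero
  // means the item was discarded.
  std::size_t push(int x) {
    for (auto& observer : observers_)
      observer(x);
    return observers_.size();
  }

private:
  std::vector<std::function<void(int)>> observers_;
};
```

An item pushed before anyone subscribes is lost, so observers that need every item must subscribe before the first push.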
Notes on Performance¶
When working with data flows in CAF, it is important to remember that values are frequently copied and buffered. In languages with call-by-reference semantics such as Java, this is not an issue since the flows basically pass pointers down the chain. In C++ however, programmers must choose wisely what data types are used in a flow.
Passing types such as std::string or std::vector down a flow means copying their contents whenever a value is copied or buffered, which can slow down your application to a crawl. CAF offers three classes that help mitigate this problem: caf::cow_string, caf::cow_vector and caf::cow_tuple. These types are thin wrappers around their standard library counterparts that add copy-on-write (COW) semantics. Internally, all COW types have a pointer to the actual data with a reference count. This makes them cheap to copy and safe to use in a data flow. Most of the time, data in a flow does not need to change after creation and is de facto immutable. However, the COW optimization still gives you a mutable reference if you really need it, and you make a deep copy only if you must, i.e., if there are multiple references to the data.
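The copy-on-write idea itself fits into a few lines of standard C++. The following sketch is a simplified, single-threaded illustration: toy_cow_string and its member names are hypothetical and chosen for this example, and it makes no claim about how the CAF classes are implemented.

```cpp
#include <memory>
#include <string>
#include <utility>

// Hypothetical copy-on-write wrapper around std::string.
class toy_cow_string {
public:
  explicit toy_cow_string(std::string str)
    : data_(std::make_shared<std::string>(std::move(str))) {}

  // Read access never copies.
  const std::string& str() const { return *data_; }

  // Write access: makes a deep copy first if the payload is shared.
  // Note: checking use_count like this is only reliable without threads.
  std::string& unshared() {
    if (data_.use_count() > 1)
      data_ = std::make_shared<std::string>(*data_);
    return *data_;
  }

  // Returns whether both objects point to the same payload.
  bool shares_with(const toy_cow_string& other) const {
    return data_ == other.data_;
  }

private:
  std::shared_ptr<std::string> data_;
};
```

Copying such an object only copies a pointer and increments a reference count, regardless of the string length. The deep copy happens at most once, on the first write to a shared payload.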
Key Differences to ReactiveX¶
Observables are not thread-safe. They describe a flow of data within an actor and are thus considered private to an actor.
CAF is more “opinionated” than ReactiveX when it comes to concurrency and ownership. The intended way for connecting concurrent parts of the system is by creating buffer resources and turning them into observables at the observing actor.
Furthermore, CAF does not support the scheduler interface from ReactiveX. Data flows are usually managed by an actor. Hence, there is no analog for operators such as SubscribeOn. That being said, the flow API does not tie observables or observers to actor types. The interface caf::flow::coordinator manages scheduling of flow-related work and can be implemented to run CAF flows without actors, e.g., to integrate them into a custom event loop.