This the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

Atomicity

Introduction

When processing events, an externalized projection has two tasks:

  1. persist the changes resulting from the Fact
  2. store the current fact-stream-position

When using an external datastore (e.g. Redis, JDBC, MongoDB), Factus needs to ensure that these two tasks happen atomically: either both tasks are executed or none. This prevents corrupted data in case e.g. the Datastore goes down in the wrong moment.

Factus offers atomic writes through atomic projections.

sequenceDiagram
  participant Projection
  participant External Data Store
  Projection->>External Data Store: 1) update projection
  Note right of External Data Store: Inside Transaction
  Projection->>External Data Store: 2) store fact-stream-position

In an atomic Projection, the projection update and the update of the fact-stream-position need to run atomically

Factus currently supports atomicity for the following external data stores:

Configuration

Atomic projections are declared via specific annotations. Currently, supported are

These annotations share a common configuration attribute:

Parameter NameDescriptionDefault Value
bulkSizehow many events are processed in a bulk50

as well as, different attributes needed to configure the respective underlying technical solution (Transaction/Batch/…). There are reasonable defaults for all of those attributes present.

Optimization: Bulk Processing

In order to improve the throughput of event processing, atomic projections support bulk processing.

With bulk processing

  • the concrete underlying transaction mechanism (e.g. Spring Transaction Management) can optimize accordingly.
  • skipping unnecessary fact-stream-position updates is possible (see next section).

The size of the bulk can be configured via a common bulkSize attribute of the @SpringTransactional, @RedisTransactional or @RedisBatched annotation.

Once the bulkSize is reached, or a configured timeout is triggered, the recorded operations of this bulk will be flushed to the datastore.

Skipping fact-stream-position Updates

Skipping unnecessary updates of the fact-stream-position reduces the writes to the external datastore, thus improving event-processing throughput.

The concept is best explained with an example: Suppose we have three events which are processed by a transactional projection and the bulk size set to “1”. Then, we see the following writes going to the external datastore:

sequenceDiagram
  participant Projection
  participant External Data Store
  Projection->>External Data Store: event 1: update projection data
  Projection->>External Data Store: event 1: store fact-stream-position
  Projection->>External Data Store: event 2: update projection data
  Projection->>External Data Store: event 2: store fact-stream-position
  Projection->>External Data Store: event 3: update projection data
  Projection->>External Data Store: event 3: store fact-stream-position

Processing three events with bulk size “1” - each fact-stream-position is written
As initially explained, here, each update of the projection is accompanied by an update of the fact-stream-position.

In order to minimize the writes to the necessary, we now increase the bulk size to “3”:

sequenceDiagram
  participant Projection
  participant External Data Store
  Projection->>External Data Store: event 1: update projection data
  Projection->>External Data Store: event 2: update projection data
  Projection->>External Data Store: event 3: update projection data
  Projection->>External Data Store: event 3: store fact-stream-position

Processing three events with bulk size “3” - only the last fact-stream-position written

This configuration change eliminates two unnecessary intermediate fact-stream-position updates. The bulk is still executed atomically, so in terms of fact-stream-position updates, we are just interested in the last, most recent position.

Skipping unnecessary intermediate updates to the fact-stream-position, noticeably reduces the required writes to the external datastore. Provided a large enough bulk size (“50” is a reasonable default), this significantly improves event-processing throughput.

1 - Spring Transactional

Broad Data-Store Support

Spring comes with extensive support for transactions which is employed by Spring Transactional Projections.

Standing on the shoulders of Spring Transactions, Factus supports transactionality for every data-store for which Spring transaction management is available. In more detail, for the data-store in question, an implementation of the Spring PlatformTransactionManager must exist.

Motivation

You would want to use Spring Transactional for two reasons:

  • atomicity of factStreamPosition updates and your projection state updates
  • increased fact processing throughput

The Performance bit is achieved by skipping unnecessary factStreamPosition updates and (more importantly) by reducing the number of transactions on your datastore by using one Transaction for bulkSize updates instead of single writes. For instance, if you use Spring Transactions on a JDBC Datastore, you will have one database transaction around the update of bulkSize events. The bulkSize is configurable per projection via the @SpringTransactional annotation.

Configuration

In order to make use of spring transaction support, the necessary dependency has to be included in your project:

    <dependency>
        <groupId>org.factcast</groupId>
        <artifactId>factcast-factus-spring-tx</artifactId>
    </dependency>

Structure

To use Spring Transactionality, a projection needs to:

  • be annotated with @SpringTransactional to configure bulk and transaction-behavior and
  • implement SpringTxProjection to return the responsible PlatformTransactionManager for this kind of Projection

Applying facts

In your @Handler methods, you need to make sure you use the Spring-Managed Transaction when talking to your datastore. This might be entirely transparent for you (for instance, when using JDBC that assigns the transaction to the current thread), or will need you to resolve the current transaction from the given platformTransactionManager example.

Please consult the Spring docs or your driver’s documentation.

You can find blueprints of getting started in the example section.

2 - Redis Transactional

A Redis transactional projection is a transactional projection based on Redisson RTransaction.

Compared to a Spring transactional projection, a Redis transactional projection is more lightweight since

  • transactionality is directly provided by RTransaction. There is no need to deal with Spring’s PlatformTransactionManager
  • the fact stream position is automatically managed (see example below)

Motivation

You would want to use Redis Transactional for two reasons:

  • atomicity of factStreamPosition updates and your projection state updates
  • increased fact processing throughput

The performance bit is achieved by skipping unnecessary factStreamPosition updates and (more importantly) by reducing the number of operations on your Redis backend by using bulkSize updates with one redisson transsaction instead of single writes. The bulkSize is configurable per projection via the @RedisTransactional annotation.

Working with a Redis transactional projection you can read your own uncommitted write. For this reason, a Redis transactional projection is best used for projections which need to access the projection’s data during the handling of an event.

If this is not necessary, you could also use a better performing alternative: Redis batch projection

Configuration

In order to make use of redisson RTransaction support, the necessary dependency has to be included in your project:

    <dependency>
        <groupId>org.factcast</groupId>
        <artifactId>factcast-factus-redis</artifactId>
    </dependency>

Structure

A Redis transactional projection can be a managed- or a subscribed projection and is defined as follows:

  • it is annotated with @RedisTransactional
  • it implements RedisProjection revealing the RedisClient used
  • it provides the serial number of the projection via the @ProjectionMetaData annotation
  • the handler methods receive an additional RTransaction parameter

Example

@Handler
void apply(SomethingHappened fact, RTransaction tx) {
    myMap = tx.getMap( ... ).put( fact.getKey() , fact.getValue() );
}

a full example can be found here

3 - Redis Batched

A Redis batch projection is a atomic projection based on Redisson RBatch. Like Redis transactional projections, also this projection type is more lightweight than Spring transactional projections. No Spring PlatformTransactionManager is needed, the RBatch object of the Redisson library is enough.

Working with a Redis batch projection is asynchronous as multiple commands are collected and executed later in an atomic way (all or none).

Motivation

You would want to use Redis Batched for two reasons:

  • atomicity of factStreamPosition updates and your projection state updates
  • increased fact processing throughput

The performance bit is achieved by skipping unnecessary factStreamPosition updates and (more importantly) by reducing the number of operations on your Redis backend by using a redisson batch instead of single writes for bulkSize updates. The bulkSize is configurable per projection via the @RedisBatched annotation.

Redisson batches improve performance significantly by collecting operations and executing them together as well as delegating all possible work to an async thread. It has the effect, that you cannot read non-executed (batched) changes. If this is unacceptable for your projection, consider Redis transactional projections. See the Redisson documentation for details.

Other than a transaction, writes registered (but not yet executed) on a Redis batch can not be read.

A Redis batch projection, therefore, is recommended for projections which

  • handle a lot of events and
  • don’t require reading the current projection state in an event handler.

For an alternative that allows access to the projection state during event handling, see Redis transactional projection.

Configuration

In order to make use of redisson RBatch support, the necessary dependency has to be included in your project:

    <dependency>
        <groupId>org.factcast</groupId>
        <artifactId>factcast-factus-redis</artifactId>
    </dependency>

Structure

A Redis batch projection supports managed- or subscribed projections and is defined as follows:

  • it is annotated with @RedisBatched
  • it implements RedisProjection revealing the RedisClient used
  • it provides the serial number of the projection via the @ProjectionMetaData annotation
  • the handler methods receive an additional RBatch parameter

This structure is similar to a Redis transactional projection, only the @RedisBatched annotation and the RBatch method parameter differ.

Example

@Handler
void apply(SomethingHappened fact, RBatch batch) {
    myMap = batch.getMap( ... ).putAsync( fact.getKey() , fact.getValue() );
}

a full example can be found here