Atomicity

Introduction

When processing events, an externalized projection has two tasks:

  1. persist the changes resulting from the Fact
  2. store the current fact-stream-position

When using an external datastore (e.g. Redis, JDBC, MongoDB), Factus needs to ensure that these two tasks happen atomically: either both are executed or neither is. This prevents corrupted data if, for example, the datastore goes down at the wrong moment.

Factus offers atomic writes through atomic projections.

sequenceDiagram
  participant Projection
  participant External Data Store
  Projection->>External Data Store: 1) update projection
  Note right of External Data Store: Inside Transaction
  Projection->>External Data Store: 2) store fact-stream-position

In an atomic projection, the projection update and the update of the fact-stream-position need to run atomically.
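The all-or-nothing behaviour described above can be illustrated with a minimal, purely conceptual sketch. It does not use the Factus API: the class `AtomicStoreSketch`, its method names, and the use of a `long` as fact-stream-position are assumptions made for illustration only. The store stages the projection changes on a copy and publishes them together with the new position only if the update succeeds.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for an external datastore (not part of Factus). */
class AtomicStoreSketch {
    private Map<String, String> data = new HashMap<>();
    private long position = 0L; // assumption: the position is modelled as a simple counter

    /**
     * Runs the projection update against a staged copy. Data and position are
     * published together, and only if the update did not throw.
     */
    void inTransaction(long newPosition, Consumer<Map<String, String>> projectionUpdate) {
        Map<String, String> staged = new HashMap<>(data);
        projectionUpdate.accept(staged); // 1) update projection (may throw)
        // Reached only on success: both changes become visible together.
        data = staged;
        position = newPosition;          // 2) store fact-stream-position
    }

    Map<String, String> data() {
        return Collections.unmodifiableMap(data);
    }

    long position() {
        return position;
    }
}
```

If the update throws, neither the staged data nor the new position is published, so the projection can resume from the last stored position without having seen half-applied changes.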

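Factus currently supports atomicity for the following external data stores:

  • data stores for which Spring transaction management is available (see Spring Transactional)
  • Redis (see Redis Transactional)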

Configuration

Atomic projections are declared via specific annotations. Currently supported are @SpringTransactional and @RedisTransactional.

These annotations share a common configuration attribute:

Parameter Name | Description                             | Default Value
bulkSize       | how many events are processed in a bulk | 50

In addition, each annotation has attributes that configure its underlying technical solution (Transaction/Batch/…). All of those attributes have reasonable defaults.

Optimization: Bulk Processing

In order to improve the throughput of event processing, atomic projections support bulk processing.

With bulk processing

  • the concrete underlying transaction mechanism (e.g. Spring Transaction Management) can optimize accordingly.
  • skipping unnecessary fact-stream-position updates is possible (see next section).

The size of the bulk can be configured via a common bulkSize attribute of the @SpringTransactional or @RedisTransactional annotation.

Once the bulkSize is reached, or a configured timeout is triggered, the recorded operations of this bulk will be flushed to the datastore.
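The flush rule can be sketched as follows. This is a simplified model under stated assumptions, not the Factus implementation: `BulkBufferSketch`, `record`, and `onTimeout` are hypothetical names, operations are plain strings, and the timer that would trigger the timeout is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of bulk recording (not part of Factus). */
class BulkBufferSketch {
    private final int bulkSize;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();

    BulkBufferSketch(int bulkSize) {
        this.bulkSize = bulkSize;
    }

    /** Records one operation and flushes as soon as the bulk is full. */
    void record(String operation) {
        pending.add(operation);
        if (pending.size() >= bulkSize) {
            flush();
        }
    }

    /** To be called when the configured timeout fires (timer wiring omitted). */
    void onTimeout() {
        flush();
    }

    private void flush() {
        if (pending.isEmpty()) {
            return; // nothing recorded since the last flush
        }
        // In a real projection this would be one transaction against the datastore.
        flushed.add(new ArrayList<>(pending));
        pending.clear();
    }

    List<List<String>> flushedBulks() {
        return flushed;
    }
}
```

With a bulk size of 3, recording four operations produces one flushed bulk of three operations. The fourth operation stays pending until either two more arrive or the timeout fires.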

Skipping fact-stream-position Updates

Skipping unnecessary updates of the fact-stream-position reduces the writes to the external datastore, thus improving event-processing throughput.

The concept is best explained with an example: Suppose three events are processed by a transactional projection with the bulk size set to “1”. We then see the following writes going to the external datastore:

sequenceDiagram
  participant Projection
  participant External Data Store
  Projection->>External Data Store: event 1: update projection data
  Projection->>External Data Store: event 1: store fact-stream-position
  Projection->>External Data Store: event 2: update projection data
  Projection->>External Data Store: event 2: store fact-stream-position
  Projection->>External Data Store: event 3: update projection data
  Projection->>External Data Store: event 3: store fact-stream-position

Processing three events with bulk size “1” - each fact-stream-position is written

As initially explained, each update of the projection is accompanied by an update of the fact-stream-position.

To reduce the writes to the necessary minimum, we now increase the bulk size to “3”:

sequenceDiagram
  participant Projection
  participant External Data Store
  Projection->>External Data Store: event 1: update projection data
  Projection->>External Data Store: event 2: update projection data
  Projection->>External Data Store: event 3: update projection data
  Projection->>External Data Store: event 3: store fact-stream-position

Processing three events with bulk size “3” - only the last fact-stream-position is written

This configuration change eliminates two unnecessary intermediate fact-stream-position updates. The bulk is still executed atomically, so in terms of fact-stream-position updates, we are just interested in the last, most recent position.

Skipping unnecessary intermediate updates to the fact-stream-position noticeably reduces the required writes to the external datastore. Provided a large enough bulk size (“50” is a reasonable default), this significantly improves event-processing throughput.
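The write counts from the two diagrams can be reproduced with a small simulation. It is only a counting model with hypothetical names (`WriteCountSketch`, `countWrites`). It assumes one write per projection update, one position write per bulk, and a final flush for a partially filled last bulk.

```java
/** Hypothetical counting model for datastore writes (not part of Factus). */
class WriteCountSketch {

    /** Counts the writes needed to process the given number of events. */
    static int countWrites(int events, int bulkSize) {
        int writes = 0;
        int inBulk = 0;
        for (int i = 1; i <= events; i++) {
            writes++; // update projection data
            inBulk++;
            if (inBulk == bulkSize || i == events) {
                writes++; // store fact-stream-position once per bulk
                inBulk = 0;
            }
        }
        return writes;
    }
}
```

Under these assumptions, `countWrites(3, 1)` yields 6 and `countWrites(3, 3)` yields 4, matching the diagrams. For 100 events, a bulk size of 50 gives 102 writes instead of 200.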

1 - Spring Transactional

Broad Data-Store Support

Spring comes with extensive support for transactions which is employed by Spring Transactional Projections.

Standing on the shoulders of Spring Transactions, Factus supports transactionality for every data-store for which Spring transaction management is available. In more detail, for the data-store in question, an implementation of the Spring PlatformTransactionManager must exist.

Motivation

You would want to use Spring Transactional for two reasons:

  • atomicity of factStreamPosition updates and your projection state updates
  • increased fact processing throughput

The performance gain is achieved by skipping unnecessary factStreamPosition updates and (more importantly) by reducing the number of transactions on your datastore: one transaction covers bulkSize updates instead of one transaction per write. For instance, if you use Spring Transactions on a JDBC datastore, you will have one database transaction around the update of bulkSize events. The bulkSize is configurable per projection via the @SpringTransactional annotation.

Configuration

In order to make use of Spring transaction support, the necessary dependency has to be included in your project:

    <dependency>
        <groupId>org.factcast</groupId>
        <artifactId>factcast-factus-spring-tx</artifactId>
    </dependency>

Structure

To use Spring Transactionality, a projection needs to:

  • be annotated with @SpringTransactional to configure bulk and transaction-behavior and
  • implement SpringTxProjection to return the responsible PlatformTransactionManager for this kind of Projection

Applying facts

In your @Handler methods, you need to make sure you use the Spring-managed transaction when talking to your datastore. This might be entirely transparent for you (for instance, when using JDBC, which assigns the transaction to the current thread), or you might need to resolve the current transaction from the given PlatformTransactionManager.

Please consult the Spring docs or your driver’s documentation.

You can find blueprints for getting started in the example section.

2 - Redis Transactional

A Redis transactional projection is a transactional projection based on Redisson RTransaction.

Compared to a Spring transactional projection, a Redis transactional projection is more lightweight since

  • transactionality is directly provided by RTransaction. There is no need to deal with Spring’s PlatformTransactionManager
  • the fact stream position is automatically managed (see example below)

Motivation

You would want to use Redis Transactional for two reasons:

  • atomicity of factStreamPosition updates and your projection state updates
  • increased fact processing throughput

The performance gain is achieved by skipping unnecessary factStreamPosition updates and (more importantly) by reducing the number of operations on your Redis backend: bulkSize updates are executed in one Redisson transaction instead of as single writes. The bulkSize is configurable per projection via the @RedisTransactional annotation.

When working with a Redis transactional projection, you can read your own uncommitted writes. For this reason, a Redis transactional projection is best used for projections that need to access the projection’s data while handling an event.
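Why reading uncommitted writes matters can be shown with a conceptual sketch. It deliberately does not use Redisson: `ReadYourWritesSketch` and all of its methods are hypothetical and only model a transaction-scoped view layered over committed data. The handler-style method increments a counter, which requires reading the value written by an earlier event of the same bulk.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a transaction-scoped view (not the Redisson API). */
class ReadYourWritesSketch {
    private final Map<String, Integer> committed = new HashMap<>();
    private final Map<String, Integer> uncommitted = new HashMap<>();

    /** Reads see pending writes of the current transaction first. */
    Integer get(String key) {
        return uncommitted.containsKey(key) ? uncommitted.get(key) : committed.get(key);
    }

    /** Writes are staged until commit. */
    void put(String key, Integer value) {
        uncommitted.put(key, value);
    }

    /** Publishes all staged writes, for example at the end of a bulk. */
    void commit() {
        committed.putAll(uncommitted);
        uncommitted.clear();
    }

    /** What a reader outside the transaction would see. */
    Integer committedValue(String key) {
        return committed.get(key);
    }

    /** Handler-style logic that depends on the value written by a previous event. */
    void applyIncrement(String key) {
        Integer current = get(key);
        put(key, current == null ? 1 : current + 1);
    }
}
```

Applying two increments within one bulk yields 2 inside the transaction, while nothing is visible outside of it until the commit. Without read-your-own-writes, the second increment would read the stale committed value and the count would be wrong.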

Configuration

In order to make use of Redisson RTransaction support, the necessary dependency has to be included in your project:

    <dependency>
        <groupId>org.factcast</groupId>
        <artifactId>factcast-factus-redis</artifactId>
    </dependency>

Structure

A Redis transactional projection can be a managed or a subscribed projection and is defined as follows:

  • it is annotated with @RedisTransactional (optional when using the default values and extending one of Factus’ abstract classes mentioned below)
  • it implements RedisProjection revealing the RedisClient used
  • it provides the revision number of the projection via the @ProjectionMetaData annotation
  • the handler methods receive an additional RTransaction parameter

Example

    @Handler
    void apply(SomethingHappened fact, RTransaction tx) {
        // Write through the transaction so that the update is part of the current bulk.
        tx.getMap( ... ).put(fact.getKey(), fact.getValue());
    }

A full example can be found in the example section.