Examples

Here you will find some examples that you can use as a simple blueprint to get started building projections. The examples make use of some abstract base classes that may be more convenient to use. Feel free to study the implementations of those classes to see what is going on, especially when you plan to implement projections with a different datastore than the ones used in the examples.

1 - UserNames (Spring/JDBC)

Here is an example for a managed projection externalizing its state to a relational database (PostgreSQL here) using Spring transactional management.

The example projects a list of used UserNames in the System.

Preparation

We need to store two things in our JDBC Datastore:

  • the actual list of UserNames, and
  • the fact-stream-position of the projection.

Therefore we create the necessary tables (probably using liquibase/flyway or similar tooling of your choice):

CREATE TABLE users (
    name TEXT,
    id UUID,
    PRIMARY KEY (id));
CREATE TABLE fact_stream_positions (
    projection_name TEXT,
    fact_stream_position UUID,
    PRIMARY KEY (projection_name));

Given a unique projection name, we can use fact_stream_positions as a common table for all our JDBC managed projections.

Constructing

Since we decided to use a managed projection, we extended the AbstractSpringTxManagedProjection class. To configure transaction management, our managed projection exposes the injected transaction manager to the rest of Factus by calling the parent constructor.

@ProjectionMetaData(serial = 1)
@SpringTransactional
public class UserNames extends AbstractSpringTxManagedProjection {

    private final JdbcTemplate jdbcTemplate;

    public UserNames(
            @NonNull PlatformTransactionManager platformTransactionManager, JdbcTemplate jdbcTemplate) {
        super(platformTransactionManager);
        this.jdbcTemplate = jdbcTemplate;
    }
    ...

As we’re making use of Spring here, we inject a PlatformTransactionManager and a JdbcTemplate in order to communicate with the database in a transactional way.

Two remarks:

  1. As soon as your project uses the spring-boot-starter-jdbc dependency, Spring Boot will automatically provide you with a JDBC-aware PlatformTransactionManager.
  2. To ensure that the database communication participates in the managed transaction, the database access mechanism must also be provided by Spring. Thus, we suggest using the JdbcTemplate.

Configuration

The @SpringTransactional annotation provides various configuration options:

| Parameter Name   | Description        | Default Value |
|------------------|--------------------|---------------|
| bulkSize         | bulk size          | 50            |
| timeoutInSeconds | timeout in seconds | 30            |
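
To illustrate how these defaults could be overridden, here is a minimal sketch. It declares a local stand-in annotation with the same parameter names and default values as in the table above and reads the values back via reflection. The stand-in exists only to keep the example self-contained; it is an assumption for illustration and not the annotation shipped with Factus, which you would import in a real project.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// ASSUMPTION: local stand-in mirroring the two parameters from the table.
// It is NOT the annotation shipped with Factus.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SpringTransactional {
    int bulkSize() default 50;

    int timeoutInSeconds() default 30;
}

// Keeps both defaults.
@SpringTransactional
class DefaultSettingsProjection {}

// Overrides both values: larger bulks and a longer timeout.
@SpringTransactional(bulkSize = 200, timeoutInSeconds = 60)
class TunedProjection {}

class SpringTransactionalConfigSketch {

    // Reads the effective settings from the annotation on the given class.
    static String describe(Class<?> projectionClass) {
        SpringTransactional cfg = projectionClass.getAnnotation(SpringTransactional.class);
        return "bulkSize=" + cfg.bulkSize() + ", timeoutInSeconds=" + cfg.timeoutInSeconds();
    }

    public static void main(String[] args) {
        System.out.println(describe(DefaultSettingsProjection.class));
        System.out.println(describe(TunedProjection.class));
    }
}
```

Parameters that are not set on the annotation keep the default from the table.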

Updating the projection

The two possible abstract base classes, AbstractSpringTxManagedProjection or AbstractSpringTxSubscribedProjection, both require the following methods to be implemented:

| Method Signature | Description |
|------------------|-------------|
| public UUID factStreamPosition() | read the last position in the Fact stream from the database |
| public void factStreamPosition(@NonNull UUID factStreamPosition) | write the current position of the Fact stream to the database |
| public WriterToken acquireWriteToken(@NonNull Duration maxWait) | coordinates write access to the projection, see here for details |

The first two methods tell Factus how to read and write the Fact stream’s position from the database.
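
To make the interplay of these three methods more tangible, here is a toy model of one update cycle: acquire the write token, read the stored position, apply only the Facts that come after it, and record the new position. Everything in this sketch (CatchUpSketch, Fact, ToyWriteToken, the in-memory lock) is a hypothetical stand-in and not a Factus type; how Factus actually drives these methods may differ in detail.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// ASSUMPTION: toy model, none of these types are Factus classes.
class CatchUpSketch {

    // A fact reduced to its id and a payload.
    static final class Fact {
        final UUID id;
        final String payload;

        Fact(UUID id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    // Closing the token releases write access; close() throws no checked exception.
    interface ToyWriteToken extends AutoCloseable {
        @Override
        void close();
    }

    private final ReentrantLock writeLock = new ReentrantLock();
    private final List<String> applied = new ArrayList<>();
    private UUID position; // null until the first fact has been applied

    UUID factStreamPosition() {
        return position;
    }

    void factStreamPosition(UUID newPosition) {
        position = newPosition;
    }

    // Returns a token, or null if write access was not granted within maxWait.
    ToyWriteToken acquireWriteToken(Duration maxWait) {
        try {
            if (writeLock.tryLock(maxWait.toMillis(), TimeUnit.MILLISECONDS)) {
                return writeLock::unlock;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return null;
    }

    // Applies every fact after the stored position and returns how many were applied.
    int update(List<Fact> stream) {
        ToyWriteToken token = acquireWriteToken(Duration.ofSeconds(1));
        if (token == null) {
            return 0; // another writer is updating the projection
        }
        try (ToyWriteToken held = token) {
            UUID last = factStreamPosition();
            boolean pastLast = (last == null);
            int count = 0;
            for (Fact fact : stream) {
                if (!pastLast) {
                    // skip everything up to and including the stored position
                    pastLast = fact.id.equals(last);
                    continue;
                }
                applied.add(fact.payload);
                factStreamPosition(fact.id);
                count++;
            }
            return count;
        }
    }

    List<String> applied() {
        return applied;
    }
}
```

Calling update(...) twice with the same stream applies nothing the second time, because the stored position already points at the last Fact.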

Writing the fact position

Provided the table fact_stream_positions exists, here is an example of how to write the Fact position:

@Override
public void factStreamPosition(@NonNull UUID factStreamPosition) {
    jdbcTemplate.update(
            "INSERT INTO fact_stream_positions (projection_name, fact_stream_position) " +
            "VALUES (?, ?) " +
            "ON CONFLICT (projection_name) DO UPDATE SET fact_stream_position = ?",
            getScopedName().asString(),
            factStreamPosition,
            factStreamPosition);
}

For convenience, an UPSERT statement (Postgres syntax) is used, which INSERTs the UUID the first time and subsequently only UPDATEs the value.

To avoid hard-coding a unique name for the projection, the provided method getScopedName() is employed. The default implementation makes sure the name is unique and includes the serial of the projection.
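
The following toy model shows, under stated assumptions, why a name that includes the serial works well as the key of the shared positions table: repeated writes for the same name overwrite a single row (the UPSERT behaviour), while a projection with a bumped serial gets a different name and therefore starts without a stored position. The name format used in scopedName(...) is made up for illustration; the name produced by getScopedName() may be composed differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// ASSUMPTION: toy model. The map stands in for the fact_stream_positions table,
// and the name format is hypothetical.
class PositionTableSketch {

    // Placeholder for a projection class.
    static final class UserNames {}

    // projection_name -> fact_stream_position
    private final Map<String, UUID> rows = new HashMap<>();

    // Hypothetical format: fully qualified class name plus serial.
    static String scopedName(Class<?> projection, int serial) {
        return projection.getName() + "_" + serial;
    }

    // Same effect as the UPSERT: insert on the first call, overwrite afterwards.
    void write(String projectionName, UUID position) {
        rows.put(projectionName, position);
    }

    // Returns null when no row exists yet.
    UUID read(String projectionName) {
        return rows.get(projectionName);
    }

    int rowCount() {
        return rows.size();
    }
}
```

With this model, writing two positions for serial 1 leaves exactly one row, and reading the position for serial 2 yields null.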

Reading the fact position

To read the last Fact stream position, we simply select the previously written value:

@Override
public UUID factStreamPosition() {
    try {
        return jdbcTemplate.queryForObject(
                "SELECT fact_stream_position FROM fact_stream_positions WHERE projection_name = ?",
                UUID.class,
                getScopedName().asString());
    } catch (IncorrectResultSizeDataAccessException e) {
        // no position yet, just return null
        return null;
    }
}

In case no previous Fact position exists, null is returned.

Applying Facts

When processing the UserCreated event, we add a new row to the users table, filled with event data:

@Handler
void apply(UserCreated e) {
    jdbcTemplate.update(
            "INSERT INTO users (name, id) VALUES (?,?);",
            e.getUserName(),
            e.getAggregateId());
}

When handling the UserDeleted event we do the opposite and remove the appropriate row:

@Handler
void apply(UserDeleted e) {
    jdbcTemplate.update("DELETE FROM users where id = ?", e.getAggregateId());
}

We have finished the implementation of the event-processing part of our projection. What is missing is a way to make the projection’s data accessible for users.

Querying the projection

Users of our projections (meaning “other code”) contact the projection via its public API. Currently, there is no public method offering “user names”. So let’s change that:

public List<String> getUserNames() {
    return jdbcTemplate.query("SELECT name FROM users", (rs, rowNum) -> rs.getString(1));
}
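
To see how the two handlers and the query method fit together without a database, here is an in-memory sketch. The map stands in for the users table, and the event classes are simplified stand-ins for UserCreated and UserDeleted; all of this is an assumption for illustration. Unlike the SQL INSERT, the map would silently overwrite a duplicate id instead of failing on the primary key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// ASSUMPTION: in-memory stand-in for the JDBC-backed projection.
class InMemoryUserNamesSketch {

    // Simplified event carrying the data the handler needs.
    static final class UserCreated {
        final UUID aggregateId;
        final String userName;

        UserCreated(UUID aggregateId, String userName) {
            this.aggregateId = aggregateId;
            this.userName = userName;
        }
    }

    // Simplified event identifying the user to remove.
    static final class UserDeleted {
        final UUID aggregateId;

        UserDeleted(UUID aggregateId) {
            this.aggregateId = aggregateId;
        }
    }

    // id -> name, in insertion order
    private final Map<UUID, String> users = new LinkedHashMap<>();

    // Corresponds to the INSERT handler.
    void apply(UserCreated e) {
        users.put(e.aggregateId, e.userName);
    }

    // Corresponds to the DELETE handler.
    void apply(UserDeleted e) {
        users.remove(e.aggregateId);
    }

    // Corresponds to "SELECT name FROM users".
    List<String> getUserNames() {
        return new ArrayList<>(users.values());
    }
}
```

After applying a UserCreated for two users and a UserDeleted for the first one, getUserNames() returns only the second name.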

Using the projection

Calling code that wants to talk to the projection now just needs to call the getUserNames method:

// create a local instance or get a Spring Bean from the ApplicationContext, depending on your code organization
UserNames userNameProjection = new UserNames(platformTransactionManager, jdbcTemplate);

// depending on many factors you *may* want to update the projection before querying it
factus.update(userNameProjection);

List<String> userNames = userNameProjection.getUserNames();

First, we create an instance of the projection and provide it with all required dependencies. As an alternative, you may want to let Spring manage the lifecycle of the projection and let the dependency injection mechanism provide you an instance.

Next, we call update(...) on the projection to fetch the latest events from the Fact stream. Note that when you use a pre-existing (maybe Spring managed singleton) instance of the projection, this step is optional and depends on your use-case. As the last step, we ask the projection to provide us with user names by calling getUserNames().

Full Example

To study the full example see

2 - UserNames (Redis Transactional)

Here is a projection that handles UserCreated and UserDeleted events. It solves the same problem as the example we’ve seen in Spring transactional projections. However, this time we use Redis as our data store and Redisson as the access API.

Configuration

The @RedisTransactional annotation provides various configuration options:

| Parameter Name  | Description | Default Value |
|-----------------|-------------|---------------|
| bulkSize        | bulk size | 50 |
| timeout         | timeout in milliseconds until a transaction is interrupted and rolled back | 30000 |
| responseTimeout | timeout in milliseconds for the Redis response; the countdown starts once the transaction has been successfully submitted | 5000 |
| retryAttempts   | maximum attempts to send the transaction | 5 |
| retryInterval   | time interval in milliseconds between retry attempts | 3000 |

Constructing

Since we decided to use a managed projection, we extended the AbstractRedisManagedProjection class. To configure the connection to Redis via Redisson, we injected RedissonClient in the constructor, calling the parent constructor.

@ProjectionMetaData(serial = 1)
@RedisTransactional
public class UserNames extends AbstractRedisManagedProjection {

  public UserNames(RedissonClient redisson) {
    super(redisson);
  }
    ...

FactStreamPosition and Lock-Management are automatically taken care of by the underlying AbstractRedisManagedProjection.

In contrast to non-atomic projections, when applying Facts to the Redis data structure, the instance variable userNames cannot be used as this would violate the transactional semantics. Instead, accessing and updating the state is carried out on a transaction derived data-structure (Map here) inside the handler methods.
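
The reason for going through the transaction can be pictured with a toy model: writes made through a transaction stay pending until commit and vanish on rollback, whereas a write made directly on the shared structure would take effect immediately and survive a rollback. The class below is a hypothetical sketch of that idea only; it is not how Redisson implements RTransaction.

```java
import java.util.HashMap;
import java.util.Map;

// ASSUMPTION: toy model of "changes become visible on commit"; not Redisson code.
class TxMapSketch<K, V> {

    // State visible to readers.
    private final Map<K, V> committed = new HashMap<>();

    // Collects writes until they are committed or rolled back.
    final class Tx {
        private final Map<K, V> pendingPuts = new HashMap<>();

        void put(K key, V value) {
            pendingPuts.put(key, value);
        }

        // Makes all pending writes visible at once.
        void commit() {
            committed.putAll(pendingPuts);
            pendingPuts.clear();
        }

        // Discards all pending writes.
        void rollback() {
            pendingPuts.clear();
        }
    }

    Tx begin() {
        return new Tx();
    }

    V read(K key) {
        return committed.get(key);
    }
}
```

In this model, a put followed by rollback leaves read(...) returning null, while a put followed by commit makes the value visible.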

Updating the projection

Applying Events

Received events are processed inside the methods annotated with @Handler (the handler methods). To participate in the transaction, these methods have an additional RTransaction parameter which represents the current transaction.

Let’s have a closer look at the handler for the UserCreated event:

@Handler
void apply(UserCreated e, RTransaction tx) {
    Map<UUID, String> userNames = tx.getMap(getRedisKey());
    userNames.put(e.getAggregateId(), e.getUserName());
}

In the previous example, the method getRedisKey() was used to retrieve the Redis key of the projection. Let’s have a closer look at this method in the next section.

Default redisKey

The data structures provided by Redisson all require a unique identifier which is used to store them in Redis. The method getRedisKey() provides an automatically generated name, assembled from the class name of the projection and the serial number configured with the @ProjectionMetaData.

Additionally, an AbstractRedisManagedProjection or an AbstractRedisSubscribedProjection maintains the following keys in Redis:

  • getRedisKey() + "_state_tracking" - contains the UUID of the last position of the Fact stream
  • getRedisKey() + "_lock" - shared lock that needs to be acquired to update the projection.

Redisson API Datastructures vs. Java Collections

As seen in the above example, some Redisson data structures also implement the appropriate Java Collections interface. For example, you can also assign a Redisson RMap to a standard Java Map:

// 1) use specific Redisson type
RMap<UUID, String> userNames = tx.getMap(getRedisKey());

// 2) use Java Collections type
Map<UUID, String> userNames = tx.getMap(getRedisKey());

There are good reasons for either variant, 1) and 2):

| Redisson specific | plain Java |
|-------------------|------------|
| extended functionality which e.g. reduces I/O load (see RMap.fastPut(...) and RMap.fastRemove(...)) | standard, intuitive |
| only option when using data-structures which are not available in standard Java Collections (e.g. RedissonListMultimap) | easier to test |
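
To illustrate the I/O argument, here is a toy comparison of the two call styles. A Map-style put returns the previous value, so that value has to travel back to the caller; a fastPut-style call only reports whether the key was new. The class below merely mimics these return-value contracts on a local HashMap. It is a hypothetical sketch that does not talk to Redis, so please consult the Redisson Javadoc for the authoritative behaviour of RMap.fastPut(...).

```java
import java.util.HashMap;
import java.util.Map;

// ASSUMPTION: toy model that only mimics return-value contracts; not Redisson code.
class FastPutSketch {

    private final Map<String, String> data = new HashMap<>();

    // Counts how often a previous value had to be handed back to the caller.
    int valuesSentBack = 0;

    // Map.put style: the previous value is part of the reply.
    String put(String key, String value) {
        String previous = data.put(key, value);
        if (previous != null) {
            valuesSentBack++;
        }
        return previous;
    }

    // fastPut style: only a flag comes back (true if the key was new).
    boolean fastPut(String key, String value) {
        return data.put(key, value) == null;
    }
}
```

If the handler never looks at the previous value, the flag-only variant avoids transferring data that is thrown away anyway.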

Full Example


@ProjectionMetaData(serial = 1)
@RedisTransactional
public class UserNames extends AbstractRedisManagedProjection {

  private final Map<UUID, String> userNames;

  public UserNames(RedissonClient redisson) {
    super(redisson);

    userNames = redisson.getMap(getRedisKey());
  }

  public List<String> getUserNames() {
    return new ArrayList<>(userNames.values());
  }

  @Handler
  void apply(UserCreated e, RTransaction tx) {
    tx.getMap(getRedisKey()).put(e.getAggregateId(), e.getUserName());
  }

  @Handler
  void apply(UserDeleted e, RTransaction tx) {
    tx.getMap(getRedisKey()).remove(e.getAggregateId());
  }
}

To study the full example, see

3 - UserNames (Redis Batched)

We continue using the previously introduced example of a projection handling UserCreated and UserDeleted events.

Configuration

The @RedisBatched annotation provides various configuration options:

| Parameter Name  | Description | Default Value |
|-----------------|-------------|---------------|
| bulkSize        | bulk size | 50 |
| responseTimeout | timeout in milliseconds for the Redis response | 5000 |
| retryAttempts   | maximum attempts to transmit a batch of Redis commands | 5 |
| retryInterval   | time interval in milliseconds between retry attempts | 3000 |

Constructing

Since we decided to use a managed projection, we extended the AbstractRedisManagedProjection class. To configure the connection to Redis via Redisson, we injected RedissonClient in the constructor, calling the parent constructor.

@ProjectionMetaData(serial = 1)
@RedisBatched
public class UserNames extends AbstractRedisManagedProjection {

  public UserNames(RedissonClient redisson) {
    super(redisson);
  }
    ...

To let Factus take care of the batch submission, automatically persist the Fact stream position, and manage the locks, we pass the instance of the Redisson client to the parent class (the call to super(...)).

Updating the projection

Applying Events

To access the batch, the handler methods require an additional RBatch parameter:

@Handler
void apply(UserCreated created, RBatch batch) {
    batch.getMap(getRedisKey())
          .putAsync(created.getAggregateId(), created.getUserName());
}

We use a batch-derived RMapAsync object which offers asynchronous versions of the common Map methods. By calling putAsync(...) we add the extracted event data to the map. Underneath, the RBatch collects this change and, at a convenient point in time, transmits it together with other changes to Redis.
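
The batching idea can be pictured with a small toy model: the async calls only queue commands, and nothing reaches the store until the batch is executed, at which point all queued commands are sent together. The class below is a hypothetical sketch of that behaviour with an in-memory map standing in for Redis; it is not Redisson's RBatch, and in the projection it is Factus, not your handler, that triggers the submission.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// ASSUMPTION: toy model of command batching; not Redisson's RBatch.
class BatchSketch {

    // Stands in for Redis.
    private final Map<String, String> store = new HashMap<>();

    // Commands collected since the last execute().
    private final List<Runnable> queued = new ArrayList<>();

    // Number of times a batch was transmitted.
    int roundTrips = 0;

    // Queues a put; the store is not touched yet.
    void putAsync(String key, String value) {
        queued.add(() -> store.put(key, value));
    }

    // Queues a remove; the store is not touched yet.
    void removeAsync(String key) {
        queued.add(() -> store.remove(key));
    }

    // Transmits all queued commands in one go, in the order they were queued.
    void execute() {
        if (queued.isEmpty()) {
            return;
        }
        for (Runnable command : queued) {
            command.run();
        }
        queued.clear();
        roundTrips++;
    }

    String read(String key) {
        return store.get(key);
    }
}
```

Three queued commands followed by a single execute() count as one round trip in this model, which is the saving that batching aims for.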

Default redisKey

The data structures provided by Redisson all require a unique identifier which is used to store them in Redis. The method getRedisKey() provides an automatically generated name, assembled from the class name of the projection and the serial number configured with the @ProjectionMetaData.

Additionally, an AbstractRedisManagedProjection or an AbstractRedisSubscribedProjection maintains the following keys in Redis:

  • getRedisKey() + "_state_tracking" - contains the UUID of the last position of the Fact stream
  • getRedisKey() + "_lock" - shared lock that needs to be acquired to update the projection.

Redisson API Datastructures vs. Java Collections

As seen in the above example, some Redisson data structures also implement the appropriate Java Collections interface. For example, you can also assign a Redisson RMap to a standard Java Map:

// 1) use specific Redisson type
RMap<UUID, String> userNames = tx.getMap(getRedisKey());

// 2) use Java Collections type
Map<UUID, String> userNames = tx.getMap(getRedisKey());

There are good reasons for either variant, 1) and 2):

| Redisson specific | plain Java |
|-------------------|------------|
| extended functionality which e.g. reduces I/O load (see RMap.fastPut(...) and RMap.fastRemove(...)) | standard, intuitive |
| only option when using data-structures which are not available in standard Java Collections (e.g. RedissonListMultimap) | easier to test |

Full Example

@ProjectionMetaData(serial = 1)
@RedisBatched
public class UserNames extends AbstractRedisManagedProjection {

    public UserNames(RedissonClient redisson) {
        super(redisson);
    }

    public List<String> getUserNames() {
        RMap<UUID, String> userNames = redisson.getMap(getRedisKey());
        return new ArrayList<>(userNames.values());
    }

    @Handler
    void apply(UserCreated created, RBatch batch) {
        RMapAsync<UUID, String> userNames = batch.getMap(getRedisKey());
        userNames.putAsync(created.getAggregateId(), created.getUserName());
    }

    @Handler
    void apply(UserDeleted deleted, RBatch batch) {
        batch.getMap(getRedisKey()).removeAsync(deleted.getAggregateId());
    }
}

To study the full example, see