
API

Choose your weapon: FactCast vs Factus

One main design goal of FactCast is to be non-intrusive. This means that it tries to impose as few constraints on the client as possible, and thus does not make many assumptions about how exactly Facts are generated or processed. Facts, as you may remember, are just tuples of JSON strings (header and payload) that everyone can use however they like.

However, this focus on freedom sometimes makes it hard for application programmers to know where to start, or how to implement good practices such as the different kinds of models, locking, or even just generating Facts from Java objects.

This is where Factus comes in.

Factus is a higher-level API for Java applications to work with FactCast. Using Factus from Java is entirely optional. Factus uses FactCast underneath, so every feature you find in Factus can also be implemented with raw FactCast.

Whereas FactCast tries to limit the number of assumptions it makes, Factus is highly opinionated.

Factus provides higher-level abstractions that are meant to make working with FactCast from Java faster and more convenient. For an overview of what Factus can do, see the Factus section.

1 - Usage (low-level)

This section will walk you through how to use FactCast from an application programmer’s perspective.

Please note that this is a low-level API. If you’re a Java programmer and want to use a higher-level API, or want to explore what you can do with FactCast in a more approachable way first, have a look at the Factus section.

1.1 - Java

1.1.1 - Java GRPC Producer

FactCast.publish(List<Fact> factsToPublish)

In order to produce Facts, you need to create Fact Instances and publish them via FactCast.publish().

When the method returns, all the Facts passed to FactCast.publish() are expected to have been written to PostgreSQL successfully. The order of the Facts is preserved when inserting them into the database, and all inserts happen in a transactional context, so publication is atomic: either all Facts are written or none.

If the method returns exceptionally, or the process is killed or interrupted in any way, you cannot know whether the Facts have been written successfully. In that case, just repeat the call: if the write had gone through, you’ll get an exception complaining about duplicate IDs; if not, you may succeed this time.

FactCast.publish(Fact toPublish)

acts the same way as its List counterpart above, just for a List of one Fact.
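The publish contract (ordered, all or nothing, and safe to repeat because duplicate ids are rejected) can be illustrated with a small in-memory stand-in. InMemoryFactStore, SimpleFact, DuplicateFactException and publishOrConfirm are hypothetical names used only for this sketch; they do not exist in FactCast, which writes to PostgreSQL instead.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical in-memory stand-in for a fact store; none of these names belong to FactCast.
public class PublishContractSketch {

  /** Minimal fact: an id plus an opaque JSON payload. */
  static final class SimpleFact {
    final UUID id;
    final String payload;

    SimpleFact(UUID id, String payload) {
      this.id = id;
      this.payload = payload;
    }
  }

  /** Signals that a fact id is already stored (the "duplicate IDs" case). */
  static final class DuplicateFactException extends RuntimeException {
    DuplicateFactException(UUID id) {
      super("duplicate fact id: " + id);
    }
  }

  static final class InMemoryFactStore {
    // LinkedHashMap keeps insertion order, like the ordered insert into the database
    private final Map<UUID, SimpleFact> facts = new LinkedHashMap<>();

    /** All or nothing: every id is validated before any fact is written. */
    synchronized void publish(List<SimpleFact> toPublish) {
      Set<UUID> seen = new HashSet<>();
      for (SimpleFact f : toPublish) {
        if (facts.containsKey(f.id) || !seen.add(f.id)) {
          throw new DuplicateFactException(f.id);
        }
      }
      for (SimpleFact f : toPublish) {
        facts.put(f.id, f);
      }
    }

    /** The single-fact variant simply publishes a list of one. */
    void publish(SimpleFact f) {
      publish(List.of(f));
    }

    synchronized List<UUID> ids() {
      return new ArrayList<>(facts.keySet());
    }
  }

  /**
   * Re-sends a batch whose earlier outcome is unknown. Returns true if this call wrote it,
   * false if a duplicate id shows that the earlier call had already gone through.
   * Only meaningful when re-sending exactly the same facts (same ids).
   */
  static boolean publishOrConfirm(InMemoryFactStore store, List<SimpleFact> batch) {
    try {
      store.publish(batch);
      return true;
    } catch (DuplicateFactException alreadyWritten) {
      return false;
    }
  }

  public static void main(String[] args) {
    InMemoryFactStore store = new InMemoryFactStore();
    List<SimpleFact> batch = List.of(
        new SimpleFact(UUID.randomUUID(), "{\"n\":1}"),
        new SimpleFact(UUID.randomUUID(), "{\"n\":2}"));
    System.out.println(publishOrConfirm(store, batch)); // true: first attempt writes both facts
    System.out.println(publishOrConfirm(store, batch)); // false: the retry hits duplicate ids
    System.out.println(store.ids().size());             // 2
  }
}
```

Because the whole list is validated before anything is written, a retry can never leave half a batch behind.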

Example Code

Here is some example code assuming you use the Spring GRPC Client:

@Component
class Foo{
 @Autowired
 FactCast fc;

 public void someMethod(){
   fc.publish( new SomethingHappenedFact() );
 }
}

1.1.2 - Java GRPC Consumer

As mentioned before, there are three main use cases for subscribing to a Fact stream:

  • Validation of Changes against a strictly consistent Model (Catchup)
  • Creating and maintaining a Read-Model (Follow)
  • Managing volatile cached data (Ephemeral)
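To make the difference between the three styles concrete, here is a minimal in-memory sketch. FactLog and its methods are hypothetical stand-ins, not FactCast API; in FactCast itself the choice is expressed through SubscriptionRequest (catchup, follow, and the fromNowOn start point), as in the examples that follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory fact log contrasting catchup, follow and ephemeral subscriptions.
// Facts are plain strings that double as their own ids; this is not the FactCast API.
public class SubscriptionModesSketch {

  static final class FactLog {
    private final List<String> facts = new ArrayList<>();
    private final List<Consumer<String>> liveSubscribers = new ArrayList<>();

    synchronized void publish(String fact) {
      facts.add(fact);
      liveSubscribers.forEach(s -> s.accept(fact)); // push to every continuous subscriber
    }

    /** Catchup: deliver everything published so far, then end. */
    synchronized void catchup(Consumer<String> handler) {
      facts.forEach(handler);
    }

    /** Follow: deliver everything after lastProcessed (from scratch if null), then keep listening. */
    synchronized void follow(String lastProcessed, Consumer<String> handler) {
      // an unknown id yields indexOf == -1, so this sketch then replays from scratch
      int start = lastProcessed == null ? 0 : facts.indexOf(lastProcessed) + 1;
      for (int i = start; i < facts.size(); i++) {
        handler.accept(facts.get(i));
      }
      liveSubscribers.add(handler);
    }

    /** Ephemeral: skip history and only deliver facts published from now on. */
    synchronized void fromNowOn(Consumer<String> handler) {
      liveSubscribers.add(handler);
    }
  }

  public static void main(String[] args) {
    FactLog log = new FactLog();
    log.publish("CustomerCreated#1");
    log.publish("CustomerDeposition#2");

    List<String> validation = new ArrayList<>();
    log.catchup(validation::add);                    // sees #1 and #2, then is done

    List<String> readModel = new ArrayList<>();
    log.follow("CustomerCreated#1", readModel::add); // resumes after #1

    List<String> cache = new ArrayList<>();
    log.fromNowOn(cache::add);                       // ignores history

    log.publish("PurchaseCompleted#3");
    System.out.println(validation + " " + readModel + " " + cache);
  }
}
```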

Here is some example code assuming you use the Spring GRPC Client:

Example Code: Catchup

@Component
class CustomerRepository{
 @Autowired
 FactCast factCast;

 // oversimplified code !
 public Customer getCustomer(UUID customerId){
   // match all Facts currently published about that customer
   SubscriptionRequest req = SubscriptionRequest.catchup(FactSpec.ns("myapp").aggId(customerId)).fromScratch();

   Customer customer = new Customer(customerId);
   // stream all these Facts to the customer object's handle method, and wait until the stream ends.
   factCast.subscribe(req, customer::handle ).awaitComplete();

   // the customer object should now be in its latest state, and ready for command validation
   return customer;
 }

}

class Customer {
  Money balance = new Money(0); // starting with no money.
  //...
  public void handle(Fact f){
    // apply Fact, so that the customer earns and spends some money...
  }
}

Example Code: Follow

@Component
class QueryOptimizedView {
 @Autowired
 FactCast factCast;

 @PostConstruct
 public void init(){

   // persistentModel stands for your own persistence layer (not shown here)
   UUID lastFactProcessed = persistentModel.getLastFactProcessed();

   // subscribe to all customer related changes.
   SubscriptionRequest req = SubscriptionRequest
      .follow(type("CustomerCreated"))
          .or(type("CustomerDeleted"))
          .or(type("CustomerDeposition"))
          .or(type("PurchaseCompleted"))
      .from(lastFactProcessed);

   factCast.subscribe(req, this::handle );
 }

 private FactSpec type(String type){
   return FactSpec.ns("myapp").type(type);
 }

 @Transactional
 public void handle(Fact f){
    // apply Fact, to the persistent Model
    // ...
    persistentModel.setLastFactProcessed(f.id());
 }
}

Example Code: Ephemeral

@Component
class CustomerCache {
 @Autowired
 FactCast factCast;

 // handle() is called from the subscription, not from the threads reading the cache,
 // so a thread-safe map is used
 Map<UUID,Customer> customerCache = new ConcurrentHashMap<>();

 @PostConstruct
 public void init(){
   // subscribe to all customer related changes.
   SubscriptionRequest req = SubscriptionRequest
      .follow(type("CustomerCreated"))
          .or(type("CustomerDeleted"))
          .or(type("CustomerDeposition"))
          .or(type("PurchaseCompleted"))
      .fromNowOn();

   factCast.subscribe(req, this::handle );
 }

 private FactSpec type(String type){
  return FactSpec.ns("myapp").type(type);
 }

 public void handle(Fact f){
    // if anything has changed, invalidate the cached value.
    // ...
    Set<UUID> aggregateIds = f.aggIds();
    aggregateIds.forEach(customerCache::remove);
 }
}

1.1.3 - Java Optimistic Locking

Motivation

Whatever your particular way of modelling your software, in order to enforce invariants in your aggregates you need to coordinate writes to them. In simple monoliths, you do that by synchronizing write access to the aggregate. When software systems become distributed (or at least replicated), this coordination obviously needs to be externalized.

Pessimistic Locking

While pessimistic locking makes sure every change is strictly serializable, it has obvious drawbacks in terms of throughput and complexity (timeouts) as well as the danger of deadlock, when the scope of the lock expands to more than one aggregate. This is why we chose to implement a means of optimistic locking in FactCast.

Optimistic Locking

In general, the idea of optimistic locking is to make a change and before publishing it, make sure there was no potentially contradicting change in between. If there was, the process can safely be retried, as there was nothing published yet.

Transferred to FactCast, this means to express a body of code that:

  1. creates an understanding of the published state of the aggregates in question
  2. invokes its business logic according to that state
  3. creates the effects: either fails (if business logic decides to do so), or publishes new Fact(s)
  4. rechecks, if the state recorded in 1. is still unchanged and then
  5. either publishes the prepared Facts or retries by going back to 1.
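The five steps can be sketched independently of FactCast. The following is a minimal, hypothetical illustration: VersionedLog, attempt, and the use of the log size as the "recorded state" are assumptions made for this sketch only. FactCast’s own lock/attempt API, described below, performs the state recheck for you.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the five steps against an in-memory log.
// The log's size plays the role of the "recorded state"; nothing here is FactCast API.
public class OptimisticLoopSketch {

  static final class VersionedLog {
    private final List<String> events = new ArrayList<>();

    synchronized List<String> snapshot() {
      return new ArrayList<>(events);
    }

    /** Compare-and-append: only succeeds if nobody appended since expectedVersion. */
    synchronized boolean appendIfUnchanged(int expectedVersion, String event) {
      if (events.size() != expectedVersion) {
        return false;
      }
      events.add(event);
      return true;
    }

    /** Simulates a concurrent writer that does not coordinate with us. */
    synchronized void appendUnconditionally(String event) {
      events.add(event);
    }
  }

  /**
   * Returns true if the event was published, false if businessLogic aborted (by returning null).
   * Throws IllegalStateException once maxRetries attempts all hit a concurrent change.
   */
  static boolean attempt(VersionedLog log, Function<List<String>, String> businessLogic,
      int maxRetries, long intervalMillis) {
    for (int i = 0; i < maxRetries; i++) {
      List<String> state = log.snapshot();       // 1. understand the current state
      int seenVersion = state.size();
      String event = businessLogic.apply(state); // 2. + 3. run logic, prepare the effect
      if (event == null) {
        return false;                            // aborted: no retry
      }
      if (log.appendIfUnchanged(seenVersion, event)) {
        return true;                             // 4. + 5. unchanged, so publish
      }
      if (intervalMillis > 0) {                  // changed: optionally wait, then retry
        try {
          Thread.sleep(intervalMillis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("interrupted while waiting to retry", e);
        }
      }
    }
    throw new IllegalStateException("retries exceeded");
  }

  public static void main(String[] args) {
    VersionedLog log = new VersionedLog();
    boolean created = attempt(log,
        state -> state.contains("AccountCreated") ? null : "AccountCreated", 10, 5);
    boolean again = attempt(log,
        state -> state.contains("AccountCreated") ? null : "AccountCreated", 10, 5);
    System.out.println(created + " " + again); // true false
  }
}
```

Nothing is published before the recheck succeeds, which is what makes retrying safe.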

Usage

a simple example

This code checks if an account with id newAccountId already exists, and if not - creates it by publishing the Fact accordingly.

factcast.lock("myBankNamespace")
        .on(newAccountId)
        .attempt(() -> {
            // check and maybe abort
            if (repo.findById(newAccountId) !=null)
                return Attempt.abort("Already exists.");
            else
              return Attempt.publish(
                Fact.builder()
                .ns("myBankNamespace")
                .type("AccountCreated")
                .aggId(newAccountId)
                .build("{...}")
              );
        });

You can probably guess what happens, remembering the steps above. Let’s dive into the details with a more complex scenario.

a complete example

The unavoidable imaginary example: two bank accounts and a money transfer between them:

factcast.lock("myBankNamespace")
        .on(sourceAccountId,targetAccountId)
        .optimistic()            // this is optional, defaults to optimistic, currently the only mode supported
        .retry(100)              // this is optional, default is 10 times
        .interval(5)             // this is optional, default is no wait interval between attempts (equals to 0)
        .attempt(() -> {

            // fetch the latest state
            Account source = repo.findById(sourceAccountId);
            Account target = repo.findById(targetAccountId);

            // run businesslogic on it
            if (source.amount() < amountToTransfer)
                return Attempt.abort("Insufficient funds.");

            if (target.isClosed())
                return Attempt.abort("Target account is closed");

            // everything looks fine, create the Fact to be published
            Fact toPublish = Fact.builder()
                .ns("myBankNamespace")
                .type("transfer")
                .aggId(sourceAccountId)
                .aggId(targetAccountId)
                .build("{...}");

            // register for publishing
            return Attempt.publish(toPublish).andThen(()->{

                // this is only executed at most once, and only if publishing succeeded
                log.info("Money was transferred.");

            });
        });

Explanation

First, you tell FactCast to record a state according to all events that have either sourceAccountId or targetAccountId in their list of aggIds and are in the namespace myBankNamespace. While the namespace is not strictly necessary, using it is encouraged; how you use namespaces to group Facts is up to you.

The number of retries is set to 100 here (the default is 10, which is acceptable for many systems). In essence, this means that the attempt will be executed at most 100 times before FactCast gives up and throws an OptimisticRetriesExceededException, which extends ConcurrentModificationException.

If interval is not set, it defaults to 0, with the effect that the code passed into attempt is retried continuously without any pause until it either aborts, succeeds, or hits the maximum number of retries (see above). Setting it to 5 means that a 5 ms wait happens before each retry.

Everything starts with passing a lambda to the attempt method. The lambda is of type

@FunctionalInterface
public interface Attempt {
    IntermediatePublishResult call() throws AttemptAbortedException;
    //...
}

so it has to return an instance of IntermediatePublishResult. The only way to create such an instance is through the static methods on the same interface (abort, publish, …), which makes the intent obvious. This lambda is then called according to the logic above.

Inside the lambda, you’d want to check the current state using the very latest facts from factcast (repo.findById(...)) and then check your business constraints on it (if (source.amount() < amountToTransfer)…). If the constraints do not hold, you may choose to abort the Attempt and thus abort the process. In this case, the attempt will not be retried.

On the other hand, if you choose to publish new facts using Attempt.publish(...), the state will be checked and the Fact(s) will be published if there was no change in between (otherwise a retry will be issued, see above). In the rare case that you do not want to publish anything, you can return Attempt.withoutPublication().

Optionally, you can pass a runnable using .andThen and schedule it for execution once, if and only if the publishing succeeded. Or in other words, this runnable is executed just once or never (in case of abort or OptimisticRetriesExceededException).
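To make the abort/publish/andThen contract concrete, here is a small hypothetical model of it. Result, run and AbortedException only imitate the shape described above and are not FactCast’s Attempt, IntermediatePublishResult or AttemptAbortedException; the state recheck and retry loop are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of the attempt contract; these types only mimic its shape
// and are not FactCast's Attempt or IntermediatePublishResult.
public class AttemptContractSketch {

  static final class AbortedException extends RuntimeException {
    AbortedException(String reason) {
      super(reason);
    }
  }

  static final class Result {
    final String abortReason;       // non-null means abort: no publication, no retry
    final List<String> toPublish;
    Runnable onSuccess = () -> { };  // no-op unless andThen registers a callback

    private Result(String abortReason, List<String> toPublish) {
      this.abortReason = abortReason;
      this.toPublish = toPublish;
    }

    static Result abort(String reason) {
      return new Result(reason, List.of());
    }

    static Result publish(String... facts) {
      return new Result(null, List.of(facts));
    }

    Result andThen(Runnable callback) {
      this.onSuccess = callback;
      return this;
    }
  }

  /** Runs a single attempt; the state recheck and retry loop are omitted here. */
  static void run(Supplier<Result> attempt, List<String> store) {
    Result r = attempt.get();
    if (r.abortReason != null) {
      throw new AbortedException(r.abortReason); // the callback is never run
    }
    store.addAll(r.toPublish); // "publish"
    r.onSuccess.run();         // reached only after publishing succeeded, so at most once
  }

  public static void main(String[] args) {
    List<String> store = new ArrayList<>();
    run(() -> Result.publish("transfer")
        .andThen(() -> System.out.println("Money was transferred.")), store);
    try {
      run(() -> Result.abort("Insufficient funds.")
          .andThen(() -> System.out.println("never printed")), store);
    } catch (AbortedException e) {
      System.out.println("aborted: " + e.getMessage());
    }
  }
}
```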

1.2 - JavaScript

1.2.1 - nodeJS GRPC Producer

Producing Facts via nodeJS is very simple due to the available gRPC NPM Module. It will generate a stub constructor called RemoteFactStore from our proto file.

const { v4: uuidV4 } = require("uuid");
const grpc = require("grpc");
const protoDescriptor = grpc.load("./FactStore.proto");
const RemoteFactStore =
	protoDescriptor.org.factcast.grpc.api.gen.RemoteFactStore;

// store allows us to publish, subscribe and fetchById (see proto file)
const store = new RemoteFactStore(
	"localhost:9090",
	grpc.credentials.createInsecure()
);

store.publish(
	[
		{
			header: JSON.stringify({
				id: uuidV4(),
				ns: "myapp",
			}),
			payload: JSON.stringify({
				foo: Date.now(),
			}),
		},
	],
	(err, feature) => {
		if (err) {
			console.log(err);
		}
	}
);

See the Facts page for detailed information about all possible and required header fields.

1.2.2 - nodeJS GRPC Consumer

const grpc = require("grpc");
const protoDescriptor = grpc.load("./FactStore.proto");
const RemoteFactStore =
	protoDescriptor.org.factcast.grpc.api.gen.RemoteFactStore;

const store = new RemoteFactStore(
	"localhost:9090",
	grpc.credentials.createInsecure()
);

const subscription = store.subscribe({
	json: JSON.stringify({
		continuous: true,
		specs: [
			{
				ns: "myapp",
			},
		],
	}),
});

subscription.on("data", (fact) => {
	console.log(fact);
});

1.3 - CLI

1.3.1 - Factcast CLI

In order to help with quick testing or debugging, FactCast provides a very simple CLI that you can use to publish Facts or subscribe and print Facts received to stdout.

Usage

Once the module factcast-grpc-cli is built, it provides a self-contained fc-cli.jar in its target folder. To use it, you can either run

java -jar path_to/fc-cli.jar <OPTIONS> <COMMAND> <COMMAND OPTIONS>

or just execute it as

path_to/fc-cli.jar <OPTIONS> <COMMAND> <COMMAND OPTIONS>

The help output at the time of writing is:

Usage: fc-cli [options] [command] [command options]
  Options:
    --debug
      show debug-level debug messages
    --address
      the address to connect to
      Default: static://localhost:9090
    --basic, -basic
      Basic-Auth Crendentials in the form "user:password"
    --no-tls
      do NOT use TLS to connect (plaintext-communication)
    --pretty
      format JSON output
  Commands:
    catchup      Read all the matching facts up to now and exit.
      Usage: catchup [options]
        Options:
          -from
            start reading AFTER the fact with the given id
        * -ns
            the namespace filtered on

    follow      read all matching facts and keep connected while listening for
            new ones
      Usage: follow [options]
        Options:
          -from
            start reading AFTER the fact with the given id
          -fromNowOn
            read only future facts
        * -ns
            the namespace filtered on

    publish      publish a fact
      Usage: publish [options]
        Options:
        * --header, -h
            Filename of an existing json file to read the header from
        * --payload, -p
            Filename of an existing json file to read the payload from

    enumerateNamespaces      lists all namespaces in the factstore in no
            particular order
      Usage: enumerateNamespaces

    enumerateTypes      lists all types used with a namespace in no particular
            order
      Usage: enumerateTypes namespace

    serialOf      get the serial of a fact identified by id
      Usage: serialOf id

1.3.2 - Schema Registry CLI

This CLI provides a convenient way to create a suitable Schema Registry for your FactCast installation. It will give you the ability to validate events against examples and to make sure that there’s always an upcast and if necessary a downcast transformation.

It produces both human- and machine-readable output. You need Hugo in order to turn it into a proper static website.

A working example can be found here.

Build the example

The example is built during mvn install, but you can achieve the same by running

$ java -jar target/fc-schema-cli.jar build -p ../factcast-examples/factcast-example-schema-registry/src/main/resources

build validates and builds the example and also produces an output directory that contains the static website. Inside this folder, run

$ hugo server

to get quick feedback or

$ hugo

in order to create the deployable schema registry (located at output/public).

About CI Pipelines and Artifacts

We propose the following pipeline:

Build -> Package -> Upload

Build:

  • runs the fc-schema-cli to build the registry
  • fails on wrong input/broken schema

Package:

  • runs $ hugo in order to produce the artifact

Upload:

  • uploads output/public to static file server (like S3)

Available commands and options

$ java -jar target/fc-schema-cli.jar -h

███████╗ █████╗  ██████╗████████╗ ██████╗ █████╗ ███████╗████████╗
██╔════╝██╔══██╗██╔════╝╚══██╔══╝██╔════╝██╔══██╗██╔════╝╚══██╔══╝
█████╗  ███████║██║        ██║   ██║     ███████║███████╗   ██║
██╔══╝  ██╔══██║██║        ██║   ██║     ██╔══██║╚════██║   ██║
██║     ██║  ██║╚██████╗   ██║   ╚██████╗██║  ██║███████║   ██║
╚═╝     ╚═╝  ╚═╝ ╚═════╝   ╚═╝    ╚═════╝╚═╝  ╚═╝╚══════╝   ╚═╝

Usage: fc-schema [-hV] [COMMAND]
Tool for working with the FactCast Schema Registry spec
  -h, --help      Show this help message and exit.
  -V, --version   Print version information and exit.
Commands:
  validate  Validate your current events
  build     Validates and builds your registry

1.3.3 - 3rd Party CLI

As an alternative to the FactCast CLI, there is the Python-based PyFactCast. It is still in early development, but you might want to check it out.

2 - Factus (high-level)

This section will walk you through using FactCast from an application programmer’s perspective, using the abstractions of Factus. Factus is an optional high-level API that makes it easier to work with FactCast from Java (or Kotlin, or any other JVM language of your choice).

Please be aware that the Factus API is in an experimental stage and is expected to change as it matures. If you find sharp edges or feel like things are missing, please open an issue on GitHub.

If you want more control and don’t want to opt in to Factus, have a look at the lower-level FactCast API in the Lowlevel section instead.

2.1 - Introduction

Motivation

If Factus is optional, why does it exist in the first place, you might ask.

FactCast tries to be non-intrusive. It focuses on publishing, retrieval, validation and transformation of JSON documents. It also provides some tools for advanced (yet necessary) concepts like optimistic locking, but it does not prescribe anything in terms of how to use this to build an application.

Depending on your experience with eventsourcing in general or other products/approaches in particular, it might be hard to see how exactly this helps you to build correct, scalable and maintainable systems. At least this was our experience working with diverse groups of engineers over the years.

So instead of documenting lots of good practices here, we decided to offer a high-level API that codifies them, which we think is easier to start with, more convenient and less error-prone.

We say “good” practices here, rather than “best” practices for a reason. Factus represents just one way of using FactCast from Java. Please be aware that it may grow over time and that there is nothing wrong with using a different approach. Also, be aware that not every possible use case is covered by Factus so that you occasionally might want to fall back to “doing things yourself” with the low-level FactCast API. In case you encounter such a situation, please open a GitHub issue explaining your motivation. Maybe this is something Factus is currently lacking.

Factus as a higher level of abstraction

Factus replaces FactCast as the central interface. Rather than with Facts, Factus primarily deals with EventObjects, which are serialized to and deserialized from Facts using an EventSerializer. Factus ships with a default one that uses Jackson, but you’re free to use any library you like for this (such as Gson).

Concrete events will implement EventObject in order to be able to contribute to Fact Headers when serialized, and they are expected to be annotated with @Specification in order to declare what the specifics of the FactHeader (namespace, type and version) are.

import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/**
 * EventObjects are expected to be annotated with @{@link Specification}.
 */
public interface EventObject {

  default Map<String, String> additionalFactHeaders() {
    return Collections.emptyMap();
  }

  Set<UUID> aggregateIds();

}

/**
 * Example EventObject based event containing two properties
 */
@Specification(ns = "user", type = "UserCreated", version = 1)
class UserCreated implements EventObject {

  // getters & setters or builders omitted
  private UUID userId;
  private String name;

  @Override
  public Set<UUID> aggregateIds() {
    return Sets.newHashSet(userId);
  }
}

Now the payload of a Fact created from your Events will be, as you’d expect, the json-serialized form of the Event which is created by the EventSerializer.

Factus ships with a default serializer for EventObjects. It uses Jackson and builds on a predefined ObjectMapper, if defined (otherwise it just uses the internal FactCast-configured ObjectMapper). If, for some reason, you want to replace it, you can provide your own EventSerializer.

As Factus is optional, you’ll first want to set up your project to use it. See Factus Setup.

2.2 - Setup

Dependencies

The first thing you need in your project is a dependency on Factus itself.

    <dependency>
      <groupId>org.factcast</groupId>
      <artifactId>factcast-factus</artifactId>
    </dependency>

If you use Spring Boot and also have the Spring Boot autoconfiguration dependency included,

    <dependency>
      <groupId>org.factcast</groupId>
      <artifactId>factcast-spring-boot-autoconfigure</artifactId>
    </dependency>

this is all you need to get started.

However, there is a growing list of optional helpful dependencies when it comes to using Factus:


Binary Snapshot Serializer

The default Snapshot Serializer in Factus uses Jackson to serialize to/from JSON. This might be less than optimal in terms of storage cost and transport performance/efficiency. This optional dependency:

    <dependency>
      <groupId>org.factcast</groupId>
      <artifactId>factcast-factus-bin-snapser</artifactId>
    </dependency>

replaces the default Snapshot Serializer with another variant that, while still using Jackson to stay compatible with the default one from the classes’ perspective, serializes to a binary format and uses lz4 to swiftly (de-)compress it on the fly.

Depending on your environment, you may want to roll your own and use a slower but more compact compression, or maybe just switch to plain Java serialization. In this case, have a look at BinarySnapshotSerializer to explore how to do it; it should be straightforward. (If you do, please contribute it back, as it might be worth integrating into FactCast.)

In case you want to configure this serializer, define a BinaryJacksonSnapshotSerializerCustomizer bean and define the configuration in there. Take a look at BinaryJacksonSnapshotSerializerCustomizer#defaultCustomizer if you need inspiration.


Redis SnapshotCache

From a client’s perspective, it is nice to be able to persist snapshots directly into factcast, so that you don’t need any additional infrastructure to get started. In busy applications with many clients however, it might be a good idea to keep that load away from factcast, so that it can use its capacity to deal with Facts only.

In this case you want to use a different implementation of the SnapshotCache interface on a client, in order to persist snapshots in your favorite K/V store, Document Database, etc.

We chose Redis as an example database for externalized shared data, as it has a very simple API and is far more lightweight to use than an RDBMS. But please be aware that you can use ANY database to store shared data and snapshots, by just implementing the respective interfaces.

In case Redis is your weapon of choice, there is a Redis implementation of that interface. Just add

    <dependency>
      <groupId>org.factcast</groupId>
      <artifactId>factcast-snapshotcache-redisson</artifactId>
    </dependency>

to your client’s project and spring autoconfiguration (if you use spring boot) will do the rest.

As it relies on the excellent Redisson library, all you need is to add the corresponding redis configuration to your project. See the Redisson documentation.

2.3 - Publication

The publishing side is easy and should be intuitive to use. Factus offers a few methods to publish either Events (or Facts if you happen to have handcrafted ones) to FactCast.

public interface Factus extends SimplePublisher, ProjectionAccessor, Closeable {

    /**
     * publishes a single event immediately
     */
    default void publish(@NonNull EventObject eventPojo) {
        publish(eventPojo, f -> null);
    }

    /**
     * publishes a list of events immediately in an atomic manner (all or none)
     */
    default void publish(@NonNull List<EventObject> eventPojos) {
        publish(eventPojos, f -> null);
    }

    /**
     * publishes a single event immediately and transforms the resulting facts
     * to a return type with the given resultFn
     */
    <T> T publish(@NonNull EventObject e, @NonNull Function<Fact, T> resultFn);

    /**
     * publishes a list of events immediately in an atomic manner (all or none)
     * and transforms the resulting facts to a return type with the given
     * resultFn
     */
    <T> T publish(@NonNull List<EventObject> e, @NonNull Function<List<Fact>, T> resultFn);

    /**
     * In case you'd need to assemble a fact yourself
     */
    void publish(@NonNull Fact f);

// ...

As you can see, you can either call a void method, or pass a function that translates the published Facts to a return value, in case you need it.

Batches

Just like FactCast’s publish(List<Fact>), you can publish a list of Events/Facts atomically.

However, in some more complex scenarios, it might be more appropriate to have an object to pass around (and maybe mark aborted) where different parts of the code can contribute Events/Facts to publish to. This is what PublishBatch is used for:

public interface PublishBatch extends AutoCloseable {
    PublishBatch add(EventObject p);

    PublishBatch add(Fact f);

    void execute() throws BatchAbortedException;

    <R> R execute(Function<List<Fact>, R> resultFunction) throws BatchAbortedException;

    PublishBatch markAborted(String msg);

    PublishBatch markAborted(Throwable cause);

    void close(); // checks if either aborted or executed already, otherwise will execute
}

In order to use this, just call Factus::batch to create a new PublishBatch object.
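The lifecycle described above (add, optionally mark aborted, execute explicitly or on close) can be imitated with a small in-memory class. InMemoryBatch and addAuditEvent are hypothetical names, events are plain strings, and IllegalStateException stands in for BatchAbortedException. The point is the try-with-resources usage, where any part of the code that receives the batch can contribute to it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory batch that mimics the PublishBatch lifecycle.
// Events are plain strings and the "store" is a list; this is not the Factus implementation.
public class BatchSketch {

  static final class InMemoryBatch implements AutoCloseable {
    private final List<String> store;
    private final List<String> pending = new ArrayList<>();
    private String abortReason;
    private boolean executed;

    InMemoryBatch(List<String> store) {
      this.store = store;
    }

    InMemoryBatch add(String event) {
      pending.add(event);
      return this;
    }

    InMemoryBatch markAborted(String msg) {
      abortReason = msg;
      return this;
    }

    void execute() {
      if (abortReason != null) {
        throw new IllegalStateException("batch aborted: " + abortReason);
      }
      if (executed) {
        throw new IllegalStateException("batch already executed");
      }
      store.addAll(pending); // a real store would write these atomically
      executed = true;
    }

    @Override
    public void close() {
      // execute on close unless the batch was aborted or already executed
      if (abortReason == null && !executed) {
        execute();
      }
    }
  }

  /** Hypothetical helper: another part of the code contributes to the same batch. */
  static void addAuditEvent(InMemoryBatch batch, String user) {
    batch.add("AuditLogged:" + user);
  }

  public static void main(String[] args) {
    List<String> store = new ArrayList<>();
    try (InMemoryBatch batch = new InMemoryBatch(store)) {
      batch.add("UserCreated:alice");
      addAuditEvent(batch, "alice");
    } // close() publishes both events
    try (InMemoryBatch batch = new InMemoryBatch(store)) {
      batch.add("UserCreated:bob");
      batch.markAborted("name already taken"); // close() publishes nothing
    }
    System.out.println(store); // [UserCreated:alice, AuditLogged:alice]
  }
}
```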

2.4 - Projection

Before we can look at processing Events, we first have to talk about another abstraction that does not exist in FactCast: Projection

public interface Projection { ... }

In Factus, a Projection is any kind of state that is distilled from processing Events - in other words: Projections process (or handle) events.

Projections in general

What projections have in common is that they handle Events (or Facts). In order to express that, a projection can have any number of methods annotated with @Handler or @HandlerFor. These methods must be package-level/protected accessible and can be either on the Projection itself or on a nested (non-static) inner class. A simple example might be:

/**
 *  maintains a map of UserId->UserName
**/
public class UserNames implements SnapshotProjection {

    private final Map<UUID, String> existingNames = new HashMap<>();

    @Handler
    void apply(UserCreated created) {
        existingNames.put(created.aggregateId(), created.userName());
    }

    @Handler
    void apply(UserDeleted deleted) {
        existingNames.remove(deleted.aggregateId());
    }
// ...

Here, the EventObjects UserDeleted and UserCreated are basically just tuples of a user id (aggregateId) and a name (userName). Also, projections must have a default (no-args) constructor.
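How might handler methods be matched to events? The following is a deliberately simplified, hypothetical sketch of annotation-based dispatch with plain reflection: it looks for a method annotated with a locally declared @Handler whose single parameter type matches the event. Factus’s actual handler discovery (including @HandlerFor and nested classes) is more involved, so treat this as an illustration of the idea only.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical, simplified illustration of annotation-based event dispatch; not Factus code.
public class HandlerDispatchSketch {

  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface Handler { }

  static final class UserCreated {
    final UUID id;
    final String name;

    UserCreated(UUID id, String name) {
      this.id = id;
      this.name = name;
    }
  }

  static final class UserDeleted {
    final UUID id;

    UserDeleted(UUID id) {
      this.id = id;
    }
  }

  /** Projection with a no-args constructor and package-private @Handler methods. */
  static class UserNames {
    final Map<UUID, String> existingNames = new HashMap<>();

    @Handler
    void apply(UserCreated created) {
      existingNames.put(created.id, created.name);
    }

    @Handler
    void apply(UserDeleted deleted) {
      existingNames.remove(deleted.id);
    }
  }

  /** Invokes the @Handler method whose single parameter accepts the event. */
  static void dispatch(Object projection, Object event) {
    for (Method m : projection.getClass().getDeclaredMethods()) {
      if (m.isAnnotationPresent(Handler.class)
          && m.getParameterCount() == 1
          && m.getParameterTypes()[0].isInstance(event)) {
        try {
          m.setAccessible(true); // handlers are package-private
          m.invoke(projection, event);
          return;
        } catch (ReflectiveOperationException e) {
          throw new IllegalStateException("handler failed for " + event, e);
        }
      }
    }
    throw new IllegalArgumentException("no @Handler for " + event.getClass().getSimpleName());
  }

  public static void main(String[] args) {
    UserNames projection = new UserNames(); // projections need a no-args constructor
    UUID id = UUID.randomUUID();
    dispatch(projection, new UserCreated(id, "alice"));
    System.out.println(projection.existingNames.get(id));   // alice
    dispatch(projection, new UserDeleted(id));
    System.out.println(projection.existingNames.isEmpty()); // true
  }
}
```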

As we established before, you could also decide to use a nested class to separate the methods from other instance methods, like:

public class UserNames implements SnapshotProjection {

    private final Map<UUID, String> existingNames = new HashMap<>();

    class EventProcessing {

        @Handler
        void apply(UserCreated created) {
            existingNames.put(created.aggregateId(), created.userName());
        }

        @Handler
        void apply(UserDeleted deleted) {
            existingNames.remove(deleted.aggregateId());
        }

    }
// ...

many Flavours

There are several kinds of Projections that we need to look at. But first, let’s start with snapshotting.

2.4.1 - Snapshotting

In event sourcing, a snapshot is used to memorize an object at a certain point in the event stream. When this object has to be retrieved again later on, rather than creating a fresh one and using it to process all relevant events, we can start with the snapshot (which already contains the object’s earlier state) and only process the facts that happened since.

It is easy to see that storing and retrieving snapshots involves some kind of marshalling and unmarshalling, as well as some sort of Key/Value store to keep the snapshots.
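The idea can be sketched with an in-memory map acting as the key/value store. Balance, load and replayAll are hypothetical, and the snapshot position is a simple list index for brevity; a real implementation would track something like the id of the last fact applied. Loading again after new events arrived applies only the newer ones, yet ends in the same state as a full replay.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a snapshot stores the projection state plus the position of the last
// event it has seen, so loading only needs to apply the events recorded after it.
public class SnapshotReplaySketch {

  /** Trivial projection: an account balance built from signed amounts. */
  static final class Balance {
    long value;
    int position; // number of events applied so far

    void apply(long amount) {
      value += amount;
      position++;
    }

    Balance copy() {
      Balance b = new Balance();
      b.value = value;
      b.position = position;
      return b;
    }
  }

  /** Starts from the snapshot if there is one, applies newer events, then re-snapshots. */
  static Balance load(Map<String, Balance> snapshots, String key, List<Long> events) {
    Balance b = snapshots.containsKey(key) ? snapshots.get(key).copy() : new Balance();
    for (int i = b.position; i < events.size(); i++) {
      b.apply(events.get(i));
    }
    snapshots.put(key, b.copy()); // memorize the new state for next time
    return b;
  }

  /** Full replay from scratch, for comparison. */
  static Balance replayAll(List<Long> events) {
    Balance b = new Balance();
    events.forEach(b::apply);
    return b;
  }

  public static void main(String[] args) {
    Map<String, Balance> snapshots = new HashMap<>();
    List<Long> events = new ArrayList<>(List.of(10L, -3L));
    load(snapshots, "account-1", events);                   // replays 2 events, stores a snapshot
    events.add(5L);
    Balance current = load(snapshots, "account-1", events); // applies only the 3rd event
    System.out.println(current.value + " " + replayAll(events).value); // 12 12
  }
}
```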

Snapshot Serialization

Serialization is done using a SnapshotSerializer.


public interface SnapshotSerializer {
  byte[] serialize(SnapshotProjection a);

  <A extends SnapshotProjection> A deserialize(Class<A> type, byte[] bytes);

  boolean includesCompression();

  /**
   * In order to catch changes when a {@link SnapshotProjection} got changed, calculate a hash that
   * changes when the schema of the serialised class changes.
   *
   * <p>Note that in some cases, it is possible to add fields and use serializer-specific means to
   * ignore them for serialization (e.g. by using @JsonIgnore with Jackson).
   *
   * <p>Hence, every serializer is asked to calculate its own hash, which should only change in case
   * changes relevant for deserialization were made to the projection.
   *
   * <p>This method is only used if no other means of providing a hash is used. Alternatives are
   * using the ProjectionMetaData annotation or defining a final static long field called
   * serialVersionUID.
   *
   * <p>Note, that the serial will be cached per class
   *
   * @param projectionClass the snapshot projection class to calculate the hash for
   * @return the calculated hash or null, if no hash could be calculated (makes snapshotting fail if
   *     no other means of providing a hash is used)
   */
  Long calculateProjectionSerial(Class<? extends SnapshotProjection> projectionClass);
}

As you can see, there is no assumption about whether it produces JSON or anything else; it just has to be symmetric. To be able to optimize the transport of the snapshot to and from the SnapshotCache, each SnapshotSerializer should indicate whether it already includes compression, or whether compression in transit might be a good idea. Factus ships with a default SnapshotSerializer that, as you can guess by now, uses Jackson. It is neither the most performant nor the most compact choice, so feel free to create your own.
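As an example of a symmetric serializer that includes compression, here is a hypothetical sketch using plain Java serialization plus GZIP from the JDK. It mirrors the serialize, deserialize and includesCompression responsibilities of the interface above, but it does not implement the real SnapshotSerializer: there is no SnapshotProjection type here, and calculateProjectionSerial is omitted.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.UUID;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical serializer using Java serialization plus GZIP; not a Factus SnapshotSerializer.
public class GzipSnapshotSerializerSketch {

  static byte[] serialize(Serializable projection) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(new GZIPOutputStream(bytes))) {
      out.writeObject(projection);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray(); // read after close, so the GZIP trailer has been written
  }

  static <A extends Serializable> A deserialize(Class<A> type, byte[] data) {
    try (ObjectInputStream in =
        new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(data)))) {
      return type.cast(in.readObject());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  /** Output is already compressed, so compressing again in transit would not help. */
  static boolean includesCompression() {
    return true;
  }

  /** Example projection state; only deserialize snapshots from a trusted store. */
  static final class UserNames implements Serializable {
    private static final long serialVersionUID = 1L;
    final HashMap<UUID, String> names = new HashMap<>();
  }

  public static void main(String[] args) {
    UserNames original = new UserNames();
    original.names.put(UUID.randomUUID(), "alice");
    byte[] bytes = serialize(original);
    UserNames restored = deserialize(UserNames.class, bytes);
    System.out.println(restored.names.equals(original.names)); // true
  }
}
```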

Choosing serializers

If your SnapshotProjection does not declare anything different, it will be serialized using the default SnapshotSerializer known to your SnapshotSerializerSupplier (when using Spring Boot, this is normally bound automatically as a Spring bean).

In case you want to use a different implementation for a particular SnapshotProjection, you can annotate it with @SerializeUsing:

@SerializeUsing(MySpecialSnapshotSerializer.class)
static class MySnapshotProjection implements SnapshotProjection {
    //...
}

Note that those implementations need to have a default constructor and are expected to be stateless.

Snapshot caching

The Key/Value store that keeps and maintains the snapshots is called a SnapshotCache.

Factus comes with a default SnapshotCache that uses FactCast to store/retrieve and maintain those cached snapshots. While this works reasonably well and is easy to use, as it does not involve any other piece of infrastructure, you might want to keep an eye on the load- and storage-requirements imposed by this. It is very easy to provide an implementation of SnapshotCache that uses for instance Redis or memcached instead, so that you keep this load away from FactCast for performance, scalability and in the end also cost efficiency reasons. Also, it has an effect on the availability and maybe responsiveness of your application, but this is obviously outside of the scope of this document.

If you happen to use Redis in your application, for instance, you could add

<dependency>
    <groupId>org.factcast</groupId>
    <artifactId>factcast-snapshotcache-redisson</artifactId>
</dependency>

in order to override this default.

By default, the SnapshotCache only keeps the latest version of a particular snapshot and deletes it after 90 days of not being used. See Properties.
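To make these retention rules concrete, here is a minimal in-memory sketch of a snapshot store. It keeps only the latest version per key and drops entries that have not been used for a configurable period (90 days in the text). The class and its methods are hypothetical: it does not implement Factus' SnapshotCache interface, and treating both writes and reads as "use" is an assumption. The current time is passed in explicitly to keep the sketch easy to test.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

class InMemorySnapshotCacheSketch {

    // Hypothetical entry: the serialized snapshot plus the time it was last used.
    private record Entry(byte[] bytes, Instant lastAccess) {}

    private final Map<String, Entry> entries = new ConcurrentHashMap<>();
    private final Duration retention;

    InMemorySnapshotCacheSketch(Duration retention) {
        this.retention = retention;
    }

    // Only the latest version is kept: a new snapshot replaces the previous one under the same key.
    void put(String key, byte[] bytes, Instant now) {
        entries.put(key, new Entry(bytes, now));
    }

    // Reading a snapshot counts as using it and refreshes its last-access time.
    Optional<byte[]> get(String key, Instant now) {
        Entry e = entries.computeIfPresent(key, (k, old) -> new Entry(old.bytes(), now));
        return Optional.ofNullable(e).map(Entry::bytes);
    }

    // Deletes snapshots that have not been used within the retention period.
    void evictUnused(Instant now) {
        Instant cutoff = now.minus(retention);
        entries.entrySet().removeIf(en -> en.getValue().lastAccess().isBefore(cutoff));
    }

    int size() {
        return entries.size();
    }
}
```

A Redis- or memcached-backed implementation would typically delegate the expiry to the store itself (for example, via a time-to-live that is refreshed on access) instead of scanning entries.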

Serials

When a projection class is changed (e.g. a field is renamed or its type is changed), deserializing an existing snapshot may fail, depending on the serializer. To have the snapshot rebuilt in this case, a “serial” has to be provided for the projection. Only snapshots that have the same “serial” as the class in its current state will be used.

Serials are declared to projections by adding a @ProjectionMetaData(serial = 1L) to the type.
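The javadoc of calculateProjectionSerial names three ways to provide a serial: the @ProjectionMetaData annotation, a static final long serialVersionUID field, and, as a last resort, a hash computed by the serializer. The hedged sketch below models only the last two (leaving out the annotation) and then applies the matching rule: a stored snapshot is used only if its serial equals the current one. All names are hypothetical, and no Factus types are involved.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Optional;
import java.util.function.ToLongFunction;

class SerialCheckSketch {

    // A stored snapshot remembers the serial of the class it was taken from.
    record StoredSnapshot(long serial, byte[] state) {}

    // Hypothetical example projections.
    static class UserNamesV2 {
        private static final long serialVersionUID = 2L;
    }

    static class UserNamesNoUid {}

    // Prefers a static final long serialVersionUID; otherwise falls back to a serializer-provided hash.
    static long currentSerial(Class<?> type, ToLongFunction<Class<?>> serializerHash) {
        try {
            Field f = type.getDeclaredField("serialVersionUID");
            int m = f.getModifiers();
            if (Modifier.isStatic(m) && Modifier.isFinal(m) && f.getType() == long.class) {
                f.setAccessible(true);
                return f.getLong(null);
            }
        } catch (NoSuchFieldException | IllegalAccessException e) {
            // no usable field: fall through to the serializer's hash
        }
        return serializerHash.applyAsLong(type);
    }

    // A snapshot is only used if it was written by the class in its current state.
    static Optional<byte[]> usable(Optional<StoredSnapshot> stored, long currentSerial) {
        return stored.filter(s -> s.serial() == currentSerial).map(StoredSnapshot::state);
    }
}
```

If usable(...) returns an empty Optional, the projection would be rebuilt from scratch, which is exactly the effect of bumping the serial after an incompatible change.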

2.4.2 - Projection Types

Use the menu on the left-hand side to learn about the different flavors of projections.

2.4.2.1 - Snapshot

Now that we know how snapshotting works and what a projection is, it is quite easy to put things together:

A SnapshotProjection is a Projection (read EventHandler) that can be stored into/created from a Snapshot. Let’s go back to the example we had before:

/**
 *  maintains a map of UserId->UserName
 **/
public class UserNames implements SnapshotProjection {

  private final Map<UUID, String> existingNames = new HashMap<>();

  @Handler
  void apply(UserCreated created) {
    existingNames.put(created.aggregateId(), created.userName());
  }

  @Handler
  void apply(UserDeleted deleted) {
    existingNames.remove(deleted.aggregateId());
  }
// ...

This projection is interested in UserCreated and UserDeleted EventObjects and can be serialized by the SnapshotSerializer.

If you have worked with FactCast before, you’ll know what needs to be done (if you haven’t, just skip this section and be happy not to be bothered by this anymore):

  1. create an instance of the projection class, or get a Snapshot from somewhere
  2. create a list of FactSpecs (FactSpecifications) including the Specifications from UserCreated and UserDeleted
  3. create a FactObserver that implements a void onNext(Fact fact) method, which
    1. looks at the fact’s namespace/type/version
    2. deserializes the payload of the fact into the right EventObject’s instance
    3. calls a method to actually process that EventObject
    4. keeps track of facts being successfully processed
  4. subscribe to a fact stream according to the FactSpecs from above (either from scratch or from the last factId processed by the instance from the snapshot)
  5. await the completion of the subscription to be sure to receive all EventObjects currently in the EventLog
  6. maybe create a snapshot manually and store it somewhere, so that you do not have to start from scratch next time

… and this is just the “happy-path”.
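For comparison, here is a rough sketch of steps 3.1 to 3.4 written by hand, without FactCast types. The Fact record, the namespace "users", and the "<uuid>,<name>" payload format are hypothetical simplifications (real facts carry JSON headers and payloads). Subscribing, awaiting completion, and snapshotting (steps 4 to 6) are left out entirely.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;

class ManualObserverSketch {

    // Hypothetical, simplified fact: header fields plus a raw payload string.
    record Fact(UUID id, String ns, String type, int version, String payload) {}

    // Event objects from the UserNames example.
    record UserCreated(UUID aggregateId, String userName) {}

    record UserDeleted(UUID aggregateId) {}

    static class UserNamesObserver {
        final Map<UUID, String> existingNames = new HashMap<>();
        UUID lastProcessedFactId; // step 3.4: where to resume next time

        private final Map<String, Consumer<String>> handlers = new HashMap<>();

        UserNamesObserver() {
            // steps 3.1 + 3.2: route by ns/type/version and deserialize the payload
            handlers.put("users/UserCreated/1", payload -> {
                String[] parts = payload.split(",", 2); // assumed format: "<uuid>,<name>"
                apply(new UserCreated(UUID.fromString(parts[0]), parts[1]));
            });
            handlers.put("users/UserDeleted/1",
                    payload -> apply(new UserDeleted(UUID.fromString(payload))));
        }

        void onNext(Fact fact) {
            String key = fact.ns() + "/" + fact.type() + "/" + fact.version();
            Consumer<String> handler = handlers.get(key);
            if (handler == null) {
                throw new IllegalArgumentException("No handler for " + key);
            }
            handler.accept(fact.payload()); // step 3.3: process the event object
            lastProcessedFactId = fact.id(); // step 3.4: only after successful processing
        }

        void apply(UserCreated e) {
            existingNames.put(e.aggregateId(), e.userName());
        }

        void apply(UserDeleted e) {
            existingNames.remove(e.aggregateId());
        }
    }
}
```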

With Factus however, all you need to do is to use the following method:

 /**
 * If there is a matching snapshot already, it is deserialized and the
 * matching events, which are not yet applied, will be as well. Afterwards, a new
 * snapshot is created and stored.
 * <p>
 * If there is no existing snapshot yet, or they are not matching (see
 * serialVersionUID), an initial one will be created.
 *
 * @return an instance of the projectionClass in at least initial state, and
 *         (if there are any) with all currently published facts applied.
 */
@NonNull
<P extends SnapshotProjection> P fetch(@NonNull Class<P> projectionClass);

like

UserNames currentUserNames = factus.fetch(UserNames.class);

Easy, uh? As the instance is created from either a Snapshot or the class, the instance is private to the caller here. This is the reason why there is no ConcurrentHashMap or any other kind of synchronization necessary within UserNames.

Lifecycle hooks

There are plenty of methods that you can override in order to hook into the lifecycle of a SnapshotProjection.

  • onCatchup() - will be called when the catchup signal is received from the server.
  • onComplete() - will be called when the FactStream is at its end (only valid for catchup projections)
  • onError() - whenever an error occurs on the server side or on the client side before applying a fact
  • onBeforeSnapshot() - will be called whenever factus is about to take a snapshot of the projection. Might be an opportunity to clean up.
  • onAfterRestore() - will be called whenever factus deserializes a projection from a snapshot. Might be an opportunity to initialize things.
  • executeUpdate(Runnable) - will be called to update the state of a projection. The runnable includes applying the Fact/Event and also updating the state of the projection, in case you want to do something like introduce transactionality here.

This is not meant to be an exhaustive list. Look at the interfaces/classes you implement/extend and their javadoc.

2.4.2.2 - Aggregate

Another special flavor of a SnapshotProjection is an Aggregate. An Aggregate extends the notion of a SnapshotProjection by bringing in an aggregate ID, and this is what sets it apart from the UserNames example. It does not make sense to maintain two different UserNames projections because, by definition, the UserNames projection should contain all user names in the system. When you think of a User, however, you have different users in the system that differ in ID and (probably) user name. So calling factus.fetch(User.class) would not make any sense. Here, Factus offers two different methods for access:

/**
 * Same as fetching on a snapshot projection, but limited to one
 * aggregateId. If no fact was found, Optional.empty will be returned
 */
@NonNull
<A extends Aggregate> Optional<A> find(
        @NonNull Class<A> aggregateClass,
        @NonNull UUID aggregateId);

/**
 * shortcut to find, but returns the aggregate unwrapped. throws
 * {@link IllegalStateException} if the aggregate does not exist yet.
 */
@NonNull
default <A extends Aggregate> A fetch(
        @NonNull Class<A> aggregateClass,
        @NonNull UUID aggregateId) {
    return find(aggregateClass, aggregateId)
            .orElseThrow(() -> new IllegalStateException("Aggregate of type " + aggregateClass
                    .getSimpleName() + " for id " + aggregateId + " does not exist."));
}

As you can see, find returns the user as an Optional (being empty if there never was any EventObject published regarding that User), whereas fetch returns the User unwrapped and fails if there is no Fact for that user found.

All the rules from SnapshotProjections apply: the User instance is (together with its ID) stored as a snapshot at the end of the operation. You also have onBeforeSnapshot() and onAfterRestore() in case you want to hook into the lifecycle (see SnapshotProjection).

2.4.2.3 - Managed

As we have learnt, SnapshotProjections are created from scratch or from snapshots whenever you fetch them. Seen from another angle, you could call them unmanaged, in the sense that the application has no control over their lifecycle. There are use cases where this is less attractive. Consider a query model that powers a high-traffic REST API: recreating an instance of a SnapshotProjection for every query might cause too much overhead from transferring the snapshot over the network and deserializing it.

For this kind of use, it would be good if the lifecycle of the model were managed by the application. This also means there must be a way to ‘update’ the model when needed (technically, to process all the Facts that have not yet been applied to the projection). However, if the projection is application-managed (so that it can be shared between threads) but needs to be updated by catching up with the fact stream, there is a problem we did not have with SnapshotProjections: locking.

Definition

A ManagedProjection is a projection that is managed by the Application. Factus can be used to lock/update/release a Managed Projection in order to make sure it processes Facts in the correct order and uniquely.

Factus needs to make sure only one thread will change the Projection by catching up with the latest Facts. Also, when Factus has no control over the Projection, the Projection implementation itself needs to ensure that proper concurrency handling is implemented in the place the Projection is being queried from, while being updated. Depending on the implementation strategy used by you, this might be something you don’t need to worry about (for instance when using a transactional datastore).

ManagedProjections are StateAware (they know their position in the FactStream) and WriterTokenAware, so that they provide a way for Factus to coordinate updates.
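For a projection whose state lives in a single JVM, coordinating writers can be as simple as a one-permit semaphore. The sketch below shows one possible shape of acquireWriteToken(Duration maxWait) under stated assumptions. WriterToken here is a local stand-in interface rather than the Factus type, and returning null when write access cannot be obtained in time is an assumption of this sketch. A semaphore is used instead of a ReentrantLock so that the token may be released by a different thread than the one that acquired it.

```java
import java.time.Duration;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class InProcessWriterTokenSketch {

    // Hypothetical stand-in for a writer token: closing it gives up write access.
    interface WriterToken extends AutoCloseable {
        @Override
        void close(); // narrowed: no checked exception
    }

    // One permit means at most one writer at a time.
    private final Semaphore writePermit = new Semaphore(1);

    // Waits up to maxWait for write access; returns null if it could not be obtained (assumption).
    WriterToken acquireWriteToken(Duration maxWait) {
        try {
            if (!writePermit.tryAcquire(maxWait.toMillis(), TimeUnit.MILLISECONDS)) {
                return null;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        }
        AtomicBoolean released = new AtomicBoolean(false);
        // Release the permit exactly once, even if close() is called repeatedly.
        return () -> {
            if (released.compareAndSet(false, true)) {
                writePermit.release();
            }
        };
    }
}
```

This only coordinates threads within one process. For externalized state shared between nodes, the token would have to be backed by a distributed lock (for example, in the database or in Redis) instead.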

flexible update

One of the most important qualities of ManagedProjections is that they can be updated at any point. This makes them viable candidates for a variety of use cases. A default one certainly is a “strictly consistent” model, which can be used to provide consistent reads over different nodes that always show the latest state from the fact stream. In order to achieve this, you’d just update the model before reading from it.

// in a real application, userCount would typically be a Spring bean
UserCount userCount = new UserCount();

// now catch up with the published events
factus.update(userCount);

Obviously, this makes the application dependent on the event store for availability (and possibly latency). The good part, however, is that if FactCast were unavailable, you would still have a (potentially stale) model to fall back to.

In cases where consistency with the fact stream is not that important, you might just want to update the model occasionally. For example, you could call update for logged-in users (to make sure they see their own writes) but skip it for public users, as they don't need to see the very latest changes. One way to manage the extent of “staleness” of a ManagedProjection for public users could be a scheduled update call, once every 5 minutes or whatever your requirements are.


private final UserCount userCount;
private final Factus factus;

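// Spring cron expressions have six fields (second minute hour day month weekday);
// this runs every 5 minutes. Scheduling must be enabled via @EnableScheduling.
@Scheduled(cron = "0 */5 * * * *")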
public void updateUserCountRegularly(){
    factus.update(userCount);
}

If the projection is externalized and shared, keep in mind that your users still get a consistent view of the system, because all nodes share the same state.

Typical implementations

ManagedProjections are often used where the state of the projection is externalized and potentially shared between nodes. Think of JPA Repositories or a Redis database.

The ManagedProjection instance in the application should provide access to the externalized data and implement the locking facility.

Over time, there will be some examples added here with exemplary implementations using different technologies.

However, ManagedProjections do not have to work with externalized state. Depending on the size of the Projection and consistency requirements between nodes, it might also be a good idea to just have an in-process (local) representation of the state. That makes at least locking much easier.

Let’s move on to LocalManagedProjections…

2.4.2.4 - Managed (local)

As a specialization of ManagedProjection, a LocalManagedProjection lives within the application process and does not use shared external Databases to maintain its state. Relying on the locality, locking and keeping track of the state (position in the eventstream) is just a matter of synchronization and an additional field, all being implemented in the abstract class LocalManagedProjection that you are expected to extend.

public class UserCount extends LocalManagedProjection {

    private int users = 0;

    @Handler
    void apply(UserCreated created) {
        users++;
    }

    @Handler
    void apply(UserDeleted deleted) {
        users--;
    }

    int count() {
        return users;
    }

}

As you can see, the WriterToken business and the state management are taken care of for you, so that you can focus on the implementation of the projection.

Due to its simplicity, this kind of implementation is an attractive starting point for non-aggregates, provided the data held by the projection is not huge.

2.4.2.5 - Subscribed

The SnapshotProjection and ManagedProjection have one thing in common: The application actively controls the frequency and time of updates by actively calling a method. While this gives the user a maximum of control, it also requires synchronicity. Especially when building query models, this is not necessarily a good thing. This is where the SubscribedProjection comes into play.

Definition

A SubscribedProjection is subscribed once to a Fact-stream and is asynchronously updated as soon as the application receives relevant facts.

Subscribed projections are created by the application and subscribed (once) to factus. As soon as Factus receives matching Facts from the FactCast Server, it updates the projection. The expected latency is obviously dependent on a variety of parameters, but under normal circumstances it is expected to be <100ms, sometimes <10ms.

However, its strength (being updated in the background) is also its weakness: the application never knows what state the projection is in (eventual consistency).

While this is a perfect projection type for occasionally connected operations or public query models, the inherent eventual consistency might confuse users, for instance in a read-after-write scenario where users do not see their own writes. This can lead to suboptimal UX and thus should be used cautiously, after carefully considering the trade-offs.

A SubscribedProjection is also StateAware and WriterTokenAware. However, the token will not be released as frequently as with a ManagedProjection. This may lead to “starving” models, if the process keeping the lock is non-responsive.

Please keep that in mind when implementing the locking facility.

2.4.2.6 - Subscribed (local)

As a specialization of the SubscribedProjection, a LocalSubscribedProjection is local to one VM (just like a LocalManagedProjection). This leads to the same problem already discussed in relation to LocalManagedProjection: A possible inconsistency between nodes.

A LocalSubscribedProjection is providing locking (trivial) and state awareness, so it is very easy to use/extend.

2.4.3 - Atomicity

Introduction

When processing events, an externalized projection has two tasks:

  1. persist the changes resulting from the Fact
  2. store the current fact-stream-position

When using an external datastore (e.g. Redis, JDBC, MongoDB), Factus needs to ensure that these two tasks happen atomically: either both tasks are executed or none. This prevents corrupted data in case e.g. the Datastore goes down in the wrong moment.

Factus offers atomic writes through atomic projections.

sequenceDiagram
  participant Projection
  participant External Data Store
  Projection->>External Data Store: 1) update projection
  Note right of External Data Store: Inside Transaction
  Projection->>External Data Store: 2) store fact-stream-position

In an atomic Projection, the projection update and the update of the fact-stream-position need to run atomically

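Factus currently supports atomicity for the following external data stores:

  • any data store supported by Spring transaction management (see Spring Transactional)
  • Redis, via Redisson (see Redis Transactional and Redis Batched)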

Configuration

Atomic projections are declared via specific annotations. Currently supported are:

  • @SpringTransactional
  • @RedisTransactional
  • @RedisBatched

These annotations share a common configuration attribute:

| Parameter Name | Description | Default Value |
|---|---|---|
| bulkSize | how many events are processed in a bulk | 50 |

as well as different attributes needed to configure the respective underlying technical solution (Transaction/Batch/…). There are reasonable defaults for all of those attributes.

Optimization: Bulk Processing

In order to improve the throughput of event processing, atomic projections support bulk processing.

With bulk processing

  • the concrete underlying transaction mechanism (e.g. Spring Transaction Management) can optimize accordingly.
  • skipping unnecessary fact-stream-position updates is possible (see next section).

The size of the bulk can be configured via a common bulkSize attribute of the @SpringTransactional, @RedisTransactional or @RedisBatched annotation.

Once the bulkSize is reached, or a configured timeout is triggered, the recorded operations of this bulk will be flushed to the datastore.

Skipping fact-stream-position Updates

Skipping unnecessary updates of the fact-stream-position reduces the writes to the external datastore, thus improving event-processing throughput.

The concept is best explained with an example: suppose we have three events that are processed by a transactional projection with the bulk size set to “1”. Then, we see the following writes going to the external datastore:

sequenceDiagram
  participant Projection
  participant External Data Store
  Projection->>External Data Store: event 1: update projection data
  Projection->>External Data Store: event 1: store fact-stream-position
  Projection->>External Data Store: event 2: update projection data
  Projection->>External Data Store: event 2: store fact-stream-position
  Projection->>External Data Store: event 3: update projection data
  Projection->>External Data Store: event 3: store fact-stream-position

Processing three events with bulk size “1” - each fact-stream-position is written
As initially explained, here, each update of the projection is accompanied by an update of the fact-stream-position.

In order to reduce the writes to the minimum necessary, we now increase the bulk size to “3”:

sequenceDiagram
  participant Projection
  participant External Data Store
  Projection->>External Data Store: event 1: update projection data
  Projection->>External Data Store: event 2: update projection data
  Projection->>External Data Store: event 3: update projection data
  Projection->>External Data Store: event 3: store fact-stream-position

Processing three events with bulk size “3” - only the last fact-stream-position is written

This configuration change eliminates two unnecessary intermediate fact-stream-position updates. The bulk is still executed atomically, so in terms of fact-stream-position updates, we are just interested in the last, most recent position.

Skipping unnecessary intermediate updates to the fact-stream-position noticeably reduces the required writes to the external datastore. Given a large enough bulk size (“50” is a reasonable default), this significantly improves event-processing throughput.
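The following plain-Java sketch reproduces the write counts from the two diagrams above. It is a hypothetical model, not Factus code: updates are only collected in memory, a flush stands in for one atomic transaction or batch, and the timeout-triggered flush is not modelled. With bulkSize 1, every event causes two writes; with bulkSize 3, the three updates share a single fact-stream-position write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

class BulkApplierSketch {

    // Records every write that would go to the external datastore.
    final List<String> writes = new ArrayList<>();

    private final int bulkSize;
    private final List<String> pendingUpdates = new ArrayList<>();
    private UUID pendingPosition;

    BulkApplierSketch(int bulkSize) {
        this.bulkSize = bulkSize;
    }

    void apply(UUID factId, String update) {
        pendingUpdates.add(update);
        pendingPosition = factId; // only the most recent position matters
        if (pendingUpdates.size() >= bulkSize) {
            flush();
        }
    }

    // Called when the bulk is full (a timeout would also call it; not modelled here).
    void flush() {
        if (pendingUpdates.isEmpty()) {
            return;
        }
        // In a real atomic projection, everything below would happen in one transaction/batch.
        writes.addAll(pendingUpdates);
        writes.add("store fact-stream-position " + pendingPosition);
        pendingUpdates.clear();
    }
}
```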

2.4.3.1 - Spring Transactional

Broad Data-Store Support

Spring comes with extensive support for transactions which is employed by Spring Transactional Projections.

Standing on the shoulders of Spring Transactions, Factus supports transactionality for every data-store for which Spring transaction management is available. In more detail, for the data-store in question, an implementation of the Spring PlatformTransactionManager must exist.

Motivation

You would want to use Spring Transactional for two reasons:

  • atomicity of factStreamPosition updates and your projection state updates
  • increased fact processing throughput

The Performance bit is achieved by skipping unnecessary factStreamPosition updates and (more importantly) by reducing the number of transactions on your datastore by using one Transaction for bulkSize updates instead of single writes. For instance, if you use Spring Transactions on a JDBC Datastore, you will have one database transaction around the update of bulkSize events. The bulkSize is configurable per projection via the @SpringTransactional annotation.

Configuration

In order to make use of spring transaction support, the necessary dependency has to be included in your project:

    <dependency>
        <groupId>org.factcast</groupId>
        <artifactId>factcast-factus-spring-tx</artifactId>
    </dependency>

Structure

To use Spring Transactionality, a projection needs to:

  • be annotated with @SpringTransactional to configure bulk and transaction-behavior and
  • implement SpringTxProjection to return the responsible PlatformTransactionManager for this kind of Projection

Applying facts

In your @Handler methods, you need to make sure you use the Spring-managed transaction when talking to your datastore. This might be entirely transparent for you (for instance, when using JDBC, which binds the transaction to the current thread), or it may require you to resolve the current transaction from the given PlatformTransactionManager instance.

Please consult the Spring docs or your driver’s documentation.

You can find blueprints for getting started in the example section.

2.4.3.2 - Redis Transactional

A Redis transactional projection is a transactional projection based on Redisson RTransaction.

Compared to a Spring transactional projection, a Redis transactional projection is more lightweight since

  • transactionality is directly provided by RTransaction. There is no need to deal with Spring’s PlatformTransactionManager
  • the fact stream position is automatically managed (see example below)

Motivation

You would want to use Redis Transactional for two reasons:

  • atomicity of factStreamPosition updates and your projection state updates
  • increased fact processing throughput

The performance bit is achieved by skipping unnecessary factStreamPosition updates and (more importantly) by reducing the number of operations on your Redis backend: bulkSize updates are written within one Redisson transaction instead of as single writes. The bulkSize is configurable per projection via the @RedisTransactional annotation.

Working with a Redis transactional projection, you can read your own uncommitted writes. For this reason, a Redis transactional projection is best used for projections that need to access the projection’s data while handling an event.

If this is not necessary, you could also use a better performing alternative: Redis batch projection

Configuration

In order to make use of Redisson RTransaction support, the necessary dependency has to be included in your project:

    <dependency>
        <groupId>org.factcast</groupId>
        <artifactId>factcast-factus-redis</artifactId>
    </dependency>

Structure

A Redis transactional projection can be a managed- or a subscribed projection and is defined as follows:

  • it is annotated with @RedisTransactional
  • it implements RedisProjection revealing the RedisClient used
  • it provides the serial number of the projection via the @ProjectionMetaData annotation
  • the handler methods receive an additional RTransaction parameter

Example

@Handler
void apply(SomethingHappened fact, RTransaction tx) {
    // write through the transaction, so the change commits together with the fact stream position
    tx.getMap( ... ).put(fact.getKey(), fact.getValue());
}

A full example can be found here.

2.4.3.3 - Redis Batched

A Redis batch projection is an atomic projection based on Redisson RBatch. Like Redis transactional projections, this projection type is also more lightweight than Spring transactional projections: no Spring PlatformTransactionManager is needed, as the RBatch object of the Redisson library is enough.

Working with a Redis batch projection is asynchronous as multiple commands are collected and executed later in an atomic way (all or none).

Motivation

You would want to use Redis Batched for two reasons:

  • atomicity of factStreamPosition updates and your projection state updates
  • increased fact processing throughput

The performance bit is achieved by skipping unnecessary factStreamPosition updates and (more importantly) by reducing the number of operations on your Redis backend: bulkSize updates are collected in a Redisson batch instead of being sent as single writes. The bulkSize is configurable per projection via the @RedisBatched annotation.

Redisson batches improve performance significantly by collecting operations and executing them together, as well as by delegating all possible work to an async thread. As a consequence, you cannot read non-executed (batched) changes. If this is unacceptable for your projection, consider Redis transactional projections. See the Redisson documentation for details.

Unlike with a transaction, writes registered (but not yet executed) on a Redis batch cannot be read.

A Redis batch projection, therefore, is recommended for projections which

  • handle a lot of events and
  • don’t require reading the current projection state in an event handler.

For an alternative that allows access to the projection state during event handling, see Redis transactional projection.
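The difference in read visibility between the two Redis flavors can be illustrated without Redis at all. The sketch below is a plain-Java simulation with hypothetical TxLike and BatchLike classes, using a HashMap as a stand-in for the backend. It is not the Redisson API and ignores isolation, atomicity, and asynchronous execution, which the Redisson documentation covers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class StagedWritesSketch {

    // Transaction-like: writes are staged, but reads in the same unit of work see them.
    static class TxLike {
        private final Map<String, String> backend;
        private final Map<String, String> staged = new HashMap<>();

        TxLike(Map<String, String> backend) {
            this.backend = backend;
        }

        void put(String key, String value) {
            staged.put(key, value);
        }

        Optional<String> get(String key) {
            return Optional.ofNullable(staged.containsKey(key) ? staged.get(key) : backend.get(key));
        }

        void commit() {
            backend.putAll(staged);
            staged.clear();
        }
    }

    // Batch-like: writes are only queued; reads hit the backend and miss queued changes.
    static class BatchLike {
        private final Map<String, String> backend;
        private final List<Runnable> queued = new ArrayList<>();

        BatchLike(Map<String, String> backend) {
            this.backend = backend;
        }

        void put(String key, String value) {
            queued.add(() -> backend.put(key, value));
        }

        Optional<String> get(String key) {
            return Optional.ofNullable(backend.get(key));
        }

        void execute() {
            queued.forEach(Runnable::run);
            queued.clear();
        }
    }
}
```

If a handler needs to read what an earlier handler in the same bulk wrote, only the transaction-like behavior gives the expected answer. This is why the text recommends batches only for projections that do not read their own state while handling events.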

Configuration

In order to make use of Redisson RBatch support, the necessary dependency has to be included in your project:

    <dependency>
        <groupId>org.factcast</groupId>
        <artifactId>factcast-factus-redis</artifactId>
    </dependency>

Structure

A Redis batch projection supports managed- or subscribed projections and is defined as follows:

  • it is annotated with @RedisBatched
  • it implements RedisProjection revealing the RedisClient used
  • it provides the serial number of the projection via the @ProjectionMetaData annotation
  • the handler methods receive an additional RBatch parameter

This structure is similar to a Redis transactional projection, only the @RedisBatched annotation and the RBatch method parameter differ.

Example

@Handler
void apply(SomethingHappened fact, RBatch batch) {
    // queued on the batch; executed atomically together with the fact stream position update
    batch.getMap( ... ).putAsync(fact.getKey(), fact.getValue());
}

A full example can be found here.

2.4.4 - Examples

Here, you will find some examples that you can use as a simple blueprint to get started building projections. We make use of some abstract classes that might be more convenient to use. Feel free to study the implementations of those abstract classes to see what is going on, especially when you plan to implement projections with a different datastore than the ones we use in the examples.

2.4.4.1 - UserNames (Spring/JDBC)

Here is an example for a managed projection externalizing its state to a relational database (PostgreSQL here) using Spring transactional management.

The example projects a list of used UserNames in the System.

Preparation

We need to store two things in our JDBC Datastore:

  • the actual list of UserNames, and
  • the fact-stream-position of your projection.

Therefore we create the necessary tables (probably using liquibase/flyway or similar tooling of your choice):

CREATE TABLE users (
    name TEXT,
    id UUID,
    PRIMARY KEY (id));
CREATE TABLE fact_stream_positions (
    projection_name TEXT,
    fact_stream_position UUID,
    PRIMARY KEY (projection_name));

Given a unique projection name, we can use fact_stream_positions as a common table for all our JDBC managed projections.

Constructing

Since we decided to use a managed projection, we extend the AbstractSpringTxManagedProjection class. To configure transaction management, our managed projection exposes the injected transaction manager to the rest of Factus by calling the parent constructor.

@ProjectionMetaData(serial = 1)
@SpringTransactional
public class UserNames extends AbstractSpringTxManagedProjection {

    private final JdbcTemplate jdbcTemplate;

    public UserNames(
            @NonNull PlatformTransactionManager platformTransactionManager, JdbcTemplate jdbcTemplate) {
        super(platformTransactionManager);
        this.jdbcTemplate = jdbcTemplate;
    }
    ...

As we’re making use of Spring, we inject a PlatformTransactionManager and a JdbcTemplate in order to communicate with the database in a transactional way.

Two remarks:

  1. As soon as your project uses the spring-boot-starter-jdbc dependency, Spring Boot will automatically provide you with a JDBC-aware PlatformTransactionManager.
  2. To ensure that the database communication participates in the managed transaction, the database access mechanism must also be provided by Spring. Thus, we suggest using the JdbcTemplate.

Configuration

The @SpringTransactional annotation provides various configuration options:

| Parameter Name | Description | Default Value |
|---|---|---|
| bulkSize | bulk size | 50 |
| timeoutInSeconds | timeout in seconds | 30 |

Updating the projection

The two possible abstract base classes, AbstractSpringTxManagedProjection or AbstractSpringTxSubscribedProjection, both require the following methods to be implemented:

| Method Signature | Description |
|---|---|
| public UUID factStreamPosition() | read the last position in the Fact stream from the database |
| public void factStreamPosition(@NonNull UUID factStreamPosition) | write the current position of the Fact stream to the database |
| public WriterToken acquireWriteToken(@NonNull Duration maxWait) | coordinates write access to the projection, see here for details |

The first two methods tell Factus how to read the Fact stream’s position from the database and how to write it back.

Writing the fact position

Provided the table fact_stream_positions exists, here is an example of how to write the Fact position:

@Override
public void factStreamPosition(@NonNull UUID factStreamPosition) {
    jdbcTemplate.update(
            "INSERT INTO fact_stream_positions (projection_name, fact_stream_position) " +
            "VALUES (?, ?) " +
            "ON CONFLICT (projection_name) DO UPDATE SET fact_stream_position = ?",
            getScopedName().asString(),
            factStreamPosition,
            factStreamPosition);
}

For convenience, an UPSERT statement (Postgres syntax) is used, which INSERTs the UUID the first time and subsequently only UPDATEs the value.

To avoid hard-coding a unique name for the projection, the provided method getScopedName() is employed. The default implementation makes sure the name is unique and includes the serial of the projection.

Reading the fact position

To read the last Fact stream position, we simply select the previously written value:

@Override
public UUID factStreamPosition() {
    try {
        return jdbcTemplate.queryForObject(
                "SELECT fact_stream_position FROM fact_stream_positions WHERE projection_name = ?",
                UUID.class,
                getScopedName().asString());
    } catch (IncorrectResultSizeDataAccessException e) {
        // no position yet, just return null
        return null;
    }
}

In case no previous Fact position exists, null is returned.

Applying Facts

When processing the UserCreated event, we add a new row to the users table, filled with event data:

@Handler
void apply(UserCreated e) {
    jdbcTemplate.update(
            "INSERT INTO users (name, id) VALUES (?,?);",
            e.getUserName(),
            e.getAggregateId());
}

When handling the UserDeleted event we do the opposite and remove the appropriate row:

@Handler
void apply(UserDeleted e) {
    jdbcTemplate.update("DELETE FROM users where id = ?", e.getAggregateId());
}

We have finished the implementation of the event-processing part of our projection. What is missing is a way to make the projection’s data accessible for users.

Querying the projection

Users of our projections (meaning “other code”) contact the projection via its public API. Currently, there is no public method offering “user names”. So let’s change that:

public List<String> getUserNames() {
    return jdbcTemplate.query("SELECT name FROM users", (rs, rowNum) -> rs.getString(1));
}

Using The Projection

Calling code that wants to talk to the projection now just needs to call the getUserNames method:

// create a local instance or get a Spring Bean from the ApplicationContext, depending on your code organization
UserNames userNameProjection = new UserNames(platformTransactionManager, jdbcTemplate);

// depending on many factors you *may* want to update the projection before querying it
factus.update(userNameProjection);

List<String> userNames = userNameProjection.getUserNames();

First, we create an instance of the projection and provide it with all required dependencies. Alternatively, you may want to let Spring manage the lifecycle of the projection and let the dependency injection mechanism provide you with an instance.

Next, we call update(...) on the projection to fetch the latest events from the Fact stream. Note that when you use a pre-existing (maybe Spring-managed singleton) instance of the projection, this step is optional and depends on your use case. As a last step, we ask the projection to provide us with user names by calling getUserNames().
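What update(...) does conceptually can be sketched without FactCast. In this hypothetical example, FactLog is a plain list standing in for the Fact stream, and the projection remembers how many facts it has already applied. An update only processes the facts after that position, which is roughly what the stored Fact stream position achieves for the JDBC projection above. The event records are simplified stand-ins, not the real event classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical, simplified event types for this sketch.
record UserCreated(UUID aggregateId, String userName) {}
record UserDeleted(UUID aggregateId) {}

// Hypothetical stand-in for the Fact stream: an append-only list.
class FactLog {
    final List<Object> facts = new ArrayList<>();

    void publish(Object fact) { facts.add(fact); }
}

// Projection that only processes facts after its last known position.
class InMemoryUserNames {
    private final Map<UUID, String> users = new LinkedHashMap<>();
    private int position = 0; // number of facts already applied

    void update(FactLog log) {
        while (position < log.facts.size()) {
            Object fact = log.facts.get(position++);
            if (fact instanceof UserCreated c) users.put(c.aggregateId(), c.userName());
            else if (fact instanceof UserDeleted d) users.remove(d.aggregateId());
        }
    }

    List<String> getUserNames() { return new ArrayList<>(users.values()); }
}
```

Querying without calling update(...) first simply returns whatever state the projection had after its last update.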

Full Example

To study the full example see

2.4.4.2 - UserNames (Redis Transactional)

Here is a projection that handles UserCreated and UserDeleted events. It solves the same problem as the example we’ve seen in Spring transactional projections. However, this time we use Redis as our data store and Redisson as the access API.

Configuration

The @RedisTransactional annotation provides various configuration options:

| Parameter Name | Description | Default Value |
|---|---|---|
| bulkSize | bulk size | 50 |
| timeout | timeout in milliseconds until a transaction is interrupted and rolled back | 30000 |
| responseTimeout | timeout in milliseconds for the Redis response; starts counting down once the transaction has been successfully submitted | 5000 |
| retryAttempts | maximum number of attempts to send the transaction | 5 |
| retryInterval | time interval in milliseconds between retry attempts | 3000 |

Constructing

Since we decided to use a managed projection, we extend the AbstractRedisManagedProjection class. To configure the connection to Redis via Redisson, we inject a RedissonClient in the constructor and pass it on to the parent constructor.

@ProjectionMetaData(serial = 1)
@RedisTransactional
public class UserNames extends AbstractRedisManagedProjection {

  public UserNames(RedissonClient redisson) {
    super(redisson);
  }
    ...

FactStreamPosition and Lock-Management are automatically taken care of by the underlying AbstractRedisManagedProjection.

In contrast to non-atomic projections, the instance variable userNames cannot be used when applying Facts to the Redis data structure, as this would violate the transactional semantics. Instead, the state is accessed and updated via a transaction-derived data structure (a Map here) inside the handler methods.
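The reason for not writing to the instance variable can be illustrated with a tiny staging transaction. StagingTx below is hypothetical and much simpler than Redisson's RTransaction: handlers write to a transaction-derived copy, and only commit() makes the changes visible in the shared map, while rollback() leaves it untouched. Writing directly to the shared map would bypass that mechanism.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical transaction: stages changes in a copy of the shared map.
class StagingTx<K, V> {
    private final Map<K, V> shared;
    private final Map<K, V> staged;

    StagingTx(Map<K, V> shared) {
        this.shared = shared;
        this.staged = new HashMap<>(shared); // transaction-derived view
    }

    // Handlers read and write this view, comparable to tx.getMap(...).
    Map<K, V> getMap() { return staged; }

    // Copies all staged changes into the shared map.
    void commit() {
        shared.clear();
        shared.putAll(staged);
    }

    // Discards staged changes; the shared map is never touched.
    void rollback() {
        staged.clear();
        staged.putAll(shared);
    }
}
```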

Updating the projection

Applying Events

Received events are processed inside the methods annotated with @Handler (the handler methods). To participate in the transaction, these methods have an additional RTransaction parameter which represents the current transaction.

Let’s have a closer look at the handler for the UserCreated event:

@Handler
void apply(UserCreated e, RTransaction tx) {
    Map<UUID, String> userNames = tx.getMap(getRedisKey());
    userNames.put(e.getAggregateId(), e.getUserName());
}

In the previous example, the method getRedisKey() was used to retrieve the Redis key of the projection. Let’s have a closer look at this method in the next section.

Default redisKey

The data structures provided by Redisson all require a unique identifier which is used to store them in Redis. The method getRedisKey() provides an automatically generated name, assembled from the class name of the projection and the serial number configured with the @ProjectionMetaData.

Additionally, AbstractRedisManagedProjection and AbstractRedisSubscribedProjection maintain the following keys in Redis:

  • getRedisKey() + "_state_tracking" - contains the UUID of the last position of the Fact stream
  • getRedisKey() + "_lock" - shared lock that needs to be acquired to update the projection.
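The key layout can be sketched as plain string concatenation. Only the two suffixes are taken from the list above; the format of the base key is an assumption, since the hypothetical redisKey helper simply joins the simple class name and the serial, while the real getRedisKey() may produce a different string.

```java
// Hypothetical helper; the real getRedisKey() format may differ.
final class RedisKeys {
    private RedisKeys() {}

    // Assumed base key: class name plus the serial from @ProjectionMetaData.
    static String redisKey(Class<?> projection, long serial) {
        return projection.getSimpleName() + "_" + serial;
    }

    // Key holding the UUID of the last applied Fact stream position.
    static String stateTrackingKey(String redisKey) {
        return redisKey + "_state_tracking";
    }

    // Key of the shared lock acquired before updating the projection.
    static String lockKey(String redisKey) {
        return redisKey + "_lock";
    }
}
```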

Redisson API Data Structures vs. Java Collections

As seen in the above example, some Redisson data structures also implement the appropriate Java Collections interface. For example, you can also assign a Redisson RMap to a standard Java Map:

// 1) use specific Redisson type
RMap<UUID, String> userNames = tx.getMap(getRedisKey());

// 2) use Java Collections type
Map<UUID, String> userNames = tx.getMap(getRedisKey());

There are good reasons for either variant, 1) and 2):

| Redisson specific | plain Java |
|---|---|
| extended functionality which, for example, reduces I/O load (see RMap.fastPut(...) and RMap.fastRemove(...)) | standard, intuitive |
| only option when using data structures which are not available in standard Java Collections (e.g. RedissonListMultimap) | easier to test |
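To illustrate the "easier to test" point: if handler logic is written against java.util.Map, a plain HashMap can stand in for the Redisson map in a unit test. The UserNameLogic class is hypothetical; inside the projection, the map would come from tx.getMap(getRedisKey()).

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical handler logic that depends only on java.util.Map,
// so it works with a Redisson RMap in production and a HashMap in tests.
final class UserNameLogic {
    private UserNameLogic() {}

    static void onUserCreated(Map<UUID, String> userNames, UUID id, String name) {
        userNames.put(id, name);
    }

    static void onUserDeleted(Map<UUID, String> userNames, UUID id) {
        userNames.remove(id);
    }
}
```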

Full Example


@ProjectionMetaData(serial = 1)
@RedisTransactional
public class UserNames extends AbstractRedisManagedProjection {

  private final Map<UUID, String> userNames;

  public UserNames(RedissonClient redisson) {
    super(redisson);

    userNames = redisson.getMap(getRedisKey());
  }

  public List<String> getUserNames() {
    return new ArrayList<>(userNames.values());
  }

  @Handler
  void apply(UserCreated e, RTransaction tx) {
    tx.getMap(getRedisKey()).put(e.getAggregateId(), e.getUserName());
  }

  @Handler
  void apply(UserDeleted e, RTransaction tx) {
    tx.getMap(getRedisKey()).remove(e.getAggregateId());
  }
}

To study the full example, see

2.4.4.3 - UserNames (Redis Batched)

We continue using the previously introduced example of a projection handling UserCreated and UserDeleted events:

Configuration

The @RedisBatched annotation provides various configuration options:

| Parameter Name | Description | Default Value |
|---|---|---|
| bulkSize | bulk size | 50 |
| responseTimeout | timeout in milliseconds for the Redis response | 5000 |
| retryAttempts | maximum number of attempts to transmit a batch of Redis commands | 5 |
| retryInterval | time interval in milliseconds between retry attempts | 3000 |

Constructing

Since we decided to use a managed projection, we extend the AbstractRedisManagedProjection class. To configure the connection to Redis via Redisson, we inject a RedissonClient in the constructor and pass it on to the parent constructor.

@ProjectionMetaData(serial = 1)
@RedisBatched
public class UserNames extends AbstractRedisManagedProjection {

  public UserNames(RedissonClient redisson) {
    super(redisson);
  }
    ...

We are using a managed projection, hence we extend the AbstractRedisManagedProjection class. To let Factus take care of the batch submission, automatically persist the Fact stream position and manage the locks, we provide the parent class with the instance of the Redisson client (call to super(...)).

Updating the projection

Applying Events

To access the batch, the handler methods require an additional RBatch parameter:

@Handler
void apply(UserCreated created, RBatch batch) {
    batch.getMap(getRedisKey())
          .putAsync(created.getAggregateId(), created.getUserName());
}

We use a batch-derived RMapAsync object which offers asynchronous versions of the common Map methods. By calling putAsync(...) we add the extracted event data to the map. Underneath, the RBatch collects this change and, at a convenient point in time, transmits it together with other changes to Redis.
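The batching idea can be sketched without Redis. The hypothetical CommandBatch below queues commands, as putAsync(...) does, and executes them in groups of bulkSize (50 by default for @RedisBatched), counting each group as one simulated round-trip. This is not how RBatch is implemented, and the documentation above only says that changes are transmitted "at a convenient point in time"; an explicit flush() stands in for that here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical batch: queues commands and executes them in bulk.
class CommandBatch {
    private final int bulkSize;
    private final List<Runnable> pending = new ArrayList<>();
    private int roundTrips = 0;

    CommandBatch(int bulkSize) { this.bulkSize = bulkSize; }

    // Like putAsync(...): record the change, do not send it yet.
    void add(Runnable command) {
        pending.add(command);
        if (pending.size() >= bulkSize) flush();
    }

    // Executes all queued commands as one simulated round-trip.
    void flush() {
        if (pending.isEmpty()) return;
        pending.forEach(Runnable::run);
        pending.clear();
        roundTrips++;
    }

    int roundTrips() { return roundTrips; }
}
```

With 120 changes and a bulk size of 50, this sketch needs three round-trips instead of 120 individual requests.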

Default redisKey

The data structures provided by Redisson all require a unique identifier which is used to store them in Redis. The method getRedisKey() provides an automatically generated name, assembled from the class name of the projection and the serial number configured with the @ProjectionMetaData.

Additionally, AbstractRedisManagedProjection and AbstractRedisSubscribedProjection maintain the following keys in Redis:

  • getRedisKey() + "_state_tracking" - contains the UUID of the last position of the Fact stream
  • getRedisKey() + "_lock" - shared lock that needs to be acquired to update the projection.

Redisson API Data Structures vs. Java Collections

As seen in the above example, some Redisson data structures also implement the appropriate Java Collections interface. For example, you can also assign a Redisson RMap to a standard Java Map:

// with tx being the RTransaction of a transactional projection

// 1) use specific Redisson type
RMap<UUID, String> userNames = tx.getMap(getRedisKey());

// 2) use Java Collections type
Map<UUID, String> userNames = tx.getMap(getRedisKey());

There are good reasons for either variant, 1) and 2):

| Redisson specific | plain Java |
|---|---|
| extended functionality which, for example, reduces I/O load (see RMap.fastPut(...) and RMap.fastRemove(...)) | standard, intuitive |
| only option when using data structures which are not available in standard Java Collections (e.g. RedissonListMultimap) | easier to test |

Full Example

@ProjectionMetaData(serial = 1)
@RedisBatched
public class UserNames extends AbstractRedisManagedProjection {

    public UserNames(RedissonClient redisson) {
        super(redisson);
    }

    public List<String> getUserNames() {
        RMap<UUID, String> userNames = redisson.getMap(getRedisKey());
        return new ArrayList<>(userNames.values());
    }

    @Handler
    void apply(UserCreated created, RBatch batch) {
        RMapAsync<UUID, String> userNames = batch.getMap(getRedisKey());
        userNames.putAsync(created.getAggregateId(), created.getUserName());
    }

    @Handler
    void apply(UserDeleted deleted, RBatch batch) {
        batch.getMap(getRedisKey()).removeAsync(deleted.getAggregateId());
    }
}

To study the full example, see

2.4.5 - Callbacks

When implementing the Projection interface, the user can choose to override these default hook methods for more fine-grained control:

| Method Signature | Description |
|---|---|
| List<FactSpec> postprocess(List<FactSpec> specsAsDiscovered) | further filters the handled facts via their fact specification, including aggregate ID and meta entries |
| void onCatchup() | invoked after all past facts of the streams were processed. This is a good point to signal that the projection is ready to serve data (e.g. via a health indicator). |
| void onComplete() | called when the subscription closed without error |
| void onError(Throwable exception) | called when the subscription closed after receiving an error. The default implementation simply logs the error. |

postprocess

Annotating your handler methods gives you a convenient way of declaring a projection’s interest in particular facts, filtered by ns, type, POJO to deserialize into, version, etc. This kind of filtering should be sufficient for most use cases. However, annotations must have constant attributes, so what you cannot do this way is filter on values that are only available at runtime: a particular aggregateId or a calculated meta attribute in the header.

For these use-cases the postprocess hook can be used.

The following projection handles SomethingStarted and SomethingEnded events. When updating the projection, Factus invokes the postprocess(...) method and provides it with a list of FactSpec specifications as discovered from the annotations. If you override the default behavior here (which just returns the list unchanged), you can intercept and freely modify, add or remove FactSpecs. In our example, this list contains two entries: the FactSpecs built from the SomethingStarted and SomethingEnded classes, respectively.

By adding these filters to every discovered FactSpec, the example only considers facts with a specific aggregate ID and the matching meta entry.

public class MyProjection extends LocalManagedProjection {
  @Handler
  void apply(SomethingStarted event) { // ...
  }

  @Handler
  void apply(SomethingEnded event) { // ...
  }

  @Override
  public @NonNull List<FactSpec> postprocess(@NonNull List<FactSpec> specsAsDiscovered) {
    specsAsDiscovered.forEach(
        spec ->
            // method calls can be chained;
            // someAggregateUuid is the aggregate ID of interest, known only at runtime
            spec.aggId(someAggregateUuid)
                .meta("someMetaAttribute", "someValue"));
    return specsAsDiscovered;
  }
}
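Because forEach discards return values, the example relies on aggId(...) and meta(...) modifying the FactSpec they are called on. The following sketch reproduces that pattern with a hypothetical, much simplified Spec class (not the real FactSpec), so the effect on the discovered specs can be checked in isolation.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical, simplified stand-in for FactSpec with fluent, mutating setters.
class Spec {
    final String ns;
    final String type;
    UUID aggId;
    final Map<String, String> meta = new LinkedHashMap<>();

    Spec(String ns, String type) {
        this.ns = ns;
        this.type = type;
    }

    Spec aggId(UUID id) { this.aggId = id; return this; }

    Spec meta(String key, String value) { meta.put(key, value); return this; }
}

class Postprocessor {
    // Narrows every discovered spec to one aggregate and one meta pair.
    static List<Spec> postprocess(List<Spec> specsAsDiscovered, UUID aggregateId) {
        specsAsDiscovered.forEach(spec ->
                spec.aggId(aggregateId).meta("someMetaAttribute", "someValue"));
        return specsAsDiscovered;
    }
}
```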

onCatchup

The Factus API calls the onCatchup method after an onCatchup signal was received from the server, indicating that the fact stream is now as close as possible to the end of the FactStream defined by the FactSpecs used for filtering. Depending on the type of projection, the subscription has now switched from catchup to follow mode (for follow subscriptions), or is completed right after (for catchup subscriptions, see onComplete). A popular use case for implementing onCatchup is to signal to the rest of the service that the projection is ready to be queried and can serve (not too stale) data. In Spring, for instance, a custom health indicator can be used for that purpose.

@Override
public void onCatchup() {
      log.debug("Projection is ready now");
      // perform further actions e.g. switch health indicator to "up"
}

onComplete

The onComplete method is called when the server terminated a subscription without any error. It is the last signal a server sends. The default behavior is to ignore this.

onError

The onError method is called when the server terminated a subscription due to an error, or when one of your apply methods threw an exception. The subscription will be closed, either way. The default behavior is to just log the error.
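A common way to combine these callbacks is a readiness flag that a health check can read. The ProjectionReadiness class below is a hypothetical sketch; a projection would call its methods from onCatchup(), onComplete() and onError(...). In Spring, the flag could back a custom health indicator, which is not shown here.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical readiness tracker driven by the projection callbacks.
class ProjectionReadiness {
    private final AtomicBoolean ready = new AtomicBoolean(false);
    private final AtomicReference<Throwable> lastError = new AtomicReference<>();

    // From onCatchup(): past facts are processed, data is reasonably fresh.
    void onCatchup() { ready.set(true); }

    // From onComplete(): this sketch leaves the ready state unchanged.
    void onComplete() { }

    // From onError(...): the subscription is closed, so report "not ready".
    void onError(Throwable exception) {
        lastError.set(exception);
        ready.set(false);
    }

    boolean isReady() { return ready.get(); }

    Throwable lastError() { return lastError.get(); }
}
```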

2.4.6 - Filtering

When implementing a Projection, you add handler methods (methods annotated with either @Handler or @HandlerFor) in order to express what the projection is interested in.

Factus will look at these methods in order to discover fact specifications. These fact specifications form a query which is sent to the FactCast server to create a fact stream suited for this projection. In detail, for each handler method, a Projector inspects the method’s annotations and parameter types, including their annotations, to build a FactSpec object. This object contains at least the ns and type properties. Optionally, the version property is set.

However, a FactSpec offers additional filtering possibilities that sometimes make sense, such as

  • aggregateId
  • meta key/value pair (one or more) or even
  • JavaScript acting as a predicate.

If these filters are known in advance for a projection, you can use additional annotations to declare them:

  • @FilterByAggId
  • @FilterByScript
  • @FilterByMeta (can be used repeatedly)

Example

Let’s say, you only want to receive events that have a meta pair “priority”:“urgent” in their headers. Here, you would use code like:

  @Handler
  @FilterByMeta(key="priority", value="urgent")
  protected void apply(UserCreated created) {
    // ...
  }

This adds the filter defined by the @FilterByMeta annotation to the FactSpec. As a result, filtering takes place on the server side instead of wastefully on the client side (for example, in the body of the apply method).
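Conceptually, @FilterByMeta adds a key/value condition to the FactSpec, and a fact only matches if its header meta contains that pair. MetaFilter below is a hypothetical illustration of that matching rule, with facts reduced to their meta maps; in FactCast, this evaluation happens on the server, so non-matching facts are never sent to the client.

```java
import java.util.List;
import java.util.Map;

// Hypothetical illustration of the meta matching rule; not FactCast code.
final class MetaFilter {
    private MetaFilter() {}

    // True if every required key/value pair is present in the header meta.
    static boolean matches(Map<String, String> headerMeta, Map<String, String> required) {
        return required.entrySet().stream()
                .allMatch(e -> e.getValue().equals(headerMeta.get(e.getKey())));
    }

    // Keeps only the facts (represented by their meta maps) that match.
    static List<Map<String, String>> filter(List<Map<String, String>> metas,
                                            Map<String, String> required) {
        return metas.stream().filter(m -> matches(m, required)).toList();
    }
}
```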

2.5 - Optimistic locking

To make business decisions, you need a model to base those decisions on. In most cases, it is important that this model is consistent with the facts published at the time of the decision and that the model is up-to-date.

For example, say we want to ensure that every username is unique across the whole system. In (potentially) distributed applications, and especially in event-sourced applications, this can be a difficult problem. What you certainly want to avoid, for all sorts of reasons, is pessimistic locking, which leaves optimistic locking as the choice.

On a general level, optimistic locking:

  • tries to make a change and then writes that change, or
  • if something happened in the meantime that could invalidate this change, discards the change and tries again, taking the new state into account.

Often this is done by adding a versionId or timestamp to a particular Entity/Aggregate to detect concurrent changes.

This process can be repeated until the change is either successful or definitively unsuccessful and needs to be rejected.

For our example that would mean:

If a new user registers,

  1. check if the username is already taken
    • if so, reject the registration
    • if not, prepare a change that creates the user
  2. check if a new user was created in between, and
    • repeat from the beginning if this is the case
    • execute the change while making sure no other change can interfere.
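The steps above can be sketched as a retry loop. In this hypothetical, simplified version (not Factus' implementation), NameLog is an append-only list of names, and the "lock" is the log size observed when the decision was made: appending only succeeds if nothing was appended in the meantime; otherwise the attempt starts over with the new state. The beforePublish hook exists only so that a test can simulate a concurrent writer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical append-only log of user names with a conditional append.
class NameLog {
    private final List<String> names = new ArrayList<>();

    synchronized int version() { return names.size(); }

    synchronized boolean containsName(String name) { return names.contains(name); }

    // Appends only if nothing was appended since 'expectedVersion' was read.
    synchronized boolean appendIfUnchanged(int expectedVersion, String name) {
        if (names.size() != expectedVersion) return false;
        names.add(name);
        return true;
    }

    synchronized void append(String name) { names.add(name); }
}

class UniqueNameRegistration {
    enum Result { CREATED, REJECTED, GAVE_UP }

    static Result register(NameLog log, String name, int retries,
                           Consumer<Integer> beforePublish) {
        for (int attempt = 0; attempt <= retries; attempt++) {
            int version = log.version();                         // 1. read current state
            if (log.containsName(name)) return Result.REJECTED;  //    name taken: reject
            beforePublish.accept(attempt);                       //    (test hook)
            if (log.appendIfUnchanged(version, name)) {          // 2. publish if unchanged
                return Result.CREATED;
            }
            // interference detected: repeat from the beginning with the new state
        }
        return Result.GAVE_UP;
    }
}
```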

In FactCast/Factus, there is no need to assign a versionId or timestamp to an aggregate, or even to have aggregates for that matter. All you have to do is define the scope of an optimistic lock to check for concurrent changes, in order to either discard the prepared changes and try again, or publish the prepared change if there was no interfering change in between.

Let’s look at the example above:

Consider the SnapshotProjection UserNames that we have seen before:

public class UserNames implements SnapshotProjection {

  private final Map<UUID, String> existingNames = new HashMap<>();

  @Handler
  void apply(UserCreated created) {
    existingNames.put(created.aggregateId(), created.userName());
  }

  @Handler
  void apply(UserDeleted deleted, FactHeader header) {
    existingNames.remove(deleted.aggregateId());
  }

  boolean contains(String name) {
    return existingNames.values().contains(name);
  }

// ...

In order to implement the use case above (enforcing unique usernames), what we can do is basically:

UserNames names = factus.fetch(UserNames.class);
if (names.contains(cmd.userName)) {
    // reject the change
} else {
    UserCreated prepared = new UserCreated(cmd.userId, cmd.userName);
    // publish the prepared UserCreated Event
}

Now, to make sure that the code above is re-attempted until there was no interference relevant to the UserNames projection, and that the business decision (the simple if clause) is always based on the latest data, Factus offers a simple syntax:

/**
 * optimistically 'locks' on a SnapshotProjection
 */
<P extends SnapshotProjection> Locked<P> withLockOn(@NonNull Class<P> snapshotClass);

Applied to our example, that would be:

UserRegistrationCommand cmd = ...; // details not important here

factus.withLockOn(UserNames.class)
    .retries(10)          // optional call to limit the number of retries
    .intervalMillis(50)   // optional call to pause for the given number of milliseconds between attempts
    .attempt((names, tx) -> {
        if (names.contains(cmd.userName)) {
            tx.abort("The Username is already taken - please choose another one.");
        } else {
            tx.publish(new UserCreated(cmd.userId, cmd.userName));
        }
    });

As you can see here, the attempt call receives a BiConsumer that consumes

  1. your defined scope, updated to the latest changes in the Fact-stream
  2. a RetryableTransaction that you use to either publish to or abort.

Note that you can lock on either a SnapshotProjection (including aggregates) or a ManagedProjection. A SubscribedProjection, however, is not usable here, because it is by nature eventually consistent, which breaks a necessary precondition for optimistic locking.

Also note that you should not (and cannot) publish to Factus directly when executing an attempt, as this would potentially break the purpose of the optimistic lock, and can lead to infinite loops.

For further details on how to add operations that are executed after successful publishing or on failure handling, please consult the JavaDocs, or look at the provided examples.

2.6 - Testing

FactCast comes with a module, factcast-test, that includes a JUnit 5 extension you can use to wipe the Postgres database clean between integration tests. The idea is that in integration tests you may want to start every test method with no preexisting events. Assuming you use the excellent TestContainers library to create and manage a Postgres database in integration tests, the extension will find it and wipe it clean. To use the extension, you either need to enable JUnit extension autodetection, or use

@ExtendWith(FactCastExtension.class)

on your integration Test Class.

The easy way to get the full package is to just extend AbstractFactcastIntegrationTest:

public class MyIntegrationTest extends AbstractFactcastIntegrationTest { // ...
}

This gives you the FactCast Docker image from Docker Hub, matching the version of the dependency you use, running against a sufficiently current Postgres; both are started in Docker containers (a locally installed Docker is, of course, a prerequisite).

If you want to be selective about the versions used, have a look at @FactcastTestConfig which lets you pin the versions if necessary and allows for advanced configuration.

Also, to make sure that the FactCast server is NOT caching internally in memory, you can set a property to switch it into integrationTestMode. See Properties.

Local Redis

In case you are also using Redis, there is an additional factcast-test-redis module. When added as a Maven dependency, it is automatically picked up by the FactCastExtension and starts a local Redis instance.

2.7 - Handler Parameters

Inside projections, Factus uses methods annotated with @Handler or @HandlerFor to process events. These methods allow various parameters, also in combination, which can serve as “input” during event handling.

Common Handler Parameters

| Parameter Type | Description | valid on @Handler | valid on @HandlerFor |
|---|---|---|---|
| Fact | Provides access to all Fact details including header (JSON) and payload (JSON) | yes | yes |
| FactHeader | the Fact header. Provides access to event namespace, type, version, meta entries and others | yes | yes |
| UUID | the Fact ID of the Fact header | yes | yes |
| ? extends EventObject | an instance of a concrete class implementing EventObject | yes | no |
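To illustrate how a handler can declare any combination of these parameters, here is a heavily simplified, hypothetical resolver based on reflection: it inspects each parameter type and passes the matching value. Fact, FactHeader and SomethingStarted are minimal stand-ins defined in the sketch, and this is not how Factus resolves parameters internally.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.UUID;

// Hypothetical stand-ins for the real FactCast types.
record FactHeader(String ns, String type, int version) {}
record Fact(UUID id, FactHeader header, String jsonPayload) {}
record SomethingStarted(String someProperty) {}

// Example projection with a handler using several parameters.
class SampleProjection {
    String seen;

    void apply(SomethingStarted event, FactHeader header, UUID factId) {
        seen = event.someProperty() + "/" + header.type() + "/" + factId;
    }
}

final class ParameterResolver {
    private ParameterResolver() {}

    // Builds the argument list for 'handler' based on its parameter types.
    static Object[] resolve(Method handler, Fact fact, Object event) {
        Parameter[] params = handler.getParameters();
        Object[] args = new Object[params.length];
        for (int i = 0; i < params.length; i++) {
            Class<?> t = params[i].getType();
            if (t == Fact.class) args[i] = fact;
            else if (t == FactHeader.class) args[i] = fact.header();
            else if (t == UUID.class) args[i] = fact.id();
            else if (t.isInstance(event)) args[i] = event;
            else throw new IllegalArgumentException("unsupported parameter: " + t);
        }
        return args;
    }

    // Finds the declared method with the given name and invokes it.
    static void dispatch(Object target, String methodName, Fact fact, Object event) {
        for (Method m : target.getClass().getDeclaredMethods()) {
            if (m.getName().equals(methodName)) {
                try {
                    m.setAccessible(true);
                    m.invoke(target, resolve(m, fact, event));
                    return;
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        throw new IllegalArgumentException("no method " + methodName);
    }
}
```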

Extras on Redis atomic Projections

In addition to these common parameters, ProjectionLenses can add parameters to be used by handler methods. For instance, handler methods of a @RedisBatched projection should use:

| Parameter Type | Description | valid on @Handler | valid on @HandlerFor |
|---|---|---|---|
| RBatch | needed in a Redis batched projection | yes | yes |

Similarly, handler methods of @RedisTransactional projections should use:

| Parameter Type | Description | valid on @Handler | valid on @HandlerFor |
|---|---|---|---|
| RTransaction | needed in a Redis transactional projection | yes | yes |

Examples

@Handler

Here are some examples:

// handle the "SomethingStarted" event.
// deserialization happened automatically
@Handler
void apply(SomethingStarted event) {
    var someValue = event.getSomeProperty();
    ...
}

// handle the "SomethingChanged" event.
// additionally use information from the Fact header
@Handler
void apply(SomethingChanged event, FactHeader header) {
    int eventVersion = header.version();
    String someMetaDataValue = header.meta().get("some-metadata-key");
    ...
}

// use multiple parameters
@Handler
void apply(SomethingReactivated event,
           FactHeader factHeader,
           UUID factId,
           Fact fact) {
    ...
}

These examples were all based on handling events that were automatically deserialized into Java objects.

The next section introduces a more direct alternative.

@HandlerFor

With the @HandlerFor annotation, you only get direct access to the raw Fact data, such as header or payload; no deserialization takes place.

// handle "SomethingAdded" events in their version 1
// living in the "test" namespace
@HandlerFor(ns = "test", type = "SomethingAdded", version = 1)
void applySomethingAdded(Fact fact) {
    String payload = fact.jsonPayload();
    ...
}

// also here, multiple parameters can be used
@HandlerFor(ns = "test", type = "SomethingRemoved", version = 2)
void applySomethingRemoved(FactHeader factHeader, UUID factId, Fact fact) {
    ...
}

Full Example

See here for the full example.

2.8 - Metrics

Like the FactCast server, Factus also makes use of micrometer.io metrics.

Metric namespaces and their organization

At the time of writing, there are three namespaces exposed:

  • factus.timings
  • factus.counts
  • factus.gauges

Depending on your micrometer binding, you may see a slightly different spelling in your data (like factus_timings, if your data source gives the '.' character a special meaning).

The metrics are automatically tagged with

  • the emitting class (class tag)
  • the name of the metric (name tag)

Existing Metrics

At the time of writing (Factcast version 0.3.13) the following metrics are supported:

Counted

  • transaction_attempts - how often a transaction was retried. See Optimistic Locking for more background
  • transaction_abort - how often an attempted transaction was aborted

Gauged

Timed

  • managed_projection_update_duration - duration in milliseconds a Managed Projection took to update
  • fetch_duration - duration in milliseconds it took to fetch a Snapshot projection
  • find_duration - duration in milliseconds it took to find a specific Aggregate
  • event_processing_latency - time difference in milliseconds between the moment a fact was published and the moment a consuming subscribed projection was updated
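The event_processing_latency metric boils down to a time difference. The sketch below assumes that the publication time is available as epoch milliseconds (where Factus actually takes it from is not covered here) and uses an injectable java.time.Clock, so the calculation can be tested deterministically. ProcessingLatency is a hypothetical name.

```java
import java.time.Clock;

// Hypothetical latency calculation; the metric name comes from the docs,
// the source of the publish timestamp is an assumption.
final class ProcessingLatency {
    private ProcessingLatency() {}

    // Milliseconds between publication and the moment the projection was updated.
    static long millis(long publishedAtEpochMillis, Clock clock) {
        return clock.millis() - publishedAtEpochMillis;
    }
}
```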

2.9 - Tips

This section contains some tips and tricks that you might find useful to improve performance or to cover some corner cases.

@SuppressFactusWarnings

Similar to java.lang.SuppressWarnings, you can use this annotation to suppress warnings. Factus may emit such warnings the first time it encounters a class that violates good practices (for instance, when scanning your projection).

The annotation can be scoped to a type, method or field declaration.

It requires a value, which specifies the type of warning(s) to suppress. At the time of writing (Factcast version 0.5.2), the allowed values are:

  • SuppressFactusWarnings.Warning.ALL suppresses all Factus related warnings
  • SuppressFactusWarnings.Warning.PUBLIC_HANDLER_METHOD suppresses “Handler methods should not be public” type of warning, caused by projection handler methods having a public scope

3 - Hitchhiker's Guides

TODO add content

3.1 - Hitchhiker's Guide To Projection Selection

Introduction

Projections, the derived views of our event-sourced data, serve as vital components in shaping our applications, enabling efficient querying, analysis, and decision-making. However, with Factus offering a range of projection options, each with its own strengths and considerations, it becomes essential to choose wisely.

Our objective is to equip you with the knowledge and insights necessary to navigate the available options and make the right choices that align with your project’s requirements. We will delve into the intricacies of each projection type, uncover their unique features and trade-offs, and provide practical advice to aid your decision-making process.


Identifying Relevant Requirements

Before diving into the exploration of different projection types, it is essential to establish a clear understanding of the requirements that are relevant to your specific project. By identifying these requirements upfront, you can effectively narrow down the options and choose the projection type that best aligns with your project goals and constraints.

In this section, we will delve into a comprehensive list of possible requirements that should be considered when evaluating projection types. By examining and prioritizing these requirements, you will gain valuable insights into the trade-offs and considerations associated with each projection type.

Scalability

This requirement focuses on the ability of the chosen projection type to handle growing amounts of data and increasing workloads without compromising performance or functionality. Considerations include the horizontal scalability of the projection, the efficiency of data distribution, and the ability to handle concurrent updates and queries.

Performance (split between latencies and costs?)

Performance refers to the speed and responsiveness of the projection type in processing events and serving queries. It involves evaluating factors such as event ingestion rates, query response times, and the impact of increasing data volumes on overall system performance. Choosing a projection type that can meet the desired performance benchmarks is crucial for maintaining a high-performing and responsive system.

Query flexibility

Query flexibility assesses the ability to express complex queries and retrieve relevant information efficiently. It involves evaluating the projection type’s support for various query patterns, such as filtering, aggregations, joins, and ad-hoc queries. Consider whether the chosen projection type enables the desired flexibility in querying the event-sourced data while maintaining good performance.

Complexity

Complexity refers to the level of intricacy and sophistication involved in implementing and managing the chosen projection type. It encompasses aspects such as the learning curve for developers, the architectural complexity of the projection, and the degree of operational complexity. It is important to assess whether the complexity aligns with the team’s expertise and resources.

Data consistency

Data consistency focuses on ensuring that the derived views produced by the projection type accurately reflect the state of the event stream. It involves assessing how well the projection type handles events, updates, and concurrent modifications to maintain a consistent and coherent view of the data across different projections. Ensuring data consistency is crucial for making reliable and accurate decisions based on the derived views.

Maintainability

Maintainability assesses the ease of managing, updating, and evolving the projection type over time. Considerations include the ability to accommodate changing business requirements, the ease of making modifications or adding new features, and the availability of monitoring and debugging tools. Choosing a projection type that is maintainable ensures long-term sustainability and adaptability of the system.


Comparing Projection Types

Let’s go over a comparative analysis of the different projection types, discussing the strengths and weaknesses of each type concerning the identified requirements.

Snapshot Projection

Snapshot Projection documentation

  • Scalability: by default, a Snapshot Projection stores its cached state in the FactCast server (aka the Event Store). This can create bottlenecks, or impact the performance of the Event Store, whenever the workload increases. It can be optimized, depending on the use case, by hooking into the snapshot lifecycle and changing the way it is accessed, serialized, stored, and retained. Alternatively, the factcast-snapshotcache-redisson module can be used, to easily store the snapshots in a Redis cluster (see Best Practices and Tips section below).

  • Performance: whenever the projection is fetched for new events, the most recent snapshot is transferred, de-serialized, updated, then re-serialized, and transferred back to the Event Store. Performance might decrease with the increase of the snapshots size, and/or the frequency of the queries.

  • Query flexibility: a Snapshot Projection allows to query, and aggregate, multiple event types on-demand. Any type of data structure can be used to store the projected events, considering that it needs to be serializable.

  • Complexity: the complexity of this projection varies, as it’s fairly easy to use, with the default snapshot lifecycle implementation. It can get more complex, whenever it’s necessary to customize its aspects.

  • Data consistency: when fetched, the Snapshot Projection is guaranteed to return the most recent representation of the event stream. It supports optimistic locking to handle concurrent modifications. Check out the Optimistic Locking section for further details.

  • Maintainability: the projection allows changing the business logic of the event handlers, creating new data structures for the derived views, updating existing ones, or adding new decisions. To do so, the serial of the projection must be updated every time the projection class is changed (check out the documentation). The previously created snapshots are then invalidated, so the full event stream is re-consumed on the subsequent fetch. In the long run, snapshot retention might need to be fine-tuned based on the available resources and the query frequency.
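To make the serial-based invalidation more tangible, here is a minimal plain-Java sketch. It assumes a hypothetical SnapshotCache whose key combines the projection name and its serial; it does not reproduce how Factus actually names, serializes, or stores snapshots. A fetch restores the cached state, applies only the newer events, and stores the result, so bumping the serial causes a cache miss and a full replay.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical snapshot cache keyed by "projection name + serial".
// Not the Factus API: it only models why bumping the serial forces a full replay.
final class SnapshotCache {
    // A cached state together with the number of events it already reflects.
    record Snapshot(int state, int eventsApplied) {}

    private final Map<String, Snapshot> store = new HashMap<>();

    Snapshot get(String key) { return store.get(key); }

    void put(String key, Snapshot snapshot) { store.put(key, snapshot); }
}

final class CountingProjection {
    int replayedEvents; // how many events the last fetch had to apply

    // Loads the snapshot for (name, serial), applies only newer events, stores the result.
    int fetch(SnapshotCache cache, String name, long serial, List<Integer> eventLog) {
        String key = name + "#" + serial;
        SnapshotCache.Snapshot snap = cache.get(key);
        int state = snap == null ? 0 : snap.state();
        int from = snap == null ? 0 : snap.eventsApplied();
        replayedEvents = eventLog.size() - from;
        for (int i = from; i < eventLog.size(); i++) {
            state += eventLog.get(i); // the "handler": sums the event values
        }
        cache.put(key, new SnapshotCache.Snapshot(state, eventLog.size()));
        return state;
    }
}
```

With serial 1, a second fetch after one new event only applies that event; fetching with serial 2 finds no snapshot and replays the whole stream.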

Aggregate

Aggregate documentation

  • Scalability: same considerations made for the Snapshot Projection apply.

  • Performance: same considerations made for the Snapshot Projection apply.

  • Query flexibility: depending on the event schema design, an Aggregate offers limited flexibility compared to a Snapshot Projection, as it builds views that are specific to a single entity (or aggregate, hence the name of this projection). This doesn’t prevent performing multiple queries for different aggregate ids to relate them to each other, although other projection types may be better suited for such use cases, as they reduce the number of requests sent to the server.

  • Complexity: the same considerations made for the Snapshot Projection apply. Conceptually, however, this is the easiest projection to use, as it is only meant to represent the state of a single entity.

  • Data consistency: same considerations made for the Snapshot Projection apply.

  • Maintainability: same considerations made for the Snapshot Projection apply.
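The optimistic locking mentioned for these projection types follows the general read, decide, publish-if-unchanged idea. The following plain-Java sketch illustrates that idea under simplifying assumptions: a hypothetical InMemoryLog stands in for the Event Store, and the whole log serves as the lock scope. It is not the Factus locking API; see the Optimistic Locking section for the real one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical in-memory stand-in for the Event Store; not a FactCast API.
final class InMemoryLog {
    private final List<String> events = new ArrayList<>();

    // Returns a copy of all events; its size doubles as the "version" the caller observed.
    synchronized List<String> readAll() {
        return new ArrayList<>(events);
    }

    // Unconditional append, e.g. by another writer.
    synchronized void append(String event) {
        events.add(event);
    }

    // Appends only if nothing was published since the caller read 'expectedSize' events.
    synchronized boolean appendIfUnchanged(int expectedSize, String event) {
        if (events.size() != expectedSize) {
            return false; // conflict: someone else published in the meantime
        }
        events.add(event);
        return true;
    }
}

final class OptimisticWriter {
    // Read state, decide, try to publish; on conflict re-read and decide again.
    static int publishWithRetry(InMemoryLog log, Function<List<String>, String> decide, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            List<String> state = log.readAll();
            String event = decide.apply(state);
            if (log.appendIfUnchanged(state.size(), event)) {
                return attempt; // number of attempts it took
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts");
    }
}
```

If another writer publishes between the read and the publish, the first attempt is rejected and the decision is made again on the updated state.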

Managed Projection

Managed Projection documentation

Preface: considering a Managed Projection that has its state externalized in a shared database.

  • Scalability: a Managed Projection lets the application control the lifecycle of the views, so that it can be adapted to the expected workload. With a shared database, the derived views are uniformly accessible and consistent across multiple instances of the projection.

  • Performance: whenever a projection is updated, the events published since the last update are fetched from the Event Store and processed. The performance of the projection depends on the update frequency, the number of events to process, and, of course, the complexity of the business logic that manages the derived views.

  • Query flexibility: a Managed Projection allows querying and aggregating multiple event types on demand. Potentially any external datasource can be used to store the derived views. Since Factus has no control over the projection, the projection implementation itself must ensure proper concurrency handling whenever the underlying datasource doesn’t provide it.

  • Complexity: this projection requires implementing the business logic that manages the derived views, and handling concurrency (if needed). It might also be necessary to design the lifecycle of the projection so that it is updated at the desired frequency (e.g. using scheduled updates).

  • Data consistency: since a shared datasource is used to store the derived views, the same state is shared across all instances of the projection. Of course, the derived views might be stale whenever new events are published, but the projection can be updated while processing queries to ensure that the most recent state is returned. It supports optimistic locking to handle concurrent modifications. Check out the Optimistic Locking section for further details.

  • Maintainability: a Managed Projection enables derived views that can potentially be queried even when the Event Store is unavailable. The projection allows changing the business logic of the event handlers and the underlying structure of the derived views. To do so, the serial of the projection must be updated every time the projection class is changed (check out the documentation). The full event stream will then be re-consumed on the subsequent updates to rebuild the derived views.
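The incremental update described above can be sketched in plain Java. SharedStore is a hypothetical stand-in for the shared database, holding the derived view together with the position of the last processed event, and positions are simplified to list indexes. The projection guards the shared state itself, as noted under Query flexibility; a Java monitor only works within one JVM, so a real multi-instance setup would rely on the database (e.g. transactions) instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for a shared external datastore (e.g. a database table):
// it keeps the derived view and the position of the last processed event together.
final class SharedStore {
    final Set<String> emails = new TreeSet<>();
    int lastProcessed = 0; // number of events already applied
}

// Minimal managed-projection-like updater; not the Factus ManagedProjection API.
final class ManagedEmailsProjection {
    private final SharedStore store;

    ManagedEmailsProjection(SharedStore store) {
        this.store = store;
    }

    // Applies only the events appended since the stored position; returns how many.
    int update(List<String> eventLog) {
        synchronized (store) { // the projection itself guards the shared state
            int applied = 0;
            for (int i = store.lastProcessed; i < eventLog.size(); i++) {
                store.emails.add(eventLog.get(i)); // every event is treated as "UserAdded(email)"
                applied++;
            }
            store.lastProcessed = eventLog.size();
            return applied;
        }
    }

    // Catches up before answering, so the result reflects all events published so far.
    Set<String> queryFresh(List<String> eventLog) {
        synchronized (store) {
            update(eventLog);
            return new TreeSet<>(store.emails);
        }
    }
}
```

Because the position lives next to the view, a second instance sharing the same store only processes what the first one has not yet applied.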

Local Managed Projection

Local Managed Projection documentation

  • Scalability: a Local Managed Projection stores its state in memory. Depending on the use case, this can cause performance and availability issues in the long run, if the size of the derived views grows over time or is affected by peaks. Remember that, when scaling horizontally, each instance maintains its own independent state, potentially resulting in data inconsistencies.

  • Performance: the same considerations made for the Managed Projection apply. Arguably, a Local Managed Projection performs better, as it doesn’t need to access an external datasource to store the derived views. However, since the derived views are stored in memory, the memory footprint of the projection will increase over time, potentially affecting the performance of the application.

  • Query flexibility: a Local Managed Projection offers the highest degree of flexibility, as it enables managing the in-memory views using any data structure offered by the programming language.

  • Complexity: this projection only requires implementing the business logic that manages the derived views. For this reason, it is probably the easiest projection to start with, especially for a proof of concept or a prototype.

  • Data consistency: since the derived views are stored in-memory, the same state won’t be shared across multiple instances. In terms of staleness, the same considerations made for the Managed Projection apply.

  • Maintainability: a Local Managed Projection is the easiest projection to maintain, as it doesn’t require managing external datasources. Every time the application is stopped, the derived views are lost and need to be rebuilt on the subsequent restart. This makes it easy to test the projection and change its business logic, but it also affects performance, as the derived views need to be rebuilt from scratch.

Subscribed Projection

Subscribed Projection documentation

Preface: considering a Subscribed Projection that has its state externalized in a shared database.

  • Scalability: only one instance actually subscribes to the event stream and receives events asynchronously. This implies that horizontal scaling could be limited, as only one instance executes the handlers’ business logic. However, with a shared database, the derived views are uniformly accessible and consistent across multiple instances of the projection, which allows spreading the query load.

  • Performance: after catching up, the projection consumes events right after they are published, with a small latency (expected to be below 100ms). From then on, the projection’s performance depends only on the complexity of the business logic and on the underlying datasource used to store the derived views.

  • Query flexibility: a Subscribed Projection allows querying and aggregating multiple event types on demand. Potentially any external datasource can be used to store the derived views. Since Factus has no control over the projection, the projection implementation itself must ensure proper concurrency handling whenever the underlying datasource doesn’t provide it. Since the derived views are updated asynchronously, a query always returns the most recently processed state of the derived views, which is not necessarily up to date with the event stream at the time of the query.

  • Complexity: this projection requires implementing the business logic that manages the derived views, and handling concurrency (if needed). The projection update is handled asynchronously by Factus, which reduces the complexity of the application.

  • Data consistency: since a shared datasource is used to store the derived views, the same state is shared across all instances of the projection. Since the application is not responsible for updating the projection, the projection state is eventually consistent, and the application cannot tell how up to date it is at any given moment. This might be confusing, especially in a read-after-write scenario, where the user expects to see the result of an update immediately after the command is executed.

  • Maintainability: a Subscribed Projection is similar to a Managed Projection, as it allows changing the business logic of the event handlers and the underlying structure of the derived views. To do so, the serial of the projection must be updated every time the projection class is changed (check out the documentation). The full event stream will then be re-consumed on the next catch-up phase (when the new projection starts) to rebuild the derived views.
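The eventual consistency of subscribed projections, and the read-after-write issue it causes, can be illustrated with a small plain-Java model. SubscribedEmailsProjection is hypothetical and not based on Factus classes: a daemon thread plays the role of the subscription and applies published events asynchronously, while awaitPosition shows one possible way to wait until a just-published event has been applied before answering a query.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of a subscribed projection; not the Factus API.
// A background thread applies published events, so queries are eventually consistent.
final class SubscribedEmailsProjection implements AutoCloseable {
    private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
    private final Set<String> emails = ConcurrentHashMap.newKeySet();
    private long publishedCount = 0;     // guarded by this
    private volatile long processed = 0; // written under this, read freely
    private final Thread consumer = new Thread(this::consume);

    private SubscribedEmailsProjection() {}

    static SubscribedEmailsProjection start() {
        SubscribedEmailsProjection p = new SubscribedEmailsProjection();
        p.consumer.setDaemon(true);
        p.consumer.start();
        return p;
    }

    private void consume() {
        try {
            while (true) {
                String email = inbox.take(); // blocks until an event is published
                emails.add(email);
                synchronized (this) {
                    processed++;
                    notifyAll(); // wake readers waiting for this position
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // close() was called
        }
    }

    // Publishes an event and returns its position in the stream (1, 2, 3, ...).
    synchronized long publish(String email) {
        inbox.add(email);
        return ++publishedCount;
    }

    // Query: returns whatever has been applied so far, possibly stale.
    Set<String> emails() {
        return Set.copyOf(emails);
    }

    // Read-after-write helper: waits until the projection has applied 'position'.
    synchronized boolean awaitPosition(long position, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        try {
            while (processed < position) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return false;
                }
                wait(remaining);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    @Override
    public void close() {
        consumer.interrupt();
    }
}
```

Calling emails() right after publish may or may not include the new address; only after awaitPosition returns true is it guaranteed to be visible.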

Local Subscribed Projection

Local Subscribed Projection documentation

  • Scalability: same considerations made for the Local Managed Projection apply.

  • Performance: same considerations made for the Local Managed Projection apply.

  • Query flexibility: same considerations made for the Local Managed Projection apply.

  • Complexity: same considerations made for the Local Managed Projection apply.

  • Data consistency: the same considerations made for the Local Managed Projection apply, except that the application is not responsible for updating the projection: the state is eventually consistent, and the application cannot tell how up to date it is at any given moment.

  • Maintainability: same considerations made for the Local Managed Projection apply.


Selecting the Right Projection Type

When embarking on the journey of selecting the right projection type for your event sourcing project, it is crucial to carefully evaluate and prioritize the identified requirements based on your project’s unique context.

That being said, here are some general Q&As to help you make an informed choice:

Q: Are you still modelling for a prototype?

A: If yes, you can start with a local projection (e.g. a Local Managed Projection), as it’s the easiest and most intuitive projection to implement, and quick to change and rebuild.

Q: Do you need to easily rebuild your projection?

A: If yes, consider using a local projection, as it allows rebuilding the in-memory derived views from scratch by simply restarting the application. Keep in mind, though, that this might impact the overall application performance, as it adds overhead to each deployment.

Q: Do you need to ensure high availability for the queries of a use case, even when the Event Store is unavailable?

A: If yes, then make sure to go for a projection that doesn’t rely on the Event Store for persisting its state. Consider the trade-offs between local and externalized states:

  • Local states are faster to query, easier to implement and to maintain, but they need to be rebuilt from scratch on every restart
  • Externalized states are harder to implement and maintain, but they can be rebuilt incrementally, and are available across multiple instances

Q: Does your query need to ensure read-after-write consistency?

A: If yes, then it’s suggested to choose a projection that can be updated synchronously, like a SnapshotProjection, an Aggregate or a ManagedProjection. Depending on the amount of data to be read, and the persistence layer, this might have a different impact on the application performance.

Q: Should the projected data be available for external services?

A: If yes, opt for a projection that offers freedom in terms of persistence, like a ManagedProjection or a SubscribedProjection. This allows storing the derived views in an external datasource and querying them using whatever technology is available.

Q: Does a specific query need a single entity or object?

A: If yes, you can opt for a dedicated Aggregate for the query. Generally speaking, Aggregates are fast and easier to implement and maintain, but they might not be suitable for very complex queries that need to aggregate multiple event types. You can still use different projection types for different queries and combine them in your application.


Best Practices and Tips

Check this section regularly, as it is updated with tips to improve performance or fix common issues.

When using Snapshot Projections, consider using the factcast-snapshotcache-redisson module, to store the snapshots in a Redis cluster, instead of the Event Store. This will reduce the load on the Event Store, and will allow to scale the snapshots cache independently.

3.2 - Hitchhiker's Guide To Testing

Introduction

An event-sourced application usually performs two kinds of interactions with the FactCast server:

  • It subscribes to facts and builds up use-case specific views of the received data. These use-case specific views are called projections.
  • It publishes new facts to the event log.

Building up projections works with both APIs, low-level and Factus. However, to simplify development, the high-level Factus API has explicit support for this concept.

Unit Tests

Projections are best tested in isolation, ideally at the unit test level. In the end, they are classes receiving facts and updating some internal state. However, as soon as the projection’s state is externalized (e.g. see here) this test approach can get challenging.

Integration Tests

Integration tests check the interaction of more than one component. Here, we’re looking at integration tests that validate the correct behaviour of a projection that uses an external data store like a Postgres database.

Be aware that FactCast integration tests as shown below can start up real infrastructure via Docker. For this reason, they usually perform significantly slower than unit tests.


Testing FactCast (low-level)

This section introduces the UserEmails projection for which we will write

  • unit tests and
  • integration tests.

For interaction with FactCast we are using the low-level API.

The User Emails Projection

Imagine our application needs a set of user emails currently in use in the system. To provide this information, we identified these facts which contain the relevant data:

  • UserAdded
  • UserRemoved

The UserAdded fact contains a user ID and the email address, whereas UserRemoved only carries the ID of the user to remove.

Here is a possible projection using the FactCast low-level API:

@Slf4j
public class UserEmailsProjection {

    private final Map<UUID, String> userEmails = new HashMap<>();

    @NonNull
    public Set<String> getUserEmails() {
        return new HashSet<>(userEmails.values());
    }

    public void apply(Fact fact) {
        switch (fact.type()) {
            case "UserAdded":
                handleUserAdded(fact);
                break;
            case "UserRemoved":
                handleUserRemoved(fact);
                break;
            default:
                log.error("Fact type {} not supported", fact.type());
                break;
        }
    }

    @VisibleForTesting
    void handleUserAdded(Fact fact) {
        JsonNode payload = parsePayload(fact);
        userEmails.put(extractIdFrom(payload), extractEmailFrom(payload));
    }

    @VisibleForTesting
    void handleUserRemoved(Fact fact) {
        JsonNode payload = parsePayload(fact);
        userEmails.remove(extractIdFrom(payload));
    }

    // helper methods:

    @SneakyThrows
    private JsonNode parsePayload(Fact fact) {
        return FactCastJson.readTree(fact.jsonPayload());
    }

    private UUID extractIdFrom(JsonNode payload) {
        return UUID.fromString(payload.get("id").asText());
    }

    private String extractEmailFrom(JsonNode payload) {
        return payload.get("email").asText();
    }
}

The apply method acts as the entry point of the projection and dispatches the received Fact to the appropriate handler method. There, the Fact object’s JSON payload is parsed using the Jackson library, and the projection’s data (the userEmails map) is updated accordingly.

Note that we chose not to use a raw ObjectMapper here, but the helper class FactCastJson instead, as it contains a pre-configured ObjectMapper.

To query the projection for the user emails, the getUserEmails() method returns the values of the internal userEmails map, copied to a new Set.

Unit Tests

Unit testing this projection is very easy, as there are no external dependencies. We use Fact objects as input and check the view derived from the internal map.

Let’s look at an example for the UserAdded fact:

@Test
void whenHandlingUserAddedFactEmailIsAdded() {
    // arrange
    String jsonPayload = String.format(
        "{\"id\":\"%s\", \"email\": \"%s\"}",
        UUID.randomUUID(),
        "user@bar.com");
    Fact userAdded = Fact.builder()
        .id(UUID.randomUUID())
        .ns("user")
        .type("UserAdded")
        .version(1)
        .build(jsonPayload);

    // act
    uut.handleUserAdded(userAdded);

    // assert
    Set<String> emails = uut.getUserEmails();
    assertThat(emails).hasSize(1).containsExactly("user@bar.com");
}

Note the use of the convenient builder the Fact class is providing.

Since the focus of this unit test is on handleUserAdded, we execute the method directly. The full unit test also contains a test for the dispatching logic of the apply method, as well as a similar test for the handleUserRemoved method.
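The dispatch and removal tests mentioned above are not shown here. As a dependency-free approximation, the following sketch mirrors the projection’s switch using a hypothetical SimpleFact record in place of FactCast’s Fact, with the payload fields already parsed. In the real test you would build Fact instances with Fact.builder() as shown above and call apply on the actual UserEmailsProjection.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical, dependency-free stand-in for org.factcast.core.Fact, carrying only
// what the projection reads: the fact type and the already-parsed payload fields.
record SimpleFact(String type, UUID id, String email) {}

// Mirrors the dispatching logic of UserEmailsProjection.apply(Fact) without
// JSON parsing, so the behaviour can be checked in isolation.
final class EmailsDispatch {
    private final Map<UUID, String> userEmails = new HashMap<>();
    int unsupported; // counts facts the original switch would log as unsupported

    Set<String> getUserEmails() {
        return new HashSet<>(userEmails.values());
    }

    void apply(SimpleFact fact) {
        switch (fact.type()) {
            case "UserAdded" -> userEmails.put(fact.id(), fact.email());
            case "UserRemoved" -> userEmails.remove(fact.id());
            default -> unsupported++;
        }
    }
}
```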

Checking your projection’s logic should preferably be done with unit tests first, even though you might also want to add an integration test to prove that it works in conjunction with its collaborators.

Integration Tests

FactCast provides a JUnit 5 extension which, via the excellent Testcontainers library, starts a FactCast server pre-configured for testing, plus its Postgres database, and resets their state between test executions.

Preparation

Before writing your first integration test

  • make sure Docker is installed and running on your machine
  • add the factcast-test module to your pom.xml:

<dependency>
    <groupId>org.factcast</groupId>
    <artifactId>factcast-test</artifactId>
    <version>${factcast.version}</version>
    <scope>test</scope>
</dependency>
  • to allow TLS-free communication between the test code and the local FactCast server, create an application.properties file in the project’s resources directory with the following content:
grpc.client.factstore.negotiationType=PLAINTEXT

This will make the client application connect to the server without using TLS.

Writing The Integration Test

Our integration test builds upon the previous unit test example. This time however, we want to check if the UserEmailsProjection can also be updated by a real FactCast server:

@SpringBootTest
@ExtendWith(FactCastExtension.class)
class UserEmailsProjectionITest {

    @Autowired FactCast factCast;

    private final UserEmailsProjection uut = new UserEmailsProjection();

    private class FactObserverImpl implements FactObserver {

        @Override
        public void onNext(@NonNull Fact fact) {
            uut.apply(fact);
        }
    }

    @Test
    void projectionHandlesUserAddedFact() {
        UUID userId = UUID.randomUUID();
        Fact userAdded = Fact.builder()
            .id(UUID.randomUUID())
            .ns("user")
            .type("UserAdded")
            .version(1)
            .build(String.format(
                "{\"id\":\"%s\", \"email\": \"%s\"}",
                userId,
                "user@bar.com"));

        factCast.publish(userAdded);

        SubscriptionRequest subscriptionRequest = SubscriptionRequest
            .catchup(FactSpec.ns("user").type("UserAdded"))
            .or(FactSpec.ns("user").type("UserRemoved"))
            .fromScratch();

        factCast.subscribe(subscriptionRequest, new FactObserverImpl()).awaitComplete();

        Set<String> userEmails = uut.getUserEmails();
        assertThat(userEmails).hasSize(1).containsExactly("user@bar.com");
    }
  //...

The previously mentioned FactCastExtension starts the FactCast server and the Postgres database once before the first test is executed. Between the tests, the extension wipes all old facts from the FactCast server so that you are guaranteed to always start from scratch.

Once a fact is received, FactCast invokes the onNext method of the FactObserverImpl, which delegates to the apply method of the UserEmailsProjection.

For details of the FactCast low-level API please refer to the API documentation.

Testing with Factus

Factus builds on the low-level FactCast API and provides a higher level of abstraction. To see Factus in action, we use the same scenario as before: a UserEmailsProjection which we will ask for a set of user emails.

We need to handle two events: UserAdded, which contains two properties (the user ID and the email), and UserRemoved, which only contains the user ID.

An Example Event

To get an idea of how the events are defined, let’s have a look inside UserAdded:

@Getter
@Specification(ns = "user", type = "UserAdded", version = 1)
public class UserAdded implements EventObject {

    private UUID userId;
    private String email;

    // used by Jackson deserializer
    protected UserAdded(){}

    public static UserAdded of(UUID userId, String email) {
        UserAdded fact = new UserAdded();
        fact.userId = userId;
        fact.email = email;
        return fact;
    }

    @Override
    public Set<UUID> aggregateIds() {
        return Collections.emptySet();
    }
}

We create a Factus-compatible event by implementing the EventObject interface and supplying the fact details via the @Specification annotation. The event itself contains the properties userId and email, which are simply fields of the UserAdded class. The protected no-args constructor is used by Jackson when deserializing from JSON back to a POJO. The of factory method is used by application and test code to create a UserAdded event. For more details on how to define a Factus event, read on here.

The User Emails Projection

Now that we know which events to handle, we can process them in the Factus based UserEmailsProjection:

public class UserEmailsProjection extends LocalManagedProjection {

    private final Map<UUID, String> userEmails = new HashMap<>();

    public Set<String> getEmails() {
        return new HashSet<>(userEmails.values());
    }

    @Handler
    void apply(UserAdded event) {
        userEmails.put(event.getUserId(), event.getEmail());
    }

    @Handler
    void apply(UserRemoved event) {
        userEmails.remove(event.getUserId());
    }
}

You will instantly notice how short this implementation is compared to the UserEmailsProjection class of the low-level API example before. No dispatching or explicit JSON parsing is needed. Instead, the event handler methods each receive their event as plain Java POJO which is ready to use.
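The dispatching that this implementation no longer needs still has to happen somewhere. One common technique is to route each event by its runtime class to a method whose single parameter has that type. The following plain-Java sketch shows that technique with a hypothetical @OnEvent annotation and hypothetical DemoUserAdded/DemoUserRemoved events; it is not Factus’ implementation, which may resolve handlers differently.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical marker annotation standing in for Factus' @Handler.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface OnEvent {}

record DemoUserAdded(String id, String email) {}
record DemoUserRemoved(String id) {}

// A projection-like class with one handler per event type.
final class DemoEmails {
    final Map<String, String> byId = new HashMap<>();

    @OnEvent void apply(DemoUserAdded e) { byId.put(e.id(), e.email()); }

    @OnEvent void apply(DemoUserRemoved e) { byId.remove(e.id()); }
}

// Sketch of parameter-type based dispatch (exact class match only, no subtype lookup).
final class ReflectiveDispatcher {
    private final Object projection;
    private final Map<Class<?>, Method> handlers = new HashMap<>();

    ReflectiveDispatcher(Object projection) {
        this.projection = projection;
        for (Method m : projection.getClass().getDeclaredMethods()) {
            if (m.isAnnotationPresent(OnEvent.class) && m.getParameterCount() == 1) {
                m.setAccessible(true); // handlers may be package-private
                handlers.put(m.getParameterTypes()[0], m);
            }
        }
    }

    void dispatch(Object event) {
        Method handler = handlers.get(event.getClass());
        if (handler == null) {
            throw new IllegalArgumentException("No handler for " + event.getClass().getSimpleName());
        }
        try {
            handler.invoke(projection, event);
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The handler lookup is built once per projection class, so each event only costs a map lookup plus a reflective call.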

As the projection type, we chose a LocalManagedProjection, which is intended for self-managed, in-memory use cases. See here for detailed reading on the various projection types supported by Factus.

Unit Tests

The unit test for this projection tests each handler method individually. As an example, here is the test for the UserAdded event handler:

@Test
void whenHandlingUserAddedEventEmailIsAdded() {
    UUID someUserId = UUID.randomUUID();

    UserEmailsProjection uut = new UserEmailsProjection();
    uut.apply(UserAdded.of(someUserId, "foo@bar.com"));

    Set<String> emails = uut.getEmails();
    assertThat(emails).hasSize(1).containsExactly("foo@bar.com");
}

First, we create a UserAdded event, which we then apply to the responsible handler method of the UserEmailsProjection class. To check the result, we fetch the Set of emails and finally examine its content.

Integration Test

After we have covered each handler method with detailed tests on unit level, we also want an integration test to test against a real FactCast server.

Here is an example:

@SpringBootTest
@ExtendWith(FactCastExtension.class)
public class UserEmailsProjectionITest {

    @Autowired Factus factus;

    @Test
    void projectionHandlesUserAddedEvent() {
        UserAdded userAdded = UserAdded.of(UUID.randomUUID(), "user@bar.com");
        factus.publish(userAdded);

        UserEmailsProjection uut = new UserEmailsProjection();
        factus.update(uut);

        Set<String> emails = uut.getEmails();
        assertThat(emails).hasSize(1).containsExactly("user@bar.com");
    }
    //...

The annotations of the test class are identical to the integration test shown for the low-level API. Hence, we only introduce them quickly here:

  • @SpringBootTest
    • starts a Spring container to enable dependency injection of the factus Spring bean
  • @ExtendWith(FactCastExtension.class)
    • starts a FactCast server and its Postgres database in the background
    • erases old events inside FactCast before each test

The test itself first creates a UserAdded event, which is then published to FactCast. Compared to the low-level integration test, the “act” part is slim and shows the power of the Factus API: the call to factus.update(...) builds a subscription request for all the events handled by the UserEmailsProjection class. The events returned from FactCast are then automatically applied to the matching handler.
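Building a subscription from a projection class requires knowing which event types its handlers accept. The following plain-Java sketch illustrates one way to derive that list via reflection, using hypothetical @Spec and @Handles annotations in place of Factus’ @Specification and @Handler; it illustrates the idea only and is not how factus.update(...) is actually implemented.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical annotations standing in for @Specification and @Handler; not Factus types.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface Spec {
    String ns();
    String type();
}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Handles {}

@Spec(ns = "user", type = "UserAdded")
final class UserAddedEvt {}

@Spec(ns = "user", type = "UserRemoved")
final class UserRemovedEvt {}

// A projection-like class: two handlers and one unrelated helper method.
final class EmailsView {
    @Handles void apply(UserAddedEvt e) {}
    @Handles void apply(UserRemovedEvt e) {}
    void helper(String s) {}
}

final class SubscriptionPlanner {
    // Collects "ns/type" of every event a handler accepts, i.e. what an update would request.
    static Set<String> handledTypes(Class<?> projectionClass) {
        Set<String> types = new TreeSet<>();
        for (Method m : projectionClass.getDeclaredMethods()) {
            if (!m.isAnnotationPresent(Handles.class) || m.getParameterCount() != 1) {
                continue; // not a handler
            }
            Spec spec = m.getParameterTypes()[0].getAnnotation(Spec.class);
            if (spec == null) {
                throw new IllegalStateException(m + " handles a type without @Spec");
            }
            types.add(spec.ns() + "/" + spec.type());
        }
        return types;
    }
}
```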

The test concludes by checking whether the state of the UserEmailsProjection was updated correctly.

Full Example Code

The code for all examples introduced here can be found here.