Java

This section will walk you through how to use FactCast from an application programmer's perspective.

Please note that this is a low-level API. If you're a Java programmer and want to use a higher-level API or explore what you can do with FactCast in a more approachable way first, you should have a look at the Factus section.

1 - Java GRPC Producer

FactCast.publish(List<Fact> factsToPublish)

In order to produce Facts, you need to create Fact Instances and publish them via FactCast.publish().

When the method returns, all the Facts passed to FactCast.publish() as a parameter are expected to have been written to PostgreSQL successfully. The order of the Facts is preserved when inserting into the database, and all inserts happen within one transactional context, so atomicity is preserved.

If the method returns exceptionally, or the process is killed or interrupted in any way, you cannot know whether the Facts have been successfully written. In that case, just repeat the call: if the write had gone through, you'll get an exception complaining about duplicate IDs; if not, you may have a chance to succeed now.

FactCast.publish(Fact toPublish)

acts the same way as the List counterpart above, just for a single Fact.

Example Code

Here is some example code assuming you use the Spring GRPC Client:

@Component
class Foo{
 @Autowired
 FactCast fc;

 public void someMethod(){
   fc.publish( new SomethingHappenedFact() );
 }
}
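
If you don't have a dedicated Fact class like SomethingHappenedFact, Facts can also be constructed via Fact.builder(), as shown in the locking examples further below. Here is a minimal sketch of publishing two Facts atomically in one call; the fact type CustomerCharged and the OrderService class are made up for illustration:

@Component
class OrderService {
 @Autowired
 FactCast fc;

 public void completePurchase(UUID customerId, UUID purchaseId) {
   Fact purchaseCompleted = Fact.builder()
       .ns("myapp")
       .type("PurchaseCompleted")
       .aggId(purchaseId)
       .build("{...}");

   Fact customerCharged = Fact.builder()
       .ns("myapp")
       .type("CustomerCharged") // hypothetical fact type
       .aggId(customerId)
       .build("{...}");

   // both Facts are written in one transaction, preserving their order
   fc.publish(Arrays.asList(purchaseCompleted, customerCharged));
 }
}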

2 - Java GRPC Consumer

As mentioned before, there are three main Use-Cases for subscribing to a Fact-Stream:

  • Validation of Changes against a strictly consistent Model (Catchup)
  • Creating and maintaining a Read-Model (Follow)
  • Managing volatile cached data (Ephemeral)

Here is some example code assuming you use the Spring GRPC Client:

Example Code: Catchup

@Component
class CustomerRepository{
 @Autowired
 FactCast factCast;

 // oversimplified code !
 public Customer getCustomer(UUID customerId){
   // match all Facts currently published about that customer
   SubscriptionRequest req = SubscriptionRequest.catchup(FactSpec.ns("myapp").aggId(customerId)).fromScratch();

   Customer customer = new Customer(customerId);
   // stream all these Facts to the customer object's handle method, and wait until the stream ends.
   factCast.subscribe(req, customer::handle ).awaitComplete();

   // the customer object should now be in its latest state, and ready for command validation
   return customer;
 }

}

class Customer {
  Money balance = new Money(0); // starting with no money.
  //...
  public void handle(Fact f){
    // apply the Fact, so that the customer earns and spends some money...
  }
}

Example Code: Follow

@Component
class QueryOptimizedView {
 @Autowired
 FactCast factCast;

 @PostConstruct
 public void init(){

   UUID lastFactProcessed = persistentModel.getLastFactProcessed();

   // subscribe to all customer related changes.
   SubscriptionRequest req = SubscriptionRequest
      .follow(type("CustomerCreated"))
          .or(type("CustomerDeleted"))
          .or(type("CustomerDeposition"))
          .or(type("PurchaseCompleted"))
      .from(lastFactProcessed);

   factCast.subscribe(req, this::handle );
 }

 private FactSpec type(String type){
   return FactSpec.ns("myapp").type(type);
 }

 @Transactional
 public void handle(Fact f){
    // apply Fact, to the persistent Model
    // ...
    persistentModel.setLastFactProcessed(f.id());
 }
}

Example Code: Ephemeral

@Component
class CustomerCache {
 @Autowired
 FactCast factCast;

 Map<UUID,Customer> customerCache = new HashMap<>();

 @PostConstruct
 public void init(){
   // subscribe to all customer related changes.
   SubscriptionRequest req = SubscriptionRequest
      .follow(type("CustomerCreated"))
          .or(type("CustomerDeleted"))
          .or(type("CustomerDeposition"))
          .or(type("PurchaseCompleted"))
      .fromNowOn();

   factCast.subscribe(req, this::handle );
 }

 private FactSpec type(String type){
  return FactSpec.ns("myapp").type(type);
 }

 @Transactional
 public void handle(Fact f){
    // if anything has changed, invalidate the cached value.
    // ...
     Set<UUID> aggregateIds = f.aggIds();
    aggregateIds.forEach(customerCache::remove);
 }
}

3 - Java Optimistic Locking

Motivation

Whatever your particular way of modelling your software, in order to be able to enforce invariants in your aggregates, you need to coordinate writes to them. In a simple monolith, you do that by synchronizing write access to the aggregate. When software systems become distributed (or at least replicated), this coordination obviously needs to be externalized.

Pessimistic Locking

While pessimistic locking makes sure every change is strictly serializable, it has obvious drawbacks in terms of throughput and complexity (timeouts), as well as the danger of deadlocks when the scope of the lock expands to more than one aggregate. This is why we chose to implement a means of optimistic locking in FactCast.

Optimistic Locking

In general, the idea of optimistic locking is to make a change and, before publishing it, make sure there was no potentially contradicting change in between. If there was, the process can safely be retried, as nothing has been published yet.

Transferred to FactCast, this means expressing a body of code that:

  1. creates an understanding of the published state of the aggregates in question
  2. invokes its business logic according to that state
  3. creates the effects: either fails (if the business logic decides to do so) or publishes new Fact(s)
  4. rechecks whether the state recorded in 1. is still unchanged, and then
  5. either publishes the prepared Facts or retries by going back to 1.

Usage

a simple example

This code checks if an account with id newAccountId already exists, and if not - creates it by publishing the Fact accordingly.

factcast.lock("myBankNamespace")
        .on(newAccountId)
        .attempt(() -> {
            // check and maybe abort
            if (repo.findById(newAccountId) != null)
                return Attempt.abort("Already exists.");
            else
                return Attempt.publish(
                    Fact.builder()
                        .ns("myBankNamespace")
                        .type("AccountCreated")
                        .aggId(newAccountId)
                        .build("{...}")
                );
        });

You can probably guess what happens, remembering the steps above. Let's dive into the details with a more complex scenario.

a complete example

The unavoidable imaginary example of two BankAccounts and a money transfer between them:

factcast.lock("myBankNamespace")
        .on(sourceAccountId,targetAccountId)
        .optimistic()            // this is optional, defaults to optimistic, currently the only mode supported
        .retry(100)              // this is optional, default is 10 times
        .interval(5)             // this is optional, default is no wait interval between attempts (equal to 0)
        .attempt(() -> {

            // fetch the latest state
            Account source = repo.findById(sourceAccountId);
            Account target = repo.findById(targetAccountId);

            // run business logic on it
            if (source.amount() < amountToTransfer)
                return Attempt.abort("Insufficient funds.");

            if (target.isClosed())
                return Attempt.abort("Target account is closed");

            // everything looks fine, create the Fact to be published
            Fact toPublish = Fact.builder()
                .ns("myBankNamespace")
                .type("transfer")
                .aggId(sourceAccountId)
                .aggId(targetAccountId)
                .build("{...}");

            // register for publishing
            return Attempt.publish(toPublish).andThen(()->{

                // this is only executed at max once, and only if publishing succeeded
                log.info("Money was transferred.");

            });
        });

Explanation

First, you tell factcast to record a state according to all events that have either sourceAccountId or targetAccountId in their list of aggIds and are in namespace myBankNamespace. While the namespace is not strictly necessary, using it is encouraged - but that depends on your decision on how to use namespaces and group Facts within them.

The number of retries is set to 100 here (default is 10, which is an acceptable default for many systems). In essence this means that the attempt will be executed at most 100 times before factcast gives up and throws an OptimisticRetriesExceededException, which extends ConcurrentModificationException.

If interval is not set, it defaults to 0, with the effect that the code passed into attempt is continuously retried without any pause until it either aborts, succeeds, or the maximum number of retries is hit (see above). Setting it to 5 means that a 5 msec wait happens before each retry.
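
If all retries are exhausted, the resulting exception can be handled by the caller. Here is a minimal sketch, assuming the money transfer above is wrapped in a (hypothetical) transferMoney method:

try {
    // hypothetical wrapper around the lock(...).attempt(...) call shown above
    transferMoney(sourceAccountId, targetAccountId, amountToTransfer);
} catch (OptimisticRetriesExceededException e) {
    // every attempt saw a conflicting write to one of the two accounts
    log.warn("Transfer not executed due to concurrent modifications", e);
}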

Everything starts with passing a lambda to the attempt method. The lambda is of type

@FunctionalInterface
public interface Attempt {
    IntermediatePublishResult call() throws AttemptAbortedException;
    //...
}

so it has to return an instance of IntermediatePublishResult. The only way to create such an instance is via the static methods on the same interface (abort, publish, …), in order to make this obvious. This lambda is then called according to the logic above.

Inside the lambda, you’d want to check the current state using the very latest facts from factcast (repo.findById(...)) and then check your business constraints on it (if (source.amount() < amountToTransfer)…). If the constraints do not hold, you may choose to abort the Attempt and thus abort the process. In this case, the attempt will not be retried.

On the other hand, if you choose to publish new Facts using Attempt.publish(...), the state will be rechecked and the Fact(s) will be published if there was no change in between (otherwise a retry is issued, see above). In the rare case that you do not want to publish anything, you can return Attempt.withoutPublication() to accomplish this.
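
For example, here is a minimal sketch of an idempotent operation that publishes nothing when the aggregate is already in the desired state; the AccountClosed type and the accountId variable are made up for illustration:

factcast.lock("myBankNamespace")
        .on(accountId)
        .attempt(() -> {
            // already closed? then there is nothing left to publish, but the attempt still succeeds
            if (repo.findById(accountId).isClosed())
                return Attempt.withoutPublication();

            return Attempt.publish(
                Fact.builder()
                    .ns("myBankNamespace")
                    .type("AccountClosed") // hypothetical fact type
                    .aggId(accountId)
                    .build("{...}"));
        });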

Optionally, you can pass a runnable using .andThen to schedule it for execution once, if and only if the publishing succeeded. In other words, this runnable is executed either exactly once or never (in case of abort or OptimisticRetriesExceededException).