This the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

Usage (low-level)

This section will walk you through how to use FactCast from an application programmers perspective.

Please note, that this is a low-level API. If you’re a Java Programmer and want to use a higher-level API or explore what you can do with FactCast in a more approachable way first, you should have a look at the Factus section.

1 - Java

This section will walk you through how to use FactCast from an application programmers perspective.

Please note, that this is a low-level API. If you’re a Java Programmer and want to use a higher-level API or explore what you can do with FactCast in a more approachable way first, you should have a look at the Factus section.

1.1 - Java GRPC Producer

FactCast.publish(List<Fact> factsToPublish)

In order to produce Facts, you need to create Fact Instances and publish them via FactCast.publish().

On method return, the publishing of all the Facts passed to FactCast.publish() as a parameter are expected to have been written to PostgreSQL successfully. The order of the Facts is preserved while inserting into the database. All the inserts are done in a transactional context, so that the atomicity is preserved.

If the method returns exceptionally, of the process is killed or interrupted in any way, you cannot know if the Facts have been successfully written. In that case, just repeat the call: if the write had gone through you’ll get and exception complaining about duplicate IDs, if not – you may have a chance to succeed now.

FactCast.publish(Fact toPublish)

acts the same way, as the List counterparts above, just for a List of one Fact.

Example Code

Here is some example code assuming you use the Spring GRPC Client:

@Component
class Foo{
 @Autowired
 FactCast fc;

 public void someMethod(){
   fc.publish( new SomethingHappenedFact() );
 }
}

1.2 - Java GRPC Consumer

As mentioned before, there are three main Use-Cases for subscribing to a Fact-Stream:

  • Validation of Changes against a strictly consistent Model (Catchup)
  • Creating and maintaining a Read-Model (Follow)
  • Managing volatile cached data (Ephemeral)

Here is some example code assuming you use the Spring GRPC Client:

Example Code: Catchup

@Component
class CustomerRepository{
 @Autowired
 FactCast factCast;

 // oversimplified code !
 public Customer getCustomer(UUID customerId){
   // match all Facts currently published about that customer
   SubscriptionRequest req = SubscriptionRequest.catchup(FactSpec.ns("myapp").aggId(customerId)).fromScratch();

   Customer customer = new Customer(id);
   // stream all these Facts to the customer object's handle method, and wait until the stream ends.
   factCast.subscribe(req, customer::handle ).awaitComplete();

   // the customer object should now be in its latest state, and ready for command validation
   return customer;
 }

}

class Customer {
  Money balance = new Money(0); // starting with no money.
  //...
  public void handle(Fact f){
    // apply Fact, so that the customer earns and spend some money...
  }
}

Example Code: Follow

@Component
class QueryOptimizedView {
 @Autowired
 FactCast factCast;

 @PostConstruct
 public void init(){

   UUID lastFactProcessed = persistentModel.getLastFactProcessed();

   // subscribe to all customer related changes.
   SubscriptionRequest req = SubscriptionRequest
      .follow(type("CustomerCreated"))
          .or(type("CustomerDeleted"))
          .or(type("CustomerDeposition"))
          .or(type("PurchaseCompleted"))
      .from(lastFactProcessed);

   factCast.subscribe(req, this::handle );
 }

 private FactSpec type(String type){
   return FactSpec.ns("myapp").type(type);
 }

 @Transactional
 public void handle(Fact f){
    // apply Fact, to the persistent Model
    // ...
    persistentModel.setLastFactProcessed(f.id());
 }

Example Code: Ephemeral

@Component
class CustomerCache {
 @Autowired
 FactCast factCast;

 Map<UUID,Customer> customerCache = new HashMap<>();

 @PostConstruct
 public void init(){
   // subscribe to all customer related changes.
   SubscriptionRequest req = SubscriptionRequest.
      .follow(type("CustomerCreated"))
          .or(type("CustomerDeleted"))
          .or(type("CustomerDeposition"))
          .or(type("PurchaseCompleted"))
      .fromNowOn();

   factCast.subscribe(req, this::handle );
 }

 private FactSpec type(String type){
  return FactSpec.ns("myapp").type(type);
 }

 @Transactional
 public void handle(Fact f){
    // if anything has changed, invalidate the cached value.
    // ...
    Set<UUID> aggregateIds = f.aggId();
    aggregateIds.forEach(customerCache::remove);
 }

1.3 - Java Optimistic Locking

Motivation

Whatever your particular way of modelling your software, in order to be able to enforce invariants in your aggregates, you’d need to coordinate writes to it. In simple monoliths, you do that by synchronizing write access to the aggregate. When Software-Systems become distributed (or at least replicated), this coordination obviously needs to be externalized.

Pessimistic Locking

While pessimistic locking makes sure every change is strictly serializable, it has obvious drawbacks in terms of throughput and complexity (timeouts) as well as the danger of deadlock, when the scope of the lock expands to more than one aggregate. This is why we chose to implement a means of optimistic locking in FactCast.

Optimistic Locking

In general, the idea of optimistic locking is to make a change and before publishing it, make sure there was no potentially contradicting change in between. If there was, the process can safely be retried, as there was nothing published yet.

Transferred to FactCast, this means to express a body of code that:

  1. creates an understanding of the published state of the aggregates in question
  2. invokes its business logic according to that state
  3. creates the effects: either fails (if business logic decides to do so), or publishes new Fact(s)
  4. rechecks, if the state recorded in 1. is still unchanged and then
  5. either publishes the prepared Facts or retries by going back to 1.

Usage

a simple example

This code checks if an account with id newAccountId already exists, and if not - creates it by publishing the Fact accordingly.

factcast.lock("myBankNamespace")
        .on(newAccountId)
        .attempt(() -> {
            // check and maybe abort
            if (repo.findById(newAccountId) !=null)
                return Attempt.abort("Already exists.");
            else
              return Attempt.publish(
                Fact.builder()
                .ns("myBankNamespace")
                .type("AccountCreated")
                .aggId(newAccountId)
                .build("{...}")
              );
        });

You may probably guess what happens, remembering the above steps. Let’s dive into details with a more complex scenario.

a complete example

The unavoidable imaginary example, of two BankAccounts and a money transfer between them:

factcast.lock("myBankNamespace")
        .on(sourceAccountId,targetAccountId)
        .optimistic()            // this is optional, defaults to optimistic, currently the only mode supported
        .retry(100)              // this is optional, default is 10 times
        .interval(5)             // this is optional, default is no wait interval between attempts (equals to 0)
        .attempt(() -> {

            // fetch the latest state
            Account source = repo.findById(sourceAccountId);
            Account target = repo.findById(targetAccountId);

            // run businesslogic on it
            if (source.amount() < amountToTransfer)
                return Attempt.abort("Insufficient funds.");

            if (target.isClosed())
                return Attempt.abort("Target account is closed");

            // everything looks fine, create the Fact to be published
            Fact toPublish = Fact.builder()
                .ns("myBankNamespace")
                .type("transfer")
                .aggId(sourceAccountId)
                .aggId(targetAccountId)
                .build("{...}");

            // register for publishing
            return Attempt.publish(toPublish).andThen(()->{

                // this is only executed at max once, and only if publishing succeeded
                log.info("Money was transferred.");

            });
        });

Explanation

First, you tell factcast to record a state according to all events that have either sourceAccountId or targetAccountId in their list of aggIds and are on namespace myBankNamespace. While the namespace is not strictly necessary, it is encouraged to use it - but it depends on your decision on how to use namespaces and group Facts within them.

The number of retries is set to 100 here (default is 10, which for many systems is an acceptable default). In essence this means, that the attempt will be executed at max 100 times, before factcast gives up and throws an OptimisticRetriesExceededException which extends ConcurrentModificationException.

If interval is not set, it defaults to 0 with the effect, that the code passed into attempt is continuously retried without any pause until it either aborts, succeeds, or the max number of retries was hit (see above). Setting it to 5 means, that before retrying, a 5 msec wait happens.

Everything starts with passing a lambda to the attempt method. The lambda is of type

@FunctionalInterface
public interface Attempt {
    IntermediatePublishResult call() throws AttemptAbortedException;
    //...
}

so that it has to return an instance of IntermediatePublishResult. The only way to create such an instance are static methods on the same interface (abort, publish, …) in order to make it obvious. This lambda now is called according to the logic above.

Inside the lambda, you’d want to check the current state using the very latest facts from factcast (repo.findById(...)) and then check your business constraints on it (if (source.amount() < amountToTransfer)…). If the constraints do not hold, you may choose to abort the Attempt and thus abort the process. In this case, the attempt will not be retried.

On the other hand, if you choose to publish new facts using Attempt.publish(...), the state will be checked and the Fact(s) will be published if there was no change in between (otherwise a retry will be issued, see above). In the rare case, that you do not want to publish anything, you can return Attempt.withoutPublication() to accomplish this.

Optionally, you can pass a runnable using .andThen and schedule it for execution once, if and only if the publishing succeeded. Or in other words, this runnable is executed just once or never (in case of abort or OptimisticRetriesExceededException).

2 - JavaScript

This section will walk you through how to use FactCast from an application programmers perspective.

Please note, that this is a low-level API. If you’re a Java Programmer and want to use a higher-level API or explore what you can do with FactCast in a more approachable way first, you should have a look at the Factus section.

2.1 - nodeJS GRPC Producer

Producing Facts via nodeJS is very simple due to the available gRPC NPM Module. It will generate a stub constructor called RemoteFactStore from our proto file.

const uuidV4 = require("uuid/v4");
const grpc = require("grpc");
const protoDescriptor = grpc.load("./FactStore.proto");
const RemoteFactStore =
	protoDescriptor.org.factcast.grpc.api.gen.RemoteFactStore;

// store allows us to publish, subscribe and fetchById (see proto file)
const store = new RemoteFactStore(
	"localhost:9090",
	grpc.credentials.createInsecure()
);

store.publish(
	[
		{
			header: JSON.stringify({
				id: uuidV4(),
				ns: "myapp",
			}),
			payload: JSON.stringify({
				foo: Date.now(),
			}),
		},
	],
	(err, feature) => {
		if (err) {
			console.log(err);
		}
	}
);

See the Facts page for detailed information about all possible and required header fields.

2.2 - nodeJS GRPC Consumer

const grpc = require("grpc");
const protoDescriptor = grpc.load("./FactStore.proto");
const RemoteFactStore =
	protoDescriptor.org.factcast.grpc.api.gen.RemoteFactStore;

const store = new RemoteFactStore(
	"localhost:9090",
	grpc.credentials.createInsecure()
);

const subscription = store.subscribe({
	json: JSON.stringify({
		continuous: true,
		specs: [
			{
				ns: "myapp",
			},
		],
	}),
});

subscription.on("data", (fact) => {
	console.log(fact);
});

3 - CLI

3.1 - Factcast CLI

In order to help with quick testing or debugging, FactCast provides a very simple CLI that you can use to publish Facts or subscribe and print Facts received to stdout.

Usage

Once module factcast-grpc-cli is built, it provides a self-contained fc-cli.jar in its target folder. In order to use it, you can either run

java -jar path_to/fc-cli.jar <OPTIONS> <COMMAND> <COMMAND OPTIONS>

or just execute it as

path_to/fc-cli.jar <OPTIONS> <COMMAND> <COMMAND OPTIONS>

Help output at the time of writing is

Usage: fc-cli [options] [command] [command options]
  Options:
    --debug
      show debug-level debug messages
    --address
      the address to connect to
      Default: static://localhost:9090
    --basic, -basic
      Basic-Auth Crendentials in the form "user:password"
    --no-tls
      do NOT use TLS to connect (plaintext-communication)
    --pretty
      format JSON output
  Commands:
    catchup      Read all the matching facts up to now and exit.
      Usage: catchup [options]
        Options:
          -from
            start reading AFTER the fact with the given id
        * -ns
            the namespace filtered on

    follow      read all matching facts and keep connected while listening for
            new ones
      Usage: follow [options]
        Options:
          -from
            start reading AFTER the fact with the given id
          -fromNowOn
            read only future facts
        * -ns
            the namespace filtered on

    publish      publish a fact
      Usage: publish [options]
        Options:
        * --header, -h
            Filename of an existing json file to read the header from
        * --payload, -p
            Filename of an existing json file to read the payload from

    enumerateNamespaces      lists all namespaces in the factstore in no
            particular order
      Usage: enumerateNamespaces

    enumerateTypes      lists all types used with a namespace in no particular
            order
      Usage: enumerateTypes namespace

    serialOf      get the serial of a fact identified by id
      Usage: serialOf id

3.2 - Schema Registry CLI

This CLI provides a convenient way to create a suitable Schema Registry for your FactCast installation. It will give you the ability to validate events against examples and to make sure that there’s always an upcast and if necessary a downcast transformation.

It produces a human and a machine-readable output. You will have to use hugo in order to get a proper static website.

A working example can be found here.

Build the example

The example will be built during mvn install, but you can reach the same via

$ java -jar target/fc-schema-cli.jar build -p ../factcast-examples/factcast-example-schema-registry/src/main/resources

build validates and builds the example and also produces a output directory that contains the static website. Inside this folder run

$ hugo server

to get quick feedback or

$ hugo

in order to create the deployable schema registry (located at output/public).

About CI Pipelines and Artifacts

We propose to the following pipeline

Build -> Package -> Upload

Build:

  • runs the fc-schema-cli to build the registry
  • fails on wrong input/broken schema

Package:

  • runs $ hugo in order to produce the artifact

Upload:

  • uploads output/public to static file server (like S3)

Available commands and options

$ java -jar target/fc-schema-cli.jar -h

███████╗ █████╗  ██████╗████████╗ ██████╗ █████╗ ███████╗████████╗
██╔════╝██╔══██╗██╔════╝╚══██╔══╝██╔════╝██╔══██╗██╔════╝╚══██╔══╝
█████╗  ███████║██║        ██║   ██║     ███████║███████╗   ██║
██╔══╝  ██╔══██║██║        ██║   ██║     ██╔══██║╚════██║   ██║
██║     ██║  ██║╚██████╗   ██║   ╚██████╗██║  ██║███████║   ██║
╚═╝     ╚═╝  ╚═╝ ╚═════╝   ╚═╝    ╚═════╝╚═╝  ╚═╝╚══════╝   ╚═╝

Usage: fc-schema [-hV] [COMMAND]
Tool for working with the FactCast Schema Registry spec
  -h, --help      Show this help message and exit.
  -V, --version   Print version information and exit.
Commands:
  validate  Validate your current events
  build     Validates and builds your registry

3.3 - 3rd Party CLI

As an alternative to the Factcast CLI there is the Python based PyFactCast. It is still in early development, but you might want to check it out.