1 - Java
This section walks you through using FactCast from an application programmer’s perspective.
Please note that this is a low-level API. If you’re a Java programmer and want to use a higher-level API, or want to explore what you can do with FactCast in a more approachable way first, have a look at the Factus section.
1.1 - Java GRPC Producer
FactCast.publish(List<Fact> factsToPublish)
In order to produce Facts, you need to create Fact instances and publish them via FactCast.publish().
When the method returns, all the Facts passed to FactCast.publish() as a parameter are expected to have been written to PostgreSQL successfully. The order of the Facts is preserved while inserting into the database. All the inserts are done in a transactional context, so that atomicity is preserved.
If the method returns exceptionally, or the process is killed or interrupted in any way, you cannot know whether the Facts have been written successfully. In that case, just repeat the call: if the write went through, you’ll get an exception complaining about duplicate IDs; if not, you may have a chance to succeed now.
FactCast.publish(Fact toPublish)
acts the same way as the List counterpart above, just for a List of one Fact.
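The "just repeat the call" advice above can be sketched as follows. This is a minimal illustration under stated assumptions, not the FactCast client: InMemoryStore, DuplicateIdException and publishAgain are hypothetical names, and Facts are reduced to their ids. It shows why a duplicate-ID error on the repeated call can be read as "the first write went through".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

// Hypothetical in-memory stand-in for a fact store; this is NOT the FactCast API.
class PublishRetrySketch {

    /** Raised by the stand-in store when a fact id has already been written. */
    static class DuplicateIdException extends RuntimeException {
        DuplicateIdException(UUID id) {
            super("duplicate fact id: " + id);
        }
    }

    /** Writes each batch atomically and in order, rejecting ids it already knows. */
    static class InMemoryStore {
        private final Set<UUID> knownIds = new HashSet<>();
        private final List<UUID> log = new ArrayList<>();

        synchronized void publish(List<UUID> factIds) {
            // Validate the whole batch first, so that either all ids are written or none.
            Set<UUID> batch = new HashSet<>();
            for (UUID id : factIds) {
                if (knownIds.contains(id) || !batch.add(id)) {
                    throw new DuplicateIdException(id);
                }
            }
            knownIds.addAll(factIds);
            log.addAll(factIds);
        }

        synchronized List<UUID> log() {
            return new ArrayList<>(log);
        }
    }

    /**
     * Repeats a publish whose outcome is unknown.
     * Returns true if this call wrote the facts, false if an earlier call already had.
     */
    static boolean publishAgain(InMemoryStore store, List<UUID> factIds) {
        try {
            store.publish(factIds);
            return true;
        } catch (DuplicateIdException alreadyWritten) {
            return false;
        }
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        List<UUID> facts = Arrays.asList(UUID.randomUUID(), UUID.randomUUID());
        store.publish(facts); // imagine the acknowledgement got lost here
        System.out.println(publishAgain(store, facts)); // prints "false"
        System.out.println(store.log().size());          // prints "2"
    }
}
```

The sketch only works because the Fact ids are chosen by the caller before the first attempt; generating fresh ids for the repeated call would defeat the duplicate check.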
Example Code
Here is some example code assuming you use the Spring GRPC Client:
@Component
class Foo{
@Autowired
FactCast fc;
public void someMethod(){
fc.publish( new SomethingHappenedFact() );
}
}
1.2 - Java GRPC Consumer
As mentioned before, there are three main Use-Cases for subscribing to a Fact-Stream:
- Validation of Changes against a strictly consistent Model (Catchup)
- Creating and maintaining a Read-Model (Follow)
- Managing volatile cached data (Ephemeral)
Here is some example code assuming you use the Spring GRPC Client:
Example Code: Catchup
@Component
class CustomerRepository{
@Autowired
FactCast factCast;
// oversimplified code !
public Customer getCustomer(UUID customerId){
// match all Facts currently published about that customer
SubscriptionRequest req = SubscriptionRequest.catchup(FactSpec.ns("myapp").aggId(customerId)).fromScratch();
Customer customer = new Customer(customerId);
// stream all these Facts to the customer object's handle method, and wait until the stream ends.
factCast.subscribe(req, customer::handle ).awaitComplete();
// the customer object should now be in its latest state, and ready for command validation
return customer;
}
}
class Customer {
Money balance = new Money(0); // starting with no money.
//...
public void handle(Fact f){
// apply Fact, so that the customer earns and spends some money...
}
}
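What the handle method might do is left open above. As a rough sketch, assuming a simplified event shape (MoneyEvent is a hypothetical stand-in; real Facts carry a JSON header and payload) and reusing the type names from the subscription examples in this section, replaying a stream into an aggregate could look like this:

```java
import java.util.Arrays;
import java.util.List;

// Illustration only: MoneyEvent is a hypothetical, simplified stand-in for a Fact.
class CustomerFoldSketch {

    /** A type plus an amount in cents. */
    static class MoneyEvent {
        final String type;
        final long cents;

        MoneyEvent(String type, long cents) {
            this.type = type;
            this.cents = cents;
        }
    }

    /** Aggregate whose state is derived purely from the events it is handed. */
    static class Customer {
        long balanceCents = 0; // starting with no money

        void handle(MoneyEvent e) {
            switch (e.type) {
                case "CustomerDeposition":
                    balanceCents += e.cents;
                    break;
                case "PurchaseCompleted":
                    balanceCents -= e.cents;
                    break;
                default:
                    // events that do not affect the balance are ignored
                    break;
            }
        }
    }

    /** Replays the events in order, the way a catchup subscription would deliver them. */
    static Customer replay(List<MoneyEvent> events) {
        Customer customer = new Customer();
        events.forEach(customer::handle);
        return customer;
    }

    public static void main(String[] args) {
        Customer c = replay(Arrays.asList(
                new MoneyEvent("CustomerCreated", 0),
                new MoneyEvent("CustomerDeposition", 1000),
                new MoneyEvent("PurchaseCompleted", 250)));
        System.out.println(c.balanceCents); // prints "750"
    }
}
```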
Example Code: Follow
@Component
class QueryOptimizedView {
@Autowired
FactCast factCast;
@PostConstruct
public void init(){
UUID lastFactProcessed = persistentModel.getLastFactProcessed();
// subscribe to all customer related changes.
SubscriptionRequest req = SubscriptionRequest
.follow(type("CustomerCreated"))
.or(type("CustomerDeleted"))
.or(type("CustomerDeposition"))
.or(type("PurchaseCompleted"))
.from(lastFactProcessed);
factCast.subscribe(req, this::handle );
}
private FactSpec type(String type){
return FactSpec.ns("myapp").type(type);
}
@Transactional
public void handle(Fact f){
// apply Fact, to the persistent Model
// ...
persistentModel.setLastFactProcessed(f.id());
}
}
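The important detail in the Follow example is that the position (the id of the last Fact processed) is stored together with the model, so a restart resumes where it left off. A minimal sketch of that idea, using a hypothetical in-memory FactLog in place of the real subscription:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Illustration only: FactLog and ReadModel are hypothetical stand-ins.
class FollowResumeSketch {

    /** Ordered, append-only log standing in for the fact stream. */
    static class FactLog {
        private final List<UUID> ids = new ArrayList<>();

        void append(UUID id) {
            ids.add(id);
        }

        /** Everything strictly AFTER the given id; the whole log if the id is null or unknown. */
        List<UUID> after(UUID lastSeen) {
            int start = lastSeen == null ? 0 : ids.indexOf(lastSeen) + 1;
            return new ArrayList<>(ids.subList(start, ids.size()));
        }
    }

    /** Read model that records its position along with every applied fact. */
    static class ReadModel {
        UUID lastFactProcessed; // would be persisted in the same transaction as the model
        int applied = 0;

        void handle(UUID factId) {
            applied++;                  // apply the fact to the model ...
            lastFactProcessed = factId; // ... and remember how far we got
        }

        void resumeFrom(FactLog log) {
            log.after(lastFactProcessed).forEach(this::handle);
        }
    }

    public static void main(String[] args) {
        FactLog log = new FactLog();
        ReadModel model = new ReadModel();
        log.append(UUID.randomUUID());
        log.append(UUID.randomUUID());
        model.resumeFrom(log);
        log.append(UUID.randomUUID());
        model.resumeFrom(log); // only the third fact is delivered this time
        System.out.println(model.applied); // prints "3"
    }
}
```

If the model update and the position update were not written atomically, a crash between the two could lead to a Fact being applied twice or skipped after a restart, which is presumably why the handler above is marked @Transactional.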
Example Code: Ephemeral
@Component
class CustomerCache {
@Autowired
FactCast factCast;
Map<UUID,Customer> customerCache = new HashMap<>();
@PostConstruct
public void init(){
// subscribe to all customer related changes.
SubscriptionRequest req = SubscriptionRequest
.follow(type("CustomerCreated"))
.or(type("CustomerDeleted"))
.or(type("CustomerDeposition"))
.or(type("PurchaseCompleted"))
.fromNowOn();
factCast.subscribe(req, this::handle );
}
private FactSpec type(String type){
return FactSpec.ns("myapp").type(type);
}
@Transactional
public void handle(Fact f){
// if anything has changed, invalidate the cached value.
// ...
Set<UUID> aggregateIds = f.aggId();
aggregateIds.forEach(customerCache::remove);
}
}
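The invalidation pattern from the Ephemeral example can be sketched without FactCast at all. The class below is a hypothetical illustration: the loader function and the onFact method are assumptions, and it uses a ConcurrentHashMap on the assumption that the subscription callback may run on a different thread than the readers of the cache.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Illustration only: a cache that is invalidated by the aggregate ids of incoming facts.
class EphemeralCacheSketch<V> {

    private final Map<UUID, V> cache = new ConcurrentHashMap<>();
    private final Function<UUID, V> loader;

    EphemeralCacheSketch(Function<UUID, V> loader) {
        this.loader = loader;
    }

    /** Returns the cached value, loading it on a miss. */
    V get(UUID id) {
        return cache.computeIfAbsent(id, loader);
    }

    /** Called for every fact received: drop whatever is cached for the affected aggregates. */
    void onFact(Set<UUID> aggregateIds) {
        aggregateIds.forEach(cache::remove);
    }

    public static void main(String[] args) {
        int[] loads = {0};
        EphemeralCacheSketch<String> cache =
                new EphemeralCacheSketch<>(id -> "loaded #" + (++loads[0]));
        UUID customer = UUID.randomUUID();
        System.out.println(cache.get(customer)); // prints "loaded #1"
        System.out.println(cache.get(customer)); // prints "loaded #1" (served from cache)
        cache.onFact(Collections.singleton(customer));
        System.out.println(cache.get(customer)); // prints "loaded #2"
    }
}
```

Because the subscription starts "from now on", nothing needs to be persisted: after a restart the cache is simply empty and fills up again on demand.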
1.3 - Java Optimistic Locking
Motivation
Whatever your particular way of modelling your software, in order to be able to enforce invariants in your aggregates, you need to coordinate writes to them. In simple monoliths, you do that by synchronizing write access to the aggregate. When software systems become distributed (or at least replicated), this coordination needs to be externalized.
Pessimistic Locking
While pessimistic locking makes sure every change is strictly serializable, it has obvious drawbacks in terms of throughput and complexity (timeouts) as well as the danger of deadlock, when the scope of the lock expands to more than one aggregate. This is why we chose to implement a means of optimistic locking in FactCast.
Optimistic Locking
In general, the idea of optimistic locking is to make a change and before publishing it, make sure there was no potentially contradicting change in between. If there was, the process can safely be retried, as there was nothing published yet.
Transferred to FactCast, this means expressing a body of code that:
1. creates an understanding of the published state of the aggregates in question
2. invokes its business logic according to that state
3. creates the effects: either fails (if the business logic decides to do so), or creates new Fact(s) to publish
4. rechecks whether the state recorded in 1. is still unchanged and then
5. either publishes the prepared Facts or retries by going back to 1.
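The loop described by these steps can be sketched independently of FactCast. The following is an illustration under stated assumptions, not FactCast's implementation: the Log class is a hypothetical in-memory store whose size serves as the "state token", and an empty Optional stands in for aborting.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Illustration only: a generic optimistic-locking loop over a hypothetical in-memory log.
class OptimisticLoopSketch {

    /** Append-only log; its size acts as the token describing the state that was read. */
    static class Log {
        private final List<String> facts = new ArrayList<>();

        synchronized List<String> snapshot() {
            return new ArrayList<>(facts);
        }

        /** Recheck and publish in one atomic step: append only if nothing was added since. */
        synchronized boolean publishIfUnchanged(int token, String fact) {
            if (facts.size() != token) {
                return false;
            }
            facts.add(fact);
            return true;
        }
    }

    /**
     * The logic receives the current state and returns the fact to publish, or empty to abort.
     * Returns true if a fact was published, false if the logic aborted.
     */
    static boolean attempt(Log log, int maxRetries,
                           Function<List<String>, Optional<String>> logic) {
        for (int i = 0; i < maxRetries; i++) {
            List<String> state = log.snapshot();         // read the state ...
            int token = state.size();                    // ... and remember what was seen
            Optional<String> fact = logic.apply(state);  // run the business logic
            if (!fact.isPresent()) {
                return false;                            // aborted: no retry
            }
            if (log.publishIfUnchanged(token, fact.get())) {
                return true;                             // state unchanged: published
            }
            // somebody else published in between: go back to the start
        }
        throw new ConcurrentModificationException("retries exceeded");
    }
}
```

The key design point is that the recheck and the publish happen atomically (here inside one synchronized method); checking first and publishing afterwards in two separate steps would reopen the race the loop is meant to close.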
Usage
a simple example
This code checks if an account with id newAccountId already exists, and if not - creates it by publishing the Fact accordingly.
factcast.lock("myBankNamespace")
.on(newAccountId)
.attempt(() -> {
// check and maybe abort
if (repo.findById(newAccountId) != null)
return Attempt.abort("Already exists.");
else
return Attempt.publish(
Fact.builder()
.ns("myBankNamespace")
.type("AccountCreated")
.aggId(newAccountId)
.build("{...}")
);
});
You can probably guess what happens, remembering the above steps. Let’s dive into the details with a more complex scenario.
a complete example
The unavoidable imaginary example, of two BankAccounts and a money transfer between them:
factcast.lock("myBankNamespace")
.on(sourceAccountId,targetAccountId)
.optimistic() // this is optional, defaults to optimistic, currently the only mode supported
.retry(100) // this is optional, default is 10 times
.interval(5) // this is optional, default is no wait interval between attempts (equal to 0)
.attempt(() -> {
// fetch the latest state
Account source = repo.findById(sourceAccountId);
Account target = repo.findById(targetAccountId);
// run businesslogic on it
if (source.amount() < amountToTransfer)
return Attempt.abort("Insufficient funds.");
if (target.isClosed())
return Attempt.abort("Target account is closed");
// everything looks fine, create the Fact to be published
Fact toPublish = Fact.builder()
.ns("myBankNamespace")
.type("transfer")
.aggId(sourceAccountId)
.aggId(targetAccountId)
.build("{...}");
// register for publishing
return Attempt.publish(toPublish).andThen(()->{
// this is only executed at max once, and only if publishing succeeded
log.info("Money was transferred.");
});
});
Explanation
First, you tell factcast to record a state according to all events that have either sourceAccountId or targetAccountId in their list of aggIds and are on namespace myBankNamespace. While the namespace is not strictly necessary, it is encouraged to use it - but it depends on your decision on how to use namespaces and group Facts within them.
The number of retries is set to 100 here (the default is 10, which is acceptable for many systems). In essence, this means that the attempt will be executed at most 100 times before factcast gives up and throws an OptimisticRetriesExceededException, which extends ConcurrentModificationException.
If interval is not set, it defaults to 0, with the effect that the code passed into attempt is retried continuously without any pause until it either aborts, succeeds, or hits the maximum number of retries (see above).
Setting it to 5 means that a 5 msec wait happens before each retry.
WARNING: Setting interval to a non-zero value makes your code block a thread. The above combination of 100 retries with a 5 msec interval means that, at worst, your code could block for longer than half a second.
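The arithmetic behind this warning is simple enough to write down. The helper below is a hypothetical back-of-the-envelope calculation, not part of FactCast; it counts only the waits and ignores the time spent inside each attempt, which is why the real worst case is longer.

```java
// Illustration only: a lower bound for the worst-case blocking time of a retry loop.
class RetryBudgetSketch {

    /** Sum of all waits if every attempt fails; time spent in the attempts comes on top. */
    static long minWorstCaseWaitMillis(int retries, long intervalMillis) {
        return (long) retries * intervalMillis;
    }

    public static void main(String[] args) {
        System.out.println(minWorstCaseWaitMillis(100, 5)); // prints "500"
        System.out.println(minWorstCaseWaitMillis(10, 0));  // prints "0"
    }
}
```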
Everything starts with passing a lambda to the attempt method. The lambda is of type
@FunctionalInterface
public interface Attempt {
IntermediatePublishResult call() throws AttemptAbortedException;
//...
}
so that it has to return an instance of IntermediatePublishResult. The only way to create such an instance is through the static methods on the same interface (abort, publish, …), in order to make it obvious.
This lambda now is called according to the logic above.
Inside the lambda, you’d want to check the current state using the very latest facts from factcast (repo.findById(...)) and then check your business constraints on it (if (source.amount() < amountToTransfer) …).
If the constraints do not hold, you may choose to abort the Attempt and thus abort the process. In this case, the attempt will not be retried.
On the other hand, if you choose to publish new facts using Attempt.publish(...), the state will be checked and the Fact(s) will be published if there was no change in between (otherwise a retry will be issued, see above).
In the rare case that you do not want to publish anything, you can return Attempt.withoutPublication() to accomplish this.
Optionally, you can pass a runnable using .andThen and schedule it for execution once, if and only if the publishing succeeded. In other words, this runnable is executed either exactly once or never (in case of abort or OptimisticRetriesExceededException).
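To make the "exactly once or never" guarantee concrete, here is a small sketch of how such a callback could be wired into a retry loop. It is an illustration only and does not reflect FactCast's internals: Prepared and run are hypothetical names, returning null stands in for an abort, and the JDK's ConcurrentModificationException stands in for OptimisticRetriesExceededException.

```java
import java.util.ConcurrentModificationException;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Illustration only: a success callback that survives retries but runs at most once.
class AndThenSketch {

    /** What one attempt prepares: the fact to publish plus a callback for success. */
    static class Prepared {
        final String fact;
        final Runnable onSuccess;

        Prepared(String fact, Runnable onSuccess) {
            this.fact = fact;
            this.onSuccess = onSuccess;
        }
    }

    /**
     * attempt: prepares a fact, or returns null to abort.
     * publishIfUnchanged: returns true if the fact was published, false if a retry is needed.
     */
    static boolean run(int maxRetries, Supplier<Prepared> attempt,
                       Predicate<String> publishIfUnchanged) {
        for (int i = 0; i < maxRetries; i++) {
            Prepared prepared = attempt.get();
            if (prepared == null) {
                return false; // aborted: callback never runs
            }
            if (publishIfUnchanged.test(prepared.fact)) {
                prepared.onSuccess.run(); // runs once, only after a successful publish
                return true;
            }
            // failed attempt: its callback is discarded along with the prepared fact
        }
        throw new ConcurrentModificationException("retries exceeded"); // callback never runs
    }
}
```

This is why side effects such as logging or sending notifications belong in the callback rather than in the body of the attempt, which may be executed many times.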
2 - JavaScript
This section walks you through using FactCast from an application programmer’s perspective.
Please note, that this is a low-level API. If you’re a Java Programmer and want to use a higher-level API or explore what you can do with FactCast in a more approachable way first, you should have a look at the Factus section.
2.1 - nodeJS GRPC Producer
Producing Facts via nodeJS is very simple due to the available gRPC NPM Module. It will generate a stub constructor called RemoteFactStore from our proto file.
const uuidV4 = require("uuid/v4");
const grpc = require("grpc");
const protoDescriptor = grpc.load("./FactStore.proto");
const RemoteFactStore =
protoDescriptor.org.factcast.grpc.api.gen.RemoteFactStore;
// store allows us to publish, subscribe and fetchById (see proto file)
const store = new RemoteFactStore(
"localhost:9090",
grpc.credentials.createInsecure()
);
store.publish(
[
{
header: JSON.stringify({
id: uuidV4(),
ns: "myapp",
}),
payload: JSON.stringify({
foo: Date.now(),
}),
},
],
(err, response) => {
if (err) {
console.log(err);
}
}
);
See the Facts page for detailed information about all possible and required header fields.
2.2 - nodeJS GRPC Consumer
const grpc = require("grpc");
const protoDescriptor = grpc.load("./FactStore.proto");
const RemoteFactStore =
protoDescriptor.org.factcast.grpc.api.gen.RemoteFactStore;
const store = new RemoteFactStore(
"localhost:9090",
grpc.credentials.createInsecure()
);
const subscription = store.subscribe({
json: JSON.stringify({
continuous: true,
specs: [
{
ns: "myapp",
},
],
}),
});
subscription.on("data", (fact) => {
console.log(fact);
});
3.1 - Factcast CLI
In order to help with quick testing or debugging, FactCast provides a very simple CLI that you can use to publish Facts or subscribe and print Facts received to stdout.
Usage
Once module factcast-grpc-cli is built, it provides a self-contained fc-cli.jar in its target folder. In order to use it, you can either run
java -jar path_to/fc-cli.jar <OPTIONS> <COMMAND> <COMMAND OPTIONS>
or just execute it as
path_to/fc-cli.jar <OPTIONS> <COMMAND> <COMMAND OPTIONS>
The help output at the time of writing is:
Usage: fc-cli [options] [command] [command options]
Options:
--debug
show debug-level debug messages
--address
the address to connect to
Default: static://localhost:9090
--basic, -basic
Basic-Auth Crendentials in the form "user:password"
--no-tls
do NOT use TLS to connect (plaintext-communication)
--pretty
format JSON output
Commands:
catchup Read all the matching facts up to now and exit.
Usage: catchup [options]
Options:
-from
start reading AFTER the fact with the given id
* -ns
the namespace filtered on
follow read all matching facts and keep connected while listening for
new ones
Usage: follow [options]
Options:
-from
start reading AFTER the fact with the given id
-fromNowOn
read only future facts
* -ns
the namespace filtered on
publish publish a fact
Usage: publish [options]
Options:
* --header, -h
Filename of an existing json file to read the header from
* --payload, -p
Filename of an existing json file to read the payload from
enumerateNamespaces lists all namespaces in the factstore in no
particular order
Usage: enumerateNamespaces
enumerateTypes lists all types used with a namespace in no particular
order
Usage: enumerateTypes namespace
serialOf get the serial of a fact identified by id
Usage: serialOf id
3.2 - Schema Registry CLI
This CLI provides a convenient way to create a suitable Schema Registry for your FactCast installation.
It will give you the ability to validate events against examples and to make sure that there’s always an upcast and, if necessary, a downcast transformation.
It produces a human-readable and a machine-readable output. You will have to use hugo in order to get a proper static website.
A working example can be found here.
Build the example
The example will be built during mvn install, but you can achieve the same via
$ java -jar target/fc-schema-cli.jar build -p ../factcast-examples/factcast-example-schema-registry/src/main/resources
build validates and builds the example and also produces an output directory that contains the static website. Inside this folder run
$ hugo server
to get quick feedback or
$ hugo
in order to create the deployable schema registry (located at output/public).
About CI Pipelines and Artifacts
We propose the following pipeline:
Build -> Package -> Upload
Build:
- runs the fc-schema-cli to build the registry
- fails on wrong input/broken schema
Package:
- runs $ hugo in order to produce the artifact
Upload:
- uploads output/public to a static file server (like S3)
Available commands and options
$ java -jar target/fc-schema-cli.jar -h
███████╗ █████╗ ██████╗████████╗ ██████╗ █████╗ ███████╗████████╗
██╔════╝██╔══██╗██╔════╝╚══██╔══╝██╔════╝██╔══██╗██╔════╝╚══██╔══╝
█████╗ ███████║██║ ██║ ██║ ███████║███████╗ ██║
██╔══╝ ██╔══██║██║ ██║ ██║ ██╔══██║╚════██║ ██║
██║ ██║ ██║╚██████╗ ██║ ╚██████╗██║ ██║███████║ ██║
╚═╝ ╚═╝ ╚═╝ ╚═════╝ ╚═╝ ╚═════╝╚═╝ ╚═╝╚══════╝ ╚═╝
Usage: fc-schema [-hV] [COMMAND]
Tool for working with the FactCast Schema Registry spec
-h, --help Show this help message and exit.
-V, --version Print version information and exit.
Commands:
validate Validate your current events
build Validates and builds your registry
3.3 - 3rd Party CLI
As an alternative to the FactCast CLI, there is the Python-based PyFactCast.
It is still in early development, but you might want to check it out.