1 - Java GRPC Producer
FactCast.publish(List<Fact> factsToPublish)
To produce Facts, create Fact instances and publish them via FactCast.publish().
When the method returns, all Facts passed to FactCast.publish() are expected to have been written to PostgreSQL successfully. The order of the Facts is preserved when they are inserted into the database. All inserts happen in a transactional context, so the publication is atomic.
If the method returns exceptionally, or the process is killed or interrupted in any way, you cannot know whether the Facts have been written successfully. In that case, just repeat the call: if the write went through, you’ll get an exception complaining about duplicate IDs; if not, you may have a chance to succeed now.
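The repeat-the-call advice can be illustrated with a small, self-contained sketch. It does not use the FactCast API: InMemoryFactStore, DuplicateIdException and publishSafely are hypothetical stand-ins that only mimic the behaviour described above (atomic batch insert, rejection of duplicate IDs). The exception FactCast actually raises for duplicate IDs is not specified here, so treat the catch clauses as an assumption.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

// Hypothetical stand-in for "an exception complaining about duplicate IDs".
class DuplicateIdException extends RuntimeException {
    DuplicateIdException(UUID id) { super("duplicate fact id: " + id); }
}

// Hypothetical store that mimics an atomic insert with unique fact IDs.
class InMemoryFactStore {
    private final Set<UUID> ids = new HashSet<>();
    private final List<UUID> log = new ArrayList<>();
    private boolean failAfterNextWrite = false;

    // Simulates losing the connection after the write has been committed.
    void failAfterNextWrite() { failAfterNextWrite = true; }

    void publish(List<UUID> factIds) {
        // All-or-nothing: reject the whole batch before writing anything.
        for (UUID id : factIds) {
            if (ids.contains(id)) throw new DuplicateIdException(id);
        }
        ids.addAll(factIds);
        log.addAll(factIds);
        if (failAfterNextWrite) {
            failAfterNextWrite = false;
            throw new IllegalStateException("connection lost after commit");
        }
    }

    int size() { return log.size(); }
}

class RepublishSketch {
    // Returns true once the batch is known to be stored, false if all tries failed.
    static boolean publishSafely(InMemoryFactStore store, List<UUID> batch, int maxTries) {
        for (int i = 0; i < maxTries; i++) {
            try {
                store.publish(batch);
                return true;
            } catch (DuplicateIdException e) {
                // On a repeated call this means an earlier try did go through.
                if (i > 0) return true;
                throw e; // on the very first try it is a genuine ID collision
            } catch (RuntimeException e) {
                // Outcome unknown: repeat the call.
            }
        }
        return false;
    }
}
```

Because the Fact IDs stay the same across tries, a repeated call cannot store the batch twice; the duplicate-ID rejection is what makes the retry safe.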
FactCast.publish(Fact toPublish)
acts the same way as the List counterpart above, just for a List of one Fact.
Example Code
Here is some example code assuming you use the Spring GRPC Client:
@Component
class Foo{
    @Autowired
    FactCast fc;
    public void someMethod(){
        fc.publish( new SomethingHappenedFact() );
    }
}
2 - Java GRPC Consumer
As mentioned before, there are three main Use-Cases for subscribing to a Fact-Stream:
- Validation of Changes against a strictly consistent Model (Catchup)
- Creating and maintaining a Read-Model (Follow)
- Managing volatile cached data (Ephemeral)
Here is some example code assuming you use the Spring GRPC Client:
Example Code: Catchup
@Component
class CustomerRepository{
    @Autowired
    FactCast factCast;
    // oversimplified code !
    public Customer getCustomer(UUID customerId){
        // match all Facts currently published about that customer
        SubscriptionRequest req = SubscriptionRequest.catchup(FactSpec.ns("myapp").aggId(customerId)).fromScratch();
        Customer customer = new Customer(customerId);
        // stream all these Facts to the customer object's handle method, and wait until the stream ends.
        factCast.subscribe(req, customer::handle ).awaitComplete();
        // the customer object should now be in its latest state, and ready for command validation
        return customer;
    }
}
class Customer {
    Money balance = new Money(0); // starting with no money.
    //...
    public void handle(Fact f){
        // apply Fact, so that the customer earns and spends some money...
    }
}
Example Code: Follow
@Component
class QueryOptimizedView {
    @Autowired
    FactCast factCast;
    @PostConstruct
    public void init(){
        UUID lastFactProcessed = persistentModel.getLastFactProcessed();
        // subscribe to all customer related changes.
        SubscriptionRequest req = SubscriptionRequest
            .follow(type("CustomerCreated"))
            .or(type("CustomerDeleted"))
            .or(type("CustomerDeposition"))
            .or(type("PurchaseCompleted"))
            .from(lastFactProcessed);
        factCast.subscribe(req, this::handle );
    }
    private FactSpec type(String type){
        return FactSpec.ns("myapp").type(type);
    }
    @Transactional
    public void handle(Fact f){
        // apply Fact to the persistent Model
        // ...
        persistentModel.setLastFactProcessed(f.id());
    }
}
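The reason for storing the last processed Fact ID together with the model can be shown with a self-contained sketch. It does not use FactCast: CheckpointedView and its in-memory list of IDs are hypothetical, and it assumes that resuming "from" an ID means continuing with the Fact after it.

```java
import java.util.List;
import java.util.UUID;

// Hypothetical view: in a real read model both fields would be persisted in the
// same transaction, so that the checkpoint never runs ahead of the applied changes.
class CheckpointedView {
    private UUID lastFactProcessed; // null means "nothing processed yet"
    private int factsApplied;

    void handle(UUID factId) {
        factsApplied++;             // stands in for "apply Fact to the model"
        lastFactProcessed = factId; // move the checkpoint in the same step
    }

    // Replays only what comes after the checkpoint (assumed to be exclusive).
    void catchUpFrom(List<UUID> stream) {
        int start = lastFactProcessed == null ? 0 : stream.indexOf(lastFactProcessed) + 1;
        for (UUID id : stream.subList(start, stream.size())) {
            handle(id);
        }
    }

    int factsApplied() { return factsApplied; }

    UUID lastFactProcessed() { return lastFactProcessed; }
}
```

Calling catchUpFrom a second time on a longer stream applies only the new entries, which is the effect the persisted checkpoint is meant to have after a restart.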
Example Code: Ephemeral
@Component
class CustomerCache {
    @Autowired
    FactCast factCast;
    Map<UUID,Customer> customerCache = new HashMap<>();
    @PostConstruct
    public void init(){
        // subscribe to all customer related changes.
        SubscriptionRequest req = SubscriptionRequest
            .follow(type("CustomerCreated"))
            .or(type("CustomerDeleted"))
            .or(type("CustomerDeposition"))
            .or(type("PurchaseCompleted"))
            .fromNowOn();
        factCast.subscribe(req, this::handle );
    }
    private FactSpec type(String type){
        return FactSpec.ns("myapp").type(type);
    }
    @Transactional
    public void handle(Fact f){
        // if anything has changed, invalidate the cached value.
        // ...
        Set<UUID> aggregateIds = f.aggId();
        aggregateIds.forEach(customerCache::remove);
    }
}
3 - Java Optimistic Locking
Motivation
Whatever your particular way of modelling your software, in order to enforce invariants in your aggregates, you need to coordinate writes to them. In simple monoliths, you do that by synchronizing write access to the aggregate. When software systems become distributed (or at least replicated), this coordination obviously needs to be externalized.
Pessimistic Locking
While pessimistic locking makes sure every change is strictly serializable, it has obvious drawbacks in terms of throughput and complexity (timeouts) as well as the danger of deadlock, when the scope of the lock expands to more than one aggregate. This is why we chose to implement a means of optimistic locking in FactCast.
Optimistic Locking
In general, the idea of optimistic locking is to make a change and before publishing it, make sure there was no potentially contradicting change in between. If there was, the process can safely be retried, as there was nothing published yet.
Applied to FactCast, this means expressing a body of code that:
1. creates an understanding of the published state of the aggregates in question
2. invokes its business logic according to that state
3. creates the effects: either fails (if business logic decides to do so), or prepares new Fact(s) for publishing
4. rechecks whether the state recorded in 1. is still unchanged, and then
5. either publishes the prepared Facts or retries by going back to 1.
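The steps above can be sketched without FactCast as a generic check-then-append loop. Everything here is hypothetical and only illustrates the pattern: VersionedLog is an in-memory log whose size serves as the recorded state, and OptimisticLoop.attempt is not the FactCast API.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical in-memory log; its size doubles as the state token recorded in step 1.
class VersionedLog {
    private final List<String> entries = new ArrayList<>();

    synchronized List<String> snapshot() { return new ArrayList<>(entries); }

    // Recheck and publish in one atomic step: append only if nothing was written
    // since the snapshot with expectedVersion entries was taken.
    synchronized boolean appendIfUnchanged(int expectedVersion, String entry) {
        if (entries.size() != expectedVersion) return false;
        entries.add(entry);
        return true;
    }

    // Unconditional write, used here to simulate a competing writer.
    synchronized void append(String entry) { entries.add(entry); }
}

class OptimisticLoop {
    // decide returns the entry to publish, or Optional.empty() to abort.
    // Returns true if an entry was published, false if the logic aborted.
    static boolean attempt(VersionedLog log, int maxTries,
                           Function<List<String>, Optional<String>> decide) {
        for (int i = 0; i < maxTries; i++) {
            List<String> state = log.snapshot();             // record the state
            int seen = state.size();
            Optional<String> decision = decide.apply(state); // business logic, effects
            if (!decision.isPresent()) return false;         // abort: never retried
            if (log.appendIfUnchanged(seen, decision.get())) return true;
            // A competing write happened in between: start over with fresh state.
        }
        throw new ConcurrentModificationException("gave up after " + maxTries + " tries");
    }
}
```

Nothing is written until the recheck succeeds, which is why a failed try can simply be repeated.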
Usage
a simple example
This code checks whether an account with id newAccountId already exists and, if not, creates it by publishing the corresponding Fact.
factcast.lock("myBankNamespace")
    .on(newAccountId)
    .attempt(() -> {
        // check and maybe abort
        if (repo.findById(newAccountId) != null)
            return Attempt.abort("Already exists.");
        else
            return Attempt.publish(
                Fact.builder()
                    .ns("myBankNamespace")
                    .type("AccountCreated")
                    .aggId(newAccountId)
                    .build("{...}")
            );
    });
You can probably guess what happens, given the steps above. Let’s dive into the details with a more complex scenario.
a complete example
The unavoidable imaginary example of two BankAccounts and a money transfer between them:
factcast.lock("myBankNamespace")
    .on(sourceAccountId,targetAccountId)
    .optimistic() // this is optional, defaults to optimistic, currently the only mode supported
    .retry(100) // this is optional, default is 10 times
    .interval(5) // this is optional, default is no wait interval between attempts (equal to 0)
    .attempt(() -> {
        // fetch the latest state
        Account source = repo.findById(sourceAccountId);
        Account target = repo.findById(targetAccountId);
        // run business logic on it
        if (source.amount() < amountToTransfer)
            return Attempt.abort("Insufficient funds.");
        if (target.isClosed())
            return Attempt.abort("Target account is closed");
        // everything looks fine, create the Fact to be published
        Fact toPublish = Fact.builder()
            .ns("myBankNamespace")
            .type("transfer")
            .aggId(sourceAccountId)
            .aggId(targetAccountId)
            .build("{...}");
        // register for publishing
        return Attempt.publish(toPublish).andThen(()->{
            // this is only executed at max once, and only if publishing succeeded
            log.info("Money was transferred.");
        });
    });
Explanation
First, you tell factcast to record a state according to all events that have either sourceAccountId or targetAccountId in their list of aggIds and are in namespace myBankNamespace. The namespace is not strictly necessary, but using it is encouraged; whether it fits depends on how you decide to use namespaces and group Facts within them.
The number of retries is set to 100 here (the default is 10, which is acceptable for many systems). In essence, this means that the attempt will be executed at most 100 times before factcast gives up and throws an OptimisticRetriesExceededException, which extends ConcurrentModificationException.
If interval is not set, it defaults to 0, with the effect that the code passed into attempt is retried continuously without any pause until it either aborts, succeeds, or hits the maximum number of retries (see above).
Setting it to 5 means that a 5 ms wait happens before each retry.
WARNING: Setting interval to a non-zero value makes your code block a thread. The above combination of 100 retries with a 5 ms interval means that, at worst, your code could block for longer than half a second.
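As a back-of-the-envelope check of that warning, the following sketch adds up waiting time and working time. RetryBudget is a hypothetical helper, not part of FactCast, and it assumes that waits happen only between attempts (so n attempts imply n - 1 waits) and that every attempt takes the same time; neither assumption is confirmed by the text above.

```java
class RetryBudget {
    // Rough upper bound on how long a caller may be blocked, in milliseconds.
    static long worstCaseMillis(int maxAttempts, long intervalMillis, long perAttemptMillis) {
        long waits = Math.max(0, maxAttempts - 1) * intervalMillis; // pauses between attempts
        long work = (long) maxAttempts * perAttemptMillis;          // time spent inside the lambda
        return waits + work;
    }
}
```

With 100 attempts, a 5 ms interval and an assumed 1 ms per attempt, this gives 495 ms of waiting plus 100 ms of work, 595 ms in total. The waiting alone already comes close to half a second, so any real work inside the lambda pushes the total past it.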
Everything starts with passing a lambda to the attempt method. The lambda is of type
@FunctionalInterface
public interface Attempt {
    IntermediatePublishResult call() throws AttemptAbortedException;
    //...
}
so that it has to return an instance of IntermediatePublishResult. The only way to create such an instance is through the static methods on the same interface (abort, publish, …), which makes the available options obvious.
This lambda now is called according to the logic above.
Inside the lambda, you’d want to check the current state using the very latest facts from factcast (repo.findById(...)) and then check your business constraints on it (if (source.amount() < amountToTransfer) …).
If the constraints do not hold, you may choose to abort the Attempt and thus abort the process. In this case, the attempt will not be retried.
On the other hand, if you choose to publish new facts using Attempt.publish(...), the state will be checked and the Fact(s) will be published if there was no change in between (otherwise a retry will be issued, see above).
In the rare case that you do not want to publish anything, you can return Attempt.withoutPublication() to accomplish this.
Optionally, you can pass a runnable using .andThen and schedule it for execution once, if and only if the publishing succeeded. In other words, this runnable is executed either exactly once or never (in case of an abort or an OptimisticRetriesExceededException).