Java GRPC Consumer
As mentioned before, there are three main Use-Cases for subscribing to a Fact-Stream:
- Validation of Changes against a strictly consistent Model (Catchup)
- Creating and maintaining a Read-Model (Follow)
- Managing volatile cached data (Ephemeral)
Here is some example code assuming you use the Spring GRPC Client:
Example Code: Catchup
@Component
class CustomerRepository{
@Autowired
FactCast factCast;
// oversimplified code !
public Customer getCustomer(UUID customerId){
// match all Facts currently published about that customer
SubscriptionRequest req = SubscriptionRequest.catchup(FactSpec.ns("myapp").aggId(customerId)).fromScratch();
Customer customer = new Customer(id);
// stream all these Facts to the customer object's handle method, and wait until the stream ends.
factCast.subscribe(req, customer::handle ).awaitComplete();
// the customer object should now be in its latest state, and ready for command validation
return customer;
}
}
class Customer {
Money balance = new Money(0); // starting with no money.
//...
public void handle(Fact f){
// apply Fact, so that the customer earns and spend some money...
}
}
Example Code: Follow
@Component
class QueryOptimizedView {
@Autowired
FactCast factCast;
@PostConstruct
public void init(){
UUID lastFactProcessed = persistentModel.getLastFactProcessed();
// subscribe to all customer related changes.
SubscriptionRequest req = SubscriptionRequest
.follow(type("CustomerCreated"))
.or(type("CustomerDeleted"))
.or(type("CustomerDeposition"))
.or(type("PurchaseCompleted"))
.from(lastFactProcessed);
factCast.subscribe(req, this::handle );
}
private FactSpec type(String type){
return FactSpec.ns("myapp").type(type);
}
@Transactional
public void handle(Fact f){
// apply Fact, to the persistent Model
// ...
persistentModel.setLastFactProcessed(f.id());
}
Example Code: Ephemeral
@Component
class CustomerCache {
@Autowired
FactCast factCast;
Map<UUID,Customer> customerCache = new HashMap<>();
@PostConstruct
public void init(){
// subscribe to all customer related changes.
SubscriptionRequest req = SubscriptionRequest.
.follow(type("CustomerCreated"))
.or(type("CustomerDeleted"))
.or(type("CustomerDeposition"))
.or(type("PurchaseCompleted"))
.fromNowOn();
factCast.subscribe(req, this::handle );
}
private FactSpec type(String type){
return FactSpec.ns("myapp").type(type);
}
@Transactional
public void handle(Fact f){
// if anything has changed, invalidate the cached value.
// ...
Set<UUID> aggregateIds = f.aggId();
aggregateIds.forEach(customerCache::remove);
}
Last modified
July 24, 2023
: - fixing typos - formatting tables (42195b792)