This section contains examples that you can use as simple blueprints for building projections. They rely on some abstract base classes that are convenient to use. Feel free to study the implementations of those classes to see what is going on, especially if you plan to implement projections with a different datastore than the ones used in the examples.
1 - UserNames (Spring/JDBC)
Here is an example for a managed projection externalizing its state to a relational database (PostgreSQL here) using Spring transactional management.
The example projects the list of user names currently in use in the system.
Preparation
We need to store two things in our JDBC Datastore:
- the actual list of UserNames, and
- the fact-stream-position of your projection.
Therefore, we create the necessary tables (probably using Liquibase, Flyway or similar tooling of your choice):
CREATE TABLE users (
    name TEXT,
    id UUID,
    PRIMARY KEY (id));
CREATE TABLE fact_stream_positions (
    projection_name TEXT,
    fact_stream_position UUID,
    PRIMARY KEY (projection_name));
Given a unique projection name, we can use fact_stream_positions as a common table for all our JDBC managed projections.
TODO: provide example for acquireWriteToken based on JDBC
Constructing
Since we decided to use a managed projection, we extended the AbstractSpringTxManagedProjection class.
To configure transaction management, our managed projection exposes the injected transaction manager to the rest of Factus by calling the parent constructor.
@ProjectionMetaData(serial = 1)
@SpringTransactional
public class UserNames extends AbstractSpringTxManagedProjection {
    private final JdbcTemplate jdbcTemplate;

    public UserNames(
            @NonNull PlatformTransactionManager platformTransactionManager, JdbcTemplate jdbcTemplate) {
        super(platformTransactionManager);
        this.jdbcTemplate = jdbcTemplate;
    }
    ...
As we’re making use of Spring here, we inject a PlatformTransactionManager and a JdbcTemplate in order to communicate with the database in a transactional way.
Two remarks:
- As soon as your project uses the spring-boot-starter-jdbc dependency, Spring Boot will automatically provide you with a JDBC-aware PlatformTransactionManager.
- To ensure that the database communication participates in the managed transaction, the database access mechanism must also be provided by Spring. Thus, we suggest using the JdbcTemplate.
Configuration
The @SpringTransactional annotation provides various configuration options:
Parameter Name | Description | Default Value |
---|---|---|
bulkSize | bulk size | 50 |
timeoutInSeconds | timeout in seconds | 30 |
Updating the projection
The two possible abstract base classes, AbstractSpringTxManagedProjection and AbstractSpringTxSubscribedProjection, both require the following methods to be implemented:
Method Signature | Description |
---|---|
public UUID factStreamPosition() | read the last position in the Fact stream from the database |
public void factStreamPosition(@NonNull UUID factStreamPosition) | write the current position of the Fact stream to the database |
public WriterToken acquireWriteToken(@NonNull Duration maxWait) | coordinates write access to the projection, see here for details |
The first two methods tell Factus how to read and write the Fact stream’s position from the database.
Writing the fact position
Provided the table fact_stream_positions exists, here is an example of how to write the Fact position:
@Override
public void factStreamPosition(@NonNull UUID factStreamPosition) {
    jdbcTemplate.update(
        "INSERT INTO fact_stream_positions (projection_name, fact_stream_position) " +
        "VALUES (?, ?) " +
        "ON CONFLICT (projection_name) DO UPDATE SET fact_stream_position = ?",
        getScopedName().asString(),
        factStreamPosition,
        factStreamPosition);
}
For convenience, an UPSERT statement (Postgres syntax) is used, which INSERTs the UUID the first time and subsequently only UPDATEs the value.
To avoid hard-coding a unique name for the projection, the provided method getScopedName() is employed.
The default implementation makes sure the name is unique and includes the serial of the projection.
Reading the fact position
To read the last Fact stream position, we simply select the previously written value:
@Override
public UUID factStreamPosition() {
    try {
        return jdbcTemplate.queryForObject(
            "SELECT fact_stream_position FROM fact_stream_positions WHERE projection_name = ?",
            UUID.class,
            getScopedName().asString());
    } catch (IncorrectResultSizeDataAccessException e) {
        // no position yet, just return null
        return null;
    }
}
In case no previous Fact position exists, null is returned.
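The read/write contract described above can be modelled without a database. The following is a minimal sketch, not part of Factus: the class InMemoryPositionStore and the name "UserNames_1" are hypothetical, and a HashMap stands in for the fact_stream_positions table. It only illustrates the two properties that matter: writing behaves like an upsert (one row per projection name), and reading returns null until a position has been written.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory stand-in for the fact_stream_positions table (assumption, not a Factus class).
class InMemoryPositionStore {
    // projection_name -> fact_stream_position
    private final Map<String, UUID> positions = new HashMap<>();

    // Mirrors the UPSERT: inserts on the first call, overwrites on later calls.
    void write(String projectionName, UUID position) {
        positions.put(projectionName, position);
    }

    // Mirrors the SELECT: returns null when no row exists yet.
    UUID read(String projectionName) {
        return positions.get(projectionName);
    }

    // Number of stored rows; stays at 1 per projection name.
    int rowCount() {
        return positions.size();
    }
}
```

Because the projection name is the key, several projections can share the same store as long as their names differ, which is the reason for using getScopedName() in the JDBC version.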
Applying Facts
When processing the UserCreated event, we add a new row to the users table, filled with event data:
@Handler
void apply(UserCreated e) {
    jdbcTemplate.update(
        "INSERT INTO users (name, id) VALUES (?,?);",
        e.getUserName(),
        e.getAggregateId());
}
When handling the UserDeleted event we do the opposite and remove the appropriate row:
@Handler
void apply(UserDeleted e) {
    jdbcTemplate.update("DELETE FROM users WHERE id = ?", e.getAggregateId());
}
We have finished the implementation of the event-processing part of our projection. What is missing is a way to make the projection’s data accessible for users.
Querying the projection
Users of our projections (meaning “other code”) contact the projection via its public API. Currently, there is no public method offering “user names”. So let’s change that:
public List<String> getUserNames() {
    return jdbcTemplate.query("SELECT name FROM users", (rs, rowNum) -> rs.getString(1));
}
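To see how the two handlers and the query method interact, here is a minimal in-memory sketch. It is an illustration only: the class InMemoryUserNames and its method names are hypothetical and do not use Factus or JDBC; a map keyed by aggregate id takes the place of the users table.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory model of the users table (assumption, not a Factus class).
class InMemoryUserNames {
    // id -> name, like the users table with its primary key on id
    private final Map<UUID, String> users = new LinkedHashMap<>();

    // Corresponds to the UserCreated handler (INSERT).
    // Note: unlike the SQL INSERT, put() silently overwrites an existing id.
    void applyUserCreated(UUID aggregateId, String userName) {
        users.put(aggregateId, userName);
    }

    // Corresponds to the UserDeleted handler (DELETE ... WHERE id = ?).
    void applyUserDeleted(UUID aggregateId) {
        users.remove(aggregateId);
    }

    // Corresponds to getUserNames() (SELECT name FROM users).
    List<String> getUserNames() {
        return new ArrayList<>(users.values());
    }
}
```

Applying UserCreated for two users and then UserDeleted for one of them leaves exactly the remaining name in the result of getUserNames().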
Using the projection
Calling code that wants to talk to the projection now just needs to call the getUserNames method:
// create a local instance or get a Spring Bean from the ApplicationContext, depending on your code organization
UserNames userNameProjection = new UserNames(platformTransactionManager, jdbcTemplate);
// depending on many factors you *may* want to update the projection before querying it
factus.update(userNameProjection);
List<String> userNames = userNameProjection.getUserNames();
First, we create an instance of the projection and provide it with all required dependencies. As an alternative, you may want to let Spring manage the lifecycle of the projection and let the dependency injection mechanism provide you an instance.
Next, we call update(...) on the projection to fetch the latest events from the Fact stream. Note that when you use a pre-existing (maybe Spring-managed singleton) instance of the projection, this step is optional and depends on your use case. As the last step, we ask the projection to provide us with user names by calling getUserNames().
Full Example
To study the full example, see
- the UserNames projection using @SpringTransactional,
- example code using this projection, and
- the Factus integration tests including managed and subscribed projections.
2 - UserNames (Redis Transactional)
Here is a projection that handles UserCreated and UserDeleted events. It solves the same problem as the example we’ve seen in Spring transactional projections. However, this time we use Redis as our data store and Redisson as the access API.
Configuration
The @RedisTransactional annotation provides various configuration options:
Parameter Name | Description | Default Value |
---|---|---|
bulkSize | bulk size | 50 |
timeout | timeout in milliseconds until a transaction is interrupted and rolled back | 30000 |
responseTimeout | timeout in milliseconds for the Redis response. Starts counting down once the transaction has been successfully submitted | 5000 |
retryAttempts | maximum attempts to send transaction | 5 |
retryInterval | time interval in milliseconds between retry attempts | 3000 |
Constructing
Since we decided to use a managed projection, we extended the AbstractRedisTxManagedProjection class.
To configure the connection to Redis via Redisson, we inject a RedissonClient into the constructor and pass it on to the parent constructor.
@ProjectionMetaData(revision = 1)
@RedisTransactional
public class UserNames extends AbstractRedisTxManagedProjection {
    public UserNames(RedissonClient redisson) {
        super(redisson);
    }
    ...
FactStreamPosition and Lock-Management are automatically taken care of by the underlying AbstractRedisManagedProjection.
In contrast to non-atomic projections, when applying Facts to the Redis data structure, the instance variable userNames cannot be used, as this would violate the transactional semantics. Instead, accessing and updating the state is carried out on a transaction-derived data structure (Map here) inside the handler methods.
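Why writing through the instance variable would break transactional semantics can be illustrated with a deliberately simplified model. The sketch below is an assumption for illustration, not Redisson's RTransaction and not how Redisson is implemented: the hypothetical class StagedMapTransaction buffers writes and only applies them to the shared map on commit. A write made directly to the shared map would bypass this buffer and survive a rollback.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified transaction over a map (assumption; not Redisson's RTransaction).
class StagedMapTransaction<K, V> {
    private final Map<K, V> committed;               // shared state, visible to readers
    private final Map<K, V> pending = new HashMap<>(); // writes buffered inside the transaction

    StagedMapTransaction(Map<K, V> committed) {
        this.committed = committed;
    }

    // Writes go to the buffer only; the shared state is untouched.
    void put(K key, V value) {
        pending.put(key, value);
    }

    // Applies all buffered writes to the shared state.
    void commit() {
        committed.putAll(pending);
        pending.clear();
    }

    // Discards all buffered writes; the shared state stays as it was.
    void rollback() {
        pending.clear();
    }
}
```

In this model, a put followed by rollback leaves the shared map unchanged, whereas a put followed by commit makes the entry visible.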
Updating the projection
Applying Events
Received events are processed inside the methods annotated with @Handler (the handler methods). To participate in the transaction, these methods have an additional RTransaction parameter which represents the current transaction.
Let’s have a closer look at the handler for the UserCreated event:
@Handler
void apply(UserCreated e, RTransaction tx) {
    Map<UUID, String> userNames = tx.getMap(getRedisKey());
    userNames.put(e.getAggregateId(), e.getUserName());
}
Note
RTransaction handling is the responsibility of Factus. As a developer, you must not call e.g. commit() or rollback() yourself.
In the previous example, the method getRedisKey() was used to retrieve the Redis key of the projection. Let’s have a closer look at this method in the next section.
Default redisKey
The data structures provided by Redisson all require a unique identifier which is used to store them in Redis. The method getRedisKey() provides an automatically generated name, assembled from the class name of the projection and the serial number configured with the @ProjectionMetaData annotation.
Additionally, an AbstractRedisManagedProjection or an AbstractRedisSubscribedProjection, as well as their transactional (Tx) counterparts, maintain the following keys in Redis:
- getRedisKey() + "_state_tracking": contains the UUID of the last position of the Fact stream
- getRedisKey() + "_lock": shared lock that needs to be acquired to update the projection.
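The two additional key names follow directly from the suffixes listed above. As a small sketch, the hypothetical helper RedisKeyNames below derives them from a given base key. The base key itself is taken as input, since the exact format produced by getRedisKey() is not spelled out here; any concrete value such as "UserNames_1" is only an assumed example.

```java
// Hypothetical helper (not part of Factus) deriving the bookkeeping keys from a base key.
class RedisKeyNames {
    // Key holding the UUID of the last position of the Fact stream.
    static String stateTrackingKey(String redisKey) {
        return redisKey + "_state_tracking";
    }

    // Key of the shared lock that must be acquired to update the projection.
    static String lockKey(String redisKey) {
        return redisKey + "_lock";
    }
}
```

This is mainly useful when inspecting a Redis instance by hand, for example to find out which keys belong to a given projection.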
Redisson API Data Structures vs. Java Collections
As seen in the above example, some Redisson data structures also implement the appropriate Java Collections interface.
For example, you can also assign a Redisson RMap to a standard Java Map:
// 1) use specific Redisson type
RMap<UUID, String> userNamesRMap = tx.getMap(getRedisKey());
// 2) use Java Collections type
Map<UUID, String> userNamesMap = tx.getMap(getRedisKey());
There are good reasons for either variant, 1) and 2):
Redisson specific | plain Java |
---|---|
extended functionality which e.g. reduces I/O load (see e.g. RMap.fastPut(...) and RMap.fastRemove(...)) | standard, intuitive |
only option when using data-structures which are not available in standard Java Collections (e.g. RedissonListMultimap) | easier to test |
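One way to understand the I/O argument: the java.util.Map contract requires put(...) to return the previous value, so a remote map has to transfer that value back even when the caller ignores it. To our understanding, RMap.fastPut(...) returns only a boolean and RMap.fastRemove(...) only a count, which avoids that transfer. The sketch below demonstrates just the Map.put contract using a plain HashMap; the class PutContractDemo is hypothetical and Redisson is not involved.

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical demo class showing what the java.util.Map contract obliges put() to return.
class PutContractDemo {
    // Puts two values under the same key and returns what the second put() reports.
    static String putTwice(Map<UUID, String> map, UUID id) {
        map.put(id, "alice");      // returns null, as there is no previous value
        return map.put(id, "bob"); // returns the previous value, which a remote map must send back
    }
}
```

If the handler does not need the previous value, as in the UserCreated handler, the Redisson-specific variant is therefore a reasonable choice, at the cost of depending on the RMap type.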
Full Example
@ProjectionMetaData(revision = 1)
@RedisTransactional
public class UserNames extends AbstractRedisTxManagedProjection {
    private final Map<UUID, String> userNames;

    public UserNames(RedissonClient redisson) {
        super(redisson);
        userNames = redisson.getMap(getRedisKey());
    }

    public List<String> getUserNames() {
        return new ArrayList<>(userNames.values());
    }

    @Handler
    void apply(UserCreated e, RTransaction tx) {
        tx.getMap(getRedisKey()).put(e.getAggregateId(), e.getUserName());
    }

    @Handler
    void apply(UserDeleted e, RTransaction tx) {
        tx.getMap(getRedisKey()).remove(e.getAggregateId());
    }
}
To study the full example, see