The goal of this project is to implement a Spring Boot application that manages users using Event Sourcing. So, besides the traditional create/update/delete operations, whenever a user is created, updated or deleted, an event describing the change is sent to Kafka. Furthermore, we will implement another Spring Boot application that listens to those events and saves them in Cassandra.
Note: the kubernetes-environment repository shows how to deploy this project in Kubernetes (Minikube).
Spring Boot Web Java application responsible for handling users. The user information is stored in MySQL. Once a user is created, updated or deleted, an event is sent to Kafka.
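For illustration, a user event published to Kafka (when JSON serialization is used) might look like the sketch below. The field names here are assumptions; the authoritative definition is the Avro schema in event-service/src/main/resources/avro:

```json
{
  "eventId": "3f2504e0-4f89-41d3-9a0c-0305e82c3301",
  "eventTimestamp": 1572380400000,
  "eventType": "CREATED",
  "userId": 1,
  "userJson": "{\"email\":\"john.doe@test.com\",\"fullName\":\"John Doe\",\"active\":true}"
}
```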
user-service can use the JSON or Avro format to serialize data to the binary format used by Kafka. If the Avro format is chosen, both services benefit from the Schema Registry that is running as a Docker container. The serialization format to be used is defined by the value of the environment variable SPRING_PROFILES_ACTIVE.
| Configuration | Format |
| --- | --- |
| SPRING_PROFILES_ACTIVE=default | JSON |
| SPRING_PROFILES_ACTIVE=avro | Avro |
Spring Boot Web Java application responsible for listening to the events from Kafka and saving them in Cassandra.
Unlike user-service, event-service has no specific Spring profile to select the deserialization format. Spring Cloud Stream provides a stack of MessageConverters that handle the conversion of many different content types, including application/json. Besides, as event-service has a SchemaRegistryClient bean registered, Spring Cloud Stream auto-configures an Apache Avro message converter for schema management.
In order to handle different content types, Spring Cloud Stream has a "content-type negotiation and transformation" strategy (more here). The precedence order is: first, the content-type present in the message header; second, the content-type defined in the binding; and finally, the default, which is application/json.
The producer (in this case user-service) always sets the content-type in the message header. The content-type can be application/json or application/*+avro, depending on the SPRING_PROFILES_ACTIVE with which user-service is started.
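For reference, the binding-level content-type (the second level in the precedence order) is set in the application properties. Below is a minimal sketch; the binding name output is an assumption, check the actual application.yml of the services:

```yaml
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: com.mycompany.userservice.user
          # Fallback only: the content-type header set by the producer
          # (user-service) takes precedence over this binding-level setting
          contentType: application/json
```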
Run the following command in the springboot-kafka-mysql-cassandra root folder. It will re-generate the Java classes from the Avro schema present in event-service/src/main/resources/avro.
./gradlew event-service:generateAvro
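A minimal sketch of what such an Avro schema can look like; the actual file in event-service/src/main/resources/avro is the source of truth, and the record/field names below are assumptions:

```json
{
  "type": "record",
  "name": "UserEventMessage",
  "namespace": "com.mycompany.userservice",
  "fields": [
    {"name": "eventId", "type": "string"},
    {"name": "eventTimestamp", "type": "long"},
    {"name": "eventType", "type": "string"},
    {"name": "userId", "type": "long"},
    {"name": "userJson", "type": ["null", "string"], "default": null}
  ]
}
```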
In a terminal, inside the springboot-kafka-mysql-cassandra folder, you can build the applications' Docker images by simply running the following script
./build-apps.sh
Or manually run the ./gradlew commands below for each application.
./gradlew user-service:docker -x test
| Environment Variable | Description |
| --- | --- |
| MYSQL_HOST | Specify host of the MySQL database to use (default localhost) |
| MYSQL_PORT | Specify port of the MySQL database to use (default 3306) |
| KAFKA_HOST | Specify host of the Kafka message broker to use (default localhost) |
| KAFKA_PORT | Specify port of the Kafka message broker to use (default 29092) |
| SCHEMA_REGISTRY_HOST | Specify host of the Schema Registry to use (default localhost) |
| SCHEMA_REGISTRY_PORT | Specify port of the Schema Registry to use (default 8081) |
| ZIPKIN_HOST | Specify host of the Zipkin distributed tracing system to use (default localhost) |
| ZIPKIN_PORT | Specify port of the Zipkin distributed tracing system to use (default 9411) |
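As a sketch, these variables can be passed with docker run. The image tag, network name and internal port below are assumptions; check build-apps.sh and docker-compose.yml for the actual values:

```shell
# Flags overriding the defaults so the container reaches the docker-compose services
# (inside the compose network, Kafka usually listens on 9092 rather than 29092)
ENV_FLAGS="-e MYSQL_HOST=mysql -e KAFKA_HOST=kafka -e KAFKA_PORT=9092 -e SCHEMA_REGISTRY_HOST=schema-registry -e ZIPKIN_HOST=zipkin"

# Hypothetical image tag and network name; adjust to your setup
# (|| true keeps the snippet from aborting when Docker is not available)
docker run -d --name user-service $ENV_FLAGS \
  --network springboot-kafka-mysql-cassandra_default \
  -p 9080:8080 ivanfranchin/user-service:1.0.0 || true
```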
./gradlew event-service:docker -x test
| Environment Variable | Description |
| --- | --- |
| CASSANDRA_HOST | Specify host of the Cassandra database to use (default localhost) |
| CASSANDRA_PORT | Specify port of the Cassandra database to use (default 9042) |
| KAFKA_HOST | Specify host of the Kafka message broker to use (default localhost) |
| KAFKA_PORT | Specify port of the Kafka message broker to use (default 29092) |
| SCHEMA_REGISTRY_HOST | Specify host of the Schema Registry to use (default localhost) |
| SCHEMA_REGISTRY_PORT | Specify port of the Schema Registry to use (default 8081) |
| ZIPKIN_HOST | Specify host of the Zipkin distributed tracing system to use (default localhost) |
| ZIPKIN_PORT | Specify port of the Zipkin distributed tracing system to use (default 9411) |
In a terminal, inside the springboot-kafka-mysql-cassandra root folder, run
docker-compose up -d
Wait a little bit until all containers are Up (healthy). You can check their status by running the following command
docker-compose ps
Open a terminal and, inside the springboot-kafka-mysql-cassandra root folder, run the following script
./start-apps.sh
Note: in order to run user-service with Avro, use ./start-apps.sh avro
During development, it is easier to just run the applications instead of always building the Docker images and running them. To do so, inside springboot-kafka-mysql-cassandra, run the following Gradle commands in different terminals
./gradlew user-service:bootRun --args='--server.port=9080'
Note: in order to run user-service with Avro, use ./gradlew user-service:bootRun --args='--server.port=9080 --spring.profiles.active=avro'
./gradlew event-service:bootRun --args='--server.port=9081'
| Application | URL |
| --- | --- |
| user-service | http://localhost:9080/swagger-ui.html |
| event-service | http://localhost:9081/swagger-ui.html |
- Open the user-service Swagger UI http://localhost:9080/swagger-ui.html
- Create a new user, POST /api/users
- Open the event-service Swagger UI http://localhost:9081/swagger-ui.html
- Get all the events related to the user created, informing the user id, GET /api/events/users/{id}
- You can also check how the event was sent by user-service and listened to by event-service (as shown in the image below) using Zipkin http://localhost:9411
- Create new users and update/delete existing ones in order to see how the application works.
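The same walkthrough can be done from the command line with curl. The request body below is an assumption; check the actual schema of POST /api/users in the Swagger UI:

```shell
# Hypothetical user payload; the real field names may differ
PAYLOAD='{"email":"john.doe@test.com","fullName":"John Doe","active":true}'

# Create a new user (POST /api/users on user-service)
# (|| true keeps the snippet from aborting when the stack is not running)
curl -s -X POST http://localhost:9080/api/users \
  -H 'Content-Type: application/json' -d "$PAYLOAD" || true

# List the events recorded for the created user (GET /api/events/users/{id}; id 1 assumed)
curl -s http://localhost:9081/api/events/users/1 || true
```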
Run the command below to stop the applications
./stop-apps.sh
Then, run the following command to stop and remove the docker-compose containers, networks and volumes
docker-compose down -v
Run the command below to trigger the event-service test cases
./gradlew event-service:test
Run the following command to start the user-service test cases
./gradlew user-service:test
Note: we are using Testcontainers to run the user-service integration tests. It automatically starts some Docker containers before the tests begin and shuts them down when the tests finish.
docker exec -it mysql mysql -uroot -psecret --database userdb
select * from users;
docker exec -it cassandra cqlsh
USE mycompany;
SELECT * FROM user_events;
Zipkin can be accessed at http://localhost:9411
Kafka Topics UI can be accessed at http://localhost:8085
Schema Registry UI can be accessed at http://localhost:8001
Kafka Manager can be accessed at http://localhost:9000
Configuration

- First, you must create a new cluster. Click on Cluster (dropdown on the header) and then on Add Cluster
- Type the name of your cluster in the Cluster Name field, for example: MyZooCluster
- Type zookeeper:2181 in the Cluster Zookeeper Hosts field
- Enable the checkbox Poll consumer information (Not recommended for large # of consumers)
- Click on the Save button at the bottom of the page
The image below shows the topics present in Kafka, including the topic com.mycompany.userservice.user with 2 partitions, which is used by the applications of this project.
Unable to upgrade to Spring Boot version 2.2.2. Spring Cloud Stream has changed the Schema Registry support, and the documentation is very poor so far. user-service was straightforward to change; however, event-service cannot deserialize the events. For the next attempt, these are the changes to be made:
- FROM
```
implementation 'org.springframework.cloud:spring-cloud-stream-schema'
```
TO
```
implementation 'org.springframework.cloud:spring-cloud-schema-registry-client'
```

- FROM
```yaml
cloud:
  stream:
    schema-registry-client:
      endpoint: http://${SCHEMA_REGISTRY_HOST:localhost}:${SCHEMA_REGISTRY_PORT:8081}
```
TO
```yaml
cloud:
  schema-registry-client:
    enabled: true
    endpoint: http://${SCHEMA_REGISTRY_HOST:localhost}:${SCHEMA_REGISTRY_PORT:8081}
  stream:
```

- FROM
```yaml
cloud:
  stream:
    schema:
      avro:
        schema-locations:
          - classpath:avro/userevent-message.avsc
```
TO
```yaml
cloud:
  schema:
    avro:
      schema-locations:
        - classpath:avro/userevent-message.avsc
  stream:
```

- FROM
```java
SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schema-registry-client.endpoint}") String endpoint) {
```
TO
```java
SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.schema-registry-client.endpoint}") String endpoint) {
```