Blueprint to Bytes: Implementing Messaging with JMS and ActiveMQ
Unlocking the Power of Messaging in Distributed Systems with JMS and ActiveMQ
🙌 Catch Up
Previously in this series, we explored what CQRS and event-driven architecture are and how an implementation for a bookstore could look like.
👋 Introduction to Messaging in Distributed Systems
In the first article, we explored the basics of CQRS and event sourcing. We saw that we have a read model and a write model, or more specifically, an event store. To update our read model, we need a mechanism to update it based on changes in the event store. Here, messaging plays a key role and enables loose coupling between components. In this part of the series, we will explore how to achieve this with JMS and ActiveMQ for our Bookstore Application. Additionally, we will introduce the Transactional Outbox Pattern to ensure consistency when publishing events.
🧐 Exploring Jakarta Messaging (JMS)
Jakarta Messaging (JMS) is an API definition for messaging functionality in Java-based applications. Here, we have two concepts supported by JMS:
Point-to-Point: A message is sent from one sender to exactly one recipient.
Publish/Subscribe: A message is sent to everyone who subscribes to that topic. Sometimes this approach is also called fan-out.
With these concepts, we can implement several scenarios for efficient communication.
🔎 ActiveMQ Artemis Overview
ActiveMQ Artemis is an open-source messaging broker developed by the Apache Foundation. It enables producing and consuming messages between various systems, enabling loose coupling and scalability, as well as measures for backpressure.
Technical Design of ActiveMQ
Broker Architecture: ActiveMQ uses a broker-based architecture, where the broker handles messages between producers and consumers.
Messaging Concepts: ActiveMQ supports the aforementioned Point-to-Point and Publish/Subscribe messaging concepts.
Messaging Protocols: ActiveMQ supports several messaging protocols like AMQP, MQTT, STOMP, and OpenWire, allowing for easy integration with various systems.
Persistence: ActiveMQ can persist messages to prevent data loss, supports transactions, guarantees message delivery, and includes Dead-Letter Queues (DLQ).
Scaling: ActiveMQ is horizontally scalable by adding more brokers and distributing the workload evenly across them.
High Availability: You can set up clusters and configure primary/secondary configurations for high availability and fault tolerance.
The following illustration shows the high-level architecture of ActiveMQ Artemis. The server is the central element and manages communication with the ProtocolManager using several protocols. The Core Protocol acts as an API between the server and the supported protocols. Persistence is achieved through various mechanisms like paging and planned JDBC integration. Applications communicate via Core Clients and a JMS Facade if JMS is used.
⚙️ Setting Up ActiveMQ
For our purposes, we will set up ActiveMQ and Couchbase with Docker Compose. We use the couchbasefakeit image designed for testing and local development. This image spins up a single Couchbase Server instance and allows us to initialize it with buckets, indexes, and fake data.
FROM btburnett3/couchbasefakeit:enterprise-7.2.3
# Customize environment
ENV CB_DATARAM=256 \
CB_CLUSTER_NAME=couchbase-local \
CB_USERNAME=Administrator \
CB_PASSWORD=password
# Copy files
COPY . /startup/
[
{
"name": "bookstore-sample",
"ramQuotaMB": 100,
"bucketType": "couchbase",
"authType": "sasl",
"saslPassword": "",
"evictionPolicy": "fullEviction",
"replicaNumber": 0,
"flushEnabled": 1
},
{
"name": "default",
"ramQuotaMB": 100,
"bucketType": "couchbase",
"authType": "sasl",
"saslPassword": "",
"evictionPolicy": "fullEviction",
"replicaNumber": 0,
"flushEnabled": 1
}
]
The complete Docker Compose file looks now like this:
version: "3.8"
services:
couchbase-server:
build: ./cb-local-server
ports:
- 8091:8091
- 8092:8092
- 8093:8093
- 11210:11210
hostname: couchbase-server
container_name: couchbase-server
working_dir: /opt/couchbase
stdin_open: true
tty: true
activemq:
image: apache/activemq-artemis:latest
ports:
- 8161:8161 # Web Console # artemis:artemis
- 61616:61616 # JMS
container_name: activemq
hostname: activemq
🖇 Integrating JMS with ActiveMQ in the Bookstore Application
Now we will focus on the technical setup and integration of JMS with ActiveMQ into our application. We will show the necessary configurations, the production and consumption of events, their benefits, and how they fit into the bigger picture considering Event Sourcing and CQRS.
Introduction
To achieve loose coupling in our application and to separate the read model from the write model, we need a robust messaging system that simplifies communication between components. JMS in combination with ActiveMQ is such a solution for messaging. There are several alternatives, such as AWS SNS and SQS or Kafka.
Required Libraries
Make sure you have this library included in your build.gradle
:
implementation 'org.springframework.boot:spring-boot-starter-activemq'
Basic Configuration
ActiveMQ Configuration: Configure the connection to the ActiveMQ Broker in
application.yml
. Ensure the password matches the one in your docker-compose file.
spring:
activemq:
broker-url: tcp://localhost:61616
user: artemis
password: artemis
Spring Configuration: Define a Configuration class with beans for the Listener Factory and for converting objects consumed with the Jackson2Message Converter. The converter maps the Spring Message type to JSON and deserializes it into our objects. We also create a Configuration for the Topics we need and they are created for us by the integration. To enable the Publish/Subscribe message mechanism we use
factory.setPubSubDomain(true);
If you want your data structure that you send with subtypes, you need to support polymorphism for deserialization. This can be achieved by using JsonTypeInfo and JsonSubTypes:
We set up our topics that we send messages to like this:
Producing Events
We use the provided JmsTemplate
that we inject for creating events. We send the entire object, and the MappingJackson2MessageConverter
handles deserialization.
Consuming Events
To consume events, we use the @JmsListener
annotation and specify the topic from which we want to consume. The event is automatically deserialized.
📍 Where Do We Need Messaging in Our Bookstore Application?
In our Bookstore Application, messaging is crucial for the loose coupling of components and easier scalability. We have two main concerns where it is used:
Order Processing: We have orders, and considering our data model, we need to update the stock for a book. As this crosses our defined boundary for orders and needs to update a book, we have a handler that consumes order-related events and updates the book accordingly. Here, we have eventual consistency. In a real production environment, you can handle eventual consistency by adding some leeway time before confirming the order to ensure the stock is sufficient.
Read Model Updates: As we asynchronously update our read models, we need to publish our domain events and update accordingly.
🤝 Relation to Event Sourcing and CQRS
I have explained the concepts of Event Sourcing and CQRS previously, so if you are not familiar, check out this article:
Event Sourcing
Event Sourcing is a pattern where state changes are captured as a sequence where each event is immutable. Our application uses events to store important state changes in our aggregates.
CQRS (Command Query Responsibility Segregation)
CQRS is a pattern that separates read and write operations and often also the data store. We are not doing the latter here, but the concept should become clear. With JMS and ActiveMQ, we can asynchronously update our read models in reaction to events.
Aggregates and Event Flow
"A DDD aggregate is a cluster of domain objects that can be treated as a single unit. An example may be an order and its line items; these will be separate objects, but it's useful to treat the order (together with its line items) as a single aggregate." - Martin Fowler on DDD Aggregates
In our bookstore application, such aggregates are the Order and Book aggregate:
Book and Order Aggregates: We handle the initial creation of the aggregate and have a list of events that we have not committed to the event store yet. This works similarly for other events like
OrderCancelledEvent
orStockDecreasedEvent
.
Event Handling: The following code snippets show the segregation based on the book context. If we need to restore the whole aggregate, we load all the events for that aggregate and use such events with the rehydrate call to rebuild it. This can be optimized by snapshots if your data store grows significantly.
This snippet shows how we save a book to the domain events repository and how we save all uncommitted events to the outbox repository to publish them and allow interested consumers to listen to them.
The third example shows a listener that consumes the
BookAddedToCatalogTopic
and creates a read model for the book, which is just a current snapshot and faster for reads than rebuilding the whole aggregate.
Benefits
Loose Coupling: JMS with ActiveMQ provides loose coupling and increases maintainability.
Scalability: Asynchronous messaging enables better scalability and backpressure. We can consume messages at our speed based on current load, as long as we consume faster than we produce on average.
Integration of Event Sourcing and CQRS: This fits into the patterns, allowing us to update our event store asynchronously and handle eventual consistency.
Robustness: ActiveMQ can guarantee message delivery and supports the publish/subscribe pattern that we wanted to use.
👌 Best Practices and Troubleshooting
To use JMS and ActiveMQ efficiently and safely in production, consider these best practices:
Use persistent messages to avoid lost messages in case of message broker downtime.
Monitor your brokers with the ActiveMQ Admin UI.
Adjust security configurations with SSL and least privilege options for your users.
🔜 Upcoming Stories in this Series:
The next story in this series will focus on reliable transactions with the Transactional Outbox Pattern
Afterwards, we will focus on Couchbase and Database Migrations
📚 Resources
Spring Getting Started with JMS
🏁 Conclusion
In this article, we explored the basics of JMS and ActiveMQ and their integration into the Java bookstore sample application. We examined how to send events for asynchronous updates of our read model and how to build our aggregates from the event store. In the next article in this series, we will explore the Transactional Outbox Pattern and how to ensure consistent data while publishing events during transactions.