Outbox Pattern Part II: Spring Boot Starter Implementation

After covering the general purpose of the outbox pattern in Part I, we now dive deeper into the actual implementation of the outbox starter. Let’s recap the key requirements and elements of an outbox pattern implementation and look at code snippets from the actual implementation.

Transactional Consistency in Event-Driven Systems

The outbox pattern helps ensure that events are reliably sent while keeping the database in sync. This means that you want to store the events within the same transaction as your primary data, so if a business operation succeeds, the event is guaranteed to be written too.

The outbox table acts as a queue of events that need to be processed. It contains information about:

  • The event payload (message to be sent).

  • The status of the message (whether it’s been processed, pending, or failed).

  • Metadata like event type or timestamp.

An implementation therefore needs functionality to enqueue (add) and dequeue (process) outbox entries.
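The "same transaction" guarantee can be illustrated with a small in-memory sketch. It is not the starter's code: the class `AtomicOutboxWriteSketch`, the `OutboxRow` type, and the `placeOrder` method are hypothetical names, and the two lists merely stand in for a business table and an outbox table that a real database transaction would commit together.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model; none of these names come from the starter.
class AtomicOutboxWriteSketch {

    enum Status { ENQUEUED, PROCESSING, PROCESSED, FAILED }

    /** One outbox row: payload, status, and metadata. */
    static final class OutboxRow {
        final String eventType;
        final String payload;
        final Status status;
        final Instant createdAt;

        OutboxRow(String eventType, String payload, Status status, Instant createdAt) {
            this.eventType = eventType;
            this.payload = payload;
            this.status = status;
            this.createdAt = createdAt;
        }
    }

    final List<String> orders = new ArrayList<>();    // stands in for the business table
    final List<OutboxRow> outbox = new ArrayList<>(); // stands in for the outbox table

    /**
     * Stages the business write and the outbox write, then applies both together.
     * If anything fails before the "commit", neither list is touched.
     */
    void placeOrder(String orderId, boolean failBeforeCommit) {
        String stagedOrder = orderId;
        OutboxRow stagedEvent = new OutboxRow(
                "OrderCreated", "{\"orderId\":\"" + orderId + "\"}", Status.ENQUEUED, Instant.now());

        if (failBeforeCommit) {
            throw new IllegalStateException("simulated failure before commit");
        }

        // "Commit": both writes become visible together.
        orders.add(stagedOrder);
        outbox.add(stagedEvent);
    }
}
```

With a real database the staging and commit steps are handled by the transaction manager; the point of the sketch is only that there is never an order without its event, or an event without its order.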

Outbox Entries and Status Tracking for Distributed Systems

In our implementation all Outbox entries have the following fields which are filled and used by our custom outbox starter:

Field | Description
id | The ID of the entry as string
lastFailureMessage | The message of the exception which occurred for the last failure
nextRetryAt | The timestamp when the next retry should happen. Note that the retry job will retry all entries which are in state FAILED and where nextRetryAt timestamp is null or < now.
deleteAt | The timestamp when the entry will be deleted. Set for PROCESSED entries and for FAILED if an exception is thrown during processing which is configured as ignored exception.
statusHistory | The status history (see Visibility and Monitoring)
failures | The number of failed delivery attempts of this entry
status | The current status of the entry
statusTimestamp | The timestamp of the last status transition

The outbox starter provides the abstract classes AbstractOutboxDocument and AbstractDefaultOutboxEntity, which implement the fields listed above for MongoDB and MySQL respectively. Concrete implementations must extend one of these classes and can add further properties.

				
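import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import org.springframework.data.mongodb.core.mapping.Document;

@Document
@Data
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class TestOutboxDocument extends AbstractOutboxDocument {
    // Custom property in addition to the inherited outbox fields
    private String customProperty;

    @Builder
    public TestOutboxDocument(String customProperty) {
        this.customProperty = customProperty;
    }
}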
				
			

Enqueuing and Dequeuing Outbox Entries in Spring Boot

In the outbox pattern, the application generates events in response to certain business actions, like a database update or state change. These events are temporarily stored in an outbox table within the same database, ensuring that the event is recorded as part of the same transaction that made the change.

We use the name ENQUEUE for writing an entry to the outbox table.

We use the name DEQUEUE for processing an outbox entry.

Let’s take a closer look at the steps involved.

Enqueue: Writing Reliable Messages to the Outbox Table

Together with a change in the database (e.g., creating an order, updating a user profile), an entry is created and stored as ENQUEUED in the outbox table by calling the enqueue method of the corresponding Enqueuer in the same transaction.

				
					/**
 * Provides methods to enqueue an outbox entry.
 * @param <T> The type of the outbox entry.
 */
public interface Enqueuer<T extends OutboxEntry> extends QueueProcessor<T> {

    /**
     * Enqueue a new outbox entry with status {@link com.nitrobox.outbox.core.model.QueueStatus#ENQUEUED}
     */
    T enqueue(T outboxEntry);

}
				
			

The starter auto-configures an EnqueueAdapter bean which can be used to create the Enqueuer by calling EnqueueAdapter#createEnqueuer.

				
					@Configuration
public class OutboxConfiguration {

    @Bean
    public Enqueuer<TestOutboxDocument> testEnqueuer(EnqueueAdapter enqueueAdapter) {
        return enqueueAdapter.createEnqueuer(TestOutboxDocument.class);
    }
}
				
			

Dequeue: Processing Outbox Entries for Guaranteed Delivery

A worker or a separate process polls the outbox table for ENQUEUED entries and starts processing them by calling the corresponding Dequeuer.

				
					/**
 * Provides methods to dequeue an outbox entry.
 *
 * @param <T> The type of the outbox entry.
 */
public interface Dequeuer<T extends OutboxEntry> extends QueueProcessor<T> {

    /**
     * Dequeue and process an {@link QueueStatus#ENQUEUED} entry. Uses configured circuit breaker.
     *
     * @param id The entry id. Might be <code>null</code> to dequeue the oldest ENQUEUED entry.
     * @return The id of the dequeued entry or empty if not processed.
     */
    Optional<String> dequeueEnqueued(String id);
    
}
				
			

In our outbox implementation, a worker scheduled with Spring’s TaskScheduler periodically queries the outbox table to see if there are any new ENQUEUED entries that need to be processed.

While this approach works, it comes with a few significant drawbacks:

  • Inefficiency: Polling requires constant checks, which leads to unnecessary database queries. This results in additional overhead, especially if there are few new events to process.

  • Latency: Since the worker only checks at specific intervals, there may be a delay between when an event is created and when it is published to the message queue.

  • Resource Consumption: Polling consumes system resources like CPU and database connections, even when there are no new entries to process, leading to inefficiency and higher operational costs.

  • Burst Problem: Instead of evenly distributing events over time, the polling mechanism leads to bursts where events are processed all at once, creating periods of inactivity followed by periods of heavy load. This uneven distribution can negatively impact the responsiveness of the system.

The need for constant polling can create a situation where resources are wasted on querying the database even when there are no new events. This issue becomes more pronounced as the system scales, especially when there is a high frequency of events being generated. In this case, the system needs to check more often for new updates to stay current. So as the event frequency increases, the number of these redundant, unnecessary queries also rises.

A Better Solution: Spring Application Events

Instead of relying on polling, our solution to improve the efficiency of the dequeuing process is to use Spring application events. By using Spring’s event-driven approach, the system can trigger the dequeuing process immediately when a new event is generated, without having to rely on a constant polling loop.

Since the processing of Spring application events is not guaranteed (for example, when the application is stopped or crashes), we use a combination of Spring application events and a scheduled worker. The events are used to process enqueued entries immediately, while the scheduled worker picks up any remaining entries whose events were lost. Because the worker only has to catch these edge cases, it can be scheduled less frequently.
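The interplay of the immediate path and the fallback worker can be sketched in plain Java. This is a simplified model under stated assumptions, not the starter's code: `EventPlusPollingSketch`, `enqueue`, `dequeue`, and `pollOnce` are hypothetical names, the `eventDelivered` flag simulates a lost application event, and a direct method call stands in for a Spring event listener.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model: a direct call replaces the Spring application event,
// and pollOnce() replaces the scheduled worker.
class EventPlusPollingSketch {

    enum Status { ENQUEUED, PROCESSED }

    final Map<String, Status> outbox = new LinkedHashMap<>();
    int processedCount = 0;

    /** Stores the entry; the "event" only arrives if eventDelivered is true. */
    void enqueue(String id, boolean eventDelivered) {
        outbox.put(id, Status.ENQUEUED);
        if (eventDelivered) {
            dequeue(id); // immediate path
        }
    }

    /** Processes the entry only if it is still ENQUEUED, so both paths can call it safely. */
    boolean dequeue(String id) {
        if (outbox.get(id) != Status.ENQUEUED) {
            return false;
        }
        outbox.put(id, Status.PROCESSED);
        processedCount++;
        return true;
    }

    /** Fallback sweep over all entries; returns how many leftovers were picked up. */
    int pollOnce() {
        int picked = 0;
        for (String id : new ArrayList<>(outbox.keySet())) {
            if (dequeue(id)) {
                picked++;
            }
        }
        return picked;
    }
}
```

Because `dequeue` checks the status first, an entry handled by the immediate path is skipped by the sweep, which is why the sweep can run rarely without causing duplicates in this model.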

Processing of outbox entries

The starter auto-configures a DequeueAdapter bean which can be used to create a Dequeuer by calling DequeueAdapter#createDequeuer.

				
					@Component
@RequiredArgsConstructor
public class TestDequeueDBAdapter {

    private final DequeueAdapter dequeueAdapter;

    private Dequeuer<TestOutboxDocument> dequeuer;

    @PostConstruct
    public void init() {
        dequeuer = dequeueAdapter.createDequeuer(TestOutboxDocument.class, message -> {
            //TODO implement consumer
        }, false);
        dequeuer.registerFailureHandler((message, e) -> {
            //TODO handle failure
        });
    }

    public ScheduledFuture<?> startAutomaticRetry() {
        return dequeueAdapter.startAutomaticRetry(TestOutboxDocument.class);
    }

    public void dequeueEnqueued(String id) {
        dequeuer.dequeueEnqueued(id);
    }
}
				
			

When creating a Dequeuer, a consumer has to be provided, which is called whenever an outbox entry is dequeued.

  • If the consumer terminates without an exception, the entry is marked as PROCESSED if the outbox is configured to process entries synchronously.

  • If the consumer terminates with an exception, the entry is marked as FAILED and will be retried if the automatic retry is running and the exception is not ignored.

The automatic retry job is started by default but can also be started manually.

In addition, a failure handler can be registered to respond to failures during processing, such as logging error messages.

One of the primary concerns when processing outbox entries is duplicate processing. If multiple instances of a background process attempt to process the same entry simultaneously, it could lead to:

  • Re-sending the same message: The downstream service may receive the same event multiple times, which can lead to unexpected behavior, such as duplicated records or repeated transactions.

  • Inconsistent system state: If the event affects the state of multiple systems, duplicate processing could create inconsistencies or race conditions.

  • Data integrity issues: Repeated actions like making duplicate payments or inserting duplicate records can seriously harm data integrity.

Locking outbox entries before processing them ensures that only one instance of the background process can work on each entry at any given time, thereby preventing duplicates from being sent or processed.
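The effect of an atomic claim can be demonstrated without a database. The sketch below is an illustration only: `AtomicClaimSketch`, `tryClaim`, and `raceForEntry` are hypothetical names, and `ConcurrentHashMap.replace(key, oldValue, newValue)` stands in for the atomic status update that a database would perform.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model: the map's compare-and-replace stands in for an atomic DB update.
class AtomicClaimSketch {

    enum Status { ENQUEUED, PROCESSING, PROCESSED }

    final ConcurrentMap<String, Status> outbox = new ConcurrentHashMap<>();

    /** Returns true only for the single caller that moves the entry from ENQUEUED to PROCESSING. */
    boolean tryClaim(String id) {
        return outbox.replace(id, Status.ENQUEUED, Status.PROCESSING);
    }

    /** Lets several workers race for the same entry and returns how many of them won the claim. */
    int raceForEntry(String id, int workers) {
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            for (int i = 0; i < workers; i++) {
                pool.submit(() -> {
                    try {
                        start.await(); // line all workers up so they really race
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    if (tryClaim(id)) {
                        winners.incrementAndGet();
                    }
                });
            }
            start.countDown();
            pool.shutdown();
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return winners.get();
    }
}
```

However many workers race, only one sees `tryClaim` return true; the others find the entry already in PROCESSING and back off, which mirrors the `updated == null` case in a find-and-modify style update.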

In our implementation, the entries to be processed are selected based on criteria that define a database query for retrieving the entry. For example, the MongoDB criteria used by the polling process can be as simple as:

				
					Criteria.where(FIELD_STATUS).is(QueueStatus.ENQUEUED)
				
			

The criteria are passed to the processOutboxEntry method, which:

  1. Atomically updates the state of the entry to PROCESSING by calling executeStatusUpdate(criteria, QueueStatus.PROCESSING), where executeStatusUpdate for MongoDB looks like this:

				
					T executeStatusUpdate(Criteria criteria, QueueStatus newStatus) {
    Update update = buildStatusUpdate(newStatus, config.getExpireAfter());
    Query query = Query.query(criteria).with(Sort.by(Direction.ASC, FIELD_STATUS_TIMESTAMP));
    return mongoTemplate.findAndModify(query, update, RETURN_NEW, config.getOutboxClazz());
}
				
			
     For MySQL, the entry is locked using LockModeType.PESSIMISTIC_WRITE.

  2. Calls the configured consumer.

  3. Updates the state of the entry to PROCESSED (or FAILED in case of an error).

This is how the Java code to process the outbox entries looks:

				
					private Optional<String> processOutboxEntry(Criteria criteria) throws OutboxProcessingException {
    T updated = executeStatusUpdate(criteria, QueueStatus.PROCESSING);
    if (updated == null) {
        //another pod is already processing the entry or entry cannot be found.
        return Optional.empty();
    }

    Criteria nextCriteria = idAndStatusCriteria(updated.getId(), QueueStatus.PROCESSING);

    DequeueMeta dequeueMeta = DequeueMeta.builder()
            .attempt(updated.getFailures() + 1)
            .maxAttempts(config.getRetryConfig().getMaxAttempts())
            .build();

    try {
        log.debug("Processing {}", outboxEntry(updated.getId(), getOutboxClass()));

        DequeueMessage<T> message = DequeueMessage.<T>builder()
          .content(updated)
          .meta(dequeueMeta)
          .build();

        consumer.andThen(t -> executeStatusUpdate(nextCriteria, QueueStatus.PROCESSED))
          .accept(message);

        log.debug("Processed {}", outboxEntry(updated.getId(), getOutboxClass()));
        return Optional.of(updated.getId());
    } catch (Exception e) {
        T failedEntry = executeFailureUpdate(nextCriteria, RETURN_NEW, updated, e);
        throw new OutboxProcessingException(failedEntry.getId(), e);
    }
}
				
			

Improving Reliable Message Delivery: Failure Handling and Retry Logic

To effectively handle delivery failures, it’s crucial to implement retry mechanisms for failed or stuck entries. This creates a need for strategies such as exponential backoff, dead-letter queues for events that continuously fail, and the possibility of manual intervention when required.

Error Handling and Resilience Strategies

When an exception is thrown during processing, the failure count of the entry is incremented and stored as failures. In addition, the resilience4j RetryConfig is used to calculate nextRetryAt:

				
					public LocalDateTime calculateNextRetryTime(LocalDateTime currentRetryAt, int attempt, Throwable exception) {
    LocalDateTime currentRetry = Objects.requireNonNullElseGet(currentRetryAt, Clock::nowAsLocal);
    LocalDateTime nextRetryTime = null;
    if (attempt != 0) {
        long interval = getRetryConfig().getIntervalBiFunction().apply(attempt, Either.left(exception));
        nextRetryTime = currentRetry.plus(interval, MILLIS);
    }
    return nextRetryTime;
}
				
			

A scheduled job periodically retries all entries where

nextRetryAt < now AND attempt < maxAttempts
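To make the retry schedule concrete, here is a hand-rolled sketch of an exponential backoff and of the eligibility check. It does not use resilience4j; `RetryScheduleSketch`, `backoff`, and `dueForRetry` are hypothetical names, and the eligibility rule is an assumption that combines the condition above with the field description of nextRetryAt (null counts as due).

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Hypothetical helper; resilience4j's RetryConfig would normally supply the interval.
class RetryScheduleSketch {

    /** Exponential backoff: initial * multiplier^(attempt - 1), with attempt starting at 1. */
    static Duration backoff(Duration initial, double multiplier, int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        long millis = Math.round(initial.toMillis() * Math.pow(multiplier, attempt - 1));
        return Duration.ofMillis(millis);
    }

    /** An entry is due if its retry time is unset or in the past and attempts remain. */
    static boolean dueForRetry(LocalDateTime nextRetryAt, int failures, int maxAttempts, LocalDateTime now) {
        boolean timeReached = nextRetryAt == null || nextRetryAt.isBefore(now);
        return timeReached && failures < maxAttempts;
    }
}
```

With an initial interval of 500 ms and a multiplier of 2, the first three intervals are 500 ms, 1000 ms, and 2000 ms, so a persistently failing entry is retried less and less often until maxAttempts is reached.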

Circuit Breaker for Microservices Stability

A circuit breaker is an important addition to failure handling in the outbox pattern. When message delivery fails repeatedly, the circuit breaker detects the issue and "opens" to stop further attempts temporarily. This prevents the system from repeatedly trying to send a failing message and overwhelming resources.

  • When the circuit breaker is open, the system will stop sending events for a set period, giving the external system time to recover. Once the system is stable again, the circuit breaker will "close" and allow normal processing to resume.
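The open and close behaviour can be sketched with a deliberately simplified breaker. This is not how resilience4j works internally (it evaluates a failure rate over a sliding window and has a half-open state); `SimpleCircuitBreakerSketch` and `CallRejectedException` are hypothetical names, and the current time is passed in explicitly so the behaviour is easy to follow.

```java
import java.util.function.Supplier;

// Hypothetical, simplified breaker: opens after N consecutive failures,
// rejects calls while open, and allows calls again once the wait time has passed.
class SimpleCircuitBreakerSketch {

    /** Stand-in for the exception a real breaker throws while it is open. */
    static class CallRejectedException extends RuntimeException { }

    private final int failureThreshold;
    private final long openMillis;
    private int consecutiveFailures = 0;
    private long openedAt = -1; // -1 means the breaker is closed

    SimpleCircuitBreakerSketch(int failureThreshold, long openMillis) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
    }

    boolean isOpen(long nowMillis) {
        return openedAt >= 0 && nowMillis - openedAt < openMillis;
    }

    <T> T call(Supplier<T> action, long nowMillis) {
        if (openedAt >= 0) {
            if (nowMillis - openedAt < openMillis) {
                throw new CallRejectedException(); // still open: do not even try
            }
            openedAt = -1;           // wait time elapsed: close and try again
            consecutiveFailures = 0;
        }
        try {
            T result = action.get();
            consecutiveFailures = 0; // a success resets the failure streak
            return result;
        } catch (RuntimeException e) {
            if (++consecutiveFailures >= failureThreshold) {
                openedAt = nowMillis; // too many failures in a row: open
            }
            throw e;
        }
    }
}
```

The distinction between a rejected call and a failed call matters for the outbox: a rejected entry was never attempted, so it is reasonable not to count it as a delivery failure.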

When an entry is processed, we wrap the processing in a resilience4j CircuitBreaker:

				
					try {
    return circuitBreaker.executeSupplier(() -> processOutboxEntry(criteria));
} catch (OutboxProcessingException e) {
    //set entry status to FAILED and increment failures
} catch (CallNotPermittedException e) {
    log.debug("Circuit breaker prevents processing of {}", outboxEntry(objectId, getOutboxClass()));
    //set entry status to FAILED without incrementing failures
}
				
			

Visibility and Monitoring in Microservices Messaging Systems

When new entries are added to the outbox, monitoring becomes essential. By keeping a close eye on the status of these entries, we can quickly identify whether they are being successfully processed, and more importantly, detect any failures in the delivery process.

Understanding and implementing effective monitoring is key to preventing bottlenecks and ensuring seamless communication across the platform.

  • You need to know:

    • When new events are added to the outbox.

    • Whether events are being successfully dispatched.

    • If there are any failures in the delivery process.

    • How long the processing of entries takes.

  • This means

    • tracking outbox entries through their lifecycle.

    • setting up logging, metrics, and alerting to ensure you’re aware of potential issues in your event publishing pipeline.

To accomplish this, whenever the status of an outbox entry changes, we also record the change in a status history, which is structured as follows:

				
					[
    {
        "status": "PROCESSED",
        "timestamp": "2025-01-24T05:00:10.655"
    },
    {
        "status": "PROCESSING",
        "timestamp": "2025-01-24T05:00:10.435"
    },
    {
        "status": "ENQUEUED",
        "timestamp": "2025-01-24T05:00:10.41"
    }
]
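A history like this is enough to answer how long an entry took to process. The following sketch assumes a newest-first list as in the example above; `StatusHistorySketch`, `transitionTo`, and `elapsed` are hypothetical names and not part of the starter.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Hypothetical model of a newest-first status history.
class StatusHistorySketch {

    enum Status { ENQUEUED, PROCESSING, PROCESSED, FAILED }

    static final class Transition {
        final Status status;
        final LocalDateTime timestamp;

        Transition(Status status, LocalDateTime timestamp) {
            this.status = status;
            this.timestamp = timestamp;
        }
    }

    private final Deque<Transition> history = new ArrayDeque<>();

    /** Records a transition at the head, so the newest entry always comes first. */
    void transitionTo(Status status, LocalDateTime at) {
        history.addFirst(new Transition(status, at));
    }

    Optional<Status> currentStatus() {
        return Optional.ofNullable(history.peekFirst()).map(t -> t.status);
    }

    /** Time between the most recent 'from' and the most recent 'to' transition, if both exist. */
    Optional<Duration> elapsed(Status from, Status to) {
        Optional<LocalDateTime> start = latest(from);
        Optional<LocalDateTime> end = latest(to);
        if (start.isEmpty() || end.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(Duration.between(start.get(), end.get()));
    }

    private Optional<LocalDateTime> latest(Status status) {
        // The deque is newest first, so the first match is the most recent one.
        return history.stream()
                .filter(t -> t.status == status)
                .map(t -> t.timestamp)
                .findFirst();
    }
}
```

Fed with the three timestamps from the example history, `elapsed(ENQUEUED, PROCESSED)` yields 245 ms, the kind of value that could be exported as a processing-time metric.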
				
			

We use Micrometer metrics, which make it possible to monitor the number of FAILED entries for each outbox.

Grafana Dashboard showing the current amount of FAILED entries and the FAILED entries over time

We use Micrometer tracing with custom ObservationConventions to trace enqueuing and dequeuing of entries and also maintain the ObservationContext.

Spans of a trace showing enqueuing and dequeuing in Grafana Tempo

Outbox Cleanup for Processed Events in Distributed Architectures

Once events are successfully sent, they should be marked as PROCESSED to avoid re-processing and deleted after some time to keep the outbox table small.

When an entry has been processed successfully, the status is set to PROCESSED and the deleteAt timestamp is set to Clock.nowAsLocal().plus(getExpireAfter()).

A scheduled job periodically deletes all entries where deleteAt < now.
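The cleanup rule is small enough to sketch in a few lines. The class `OutboxCleanupSketch` and its methods are hypothetical, and an in-memory map of entry id to deleteAt stands in for the outbox table; entries that were never processed keep a null deleteAt and are never purged.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: entry id -> deleteAt (null while the entry must be kept).
class OutboxCleanupSketch {

    private final Map<String, LocalDateTime> deleteAtById = new HashMap<>();

    void enqueue(String id) {
        deleteAtById.put(id, null);
    }

    /** Marks the entry as done and schedules its deletion at now + expireAfter. */
    void markProcessed(String id, LocalDateTime now, Duration expireAfter) {
        deleteAtById.put(id, now.plus(expireAfter));
    }

    boolean contains(String id) {
        return deleteAtById.containsKey(id);
    }

    /** Deletes all entries whose deleteAt lies before 'now'; returns the number removed. */
    int purgeExpired(LocalDateTime now) {
        int before = deleteAtById.size();
        deleteAtById.entrySet().removeIf(e -> e.getValue() != null && e.getValue().isBefore(now));
        return before - deleteAtById.size();
    }
}
```

Keeping processed entries for a grace period rather than deleting them immediately leaves the status history available for troubleshooting until the entry expires.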

Scaling Outbox Processing in Event-Driven Microservices

As your system grows, you may need to handle increased throughput and scale your outbox processing. This could involve scaling the background processors, or distributing load between multiple consumers.

To distribute the load, we have the following mechanisms in place:

  • The worker which polls for ENQUEUED entries runs on each instance.

  • A configurable thread pool is responsible for processing the Spring application events which are triggered when new entries are enqueued.

Note: The retry job is only running on a single instance to prevent overloading the system. To achieve this we are using ShedLock which makes sure that scheduled tasks are executed at most once at the same time. If a task is being executed on one node, it acquires a lock which prevents execution of the same task from another node (or thread).
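The idea behind such a lock can be sketched in memory. This is not ShedLock's API: `SingleRunnerLockSketch`, `tryLock`, and `unlock` are hypothetical names, and a shared map stands in for the lock table that all nodes would see. The expiry time plays the role of a safety net in case a node dies while holding the lock.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of a named lock with an expiry, shared by all nodes.
class SingleRunnerLockSketch {

    private final Map<String, Instant> lockedUntil = new ConcurrentHashMap<>();

    /**
     * Tries to take the named lock. Succeeds if nobody holds it or the previous
     * holder's lock has expired; the check and the update happen atomically.
     */
    boolean tryLock(String name, Instant now, Duration lockAtMostFor) {
        boolean[] acquired = {false};
        lockedUntil.compute(name, (key, until) -> {
            if (until == null || !until.isAfter(now)) {
                acquired[0] = true;
                return now.plus(lockAtMostFor);
            }
            return until; // still held by someone else
        });
        return acquired[0];
    }

    /** Releases the lock after the task has finished. */
    void unlock(String name) {
        lockedUntil.remove(name);
    }
}
```

A node that fails to get the lock simply skips this run of the job, which is the behaviour described in the note above.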

Conclusion – Implementing a Scalable and Reliable Spring Boot Outbox Pattern

We outlined the key steps and considerations for implementing the Outbox Pattern, although many details were left out for brevity. By following the guidance provided, you should now have a solid understanding of how to implement an Outbox starter in your system. While the specifics of your implementation may vary based on your architecture and needs, the core principles discussed here will help you build a reliable and scalable solution for inter-service communication.

About the Author

David Albrecht is a Senior Software Engineer at Nitrobox. You can find David on GitHub and LinkedIn.