A Reactive Approach in a High-Load Application: A Cashback Calculation Service as an Example
This article does not aim to describe frameworks and architecture; they are already well documented. Instead, it is intended for those who are just starting to work with microservices and Project Reactor, and it describes the main features of these technologies and what you will have to deal with in practice.
Unlike a monolithic architecture, a microservice architecture is based on splitting a system into small independent services, each of which implements a separate business function. In a monolith everything is coupled (and if one function fails, others may "fall off" with it), whereas microservices provide flexibility and stability. Large IT solutions can contain dozens of microservices, and each of them can be owned by a separate independent team.
Example
Consider a case study from our practice. An insurance company approached us to upgrade its online application to a flexible microservice architecture. Our task was to implement cashback in the application, that is, to accrue bonus points to the user for purchasing an insurance policy.
At first glance, the task looked simple.
For each paid (or renewed) policy, accrue a cashback of X% to the user through the accounting service. The user should have access to information about the received cashback.
When the accumulated cashback reaches a certain threshold, automatically transfer the funds to the client through the accounting service. The payout history should be available to the user.
The main project used the Kafka message queue as the means of exchanging information between microservices, and also as the only persistent, replicated information store.
When implementing functionality as microservices, you always need to keep horizontal scaling in mind. The code should work correctly not only in a multi-threaded environment, but also when several containers with the same microservice are running.
Note also that the number of clients of such an application is effectively unlimited, and many of them are mobile, i.e. connected over relatively slow links.
Blocking vs Non-blocking
If you use the standard Java servlet architecture, each request is executed in a separate thread. This approach is generally fine, especially considering the capabilities of modern servers, which can handle several hundred connections at a time.
But, first, it is not always possible to buy such a server. Second, and most importantly, something can always go wrong. For example, there may be backpressure delays, or a "retry storm" may occur, when many users of the application, unable to reach some function, trigger repeated requests from their devices. The number of active connections and threads then grows. In this case, cluster nodes can fall into a downward spiral: the backlog of threads increases the load on the server and overloads the cluster. Of course, throttling mechanisms can be built in to compensate for the risks and help maintain stability during such events, but they are clearly not a panacea. In addition, recovery can be long and risky.
Asynchronous systems work differently. In them, one thread usually corresponds to one CPU core, and the request-response cycle is processed through events and callbacks. Where we previously "paid" for each request with a whole thread, now it only costs another message in a queue.
It is clear that recovering from backpressure delays or "retry storms" is also easier: processing extra messages in a queue is much cheaper than keeping a pile of threads alive. And since all requests are asynchronous, scaling the system is simpler: if we see a large number of messages in the queue, we just temporarily create additional consumers.
The stability of the system comes from the very nature of the asynchronous approach. First, of course, we can try to handle an exception locally. Second, and most importantly, in asynchronous systems components are not blocked by exception handling: an error in one component does not affect the others. Moreover, if one component fails to process a message, the message can be processed by another component subscribed to the same address.
Note: despite the impressive advantages of the asynchronous approach, everything has a price. First of all, asynchronous code is harder to develop and debug. Where we used to be able to set a breakpoint in the debugger and see the entire call stack, in an asynchronous solution this no longer works. In addition, switching to asynchronous code does not always improve performance.
In our example, the application has many users who access it from mobile devices. For this reason, we decided to use one of the asynchronous frameworks.
According to the customer's requirements, the project had to be implemented in Java for ease of further support. There were not many mature options for high-level abstractions, so we settled on Project Reactor.
Working with Project Reactor
Reactor is an implementation of the Reactive Streams specification. We have already discussed this topic in sufficient detail, for example, here.
Despite the abundance of Reactor documentation, I would like to mention one feature of working with Reactor streams. Reactor has two main data structures: Flux and Mono. Both implement the Publisher interface and represent, respectively, an asynchronous stream of elements or a single asynchronous element.
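A minimal illustration of the two types (the values here are arbitrary):

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxVsMono {
    public static void main(String[] args) {
        // Flux: an asynchronous sequence of 0..N elements
        Flux<Integer> rates = Flux.just(3, 5, 7);

        // Mono: an asynchronous sequence of 0..1 elements
        Mono<String> userId = Mono.just("user-42");

        // Nothing is emitted until someone subscribes
        rates.map(r -> r * 2)
             .subscribe(System.out::println); // prints 6, 10, 14

        userId.subscribe(System.out::println); // prints user-42
    }
}
```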
Reactor streams look very similar to standard Java collection streams (java.util.stream.Stream). Of course, a Java Stream is not only, and not so much, a mechanism for working with collections. But it is important to remember that a Flux is not a collection.
In Java, before we start a stream from a collection, we already have all of its elements and know its size. A Flux is better thought of as a kind of deferred collection, whose number of elements is unknown at the moment the stream is created.
And although we can convert a Flux into a collection using standard tools, this is a blocking operation, and it does not guarantee that subsequent elements will ever arrive. As a rule, outside of tests this should not be done, since we want to minimize the number of blocking operations and the idle time of our hardware, especially for I/O operations.
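For contrast, here is a small sketch of such a blocking conversion; it is fine in a test, but in production code it ties up a thread in waiting:

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class BlockingConversion {
    public static void main(String[] args) {
        Flux<Long> ticks = Flux.interval(Duration.ofMillis(100)).take(5);

        // collectList() gathers the elements into a Mono<List<Long>>,
        // and block() stops the calling thread until the Flux completes.
        // Acceptable in tests; in production it wastes a thread on waiting.
        List<Long> collected = ticks.collectList().block();
        System.out.println(collected); // [0, 1, 2, 3, 4]
    }
}
```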
How it works
Let us return to our example. At the start of work on a software project, based on the technical specification, we identify the functional blocks that will later become separate microservices.
We can see that the application has three main processes:
Receiving information from outside (policies, users, payments) and accruing cashback based on it. This will be the first service, the "Calculator".
Communicating with the user. The application queries the repository to find the cashback of a specific user. This will be the second service, "Storage".
Communicating with the accounting service. Cashback accrual and payouts must go through this service. This will be the third service, the "Accountant".
So the scheme is pretty simple:
The "Calculator" calculates the cashback for each policy / user / fact of payment and sends the message in a separate queue. Services “Storage” and “Accountant” read messages from this queue. “Storage” saves the cashback and shows it to the user, and if the minimum payment threshold is reached, it initiates the transfer of funds to the user's card. "Accountant" calls an external accounting service for the physical calculation of bonuses.
Local storage organization
Of particular importance is the order of these processes. Note that our "Calculator" works on the basis of messages from external services. There may be situations where the "Calculator" cannot decide whether to send a cashback message on the basis of a single incoming message. For example, it needs to check two external topics: policies and payments. In this case internal storage is necessary, and we build it from all the external messages.
Comparing the standard SQL options (PostgreSQL, MySQL) with the NoSQL approach, we decided to prefer MongoDB as the local storage in this project, for several reasons:
For MongoDB there is a ready-made integration with Project Reactor: reactive mongo (see the repository sketch after this list).
Our data model has a small number of tables and relations between them.
Ease of use: there is no need to keep models in sync with database tables.
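As an illustration of the first point, a Spring Data Reactive MongoDB repository is just an interface; the Cashback document and the findByUserId method here are assumptions about the model, not the project's actual schema:

```java
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import reactor.core.publisher.Flux;

// Assumed document model: one accrued cashback record per policy
@Document
class Cashback {
    @Id String id;
    String userId;
    String policyId;
    long amount; // in minor currency units
}

// Spring Data derives the query from the method name and returns a Flux
interface CashbackRepository extends ReactiveMongoRepository<Cashback, String> {
    Flux<Cashback> findByUserId(String userId);
}
```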
And of course, we need to separate the process of building the local storage from the process of deciding whether to send. How can we do this if the decision to send is based on the same messages from which the internal storage is built? One possible option is to separate them in time and trigger the accrual from an external scheduler. We settled on this implementation as simple and straightforward.
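A minimal sketch of this separation in time, with Flux.interval standing in for the external scheduler and a hypothetical runAccrual() in place of the real matching logic:

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class AccrualScheduler {

    public static void main(String[] args) throws InterruptedException {
        // Every 10 minutes, scan the local storage and send the accruals
        // that are ready; concatMap keeps the runs strictly sequential,
        // so a slow run is never overlapped by the next tick.
        Flux.interval(Duration.ofMinutes(10))
            .concatMap(tick -> runAccrual())
            .subscribe();

        Thread.currentThread().join(); // keep the demo process alive
    }

    // Hypothetical placeholder: match policies with payments in the
    // local storage and emit cashback messages for complete pairs
    private static Mono<Void> runAccrual() {
        return Mono.empty();
    }
}
```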
Reprocessing
To simplify the application architecture and reduce possible risks, it is advisable to use stateless microservices, without local storage: whatever information arrives at the input simply passes through the stream chain.
If this is impossible for one reason or another, you can try to isolate the state-handling logic in a separate layer; in other words, put an additional level of abstraction over the stateful logic. In this case the application still has a stateful segment, but it is isolated, and the other parts do not depend on state.
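One possible shape of such a layer, sketched with assumed names: the stream chain stays stateless and touches state only through a narrow interface:

```java
import reactor.core.publisher.Mono;

// The only place in the pipeline that knows about state (names assumed)
interface PolicyStateStore {
    Mono<Void> remember(String policyId, String payload);
    Mono<Boolean> hasMatchingPayment(String policyId);
}

// The rest of the pipeline depends on the interface, not on MongoDB,
// Kafka offsets or any other concrete state mechanism
class CashbackDecider {
    private final PolicyStateStore store;

    CashbackDecider(PolicyStateStore store) {
        this.store = store;
    }

    Mono<Boolean> shouldAccrue(String policyId) {
        return store.hasMatchingPayment(policyId);
    }
}
```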
Additional levels of abstraction
However, in practice this can be difficult: the architecture may not allow it, there may be no time or no clear way to do it in a particular project, or the reprocessing requirements may rule it out. When such a service is stopped and restarted (with the offsets reset), it will repeat actions it has already performed. In our case, the "Calculator" would repeatedly send cashback accrual messages. Moreover, even local storage does not guarantee correct operation, since it is not replicated and can be wiped out at any moment together with the service.
One solution is to use a dedicated queue of sent messages. The service writes every outgoing message to this queue and, at startup, reads it back into the local storage along with all the other external messages.
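A sketch of the idea with assumed names: at startup the set of already-sent keys is restored from the "sent" queue, and the pipeline then filters new accruals against it:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class SentMessagesFilter {
    // Rebuilt at startup from the queue of sent messages (assumed design)
    private final Set<String> alreadySent = ConcurrentHashMap.newKeySet();

    // Called while replaying the "sent" topic at startup
    Mono<Void> restore(Flux<String> sentKeys) {
        return sentKeys.doOnNext(alreadySent::add).then();
    }

    // Drop accruals that were already published in a previous run:
    // Set.add() returns false for a duplicate, so duplicates are filtered out
    Flux<String> onlyNew(Flux<String> accrualKeys) {
        return accrualKeys.filter(alreadySent::add);
    }
}
```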
Other features
Another feature of Project Reactor when working with the front end is that in most cases it is not enough for us to simply fetch a value; more often we need to fetch the value and then track its changes. This is quite easy to solve with reactive mongo: the repository from the reactive mongo library has retrieve-and-track methods that return not only the required value but also all subsequent changes, if any.
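For example, in Spring Data Reactive MongoDB a @Tailable query method first emits the current matches and then keeps pushing newly inserted ones (this requires a capped collection; the Cashback model is the same assumption as in the earlier sketch):

```java
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.data.mongodb.repository.Tailable;
import reactor.core.publisher.Flux;

// Cashback is the assumed document model from the earlier sketch
interface CashbackFeedRepository extends ReactiveMongoRepository<Cashback, String> {

    // Emits the existing cashback records for the user and then stays
    // open, pushing every newly inserted record to the subscriber.
    // Note: tailable cursors only work on capped collections.
    @Tailable
    Flux<Cashback> findByUserId(String userId);
}
```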
Also pay attention to the service named "Accountant". Suppose that this service works with an external API over REST or, as in our case, over SOAP. The reprocessing requirements apply here as well, and a separate history of calls is needed. But there may also be additional requirements on the stability of the system as a whole.
For example, what happens if the external API responds with a 500 error? In our case, we can use Reactor's standard .retryBackoff() mechanism: it retries the call several times, increasing the delay between attempts. You can also configure the stream to catch only certain errors and react only to them.
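A sketch of such a guarded call. In recent Reactor versions retryBackoff() is deprecated in favor of the equivalent retryWhen(Retry.backoff(...)), which is what the sketch uses; callAccountingService is a hypothetical stub for the real SOAP client:

```java
import java.time.Duration;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

public class GuardedAccountingCall {

    Mono<String> accrue(String request) {
        return callAccountingService(request)
            // Up to 3 retries with exponentially growing (jittered)
            // delays, starting from 1 second
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
                // React only to errors that are worth retrying
                .filter(ex -> ex instanceof ExternalServiceException));
    }

    // Hypothetical stub for the external SOAP call
    private Mono<String> callAccountingService(String request) {
        return Mono.error(new ExternalServiceException("HTTP 500"));
    }

    static class ExternalServiceException extends RuntimeException {
        ExternalServiceException(String message) { super(message); }
    }
}
```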
Testing
Of course, the project does not end with writing the working modules. We need to verify that they behave correctly, in particular with unit tests. For modules built on Project Reactor, unit tests use StepVerifier, a component of the reactor-test library that allows you to test reactive functionality step by step. The StepVerifier documentation is easily accessible and complete.
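A minimal example with reactor-test, assuming a trivial 5% cashback function as the logic under test:

```java
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

class CalculatorTest {

    // Assumed logic under test: 5% cashback on each payment
    private Flux<Long> cashback(Flux<Long> payments) {
        return payments.map(p -> p * 5 / 100);
    }

    @Test
    void accruesFivePercent() {
        StepVerifier.create(cashback(Flux.just(1000L, 2000L)))
            .expectNext(50L, 100L)  // each expected element, in order
            .verifyComplete();      // the stream must terminate normally
    }
}
```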
Integration tests in most cases involve running the microservices in containers, so you should think about proper logging at the design stage. Otherwise, there is a risk that finding the cause of each failure will take a long time.
After running the unit and integration tests, we made sure that our application is ready for asynchronous operation and horizontal scaling, is resilient to unexpected shutdowns, and is covered by tests. Overall, the project took about three weeks to develop, including debugging and customer reviews.
Conclusion
For high-load applications with a large number of external users, we recommend considering asynchronous operation based on a reactive approach.
Although a reactive implementation does not always increase system performance, it significantly improves scalability and stability.
Using Reactor makes it easy to implement asynchronous processing and makes the solution clearer and easier to support. At the same time, working with Project Reactor requires special care when writing code: when building Flux and Mono chains, you should always check the documentation and run intermediate tests.
In this article we examined asynchronous operation, reprocessing, calls to external services, storage organization, testing, and some other aspects that are important to consider in projects with microservices and Reactor. We hope that our experience will be useful to you.