- Handling Exceptions in Project Reactor
- 1. Overview
- 2. Maven Dependencies
- 3. Throwing Exceptions Directly in a Pipeline Operator
- 4. Handling Exceptions in the handle Operator
- 5. Handling Exceptions in the flatMap Operator
- 6. Avoiding NullPointerException
- 7. Conclusion
Handling Exceptions in Project Reactor
1. Overview
In this tutorial, we’ll look at several ways to handle exceptions in Project Reactor. Operators introduced in the code examples are defined in both the Mono and Flux classes. However, we’ll only focus on methods in the Flux class.
2. Maven Dependencies
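Let's start by adding the reactor-core dependency:

    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>3.4.9</version>
    </dependency>

The StepVerifier class used in the tests below ships in the separate reactor-test artifact.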
3. Throwing Exceptions Directly in a Pipeline Operator
The simplest way to handle an Exception is by throwing it. If something abnormal happens during the processing of a stream element, we can throw an Exception with the throw keyword as if it were a normal method execution.
Let’s assume we need to parse a stream of Strings to Integers. If an element isn’t a numeric String, we’ll need to throw an Exception.
It’s a common practice to use the map operator for such a conversion:
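    Function<String, Integer> mapper = input -> {
        // String.matches tests the whole input, so reject anything that isn't all digits
        if (!input.matches("\\d+")) {
            throw new NumberFormatException();
        } else {
            return Integer.parseInt(input);
        }
    };

    Flux<String> inFlux = Flux.just("1", "1.5", "2");
    Flux<Integer> outFlux = inFlux.map(mapper);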
As we can see, the operator throws an Exception if an input element is invalid. When we throw the Exception this way, Reactor catches it and signals an error downstream:
    StepVerifier.create(outFlux)
        .expectNext(1)
        .expectError(NumberFormatException.class)
        .verify();
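To see that conversion without StepVerifier, the following minimal sketch subscribes with plain callbacks and records what arrives. The class name MapErrorSignalDemo and the events list are illustrative assumptions, not part of the article's code base, and the sketch relies on Integer.parseInt itself throwing for "1.5":

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;

class MapErrorSignalDemo {

    // Records every signal the subscriber observes, in order (hypothetical helper).
    static List<String> run() {
        List<String> events = new ArrayList<>();

        Flux.just("1", "1.5", "2")
            // Integer.parseInt throws NumberFormatException for "1.5"
            .map(Integer::parseInt)
            .subscribe(
                value -> events.add("next:" + value),
                error -> events.add("error:" + error.getClass().getSimpleName()),
                () -> events.add("complete"));

        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The exception never propagates to the code that called subscribe. It reaches the error callback instead, and the element "2" is never processed because the sequence terminates on the first error.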
This solution works, but it's not elegant. Rule 2.13 of the Reactive Streams specification requires signal methods such as onNext to return normally instead of throwing. Reactor helped us by converting the Exception to an error signal. However, we could do better.
Essentially, reactive streams rely on the onError method to indicate a failure condition. In most cases, this condition must be triggered by an invocation of the error method on the Publisher. Using an Exception for this use case brings us back to traditional programming.
4. Handling Exceptions in the handle Operator
Similar to the map operator, we can use the handle operator to process items in a stream one by one. The difference is that Reactor provides the handle operator with an output sink, allowing us to apply more complicated transformations.
Let’s update our example from the previous section to use the handle operator:
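    BiConsumer<String, SynchronousSink<Integer>> handler = (input, sink) -> {
        // String.matches tests the whole input, so reject anything that isn't all digits
        if (!input.matches("\\d+")) {
            sink.error(new NumberFormatException());
        } else {
            sink.next(Integer.parseInt(input));
        }
    };

    Flux<String> inFlux = Flux.just("1", "1.5", "2");
    Flux<Integer> outFlux = inFlux.handle(handler);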
Unlike the map operator, the handle operator receives a functional consumer, called once for each element. This consumer has two parameters: an element coming from upstream and a SynchronousSink that builds an output to be sent downstream.
If the input element is a numeric String, we call the next method on the sink, providing it with the Integer converted from the input. If it isn’t a numeric String, we’ll indicate the situation by calling the error method with an Exception object.
Notice that an invocation of the error method will cancel the subscription to the upstream and invoke the onError method on the downstream. Such collaboration of error and onError is the standard way to handle Exceptions in reactive streams.
Let’s verify the output stream:
    StepVerifier.create(outFlux)
        .expectNext(1)
        .expectError(NumberFormatException.class)
        .verify();
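Because the sink is under our control, signalling an error is only one option. As a hedged sketch of the "more complicated transformations" mentioned earlier, the handler below simply skips invalid elements by not calling next for them. The class HandleSkipDemo and the method parseValid are hypothetical names introduced for illustration:

```java
import java.util.List;

import reactor.core.publisher.Flux;

class HandleSkipDemo {

    // Parses digit-only strings and silently drops everything else (illustrative helper).
    static List<Integer> parseValid(String... inputs) {
        return Flux.just(inputs)
            // The explicit <Integer> tells the compiler the sink's element type
            .<Integer>handle((input, sink) -> {
                if (input.matches("\\d+")) {
                    sink.next(Integer.parseInt(input));
                }
                // No call on the sink: the element is skipped
            })
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        System.out.println(parseValid("1", "1.5", "2"));
    }
}
```

Whether to skip a bad element or fail the whole stream depends on the use case. The sketch only shows that handle leaves that decision to the handler.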
5. Handling Exceptions in the flatMap Operator
Another commonly used operator that supports error handling is flatMap. This operator transforms input elements into Publishers, then flattens the Publishers into a new stream. We can take advantage of these Publishers to signify an erroneous state.
Let’s try the same example using flatMap:
    Function<String, Publisher<Integer>> mapper = input -> {
        // String.matches tests the whole input, so reject anything that isn't all digits
        if (!input.matches("\\d+")) {
            return Mono.error(new NumberFormatException());
        } else {
            return Mono.just(Integer.parseInt(input));
        }
    };

    Flux<String> inFlux = Flux.just("1", "1.5", "2");
    Flux<Integer> outFlux = inFlux.flatMap(mapper);

    StepVerifier.create(outFlux)
        .expectNext(1)
        .expectError(NumberFormatException.class)
        .verify();
Unsurprisingly, the result is the same as before.
Notice the only difference between handle and flatMap regarding error handling is that the handle operator calls the error method on a sink, while flatMap calls it on a Publisher.
If we're working with a Flux, we can also use concatMap to handle errors in the same way. It behaves much like flatMap, but it subscribes to the inner Publishers one at a time and preserves the order of the source elements, rather than subscribing eagerly and interleaving their results.
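A minimal sketch of the concatMap variant might look like the following. The class ConcatMapErrorDemo and its parse helper are assumptions made for illustration, and the digit check is one possible way to detect a non-numeric String:

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ConcatMapErrorDemo {

    // Wraps the parsing result, or the failure, in a Mono (hypothetical helper).
    static Mono<Integer> parse(String input) {
        if (!input.matches("\\d+")) {
            return Mono.error(new NumberFormatException(input));
        }
        return Mono.just(Integer.parseInt(input));
    }

    // Records every signal the subscriber observes, in order.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        Flux.just("1", "1.5", "2")
            .concatMap(ConcatMapErrorDemo::parse)
            .subscribe(
                value -> events.add("next:" + value),
                error -> events.add("error:" + error.getClass().getSimpleName()));

        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

As with flatMap, the error travels through the returned Publisher instead of being thrown from the mapping function.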
6. Avoiding NullPointerException
This section covers null references, which often cause a NullPointerException, one of the most common exceptions in Java. To avoid it, we usually compare a variable with null and take a different execution path if it is. It's tempting to do the same in reactive streams:

    Function<String, Integer> mapper = input -> {
        if (input == null) {
            return 0;
        } else {
            return Integer.parseInt(input);
        }
    };
We may think that a NullPointerException won’t occur because we already handled the case when the input value is null. However, the reality tells a different story:
    Flux<String> inFlux = Flux.just("1", null, "2");
    Flux<Integer> outFlux = inFlux.map(mapper);

    StepVerifier.create(outFlux)
        .expectNext(1)
        .expectError(NullPointerException.class)
        .verify();
Apparently, a NullPointerException triggered an error downstream, meaning our null check didn’t work.
To understand why that happened, we need to go back to the Reactive Streams specification. Rule 2.13 of the specification says that “calling onSubscribe, onNext, onError or onComplete MUST return normally except when any provided parameter is null in which case it MUST throw a java.lang.NullPointerException to the caller”.
In line with the specification, Reactor signals a NullPointerException as soon as the source tries to emit the null element. The null therefore never reaches our map function, and the null check inside it never runs.
Consequently, we can't handle a null value, or convert it to a non-null one, once it's inside a stream. The only way to avoid a NullPointerException is to make sure null values never make it into the pipeline.
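One way to keep nulls out, sketched here under the assumption that the raw data arrives as a java.util.List, is to clean the data before it becomes a Flux. The class NullSafeSourceDemo and both method names are hypothetical:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

import reactor.core.publisher.Flux;

class NullSafeSourceDemo {

    // Replaces nulls with "0" before the data enters the reactive pipeline.
    static List<Integer> parseWithDefault(List<String> raw) {
        return Flux.fromStream(raw.stream().map(s -> s == null ? "0" : s))
            .map(Integer::parseInt)
            .collectList()
            .block();
    }

    // Drops nulls before the data enters the reactive pipeline.
    static List<Integer> parseSkippingNulls(List<String> raw) {
        return Flux.fromStream(raw.stream().filter(Objects::nonNull))
            .map(Integer::parseInt)
            .collectList()
            .block();
    }

    public static void main(String[] args) {
        // Arrays.asList tolerates null elements, unlike List.of
        List<String> raw = Arrays.asList("1", null, "2");
        System.out.println(parseWithDefault(raw));
        System.out.println(parseSkippingNulls(raw));
    }
}
```

The null handling happens in an ordinary java.util.stream.Stream, where null elements are allowed, so the Flux only ever sees non-null values.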
7. Conclusion
In this article, we’ve walked through Exception handling in Project Reactor. We discussed a couple of examples and clarified the process. We also covered a special case of exception that can happen when processing a reactive stream — NullPointerException.
As usual, the source code for our application is available over on GitHub.