Streams are lazily evaluated; they are not like a Collection, which acts as a holder of data. So in your code the Stream is a source of String data, and only when you apply stream operations and invoke a terminal operation is that stream pipeline evaluated and said to be consumed; until that time no memory is used to store the data from the stream source. As the `java.util.stream` package documentation puts it:

> Collections and streams, while bearing some superficial similarities, have different goals. Collections are primarily concerned with the efficient management of, and access to, their elements. Streams do not provide a means to directly access or manipulate their elements, and are instead concerned with declaratively describing their source and the computational operations which will be performed. … Streams are lazy; computation on the source data is only performed when the terminal operation is initiated, and source elements are consumed only as needed.

But once you have used a terminal operation on the Stream, the Stream is consumed and cannot be used again. You need to get a new Stream from your source to do the same action. For example, let's get a Stream and hold its reference in a variable:

```java
Stream<String> stream = Stream.of("foo", "bar");
stream.forEach(System.out::println); // invoke terminal operation
stream.forEach(System.out::println); // stream is already consumed
```

When you try to invoke the terminal operation again, you get an IllegalStateException ("stream has already been operated upon or closed").

Spring Boot is one of the most widely used Java frameworks for developing Cloud-Native applications, and Kafka is one of the most famous message brokers out there. Therefore there is a high probability that you may end up using the Spring Cloud Stream Kafka Streams binder (or, for short, the KStream binder). When it comes to handling user-defined exceptions in a KStream binder app, we sometimes struggle with how we should handle them. I will try to explain a few ways in which we can handle user-defined exceptions in KStream binder applications.

For the sake of this article, I will consider a very simple and hypothetical use case: Order Processing. Whenever an order is placed at any AlphaMart store, the order details are pushed to a Kafka topic, "raw-order-topic". This payload contains the details of the order along with the credit card information with which the payment was made. We have been tasked to read the data from the topic "raw-order-topic", mask the credit card info present in the order payload, and then publish it to other topics.

For setting up your development environment, you can run Kafka in a Docker container. I would suggest you follow the Single Node Setup to bootstrap a Kafka cluster on your local system; this guide could help you get started with it quickly. Once your Kafka setup is up, I would suggest you use Offset Explorer to interact with your Kafka topics (you can uninstall it if you do not want to purchase a license for it).

The focus of this article is dealing with user-defined exceptions, so I will not go into the details of the KStream binder or the implementation of the credit card masking. You can find the complete code at the link below: Github Link for the Project.

So let us consider that we have developed the credit card masking function. This function throws an InvalidCreditCardException if the credit card number is not in a valid format. Now let us assume that one of the messages published on the "raw-order-topic" has "magic" as its credit card number. This will result in a runtime exception being thrown by the application code. If the exception is not handled, the KStream will be shut down (the default behavior).
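The masking function itself is not shown in the article; a minimal sketch of what it might look like is below. The class name `CreditCardMasker`, the method `maskCreditCard`, and the 16-digit format check are assumptions for illustration — only the `InvalidCreditCardException` name comes from the article's scenario.

```java
import java.util.regex.Pattern;

// The user-defined exception from the article's scenario.
class InvalidCreditCardException extends RuntimeException {
    InvalidCreditCardException(String message) {
        super(message);
    }
}

public class CreditCardMasker {
    // Hypothetical, very simplified format check: exactly 16 digits.
    private static final Pattern CARD = Pattern.compile("\\d{16}");

    // Masks all but the last four digits; throws if the format is invalid.
    static String maskCreditCard(String cardNumber) {
        if (cardNumber == null || !CARD.matcher(cardNumber).matches()) {
            throw new InvalidCreditCardException("Invalid credit card number: " + cardNumber);
        }
        return "************" + cardNumber.substring(12);
    }

    public static void main(String[] args) {
        System.out.println(maskCreditCard("1234567890123456")); // prints ************3456
        try {
            maskCreditCard("magic"); // the bad payload from the article
        } catch (InvalidCreditCardException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With such a function wired into the topology, any "magic" payload reaching it reproduces the failure the article describes.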
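One common way to keep the stream alive, sketched here without the Kafka Streams API and not necessarily the approach the linked project takes, is to catch the user-defined exception inside the processing step and route bad records to a dead-letter destination instead of letting the exception propagate and shut the KStream down. Plain Java lists stand in for the output and dead-letter topics, and `IllegalArgumentException` stands in for the article's `InvalidCreditCardException`:

```java
import java.util.ArrayList;
import java.util.List;

public class DeadLetterSketch {
    // Stand-ins for the output topic and the dead-letter topic (hypothetical names).
    static final List<String> maskedOrders = new ArrayList<>();
    static final List<String> deadLetters = new ArrayList<>();

    // Wraps the risky call so the stream pipeline itself never sees the exception.
    static void process(String cardNumber) {
        try {
            maskedOrders.add(mask(cardNumber));
        } catch (IllegalArgumentException e) { // InvalidCreditCardException in the article's scenario
            deadLetters.add(cardNumber);
        }
    }

    static String mask(String cardNumber) {
        if (!cardNumber.matches("\\d{16}")) {
            throw new IllegalArgumentException("invalid card: " + cardNumber);
        }
        return "************" + cardNumber.substring(12);
    }

    public static void main(String[] args) {
        process("1234567890123456");
        process("magic"); // would otherwise bring the whole KStream down
        System.out.println(maskedOrders); // prints [************3456]
        System.out.println(deadLetters);  // prints [magic]
    }
}
```

The same try/catch shape can live inside a `mapValues` lambda in a real topology; the key point is that the exception is absorbed at the record level rather than escaping to the stream thread.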
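As an aside, returning to the Stream-reuse point made earlier: since a consumed Stream cannot be replayed, one common idiom (a sketch, not from the original text) is to hold a `Supplier` that builds a fresh Stream over the same source on every call:

```java
import java.util.function.Supplier;
import java.util.stream.Stream;

public class StreamReuse {
    public static void main(String[] args) {
        // Each get() builds a brand-new Stream pipeline over the same source data.
        Supplier<Stream<String>> streams = () -> Stream.of("foo", "bar");

        streams.get().forEach(System.out::println); // first pipeline, consumed here
        long count = streams.get().count();         // second, independent pipeline
        System.out.println(count);                  // prints 2
    }
}
```

This avoids the IllegalStateException entirely because no Stream instance is ever terminal-operated twice.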