Using CompletableFuture in Spring Boot for Multi-Threading

I’m Arsalan Mirbozorgi , Use CompletableFuture to learn more about multi-threading in Spring Boot.

Multi-threading is similar to multitasking, except it allows the simultaneous execution of several threads rather than numerous processes at the same time. To build asynchronous, non-blocking, and multi-threaded programs, Java 8 introduced CompletableFuture.

To support asynchronous operations, Java 5 introduced the Future interface. It was impossible to combine numerous asynchronous calculations and handle all possible faults with this interface’s methods. As a future implementation, CompletableFuture may integrate several asynchronous operations, handle any faults that may occur, and more.

Now is the time to get our hands dirty and see what happens.

The following dependencies should be added to a sample Spring Boot application.

 

 <?xml version=”1.0″ encoding=”UTF-8″?>

<project xmlns=”http://maven.apache.org/POM/4.0.0″

         xmlns:xsi=”http://www.w3.org/2001/XMLSchema-instance”

         xsi:schemaLocation=”http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd”>

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.techshard.future</groupId>

    <artifactId>springboot-future</artifactId>

    <version>1.0-SNAPSHOT</version>

    <parent>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-parent</artifactId>

        <version>2.1.8.RELEASE</version>

        <relativePath />

    </parent>

    <properties>

        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

    </properties>

    <dependencies>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-web</artifactId>

        </dependency>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-data-jpa</artifactId>

        </dependency>

        <dependency>

            <groupId>com.h2database</groupId>

            <artifactId>h2</artifactId>

            <scope>runtime</scope>

        </dependency>

        <dependency>

            <groupId>org.slf4j</groupId>

            <artifactId>slf4j-api</artifactId>

        </dependency>

        <dependency>

            <groupId>org.projectlombok</groupId>

            <artifactId>lombok</artifactId>

            <version>1.18.10</version>

            <optional>true</optional>

        </dependency>

    </dependencies>

    <build>

        <plugins>

            <plugin>

                <groupId>org.springframework.boot</groupId>

                <artifactId>spring-boot-maven-plugin</artifactId>

            </plugin>

        </plugins>

    </build>

</project>

Sample data on automobiles will be used in this study. JPA entities Car and CarRepository will be created.

 package com.techshard.future.dao.entity;

import lombok.Data;

import lombok.EqualsAndHashCode;

import javax.persistence.*;

import javax.validation.constraints.NotNull;

import java.io.Serializable;

@Data

@EqualsAndHashCode

@Entity

public class Car implements Serializable {

    private static final long serialVersionUID = 1L;

    @Id

    @Column (name = “ID”, nullable = false)

    @GeneratedValue (strategy = GenerationType.IDENTITY)

    private long id;

    @NotNull

    @Column(nullable=false)

    private String manufacturer;

    @NotNull

    @Column(nullable=false)

    private String model;

    @NotNull

    @Column(nullable=false)

    private String type;

}

package com.techshard.future.dao.repository;

import com.techshard.future.dao.entity.Car;

import org.springframework.data.jpa.repository.JpaRepository;

import org.springframework.stereotype.Repository;

@Repository

public interface CarRepository extends JpaRepository<Car, Long> {

}

Create a configuration class that will be applied to activate and configure the asynchronous method execution presently.

 package com.techshard.future;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.scheduling.annotation.EnableAsync;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration

@EnableAsync

public class AsyncConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncConfiguration.class);

    @Bean (name = “taskExecutor”)

    public Executor taskExecutor() {

        LOGGER.debug(“Creating Async Task Executor”);

        final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

        executor.setCorePoolSize(2);

        executor.setMaxPoolSize(2);

        executor.setQueueCapacity(100);

        executor.setThreadNamePrefix(“CarThread-“);

        executor.initialize();

        return executor;

    }

}

Using the @EnableAsync annotation, Spring can run @Async methods on a background thread pool. Using the taskExecutor bean, you can configure things like the number of threads an application can use, the queue limit size, and so on. Whenever the server is started, Spring will specifically search for this bean. Otherwise, Spring will generate a default implementation of SimpleAsyncTaskExecutor.

We are now going to develop a service and @Async methods.

package com.techshard.future.service;

import com.techshard.future.dao.entity.Car;

import com.techshard.future.dao.repository.CarRepository;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.scheduling.annotation.Async;

import org.springframework.stereotype.Service;

import org.springframework.web.multipart.MultipartFile;

import java.io.*;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.CompletableFuture;

@Service

public class CarService {

    private static final Logger LOGGER = LoggerFactory.getLogger(CarService.class);

    @Autowired

    private CarRepository carRepository;

    @Async

    public CompletableFuture<List<Car>> saveCars(final MultipartFile file) throws Exception {

        final long start = System.currentTimeMillis();

        List<Car> cars = parseCSVFile(file);

        LOGGER.info(“Saving a list of cars of size {} records”, cars.size());

        cars = carRepository.saveAll(cars);

        LOGGER.info(“Elapsed time: {}”, (System.currentTimeMillis() – start));

        return CompletableFuture.completedFuture(cars);

    }

    private List<Car> parseCSVFile(final MultipartFile file) throws Exception {

        final List<Car> cars=new ArrayList<>();

        try {

            try (final BufferedReader br = new BufferedReader(new InputStreamReader(file.getInputStream()))) {

                String line;

                while ((line=br.readLine()) != null) {

                    final String[] data=line.split(“;”);

                    final Car car=new Car();

                    car.setManufacturer(data[0]);

                    car.setModel(data[1]);

                    car.setType(data[2]);

                    cars.add(car);

                }

                return cars;

            }

        } catch(final IOException e) {

            LOGGER.error(“Failed to parse CSV file {}”, e);

            throw new Exception(“Failed to parse CSV file {}”, e);

        }

    }

    @Async

    public CompletableFuture<List<Car>> getAllCars() {

        LOGGER.info(“Request to get a list of cars”);

        final List<Car> cars = carRepository.findAll();

        return CompletableFuture.completedFuture(cars);

    }

}

saveCar() and getAllCars() are both @Async methods (). A multipart file is accepted by the first of these, and the data it contains is parsed and stored in the database. The database is accessed using the second approach.

Both methods return a new CompletableFuture that has already been completed with the specified values.

Let’s get started by creating a Rest Controller and adding some endpoints:

 package com.techshard.future.controller;

import com.techshard.future.dao.entity.Car;

import com.techshard.future.service.CarService;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.http.HttpStatus;

import org.springframework.http.MediaType;

import org.springframework.http.ResponseEntity;

import org.springframework.web.bind.annotation.*;

import org.springframework.web.multipart.MultipartFile;

import java.io.File;

import java.util.List;

import java.util.concurrent.CompletableFuture;

import java.util.function.Function;

@RestController

@RequestMapping(“/api/car”)

public class CarController {

    private static final Logger LOGGER = LoggerFactory.getLogger(CarController.class);

    @Autowired

    private CarService carService;

    @RequestMapping (method = RequestMethod.POST, consumes={MediaType.MULTIPART_FORM_DATA_VALUE},

            produces={MediaType.APPLICATION_JSON_VALUE})

    public @ResponseBody ResponseEntity uploadFile(

            @RequestParam (value = “files”) MultipartFile[] files) {

        try {

            for(final MultipartFile file: files) {

                carService.saveCars(file);

            }

            return ResponseEntity.status(HttpStatus.CREATED).build();

        } catch(final Exception e) {

            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();

        }

    }

    @RequestMapping (method = RequestMethod.GET, consumes={MediaType.APPLICATION_JSON_VALUE},

            produces={MediaType.APPLICATION_JSON_VALUE})

    public @ResponseBody CompletableFuture<ResponseEntity> getAllCars() {

        return carService.getAllCars().<ResponseEntity>thenApply(ResponseEntity::ok)

                .exceptionally(handleGetCarFailure);

    }

    private static Function<Throwable, ResponseEntity<? extends List<Car>>> handleGetCarFailure = throwable -> {

        LOGGER.error(“Failed to read records: {}”, throwable);

        return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();

    };

}

A list of multipart files is accepted by this REST API. Data analysis is the second goal. The GET endpoint has a different return statement than the POST endpoint. A list of automobiles is being returned, and exceptions are being handled in the event of an error.

When the CompletableFuture fails, the function handleGetCarFailure is called; otherwise, the function returns a list of automobiles to the client if the CompletableFuture succeeds.

 

Testing the Software

Spring Boot must be run in order to be used. The POST endpoint should be tested once the server is up and running. The Postman tool’s sample screenshot.

In the headers section, make sure to include the Content-Type as multipart/form-data. Two threads start at the same moment, one for each file that you request. 

The GET endpoint can now be changed as follows:

 

 @RequestMapping (method = RequestMethod.GET, consumes={MediaType.APPLICATION_JSON_VALUE},

            produces={MediaType.APPLICATION_JSON_VALUE})

    public @ResponseBody ResponseEntity getAllCars() {

        try {

            CompletableFuture<List<Car>> cars1=carService.getAllCars();

            CompletableFuture<List<Car>> cars2=carService.getAllCars();

            CompletableFuture<List<Car>> cars3=carService.getAllCars();

            CompletableFuture.allOf(cars1, cars2, cars3).join();

            return ResponseEntity.status(HttpStatus.OK).build();

        } catch(final Exception e) {

            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();

        }

    }

 

The Async function is called three times in this example. The Future That Is Complete. The allOf() function will wait until all CompletableFutures have been completed, then the join() function will merge the results. Do not forget that this is only a demonstration.

GetAllCars() of the CarService class should now include a Thread.sleep(1000L). We’ve included a one-second delay for testing purposes only.

You may want to retry your GET request after restarting the application.The first two calls to the Async function have started concurrently, as shown in the screenshot. One second later, the third call began. Remember that only two threads can be used at the same time. The third request to the Async function will be made when at least one of the two threads is free.

 

Conclusion

The CompletableFuture has been used in a variety of ways throughout this article. In the comments part below, please share what you have in mind.

Please Send Email

Your message sent successfully
There has been an error