Integrating JDBC operations into the Reactive world.

The problem

How do we integrate the inherently blocking JDBC API into the world of reactive programming?

The solution

The preferred solution is to isolate the blocking code on a dedicated, well-tuned thread pool.

There are other ways to interact with a relational database reactively. One is to implement the wire protocol of a specific database asynchronously (e.g. postgres-async-driver). Another is to use the rxjava2-jdbc project, which bridges RxJava2 and JDBC.

However, most of a query's time is spent traversing database storage, which is I/O-bound, so the throughput of the database is limited by hardware. Making the client more reactive will therefore not bring much performance benefit.

Code samples

The main points are as follows; a complete sample project is on GitHub.
1. Create a scheduler dedicated to blocking JDBC operations.

@Bean
public Scheduler jdbcScheduler() {
    // As suggested in https://wiki.postgresql.org/wiki/Number_Of_Database_Connections,
    // the number of active connections for PostgreSQL should be about
    // ((core_count * 2) + effective_spindle_count).
    // The values below assume 4 cores and 1 effective spindle.
    int connectionPoolSize = 4 * 2 + 1;

    return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
}
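The sizing rule in the comment above can be written out as a small helper. This is only a sketch: the class name PoolSizing and the method connectionPoolSize are hypothetical, and the core and spindle counts are the same assumed values as in the bean above (they describe the database server, not the client machine).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoolSizing {

    // Rule of thumb quoted from the PostgreSQL wiki:
    // active connections = (core_count * 2) + effective_spindle_count
    static int connectionPoolSize(int coreCount, int effectiveSpindleCount) {
        return coreCount * 2 + effectiveSpindleCount;
    }

    public static void main(String[] args) {
        // Assumed values: a database server with 4 cores and 1 effective spindle.
        int size = connectionPoolSize(4, 1);
        System.out.println("pool size = " + size); // prints "pool size = 9"

        // One worker thread per connection, so blocked threads never exceed the pool.
        ExecutorService jdbcPool = Executors.newFixedThreadPool(size);
        jdbcPool.shutdown();
    }
}
```

Keeping the thread count equal to the connection pool size means a worker thread never has to wait for a free connection.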

2. For database queries, wrap the blocking call in Flux.defer() and run it on the JDBC scheduler with subscribeOn().

public Flux<City> findAll() {
    return Flux.defer(() -> Flux.fromIterable(this.cityRepository.findAll()))
               .subscribeOn(jdbcScheduler);
}
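The isolation idea behind this step can be tried without Reactor. The sketch below is a JDK-only analogue, not the Reactor API: CompletableFuture stands in for Flux, and blockingFindAll is a hypothetical placeholder for a blocking call such as cityRepository.findAll(). One difference to keep in mind is that supplyAsync starts the work immediately, whereas Flux.defer() postpones it until subscription.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingIsolation {

    // Hypothetical stand-in for a blocking JDBC call.
    static List<String> blockingFindAll() {
        try {
            Thread.sleep(50); // simulate waiting on the database
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.of("Berlin", "Tokyo");
    }

    // Runs the blocking call on the dedicated pool; the caller's thread is not blocked.
    static CompletableFuture<List<String>> findAllAsync(ExecutorService jdbcPool) {
        return CompletableFuture.supplyAsync(BlockingIsolation::blockingFindAll, jdbcPool);
    }

    public static void main(String[] args) {
        ExecutorService jdbcPool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<List<String>> cities = findAllAsync(jdbcPool);
            // join() is used here only so the demo can print the result before exiting.
            System.out.println(cities.join()); // prints "[Berlin, Tokyo]"
        } finally {
            jdbcPool.shutdown();
        }
    }
}
```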

3. For database updates, perform all the operations of one transaction on the same thread.
This is necessary because the Spring transaction manager and Hibernate session factories use ThreadLocals to synchronize the transaction.

public Mono<City> add(String name, String country) {
  return Mono.fromCallable(() -> transactionTemplate.execute(status -> {
    City city = new City(name, country);
    ...
    City savedCity = cityRepository.save(city);
 
    // Note: if the same transaction contains other database update operations,
    // they must be called sequentially, in a blocking way, inside this callback.

    return savedCity;
  })).subscribeOn(jdbcScheduler);
}
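The reason for the same-thread rule can be seen with a plain ThreadLocal. The following is a minimal JDK-only sketch, not Spring's actual implementation: CURRENT_TX is a hypothetical stand-in for thread-bound transaction state. A value bound on one thread is simply not visible from another thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadBoundContext {

    // Hypothetical stand-in for thread-bound transaction state.
    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    // Returns what the binding thread sees [0] and what another thread sees [1].
    static String[] observe() {
        ExecutorService other = Executors.newSingleThreadExecutor();
        try {
            CURRENT_TX.set("tx-1");                  // bind on the calling thread
            String sameThread = CURRENT_TX.get();    // visible here
            Callable<String> read = CURRENT_TX::get; // same read, run elsewhere
            String otherThread = other.submit(read).get();
            return new String[] { sameThread, otherThread };
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            CURRENT_TX.remove();
            other.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = observe();
        System.out.println("same thread:  " + seen[0]); // prints "same thread:  tx-1"
        System.out.println("other thread: " + seen[1]); // prints "other thread: null"
    }
}
```

If a step of the transaction hopped to a different thread, it would look up the thread-bound state in the same way and find nothing, which is why the whole transaction body stays inside one blocking callback.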