This project is an exploration of what a Java API for relational database access with Reactive Streams might look like. It uses Project Reactor.
Milestone artifacts (library, source, and javadoc) can be found in Maven repositories.
<dependency>
<groupId>org.r2dbc-dao</groupId>
<artifactId>r2dbc-dao</artifactId>
<version>1.0.0</version>
</dependency>
Visit https://www.r2dbc-dao.org for updates and resources.
A quick example of configuration and execution would look like:
public final class SomeEntityDao {
private final R2dbcDao dao;
private final BiFunction<Row, RowMetadata, SomeEntity> mapper = (row, meta) -> {
SomeEntity someEntity = new SomeEntity();
someEntity.setId(row.get("id", Long.class));
someEntity.setSvalue(row.get("svalue", String.class));
return someEntity;
};
public SomeEntityDao() {
ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
.option(DRIVER, H2_DRIVER)
.option(PASSWORD, "")
.option(URL, "mem:test;DB_CLOSE_DELAY=-1")
.option(USER, "sa")
.build());
ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactory)
.maxIdleTime(Duration.ofMinutes(30))
.initialSize(2)
.maxSize(10)
.build();
this.dao = new R2dbcDao(new ConnectionPool(configuration));
}
public Flux<Long> createTable() {
String sql = "CREATE TABLE IF NOT EXISTS some_entity (id IDENTITY PRIMARY KEY, svalue VARCHAR(255))";
return dao.execute(sql);
}
// -----------------------------------------------------------------------
// Transactional Business Logic
// -----------------------------------------------------------------------
public Mono<SomeEntity> update(SomeEntity payload) {
return dao.inTransaction(IsolationLevel.READ_COMMITTED, conn ->
findById(conn, payload.getId())
.switchIfEmpty(Mono.error(new IllegalArgumentException("Entity not found")))
.flatMap(existing -> {
SomeEntity merged = payload.merge(existing);
// We must allow the stream to complete for the transaction to commit.
return updateRow(conn, merged).thenReturn(merged);
})
).single(); // single() to ensure commit execution
}
// -----------------------------------------------------------------------
// Composable Helpers
// -----------------------------------------------------------------------
private Mono<SomeEntity> findById(Connection conn, Long id) {
return dao.select(conn, "SELECT id, svalue FROM some_entity WHERE id = $1", mapper, id)
.next();
}
private Mono<Long> updateRow(Connection conn, SomeEntity entity) {
return dao.execute(conn,
"UPDATE some_entity SET svalue = $1 WHERE id = $2",
entity.getSvalue(),
entity.getId()
).next();
}
private Mono<SomeEntity> save(Connection conn, SomeEntity entity) {
return dao.batch(conn,
c -> c.createStatement("INSERT INTO some_entity (svalue) VALUES ($1)").returnGeneratedValues("id"),
Collections.singletonList(entity),
(stmt, e) -> stmt.bind("$1", e.getSvalue()),
(row, meta) -> row.get("id", Long.class)
)
.next()
.map(id -> {
entity.setId(id);
return entity;
});
}
// -----------------------------------------------------------------------
// Public Facades
// -----------------------------------------------------------------------
public Mono<SomeEntity> save(SomeEntity entity) {
return dao.withConnection(conn -> save(conn, entity)).next();
}
public Flux<SomeEntity> saveAll(List<SomeEntity> entities) {
if (entities.isEmpty()) return Flux.empty();
return dao.batch(
conn -> conn.createStatement("INSERT INTO some_entity (svalue) VALUES ($1)").returnGeneratedValues("id"),
entities,
(stmt, entity) -> stmt.bind("$1", entity.getSvalue()),
(row, meta) -> row.get("id", Long.class)
)
.zipWithIterable(entities, (id, original) -> {
original.setId(id);
return original;
});
}
public Mono<SomeEntity> findById(Long id) {
return dao.select("SELECT id, svalue FROM some_entity WHERE id = $1", mapper, id).next();
}
public Flux<SomeEntity> findAll() {
return dao.select("SELECT id, svalue FROM some_entity", mapper);
}
public Mono<Void> deleteById(Long id) {
return dao.execute("DELETE FROM some_entity WHERE id = $1", id).then();
}
}
This project is released under version 2.0 of the Apache License.