Streaming data from a Java web service with JAX-RS is pretty straightforward, as we have seen in one of my previous posts. We can use the java.util.stream.Stream interface and its implementations for this.

And streaming data from a database with Jdbi is also easy by returning the values as a ResultIterable.

But what if we want to group/reduce the data? Can we use the row reducer from Jdbi?

No. The row reducer reads all data rows and accumulates the result in a map before returning anything. With a large result set this can consume a lot of memory and may even bring our application down with an OutOfMemoryError. So using a row reducer means no more streaming.

Solution

We should

  • get a Stream instance from the ResultIterable object (returned from the SQL Object interface)
  • not call any terminal operation on the stream ourselves, as this would read all the data and once again end the streaming
  • order the data (e.g. with an ORDER BY on the master key) so that we can detect when a group level break (control break) occurs
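Why the terminal operation matters can be shown without a database. The following is a minimal, stdlib-only sketch; the counting iterator is a hypothetical stand-in for the cursor behind a ResultIterable, not Jdbi code. Building the pipeline with intermediate operations such as map and filter fetches nothing; only a terminal operation (here collect) pulls rows, and collect pulls all of them.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class LazyStreamDemo {

    // Hypothetical stand-in for a database cursor: every next() call "fetches" one row
    // and increments the counter, so we can see when rows are actually read.
    static Stream<String> rows(List<String> data, AtomicInteger rowsRead) {
        Iterator<String> source = data.iterator();
        Iterator<String> cursor = new Iterator<String>() {
            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public String next() {
                rowsRead.incrementAndGet();
                return source.next();
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(cursor, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        AtomicInteger rowsRead = new AtomicInteger();

        // intermediate operations only describe the pipeline, nothing is read yet
        Stream<String> pipeline = rows(List.of("a", "b", "c"), rowsRead)
                .map(String::toUpperCase)
                .filter(s -> !s.isEmpty());
        System.out.println("rows read before terminal operation: " + rowsRead.get()); // 0

        // a terminal operation such as collect pulls every row through the pipeline
        List<String> all = pipeline.collect(Collectors.toList());
        System.out.println("rows read after collect: " + rowsRead.get() + " " + all); // 3 [A, B, C]
    }
}
```

In the web service, the terminal operation is left to whatever writes the response, so rows are fetched only as they are sent.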

We cannot use Jdbi's UseRowReducer annotation for this, as it would read all the data and end the streaming. We have to do the row reducing ourselves.

Row Reducer

But what exactly is row reducing?

It is exactly what it says: reducing rows.

Most of the time we have a “master” data row that gets some extra columns from “detail” data, e.g. an order (master) with many items (details).

```
M1a | M1b | M1c | D1a | D1b
M1a | M1b | M1c | D2a | D2b
M1a | M1b | M1c | D3a | D3b
M1a | M1b | M1c | D4a | D4b
M2a | M2b | M2c | D8a | D8b
M2a | M2b | M2c | D9a | D9b
```

Here we have two master data entries where one master data entry (M1) has four detail entries and the other (M2) has two detail entries.

M1 needs to be reduced to one object that holds a list of four child objects. So when the key of the master data changes, we need to make a “break”, process the master data and its detail data, and then start with the next master data entry.
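Stripped of Jdbi, a control break is a short loop over rows that are already sorted by the master key. A minimal sketch, assuming each row is a hypothetical {masterKey, detail} pair of strings; it collects the groups into a list only to keep the example short, whereas a streaming version would hand each finished group on immediately.

```java
import java.util.ArrayList;
import java.util.List;

// Control-break sketch over rows sorted by master key.
// Each row is a hypothetical {masterKey, detail} pair.
class ControlBreakDemo {

    static List<String> group(List<String[]> sortedRows) {
        List<String> result = new ArrayList<>();
        String currentKey = null;
        List<String> details = new ArrayList<>();

        for (String[] row : sortedRows) {
            String key = row[0];
            if (currentKey != null && !currentKey.equals(key)) {
                // control break: the key changed, so the previous group is complete
                result.add(currentKey + details);
                details = new ArrayList<>();
            }
            currentKey = key;
            details.add(row[1]);
        }
        // no row follows the last group, so it has to be flushed explicitly
        if (currentKey != null) {
            result.add(currentKey + details);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> rows = List.of(
                new String[] {"M1", "D1"}, new String[] {"M1", "D2"},
                new String[] {"M1", "D3"}, new String[] {"M1", "D4"},
                new String[] {"M2", "D8"}, new String[] {"M2", "D9"});
        System.out.println(group(rows)); // [M1[D1, D2, D3, D4], M2[D8, D9]]
    }
}
```

Apart from the result list, only the current group's details are kept in memory. Note the explicit flush after the loop: without it, the last group would be lost.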

In Jdbi, this control break can be implemented with the RowMapper interface.

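```java
import java.sql.ResultSet;
import java.sql.SQLException;

import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.mapper.reflect.BeanMapper;
import org.jdbi.v3.core.statement.StatementContext;

public class CustomerControlBreakMapper implements RowMapper<Customer> {

    // the customer currently being assembled from consecutive rows
    private Customer customer;
    // the completed customer handed out on a control break (null otherwise)
    private Customer returnedCustomer;

    // bean mappers for the prefixed column groups (e.g. c_id)
    private RowMapper<Customer> customerMapper = BeanMapper.of(Customer.class, "c");
    private RowMapper<Email> emailMapper = BeanMapper.of(Email.class, "e");
    private RowMapper<EmailType> emailTypeMapper = BeanMapper.of(EmailType.class, "et");

    @Override
    public Customer map(ResultSet rs, StatementContext ctx) throws SQLException {
        if (customer == null || !customer.getId().equals(rs.getInt("c_id"))) {
            // control break: the previous customer is complete, start a new one
            returnedCustomer = customer;
            customer = customerMapper.map(rs, ctx);
        } else {
            // same customer as in the previous row: nothing to return yet
            returnedCustomer = null;
        }

        // every row contributes one e-mail address to the current customer
        Email email = emailMapper.map(rs, ctx);
        email.setEmailType(emailTypeMapper.map(rs, ctx));
        customer.addEmail(email);

        return returnedCustomer;
    }

}
```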

With this implementation, each control break returns the previous customer, now complete with all of its e-mail addresses. On every other row the mapper returns null, which we can later filter out with the java.util.stream.Stream::filter method. So in the end the stream contains only our customer objects (with their e-mail address objects).

Putting it all together

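```java
public Pair<Stream<Customer>, Handle> list() {

    // get the database connection (to be closed by the caller once the stream is consumed)
    Handle handle = jdbi.open();

    // the mapper keeps state between rows, so every query gets a fresh instance
    CustomerControlBreakMapper mapper = new CustomerControlBreakMapper();
    handle.registerRowMapper(mapper);

    // use SQL Objects for executing the SQL
    CustomerDao dao = handle.attach(CustomerDao.class);

    // get the stream from the ResultIterable and drop the nulls
    // returned for rows without a control break (requires java.util.Objects)
    Stream<Customer> stream = dao.list().stream().filter(Objects::nonNull);

    return Pair.of(stream, handle);

}
```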

That looks pretty manageable but …

Hmmm … the last group of data is missing in the result!

As with any control break logic we need to adjust the code a bit because there is no control break after the last group of data rows.
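The effect is easy to reproduce without a database. This sketch is a hypothetical, stdlib-only analogue of CustomerControlBreakMapper: a stateful java.util.function.Function that returns the previous group on a control break and null otherwise, followed by filter(Objects::nonNull). Because the function keeps state between calls, it is only safe on a sequential stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical analogue of CustomerControlBreakMapper: a row is a {masterKey, detail}
// pair and a finished group is rendered as a string such as "M1[D1, D2]".
class BreakOnlyMapper implements Function<String[], String> {

    private String currentKey;
    private List<String> details = new ArrayList<>();

    @Override
    public String apply(String[] row) {
        String finished = null;
        if (currentKey == null || !currentKey.equals(row[0])) {
            // control break: the previous group (if any) is complete
            finished = currentKey == null ? null : currentKey + details;
            currentKey = row[0];
            details = new ArrayList<>();
        }
        details.add(row[1]);
        return finished; // null for every row that does not trigger a break
    }

    // stateful mapping function: only safe on a sequential stream
    static List<String> run(Stream<String[]> rows) {
        return rows.map(new BreakOnlyMapper())
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Stream<String[]> rows = Stream.of(
                new String[] {"M1", "D1"}, new String[] {"M1", "D2"},
                new String[] {"M2", "D8"}, new String[] {"M2", "D9"});
        System.out.println(run(rows)); // [M1[D1, D2]]  (the M2 group is never emitted)
    }
}
```

The M2 group is built completely inside the function, but no later row arrives to trigger its control break, so it never reaches the stream.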

The Final Group

The code is almost complete as it is, because we have already processed every data row and have the last customer object with all of its e-mail addresses. It just isn’t returned, as there is no control break after the last row.

The variable customer in the CustomerControlBreakMapper class already holds the final customer object ready to be returned. We just need to get it somehow.

For that we introduce a new interface:

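```java
import java.util.Objects;
import java.util.stream.Stream;

public interface ControlBreakSupport<T> {

    // the group that was still being built when the last row was mapped
    T getFinalGroup();

    // a stream of at most one element: the final group, if there is one;
    // Stream.generate is lazy, so getFinalGroup() runs only when the element is pulled
    default Stream<T> stream() {
        return Stream.generate(this::getFinalGroup).limit(1).filter(Objects::nonNull);
    }

}
```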

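This interface lets us create a stream containing just the final group (customer) object, which we can concat with the Stream instance we get from the ResultIterable. This works because Stream.generate is lazy: on a sequential stream, Stream.concat only pulls from the second stream once the first one is exhausted, so getFinalGroup() is called after the last row has been mapped.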
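Whether this works depends entirely on evaluation order, and that can be checked without a database. A stdlib-only sketch, repeating the ControlBreakSupport interface from above so it compiles on its own and using a hypothetical string-based mapper in place of CustomerControlBreakMapper. If getFinalGroup() were called before the rows were mapped, it would return null and the M2 group would be missing from the output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Same interface as in the post, repeated so this sketch compiles on its own.
interface ControlBreakSupport<T> {

    T getFinalGroup();

    default Stream<T> stream() {
        return Stream.generate(this::getFinalGroup).limit(1).filter(Objects::nonNull);
    }
}

// Hypothetical string-based stand-in for CustomerControlBreakMapper.
class GroupingMapper implements Function<String[], String>, ControlBreakSupport<String> {

    private String currentKey;
    private List<String> details = new ArrayList<>();

    @Override
    public String apply(String[] row) {
        String finished = null;
        if (currentKey == null || !currentKey.equals(row[0])) {
            // control break: hand out the previous group (none before the first row)
            finished = currentKey == null ? null : currentKey + details;
            currentKey = row[0];
            details = new ArrayList<>();
        }
        details.add(row[1]);
        return finished;
    }

    @Override
    public String getFinalGroup() {
        // the group still being built after the last row has been mapped
        return currentKey == null ? null : currentKey + details;
    }

    static List<String> run(Stream<String[]> rows) {
        GroupingMapper mapper = new GroupingMapper(); // stateful: one instance per "query"
        return Stream.concat(rows.map(mapper), mapper.stream())
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Stream<String[]> rows = Stream.of(
                new String[] {"M1", "D1"}, new String[] {"M1", "D2"},
                new String[] {"M2", "D8"}, new String[] {"M2", "D9"});
        System.out.println(run(rows)); // [M1[D1, D2], M2[D8, D9]]
    }
}
```

An empty result set also behaves: no row is mapped, getFinalGroup() returns null, and the filter removes it.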

Let’s do it!

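```java
import java.sql.ResultSet;
import java.sql.SQLException;

import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.mapper.reflect.BeanMapper;
import org.jdbi.v3.core.statement.StatementContext;

public class CustomerControlBreakMapper implements RowMapper<Customer>, ControlBreakSupport<Customer> {

    // the customer currently being assembled from consecutive rows
    private Customer customer;
    // the completed customer handed out on a control break (null otherwise)
    private Customer returnedCustomer;

    private RowMapper<Customer> customerMapper = BeanMapper.of(Customer.class, "c");
    private RowMapper<Email> emailMapper = BeanMapper.of(Email.class, "e");
    private RowMapper<EmailType> emailTypeMapper = BeanMapper.of(EmailType.class, "et");

    // after the last row has been mapped, this is the last (complete) customer
    @Override
    public Customer getFinalGroup() {
        return customer;
    }

    @Override
    public Customer map(ResultSet rs, StatementContext ctx) throws SQLException {
        if (customer == null || !customer.getId().equals(rs.getInt("c_id"))) {
            // control break: the previous customer is complete, start a new one
            returnedCustomer = customer;
            customer = customerMapper.map(rs, ctx);
        } else {
            // same customer as in the previous row: nothing to return yet
            returnedCustomer = null;
        }

        // every row contributes one e-mail address to the current customer
        Email email = emailMapper.map(rs, ctx);
        email.setEmailType(emailTypeMapper.map(rs, ctx));
        customer.addEmail(email);

        return returnedCustomer;
    }

}
```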

Not much has changed but now we can concat the two streams and will get all the customer objects from the resulting stream.

```java
public Pair<Stream<Customer>, Handle> list() {

    // get the database connection (to be closed by the caller once the stream is consumed)
    Handle handle = jdbi.open();

    // the mapper keeps state between rows, so every query gets a fresh instance
    CustomerControlBreakMapper mapper = new CustomerControlBreakMapper();
    handle.registerRowMapper(mapper);

    // using SQL Objects for executing the SQL
    CustomerDao dao = handle.attach(CustomerDao.class);

    // get the stream from the ResultIterable
    Stream<Customer> customerStream = dao.list().stream();

    // get the stream with the final customer (evaluated lazily)
    Stream<Customer> finalGroupStream = mapper.stream();

    // concat those two streams into one and drop the nulls returned for rows without a control break
    Stream<Customer> stream = Stream.concat(customerStream, finalGroupStream).filter(Objects::nonNull);

    return Pair.of(stream, handle);

}
```

Wrap Up

Streaming doesn’t exclude grouping if done right. And the result is pretty manageable with small classes and interfaces.

If you found another solution to the grouping / row reducing problem I would be really interested in hearing it.

Happy streaming!

Mihael