Jun 26 2017
 

Java 8 introduced a lot of cool features, one of the most useful of which was the stream() method. This nifty little method lets you treat a Collection as a stream, enabling cool things like lambdas operating over a list. Related to stream() is parallelStream(). This lets you split your stream into smaller chunks that are processed in, you guessed it, parallel. Specifically, by default your data is processed in the common ForkJoinPool, a thread pool whose size defaults to the number of cores on your machine minus one, with the thread that called the stream pitching in as well. That’s a handy piece of information you’re going to want to keep in mind before you start throwing this nifty little call around in your code.
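If you want to see what that looks like on a particular box, here is a minimal sketch. The class and method names are made up for illustration; the calls themselves are standard JDK 8. It prints the core count, the default parallelism of the common pool, and the names of the threads that actually touched the data.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper class, not part of any library.
class ParallelStreamThreads {

    /** Returns the name of every thread that handled at least one element. */
    static Set<String> threadsUsed(List<Integer> items) {
        // Thread-safe set, because several threads may add to it at once.
        Set<String> names = ConcurrentHashMap.newKeySet();
        items.parallelStream()
             .forEach(item -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("cores: " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());

        List<Integer> items = IntStream.range(0, 10_000)
                .boxed()
                .collect(Collectors.toList());
        System.out.println("threads used: " + threadsUsed(items));
    }
}
```

On a dual-core machine this should report a parallelism of 1, and the thread list should contain at most the calling thread plus a single pool worker.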

Because parallelStream() runs on that thread pool, you should really only be using this method with code that’s going to run on a server with a decent number of cores, and by “decent number of cores” I mean “at least 4.” At 2 cores or fewer, the pool has a single worker thread to split the work with whichever thread called the stream, which is probably not what you wanted when you typed parallelStream() into your code editor.
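One way to make that rule of thumb explicit in code is to pick the stream type based on the core count. This is only a sketch: the class name, method names, and the constant are hypothetical, and the threshold of 4 is simply the number suggested above.

```java
import java.util.Collection;
import java.util.stream.Stream;

// Hypothetical helper class, not part of any library.
class StreamChooser {

    // Threshold taken from the rule of thumb above; tune it for your own workload.
    static final int MIN_CORES_FOR_PARALLEL = 4;

    /** Returns a parallel stream only when the given core count makes it worthwhile. */
    static <T> Stream<T> streamFor(Collection<T> items, int availableCores) {
        return availableCores >= MIN_CORES_FOR_PARALLEL
                ? items.parallelStream()
                : items.stream();
    }

    /** Convenience overload that asks the JVM how many cores it can see. */
    static <T> Stream<T> streamFor(Collection<T> items) {
        return streamFor(items, Runtime.getRuntime().availableProcessors());
    }
}
```

Calling code would then write StreamChooser.streamFor(batches).forEach(...) and get a plain sequential stream on a small instance.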

You also need to be more explicit about monitoring the progress of whatever processing you’re doing. Now that your logic is running on pool threads, be careful about assuming your main code block waits until all the processing is done before continuing on (this bit me in the butt not too long ago). A terminal operation such as forEach() does block until every element has been handled, but anything your code hands off asynchronously from inside the stream is not covered by that wait. If you need to make sure all your data processing is done (even if it’s running in parallel) before moving on, hold a reference to the pool doing the work so you can check when it finally empties out.
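It can help to verify exactly what the calling thread waits for. The sketch below (hypothetical class and method names) counts how many items had finished by the time forEach() returned. In this sketch the count always equals the list size, because forEach() is a terminal operation and returns only once every element has been handled. Work that the lambda hands off to some other executor would not be counted, and would need its own tracking, such as a latch, a future, or a pool you hold a reference to.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper class, not part of any library.
class CompletionCheck {

    /** Processes every item in parallel and returns how many had finished when forEach() returned. */
    static <T> int processAndCount(List<T> items, Consumer<? super T> work) {
        AtomicInteger finished = new AtomicInteger();
        items.parallelStream().forEach(item -> {
            work.accept(item);          // the real per-item processing
            finished.incrementAndGet(); // only counts work done inside the lambda itself
        });
        return finished.get();
    }
}
```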

Here’s a simple example of why it’s important to be able to track the status of whatever you’re doing in your parallelStream() call. We had a process running that was meant to go through and batch process data for each customer. So we go through our data, group it by customer, and parallelStream() the actual data processing logic. Free little performance boost right there, right? Wrong. We were running this on a c4.large instance on AWS, which meant we had 2 CPU cores. Subtract out the core running the service in question, and we only had a single pool thread doing our data processing. Blindly calling parallelStream() left us thinking we were handling data in multi-customer batches, and we had no visibility into what was actually running to confirm that. Adding additional logging told us that bumping the box up to a larger size (8 cores) gave us a slight performance boost, but that our real limiting factor was the RAM on our database. So in this particular instance, running our (allegedly) parallel processing in serial wasn’t the bottleneck, although we did see a slight improvement once we actually enabled real parallel job running.

Here’s another little case study, where parallelStream() was actually our culprit. We have a service that reads events from a Kinesis stream, updates a couple of running totals, and writes the event data to our database. What were we doing in our main application thread? You guessed it:

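public void accept(List<Event> events) {
    // Group the incoming batch by customer, then hand each customer's
    // events to the processor on a parallel stream.
    events.stream()
        .collect(Collectors.groupingBy(Event::customerIdField))
        .values()
        .parallelStream()
        .forEach(myDataProcessor::process);
}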

Not only was this also running on a puny little dual-core machine (I suppose we should get points for consistency there?), but the actual bulk of the work, the part that was actually taking time to do, was happening in a background thread. That meant that we were constantly going back to Kinesis saying “Yeah, I’m ready for more data” regardless of whether we had finished processing the last batch or not. As a result, our monitoring, which was based on how far behind the latest record we were in Kinesis, looked great, while our single thread processing events in the background eventually fell so far behind that it became visible to customers. Another issue you may have noticed is that, based on the code snippet above, if we ever had to bounce the app (say, as part of deploying an update), any events waiting to be processed on the background thread just got dropped and lost.

Our solution was to once again bump up the number of cores on the instance (you should be leery of using parallelStream() because of that alone), and to use a custom thread pool for the background threads used by the parallelStream() call. It’s really just another ForkJoinPool (what parallelStream() uses by default), but by creating it ourselves and running the stream inside it we have a reference to it. That means we can see the number of jobs in the pool, so we don’t go back to Kinesis until we’re actually done processing the batch we just read. We can also check against this pool during shutdown events, forcing the shutdown call to wait for the thread pool to drain completely (i.e. finish processing the last Kinesis event), before stopping our event consumption.
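For the curious, here is a rough sketch of what running a parallel stream inside your own pool can look like. It is not our production code: the class and method names are hypothetical, and it leans on the widely used trick that a parallel stream started from inside a ForkJoinPool task runs in that pool. As far as I know that behavior comes from the implementation rather than from anything the Stream API promises, so treat it as an assumption.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical wrapper class, not part of any library.
class CustomPoolProcessor {

    private final ForkJoinPool pool;

    CustomPoolProcessor(int parallelism) {
        this.pool = new ForkJoinPool(parallelism);
    }

    /** Runs the parallel stream inside our own pool and blocks until the whole batch is done. */
    <T> void processBatch(List<T> batch, Consumer<? super T> handler) {
        try {
            // submit() hands the work to our pool; get() waits for it to finish.
            pool.submit(() -> batch.parallelStream().forEach(handler)).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while processing batch", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("batch processing failed", e.getCause());
        }
    }

    /** Estimate of tasks still waiting in the pool, handy for logging and monitoring. */
    long queuedTasks() {
        return pool.getQueuedTaskCount() + pool.getQueuedSubmissionCount();
    }

    /** Estimate of threads currently running or stealing tasks. */
    int activeThreads() {
        return pool.getActiveThreadCount();
    }

    /** Waits up to the timeout for the pool to go idle; returns true if it did. */
    boolean awaitIdle(long timeout, TimeUnit unit) {
        return pool.awaitQuiescence(timeout, unit);
    }
}
```

With something like this, the consumer loop only asks for the next batch after processBatch() returns, and the counters give you numbers to put in your logs.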

The primary moral I’ve learned from using parallelStream() is that you need to be aware of where that code is running. Specifically, you need to be running this code on a pretty beefy box, otherwise you’re not really going to get anything out of it. If you are running on a machine with enough cores to make parallelStream() worthwhile, you’ll also want to make sure you have lots of logging output, so you can identify a particular thread for debugging purposes later, not to mention just general-purpose monitoring. Lastly, if you’re concerned about data integrity and making sure you don’t drop any data, you’ll want to create a shutdown hook that waits for the background thread pool to empty out. parallelStream() is still a useful method, but it has some subtleties that aren’t intuitive and can come back to hurt you if you’re not careful. Hopefully, now you’re a little bit more aware of these potential issues and are ready to use parallelStream() responsibly.
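As a rough illustration of that last point, a shutdown hook that drains a pool might look something like this. The class name, method names, and the 30-second timeout are all assumptions made for the sketch; pick whatever fits your service.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of any library.
class DrainOnShutdown {

    /** Stops accepting new work, then waits for already-submitted work to finish. */
    static boolean drain(ForkJoinPool pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // previously submitted tasks still run; new ones are rejected
        try {
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Registers a JVM shutdown hook that drains the given pool before exit. */
    static Thread installHook(ForkJoinPool pool) {
        // 30 seconds is an arbitrary timeout chosen for this sketch.
        Thread hook = new Thread(() -> drain(pool, 30, TimeUnit.SECONDS), "pool-drain-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```

The hook matters because ForkJoinPool worker threads are daemon threads, so the JVM will not wait around for them on its own.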

Posted at 2:44 PM