Spring Batch Remote Chunking

Introduction

If you want to cut to the chase and learn by doing with an integration test, download the source from GitHub (details at the bottom of the page) and take a look at the test RemoteChunkingIntegrationTest.java.

Spring Batch is a fantastic framework that brings more of the Spring dependency injection philosophy to the party, so you can concentrate on writing business logic (and XML, naturally) rather than boilerplate batch management code. It allows developers to focus on domain concepts and rapidly create solutions for batch processing data in their domain. Using Spring Batch is a bit more advanced than the core Spring Framework, but once learned it pays off through ease of management, monitoring, rapid job creation and scaling. You could never achieve all these advanced features as quickly and elegantly by writing your own batch framework.

This post covers some advanced Spring Batch topics and assumes a basic knowledge of Spring Batch jobs and how to configure them. If you’re completely new to Spring Batch you’ll probably find the information here overwhelming, and should first swot up on the reference documentation at the Spring Batch homepage:

http://static.springsource.org/spring-batch/reference/html/index.html

Spring Batch allows several strategies for scaling up batch processes. Each strategy has a trade-off in terms of data, resources available and suitability to those resources. This is what we’re going to cover in this post as it’s not a comprehensively documented subject on the forums or the net. The first thing to cover is the types of scaling available in Spring Batch.

Spring Batch Scaling Strategies

There are two general approaches to scaling any system to increase performance:

1.) Vertical Scaling – Faster machine with more RAM, CPUs, GHz

2.) Horizontal Scaling – Add more machines to distribute load remotely

Spring Batch has several scaling strategies that you can configure to leverage both approaches. Below are the built-in strategies Spring Batch supports to help you scale your batch jobs. Choose your strategy carefully, as the right one depends on many variables, including your data, the processing required and your system's architecture. You will need a good idea of what your data looks like, where it lives, how much processing it needs and whether it can be read or written in parallel. All of this is highly dependent on the domain and system knowledge within your business.

1.) Multithreaded Step – Configure multithreading at the step level. Suitable for vertical scaling. Only useful on single machines with multicore CPUs, and it takes care to avoid thread-safety issues.

2.) Parallel Step – Configure steps to execute in parallel. Suitable for vertical scaling. Requires the job flow to be split into parallel branches.

3.) Remote Chunking – Perform chunk processing/writing on remote servers. Suitable for horizontal scaling. Assumes processing+writing is more taxing than reading.

4.) Partitioning Step – Partitions reader data and distributes each segment of data to multiple threads or remote nodes. Suitable for horizontal scaling.

In this post, out of the four options, we’re going to explore Remote Chunking, which is perhaps the less glamorous cousin of the Partitioning Step as it can be a little more challenging to get up and running initially. Partitioning can be simpler to scale from a configuration standpoint, but to use it you need to know your data very well, and it’s not always possible to find a strategy to split your data up for parallel reading and writing. Remote Chunking, in contrast, does not require any special knowledge of the data to be read, and can be implemented with less consideration but a bit more manual configuration. Also, Remote Chunking reads data in a single process, which can potentially create a bottleneck. This is the trade-off between the two built-in strategies that allow work to be distributed to remote machines.

A Note on Thread safety

As I’m sure we all know, code that keeps mutable state in instance variables is not thread safe. ItemReaders, ItemProcessors and ItemWriters are not necessarily thread safe because their execution state can be stored in instance variables. Therefore, to be used in a multithreaded environment the read(), process() and write() methods may need to be wrapped in a thread-safe object, like so:

import org.springframework.batch.item.ItemReader;

public class ThreadsafeItemReader implements ItemReader<Biscuit> {

    private ItemReader<Biscuit> readerDelegate; // inject the non-thread-safe reader here

    // synchronized serialises access so the delegate is never called concurrently
    public synchronized Biscuit read() throws Exception {
        return readerDelegate.read();
    }
}

Thread safety is a challenging problem, and this rather thorny issue should generally be avoided when other strategies are possible. Parallel Steps, Remote Chunking and Partitioning are strategies that mean you don’t have to worry about threading issues in your own code the way you do with Multithreaded Steps.

Remote Chunking

The Remote Chunking scaling strategy reads all input data within a single thread on the master and sends the read items out for distributed processing/writing on any number of remote nodes. This means that, as a developer, you don’t have to worry about ensuring thread safety in your code, and once implemented it is a reliable and simple approach to scaling your batch application. As mentioned above, it offers little or no performance gain if processing+writing is equally or less resource intensive than reading. However, this is not typically the case, and it’s quite common to find that processing+writing is considerably more taxing. If you are looking to split your input data into parallel sets and process/write completely in parallel then you should consider the Partitioning Step strategy instead.

Below is a diagram depicting the high level interactions between the core classes and the infrastructure involved.

[Diagram: OpenCredo Spring Batch Remote Chunking – high-level interactions]
1.) FactoryBean Replaces Writer and creates ChunkHandler on master
2.) Batch Job’s Step reads item data
3.) First chunk of item writing begins
4.) Writing delegated to substituted ChunkMessageChannelItemWriter
5.) Write items wrapped in ChunkRequest and sent to messaging gateway

6a.) [Asynchronous] ChunkRequest wrapped in message payload and sent to broker
7a.) [Asynchronous] ChunkProcessorChunkHandler on slave receives ChunkRequest
8a.) [Asynchronous] ItemWriter executes on slave
9a.) [Asynchronous] ChunkResponse sent back to master over broker message queue

6b.) [Asynchronous] ChunkRequest wrapped in message payload, sent to local channel
7b.) [Asynchronous] ItemWriter executes on master node & returns

Additional Module: spring-batch-integration

As part of the Spring Batch Admin project there is an additional module called ‘spring-batch-integration’. This module is essential for Remote Chunking and other remote distribution strategies; you will need to include it as a dependency in your project and/or POM files to get remote chunking working out of the box.

Here’s a sample of the Maven configuration to get this dependency:

<dependency>
    <groupId>org.springframework.batch</groupId>
    <artifactId>spring-batch-integration</artifactId>
    <version>1.2.1.RELEASE</version>
</dependency>

Side Note: If you get a chance, take a look at some of the remote scaling tests in the spring-batch-integration module, as there are samples that can get you started with other scaling strategies too.

So, how does it work? Introducing: The Master Batch Node

In short, lots of Spring magic and probably some necromancy for good measure (but don’t worry, I’m sure no orphan souls were harmed in the making of the Spring Batch project). You can take any vanilla Spring Batch step and add remote chunking relatively simply.

There are some existing tests in the spring-batch-integration module which demonstrate remote chunking and partitioning (see tests like RemoteChunkStepIntegrationTests.java). However, these tests all execute within a single thread. This article will show a working test with asynchronous remote slaves.

Important concepts:

Master – The master node. This is the application-context/server where the batch steps are primarily configured and execution originates. The master sends chunks of read item data to slaves.
Slave – A slave node. This is the application-context/server that does the processing/writing for the master and returns the status of finished chunk work.
Broker – The message queue for asynchronous message sending between master and slaves.

Below is a sample batch job defined on the master. The ‘reader’ and ‘writer’ beans have been omitted for brevity, as I assume you already know how to define them; an illustrative wiring follows the snippet.

    <step id="step">
      <tasklet>
         <chunk reader="reader" writer="writer"
                           commit-interval="5">                 
 </chunk>
     </tasklet>
    </step>
</job>
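
The omitted beans are ordinary ItemReader/ItemWriter definitions. For illustration only, wiring in the sample project’s test implementations might look like this (the TestItemWriter class name is taken from the project; I’m assuming the reader lives in the same package):

<!-- illustrative only: substitute your own ItemReader/ItemWriter implementations -->
<bean id="reader"
      class="com.opencredo.sandbox.gawain.springbatch.remote.chunking.TestItemReader" />
<bean id="writer"
      class="com.opencredo.sandbox.gawain.springbatch.remote.chunking.TestItemWriter" />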

Adding remote chunking to the step configured on the master is actually very simple: you define two beans in the master context XML file, a RemoteChunkHandlerFactoryBean and a ChunkMessageChannelItemWriter.

<bean id="chunkHandler"
      class="org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean">
 <property name="chunkWriter" ref="chunkWriter" />
 <property name="step" ref="step" />
</bean>
<bean id="chunkWriter"
      class="org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter"
      scope="step">
  <property name="messagingOperations" ref="messagingGateway" />
    <property name="replyChannel" ref="replies" />
    <property name="throttleLimit" value="50" />
  <property name="maxWaitTimeouts" value="30000" />
</bean>

ChunkProcessorChunkHandler – (the ‘chunkHandler’ bean) This tongue-twisting class receives messages, via a service activator, containing a ChunkRequest object as the payload. It then calls the configured ItemProcessor and ItemWriter before sending back a ChunkResponse object.

RemoteChunkHandlerFactoryBean – This factory creates the ChunkHandler, but before doing so performs a brief arcane ritual to replace the existing ItemWriter configured in the step with our new ChunkMessageChannelItemWriter object.

ChunkMessageChannelItemWriter – This mouthful is substituted for the originally configured ItemWriter in the step (the factory bean above does the replacement). When called, it wraps up the request to execute a write in a ChunkRequest object and sends it out over the messaging infrastructure. The ChunkMessageChannelItemWriter is where all the magic remote sending happens (like a Santa bean delivering presents to the slaves). Eagle-eyed readers will notice the ‘messagingOperations’ and ‘replyChannel’ properties, which are how the master is configured to send ChunkRequest objects over the wire to remote nodes and get responses back.

IMPORTANT NOTE: It seems you must have a service activator configured to send messages to the ‘chunkHandler’ bean on the master node; remote chunking will not work without it. As a consequence, the master node’s ItemWriter will act as an additional slave, because it is connected to the ‘requests’ messaging channel. If you don’t want the master to perform item writing there are a couple of solutions, but the simplest is to disconnect it from the messaging pipeline: keep the service activator, but wire it to a channel that has no input, so the application still runs correctly. See the example diagrams and the configuration sketch below.

Master Node with ItemWriter configured (This will perform writes on the master node):

[Diagram: Spring Batch Remote Chunking messaging infrastructure – master detail]

Master node with ItemWriter disabled (this configuration ensures writes are only performed on remote slaves):

[Diagram: Spring Batch Remote Chunking messaging infrastructure – master detail with deactivated ChunkHandler]

NOTE: The ‘no_requests’ channel, which is disconnected from the rest of the messaging infrastructure, means the ItemWriter on the master is never used.
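
As a rough sketch, the deactivated wiring might look like this (the channel and activator follow the same pattern as the slave configuration later in this post; the ‘no_requests’ name matches the diagram, but treat the exact XML as illustrative):

<!-- The activator must still exist, but its input channel has no producers,
     so the master's local chunkHandler is never invoked -->
<integration:channel id="no_requests" />

<integration:service-activator id="srvActivator"
        input-channel="no_requests" output-channel="replies"
        ref="chunkHandler" method="handleChunk" />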

The only thing left to configure for the master now is the messaging infrastructure.

Master Batch Integration Configuration

Next we configure the master so it can communicate with remote slaves. To use the configuration below, your project will require dependencies on the spring-integration and spring-jms jars. You will also need to add the Spring Integration and JMS namespaces to your XML files to be able to use the custom namespace elements (see the sample XML in the project download for an example).
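
If you haven’t used these namespaces before, the declarations look something like this (these are the standard Spring schema locations; pin the versions to match your dependencies if you prefer):

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:integration="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/jms
           http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
           http://www.springframework.org/schema/task
           http://www.springframework.org/schema/task/spring-task.xsd">

    <!-- master bean definitions go here -->

</beans>

With the namespaces declared, the master’s messaging infrastructure is configured as follows: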

<bean id="connectionFactory"
      class="org.apache.activemq.ActiveMQConnectionFactory" >
    <property name="brokerURL">
       <value>tcp://localhost:61616</value>
    </property>
</bean>
  
<bean id="messagingGateway"
      class="org.springframework.integration.core.MessagingTemplate">
    <property name="defaultChannel" ref="requests" />
 <property name="receiveTimeout" value="5000" />
</bean> 
    
<int-jms:outbound-channel-adapter id="requests"
         connection-factory="connectionFactory" destination-name="requests">
</int-jms:outbound-channel-adapter>
    
<int-jms:inbound-channel-adapter id="jmsReplies"
         connection-factory="connectionFactory" destination-name="replies"
         channel="replies" >
 <integration:poller fixed-delay="200"></integration:poller>
</int-jms:inbound-channel-adapter>
 
<integration:channel id="replies">
    <integration:queue />
</integration:channel>
    
<task:executor id="poolExecutor" pool-size="5-10" queue-capacity="0" />

In the configuration above we create all the messaging infrastructure needed to send ChunkRequest objects over an external message queue using Spring Integration and JMS. All outgoing requests are sent out on the ‘requests’ message queue. Once the slaves finish their processing they return the responses over the ‘replies’ message queue. The responses are then routed back to the ChunkMessageChannelItemWriter bean to have their statuses checked.

That’s all the configuration for the master, now we can go on to setting up and configuring the slave nodes.

Setting Up a Spring Batch Remote Chunking Slave

If you have separate project modules for your slaves, they will also need a dependency on the ‘spring-batch-integration’ module, which is part of the Spring Batch Admin project (see above for the Maven dependency used on the master). The setup for the slaves is pretty simple, which means they can easily run as standalone applications with their own Spring application context files.

Below is the configuration needed to get started with slave processing/writing for this sample. In the example we’re not doing any item processing, so a dummy passthrough processor is configured.

<bean id="chunkHandler"
      class="org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler">
  <property name="chunkProcessor" >
    <bean class="org.springframework.batch.core.step.item.SimpleChunkProcessor">
      <property name="itemWriter" ref="itemWriter"/>
      <property name="itemProcessor">
        <bean class="org.springframework.batch.item.support.PassThroughItemProcessor"/>
      </property>
    </bean>
  </property>
</bean>
    
<bean id="itemWriter"
      class="com.opencredo.sandbox.gawain.springbatch.remote.chunking.TestItemWriter"/>

Pretty standard fare so far, don’t you think? Just a couple of very ordinary beans. The ChunkProcessorChunkHandler accepts ChunkRequest objects, calls the ItemWriter, and returns a ChunkResponse object back to the master.

Wiring up the Slave to the Messaging Queue

On the slave, a Spring Integration service activator takes messages from the broker (ultimately, via a few channels) and sends them to the ChunkProcessorChunkHandler bean. The chunkHandler then does the processing+writing, and returns the ChunkResponse object back over the broker to the master. It’s really quite simple, eh? Below is the Spring configuration for the messaging infrastructure.

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" >
  <property name="brokerURL">
       <value>tcp://localhost:61616</value>
    </property>
</bean>
  
<int-jms:inbound-channel-adapter id="jmsRequests" connection-factory="connectionFactory"
         destination-name="requests" channel="requests" >
    <integration:poller fixed-delay="500"></integration:poller>
</int-jms:inbound-channel-adapter>
 
<integration:channel id="requests"/>
  
<integration:service-activator id="srvActivator" input-channel="requests"
             output-channel="replies" ref="chunkHandler" method="handleChunk"/>
<integration:channel id="replies"/>
   
<int-jms:outbound-channel-adapter id="outgoingReplies" connection-factory="connectionFactory"
         destination-name="replies" channel="replies">
</int-jms:outbound-channel-adapter>

The really handy thing about the way this is set up is that you can start as many slaves as you like for dynamic scaling. By default, the behaviour of the messaging infrastructure follows what’s called the Competing Consumers pattern in EIP lingo. As each slave polls the broker it reads a ChunkRequest message off the queue and starts processing. The next slave will read the next available ChunkRequest, and so on, naturally load balancing the processing/writing among an arbitrary number of nodes.
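
If you’d rather scale within a single slave process as well, one option (not used in this sample, but supported by Spring Integration) is to replace the polling inbound adapter with the message-driven variant and raise its consumer count, giving you several competing consumers inside one JVM:

<!-- illustrative alternative to the polling adapter above -->
<int-jms:message-driven-channel-adapter id="jmsRequests"
        connection-factory="connectionFactory"
        destination-name="requests" channel="requests"
        concurrent-consumers="5" />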

A Chunk is Distributed!

That about wraps it up. The sample code above is almost all that’s needed to get Remote Chunking working (I’ve omitted the config to set up an in-memory broker, but it is included in the project). As ever, Spring has taken care of the heavy lifting so you can focus on solutions, not problems.

To learn more about this example, check out the code on GitHub (link at the bottom of the page) and the integration test (RemoteChunkingIntegrationTest) that fires off chunks to multiple asynchronous slaves for remote processing. An in-memory queue is used in this test, but it can easily be commented out and replaced with a standalone ActiveMQ instance.
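
The project’s BrokerContext does the real broker setup; as a rough sketch, an embedded, non-persistent ActiveMQ broker can be declared along these lines (bean name and settings here are illustrative):

<!-- illustrative embedded broker; see the project's BrokerContext for the real thing -->
<bean id="broker" class="org.apache.activemq.broker.BrokerService"
      init-method="start" destroy-method="stop">
    <property name="persistent" value="false" />
    <property name="transportConnectorURIs">
        <list>
            <value>tcp://localhost:61616</value>
        </list>
    </property>
</bean>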

When you run the integration test, have a look at the logs; you should see something like the following, with the output of the ItemWriters on each node:

TestItemReader - for count 1, reading item: http://test.com/item1
TestItemReader - for count 2, reading item: http://test.com/item2
TestItemReader - for count 3, reading item: http://test.com/item3
TestItemReader - for count 4, reading item: http://test.com/item4
TestItemReader - for count 5, reading item: http://test.com/item5
TestItemReader - for count 6, reading item: http://test.com/item6
TestItemReader - for count 7, reading item: http://test.com/item7
TestItemReader - for count 8, reading item: http://test.com/item8
TestItemReader - for count 9, reading item: http://test.com/item9
TestItemReader - for count 10, reading item: http://test.com/item10
TestItemWriter - [remote-slave-2-writer] writing item: http://test.com/item1
TestItemWriter - [remote-slave-2-writer] writing item: http://test.com/item2
TestItemWriter - [remote-slave-2-writer] writing item: http://test.com/item3
TestItemWriter - [remote-slave-2-writer] writing item: http://test.com/item4
TestItemWriter - [remote-slave-2-writer] writing item: http://test.com/item5
TestItemWriter - [master-writer] writing item: http://test.com/item6
TestItemWriter - [master-writer] writing item: http://test.com/item7
TestItemWriter - [master-writer] writing item: http://test.com/item8
TestItemWriter - [master-writer] writing item: http://test.com/item9
TestItemWriter - [master-writer] writing item: http://test.com/item10
TestItemReader - for count 11, reading item: http://test.com/item11
TestItemReader - for count 12, reading item: http://test.com/item12
TestItemReader - for count 13, reading item: http://test.com/item13
TestItemReader - for count 14, reading item: http://test.com/item14
TestItemReader - for count 15, reading item: http://test.com/item15
TestItemWriter - [remote-slave-1-writer] writing item: http://test.com/item11
TestItemWriter - [remote-slave-1-writer] writing item: http://test.com/item12
TestItemWriter - [remote-slave-1-writer] writing item: http://test.com/item13
TestItemWriter - [remote-slave-1-writer] writing item: http://test.com/item14
TestItemWriter - [remote-slave-1-writer] writing item: http://test.com/item15
TestItemReader - for count 16, reading item: http://test.com/item16
TestItemReader - for count 17, reading item: http://test.com/item17
TestItemReader - for count 18, reading item: http://test.com/item18
TestItemReader - for count 19, reading item: http://test.com/item19
TestItemReader - for count 20, reading item: http://test.com/item20
TestItemWriter - [master-writer] writing item: http://test.com/item16
TestItemWriter - [master-writer] writing item: http://test.com/item17
TestItemWriter - [master-writer] writing item: http://test.com/item18
TestItemWriter - [master-writer] writing item: http://test.com/item19
TestItemWriter - [master-writer] writing item: http://test.com/item20
RemoteChunkingIntegrationTest - job finished with status: COMPLETED
RemoteChunkingIntegrationTest - slave 1 chunks written: 2
RemoteChunkingIntegrationTest - slave 2 chunks written: 3

That’s all there is to it. With this configuration you can have fully asynchronous remote chunking working rapidly. Don’t forget to take a look at the code in the sample project. Being Spring Batch, there’s hardly any implementation to plough through at all; it’s mostly XML, the majority of which was documented in this post. Look for the test RemoteChunkingIntegrationTest.java.

The test uses three helper classes: MasterContext, BrokerContext and SlaveContext. These are simple classes that asynchronously start the master, broker and slave processes in separate threads.

Sample Source Code on Github

Check out the project from this location:

https://github.com/opencredo/spring-batch-remote-chunking-example 

Want to know more? Please get in touch… Thanks for your interest.