Spring Batch Deep Dive
Spring Batch is the de-facto standard for batch processing on the JVM. It provides a robust, scalable framework for reading, processing, and writing large volumes of data with built-in support for transaction management, job restart, skip/retry logic, and resource management.
1. Core Architecture
Spring Batch separates concerns into three layers: Application (your business logic), Batch Core (runtime classes for launching and controlling jobs), and Batch Infrastructure (readers, writers, retry templates).
+-------------------------------------------------------------+
| Application Layer |
| (Your Jobs, Steps, Readers, Processors, Writers) |
+-------------------------------------------------------------+
| Batch Core Layer |
| (Job, Step, JobLauncher, JobRepository, StepExecution) |
+-------------------------------------------------------------+
| Batch Infrastructure |
| (ItemReader, ItemWriter, RetryTemplate, RepeatTemplate) |
+-------------------------------------------------------------+
1.1 Job and Step Model
A Job is a container for Steps. Each Step encapsulates an independent, sequential phase of batch processing.
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
/**
 * Declares the {@code importCustomerJob} together with its first step.
 */
@Configuration
public class BatchJobConfig {

    /**
     * Builds the job as a fixed sequence: validate, then import, then report.
     */
    @Bean
    public Job importCustomerJob(JobRepository jobRepository,
                                 Step validateStep,
                                 Step importStep,
                                 Step reportStep) {
        JobBuilder customerImport = new JobBuilder("importCustomerJob", jobRepository);
        return customerImport
                .start(validateStep)
                .next(importStep)
                .next(reportStep)
                .build();
    }

    /**
     * Tasklet step that prints a progress message and reports FINISHED,
     * so the tasklet is not repeated.
     */
    @Bean
    public Step validateStep(JobRepository jobRepository,
                             PlatformTransactionManager transactionManager) {
        StepBuilder validation = new StepBuilder("validateStep", jobRepository);
        return validation
                .tasklet((stepContribution, context) -> {
                    System.out.println("Validating input files...");
                    // Validation logic here
                    return RepeatStatus.FINISHED;
                }, transactionManager)
                .build();
    }
}
1.2 JobInstance and JobExecution
A JobInstance represents a logical run of a job (identified by job name + job parameters). A JobExecution represents a single attempt to run that instance. If a job fails and is restarted with the same parameters, it creates a new JobExecution under the same JobInstance.
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
/**
 * Launches {@code importCustomerJob} on demand for a given input file.
 */
@Service
public class JobTriggerService {

    private final JobLauncher jobLauncher;
    private final Job importCustomerJob;

    public JobTriggerService(JobLauncher jobLauncher, Job importCustomerJob) {
        this.jobLauncher = jobLauncher;
        this.importCustomerJob = importCustomerJob;
    }

    /**
     * Runs the job with the file name and the current time as parameters,
     * then prints the resulting batch status and exit status.
     *
     * @param fileName value stored under the {@code inputFile} job parameter
     * @throws Exception whatever the launcher throws
     */
    public void triggerJob(String fileName) throws Exception {
        JobParametersBuilder parameters = new JobParametersBuilder();
        parameters.addString("inputFile", fileName);
        // The timestamp is part of the parameters, so it takes part in JobInstance identity.
        parameters.addLocalDateTime("runTime", LocalDateTime.now());

        JobExecution result = jobLauncher.run(importCustomerJob, parameters.toJobParameters());

        System.out.println("Job Status: " + result.getStatus());
        System.out.println("Job Exit: " + result.getExitStatus());
    }
}
2. Chunk-Oriented Processing
The chunk model is the heart of Spring Batch. It reads items one at a time, processes them, and writes them in chunks (batches). Each chunk is wrapped in a transaction.
Read Item 1 -> Process Item 1
Read Item 2 -> Process Item 2
Read Item 3 -> Process Item 3
... (chunk-size items)
Write [Item1, Item2, Item3, ...] <-- single transaction
2.1 Basic Chunk Step
/**
 * Chunk-oriented step: reads {@code CustomerCsv} items, converts them to
 * {@code Customer} and writes them in chunks of 100.
 * Fault tolerance: up to 10 skips for {@code FlatFileParseException} and a
 * retry limit of 3 for {@code DeadlockLoserDataAccessException}.
 */
@Bean
public Step importStep(JobRepository jobRepository,
                       PlatformTransactionManager transactionManager,
                       ItemReader<CustomerCsv> reader,
                       ItemProcessor<CustomerCsv, Customer> processor,
                       ItemWriter<Customer> writer) {
    StepBuilder stepBuilder = new StepBuilder("importStep", jobRepository);

    // Plain read-process-write pipeline.
    var pipeline = stepBuilder
            .<CustomerCsv, Customer>chunk(100, transactionManager)
            .reader(reader)
            .processor(processor)
            .writer(writer);

    // Skip/retry behaviour layered on top of the pipeline.
    var tolerant = pipeline
            .faultTolerant()
            .skipLimit(10)
            .skip(FlatFileParseException.class)
            .retryLimit(3)
            .retry(DeadlockLoserDataAccessException.class);

    return tolerant
            .listener(new ChunkLogListener())
            .build();
}
2.2 Chunk Size Tuning
Chunk size directly affects performance and memory usage. Larger chunks reduce transaction overhead but increase memory consumption and rollback cost on failure.
// Small chunk — good for complex processing, lower memory
.<Input, Output>chunk(10, transactionManager)
// Medium chunk — balanced for most ETL workloads
.<Input, Output>chunk(100, transactionManager)
// Large chunk — maximize throughput for simple transformations
.<Input, Output>chunk(1000, transactionManager)
// Dynamic chunk size based on CompletionPolicy
/**
 * Step whose chunk boundary is decided by a completion policy instead of a
 * fixed item count: a {@code TimeoutTerminationPolicy} constructed with 2000
 * (milliseconds, per the Spring Batch API).
 */
@Bean
public Step dynamicChunkStep(JobRepository jobRepository,
                             PlatformTransactionManager txManager) {
    StepBuilder stepBuilder = new StepBuilder("dynamicChunkStep", jobRepository);
    var timeBoxedChunk = stepBuilder
            .<String, String>chunk(new TimeoutTerminationPolicy(2000), txManager);
    return timeBoxedChunk
            .reader(itemReader())
            .writer(itemWriter())
            .build();
}
3. Tasklet Processing
Tasklets are for steps that don't fit the read-process-write pattern: file cleanup, stored procedure calls, sending notifications.
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
/**
 * Tasklet that deletes every {@code *.tmp} entry directly inside the
 * directory configured by {@code batch.temp.dir}.
 */
@Component
public class FileDeletionTasklet implements Tasklet {

    private final String directory;

    public FileDeletionTasklet(@Value("${batch.temp.dir}") String directory) {
        this.directory = directory;
    }

    /**
     * Deletes the matching files, records the number deleted as the step's
     * write count, prints it, and finishes.
     *
     * @throws Exception if the directory cannot be listed or a file cannot be deleted
     */
    @Override
    public RepeatStatus execute(StepContribution contribution,
                                ChunkContext chunkContext) throws Exception {
        try (DirectoryStream<Path> tmpFiles =
                     Files.newDirectoryStream(Path.of(directory), "*.tmp")) {
            int deleted = purge(tmpFiles);
            contribution.incrementWriteCount(deleted);
            System.out.println("Deleted " + deleted + " temp files");
        }
        return RepeatStatus.FINISHED;
    }

    /** Deletes each path in turn and returns how many were removed. */
    private int purge(DirectoryStream<Path> tmpFiles) throws Exception {
        int deleted = 0;
        for (Path tmpFile : tmpFiles) {
            Files.delete(tmpFile);
            deleted++;
        }
        return deleted;
    }
}
/**
 * Wraps the injected {@code FileDeletionTasklet} in a step named {@code cleanupStep}.
 */
@Bean
public Step cleanupStep(JobRepository jobRepository,
                        PlatformTransactionManager transactionManager,
                        FileDeletionTasklet tasklet) {
    StepBuilder cleanup = new StepBuilder("cleanupStep", jobRepository);
    return cleanup
            .tasklet(tasklet, transactionManager)
            .build();
}
4. ItemReader Implementations
4.1 FlatFileItemReader (CSV/TSV)
/**
 * Reads {@code CustomerCsv} rows from the file named by the {@code inputFile}
 * job parameter.
 *
 * <p>The bean is step-scoped because the {@code #{jobParameters[...]}}
 * expression is a late-binding expression: it can only be evaluated while a
 * step is running. Without {@code @StepScope} the expression is evaluated
 * when the singleton is created, before any job parameters exist.
 *
 * @param inputFile file-system path of the CSV file; the first line is skipped as a header
 */
@Bean
@StepScope
public FlatFileItemReader<CustomerCsv> csvReader(
        @Value("#{jobParameters['inputFile']}") String inputFile) {
    return new FlatFileItemReaderBuilder<CustomerCsv>()
            .name("customerCsvReader")
            .resource(new FileSystemResource(inputFile))
            .linesToSkip(1) // skip header
            .delimited()
            .delimiter(",")
            .names("id", "firstName", "lastName", "email", "balance")
            .fieldSetMapper(fieldSet -> {
                // Column names match the tokenizer names declared above.
                CustomerCsv c = new CustomerCsv();
                c.setId(fieldSet.readLong("id"));
                c.setFirstName(fieldSet.readString("firstName"));
                c.setLastName(fieldSet.readString("lastName"));
                c.setEmail(fieldSet.readString("email"));
                c.setBalance(fieldSet.readBigDecimal("balance"));
                return c;
            })
            .build();
}
4.2 JdbcCursorItemReader
/**
 * Cursor-based reader over orders with status {@code PENDING} created in the
 * last 30 days, ordered by creation time.
 * Uses a JDBC fetch size of 500 and saves state so the step can be restarted.
 */
@Bean
public JdbcCursorItemReader<Order> jdbcCursorReader(DataSource dataSource) {
    String pendingOrdersSql = """
            SELECT o.id, o.customer_id, o.total_amount, o.status, o.created_at
            FROM orders o
            WHERE o.status = 'PENDING'
            AND o.created_at >= CURRENT_DATE - INTERVAL '30 days'
            ORDER BY o.created_at
            """;

    JdbcCursorItemReaderBuilder<Order> builder = new JdbcCursorItemReaderBuilder<Order>()
            .name("orderJdbcReader")
            .dataSource(dataSource)
            .fetchSize(500)   // JDBC fetch size for streaming
            .saveState(true); // enable restart

    return builder
            .sql(pendingOrdersSql)
            .rowMapper((resultSet, index) -> {
                Order mapped = new Order();
                mapped.setId(resultSet.getLong("id"));
                mapped.setCustomerId(resultSet.getLong("customer_id"));
                mapped.setTotalAmount(resultSet.getBigDecimal("total_amount"));
                mapped.setStatus(resultSet.getString("status"));
                mapped.setCreatedAt(resultSet.getTimestamp("created_at").toLocalDateTime());
                return mapped;
            })
            .build();
}
4.3 JdbcPagingItemReader
/**
 * Paging reader over {@code transactions} of type {@code DEBIT} created on or
 * after a start date, sorted by {@code id} ascending, 200 rows per page.
 * The start date is computed as "today minus 7 days" when this method runs.
 */
@Bean
public JdbcPagingItemReader<Transaction> jdbcPagingReader(DataSource dataSource) {
    // Values bound to the named parameters in the WHERE clause.
    Map<String, Object> bindings = new HashMap<>();
    bindings.put("type", "DEBIT");
    bindings.put("startDate", LocalDate.now().minusDays(7));

    // Paging needs a sort key to locate the start of each page.
    Map<String, Order> ordering = new HashMap<>();
    ordering.put("id", Order.ASCENDING);

    PostgresPagingQueryProvider provider = new PostgresPagingQueryProvider();
    provider.setSelectClause("id, account_id, amount, type, created_at");
    provider.setFromClause("transactions");
    provider.setWhereClause("type = :type AND created_at >= :startDate");
    provider.setSortKeys(ordering);

    JdbcPagingItemReaderBuilder<Transaction> builder = new JdbcPagingItemReaderBuilder<>();
    return builder
            .name("transactionPagingReader")
            .dataSource(dataSource)
            .queryProvider(provider)
            .parameterValues(bindings)
            .pageSize(200)
            .rowMapper(new BeanPropertyRowMapper<>(Transaction.class))
            .build();
}
4.4 JPA ItemReader
/**
 * JPA paging reader over active products, ordered by id, 100 entities per page.
 */
@Bean
public JpaPagingItemReader<Product> jpaReader(EntityManagerFactory emf) {
    String activeProductsJpql = "SELECT p FROM Product p WHERE p.active = true ORDER BY p.id";
    JpaPagingItemReaderBuilder<Product> builder = new JpaPagingItemReaderBuilder<>();
    return builder
            .name("productJpaReader")
            .entityManagerFactory(emf)
            .queryString(activeProductsJpql)
            .pageSize(100)
            .build();
}
// With a JPQL query string and named parameters
/**
 * JPA paging reader over unpaid invoices that are due today or earlier,
 * 50 entities per page.
 *
 * <p>The query is ordered because each page is fetched by a separate query;
 * without a stable ORDER BY the database may return rows in a different
 * order per page, so rows can be skipped or read twice.
 * NOTE(review): assumes {@code Invoice} has an {@code id} attribute; confirm.
 */
@Bean
public JpaPagingItemReader<Invoice> invoiceReader(EntityManagerFactory emf) {
    Map<String, Object> params = new HashMap<>();
    params.put("status", InvoiceStatus.UNPAID);
    params.put("dueDate", LocalDate.now());
    return new JpaPagingItemReaderBuilder<Invoice>()
            .name("invoiceJpaReader")
            .entityManagerFactory(emf)
            .queryString("SELECT i FROM Invoice i WHERE i.status = :status AND i.dueDate <= :dueDate ORDER BY i.id")
            .parameterValues(params)
            .pageSize(50)
            .build();
}
4.5 Kafka ItemReader
/**
 * Reads {@code OrderEvent} records from partitions 0-3 of the
 * {@code order-events} topic with a 5 second poll timeout, saving state
 * for restart.
 *
 * <p>The consumer properties start from the injected factory's configuration,
 * so bootstrap servers and deserializers come from the application's Kafka
 * setup. {@code KafkaItemReader} rejects a property set that lacks them.
 * The group id and max poll records are then overridden for this reader.
 * NOTE(review): if the factory was built with deserializer instances rather
 * than config entries, the deserializer class properties must be added here.
 */
@Bean
public KafkaItemReader<String, OrderEvent> kafkaReader(
        ConsumerFactory<String, OrderEvent> consumerFactory) {
    Properties props = new Properties();
    props.putAll(consumerFactory.getConfigurationProperties());
    // Reader-specific overrides take precedence over the shared configuration.
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch-consumer");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
    return new KafkaItemReaderBuilder<String, OrderEvent>()
            .name("orderEventKafkaReader")
            .consumerProperties(props)
            .topic("order-events")
            .partitions(0, 1, 2, 3)
            .pollTimeout(Duration.ofSeconds(5))
            .saveState(true)
            .build();
}
4.6 JSON ItemReader
/**
 * Reads {@code Employee} objects from the classpath resource
 * {@code data/employees.json} using a Jackson-based object reader.
 */
@Bean
public JsonItemReader<Employee> jsonReader() {
    ClassPathResource employeesJson = new ClassPathResource("data/employees.json");
    JacksonJsonObjectReader<Employee> jackson = new JacksonJsonObjectReader<>(Employee.class);
    JsonItemReaderBuilder<Employee> builder = new JsonItemReaderBuilder<>();
    return builder
            .name("employeeJsonReader")
            .resource(employeesJson)
            .jsonObjectReader(jackson)
            .build();
}
5. ItemProcessor Chains
5.1 Single Processor
/**
 * Validates a CSV row and converts it into a {@code Customer} entity.
 * Rows with a missing or malformed email, and rows whose email already
 * exists in the repository, are filtered out by returning {@code null}.
 */
@Component
public class CustomerValidationProcessor
        implements ItemProcessor<CustomerCsv, Customer> {

    private final CustomerRepository repository;

    public CustomerValidationProcessor(CustomerRepository repository) {
        this.repository = repository;
    }

    /**
     * @param item the raw CSV row
     * @return the populated entity, or {@code null} to filter the row out
     */
    @Override
    public Customer process(CustomerCsv item) throws Exception {
        String rawEmail = item.getEmail();
        // Return null to skip/filter items
        if (rawEmail == null || !rawEmail.contains("@")) {
            return null; // filtered out
        }

        // Normalize BEFORE the duplicate check so the lookup uses the same form
        // that is persisted. Locale.ROOT keeps lower-casing independent of the
        // JVM default locale (e.g. the Turkish dotless i).
        String email = rawEmail.trim().toLowerCase(java.util.Locale.ROOT);

        // Check for duplicates. This only sees rows the repository can already
        // see; duplicates inside the same uncommitted chunk are not detected here.
        if (repository.existsByEmail(email)) {
            return null; // skip duplicate
        }

        // Transform CSV DTO to entity
        // NOTE(review): a null first or last name still fails with a
        // NullPointerException here; decide whether such rows should be
        // filtered or skipped.
        Customer customer = new Customer();
        customer.setFirstName(item.getFirstName().trim());
        customer.setLastName(item.getLastName().trim());
        customer.setEmail(email);
        customer.setBalance(item.getBalance());
        customer.setCreatedAt(LocalDateTime.now());
        return customer;
    }
}
5.2 Composite Processor (Chain)
/**
 * Chains the validation, enrichment and classification processors, in that
 * order, into one {@code RawTransaction -> EnrichedTransaction} processor.
 */
@Bean
public CompositeItemProcessor<RawTransaction, EnrichedTransaction> compositeProcessor() {
    CompositeItemProcessor<RawTransaction, EnrichedTransaction> chain =
            new CompositeItemProcessor<>();
    // Delegates run in list order; each one's output type is the next one's input type.
    chain.setDelegates(List.of(
            validationProcessor(),
            enrichmentProcessor(),
            classificationProcessor()));
    return chain;
}
/**
 * Filters out transactions whose amount is null, zero or negative; copies
 * the remaining ones into a {@code ValidatedTransaction} stamped with the
 * validation time.
 */
@Bean
public ItemProcessor<RawTransaction, ValidatedTransaction> validationProcessor() {
    return raw -> {
        boolean positiveAmount = raw.getAmount() != null
                && raw.getAmount().compareTo(BigDecimal.ZERO) > 0;
        if (!positiveAmount) {
            return null; // filter invalid
        }
        ValidatedTransaction result = new ValidatedTransaction();
        BeanUtils.copyProperties(raw, result);
        result.setValidatedAt(Instant.now());
        return result;
    };
}
/**
 * Copies a validated transaction into an {@code EnrichedTransaction} and
 * sets its currency (looked up by account id) and category (derived from
 * the merchant).
 */
@Bean
public ItemProcessor<ValidatedTransaction, EnrichedTransaction> enrichmentProcessor() {
    return validated -> {
        EnrichedTransaction result = new EnrichedTransaction();
        BeanUtils.copyProperties(validated, result);

        var currency = lookupCurrency(validated.getAccountId());
        result.setCurrency(currency);

        var category = categorize(validated.getMerchant());
        result.setCategory(category);

        return result;
    };
}
/**
 * Tags each transaction with a risk level: amounts strictly greater than
 * 10000 are {@code HIGH} and require review; all others are {@code NORMAL}.
 * The item is modified in place and returned.
 */
@Bean
public ItemProcessor<EnrichedTransaction, EnrichedTransaction> classificationProcessor() {
    return transaction -> {
        boolean aboveThreshold =
                transaction.getAmount().compareTo(new BigDecimal("10000")) > 0;
        transaction.setRiskLevel(aboveThreshold ? RiskLevel.HIGH : RiskLevel.NORMAL);
        transaction.setRequiresReview(aboveThreshold);
        return transaction;
    };
}
6. ItemWriter Implementations
6.1 JPA Writer
/**
 * JPA writer for {@code Customer} entities, configured to call
 * {@code persist} rather than {@code merge}.
 */
@Bean
public JpaItemWriter<Customer> jpaWriter(EntityManagerFactory emf) {
    JpaItemWriter<Customer> customerWriter = new JpaItemWriter<>();
    customerWriter.setUsePersist(true); // use persist instead of merge
    customerWriter.setEntityManagerFactory(emf);
    return customerWriter;
}
6.2 JDBC Batch Writer
/**
 * JDBC batch writer that upserts customers keyed on {@code email}: new rows
 * are inserted; on conflict the balance and {@code updated_at} are refreshed.
 * Named parameters are resolved from bean properties of {@code Customer}.
 */
@Bean
public JdbcBatchItemWriter<Customer> jdbcWriter(DataSource dataSource) {
    String upsertSql = """
            INSERT INTO customers (first_name, last_name, email, balance, created_at)
            VALUES (:firstName, :lastName, :email, :balance, :createdAt)
            ON CONFLICT (email) DO UPDATE SET
            balance = EXCLUDED.balance,
            updated_at = NOW()
            """;
    JdbcBatchItemWriterBuilder<Customer> builder = new JdbcBatchItemWriterBuilder<>();
    return builder
            .dataSource(dataSource)
            .sql(upsertSql)
            .beanMapped()
            .build();
}
6.3 Flat File Writer
/**
 * Writes {@code ReportLine} items to {@code output/report.csv} as
 * comma-delimited rows, with a fixed header line and a footer carrying the
 * time at which the footer callback runs.
 */
@Bean
public FlatFileItemWriter<ReportLine> csvWriter() {
    FileSystemResource target = new FileSystemResource("output/report.csv");
    FlatFileItemWriterBuilder<ReportLine> builder = new FlatFileItemWriterBuilder<>();
    return builder
            .name("reportCsvWriter")
            .resource(target)
            .headerCallback(header -> header.write("ID,Name,Amount,Status,Date"))
            .delimited()
            .delimiter(",")
            .names("id", "name", "amount", "status", "date")
            .footerCallback(footer -> footer.write("Generated: " + LocalDateTime.now()))
            .build();
}
6.4 Composite Writer (Multiple Destinations)
/**
 * Fans each chunk of {@code ProcessedOrder} items out to three delegates,
 * in list order: database, file, then Kafka.
 */
@Bean
public CompositeItemWriter<ProcessedOrder> compositeWriter(
        JpaItemWriter<ProcessedOrder> dbWriter,
        FlatFileItemWriter<ProcessedOrder> fileWriter,
        KafkaItemWriter<String, ProcessedOrder> kafkaWriter) {
    CompositeItemWriter<ProcessedOrder> fanOut = new CompositeItemWriter<>();
    fanOut.setDelegates(List.of(dbWriter, fileWriter, kafkaWriter));
    return fanOut;
}
/**
 * Kafka writer for {@code ProcessedOrder} items, keyed by the order id
 * rendered as a string, with delete mode switched off.
 */
@Bean
public KafkaItemWriter<String, ProcessedOrder> kafkaWriter(KafkaTemplate<String, ProcessedOrder> template) {
    KafkaItemWriter<String, ProcessedOrder> orderPublisher = new KafkaItemWriter<>();
    orderPublisher.setKafkaTemplate(template);
    orderPublisher.setItemKeyMapper(processed -> String.valueOf(processed.getId()));
    orderPublisher.setDelete(false);
    return orderPublisher;
}
6.5 REST API Writer
/**
 * Item writer that POSTs each {@code Customer} in a chunk to the CRM API,
 * one request per item.
 */
@Component
public class RestApiItemWriter implements ItemWriter<Customer> {

    private final RestClient restClient;

    /**
     * Builds a client for {@code https://api.crm.example.com} whose default
     * Authorization header carries the {@code CRM_TOKEN} environment variable.
     */
    public RestApiItemWriter(RestClient.Builder builder) {
        String bearer = "Bearer " + System.getenv("CRM_TOKEN");
        this.restClient = builder
                .baseUrl("https://api.crm.example.com")
                .defaultHeader("Authorization", bearer)
                .build();
    }

    /** Sends the items sequentially, in chunk order. */
    @Override
    public void write(Chunk<? extends Customer> chunk) throws Exception {
        chunk.forEach(this::post);
    }

    /** POSTs a single customer as JSON and discards the response body. */
    private void post(Customer customer) {
        restClient.post()
                .uri("/api/v1/customers")
                .contentType(MediaType.APPLICATION_JSON)
                .body(customer)
                .retrieve()
                .toBodilessEntity();
    }
}
7. Partitioning for Parallel Processing
Partitioning splits a Step into multiple sub-steps that run in parallel. Each partition processes a subset of the data.
7.1 Column-Range Partitioner
/**
 * Splits the {@code orders} table into contiguous {@code id} ranges, one per
 * partition. Each partition's execution context carries {@code minId},
 * {@code maxId} (both inclusive) and {@code partitionName}.
 */
@Component
public class ColumnRangePartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;

    public ColumnRangePartitioner(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /**
     * @param gridSize requested number of partitions; must be positive
     * @return at most {@code gridSize} partitions keyed {@code partition0..N};
     *         empty when the table has no rows
     * @throws IllegalArgumentException if {@code gridSize} is not positive
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        if (gridSize <= 0) {
            throw new IllegalArgumentException("gridSize must be positive: " + gridSize);
        }

        Long min = jdbcTemplate.queryForObject("SELECT MIN(id) FROM orders", Long.class);
        Long max = jdbcTemplate.queryForObject("SELECT MAX(id) FROM orders", Long.class);
        if (min == null || max == null) {
            return Map.of(); // MIN/MAX are NULL on an empty table
        }

        // Width of each id range; the +1 makes gridSize ranges cover [min, max].
        long range = (max - min) / gridSize + 1;
        Map<String, ExecutionContext> partitions = new HashMap<>();
        long start = min;

        // Stop once the whole span is covered. With fewer ids than gridSize the
        // remaining partitions would have minId > maxId and would only start
        // worker steps that read nothing.
        for (int i = 0; i < gridSize && start <= max; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putLong("minId", start);
            context.putLong("maxId", Math.min(start + range - 1, max));
            context.putString("partitionName", "partition" + i);
            partitions.put("partition" + i, context);
            start += range;
        }
        return partitions;
    }
}
7.2 Partitioned Step Configuration
/**
 * Manager step: partitions the work with {@code ColumnRangePartitioner}
 * (grid size 8) and runs {@code workerStep} for each partition on the batch
 * task executor.
 */
@Bean
public Step partitionedImportStep(JobRepository jobRepository,
                                  ColumnRangePartitioner partitioner,
                                  Step workerStep) {
    StepBuilder manager = new StepBuilder("partitionedImportStep", jobRepository);
    return manager
            .partitioner("workerStep", partitioner)
            .step(workerStep)
            .gridSize(8)
            .taskExecutor(batchTaskExecutor())
            .build();
}
/**
 * Step-scoped reader for one partition's id range.
 *
 * <p>{@code minId} and {@code maxId} are taken from the partition's step
 * execution context (populated by the partitioner) and bound to the named
 * parameters of the WHERE clause. Step scope gives every partition its own
 * reader instance with its own range.
 *
 * <p>The sort enum is fully qualified because the simple name {@code Order}
 * already refers to the domain class in this snippet.
 */
@Bean
@StepScope
public JdbcPagingItemReader<Order> partitionedOrderReader(
        DataSource dataSource,
        @Value("#{stepExecutionContext['minId']}") Long minId,
        @Value("#{stepExecutionContext['maxId']}") Long maxId) {
    return new JdbcPagingItemReaderBuilder<Order>()
            .name("partitionedOrderReader")
            .dataSource(dataSource)
            .selectClause("id, customer_id, total_amount, status")
            .fromClause("orders")
            .whereClause("id >= :minId AND id <= :maxId")
            .parameterValues(Map.of("minId", minId, "maxId", maxId))
            .sortKeys(Map.of("id", org.springframework.batch.item.database.Order.ASCENDING))
            .pageSize(200)
            .rowMapper(new BeanPropertyRowMapper<>(Order.class))
            .build();
}

/**
 * Worker step executed once per partition: reads the partition's orders,
 * processes them and writes them in chunks of 100.
 */
@Bean
public Step workerStep(JobRepository jobRepository,
                       PlatformTransactionManager txManager,
                       DataSource dataSource) {
    return new StepBuilder("workerStep", jobRepository)
            .<Order, ProcessedOrder>chunk(100, txManager)
            // Inside a @Configuration class this call returns the step-scoped
            // proxy; the null placeholders are replaced by late binding when
            // the step runs. NOTE(review): relies on bean-method proxying, as
            // the other inter-bean calls in this snippet do.
            .reader(partitionedOrderReader(dataSource, null, null))
            .processor(orderProcessor())
            .writer(orderWriter())
            .build();
}
/**
 * Thread pool for partitioned and multi-threaded steps: 8 core threads,
 * up to 16 threads, a queue of 25, and caller-runs when saturated.
 */
@Bean
public TaskExecutor batchTaskExecutor() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setThreadNamePrefix("batch-partition-");
    pool.setCorePoolSize(8);
    pool.setMaxPoolSize(16);
    pool.setQueueCapacity(25);
    // When pool and queue are full, the submitting thread runs the task itself.
    pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    pool.initialize();
    return pool;
}
7.3 Multi-Threaded Step (Simpler Alternative)
/**
 * Chunk step (chunk size 100) whose chunks are executed on the batch task
 * executor with a throttle limit of 8. The reader is the synchronized
 * wrapper because it is shared between worker threads.
 */
@Bean
public Step multiThreadedStep(JobRepository jobRepository,
                              PlatformTransactionManager txManager) {
    StepBuilder stepBuilder = new StepBuilder("multiThreadedStep", jobRepository);
    var pipeline = stepBuilder
            .<Input, Output>chunk(100, txManager)
            .reader(synchronizedReader()) // must be thread-safe
            .processor(processor())
            .writer(writer());
    return pipeline
            .taskExecutor(batchTaskExecutor())
            .throttleLimit(8)
            .build();
}
/**
 * Wraps {@code actualReader()} in a {@code SynchronizedItemStreamReader}
 * so it can be shared by the threads of a multi-threaded step.
 */
@Bean
public SynchronizedItemStreamReader<Input> synchronizedReader() {
    SynchronizedItemStreamReader<Input> threadSafe = new SynchronizedItemStreamReader<>();
    threadSafe.setDelegate(actualReader());
    return threadSafe;
}
8. Skip and Retry Policies
8.1 Skip Policy
/**
 * Chunk step (chunk size 100) with explicit skip and retry rules.
 * <ul>
 *   <li>Skip: limit 50; parse and validation errors are skippable,
 *       {@code DatabaseUnavailableException} is not.</li>
 *   <li>Retry: limit 3; deadlock and optimistic-locking failures are
 *       retried, {@code ValidationException} is not.</li>
 * </ul>
 */
@Bean
public Step faultTolerantStep(JobRepository jobRepository,
                              PlatformTransactionManager txManager) {
    var pipeline = new StepBuilder("faultTolerantStep", jobRepository)
            .<RawRecord, ProcessedRecord>chunk(100, txManager)
            .reader(reader())
            .processor(processor())
            .writer(writer());

    // Skip configuration
    var withSkips = pipeline
            .faultTolerant()
            .skipLimit(50)
            .skip(FlatFileParseException.class)
            .skip(ValidationException.class)
            .noSkip(DatabaseUnavailableException.class);

    // Retry configuration
    var withRetries = withSkips
            .retryLimit(3)
            .retry(DeadlockLoserDataAccessException.class)
            .retry(OptimisticLockingFailureException.class)
            .noRetry(ValidationException.class);

    // Listeners for monitoring
    return withRetries
            .listener(skipListener())
            .listener(retryListener())
            .build();
}
8.2 Custom Skip Policy
/**
 * Skip policy that tolerates up to {@link #MAX_SKIP} flat-file parse errors
 * per step and never skips any other exception.
 *
 * <p>The policy is stateless: it relies on the running skip count supplied by
 * the framework instead of counting its own invocations. A private counter
 * would keep growing across step executions when the instance is reused, and
 * would count every consultation of the policy rather than every actual skip.
 */
public class FileErrorSkipPolicy implements SkipPolicy {

    private static final int MAX_SKIP = 100;

    /**
     * @param t         the failure being evaluated
     * @param skipCount skips recorded so far for the current step, as passed
     *                  in by the framework (may be negative when the caller
     *                  only asks whether the exception is skippable; see the
     *                  SkipPolicy Javadoc)
     * @return {@code true} for a parse error within the limit, {@code false}
     *         for any other exception
     * @throws SkipLimitExceededException once {@code MAX_SKIP} skips have
     *         already been recorded
     */
    @Override
    public boolean shouldSkip(Throwable t, long skipCount) throws SkipLimitExceededException {
        if (!(t instanceof FlatFileParseException parseEx)) {
            return false; // don't skip other exceptions
        }
        if (skipCount >= MAX_SKIP) {
            throw new SkipLimitExceededException(MAX_SKIP, t);
        }
        // Log the problematic line
        System.err.println("Skipping line " + parseEx.getLineNumber()
                + ": " + parseEx.getInput());
        return true;
    }
}
8.3 Skip Listener for Auditing
/**
 * Skip listener that persists one {@code SkipRecord} per skipped item,
 * tagged with the phase ({@code READ}, {@code PROCESS} or {@code WRITE})
 * in which the skip happened.
 */
@Component
public class BatchSkipListener implements SkipListener<RawRecord, ProcessedRecord> {

    private final SkipRecordRepository skipRepo;

    public BatchSkipListener(SkipRecordRepository skipRepo) {
        this.skipRepo = skipRepo;
    }

    /** No item is available for read failures, so only the message is stored. */
    @Override
    public void onSkipInRead(Throwable t) {
        SkipRecord entry = new SkipRecord("READ", t.getMessage(), Instant.now());
        skipRepo.save(entry);
    }

    /** Stores the item's string form together with the failure message. */
    @Override
    public void onSkipInProcess(RawRecord item, Throwable t) {
        SkipRecord entry = new SkipRecord("PROCESS", item.toString(),
                t.getMessage(), Instant.now());
        skipRepo.save(entry);
    }

    /** Stores the item's string form together with the failure message. */
    @Override
    public void onSkipInWrite(ProcessedRecord item, Throwable t) {
        SkipRecord entry = new SkipRecord("WRITE", item.toString(),
                t.getMessage(), Instant.now());
        skipRepo.save(entry);
    }
}
9. JobRepository
The JobRepository persists metadata about job executions, step executions, and execution contexts. Spring Batch requires a relational database for this metadata.
9.1 Configuration
/**
 * Job repository infrastructure: a JDBC-backed repository for normal runs
 * and an embedded-H2 repository for the {@code test} profile.
 */
@Configuration
public class BatchInfraConfig {

    /**
     * Repository backed by the application's data source.
     *
     * <p>Restricted to non-test profiles so that exactly one
     * {@code JobRepository} bean exists in every profile; otherwise both
     * repositories would be registered under {@code test} and injection by
     * type would be ambiguous.
     */
    @Bean
    @Profile("!test")
    public JobRepository jobRepository(DataSource dataSource,
                                       PlatformTransactionManager txManager) throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(dataSource);
        factory.setTransactionManager(txManager);
        factory.setTablePrefix("BATCH_"); // default table prefix
        factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
        factory.setMaxVarCharLength(2500);
        // The factory is created by hand, so its init callback is invoked manually.
        factory.afterPropertiesSet();
        return factory.getObject();
    }

    /**
     * For development/testing only: repository on an embedded H2 database
     * initialised with the Spring Batch H2 schema script.
     * NOTE(review): {@code ResourcelessTransactionManager} does not manage JDBC
     * transactions, so metadata writes here are not transactional; confirm
     * that is acceptable for the tests.
     */
    @Bean
    @Profile("test")
    public JobRepository inMemoryJobRepository() throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .addScript("/org/springframework/batch/core/schema-h2.sql")
                .build());
        factory.setTransactionManager(new ResourcelessTransactionManager());
        factory.afterPropertiesSet();
        return factory.getObject();
    }
}
9.2 Schema Tables
-- Core tables created by Spring Batch schema scripts
-- BATCH_JOB_INSTANCE — one row per unique job name + parameters combination
-- BATCH_JOB_EXECUTION — one row per attempt to run a job instance
-- BATCH_JOB_EXECUTION_PARAMS — job parameters for each execution
-- BATCH_STEP_EXECUTION — one row per step attempt
-- BATCH_STEP_EXECUTION_CONTEXT — serialized execution context for restart
-- BATCH_JOB_EXECUTION_CONTEXT — serialized job-level execution context
-- Useful queries for monitoring
SELECT je.JOB_EXECUTION_ID, ji.JOB_NAME, je.STATUS, je.EXIT_CODE,
je.START_TIME, je.END_TIME,
TIMESTAMPDIFF(SECOND, je.START_TIME, je.END_TIME) AS duration_seconds
FROM BATCH_JOB_EXECUTION je
JOIN BATCH_JOB_INSTANCE ji ON je.JOB_INSTANCE_ID = ji.JOB_INSTANCE_ID
WHERE je.START_TIME >= CURRENT_DATE
ORDER BY je.START_TIME DESC;
-- Failed step details
SELECT se.STEP_NAME, se.STATUS, se.READ_COUNT, se.WRITE_COUNT,
se.SKIP_COUNT, se.ROLLBACK_COUNT, se.EXIT_MESSAGE
FROM BATCH_STEP_EXECUTION se
WHERE se.JOB_EXECUTION_ID = ?
AND se.STATUS = 'FAILED';
10. Conditional Flow
/**
 * Job whose path depends on each step's exit status.
 * <pre>
 * extract   --FAILED--------------> error
 * extract   --anything else-------> transform
 * transform --COMPLETED WITH SKIPS-> notify
 * transform --COMPLETED-----------> load
 * transform --FAILED--------------> error
 * load      --anything------------> notify
 * error     --anything------------> job fails
 * </pre>
 *
 * <p>Every transition is introduced with {@code from(step)}. An {@code on()}
 * chained directly after {@code to(x)} attaches to {@code x}, not to the step
 * the previous transition started from, so each source step is named
 * explicitly.
 */
@Bean
public Job conditionalJob(JobRepository jobRepository,
                          Step extractStep,
                          Step transformStep,
                          Step loadStep,
                          Step errorStep,
                          Step notifyStep) {
    return new JobBuilder("conditionalJob", jobRepository)
            .start(extractStep)
                .on("FAILED").to(errorStep)
            .from(extractStep)
                .on("*").to(transformStep) // any other status
            .from(transformStep)
                .on("COMPLETED WITH SKIPS").to(notifyStep)
            .from(transformStep)
                .on("COMPLETED").to(loadStep)
            .from(transformStep)
                .on("FAILED").to(errorStep)
            .from(loadStep)
                .on("*").to(notifyStep)
            .from(errorStep)
                .on("*").fail()
            .end()
            .build();
}
11. Scheduling Batch Jobs
11.1 With @Scheduled
/**
 * Launches {@code dailyReportJob} every day at 02:00 and raises an alert
 * when the job fails or cannot be launched.
 */
@Component
@EnableScheduling
public class BatchScheduler {

    private final JobLauncher jobLauncher;
    private final Job dailyReportJob;
    // NOTE(review): the alert collaborator was referenced but never declared.
    // AlertService is the assumed type name; only sendAlert(String) is used.
    private final AlertService alertService;

    public BatchScheduler(JobLauncher jobLauncher,
                          @Qualifier("dailyReportJob") Job dailyReportJob,
                          AlertService alertService) {
        this.jobLauncher = jobLauncher;
        this.dailyReportJob = dailyReportJob;
        this.alertService = alertService;
    }

    /**
     * Runs the report for yesterday's date. The {@code timestamp} parameter
     * is the current time in milliseconds and is part of the job parameters.
     * Failures are reported through the alert service rather than rethrown.
     */
    @Scheduled(cron = "0 0 2 * * ?") // 2:00 AM daily
    public void runDailyReport() {
        try {
            JobParameters params = new JobParametersBuilder()
                    .addLocalDate("reportDate", LocalDate.now().minusDays(1))
                    .addLong("timestamp", System.currentTimeMillis())
                    .toJobParameters();
            JobExecution execution = jobLauncher.run(dailyReportJob, params);
            if (execution.getStatus() == BatchStatus.FAILED) {
                // Alert on failure
                alertService.sendAlert("Daily report job failed: "
                        + execution.getExitStatus().getExitDescription());
            }
        } catch (Exception e) {
            // Boundary of the scheduled task: report instead of letting the
            // exception escape into the scheduler thread.
            alertService.sendAlert("Failed to launch daily report: " + e.getMessage());
        }
    }
}
11.2 With Quartz
/**
 * Quartz wiring for the import job: a durable job detail plus a daily cron
 * trigger.
 */
@Configuration
public class QuartzBatchConfig {

    /**
     * Durable job detail for {@code BatchQuartzJob}; the Spring Batch job to
     * launch is passed via the {@code jobName} job-data entry.
     */
    @Bean
    public JobDetail batchJobDetail() {
        var detail = JobBuilder.newJob(BatchQuartzJob.class)
                .withIdentity("importJob", "batchGroup")
                .usingJobData("jobName", "importCustomerJob");
        return detail
                .storeDurably()
                .build();
    }

    /**
     * Cron trigger firing at 03:00 every day, using the fire-and-proceed
     * misfire instruction.
     */
    @Bean
    public Trigger batchJobTrigger(JobDetail batchJobDetail) {
        var dailyAtThree = CronScheduleBuilder
                .cronSchedule("0 0 3 * * ?") // 3 AM daily
                .withMisfireHandlingInstructionFireAndProceed();
        return TriggerBuilder.newTrigger()
                .forJob(batchJobDetail)
                .withIdentity("importTrigger", "batchGroup")
                .withSchedule(dailyAtThree)
                .build();
    }
}
/**
 * Quartz job that looks up a Spring Batch {@code Job} bean by name and
 * launches it, using the Quartz fire time as the only job parameter.
 */
public class BatchQuartzJob extends QuartzJobBean {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private ApplicationContext context;

    /**
     * @param quartzContext supplies the {@code jobName} job-data entry and the fire time
     * @throws JobExecutionException if the bean lookup or the launch fails;
     *         this is the exception type Quartz expects jobs to raise, and it
     *         keeps the original cause attached
     */
    @Override
    protected void executeInternal(JobExecutionContext quartzContext) throws JobExecutionException {
        String jobName = quartzContext.getMergedJobDataMap().getString("jobName");
        try {
            Job job = context.getBean(jobName, Job.class);
            JobParameters params = new JobParametersBuilder()
                    .addLong("quartzFireTime",
                            quartzContext.getFireTime().getTime())
                    .toJobParameters();
            jobLauncher.run(job, params);
        } catch (Exception e) {
            throw new JobExecutionException("Batch job failed: " + jobName, e);
        }
    }
}
12. Monitoring Batch Jobs
12.1 Job Execution Listener
/**
 * Job listener that logs job start and finish, publishes a start counter
 * and a duration timer, and notifies on failure.
 */
@Component
public class JobCompletionListener implements JobExecutionListener {

    private static final Logger log = LoggerFactory.getLogger(JobCompletionListener.class);

    private final MeterRegistry meterRegistry;
    private final NotificationService notificationService;

    public JobCompletionListener(MeterRegistry meterRegistry,
                                 NotificationService notificationService) {
        this.meterRegistry = meterRegistry;
        this.notificationService = notificationService;
    }

    /** Logs the job name and parameters and increments {@code batch.job.started}. */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        log.info("Job {} starting with parameters: {}",
                jobExecution.getJobInstance().getJobName(),
                jobExecution.getJobParameters());
        meterRegistry
                .counter("batch.job.started", "job", jobExecution.getJobInstance().getJobName())
                .increment();
    }

    /**
     * Records the job duration, logs read/write/skip totals summed over all
     * step executions, and sends a failure notification when the status is FAILED.
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        String jobName = jobExecution.getJobInstance().getJobName();

        Duration elapsed = Duration.between(
                jobExecution.getStartTime(), jobExecution.getEndTime());
        meterRegistry.timer("batch.job.duration", "job", jobName).record(elapsed);

        // Aggregate step stats
        var steps = jobExecution.getStepExecutions();
        long totalRead = steps.stream().mapToLong(StepExecution::getReadCount).sum();
        long totalWritten = steps.stream().mapToLong(StepExecution::getWriteCount).sum();
        long totalSkipped = steps.stream().mapToLong(StepExecution::getSkipCount).sum();

        log.info("Job {} finished: status={}, duration={}s, read={}, written={}, skipped={}",
                jobName, jobExecution.getStatus(), elapsed.toSeconds(),
                totalRead, totalWritten, totalSkipped);

        boolean failed = jobExecution.getStatus() == BatchStatus.FAILED;
        if (failed) {
            notificationService.notifyFailure(jobName,
                    jobExecution.getAllFailureExceptions());
        }
    }
}
12.2 REST Endpoint for Job Status
/**
 * REST endpoints for listing jobs and executions and for stopping or
 * restarting an execution.
 */
@RestController
@RequestMapping("/api/batch")
public class BatchMonitoringController {

    private final JobExplorer jobExplorer;
    private final JobOperator jobOperator;

    public BatchMonitoringController(JobExplorer jobExplorer,
                                     JobOperator jobOperator) {
        this.jobExplorer = jobExplorer;
        this.jobOperator = jobOperator;
    }

    /** Returns the names of all jobs known to the job explorer. */
    @GetMapping("/jobs")
    public List<String> listJobs() {
        return jobExplorer.getJobNames();
    }

    /**
     * Returns the executions of one page of job instances, newest start time first.
     *
     * <p>Paging applies to job instances ({@code page * size} offset,
     * {@code size} instances); an instance with several executions contributes
     * all of them. Executions with no start time are listed last instead of
     * breaking the sort with a NullPointerException.
     */
    @GetMapping("/jobs/{jobName}/executions")
    public List<JobExecutionSummary> getExecutions(
            @PathVariable String jobName,
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "20") int size) {
        List<JobInstance> instances = jobExplorer.getJobInstances(jobName, page * size, size);
        return instances.stream()
                .flatMap(i -> jobExplorer.getJobExecutions(i).stream())
                .sorted(Comparator.comparing(JobExecution::getStartTime,
                        Comparator.nullsLast(Comparator.reverseOrder())))
                .map(this::toSummary)
                .toList();
    }

    /** Sends a stop signal to the given execution. */
    @PostMapping("/jobs/{jobName}/stop/{executionId}")
    public ResponseEntity<String> stopJob(@PathVariable long executionId) throws Exception {
        jobOperator.stop(executionId);
        return ResponseEntity.ok("Stop signal sent");
    }

    /** Restarts the given execution and returns the id of the new execution. */
    @PostMapping("/jobs/{jobName}/restart/{executionId}")
    public ResponseEntity<Long> restartJob(@PathVariable long executionId) throws Exception {
        long newExecutionId = jobOperator.restart(executionId);
        return ResponseEntity.ok(newExecutionId);
    }

    /** Flattens an execution into a summary; the last field is the write count summed over its steps. */
    private JobExecutionSummary toSummary(JobExecution je) {
        return new JobExecutionSummary(
                je.getId(),
                je.getJobInstance().getJobName(),
                je.getStatus().toString(),
                je.getStartTime(),
                je.getEndTime(),
                je.getStepExecutions().stream()
                        .mapToLong(StepExecution::getWriteCount).sum()
        );
    }
}
13. Performance Tuning
13.1 Key Configuration Properties
# application.yml
spring:
batch:
jdbc:
initialize-schema: always # never in production
table-prefix: BATCH_
job:
enabled: false # don't auto-run on startup
datasource:
hikari:
maximum-pool-size: 20 # enough for partitioned steps
minimum-idle: 5
connection-timeout: 30000
# For chunk steps
batch:
chunk-size: 500
thread-pool-size: 8
partition-grid-size: 10
13.2 Performance Checklist
/**
* Performance tuning summary:
*
* 1. CHUNK SIZE: Start at 100, benchmark up to 1000.
* Larger = fewer transactions, more memory, bigger rollback window.
*
* 2. JDBC FETCH SIZE: Match or exceed chunk size.
* reader.setFetchSize(chunkSize) to avoid round-trips.
*
* 3. JDBC BATCH WRITE: Ensure batch updates are enabled.
* spring.jpa.properties.hibernate.jdbc.batch_size=100
*
* 4. PARTITIONING: For data-parallel workloads, partition by
* primary key ranges. Grid size = 2-4x CPU cores.
*
* 5. MULTI-THREADED STEP: Simpler than partitioning but reader
* must be thread-safe. Use SynchronizedItemStreamReader.
*
* 6. ASYNC PROCESSOR/WRITER: Offload expensive processing.
* Use AsyncItemProcessor + AsyncItemWriter pair.
*
* 7. SKIP vs FAIL: Configure skip for recoverable errors.
* Every rollback-and-retry is expensive.
*
* 8. INDEXES: Ensure WHERE clause columns used in readers
* are properly indexed.
*
* 9. DISABLE LOGGING in tight loops. Use DEBUG only when needed.
*
* 10. JOB REPOSITORY: Use a separate datasource for metadata
* to avoid contention with business data.
*/
13.3 Async Processing
/**
 * Chunk step (chunk size 100) whose processor returns a {@code Future} per
 * item; the paired async writer consumes those futures.
 */
@Bean
public Step asyncStep(JobRepository jobRepository,
                      PlatformTransactionManager txManager) {
    StepBuilder stepBuilder = new StepBuilder("asyncStep", jobRepository);
    var futuresPipeline = stepBuilder
            .<RawData, Future<ProcessedData>>chunk(100, txManager);
    return futuresPipeline
            .reader(reader())
            .processor(asyncProcessor())
            .writer(asyncWriter())
            .build();
}
/**
 * Async wrapper that runs {@code expensiveProcessor()} on a
 * {@code SimpleAsyncTaskExecutor} whose threads are prefixed {@code async-proc-}.
 */
@Bean
public AsyncItemProcessor<RawData, ProcessedData> asyncProcessor() {
    AsyncItemProcessor<RawData, ProcessedData> wrapper = new AsyncItemProcessor<>();
    wrapper.setDelegate(expensiveProcessor()); // the actual processor
    SimpleAsyncTaskExecutor workers = new SimpleAsyncTaskExecutor("async-proc-");
    wrapper.setTaskExecutor(workers);
    return wrapper;
}
/**
 * Async wrapper that hands the completed results to {@code actualWriter()}.
 */
@Bean
public AsyncItemWriter<ProcessedData> asyncWriter() {
    AsyncItemWriter<ProcessedData> wrapper = new AsyncItemWriter<>();
    wrapper.setDelegate(actualWriter());
    return wrapper;
}
14. Testing Batch Jobs
/**
 * End-to-end and single-step tests for the customer import job, run against
 * the CSV fixture {@code test-data/customers.csv} on the test classpath.
 */
@SpringBatchTest
@SpringBootTest
class ImportCustomerJobTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private JobRepositoryTestUtils jobRepositoryTestUtils;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    /** Clears batch metadata and the target table so every test starts empty. */
    @BeforeEach
    void cleanup() {
        jobRepositoryTestUtils.removeJobExecutions();
        jdbcTemplate.execute("DELETE FROM customers");
    }

    /**
     * Builds the parameters shared by both tests.
     *
     * <p>The CSV reader opens {@code inputFile} as a file-system resource, so
     * the classpath fixture is resolved to an absolute path; a
     * {@code classpath:} prefix would be treated as part of a file name.
     */
    private JobParameters importParameters() throws Exception {
        String csvPath = new ClassPathResource("test-data/customers.csv")
                .getFile().getAbsolutePath();
        return new JobParametersBuilder()
                .addString("inputFile", csvPath)
                .addLong("timestamp", System.currentTimeMillis())
                .toJobParameters();
    }

    @Test
    void testFullJob() throws Exception {
        // Given: test CSV file in src/test/resources
        JobParameters params = importParameters();

        // When
        JobExecution execution = jobLauncherTestUtils.launchJob(params);

        // Then
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
        assertThat(execution.getExitStatus()).isEqualTo(ExitStatus.COMPLETED);
        // Kept boxed so a null result fails the assertion instead of throwing on unboxing.
        Integer count = jdbcTemplate.queryForObject(
                "SELECT COUNT(*) FROM customers", Integer.class);
        assertThat(count).isEqualTo(5);
    }

    @Test
    void testImportStepOnly() throws Exception {
        // The step's reader needs the inputFile parameter just like the full job.
        JobExecution execution =
                jobLauncherTestUtils.launchStep("importStep", importParameters());

        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
        StepExecution stepExecution = execution.getStepExecutions()
                .iterator().next();
        assertThat(stepExecution.getReadCount()).isEqualTo(5);
        assertThat(stepExecution.getWriteCount()).isEqualTo(5);
        assertThat(stepExecution.getSkipCount()).isZero();
    }
}
15. Complete Real-World Example: ETL Pipeline
/**
 * Daily settlement pipeline: fetch captured transactions for a settlement
 * date, compute fees, then report and archive.
 */
@Configuration
@RequiredArgsConstructor
public class DailySettlementBatchConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager txManager;
    private final DataSource dataSource;
    private final EntityManagerFactory emf;
    // Injected rather than instantiated: the listener only has a constructor
    // taking its collaborators, so it must come from the container.
    private final JobCompletionListener jobCompletionListener;

    /**
     * Sequential job of four steps. Requires the {@code settlementDate}
     * parameter, accepts an optional {@code dryRun}, and uses a run-id
     * incrementer.
     */
    @Bean
    public Job dailySettlementJob(Step fetchTransactionsStep,
                                  Step calculateFeesStep,
                                  Step generateReportStep,
                                  Step archiveStep) {
        return new JobBuilder("dailySettlementJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .validator(new DefaultJobParametersValidator(
                        new String[]{"settlementDate"}, // required
                        new String[]{"dryRun"}          // optional
                ))
                .listener(jobCompletionListener)
                .start(fetchTransactionsStep)
                .next(calculateFeesStep)
                .next(generateReportStep)
                .next(archiveStep)
                .build();
    }

    /**
     * Step-scoped paging reader over {@code CAPTURED} transactions created on
     * the settlement date, sorted by id, 500 rows per page.
     *
     * @param settlementDate late-bound from the {@code settlementDate} job parameter
     */
    @Bean
    @StepScope
    public JdbcPagingItemReader<Transaction> transactionReader(
            @Value("#{jobParameters['settlementDate']}") LocalDate settlementDate) {
        Map<String, Order> sortKeys = Map.of("id", Order.ASCENDING);
        return new JdbcPagingItemReaderBuilder<Transaction>()
                .name("transactionReader")
                .dataSource(dataSource)
                .selectClause("id, merchant_id, amount, currency, status, created_at")
                .fromClause("transactions")
                .whereClause("DATE(created_at) = :settlementDate AND status = 'CAPTURED'")
                .sortKeys(sortKeys)
                .parameterValues(Map.of("settlementDate", settlementDate))
                .pageSize(500)
                .rowMapper(new BeanPropertyRowMapper<>(Transaction.class))
                .build();
    }

    /**
     * Chunk step (chunk size 200) turning transactions into settlement lines;
     * up to 20 {@code CurrencyConversionException}s are skipped.
     */
    @Bean
    public Step fetchTransactionsStep(JdbcPagingItemReader<Transaction> reader) {
        return new StepBuilder("fetchTransactionsStep", jobRepository)
                .<Transaction, SettlementLine>chunk(200, txManager)
                .reader(reader)
                .processor(settlementProcessor())
                .writer(settlementWriter())
                .faultTolerant()
                .skipLimit(20)
                .skip(CurrencyConversionException.class)
                .listener(new SettlementStepListener())
                .build();
    }

    /**
     * Builds a settlement line per transaction: the fee is the amount times
     * the merchant's fee rate, rounded half-up to two decimals; net is gross
     * minus fee.
     */
    @Bean
    public ItemProcessor<Transaction, SettlementLine> settlementProcessor() {
        return transaction -> {
            SettlementLine line = new SettlementLine();
            line.setTransactionId(transaction.getId());
            line.setMerchantId(transaction.getMerchantId());
            line.setGrossAmount(transaction.getAmount());
            // Calculate fees
            BigDecimal feeRate = getFeeRate(transaction.getMerchantId());
            BigDecimal fee = transaction.getAmount().multiply(feeRate)
                    .setScale(2, RoundingMode.HALF_UP);
            line.setFee(fee);
            line.setNetAmount(transaction.getAmount().subtract(fee));
            line.setSettledAt(Instant.now());
            return line;
        };
    }

    /** JPA writer for settlement lines, left at the writer's default persistence mode. */
    @Bean
    public JpaItemWriter<SettlementLine> settlementWriter() {
        JpaItemWriter<SettlementLine> writer = new JpaItemWriter<>();
        writer.setEntityManagerFactory(emf);
        return writer;
    }
}
This example demonstrates the full lifecycle of a Spring Batch application. Start with simple tasklet or chunk steps, add fault tolerance, then scale with partitioning or async processing as data volumes grow. Monitor everything through the JobRepository and custom metrics to keep batch pipelines healthy in production.