spring batch - Why is the transaction not rolled back when there is a RuntimeException in the JdbcBatchItemWriter?

1 - With a SkipPolicy: I am using multiple JdbcBatchItemWriters through a CompositeItemWriter. The problem is that if a RuntimeException occurs in the third JdbcBatchItemWriter (batchLogsJdbcWriter), the whole chunk is retried (the chunk goes back to the processor and then to the writer again), and the first and second JdbcBatchItemWriters (requestsJdbcWriter, customObjectJdbcWriter) still persist their records. Those writes should be rolled back as soon as the third JdbcBatchItemWriter throws a RuntimeException, but that is not happening.

2 - Without a SkipPolicy: the record is not retried, but the rollback still does not happen, even though rollback is the default behaviour of Spring Batch.
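For context, all delegates of a CompositeItemWriter run inside the single transaction that Spring Batch opens per chunk, so an exception from any delegate should normally roll back the inserts/updates made by the earlier delegates, provided every JdbcBatchItemWriter works against the DataSource controlled by the step's transaction manager. A minimal sketch of that wiring (the explicit DataSourceTransactionManager and the "sketchStep" name are assumptions for illustration, not part of the code in this question):

// import org.springframework.jdbc.datasource.DataSourceTransactionManager;

@Bean
public Step sketchStep(ItemStreamReader<CustomObject> reader,
        ItemProcessor<CustomObject, CustomObject> processor,
        CompositeItemWriter<CustomObject> writer,
        @Qualifier("dataSource") DataSource dataSource) {
    // Assumption: the same DataSource backs the transaction manager and every
    // JdbcBatchItemWriter delegate, so a chunk-level rollback undoes all of them.
    return this.stepBuilderFactory.get("sketchStep")
            .<CustomObject, CustomObject>chunk(100)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .transactionManager(new DataSourceTransactionManager(dataSource))
            .build();
}

The actual configuration posted with the question follows below.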

package ca.job;

import java.io.IOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.PagingQueryProvider;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
import org.springframework.batch.item.file.FlatFileFooterCallback;
import org.springframework.batch.item.file.FlatFileHeaderCallback;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.MultiResourceItemWriter;
import org.springframework.batch.item.file.ResourceSuffixCreator;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.item.file.builder.MultiResourceItemWriterBuilder;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.json.JsonParseException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.transaction.annotation.Transactional;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

@Configuration
public class FileCreationJob {

    private static final UBPLogger LOGGER = UBPLogger.getInstance(FileCreationJob.class);

    private JobBuilderFactory jobBuilderFactory;
    private StepBuilderFactory stepBuilderFactory;

    private JdbcBatchItemWriter<CustomObject> logsJdbcWriter;
    private JdbcBatchItemWriter<CustomObject> logsJdbcWriter1;
    private JdbcBatchItemWriter<CustomObject> requestsJdbcWriter;
    private MultiResourceItemWriter<CustomObject> multiResourceItemWriter;
    private JdbcBatchItemWriter<CustomObject> evenErrorJdbcWriter;
    private JdbcBatchItemWriter<CustomObject> customObjectJdbcWriter;
    private JdbcBatchItemWriter<CustomObject> batchLogsJdbcWriter;

    ExecutionContext jobContext = new ExecutionContext();
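
    // Note: chunkSize, skipCount, pageSize, readErrorFlag, the SQL clause strings and
    // the DAO/helper beans referenced below are fields omitted from this snippet
    // (presumably injected via @Value / @Autowired).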

    @Autowired
    public FileCreationJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        super();
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
    }

    @Bean
    public Job multiThreadedJob(Step step1) {
        return this.jobBuilderFactory.get(Constants.JOB_NAME).incrementer(new RunIdIncrementer()).start(step1).build();
    }

    @Bean
    public Step step1(ItemStreamReader<CustomObject> logsJdbcDBReader,
            ItemProcessor<CustomObject, CustomObject> legacyProcessBilling,
            ClassifierCompositeItemWriter<CustomObject> classifierCompositeItemWriter) {

        return this.stepBuilderFactory.get("step1").<CustomObject, CustomObject>chunk(chunkSize)
                .reader(logsJdbcDBReader).processor(legacyProcessBilling)
                .listener(new ItemErrorHandlingSupportFactory(jobContext, eventIntegrationLogsDAOImpl,
                        eventRequestDAOImpl, eventErrorsDAOImpl, eventBatchLogDAOImpl, ubpComposeErrorResponse)
                                .asItemWriterListener())
                .writer(classifierCompositeItemWriter)
                .faultTolerant()
                .skip(Exception.class)
                .skipLimit(skipCount)
                .noRetry(Exception.class)
                .build();
    }

    @Bean
    @StepScope
    public ItemStreamReader<CustomObject> logsJdbcDBReader(
            @Qualifier(value = "dataSource") DataSource dataSource) throws Exception {
        System.out.println("Reading the data from EVENT_INTEGRATION_LOGS Table");
        loadDefaultJobContexts();
        if ("true".equalsIgnoreCase(readErrorFlag)) {
            throw new RuntimeException();
        }
        return new JdbcPagingItemReaderBuilder<CustomObject>().name("pagingItemReader")
                .dataSource(dataSource).pageSize(pageSize).queryProvider(queryProvider(dataSource))
                .rowMapper(new CustomObjectDBRowMapper()).build();
    }


    @Bean
    @StepScope
    public PagingQueryProvider queryProvider(DataSource cloudDataSource) throws Exception {
        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
        provider.setDataSource(cloudDataSource);
        provider.setSelectClause(selectClause);
        provider.setFromClause(fromClause);
        provider.setWhereClause(whereClause);
        provider.setSortKey(sortKey);
        System.out.println("Sql Query : " + selectClause + " " + fromClause + " " + whereClause + " " + sortKey);
        return provider.getObject();
    }


    @Bean
    @StepScope
    public ItemProcessor<CustomObject, CustomObject> billing(
            @Value("#{stepExecution}") StepExecution stepExecution) {

        return data -> {
                       // Performing Operation
            return data;

        };
    }


    @Bean
    @StepScope
    public ClassifierCompositeItemWriter<CustomObject> classifierCompositeItemWriter() {
        ClassifierCompositeItemWriter<CustomObject> classifierCompositeItemWriter = new ClassifierCompositeItemWriter<>();

        try {
            classifierCompositeItemWriter.setClassifier(new CustomObjectClassifier(
                    compositeItemWriterSuccess(multiResourceItemWriter, logsJdbcWriter,
                            requestsJdbcWriter, customObjectJdbcWriter, batchLogsJdbcWriter),
                    compositeItemWriterFailure(logsJdbcWriter1, requestsJdbcWriter,
                            evenErrorJdbcWriter)));
            return classifierCompositeItemWriter;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    @Bean
    @StepScope
    public CompositeItemWriter<CustomObject> compositeItemWriterSuccess(
            @Qualifier("multiResourceItemWriter") MultiResourceItemWriter<CustomObject> multiResourceItemWriter,
            @Qualifier("logsJdbcWriterSuccess") JdbcBatchItemWriter<CustomObject> logsJdbcWriterSuccess,
            @Qualifier("requestsJdbcWriter") JdbcBatchItemWriter<CustomObject> requestsJdbcWriter,
            @Qualifier("customObjectJdbcWriter") JdbcBatchItemWriter<CustomObject> customObjectJdbcWriter,
            @Qualifier("batchLogsJdbcWriter") JdbcBatchItemWriter<CustomObject> batchLogsJdbcWriter) {
        CompositeItemWriter<CustomObject> compositeItemWriter = new CompositeItemWriter<>();
        compositeItemWriter.setDelegates(Arrays.asList(requestsJdbcWriter, customObjectJdbcWriter,
                batchLogsJdbcWriter, logsJdbcWriterSuccess, multiResourceItemWriter));
        return compositeItemWriter;
    }

    @Bean
    @StepScope
    public CompositeItemWriter<CustomObject> compositeItemWriterFailure(
            @Qualifier("logsJdbcWriterFailure") JdbcBatchItemWriter<CustomObject> logsJdbcWriterFailure,
            @Qualifier("requestsJdbcWriter") JdbcBatchItemWriter<CustomObject> requestsJdbcWriter,
            @Qualifier("evenErrorJdbcWriter") JdbcBatchItemWriter<CustomObject> evenErrorJdbcWriter) {
        CompositeItemWriter<CustomObject> compositeItemWriter = new CompositeItemWriter<>();
        compositeItemWriter.setDelegates(
                Arrays.asList(logsJdbcWriterFailure, requestsJdbcWriter, evenErrorJdbcWriter));
        return compositeItemWriter;
    }

    
    @Bean
    @StepScope
    public JdbcBatchItemWriter<CustomObject> logsJdbcWriterSuccess(
            @Qualifier(value = "dataSource") DataSource dataSource) {

        return new JdbcBatchItemWriterBuilder<CustomObject>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .itemPreparedStatementSetter((item, ps) -> {
                    ps.setString(1, item.getStatus());
                    ps.setString(2, Constants.CREATED_BY);
                    ps.setTimestamp(3, DateUtils.getCurrentTimestamp());
                    ps.setString(4, nm1MappingFields.fieldExtractor(item));
                    ps.setBigDecimal(5, item.getEventSeqId());
                }).dataSource(dataSource).sql(updateCustomObjectWithMapping).build();
    }

    @Bean
    @StepScope
    public JdbcBatchItemWriter<CustomObject> logsJdbcWriterFailure(
            @Qualifier(value = "dataSource") DataSource dataSource) {

        return new JdbcBatchItemWriterBuilder<CustomObject>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .itemPreparedStatementSetter((item, ps) -> {
                    ps.setString(1, item.getStatus());
                    ps.setString(2, Constants.CREATED_BY);
                    ps.setTimestamp(3, DateUtils.getCurrentTimestamp());
                    ps.setBigDecimal(4, item.getSeqId());
                }).dataSource(dataSource).sql(updateCustomObjectWitoutMapping).build();
    }

    
    @Bean
    @StepScope
    public JdbcBatchItemWriter<CustomObject> requestsJdbcWriter(
            @Qualifier(value = "dataSource") DataSource dataSource) {

        return new JdbcBatchItemWriterBuilder<CustomObject>()
                .ite
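
For comparison with the step1 configuration above: skip(Exception.class) marks essentially every exception as skippable, and when a skippable exception comes out of the writer, Spring Batch rolls back that chunk and then re-drives the items through the processor and writer one at a time to isolate the failing item, which matches the behaviour described in point 1. As a sketch only (not a confirmed fix; it reuses the bean names from the question, and JsonParseException stands in for a genuinely skippable error), the fault-tolerant builder can also declare exceptions that must never be skipped, so a RuntimeException thrown by any JdbcBatchItemWriter propagates, rolls the chunk back and fails the step:

@Bean
public Step step1(ItemStreamReader<CustomObject> logsJdbcDBReader,
        ItemProcessor<CustomObject, CustomObject> legacyProcessBilling,
        ClassifierCompositeItemWriter<CustomObject> classifierCompositeItemWriter) {
    return this.stepBuilderFactory.get("step1").<CustomObject, CustomObject>chunk(chunkSize)
            .reader(logsJdbcDBReader)
            .processor(legacyProcessBilling)
            .writer(classifierCompositeItemWriter)
            .faultTolerant()
            // Hypothetical skip rules: only a parse-type error is skippable; anything
            // a writer throws is non-skippable, so the chunk transaction rolls back
            // and the step fails instead of scanning item by item.
            .skip(JsonParseException.class)
            .skipLimit(skipCount)
            .noSkip(RuntimeException.class)
            .build();
}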


1 Answer

Waiting for an expert to reply.
