본문 바로가기

Programming/Spring

[Spring-batch] ItemWriter 클래스 구현기 (오류 해결)

반응형

사실 이 글을 포스팅 하기 전에, Spring batch를 공부한 내용들과 spring batch + quartz 를 연동하기 위한 설정들 등등, 좀 더 기초적인 내용들을 먼저 다루고 싶었다. 하지만 일단 프로젝트의 완성도를 위해 개발부터 진행하고 있던 와중, 나를 너무 힘들게 했던 내용이 있어 먼저 끌고 와보았다. 분명 써놓고 보면 별 것 아닌 것 처럼 보이겠지만... 이 오류를 잡기위해 며칠을 고생한 걸 생각하며,,, 이걸 글로라도 남겨야 시간이 좀 덜 아까울 것 같은,, 그런 보상심리에 포스팅을 시작한다. ㅎㅎ

 

ItemWriter<T> write() 를 execute 할 때 발생할 수 있는 오류

 

나는 JdbcBatchItemWriter 를 사용하였으며, JDBC의 Batch 기능을 사용하여 모아둔 쿼리문들을 한번에 Database로 전달하여 Database 내부에서 쿼리들이 실행한다는 점이 당연히 속도면에서 우세할 것이라고 생각하였다. (ps. 이 때 '모아둔' 의 기준은 개발자가 config 파일에서 설정해 둔 chunk size 이다.)

 

현재 개발중인 Job과 Step은 큰 로직이 없지만, 바로 내일부터 개발예정인 또 다른 Job의 경우는 조금 복잡해서 writer가 로직을 담을 지, 아닐 지 고민중이므로 나는 ItemWriter 구현을 따로 클래스로 빼서 하게 되었다. 

 

보통 클래스로 빼서 구현하게 되면, extends JdbcBatchItemWriter<T> 의 형태로 구현을 하는 경우도 있지만, 나는 클래스의 정체성?을 통일하고 싶어서 (다른 writer 클래스에서도 ~~~ implements ItemWriter<T> 의 형태로 단조롭게 쓰고 싶었다. ) ItemWriter<T>만 구현함을 명시하였다. 대신, 내가 클래스 선언부에서 ItemWriter 타입을 명시하였으므로, 클래스 안에서 JdbcBatchItemWriter 를 사용하기 위해서는 이걸 위한 setter 담당 메서드가 따로 필요하다. 

 

처음에 내가 ItemWriter를 구현했을 때는 다음과 같았다. 

어차피 Job을 정의하는 파일에서 writer()를 제대로 설정만 해놓고 JdbcBatchItemWriter<T> 객체 안에 필드만 제대로 채워준다면, override된 write()는 해당하는 private객체를 통해 불러주면 될 것이라 생각했다. 

 

그래서 맨 처음 Job을 시작할 때 생성하는 Bean ( Spring은 싱글톤 레지스트리 형식을 지향하므로 Bean으로 생성한 setUpOverallWriter()에서는 JdbcBatchItemWriter에 대한 셋팅들을 저장해 두면 효율적으로 실행할 수 있을 것이다. write()를 할 때 마다 dataSource를 재설정하고, sql을 다시 불러오고, JdbcBatchItemWriter객체를 재생성할 필요는 없으므로 ) 에서 dataSource를 파라미터로 가진 생성자와 getDelegate() 메소드를 먼저 호출하였다. 

 

ItemWriter Class

 

package org.webapp.batch.overallJob;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.webapp.model.Overall;
import javax.sql.DataSource;
import java.util.List;


@StepScope
public class SetupOverallWriter implements ItemWriter<Overall> {

    private final Logger logger = LoggerFactory.getLogger(SetupOverallWriter.class);
    private JdbcBatchItemWriter<Overall> delegate;
    private static final String sql
                        = "UPDATE overall " +
                        "SET restaurants = json_merge_preserve(restaurants, :restaurants) " +
                        "where station=:station";

    public SetupOverallWriter() {}

    public SetupOverallWriter(DataSource dataSource) {
        this.delegate = new JdbcBatchItemWriter();
        this.delegate.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
        this.delegate.setDataSource(dataSource);
        this.delegate.setJdbcTemplate(new NamedParameterJdbcTemplate(dataSource));
        this.delegate.setSql(sql);
        this.delegate.afterPropertiesSet();
    }

    @Override
    public void write(List<? extends Overall> items) throws Exception {
        logger.info("[SetupOverallJob] : ItemWriter started.");
        this.delegate.write(items);
    }

    public JdbcBatchItemWriter<Overall> getDelegate() {
        return this.delegate;
    }

}

 

ItemWriter Bean

 

...
@Bean
public ItemWriter<Overall> setupOverallWriter() {
    return new SetupOverallWriter(this.dataSource).getDelegate();
}
   

 

위 코드는 NPE를 발생시킨다. 

 

[threadPoolTaskExecutor-1] ERROR org.springframework.batch.core.step.AbstractStep - Encountered an error executing step setupOverallStep in job setupOverallJob
java.lang.NullPointerException
	at org.webapp.batch.overallJob.SetupOverallWriter.write(SetupOverallWriter.java:39)
	at org.webapp.batch.overallJob.SetupOverallWriter$$FastClassBySpringCGLIB$$8272f8b6.invoke(<generated>)
	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
	at org.webapp.batch.overallJob.SetupOverallWriter$$EnhancerBySpringCGLIB$$14146bd2.write(<generated>)
	at org.springframework.batch.core.step.item.SimpleChunkProcessor.writeItems(SimpleChunkProcessor.java:188)
	at org.springframework.batch.core.step.item.SimpleChunkProcessor.doWrite(SimpleChunkProcessor.java:154)
	at org.springframework.batch.core.step.item.SimpleChunkProcessor.write(SimpleChunkProcessor.java:287)
	at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:212)
	at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75)
	at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
	at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
	at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
	at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273)
	at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
	at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
	at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
	at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
	at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258)
	at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:203)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
	at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
	at com.sun.proxy.$Proxy43.execute(Unknown Source)
	at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
	at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:399)
	at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135)
	at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:313)
	at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:144)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
    

찍힌 로그를 따라가보면, 결국 내가 선정한 객체로 do-write() 를 하는 과정에서 NPE가 났음을 쉽게 찾을 수 있다. 

하지만 bean 생성에서 exception이 발생하지도 않았고, ItemWriter<T> bean을 무사히 생성하고 processor 까지는 다 돌아간 상황이었다. 

 

원인을 찾기 위해 다시 한번 위로 올라가 보면, 빈에서는 JdbcBatchItemWriter 객체를 ItemWriter<T> 로 cast 하여 받는다. JdbcBatchItemWriter 역시 ItemWriter<T> 인터페이스를 구현한 클래스이므로 이 과정에서 오류는 없었지만, 나는 결국 ItemWriter의 write()를 불러버린 것과 마찬가지이다. 내가 생성 한 적이 없으니, NPE가 나는 건 당연했다.

 

따라서 bean의 리턴타입을 다시 맞춰주면 NPE는 해결된다.

 

하지만 이 과정에서 발생한 두번째 에러가 있었다. 빈을 잘 생성하고 NPE도 피했는 데, 이번에는 아예 write() 메소드가 실행조차 되지 않았다. 내가 빈에서 write() 를 직접 부르지도 않으니, custom-ItemWriter<T> 클래스까지 정의해주어야 하나, 하고 고민을 많이 했었다. 하지만 이걸 재정의하면, 분명 나중에 좀 색다른 writer 를 만들고 싶어질 때 걸림돌이 되고, 쓸모없는 클래스를 만들어주는 것 같다는 느낌이 들어 다른 방법을 찾았다.

 

최종 ItemWriter Class

 

package org.webapp.batch.overallJob;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.webapp.model.Overall;
import javax.sql.DataSource;
import java.util.List;


@StepScope
public class SetupOverallWriter implements ItemWriter<Overall> {

    private final Logger logger = LoggerFactory.getLogger(SetupOverallWriter.class);
    private JdbcBatchItemWriter<Overall> delegate;
    private DataSource dataSource;
    private static final String sql
            = "UPDATE overall " +
            "SET restaurants = json_merge_preserve(restaurants, :restaurants) " +
            "where station=:station";


    public SetupOverallWriter() {}

    public SetupOverallWriter(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @BeforeStep
    public void prepareForWriter() {
        this.delegate = new JdbcBatchItemWriter<Overall>();
        this.delegate.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Overall>());
        this.delegate.setDataSource(dataSource);
        this.delegate.setJdbcTemplate(new NamedParameterJdbcTemplate(dataSource));
        this.delegate.setSql(sql);
        this.delegate.afterPropertiesSet();

    }

    @Override
    public void write(List<? extends Overall> items) throws Exception {
        logger.info("[SetupOverallJob] : ItemWriter started.");
        this.delegate.write(items);
    }

    public JdbcBatchItemWriter<Overall> getDelegate() {
        return this.delegate;
    }

}

 

최종 ItemWriter Bean

 

@Bean
    public SetupOverallWriter setupOverallWriter() {
        return new SetupOverallWriter(this.dataSource);
    }
    

 

@beforestep 어노테이션을 활용하여 빈이 생성될 때 setter 를 실행시켰고 write()는 예정대로 override 하여 실행하였다.

 

클래스를 위와 같은 형식으로 두었으니, 당연히 빈의 리턴 타입도 클래스 타입으로 바꾸어 주어, dataSource 를 파라미터로 하여 생성하게끔 하였다. 

 

 

BeanProPertyItemSqlParameterSourceProvider 사용 시 발생할 수 있는 query에 대한 오류

 

 

로그로 write()까지 제대로 찍히고 객체도 문제 없이 채워져있는 데, DB에 insert가 제대로 되지 않는 문제였다. 이 과정에서 SQL 문법에 대한 exception이 전혀 발생하지 않았기 때문에 고생 좀 했다..

 

나는 column type을 json으로 두었고, 이를 null로 초기화해두었다. 그래서 배치프로그램을 통해 이 column 값들을 json_merge_preserve() 를 돌려 갱신시킬 계획이었다. json_merge_preserve는 두번째 파라미터를 '["test", "test2", "test3"]' 와 같은 형태로 작은 따옴표 안에 넣어야 한다. 따라서 나는 배치에서의 sql도 

이처럼 두었었다. 

 

뭔가, null로 초기화를 해둔 탓에 sql 쿼리가 제대로 돌지 않아도 별다른 exception을 발생시키지 않는다고 생각하여, write() 메소드를 디버깅하여 타고 들어가보았다.

 

result of crawling

세팅한 sql, items 내에 있는 Overall 필드가 모두 잘 들어와있지만, jdbcBatchItemWriter.java 내의 로컬변수인 parameterCount가 1로 들어와 있음을 알 수 있다. 

 

parameterCount는 결국 sql query가 execute 되기 전에 로 처리되어, 실행 할 때 파라미터로 주입되는 getter 메소드의 수를 정의한다. 나는 분명 station, restaurants 두 개를 주입했는 데 이 값이 1개로 들어왔음은, 문법에서 오류가 난 것임을 알 수 있다. (이 뒤로 쭉 따라 디버깅을 해 보았을 때 문제없이 update()를 마쳤으므로. )

반응형