看了一下博客,好几个月没写什么文章了。再不写,估计Google再也不来了!^_^闲话少说,这次的主题就讲一下Spring Batch。Spring Batch是Spring与Accenture的合作产物。作为前Accenture员工,感觉还是很亲切的。当然这是不是我在工作中技术选型的主要原因,更重要的是看中他的强大!
业务背景:
金融系统中有一种表叫台账表,这也是在接触这个领域后才知道的。而针对这张表,每天会进行定时全表遍历进行计算利息,费用等相关操作,也就是俗称的跑批。而跑批无非就是遍历循环表中数据再进行运算而已,可是一旦数据量上去了,很多看似简单的事情往往实现起来就不再是原先的逻辑了。一张上亿行数的表,查询出来,放到内存?那肯定是不明智的做法。当然应该采用游标或者分页的查询方式去实现更好的性能。而自己去写这类方法,其实并不难,如果只是简单地为了分页查询,确实可以自己去写一个。而之所以此处选择Spring Batch,是因为它不仅仅支持这一点特性,它能做的还有很多很多。
系统背景:
系统是典型的Java Web系统架构,Spring MVC与MyBatis。当然其他诸如服务治理,消息消费等就不赘述了,此处跟那些没有太大关系。而之所以有这篇文章的出现,正是因为MyBatis。因为在我使用过程中,查询大量资料,唯独与MyBatis集成的相关资料特别少。官方的jar包里也是针对ibatis做的封装(并且已经标注为废弃状态),而MyBatis自身针对Spring Batch做了分页Writer和Reader,此次我们主要就是用这两个类。外加用Java Config自动生成Step与Job方式,避免使用xml配置,对于有代码洁癖的人来讲,过多的配置文件感觉很乱很脏。
实现细节:
配置文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd"> <!-- 数据源配置文件,此处略 --> <import resource="spring-config-db.xml"></import> <bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher"> <property name="jobRepository" ref="jobRepository" /> </bean> <!-- Persist batch metadata in-memory --> <!--<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">--> <!--<property name="transactionManager" ref="transactionManager" />--> <!--</bean>--> <!-- Persist batch metadata in database --> <bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"> <property name="transactionManager" ref="transactionManager"/> <property name="databaseType" value="MySQL"/> <property name="dataSource" ref="dataSource"/> </bean> <!-- Mybatis ItemReader 其中queryId:对应Mapper xml文件中的id,此处为空,因要在代码中动态填充 --> <bean id="itemReader" class="org.mybatis.spring.batch.MyBatisPagingItemReader" scope="prototype"> <property name="sqlSessionFactory" ref="resellSqlSessionFactory" /> <property name="pageSize" value="1000"/> <property name="queryId" value="" /> </bean> <!-- Mybatis ItemWriter 其中statementId:对应Mapper xml文件中的id,此处为空,因要在代码中动态填充 --> <bean id="itemWriter" class="org.mybatis.spring.batch.MyBatisBatchItemWriter" scope="prototype"> <property name="sqlSessionFactory" ref="resellSqlSessionFactory"/> <property name="sqlSessionTemplate" ref="sqlSessionTemplate"/> <property name="statementId" value=""/> </bean> <!-- Job创建工厂 --> <bean id="jobBuilderFactory" class="org.uugu.core.common.support.CustomJobBuilderFactory"> <constructor-arg index="0" ref="jobRepository"/> </bean> <!-- Step创建工厂 --> <bean id="stepBuilderFactory" class="org.springframework.batch.core.configuration.annotation.StepBuilderFactory"> <constructor-arg index="0" ref="jobRepository"/> <constructor-arg index="1" ref="transactionManager"/> </bean> <!-- Spring线程池 --> <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="5"/> <property name="maxPoolSize" value="10"/> <property name="queueCapacity" value="30"/> </bean> <!--<bean id="daySettleInfoTasklet" class="org.uugu.core.service.daysettle.batchleInfoTasklet" lazy-init="true">--> <!--</bean>--> <!--<batch:job id="daySettleInfoJob" job-repository="jobRepository">--> <!--<batch:step id="step1">--> <!--<tasklet>--> <!--<chunk reader="itemReader" processor="daySettleInfoProcessor" writer="itemWriter"--> <!--commit-interval="2"/>--> <!--</tasklet>--> <!--<!–<tasklet ref="daySettleInfoTasklet"/>–>--> <!--</batch:step>--> <!--</batch:job>--> </beans> |
Processor代码:其实这个没什么特殊的敌方
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
@Component @Scope("prototype") public class DemoProcessor implements ItemProcessor<B, A> { private Map<String, Object> parameterValues; @Override public A process(B item) throws Exception { A a = new A(); a.setDay((Date) parameterValues.get("date")); return a; } public Map<String, Object> getParameterValues() { return parameterValues; } public void setParameterValues(Map<String, Object> parameterValues) { this.parameterValues = parameterValues; } } |
Service代码:此处演示如何动态创建Job
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
@Service("demoService") public class DemoServiceImpl implements DemoService { @Autowired private DemoProcessor demoProcessor; //业务逻辑处理Processor // @Autowired // private CompositeItemProcessor compositeItemProcessor; @Autowired private SimpleJobLauncher jobLauncher; @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired private MyBatisBatchItemWriter aWriter; @Autowired private MyBatisPagingItemReader itemReader; @Autowired private ThreadPoolTaskExecutor taskExecutor; @Autowired private SecondWriter secondWriter; @Override public void calculateFee(Date date) { //Set Reader QueryId itemReader.setQueryId("org.uugu.core.dao.financing.BDao.findBPaginated"); //创建findBPaginated这个xml对应ID的sql中使用到的参数 Map<String, Object> params = Maps.newHashMap(); params.put("verifyStatus", 0); params.put("status", "1,2"); itemReader.setParameterValues(params); //Set Writer StatementId aWriter.setStatementId("org.uugu.core.dao.daysettle.ADao.saveA"); //Create Parameter for Processor Map<String, Object> param = Maps.newHashMap(); param.put("date", date); daySettleInfoProcessor.setParameterValues(param); //使用符合Writer,实现多Writer,可像多个数据库中写入数据,当然第二个Writer的写入操作要自己代码实现,注意同事务中MyBatis的ExcuteType不可改变 CompositeItemWriter compositeItemWriter = new CompositeItemWriter(); List delegates = Lists.newArrayList(); delegates.add(aWriter); delegates.add(secondWriter); compositeItemWriter.setDelegates(delegates); //Build Step Step step = stepBuilderFactory.get("calculateDaySettleInfoStep1").<B, A>chunk(1000) //1000次处理提交一次事务 .reader(itemReader) .processor(demoProcessor) .writer(compositeItemWriter) // .taskExecutor(taskExecutor) //If you need can be used. .faultTolerant() .skipLimit(10) //可以异常跳过10次,多了就任务失败 .skip(NullPointerException.class) //空指针异常可以跳过,继续执行 .listener(new DemoSkipListener()) .build(); //Build Job Job job = jobBuilderFactory.get("calculateDaySettleInfoJob1").incrementer(new RunIdIncrementer()).start(step).build(); JobParameters parameters = new JobParametersBuilder() .addString("runDay", DateUtil.formatDate(date, "yyyy-MM-dd")) //以日期为参数,可保证一天只能执行一次 .toJobParameters(); try { //Start Job JobExecution result = jobLauncher.run(job, parameters); System.out.println("===" + result.toString()); } catch (Exception e) { e.printStackTrace(); } } } |
接口就不贴了,自己抽象出来就可以了。
如有问题,欢迎及时指出。如转载请指明出处。
itemReader超过了1000笔在job中是不翻页的啊