A Brief Introduction to the Spring Batch Batch-Processing Framework
Source: Baidu Wenku  Editor: 偶看新闻  Time: 2024/05/03 12:29:01
I won't say much about what Spring Batch is; for an overview, see the following page:
http://www.infoq.com/cn/news/2008/07/spring-batch-zh
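When I first came across Spring Batch I had no idea where to start. There were very few posts or blog entries about it on JavaEye, so, frustratingly, all I could do was read the project's own documentation. Frankly, that documentation did not help much: at best it tells you which handful of classes Spring Batch is built around. In the end I had to step through the code in a debugger, and I took quite a few wrong turns along the way.
This article briefly describes how Spring Batch processes a single file.
First, the key classes in Spring Batch:
JobLauncher is responsible for starting a Job. Inside a Job, the real work is done by Steps. For a Step to do its work it needs tools: ItemReader, ItemProcessor and ItemWriter. You can write these yourself or use the ready-made implementations that ship with Spring Batch, depending on your situation.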
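To make that division of labour concrete, here is a minimal plain-Java sketch. It does not use Spring Batch at all: the `Reader`, `Processor` and `Writer` interfaces, the `runStep` method and the sample data are hypothetical stand-ins for ItemReader, ItemProcessor and ItemWriter, and I am assuming a reader that returns null once the input is exhausted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-ins for the reader/processor/writer roles; not the Spring Batch API. */
class StepRolesDemo {

    interface Reader { String read(); }                  // returns null when input is exhausted
    interface Processor { String process(String item); }
    interface Writer { void write(String item); }

    /** The "step": pull each item from the reader, transform it, hand it to the writer. */
    static int runStep(Reader reader, Processor processor, Writer writer) {
        int count = 0;
        for (String item = reader.read(); item != null; item = reader.read()) {
            writer.write(processor.process(item));
            count++;
        }
        return count;
    }

    /** Convenience wrapper: runs a step over an in-memory list and returns what was written. */
    static List<String> upperCaseAll(List<String> input) {
        Iterator<String> it = input.iterator();
        List<String> out = new ArrayList<>();
        runStep(() -> it.hasNext() ? it.next() : null, String::toUpperCase, out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(upperCaseAll(Arrays.asList("a", "b", "c")));
    }
}
```

The step itself knows nothing about files or databases; swapping the three tools changes what the step does without touching the loop.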
Next, let's look at what these classes actually contain and what they do.
Start with the configuration file:
XML code
<bean id="jobLauncher"
      class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
</bean>

<bean id="jobRepository"
      class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean"
      p:dataSource-ref="dataSource" p:transactionManager-ref="transactionManager" />
The jobLauncher configuration is simple: it just has the required jobRepository injected. The job and step configuration is more involved.
XML code
<bean id="checkSheetFileImportJob" parent="simpleJob">
    <property name="steps">
        <list>
            <bean id="checkSheetTransactionLoadStep" parent="simpleStep">
                <property name="commitInterval" value="3"/>
                <property name="allowStartIfComplete" value="true"/>
                <property name="itemReader" ref="checkSheetTransactionItemReader"/>
                <property name="itemWriter" ref="checkSheetTransactionWriter"/>
            </bean>
        </list>
    </property>
</bean>

<bean id="checkSheetTransactionWriter" class="com.longtop.netbank.checksheet.CheckSheetTransactionJDBCWriter">
    <property name="dataSource" ref="dataSource"/>
    <property name="resource" ref="checkSheetFileResource"/>
</bean>

<bean id="checkSheetTransactionItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="resource" ref="checkSheetFileResource"/>
    <property name="linesToSkip" value="0"/>
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <property name="lineTokenizer">
                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    <property name="names"
                              value="accountNo,transactionDate,transactionType,oppositeAccountNo,oppositeAccountNickName,summary,lending,transactionAmount,balance"/>
                </bean>
            </property>
            <property name="fieldSetMapper">
                <bean class="com.longtop.netbank.checksheet.CheckSheetTransactionFieldSetMapper"/>
            </property>
        </bean>
    </property>
</bean>
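The reader configuration above boils down to this: split each line on a delimiter and give each token the name at the same position. Below is a rough plain-Java sketch of that idea. It is not how DelimitedLineTokenizer is implemented; the `tokenize` helper and the sample line are made up for illustration, and I am assuming a comma delimiter because the configuration does not set one explicitly.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of delimited tokenizing; not Spring Batch code. */
class TokenizeDemo {

    // Field names copied from the "names" property in the configuration.
    static final String[] NAMES = ("accountNo,transactionDate,transactionType,oppositeAccountNo,"
            + "oppositeAccountNickName,summary,lending,transactionAmount,balance").split(",");

    /** Splits a line on commas and pairs each token with the field name at the same position. */
    static Map<String, String> tokenize(String line) {
        String[] tokens = line.split(",", -1); // -1 keeps trailing empty fields
        if (tokens.length != NAMES.length) {
            throw new IllegalArgumentException(
                    "expected " + NAMES.length + " fields but got " + tokens.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < NAMES.length; i++) {
            fields.put(NAMES[i], tokens[i].trim());
        }
        return fields;
    }

    public static void main(String[] args) {
        // Made-up sample line, for illustration only.
        Map<String, String> fields =
                tokenize("1001,2009-01-05,TRANSFER,2002,Bob,rent,D,500.00,1500.00");
        System.out.println(fields.get("accountNo") + " " + fields.get("balance"));
    }
}
```

In the real configuration, the named fields are then handed to the fieldSetMapper, which turns them into a domain object.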
XML code
<bean id="simpleJob" class="org.springframework.batch.core.job.SimpleJob" abstract="true">
    <property name="jobRepository" ref="jobRepository" />
    <property name="restartable" value="true" />
</bean>

<bean id="simpleStep" class="org.springframework.batch.core.step.item.SimpleStepFactoryBean"
      abstract="true">
    <property name="transactionManager" ref="transactionManager" />
    <property name="jobRepository" ref="jobRepository" />
    <property name="commitInterval" value="1" />
</bean>
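The job uses a class that ships with Spring Batch (SimpleJob); we configure its steps property so that the batch work gets executed.
In this example, the step's work is to read the data file through the itemReader, process the data with an itemProcessor, and then write it to the database through the itemWriter (the configuration shown here wires only the reader and the writer). The step is also built with a class that ships with Spring Batch, SimpleStepFactoryBean.
SimpleStepFactoryBean deserves a careful look to see exactly what it does.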
SimpleStepFactoryBean code
public class SimpleStepFactoryBean implements FactoryBean, BeanNameAware {

    private int commitInterval = 0;

    /**
     * Create a {@link Step} from the configuration provided.
     *
     * @see org.springframework.beans.factory.FactoryBean#getObject()
     */
    public final Object getObject() throws Exception {
        TaskletStep step = new TaskletStep(getName());
        applyConfiguration(step);
        return step;
    }

    public Class getObjectType() {
        return Step.class;
    }

    ...
}
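From this we can see that the class returns a TaskletStep object and sets its properties through the applyConfiguration method. applyConfiguration is fairly long, so I won't paste it here; I'll come back to it later on.
As the name suggests, commitInterval controls how many items the itemReader reads, and the itemWriter writes, per commit. Put simply:
Suppose commitInterval=10 and the data file holds 30 records. If an exception is thrown while the 24th record is being read and written, only 20 records are successfully written to the database; records 21 to 24 are discarded. On the next run, if the job is restarted from the breakpoint, processing resumes at record 21.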
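The arithmetic in that example can be checked with a small simulation. This is a plain-Java sketch under my own assumptions, not Spring Batch code: the `run` method, the in-memory "commit" (appending a chunk to a list) and the "rollback" (discarding the current chunk) are all hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of chunked commits; not Spring Batch code. */
class ChunkCommitDemo {

    /**
     * Processes items 1..total in chunks of commitInterval and fails when
     * item number failAt is reached (pass 0 for "never fail").
     * Returns the items that were committed before the failure.
     */
    static List<Integer> run(int total, int commitInterval, int failAt) {
        List<Integer> committed = new ArrayList<>();
        List<Integer> chunk = new ArrayList<>();
        try {
            for (int item = 1; item <= total; item++) {
                if (item == failAt) {
                    throw new IllegalStateException("failure at item " + item);
                }
                chunk.add(item);
                if (chunk.size() == commitInterval) {
                    committed.addAll(chunk); // "commit" the full chunk
                    chunk.clear();
                }
            }
            committed.addAll(chunk); // commit the final partial chunk, if any
        } catch (IllegalStateException e) {
            chunk.clear(); // "roll back" the uncommitted chunk
        }
        return committed;
    }

    public static void main(String[] args) {
        List<Integer> committed = run(30, 10, 24);
        System.out.println("committed: " + committed.size());
        System.out.println("restart from item: " + (committed.size() + 1));
    }
}
```

With 30 items, an interval of 10 and a failure at item 24, the sketch reports 20 committed items and a restart position of 21, which matches the example.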
Java code
/**
 * @return a {@link CompletionPolicy} consistent with the commit interval
 * and injected policy (if present).
 */
private CompletionPolicy getChunkCompletionPolicy() {
    Assert.state(!(chunkCompletionPolicy != null && commitInterval != 0),
            "You must specify either a chunkCompletionPolicy or a commitInterval but not both.");
    Assert.state(commitInterval >= 0, "The commitInterval must be positive or zero (for default value).");
    if (chunkCompletionPolicy != null) {
        return chunkCompletionPolicy;
    }
    if (commitInterval == 0) {
        logger.info("Setting commit interval to default value (" + DEFAULT_COMMIT_INTERVAL + ")");
        commitInterval = DEFAULT_COMMIT_INTERVAL;
    }
    return new SimpleCompletionPolicy(commitInterval);
}
This method stores commitInterval in a SimpleCompletionPolicy for later use. SimpleCompletionPolicy controls when a loop ends: it checks whether the number of iterations has reached commitInterval.
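The idea of a count-based completion policy fits in a few lines of plain Java. The following is a hypothetical stand-in, not SimpleCompletionPolicy itself: the `CountPolicy` class and its `update` and `isComplete` methods are names I chose for this sketch.

```java
/** Hypothetical count-based policy: complete once commitInterval iterations have run. */
class CountPolicyDemo {

    static class CountPolicy {
        private final int commitInterval;
        private int count = 0;

        CountPolicy(int commitInterval) {
            if (commitInterval <= 0) {
                throw new IllegalArgumentException("commitInterval must be positive");
            }
            this.commitInterval = commitInterval;
        }

        void update() { count++; }                               // call once per iteration
        boolean isComplete() { return count >= commitInterval; } // true once the limit is reached
    }

    /** Runs a loop governed by the policy and returns how many iterations it allowed. */
    static int iterationsAllowed(int commitInterval) {
        CountPolicy policy = new CountPolicy(commitInterval);
        int iterations = 0;
        while (!policy.isComplete()) {
            iterations++;    // one unit of work would happen here
            policy.update();
        }
        return iterations;
    }

    public static void main(String[] args) {
        System.out.println(iterationsAllowed(3));
    }
}
```

Keeping the stop condition in a separate policy object means the loop does not need to know why it stops, only when.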
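Below is a rough outline of the execution steps (I paste only class names and partial code; the class names make it easy to find the source):
1 jobLauncher invokes the job: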
Java code
public class SimpleJobLauncher implements JobLauncher, InitializingBean {

    /**
     * Run the provided job with the given {@link JobParameters}. The
     * {@link JobParameters} will be used to determine if this is an execution
     * of an existing job instance, or if a new one should be created.
     *
     * @param job the job to be run.
     * @param jobParameters the {@link JobParameters} for this particular
     * execution.
     * @return JobExecutionAlreadyRunningException if the JobInstance already
     * exists and has an execution already running.
     * @throws JobRestartException if the execution would be a re-start, but a
     * re-start is either not allowed or not needed.
     * @throws JobInstanceAlreadyCompleteException if this instance has already
     * completed successfully
     */
    public JobExecution run(final Job job, final JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {

        Assert.notNull(job, "The Job must not be null.");
        Assert.notNull(jobParameters, "The JobParameters must not be null.");

        boolean exists = jobRepository.isJobInstanceExists(job.getName(), jobParameters);
        if (exists && !job.isRestartable()) {
            throw new JobRestartException("JobInstance already exists and is not restartable");
        }

        /**
         * There is a very small probability that a non-restartable job can be
         * restarted, but only if another process or thread manages to launch
         * and fail a job execution for this instance between the last assertion
         * and the next method returning successfully.
         */
        final JobExecution jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);

        taskExecutor.execute(new Runnable() {

            public void run() {
                try {
                    job.execute(jobExecution);
                }
                catch (Throwable t) {
                    logger.info("Job: [" + job + "] failed with the following parameters: [" + jobParameters + "]", t);
                    rethrow(t);
                }
            }

            private void rethrow(Throwable t) {
                if (t instanceof RuntimeException) {
                    throw (RuntimeException) t;
                }
                throw new RuntimeException(t);
            }
        });

        return jobExecution;
    }
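Before the job is executed, the launcher checks whether a job instance already exists and whether the job can be restarted. If the instance exists and the job is not restartable, an exception is thrown. The code is straightforward and needs little explanation.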
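The check itself is easy to reproduce in isolation. Here is a plain-Java sketch assuming a job instance is identified by its name plus its parameters; the in-memory set stands in for the job repository, and the `launch` method, the key format and the parameter string in `main` are all hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of the "exists and not restartable" check; not Spring Batch code. */
class LaunchCheckDemo {

    // Stand-in for the job repository: remembers which instances have been launched.
    private final Set<String> knownInstances = new HashSet<>();

    /**
     * Launches a job instance identified by name + parameters.
     * Returns true if this launch is a restart of an existing instance.
     */
    boolean launch(String jobName, String parameters, boolean restartable) {
        String key = jobName + "|" + parameters;
        boolean exists = knownInstances.contains(key);
        if (exists && !restartable) {
            throw new IllegalStateException("JobInstance already exists and is not restartable");
        }
        knownInstances.add(key);
        return exists;
    }

    public static void main(String[] args) {
        LaunchCheckDemo launcher = new LaunchCheckDemo();
        // Made-up parameter string, for illustration only.
        System.out.println(launcher.launch("checkSheetFileImportJob", "file=a.txt", true));
        System.out.println(launcher.launch("checkSheetFileImportJob", "file=a.txt", true));
    }
}
```

Launching with different parameters creates a new instance, so the check only bites when the same job is run again with the same parameters.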
2 the job executes its steps
Java code
public class SimpleJob extends AbstractJob {

    public void execute(JobExecution execution) {
        ...
        getCompositeListener().beforeJob(execution);
        StepExecution lastStepExecution = handleSteps(steps, execution);
        ...
        try {
            getCompositeListener().afterJob(execution);
        }
        ...
    }
}
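There is a lot of code, so I've pasted only the key parts. Before the job executes its steps, the beforeJob method of JobExecutionListener is called; after execution finishes, the afterJob method of JobExecutionListener is called. You can decide what these methods do as needed. If the job has several steps, the program updates the jobExecution based on the stepExecution of the last step.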
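The ordering described here can be sketched without Spring Batch. In the following plain-Java example the `Listener` interface, the `execute` method and the status strings are hypothetical; the only things taken from the text are the call order (beforeJob, steps, afterJob) and the fact that the last step determines the result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of listener callbacks around step execution; not Spring Batch code. */
class JobListenerDemo {

    interface Listener {
        void beforeJob(List<String> log);
        void afterJob(List<String> log);
    }

    /** Runs the steps in order between the two callbacks; returns the status of the last step. */
    static String execute(List<String> stepNames, Listener listener, List<String> log) {
        listener.beforeJob(log);
        String lastStatus = "NO_STEPS";
        for (String step : stepNames) {
            log.add("run " + step);
            lastStatus = step + ":COMPLETED"; // each step overwrites the previous status
        }
        listener.afterJob(log);
        return lastStatus;
    }

    /** A listener that simply records when it was called. */
    static Listener loggingListener() {
        return new Listener() {
            public void beforeJob(List<String> log) { log.add("beforeJob"); }
            public void afterJob(List<String> log) { log.add("afterJob"); }
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        String status = execute(Arrays.asList("step1", "step2"), loggingListener(), log);
        System.out.println(log + " -> " + status);
    }
}
```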
3 execution of the step
Java code
public abstract class AbstractStep implements Step, InitializingBean, BeanNameAware {

    /**
     * Template method for step execution logic - calls abstract methods for
     * resource initialization ({@link #open(ExecutionContext)}), execution
     * logic ({@link #doExecute(StepExecution)}) and resource closing ({@link #close(ExecutionContext)}).
     */
    public final void execute(StepExecution stepExecution) throws JobInterruptedException,
            UnexpectedJobExecutionException {
        ...
    }
}
This is the method a step runs to do its work. First it opens the data file as a stream:
Java code
open(stepExecution.getExecutionContext());
That is, it calls the open method of FlatFileItemReader, because FlatFileItemReader is the itemReader implementation named in the configuration file, and applyConfiguration in SimpleStepFactoryBean sets the FlatFileItemReader on the TaskletStep (see the source for details).
Then TaskletStep's doExecute method is called:
Java code
exitStatus = doExecute(stepExecution);
Here is what that method does (I won't paste the full code; please see the source):
Java code
return stepOperations.iterate(callback);  // callback is a RepeatCallback
This callback reads the number of items given by commitInterval from the stream and writes them to the database. It does so by calling the tasklet.execute method:
Java code
exitStatus = tasklet.execute(contribution, attributes);
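The tasklet is a SimpleChunkOrientedTasklet; to see why it is this type, look at the applyConfiguration method of SimpleStepFactoryBean.
The execute method of SimpleChunkOrientedTasklet is short and clear, so I won't go into it.
The one confusing part is stepOperations.iterate. The point to keep in mind when reading the code is that stepOperations (a RepeatTemplate) uses the CompletionPolicy interface to control its loop. As for which CompletionPolicy TaskletStep and SimpleChunkOrientedTasklet each use, again see the applyConfiguration method of SimpleStepFactoryBean.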
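The two levels of looping are easier to follow in a stripped-down form. The plain-Java sketch below is my own simplification and not the real RepeatTemplate: `ChunkCallback`, `iterate` and `process` are hypothetical names, and I am assuming the outer loop stops once the input is exhausted while the inner loop is bounded by commitInterval.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical sketch of a repeat loop driving chunk callbacks; not Spring Batch code. */
class ChunkLoopDemo {

    /** One callback invocation handles a single chunk and reports whether input remains. */
    interface ChunkCallback { boolean doChunk(); }

    /** Outer loop: keep invoking the callback until it reports that the input is exhausted. */
    static int iterate(ChunkCallback callback) {
        int chunks = 0;
        boolean more = true;
        while (more) {
            more = callback.doChunk();
            chunks++;
        }
        return chunks;
    }

    /** Reads all items in chunks of at most commitInterval and returns the batches "written". */
    static List<List<String>> process(List<String> items, int commitInterval) {
        if (commitInterval <= 0) {
            throw new IllegalArgumentException("commitInterval must be positive");
        }
        Iterator<String> reader = items.iterator();
        List<List<String>> written = new ArrayList<>();
        iterate(() -> {
            List<String> chunk = new ArrayList<>();
            // Inner loop: bounded by commitInterval, like a count-based completion policy.
            while (chunk.size() < commitInterval && reader.hasNext()) {
                chunk.add(reader.next());
            }
            if (!chunk.isEmpty()) {
                written.add(chunk); // one "write + commit" per chunk
            }
            return reader.hasNext();
        });
        return written;
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList("a", "b", "c", "d", "e", "f", "g"), 3));
    }
}
```

Seven items with an interval of 3 come out as three batches, the last one holding the single leftover item.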
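That is how a job is executed. This is my first write-up, so many places are probably not explained clearly; if you have questions, please ask.
The attachment is a demo; create a database for Spring Batch and it will run. The demo does have a small trap (every run drops and recreates the tables), which you can change as needed. Of course, if the tables are dropped on every run, you will not see Spring Batch's restart-from-breakpoint feature.
The attachment is too large to upload, so you will have to download it yourselves.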