创建步骤的另外一种方式
ItemReader 可以理解为:数据获取。在 Step 中除了可以使用 Tasklet 创建简单的步骤,也可以使用 chunk + itemReader + itemWriter 创建一个复杂的步骤。其中:
- chunk 表示每几条数据进行一次批量处理
- itemReader 表示批量获取的数据怎么获取
- itemWriter 表示 chunk 中的数据进行怎样的输出(到文件、数据库或其他)
在使用这种方式创建步骤的时候,需要注意以下几点:
- chunk 需要指定:输入类型,输出类型、每多少条处理一次,如: <String, String>chunk(10); 表示 ItemReader 的输入类型为 String,ItemWriter 的输出类型为 String,每 10 条处理一次
- ItemReader 需要指定输入类型,如:ItemReader,指明获取到的数据为 String 类型
- ItemWriter 需要指定输出类型,如:ItemWriter,指明输出(到文件、数据库或其他)的类型为 String 类型
创建一个 String 集合类型的 ItemReader,并进行输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
|
@Configuration public class ItemReaderDemo { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; @Autowired public ItemReaderDemo(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) { this.jobBuilderFactory = jobBuilderFactory; this.stepBuilderFactory = stepBuilderFactory; } @Bean public Job itemReaderDemoJob() { return jobBuilderFactory.get("itemReaderDemoJob") .start(itemReaderDemoStep()) .build(); } @Bean public Step itemReaderDemoStep(){ return stepBuilderFactory.get("itemReaderDemoStep") .<String, String>chunk(2) .reader(myStringItemReader()) .writer( list -> { list.forEach(System.err::println); }) .build(); }
@Bean public MyStringItemReader myStringItemReader(){ List<String> data = Arrays.asList("Cat", "Dog", "Pig", "Duck"); return new MyStringItemReader(data); } }
public class MyStringItemReader implements ItemReader<String> { private Iterator<String> iterator; public MyStringItemReader(List<String> data) { this.iterator = data.iterator(); } @Override public String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { if (iterator.hasNext()) { return iterator.next(); } return null; } }
|
Listener
几个常用的 Listener
在上一篇博客中提到了 StepExecutionListener
,这个 Listener 可以在 Step 执行前后获取执行上下文。除此之外还有几个比较常用的 Listener:
- JobExecutionListener:在 Job 执行前后获取执行上下文
- ChunkListener:在 chunk 执行前后获取执行上下文
- ItemReadListener:在 ItemReader 执行前后获取执行上下文
- ItemWriterListener:在 ItemWriter 执行前后获取执行上下文
- ItemProcessListener:在 Processor 执行前后获取执行上下文(数据处理器)
在这些 Listener 中,都有 before、after 方法,便于在执行前后获取信息,在实现这些接口,并生成为 Spring bean
后,在需要的地方引入即可。
比较特殊的几个 Listener 方法:
- ChunkListsner:afterChunkError,在 chunk 发生错误时调用
- ItemReaderListener:onReadError,在 itemReader 发生错误时调用
- ItemWriterListener:onWriteError,在 ItemWriter 发生错误时调用
- ItemProcessListener:onProcessError,在处理器发生错误时调用
另外一种 Listener 实现方式
除了上述实现 xxListener 接口创建 Listener 之外,还有一种更为简单的方式实现 Listener:注解
- 实现 StepExecutionListener 可以使用: @BeforeStep、@AfterStep
- 实现 JobExecutionListener 可以使用:@BeforeJob、@AfterJob
- …
在这些注解中也有对应 Listener 特殊方法的注解:@AfterChunkError、@OnReadError、@OnWriteError、@OnProcessError
代码示例
实现接口方式创建 Listener
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
|
public class MyJobListener implements JobExecutionListener { @Override public void beforeJob(JobExecution jobExecution) { System.out.println("Job 执行之前,Job 名称:" + jobExecution.getJobInstance().getJobName()); }
@Override public void afterJob(JobExecution jobExecution) { System.out.println("Job 执行之后,Job 名称:" + jobExecution.getJobInstance().getJobName()); } }
|
注解方式创建 Listener
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class MyChunkListener {
@BeforeChunk public void before(ChunkContext chunkContext){ System.out.println("Step 执行之前,Step 名称:" + chunkContext.getStepContext().getStepName()); }
@AfterChunk public void after(ChunkContext chunkContext){ System.out.println("Step 执行之后,Step 名称:" + chunkContext.getStepContext().getStepName()); }
}
|
使用方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
|
@Configuration public class ListenerDemo { private final JobBuilderFactory jobBuilderFactory; private final StepBuilderFactory stepBuilderFactory; @Autowired public ListenerDemo(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) { this.jobBuilderFactory = jobBuilderFactory; this.stepBuilderFactory = stepBuilderFactory; } @Bean public Job listenerJob(){ return jobBuilderFactory.get("listenerJob") .start(listenerStep1()) .listener(new MyJobListener()) .build(); } @Bean public Step listenerStep1() { return stepBuilderFactory.get("listenerStep1") .<String, String>chunk(2) .faultTolerant() .listener(new MyChunkListener()) .reader(itemReader()) .writer(itemWriter()) .build();
} @Bean public ItemWriter<String> itemWriter(){ return new ItemWriter<String>() { @Override public void write(List<? extends String> items) throws Exception { items.forEach(System.err::println); } }; } @Bean public ItemReader<String> itemReader(){ return new ListItemReader<>(Arrays.asList("Java", "Python", "Swift", "MyBatis")); } }
|