阿里云开发者社区

电脑版
提示:原网页已由神马搜索转码, 内容由developer.aliyun.com提供.

Spring Batch学习记录及示例项目代码

2024-03-1440
版权
版权声明:
本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《 阿里云开发者社区用户服务协议》和 《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写 侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介:Spring Batch学习记录及示例项目代码

在这里插入图片描述

Spring Batch学习记录总结

关于Spring Batch的相关知识点参考:https://mp.weixin.qq.com/s/OUMwyo2EopXkSHGn2OlghQ

Spring Batch的一些基本概念:

在这里插入图片描述
  • ItemReader:对资源的读处理,如从数据库查询、文件读取、变量读取等。

  • ItemProcessor:对读取的数据进行处理,可以实现自己的业务逻辑操作来对数据处理,如对数据进行计算、逻辑处理、格式转换等。

  • ItemWriter:对资源的写处理,如写入数据库、写入文件、打印日志等。

  • Step:一个完整的批处理步骤,一个Step是由ItemReader、ItemProcessor、ItemWriter三部分组成。

  • Job:代表一个完整的批处理过程,一个Job由一个或多个Step组成。

    在这里插入图片描述
  • Listener:监听器,可以对Step、Job状态进行监听,我们可以实现监听方法,对其进行一些逻辑处理,如打印日志等。

  • JobLauncher:负责启动Job。

  • JobRepository:存储关于配置和执行的Job(作业)的元数据。

简单使用

1、创建一个SpringBoot项目。

2、在项目中创建包config,并在该包下面创建一个Configuration类。

在类上面加上@Configuration;@EnableBatchProcessing;两个注解。

    @Configuration
    @EnableBatchProcessing
    public class SingleJobConfiguration {

@Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; /**
* 程序执行的入口
* @return
*/
@Bean public Job helloJob(){

return jobBuilderFactory.get("hellojpb-1") .start(step1()) .build(); } @Bean public Step step1() {

return stepBuilderFactory.get("step-1").tasklet( new Tasklet() {

@Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {

System.out.println("hello spring batch"); return RepeatStatus.FINISHED; } } ).build(); } }

3、现在启动项目,发现控制台出现以下错误信息。

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-05-22 13:00:10.396 ERROR 15548 --- [ main] o.s.b.d.LoggingFailureAnalysisReporter :
***************************
APPLICATION FAILED TO START
***************************
Description:
Failed to configure a DataSource: 'url' attribute is not specified and no embedded datasource could be configured.
Reason: Failed to determine a suitable driver class


​ Action:

Consider the following:
If you want an embedded database (H2, HSQL or Derby), please put it on the classpath.
If you have database settings to be loaded from a particular profile you may need to activate it (no profiles are currently active).

解决方法:由于Spring Batch在运行的时候需要数据库来存储一些具体的信息;因此我们需要配置具体的数据库信息。

方式一:配置内存数据库H2。

         <dependency>
                <groupId>com.h2database</groupId>
                <artifactId>h2</artifactId>
                <scope>runtime</scope>
         </dependency>

方式二:配置Mysql数据库。

            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.25</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jdbc</artifactId>
            </dependency>

Mysql数据库连接信息:

    spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test?useSSL=false spring.datasource.username=root
spring.datasource.password=123456 spring.datasource.schema=classpath:/org/springframework/batch/core/schema-mysql.sql
spring.batch.initialize-schema=always

Spring Batch具体分析

  • Step有两种实现方式。一个是tasklet的方式;一个是chunk方式。

    tasklet方式:

    在Step中,执行单个任务,Job有多个Step按照一定的顺序来组成的,每个步骤执行一个具体的任务。

    chunk方式:

    该方法是基于数据块(一部分数据)执行的,每个任务又都可以分为Read-Process-Write。

  • Spring Batch框架主要有四个角色:
    JobLauncher:任务启动器,通过它来启动任务,可以看作程序的入口。
    Job:代表一个具体的任务。
    Step:代表一个具体的步骤。一个Job有1个或多个Step构成。
    JobRepository:是存储数据的地方,可以看作一个数据库的接口,在任务执行的时候需要通过它来记录任务状态等信息。

Step的两种实现方式:

   //方式1   chunk
        @Bean
        Step step() {

return stepBuilderFactory.get(STEP) .<Music, Music>chunk(2) .reader(itemReader()) .writer(itemWriter()) .build(); } //方式2 tasklet @Bean public Step step1() {

return stepBuilderFactory.get("step-1").tasklet( new Tasklet() {

@Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {

System.out.println("hello spring batch"); return RepeatStatus.FINISHED; } } ).build(); }

Flow的创建和使用

  • Flow是多个Step的集合。
  • 可以被多个Job复用。
  • 使用FlowBuilder来创建。
    @Configuration
    @EnableBatchProcessing
    public class FlowDemo {

@Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; /**
* 创建 Step1
* @return
*/
@Bean public Step flowDemoStep1() {

return stepBuilderFactory.get("step-1").tasklet( new Tasklet() {

@Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {

System.out.println("hello spring flowDemoStep1"); return RepeatStatus.FINISHED; } } ).build(); } /**
* 创建Step2
* @return
*/
@Bean public Step flowDemoStep2() {

return stepBuilderFactory.get("step-2").tasklet( new Tasklet() {

@Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {

System.out.println("hello spring flowDemoStep2"); return RepeatStatus.FINISHED; } } ).build(); } /**
* 创建Step3
* @return
*/
@Bean public Step flowDemoStep3() {

return stepBuilderFactory.get("step-3").tasklet( new Tasklet() {

@Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {

System.out.println("hello spring flowDemoStep3"); return RepeatStatus.FINISHED; } } ).build(); } /**
* 创建Flow 对象 ,指明Flow对象包含哪些Step
*/
public Flow flowDemoFlow(){

return new FlowBuilder<Flow>("flowDemoFlow") .start(flowDemoStep1()) .next(flowDemoStep2()) .build(); } /**
* 创建Job对象
* @return
*/
@Bean public Job job(){

return jobBuilderFactory.get("flowjob") .start(flowDemoFlow()) .next(flowDemoStep3()) .end() .build(); }

split实现并发执行

实现任务中的多个Step或多个flow并发执行。

关键代码:

    @Configuration
    @EnableBatchProcessing
    public class SplitDemo {

@Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; /**
* 创建 Step1
* @return
*/
@Bean public Step splitDemoStep1() {

return stepBuilderFactory.get("step-1").tasklet( new Tasklet() {

@Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {

System.out.println("hello spring splitDemoStep1"); return RepeatStatus.FINISHED; } } ).build(); } /**
* 创建Step2
* @return
*/
@Bean public Step splitDemoStep2() {

return stepBuilderFactory.get("step-2").tasklet( new Tasklet() {

@Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {

System.out.println("hello spring splitDemoStep2"); return RepeatStatus.FINISHED; } } ).build(); } /**
* 创建Step3
* @return
*/
@Bean public Step splitDemoStep3() {

return stepBuilderFactory.get("step-3").tasklet( new Tasklet() {

@Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {

System.out.println("hello spring splitDemoStep3"); return RepeatStatus.FINISHED; } } ).build(); }

创建Flow 对象 ,指明Flow对象包含哪些Step:

      public Flow  splitDemoFlow1(){

return new FlowBuilder<Flow>("splitDemoFlow1").start(splitDemoStep1()).build();}public Flow splitDemoFlow2(){

return new FlowBuilder<Flow>("splitDemoFlow2") .start(splitDemoStep2()) .next(splitDemoStep3()) .build(); }
        /**
* 创建任务
* @return
*/
@Bean public Job job(){

return jobBuilderFactory.get("splitDemoJob") .start(splitDemoFlow1()) .split(new SimpleAsyncTaskExecutor()) .add(splitDemoFlow2()) .end() .build(); } }

决策器的使用

接口:JobExecutionDecider

自定义决策器:MyDecider.java,实现JobExecutionDecider接口并且重写decide方法

public class MyDecider implements JobExecutionDecider {

private int count; @Override public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {

count ++; if(count % 2 == 0){

return new FlowExecutionStatus("even"); }else {

return new FlowExecutionStatus("odd"); } } }
@Configuration
@EnableBatchProcessing
public class DeciderDemo {

@Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Bean public Step deciderDemoStep1(){

return stepBuilderFactory.get("deciderDemoStep1").tasklet(new Tasklet() {

@Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {

System.out.println("hello spring deciderDemoStep1"); return RepeatStatus.FINISHED; } }).build(); } @Bean public Step deciderDemoStep2() {

return stepBuilderFactory.get("deciderDemoStep2").tasklet( new Tasklet() {

@Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {

System.out.println("even"); return RepeatStatus.FINISHED; } } ).build(); } @Bean public Step deciderDemoStep3() {

return stepBuilderFactory.get("deciderDemoStep3").tasklet( new Tasklet() {

@Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {

System.out.println("odd"); return RepeatStatus.FINISHED; } } ).build(); } /**
* 创建决策器
* @return
*/
@Bean public JobExecutionDecider myDecider(){

return new MyDecider(); } @Bean public Job deciderDemoJob(){

return jobBuilderFactory.get("deciderDemoJob") .start(deciderDemoStep1()) .next(myDecider()) .from(myDecider()).on("even").to(deciderDemoStep2()) .from(myDecider()).on("odd").to(deciderDemoStep3()) .from(deciderDemoStep3()).on("*").to(myDecider()) .end() .build(); } }

完整例子

主要代码如下:
1、项目总体结构图。

在这里插入图片描述

2、在pom.xml文件中引入依赖。
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <scope>runtime</scope>
        </dependency>
        <!--引入Spring Batch-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
            <version>2.2.1.RELEASE</version>
        </dependency>
       <!--引入mysql-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>
        <!--引入jdbc-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
       <!-- 引入mybatis-->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
<!--        引入其他工具包-->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.7</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

3、配置数据库的连接信息。

server:
port: 8081
# 配置Mysql连接信息
spring:
datasource:
driverClassName: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/xk_test?useSSL=false
username: root
password: root
schema: classpath:/org/springframework/batch/core/schema-mysql.sql
batch:
initialize-schema: always
job:
enabled: false

4、读数据CityDataReader.java

@Service
@Slf4j
@StepScope
public class CityDataReader implements ItemReader<City> {

@Autowired private CityService cityService; List<City> cityInfoLists = null; private int nextCityIndex = 0; @Override public City read() throws Exception {

this.init(); City city = null; if(nextCityIndex < cityInfoLists.size()){

city = cityInfoLists.get(nextCityIndex); nextCityIndex ++; return city; } return null; } private void init() {

cityInfoLists = cityService.getCityInfoLists(); } }

5、处理数据CityDataProcess.java

@Slf4j
@Service
@StepScope
public class CityDataProcess implements ItemProcessor<City,City> {

@Value("#{jobParameters['startDate']}") private Date date; @Value("#{jobParameters['name']}") private String name; @Override public City process(City city) throws Exception {

log.info("开始处理第{}条数据----参数{}:{}.........",city.getId(),date,name); String detail = city.getProvince() + "-" + city.getCity() + "-" + city.getDistrict(); city.setDetail(detail); log.info("第{}条数据已处理完成......",city.getId()); return city; } }

6、写数据CityDataWriter.java

@Service
@StepScope
@Slf4j
public class CityDataWriter implements ItemWriter<City> {

@Autowired private CityService cityService; @Value("#{jobParameters['startDate']}") private Date date; @Value("#{jobParameters['name']}") private String name; @Override public void write(List<? extends City> cities) throws Exception {

for(City city : cities) {

log.info("第{}条数据开始更新.........}", city.getId()); int update = cityService.update(city); if (update > 0 ) {

log.info("第{}条数据更新完成.........}", city.getId()); } } } }

7、配置Job。JobConfig.java

@Configuration
public class JobConfig {

private static final Logger log = LoggerFactory.getLogger(SimpleJobConfig.class); @Autowired JobBuilderFactory jobBuilderFactory; @Autowired StepBuilderFactory stepBuilderFactory; @Autowired SqlSessionFactory sqlSessionFactory; @Autowired private CityDataReader cityDataReader; @Autowired private CityDataProcess cityDataProcess; @Autowired private CityDataWriter cityDataWriter; @Autowired private MyTasketOne myTasketOne; //配置一个Job @Bean("singleJob") public Job singleStepJob() {

return jobBuilderFactory.get("singleJob") .start(singlestep()) .next(singlestep2()) .listener(new MyJobExecutionListener()) .build(); } //配置第一个处理Step @Bean public Step singlestep() {

return stepBuilderFactory.get("singlestep") .<City,City>chunk(9) .reader(cityDataReader) // 读数据逻辑 .processor(cityDataProcess) //数据处理逻辑——业务处理 .writer(cityDataWriter) //写数据逻辑 .listener(new MyStepExecutionListener()) //配置监听器 .build(); } //配置第二个处理Step @Bean public Step singlestep2() {

return stepBuilderFactory.get("singlestep2") .tasklet(myTasketOne) .listener(new MyStepExecutionListener()) //配置监听器 .build(); } }

8、创建controller,启动Job。

@RestController
@RequestMapping("batch")
public class JobStartController {

@Autowired private Job singleJob; @Autowired private JobLauncher jobLauncher; /**
* 启动 Job
* @return
* @throws Exception
*/
@GetMapping("/start") public String invokeStep() throws Exception {

JobParameters jobParameters = new JobParametersBuilder() .addDate("startDate",DateUtil.date(System.currentTimeMillis())) .addString("name","xxkfz") .toJobParameters(); jobLauncher.run(singleJob, jobParameters); return "The job is Run start......."; } }

9、主启动类上加上@EnableBatchProcessing注解。
10、测试。
访问:http://127.0.0.1:8081/batch/start启动Job。

在这里插入图片描述

在这里插入图片描述

本案例项目代码获取方式:关注下方二维码,私信回复【Spring Batch学习项目】即可获取哦
相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助& nbsp;& nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
3天前
|
Java数据安全/隐私保护网络架构
一个简单的示例在spring boot中实现国际化
一个简单的示例在spring boot中实现国际化
1200
|
3天前
|
JavaScriptJava测试技术
基于SpringBoot+Vue的学生网课学习效果评价的详细设计和实现(源码+lw+部署文档+讲解等)
基于SpringBoot+Vue的学生网课学习效果评价的详细设计和实现(源码+lw+部署文档+讲解等)
|
4天前
|
安全算法Java
在Spring Boot项目中集成Jasypt(Java Simplified Encryption)
在Spring Boot项目中集成Jasypt(Java Simplified Encryption)
2077
|
4天前
|
消息中间件JavaKafka
集成Kafka到Spring Boot项目中的步骤和配置
集成Kafka到Spring Boot项目中的步骤和配置
2577
|
3天前
|
设计模式前端开发Java
【Spring MVC】快速学习使用Spring MVC的注解及三层架构
【Spring MVC】快速学习使用Spring MVC的注解及三层架构
|
4天前
|
JavaAPISpring
集成EasyPoi(一个基于POI的Excel导入导出工具)到Spring Boot项目中
集成EasyPoi(一个基于POI的Excel导入导出工具)到Spring Boot项目中
2011
|
3天前
|
前端开发搜索推荐Java
【Spring Boot】深度复盘在开发搜索引擎项目中重难点的整理,以及遇到的困难和总结
【Spring Boot】深度复盘在开发搜索引擎项目中重难点的整理,以及遇到的困难和总结
|
3天前
|
安全Java数据安全/隐私保护
上手spring boot项目(二)之spring boot整合shiro安全框架
上手spring boot项目(二)之spring boot整合shiro安全框架
|
1月前
|
存储JSONJava
SpringBoot集成AOP实现每个接口请求参数和返回参数并记录每个接口请求时间
SpringBoot集成AOP实现每个接口请求参数和返回参数并记录每个接口请求时间
|
1月前
|
前端开发Java应用服务中间件
Springboot对MVC、tomcat扩展配置
Springboot对MVC、tomcat扩展配置

热门文章

最新文章