说明一下schedule-fom的常见使用场景和使用方式
pom.xml1 2 3 4 5 6 7 8 9 10
| <dependency> <groupId>com.cowave.zoo</groupId> <artifactId>schedule-fom</artifactId> <version>2.7.6</version> </dependency> <!-— 需要依赖一下micrometer-core,版本根据使用的spring版本选择 —-> <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-core</artifactId> </dependency>
|
1. 使用方式
需要通过 @EnableFom 进行声明,schedule-fom依赖于spring应用上下文
Main.java1 2 3 4 5 6 7 8
| @EnableFom(enableFomView=false) @Configuration public class Main { @SuppressWarnings("resource") public static void main(String[] args) { new AnnotationConfigApplicationContext("org.springframework.fom.test"); } }
|
比如定义一个简单的定时任务
SimpleSchedule.java1 2 3 4 5 6 7 8
| @Fom(cron = "0 */1 * * * ?") public class SimpleSchedule { @Schedule public void schedule() { } }
|
2. 定时场景
2.1. 单任务 SingleSchedule
单个定时任务比较简单,这里演示一下 ScheduleService,可以在任务过程中对配置的获取和修改,配置项 config.email 可以在界面进行修改和注入,在任务的执行过程中,也可以手动新增修改配置项config.sleep.time,然后在管理界面上查看和修改
SingleSchedule.java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Fom(cron = "0 */1 * * * ?", remark = "定时单任务") public class SingleSchedule {
private static final Logger LOG = LoggerFactory.getLogger(SingleSchedule.class);
private final Random random = new Random();
@Value("${config.email:shanhm1991@163.com}") private String email;
@Autowired private ScheduleService scheduleService;
@Schedule public long exec() throws InterruptedException{ Integer sleepTime = scheduleService.getCurrentConfig("config.sleep.time"); if(sleepTime != null){ LOG.info("executing sleep..., email={}", email); Thread.sleep(sleepTime); }
sleepTime = random.nextInt(5000); scheduleService.putCurrentConfig("config.sleep.time", sleepTime); scheduleService.serializeCurrent(); return sleepTime; } }
|
2.2. 多任务 MultiSchedule
@Fom 标识的类,表示一个调度计划,使用 @Schedule 标识其中的方法,表示以这个方法作为任务来执行。可以标识多个方法,然后这些任务共用同一个调度计划
MultiSchedule.java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Fom(fixedRate = 120000, threadCore = 2, remark = "定时多任务") public class MultiSchedule {
private static final Logger LOG = LoggerFactory.getLogger(MultiSchedule.class);
private final Random random = new Random();
@Schedule public long task1() throws InterruptedException{ long sleep = random.nextInt(5000); LOG.info("task executing ..."); Thread.sleep(sleep); return sleep; }
@Schedule public String task2() throws InterruptedException{ LOG.info("task executing ..."); Thread.sleep(random.nextInt(5000)); return "test"; } }
|
2.3. 批任务 BatchSchedule
实现接口 ScheduleFactory,可以用集合的方式来提交批任务,但是对于任务的抽象需要继承给定的FomTask
BatchSchedule.java1 2 3 4 5 6 7 8 9 10 11 12
| @Fom(fixedDelay = 180000, threadCore = 4, remark = "定时批任务") public class BatchSchedule implements ScheduleFactory<Long> {
@Override public List<BatchTask> newScheduleTasks() throws Exception { List<BatchTask> list = new ArrayList<>(); for(int i = 1; i <= 10; i++){ list.add(new BatchTask(i)); } return list; } }
|
2.4. 任务结束事件 CompleteSchedule
实现接口 CompleteHandler,可以在批任务结束时,执行一些自定义处理,在处理调用中可以拿到执行次数、时间、结果等信息
CompleteSchedule.java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Fom(cron = "0 0/5 * * * ?", threadCore = 4, remark = "自定义任务结束处理") public class CompleteSchedule implements ScheduleFactory<Long>, CompleteHandler<Long> { private static final Logger LOG = LoggerFactory.getLogger(CompleteSchedule.class);
@Override public List<CompleteTask> newScheduleTasks() throws Exception { List<CompleteTask> list = new ArrayList<>(); for(int i = 1; i <= 10; i++){ list.add(new CompleteTask()); } return list; } @Override public void onComplete(long times, long lastTime, List<Result<Long>> results) throws Exception { String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(lastExecTime); LOG.info( "onComplete:第{}次在{}提交的任务全部完成,结果省略...", times, date); } }
|
2.5. 任务超时检测 TimeoutSchedule
通过 taskOverTime 可以给任务指定超时时间,这样当任务超时后调度线程会尝试对其进行中断取消。默认是对每个任务单独进行超时检测,如果想对整体任务进行超时检测,可以设置detectTimeoutOnEachTask,这样将从第一个任务开始执行时计算时间,等到超时后就取消还没结束的任务,不管这个任务实际耗时多久,就算它还没有开始执行也会被取消
TimeoutSchedule.java1 2 3 4 5 6 7 8 9 10 11 12
| @Fom(cron = "0 0/7 * * * ?", threadCore = 4, taskOverTime = 3500, remark = "任务超时检测") public class TimeoutSchedule implements ScheduleFactory<Long>{ @Override public List<TimeoutTask> newScheduleTasks() throws Exception { List<TimeoutTask> list = new ArrayList<>(); for(int i = 1; i <= 15; i++){ list.add(new TimeoutTask()); } return list; } }
|
2.6. 任务取消事件 CancelSchedule
实现接口 TaskCancelHandler 可以自定义任务的取消策略,取消调用有两个时机:检测到任务超时,或者被外部shutdown关闭。如果想自定义shutdown之后的处理,比如释放一些资源,也可以实现接口TerminateHandler(有的场景中,可能要取消的任务无法响应中断,比如等待套接字返回,或者无限循环,此时只能通过关闭连接,或增加循环标志等办法来结束)
CancelSchedule.java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Fom(cron = "0 0/11 * * * ?", execOnLoad = true, taskOverTime = 300000, remark = "自定义任务取消事件") public class CancelSchedule implements TaskCancelHandler, TerminateHandler { private static final Logger LOG = LoggerFactory.getLogger(CancelSchedule.class); private volatile boolean off = false;
@Schedule public void exec() { LOG.info("task executing ..."); while(!off){ try { Thread.sleep(60000); } catch (InterruptedException e) { } } }
@Override public void handleCancel(String taskId, long costTime) { off = true; LOG.info("handleCancel:任务被取消{},已经耗时{}ms", taskId, costTime); }
@Override public void onTerminate(long execTimes, long lastExecTime) { String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(lastExecTime); LOG.info("onTerminate:任务关闭,共执行{}次任务,最后一次执行时间为{}", execTimes, date); } }
|
对于定义的FomTask,如果实现了接口TaskCancelHandler,同样也会在超时或者 shutdown 时进行调用
TimeoutTask.java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class TimeoutTask extends FomTask<Long> implements TaskCancelHandler {
@Override public Long exec() throws InterruptedException { long sleep = new Random().nextInt(10000); logger.info("task executing ..."); Thread.sleep(sleep); return sleep; }
@Override public void handleCancel(String taskId, long costTime) throws Exception { logger.info("task handleCancel:取消任务[{}],耗时={}ms", taskId, costTime); } }
|
3. 非定时场景
也可以将 @Fom 的声明当成一个线程池来使用,同样支持一些接口功能,可用来提交执行任务
FomExecutor.java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Fom(threadCore = 4, taskOverTime = 5000, enableTaskConflict = true, detectTimeoutOnEachTask = true) public class FomExecutor implements ResultHandler<Long>, TaskCancelHandler, CompleteHandler<Long> {
private static final Logger LOG = LoggerFactory.getLogger(FomExecutor.class);
@Override public void handleResult(FomTaskResult<Long> result) throws Exception { LOG.info("handleResult:统计任务[{}]的结果:{}", result.getTaskId(), result.getContent()); }
@Override public void handleCancel(String taskId, long costTime) throws Exception { LOG.info("handleCancel:取消任务[{}],耗时={}ms", taskId, costTime); }
@Override public void onComplete(long times, long lastTime, List<FomTaskResult<Long>> results) throws Exception { String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(lastTime); LOG.info("onComplete: 第{}次在{}提交的任务全部执行结束,结果数:{}", times, date, results.size()); } }
|
使用时实际注入类型ScheduleContext
ExecController.java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @RestController @RequestMapping("/exec") public class ExecController {
@Autowired private ScheduleContext<Long> $fomExecutor;
@RequestMapping("/submit") public void submit(String tag) { List<Task<Long>> tasks = new ArrayList<>(); for(int i = 0; i < 10; i++){ tasks.add(new OtherTask(tag + i)); } $fomExecutor.submitBatch(tasks); } }
|
4. 任务管理
FomService中提供了一些接口支持任务的实时监控管理,如果接口功能不能满足,也可以直接获取ScheduleContext实例进行操作
ExecController.java1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @RestController @RequestMapping("/exec") public class ExecController {
@Autowired private FomService fomService;
@Autowired private List<ScheduleContext<?>> schedules;
@RequestMapping("/list") public List<ScheduleInfo> list(){ return fomService.list(); }
@RequestMapping("/count") public Integer count(){ return schedules.size(); } }
|
默认基于FomService提供了一个任务管理界面:http://[ip]:[port]/[context-path]/fom



对于失败和等待中的任务,也可以查看异常信息、创建时间等信息,这里不再赘述…
5. Metric指标
推荐使用Spring Boot Actuator,可以采集一些指标通过prometheus监控
pom.xml1 2 3 4 5 6 7 8 9 10
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> <version>2.7.0</version> </dependency> <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-registry-prometheus</artifactId> <version>1.9.0</version> </dependency>
|
具体的指标内容示例:
Metric1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| fom_task_count_active{schedule="fomExecutor",} 0.0 # 活跃任务数 fom_task_count_waiting{schedule="fomExecutor",} 0.0 # 等待任务数 fom_schedule_count_times_total{schedule="$fomExecutor",} # 调度次数
fom_task_count_success_total{schedule="fomExecutor",task="xx9",} 1.0 # 任务成功数 fom_task_count_failed_total{schedule="fomExecutor",task="xx1",} 1.0 # 任务失败数
fom_task_cost_min{schedule="fomExecutor",task="xx9",} 3080.0 # 任务最小耗时 fom_task_cost_max{schedule="fomExecutor",task="xx9",} 3080.0 # 任务最大耗时 fom_task_cost_avg{schedule="fomExecutor",task="xx9",} 3080.0 # 任务平均耗时
# 任务耗时区间统计 fom_task_cost_summary_bucket{schedule="fomExecutor",task="xx9",le="250.0",} 0.0 fom_task_cost_summary_bucket{schedule="fomExecutor",task="xx9",le="500.0",} 0.0 fom_task_cost_summary_bucket{schedule="fomExecutor",task="xx9",le="1000.0",} 0.0 fom_task_cost_summary_bucket{schedule="fomExecutor",task="xx9",le="5000.0",} 1.0 fom_task_cost_summary_bucket{schedule="fomExecutor",task="xx9",le="10000.0",} 1.0 fom_task_cost_summary_bucket{schedule="fomExecutor",task="xx9",le="+Inf",} 1.0
|
参考: