
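In Spring Boot, using scheduled tasks only requires enabling scheduling support with @EnableScheduling and adding the @Scheduled annotation to each method that should be scheduled. This turns on scheduled execution in the project, with the execution period and frequency controlled flexibly through cron, fixedRate, fixedDelay and similar attributes.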
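The difference between fixedRate and fixedDelay can be shown with plain arithmetic. The sketch below is not Spring code: it is a small simulation with hypothetical names (FixedRateVsFixedDelay, fixedRateStarts, fixedDelayStarts), and it assumes every run takes the same time and, for the fixed-rate case, finishes before the next period begins.

```java
import java.util.ArrayList;
import java.util.List;

class FixedRateVsFixedDelay {

    // fixedRate: the period is measured from the START of the previous run.
    static List<Long> fixedRateStarts(long periodMs, int runs) {
        List<Long> starts = new ArrayList<>();
        for (int i = 0; i < runs; i++) {
            starts.add(i * periodMs);
        }
        return starts;
    }

    // fixedDelay: the delay is measured from the END of the previous run.
    static List<Long> fixedDelayStarts(long delayMs, long runDurationMs, int runs) {
        List<Long> starts = new ArrayList<>();
        long time = 0;
        for (int i = 0; i < runs; i++) {
            starts.add(time);
            time += runDurationMs + delayMs;
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(fixedRateStarts(5000, 4));        // [0, 5000, 10000, 15000]
        System.out.println(fixedDelayStarts(5000, 1000, 4)); // [0, 6000, 12000, 18000]
    }
}
```

With a 5000 ms setting and a run that takes 1000 ms, the fixed-rate starts stay 5000 ms apart, while the fixed-delay starts drift to 6000 ms apart.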
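1.1 Drawback
- Once the period is specified, changing it requires restarting the application
1.2 Goals
- Hot-update the execution period of scheduled tasks, based on cron expressions and supporting external storage such as a database or nacos
- Minimal changes to stay compatible with existing scheduled tasks (only one annotation needs to be added)
- Add scheduled tasks dynamically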
2.1 @EnableScheduling imports the configuration class SchedulingConfiguration
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}
2.2 SchedulingConfiguration declares only one bean, ScheduledAnnotationBeanPostProcessor; as the name suggests, this class implements the BeanPostProcessor interface
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {
    @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
        return new ScheduledAnnotationBeanPostProcessor();
    }
}
2.3 The postProcessAfterInitialization implementation of ScheduledAnnotationBeanPostProcessor; as shown, the method that actually handles @Scheduled and sets up the scheduled task is processScheduled
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
            bean instanceof ScheduledExecutorService) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }
    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    if (!this.nonAnnotatedClasses.contains(targetClass) &&
            AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
        // Collect the bean's methods together with their @Scheduled annotations
        Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                    Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                            method, Scheduled.class, Schedules.class);
                    return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                });
        if (annotatedMethods.isEmpty()) {
            this.nonAnnotatedClasses.add(targetClass);
            if (logger.isTraceEnabled()) {
                logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
            }
        }
        else {
            // Non-empty set of methods
            annotatedMethods.forEach((method, scheduledMethods) ->
                    // Process each @Scheduled annotation
                    scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
            if (logger.isTraceEnabled()) {
                logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                        "': " + annotatedMethods);
            }
        }
    }
    return bean;
}
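The scanning step above (find the annotated methods of a bean class, then hand each one to a processing method) can be illustrated with plain reflection. This is a minimal sketch, not Spring's implementation: the @Every annotation, the ReportJob class and the scan method are hypothetical stand-ins for @Scheduled, a bean class and MethodIntrospector.selectMethods, and the sketch ignores proxies, repeatable annotations and inherited methods.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

class AnnotationScanSketch {

    // Hypothetical stand-in for @Scheduled; RUNTIME retention makes it visible to reflection.
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Every {
        long millis();
    }

    // Hypothetical bean class with two annotated methods and one plain method.
    static class ReportJob {
        @Every(millis = 5000)
        public void report() { }

        @Every(millis = 1000)
        public void heartbeat() { }

        public void helper() { }
    }

    // Returns method name -> configured period for every method carrying @Every.
    static Map<String, Long> scan(Class<?> targetClass) {
        Map<String, Long> found = new TreeMap<>();
        for (Method method : targetClass.getDeclaredMethods()) {
            Every every = method.getAnnotation(Every.class);
            if (every != null) {
                found.put(method.getName(), every.millis());
            }
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println(scan(ReportJob.class)); // {heartbeat=1000, report=5000}
    }
}
```

For ReportJob the scan returns heartbeat and report with their periods, while helper is skipped because it carries no annotation.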
2.4 Only the key part of ScheduledAnnotationBeanPostProcessor.processScheduled that handles cron expressions is shown below
private final ScheduledTaskRegistrar registrar;
public ScheduledAnnotationBeanPostProcessor() {
    this.registrar = new ScheduledTaskRegistrar();
}
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
    try {
        // Wrap the scheduled method in a Runnable
        Runnable runnable = createRunnable(bean, method);
        boolean processedSchedule = false;
        Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
        // Determine initial delay
        // Handling of scheduled.initialDelay() is omitted here...
        // Check cron expression
        String cron = scheduled.cron();
        if (StringUtils.hasText(cron)) {
            String zone = scheduled.zone();
            if (this.embeddedValueResolver != null) {
                // Resolve ${} placeholder expressions
                cron = this.embeddedValueResolver.resolveStringValue(cron);
                zone = this.embeddedValueResolver.resolveStringValue(zone);
            }
            if (StringUtils.hasLength(cron)) {
                Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                processedSchedule = true;
                if (!Scheduled.CRON_DISABLED.equals(cron)) {
                    TimeZone timeZone;
                    if (StringUtils.hasText(zone)) {
                        timeZone = StringUtils.parseTimeZoneString(zone);
                    } else {
                        timeZone = TimeZone.getDefault();
                    }
                    // Create the CronTrigger and register the CronTask
                    tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
                }
            }
        }
        // Handling of fixedDelay and fixedRate, and storing the ScheduledTask instances for later destruction, is omitted here...
    }
    // catch Exception omitted ...
}
As shown above, the cron task is registered (initialized) through this.registrar.scheduleCronTask
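3. Implementing dynamic scheduled tasks
Approach: override ScheduledAnnotationBeanPostProcessor.processScheduled, change the part that handles cron, and register (initialize) the scheduled task with this.registrar.scheduleTriggerTask instead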
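The idea behind a trigger-based registration can be illustrated without Spring. The following is a minimal sketch, not the article's implementation: SimpleTrigger, dynamicPeriod and simulate are hypothetical names, time is a virtual clock of plain numbers, and a fixed period stands in for a cron expression. It shows that when the trigger reads its period from a source each time it computes the next run, a changed value takes effect at the next computation without re-creating the task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

class DynamicTriggerSketch {

    // Hypothetical, simplified analogue of a trigger: derives the next run from the last one.
    interface SimpleTrigger {
        long nextExecutionTime(long lastExecutionTime);
    }

    // The period is read from the source on every call instead of being fixed at creation time.
    static SimpleTrigger dynamicPeriod(LongSupplier periodSource) {
        return last -> last + periodSource.getAsLong();
    }

    // Simulates `runs` executions on a virtual clock; the period changes before run `changeAfterRuns`.
    static List<Long> simulate(int runs, int changeAfterRuns, long initialPeriod, long newPeriod) {
        AtomicLong period = new AtomicLong(initialPeriod);
        SimpleTrigger trigger = dynamicPeriod(period::get);
        List<Long> fired = new ArrayList<>();
        long time = 0;
        for (int i = 0; i < runs; i++) {
            if (i == changeAfterRuns) {
                period.set(newPeriod); // e.g. the external configuration was edited
            }
            time = trigger.nextExecutionTime(time);
            fired.add(time);
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(simulate(5, 2, 5, 2)); // [5, 10, 12, 14, 16]
    }
}
```

In the simulated run the first two executions are 5 units apart; after the period is changed to 2, the following executions are 2 units apart, with the same trigger object throughout.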
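3.1 Related class diagram
[Class diagram, only partially recoverable: DisposableBean with +destroy() : void, and DynamicCronScheduleTaskManager]
3.2 DynamicCronScheduleTaskManager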
import org.springframework.beans.factory.DisposableBean;
import org.springframework.scheduling.config.ScheduledTask;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.config.TriggerTask;
import java.util.HashMap;
import java.util.Map;
public class DynamicCronScheduleTaskManager implements DisposableBean {
    // cronName -> currently scheduled task
    private final Map<String, ScheduledTask> dynamicScheduledTaskMap = new HashMap<>();
    // Package-private and without a setter: it has to be assigned from outside this class
    ScheduledTaskRegistrar registrar;
    // Add a scheduled task; an existing task with the same name is cancelled first
    public ScheduledTask addTriggerTask(String cronName, TriggerTask task) {
        ScheduledTask scheduledTask = dynamicScheduledTaskMap.get(cronName);
        if (scheduledTask != null) {
            scheduledTask.cancel();
        }
        scheduledTask = this.registrar.scheduleTriggerTask(task);
        dynamicScheduledTaskMap.put(cronName, scheduledTask);
        return scheduledTask;
    }
    public boolean contains(String cronName) {
        return this.dynamicScheduledTaskMap.containsKey(cronName);
    }
    // Update the trigger timing of a scheduled task by cancelling and re-registering it
    public void updateTriggerTask(String cronName) {
        ScheduledTask scheduledTask = dynamicScheduledTaskMap.get(cronName);
        if (scheduledTask == null) {
            throw new IllegalStateException("Invalid cronName \"" + cronName + "\", no ScheduledTask found");
        }
        scheduledTask.cancel();
        scheduledTask = this.registrar.scheduleTriggerTask((TriggerTask) scheduledTask.getTask());
        dynamicScheduledTaskMap.put(cronName, scheduledTask);
    }
    // Remove a scheduled task
    public void removeTriggerTask(String cronName) {
        ScheduledTask scheduledTask = dynamicScheduledTaskMap.remove(cronName);
        if (scheduledTask != null) {
            scheduledTask.cancel();
        }
    }
    @Override
    public void destroy() throws Exception {
        for (ScheduledTask value : dynamicScheduledTaskMap.values()) {
            value.cancel();
        }
        this.dynamicScheduledTaskMap.clear();
    }
}
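The bookkeeping used by the manager (keep one handle per name, cancel the old handle before storing a new one) can be tried out with the JDK alone. This is a sketch under stated assumptions, not the article's class: NamedTaskRegistrySketch and its methods are hypothetical, a JDK ScheduledExecutorService stands in for ScheduledTaskRegistrar, a one-shot delay stands in for the trigger, and ConcurrentHashMap is used here as a design choice so that the methods can be called from different threads.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class NamedTaskRegistrySketch {

    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final Map<String, ScheduledFuture<?>> tasks = new ConcurrentHashMap<>();

    // Schedules the task under the given name; a previous task with that name is cancelled.
    ScheduledFuture<?> add(String name, Runnable task, long delaySeconds) {
        ScheduledFuture<?> future = executor.schedule(task, delaySeconds, TimeUnit.SECONDS);
        ScheduledFuture<?> previous = tasks.put(name, future);
        if (previous != null) {
            previous.cancel(false);
        }
        return future;
    }

    boolean contains(String name) {
        return tasks.containsKey(name);
    }

    // Removes the name and cancels its task, if any.
    void remove(String name) {
        ScheduledFuture<?> future = tasks.remove(name);
        if (future != null) {
            future.cancel(false);
        }
    }

    // Counterpart of destroy(): cancel everything and stop the worker thread.
    void shutdown() {
        tasks.values().forEach(future -> future.cancel(false));
        tasks.clear();
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        NamedTaskRegistrySketch registry = new NamedTaskRegistrySketch();
        ScheduledFuture<?> first = registry.add("cron.name1", () -> { }, 3600);
        ScheduledFuture<?> second = registry.add("cron.name1", () -> { }, 3600);
        System.out.println(first.isCancelled());  // true: replaced by the second registration
        System.out.println(second.isCancelled()); // false: this is the active task
        registry.shutdown();
    }
}
```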
3.3 AbstractDynamicCronHandler
import org.springframework.beans.factory.annotation.Autowired;
public abstract class AbstractDynamicCronHandler {
    @Autowired
    protected DynamicCronScheduleTaskManager dynamicCronScheduleTaskManager;
    // Returns the current cron expression stored under cronName
    public abstract String getCronexpression(String cronName);
    public void updateTriggerTask(String cronName) {
        dynamicCronScheduleTaskManager.updateTriggerTask(cronName);
    }
}
3.4 EnvironmentDynamicCronHandler
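Based on Environment: when the configuration is refreshed, the trigger timing of the scheduled tasks is refreshed automatically. This also works for distributed multi-node cluster deployments.
For example, with the cron expression configured in nacos, updating the configuration in nacos updates the trigger timing of the scheduled task, because the handler listens for EnvironmentChangeEvent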
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.context.environment.EnvironmentChangeEvent;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.event.EventListener;
import org.springframework.core.env.Environment;
public class EnvironmentDynamicCronHandler extends AbstractDynamicCronHandler implements EnvironmentAware {
    private final Logger logger = LoggerFactory.getLogger(EnvironmentDynamicCronHandler.class);
    private Environment environment;
    @Override
    public String getCronexpression(String cronName) {
        try {
            return environment.getProperty(cronName);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return null;
    }
    @Override
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }
    @EventListener
    public void environmentChangeEvent(EnvironmentChangeEvent event) {
        // Refresh only the changed keys that belong to a registered dynamic task
        for (String key : event.getKeys()) {
            if (this.dynamicCronScheduleTaskManager.contains(key)) {
                this.dynamicCronScheduleTaskManager.updateTriggerTask(key);
            }
        }
    }
}
3.5 DynamicCronTrigger
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.support.CronSequenceGenerator;
import java.util.Date;
public class DynamicCronTrigger implements Trigger {
    private static final Logger LOGGER = LoggerFactory.getLogger(DynamicCronTrigger.class);
    private final String cronName;
    private final AbstractDynamicCronHandler dynamicCronHandler;
    private String cronexpression;
    private CronSequenceGenerator sequenceGenerator;
    public DynamicCronTrigger(String cronName, AbstractDynamicCronHandler dynamicCronHandler) {
        this.cronName = cronName;
        this.dynamicCronHandler = dynamicCronHandler;
    }
    @Override
    public Date nextExecutionTime(TriggerContext triggerContext) {
        // Look up the current expression on every call
        String cronexpression = dynamicCronHandler.getCronexpression(cronName);
        if (cronexpression == null) {
            // Nothing configured: returning null schedules no further execution
            return null;
        }
        // Rebuild the generator only when the expression has changed
        if (this.sequenceGenerator == null || !cronexpression.equals(this.cronexpression)) {
            try {
                this.sequenceGenerator = new CronSequenceGenerator(cronexpression);
                this.cronexpression = cronexpression;
            } catch (Exception e) {
                LOGGER.error(e.getMessage(), e);
            }
        }
        if (this.sequenceGenerator == null) {
            // The first expression could not be parsed, so there is no generator to use yet
            return null;
        }
        Date date = triggerContext.lastCompletionTime();
        if (date != null) {
            Date scheduled = triggerContext.lastScheduledExecutionTime();
            if (scheduled != null && date.before(scheduled)) {
                // Previous task apparently executed too early...
                // Let's simply use the last calculated execution time then,
                // in order to prevent accidental re-fires in the same second.
                date = scheduled;
            }
        } else {
            date = new Date();
        }
        return this.sequenceGenerator.next(date);
    }
}
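The two ideas in this trigger (re-parse only when the expression text changes, then compute the next run from a base time) can be checked with a deliberately simplified example. This is a sketch, not a cron parser: CachedStepTriggerSketch, parseStep and next are hypothetical names, only a seconds field of the form 0/N is understood and every other field is ignored, and time is given as plain epoch seconds.

```java
import java.util.function.Supplier;

class CachedStepTriggerSketch {

    private final Supplier<String> expressionSource;
    private String cachedExpression;
    private int cachedStep;
    int parseCount; // how many times the expression had to be parsed

    CachedStepTriggerSketch(Supplier<String> expressionSource) {
        this.expressionSource = expressionSource;
    }

    // Returns the next firing second after the given one, or null if nothing is configured.
    Long nextExecution(long afterEpochSecond) {
        String expression = expressionSource.get();
        if (expression == null) {
            return null;
        }
        if (!expression.equals(cachedExpression)) {
            cachedStep = parseStep(expression); // parse only when the text has changed
            cachedExpression = expression;
            parseCount++;
        }
        return next(cachedStep, afterEpochSecond);
    }

    // Simplified parsing: accepts only a seconds field such as "0/5".
    static int parseStep(String expression) {
        String secondsField = expression.trim().split("\\s+")[0];
        if (!secondsField.startsWith("0/")) {
            throw new IllegalArgumentException("Only 0/N is supported in this sketch: " + expression);
        }
        int step = Integer.parseInt(secondsField.substring(2));
        if (step < 1 || step > 59) {
            throw new IllegalArgumentException("Step out of range: " + step);
        }
        return step;
    }

    // Seconds 0, N, 2N, ... below 60 fire in every minute; otherwise roll over to the next minute.
    static long next(int step, long afterEpochSecond) {
        long second = afterEpochSecond % 60;
        long minuteStart = afterEpochSecond - second;
        long candidate = (second / step + 1) * step;
        return candidate < 60 ? minuteStart + candidate : minuteStart + 60;
    }

    public static void main(String[] args) {
        String[] config = {"0/5 * * * * ?"};
        CachedStepTriggerSketch trigger = new CachedStepTriggerSketch(() -> config[0]);
        System.out.println(trigger.nextExecution(12)); // 15
        System.out.println(trigger.nextExecution(15)); // 20
        config[0] = "0/2 * * * * ?";                   // the configuration changes
        System.out.println(trigger.nextExecution(20)); // 22
        System.out.println(trigger.parseCount);        // 2
    }
}
```

Three computations cause only two parses: one for the initial expression and one after the change.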
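3.6 Annotation class ScheduledDynamicCron
import org.springframework.core.annotation.AliasFor;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ScheduledDynamicCron {
    // Name under which the cron expression is stored
    @AliasFor("cronName")
    String value() default "";
    @AliasFor("value")
    String cronName() default "";
    // Handler that resolves cronName to a cron expression
    Class<? extends AbstractDynamicCronHandler> handler() default EnvironmentDynamicCronHandler.class;
}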
3.7 DynamicScheduledAnnotationBeanPostProcessor
import org.springframework.beans.factory.BeanFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;
import org.springframework.scheduling.config.*;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.util.StringValueResolver;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
public class DynamicScheduledAnnotationBeanPostProcessor extends ScheduledAnnotationBeanPostProcessor {
    private StringValueResolver embeddedValueResolver;
    private BeanFactory beanFactory;
    private DynamicCronScheduleTaskManager dynamicCronScheduleTaskManager;
    private final ScheduledTaskRegistrar registrar = (ScheduledTaskRegistrar) getFieldValueFromParentClass("registrar");
    private final Map
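3.8 Configuration class SchedulerConfiguration
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Role;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.config.TaskManagementConfigUtils;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@EnableScheduling
public class SchedulerConfiguration implements SchedulingConfigurer {
    @Value("${app.scheduler.thread.count:10}")
    private int schedulerThreadCount;
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        TaskScheduler taskScheduler = new ConcurrentTaskScheduler(new ScheduledThreadPoolExecutor(schedulerThreadCount));
        taskRegistrar.setTaskScheduler(taskScheduler);
    }
    @Bean
    public EnvironmentDynamicCronHandler environmentDynamicCronHandler() {
        return new EnvironmentDynamicCronHandler();
    }
    @Bean
    public DynamicCronScheduleTaskManager dynamicCronScheduleTaskManager() {
        return new DynamicCronScheduleTaskManager();
    }
    // Registered under the default processor bean name so that it takes the place of the stock post-processor
    @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    @Primary
    public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor(DynamicCronScheduleTaskManager dynamicCronScheduleTaskManager) {
        return new DynamicScheduledAnnotationBeanPostProcessor(dynamicCronScheduleTaskManager);
    }
}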
4. Spring Boot usage examples
4.1 Configuration: import the configuration class SchedulerConfiguration in the application's main class
For example:
@SpringBootApplication
@Import(SchedulerConfiguration.class) // import the configuration class
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
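4.2 Example of configuring cron in nacos
Configuration in nacos:
cron:
  name1: "0/5 * * * * ?" # runs every 5 seconds
Use @ScheduledDynamicCron to specify the configuration name of the cron expression, cron.name1. If handler() is not specified, EnvironmentDynamicCronHandler is used by default; it reads the cron expression from nacos using the given configuration name cron.name1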
@Component
public class DynamicTask {
    @Scheduled
    @ScheduledDynamicCron("cron.name1")
    public void dynamicCronForEnvironment() {
        System.out.println("dynamicCronForEnvironment:" + DateUtils.format(LocalDateTime.now()));
    }
}
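- @Scheduled still has to be present, but its cron attribute is ignored
- Change cron.name1 in nacos to 0/2 * * * * ? and publish it; the scheduled task immediately switches from running every 5 seconds to running every 2 seconds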
4.3 Example of configuring cron in a database
Extend AbstractDynamicCronHandler to query the cron expression from the database
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class DynamicCronHandler extends AbstractDynamicCronHandler {
    @Autowired
    private ParametersService parametersService;
    @Override
    public String getCronexpression(String cronName) {
        // Look up the cron expression stored in the database under cronName
        Parameters ap = parametersService.getByName(cronName);
        // No row for this name: return null so that the trigger schedules nothing
        return ap != null ? ap.getValue() : null;
    }
}
Scheduled task class
@Component
public class DynamicTask {
    @Scheduled
    // The handler must be set to the DynamicCronHandler defined above
    @ScheduledDynamicCron(cronName = "cron.name2", handler = DynamicCronHandler.class)
    public void dynamicCronForDb() {
        System.out.println("dynamicCronForDb:" + LocalDateTime.now());
    }
}
The trigger timing of the scheduled task has to be refreshed whenever the database configuration is updated
@RestController
@RequestMapping("parameters")
public class ParametersController {
    @Autowired
    private ParametersService parametersService;
    @Autowired
    private DynamicCronHandler dynamicCronHandler;
    @PostMapping("/update")
    public Result update(Parameters parameters) {
        if (parametersService.update(parameters)) {
            if ("cron.name2".equals(parameters.getName())) {
                // After updating the database configuration, refresh the trigger timing of the task
                dynamicCronHandler.updateTriggerTask(parameters.getName());
            }
        }
        return Result.success();
    }
}
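4.4 Updating scheduled tasks in a distributed cluster deployment
Refreshing the trigger timing right after the database update, as shown above, only takes effect on the current service instance; the other nodes in the cluster are not updated.
The other nodes can be updated through a message bus, for example by broadcasting a message over MQ; after consuming the message, each node calls the following method to refresh the trigger timing of the task:
dynamicCronHandler.updateTriggerTask(cronName);
4.5 Adding a scheduled task
Web endpoint for adding a task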
@RestController
@RequestMapping("dynamicCron")
public class DynamicCronController {
    @Autowired
    private DynamicCronScheduleTaskManager dynamicCronScheduleTaskManager;
    @Autowired
    private EnvironmentDynamicCronHandler environmentDynamicCronHandler;
    @PostMapping("/addTask")
    public Result addTask(String cronName) {
        // Create the Runnable that should run on schedule
        Runnable runnable = () -> System.out.println("run task:" + LocalDateTime.now());
        // Create the trigger, backed by EnvironmentDynamicCronHandler
        DynamicCronTrigger trigger = new DynamicCronTrigger(cronName, environmentDynamicCronHandler);
        // Add the scheduled task
        dynamicCronScheduleTaskManager.addTriggerTask(cronName, new TriggerTask(runnable, trigger));
        return Result.success();
    }
}
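After the endpoint call completes, the scheduled task does not run yet, because cron.name2 has not been configured; once the cron expression is configured in nacos, the task starts being scheduled
[Figure: console output after configuring nacos]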
Sharing is welcome; when reposting, please credit the source: 内存溢出