原创

Spring(28)——Task抽象

Spring为异步执行任务和定时任务抽象了TaskExecutor接口和TaskScheduler接口,Spring之所以进行这样的抽象是为了在其内部统一任务调度的接口。TaskExecutor和JDK自带的Executor有点类似,只定义了一个execute(),用来执行一个任务,至于对应的任务怎么调度的,则由具体的实现类来实现,比如可以使用一个新的线程,或者使用一个线程池来调度。而实际上TaskExecutor接口也是继承了JDK的Executor接口的。

常见的TaskExecutor实现类

  • SimpleAsyncTaskExecutor 每次调度的时候都会启用一个新的线程执行任务。
  • ConcurrentTaskExecutor 接收一个java.util.concurrent.Executor对象作为参数,然后执行任务的时候会使用内部的java.util.concurrent.Executor调度。
  • ThreadPoolTaskExecutor TaskExecutor的线程池实现,类似于JDK的ThreadPoolExecutor,可以进行线程池的大小定义等。
  • ThreadPoolTaskScheduler 可以定时执行任务的实现,同时实现了TaskExecutor接口和TaskScheduler接口。

使用ThreadPoolTaskExecutor

使用ThreadPoolTaskExecutor时可以自己在程序中new对象,这样我们可以对它进行充分的自定义,比如:

ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(5);
taskExecutor.setMaxPoolSize(10);
taskExecutor.setQueueCapacity(20);
taskExecutor.initialize();
for (int i=0; i<100; i++) {
    TimeUnit.MILLISECONDS.sleep(500);
    taskExecutor.execute(() -> {
        System.out.println(Thread.currentThread() + "--------" + LocalDateTime.now());
    });
}
TimeUnit.SECONDS.sleep(20);

也可以把它定义为Spring的一个bean:

<bean id="myExecutor" 
    class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"
    p:core-pool-size="10"
    p:maxPoolSize="50"
    p:queueCapacity="10000"
    />

也可以通过task这个namespace定义,这需要先引入task命名空间。

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:p="http://www.springframework.org/schema/p" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
     http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.1.xsd">

然后就可以通过<task:executor/>来定义一个ThreadPoolTaskExecutor对象了。下面的代码就定义了一个id为myExecutor的bean,其类型是ThreadPoolTaskExecutor。keep-alive指定当创建的线程超过了核心线程数之后,线程在销毁前的最大闲置时间,单位是秒。也就是说超过了核心线程数后创建的线程在闲置了120秒后就将被销毁,不指定keep-alive时默认的最大闲置时间是60秒。通过pool-size指定了核心线程数是10,最大线程数也是10。如果需要指定最大线程数,则可以采用min-max的形式指定,比如10-50则表示核心线程数是10,最大线程数是50。如果不指定pool-size则默认的核心线程数是1,最大线程数是不限制。最大线程数只有在队列容量不是无限制的时候才有用。 通过queue-capacity指定任务队列的大小为10000,不指定queue-capacity则表示队列大小无限制,可以任意增长。通过rejection-policy指定了当线程池的任务队列满了以后将使用当前线程来调用任务,可选值还有ABORT(抛出异常)、DISCARD(忽略当前任务)和DISCARD_OLDEST(忽略在队列中最老的任务,即存放最久的任务)。

<task:executor 
    id="myExecutor" 
    keep-alive="120" 
    pool-size="10" 
    queue-capacity="10000" 
    rejection-policy="CALLER_RUNS"
    />

使用ThreadPoolTaskScheduler

ThreadPoolTaskScheduler类似于JDK的ScheduledThreadPoolExecutor,也是可以直接在程序中使用的。比如下面就定义了一个ThreadPoolTaskScheduler,其接收一个定时任务,每秒打印一次当前时间。

ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(10);
scheduler.initialize();
scheduler.scheduleAtFixedRate(() -> {
    System.out.println(LocalDateTime.now());
}, 1000);
TimeUnit.SECONDS.sleep(30);

ThreadPoolTaskScheduler也可以通过task命名空间来定义,下面的代码就定义了一个id为myScheduler的ThreadPoolTaskScheduler,对应的线程池大小是10。pool-size不指定时默认是1。

<task:scheduler id="myScheduler" pool-size="10"/>

通过<task:scheduled-tasks/>定义定时任务

可以通过<task:scheduled-tasks/>来定义定时任务调度器,执行定时任务的ThreadPoolTaskScheduler可以通过属性scheduler来指定,当未指定scheduler属性时,默认将创建一个线程池大小为1的ThreadPoolTaskScheduler来执行任务。需要执行的任务可以通过<task:scheduled/>来定义,其必须指定的参数是ref和method,ref用来指定定时任务方法对应的对象,method用来指定定时任务调度的方法。然后可以选择性的通过cron、fixed-delay、fixed-rate来定义任务的调度时机,fixed-delay和fixed-rate还可以配合initial-delay一起使用,用来指定初次任务调度的延迟时间。如果有需要也可以自己通过trigger属性,指定触发任务调度的org.springframework.scheduling.Trigger对象。实际上最底层调度的时候都是转换为Trigger对象进行调度的,比如CronTrigger或PeriodicTrigger。如果有需要也可以实现自己的Trigger对象。

<bean id="scheduledTasks" class="com.elim.spring.task.ScheduledTasks"/>
<task:scheduled-tasks scheduler="myScheduler">
    <!-- 每5秒中执行一次 -->
    <task:scheduled ref="scheduledTasks" method="printTime" cron="*/5 * * * * ?"/>
    <!-- 每秒执行一次,相当于scheduleWithFixedDelay() -->
    <task:scheduled ref="scheduledTasks" method="printTime" fixed-delay="1000"/>
    <!-- 每秒钟执行一次,初次调度将在5秒后进行,相当于scheduleAtFixedRate() -->
    <task:scheduled ref="scheduledTasks" method="printTime" fixed-rate="1000" initial-delay="5000"/>
</task:scheduled-tasks>

基于注解的支持

可以通过<task:annotation-driven/>启用基于注解的支持,这样就可以在程序中使用@Async标注方法将进行异步调用,使用@Scheduled标注方法将进行定时调度。可以通过executor属性指定在进行@Async标注的方法时默认将使用的TaskExecutor,如果不指定默认将使用一个SimpleAsyncTaskExecutor实例。可以通过scheduler属性指定在进行@Scheduled标注的方法进行调度时将使用的TaskScheduler,如果不指定时默认将使用只有一个线程的TaskScheduler进行调度。

<task:annotation-driven executor="myExecutor" scheduler="myScheduler" />

由于在进行基于注解的异步调用时,对应的方法的调用完全由Spring来接管,异步调用方法中抛出的异常是不会被调用程序捕获到的,Spring默认情况下会将捕获的异常进行异常日志输出,由SimpleAsyncUncaughtExceptionHandler处理。如果有需要程序可以通过实现AsyncUncaughtExceptionHandler接口实现自定义的异常处理器,然后通过<task:annotation-driven/>exception-handler属性指定AsyncUncaughtExceptionHandler对应的bean。

<task:annotation-driven 
    executor="myExecutor" 
    scheduler="myScheduler" 
    exception-handler="myAsyncExceptionHandler"/>

如果是使用基于Java类的配置,则要启用基于注解的异步调用时通过@EnableAsync启用,通过@EnableScheduling启用基于注解的定时调度。需要自定义默认的异常处理器和TaskExecutor,则可以通过使@Configuration类实现AsyncConfigurer接口进行相应的自定义。

@Async

使用@Async标注的方法的异步调用的功能由Spring AOP实现,由AsyncAnnotationBeanPostProcessor定义。所以@Async标注的方法必须是public方法,且必须是外部调用。比如下面代码中的AnnotationTasks这个bean,如果在外部调用其testAsync()方法,则该方法将由配置的默认的TaskExecutor发起异步调用,但是testAsync()内部调用的async()方法则不会发起异步调用,而是对于testAsync()而言的同步调用。

@Component
public class AnnotationTasks {

    @Async
    public void testAsync() {
        long t1 = System.currentTimeMillis();
        this.async();
        long t2 = System.currentTimeMillis();
        System.out.println(Thread.currentThread() + " 耗时:" + (t2 - t1));
    }
    
    @Async
    private void async() {
        try {
            TimeUnit.SECONDS.sleep(3);
            System.out.println(Thread.currentThread() + "-----------");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
}

默认的异步调用是基于Spring Aop进行的,可以通过<task:annotation-driven/>的mode属性指定是否需要使用Aspectj的支持。使用Spring Aop代理时又可以通过proxy-target-class属性指定是否需要强制使用CGLIB做基于Class的代理。

<task:annotation-driven 
    executor="myExecutor" 
    scheduler="myScheduler" 
    exception-handler="myAsyncExceptionHandler" 
    mode="proxy"
    proxy-target-class="false"/>

@Async标注的方法默认会被<task:annotation-driven/>指定的TaskExecutor执行,如果需要针对某个特定的异步执行使用一个特定的TaskExecutor,则可以通过@Async的value属性指定需要使用的TaskExecutor对应的bean名称。如下代码则指定了在异步调用该方法时将使用名称为executor2的TaskExecutor。

@Async("executor2")
public void testAsync2() {
    try {
        TimeUnit.SECONDS.sleep(3);
        System.out.println(Thread.currentThread() + "-----------");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

@Async也可以是定义在Class上的,定义在Class上时表示该Class中所有的方法在被外部调用时都将异步调用。

@Component
@Async
public class AsyncClass {

    public void print() {
        System.out.println(Thread.currentThread());
    }
    
}

@Scheduled

需要指定一个方法为定时调度时可以通过使用Scheduled标注,该注解将由ScheduledAnnotationBeanPostProcessor负责解析并发起定时调度,其内部会由ScheduledTaskRegistrar定义任务,如有需要我们也可以使用它来进行自定义定时任务调度的封装。Scheduled的定义如下,可以指定的参数的含义都非常清晰,就不做过多的介绍了。与XML定义不同的是,fixedDelay、fixedRate等多了可以使用字符串定义的形式,这就允许我们可以通过使用占位符来进行定义。

public @interface Scheduled {

    String cron() default "";

    String zone() default "";

    long fixedDelay() default -1;

    String fixedDelayString() default "";

    long fixedRate() default -1;

    String fixedRateString() default "";

    long initialDelay() default -1;

    String initialDelayString() default "";

}

如下代码就定义了该方法将每5秒钟执行一次。

@Scheduled(fixedRate = 5000)
public void schdule() {
    System.out.println("当前时间是:" + LocalDateTime.now());
}

(注:本文是基于Spring4.1.0所写)

 

正文到此结束
本文目录