你的位置:首页 > 信息动态 > 新闻中心
信息动态
联系我们

27 后台任务的中断

2021/12/31 20:05:52

前言

在日常开发中, 我们可能有一些耗时的任务是在后台异步处理, 完成之后 前端轮询 或者 后端主动推送 通知到前端 

通常在这样的任务中, 有些时候我们可能需要中断一些 耗时处理的任务, 比如我们做到一半 突然不想做了, 那么我们可以把它中断掉 

通常来说 这样的异步任务 应该会有一个线程池, 有这个线程池里面的线程来处理这些异步任务[比如 spring 的 @Async 可以指定执行的 ExecutorService]

呵呵 这里说的大概就是这样的一个事情, 我这边的处理 也是一个相对来说 比较简单的处理 

整理一下这个流程, 用户 能够感知到的流程 大概如下 

1. 操作1 触发创建了一个后台的异步任务1 

2. 异步任务1 开始执行 

3. 操作2 触发取消 异步任务1 

InterruptUtils

那么 我们程序如何设计呢? 程序中的流程又是怎么样的呢? 

我们先来梳理一下程序上的大致的流程  

1. 操作1 触发创建了一个后台的异步任务1, 这个 异步任务1 一般来说会关联一个标记, 要么是这个任务会对应一条记录, 要么是 为这个记录生成一个 id 返回回去 

2. 异步任务1 开始执行, 这个由具体的异步调度框架去处理, 比如 线程池, 调度线程池, 定时任务  等等 

3. 操作2 触发取消 异步任务1, 这里 操作2 在程序上来看, 只能为当前这个 异步任务1的id 打上一个标记 

4. 异步任务1, 在一些点埋点检测标记, 如果当前 异步任务 被中断, 则抛出异常, 中断流程继续向下走, finally 里面做好相关的清理工作 

那么上面的流程会涉及到几个操作, 比如 标记异步任务1 被中断, 比如 检测异步任务是否被中断, 如果中断抛出异常 

针对这个 我设计了一个 InterruptUtils, 来处理, 提供了几个 api 用于复用, 我这里是使用了 redis 作为媒介来存储, 读取中断的相关信息 

markInterrupt : 标记给定的记录类型下面的记录对应的任务被中断 

isInterrupted : 查询给定的记录类型下面的记录对应的任务是否被中断

doInterrupt : 查询给定的记录类型下面的记录对应的任务是否被中断, 如果被中断抛出异常, 并记录日志信息 

public final class InterruptUtils {

    /**
     * 中断给定给定的记录
     *
     * @param recordClazz recordClazz
     * @param recordId    recordId
     * @return void
     * @author Jerry.X.He
     * @date 2021-02-03 15:03
     */
    public static Long markInterrupt(Class recordClazz, String recordId) {
        String recordType = recordClazz.getName();
        String cacheKey = BizCacheKeyUtil.getInterruptKey(recordType, recordId);
        StringRedisTemplate redisTemplate = SpringContext.getBean(StringRedisTemplate.class);
        // 加个默认过期时间
        redisTemplate.expire(cacheKey, 1, TimeUnit.HOURS);
        return redisTemplate.boundValueOps(cacheKey).increment();
    }

    /**
     * 判断给定的记录 是否中断
     *
     * @param recordClazz recordClazz
     * @param recordId    recordId
     * @return boolean
     * @author Jerry.X.He
     * @date 2021-02-03 15:03
     */
    public static boolean isInterrupted(Class recordClazz, String recordId) {
        String recordType = recordClazz.getName();
        String cacheKey = BizCacheKeyUtil.getInterruptKey(recordType, recordId);
        StringRedisTemplate redisTemplate = SpringContext.getBean(StringRedisTemplate.class);
        String value = redisTemplate.boundValueOps(cacheKey).get();
        if (!StringUtils.isNumeric(value)) {
            return false;
        }

        return Long.valueOf(value) > 0;
    }

    /**
     * 中断 给定的线程, 并抛出异常
     *
     * @param recordClazz recordClazz
     * @param recordId    recordId
     * @return void
     * @author Jerry.X.He
     * @date 2021-02-03 15:24
     */
    public static void doInterrupt(Class recordClazz, String recordId) {
        if (isInterrupted(recordClazz, recordId)) {
            String threadName = Thread.currentThread().getName();
            String msg = String.format("线程 %s 被中断, recordType : %s, recordId : %s ",
                                       threadName, recordClazz.getName(), recordId);
            throw new RuntimeException(msg);
        }
    }

}

标记中断 和 检测中断埋点

这个一般是可以 使用 aop 来处理, 加上一个注解来 标记中断 或者 检测中断埋点 

一些特殊的 地方需要代码里面 增加埋点, 比如不属于 调用自己的 耗时/开销较大 的操作, 调用的工具类的 耗时/开销较大 的操作 

呵呵 方法中调用自己工具方法 的情况 还不是很好处理, 而且 很常见, 再桥接一层服务? 

往往 你对封装理解的越到位, 这种情况就越好处理 

使用 aop 来处理, 相对来说 是一种侵入比较小的方式了 

完