你的位置:首页 > 信息动态 > 新闻中心
信息动态
联系我们

多线程导入 MySQL(二)

2021/12/18 20:06:35
import com.hanxiaozhang.importexcelnew.ErrorInfoEntity;
import com.hanxiaozhang.importexcelnew.SaveExcelNewService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 〈一句话功能简述〉<br>
 * 〈数据字典数据导入〉
 */
@Slf4j
public abstract class SaveExcelTemplateHandle<T> implements SaveExcelNewService<T> {

    @Transactional(rollbackFor = Exception.class)
    public com.hanxiaozhang.importexcelnew.ErrorInfoEntity batchSave(List<T> list, CyclicBarrier barrier, AtomicBoolean rollbackFlag) throws Exception {
        log.info("save(),[{}]", Thread.currentThread().getName());
        // 其他线程异常手工回滚
        if (rollbackFlag.get()) {
            String message = "子线程未全部执行成功,对线程[" + Thread.currentThread().getName() + "]进行回滚";
            throw new Exception(message);
        }
        try {
            //创建返回错误信息实体
            com.hanxiaozhang.importexcelnew.ErrorInfoEntity errorInfoEntity = new ErrorInfoEntity();
            //业务操作
            List<T> errorList = handle(list);
            //赋值错误数据
            errorInfoEntity.setErrorList(errorList);
            barrier.await();
            if (rollbackFlag.get()) {
                String message = "子线程未全部执行成功,对线程[" + Thread.currentThread().getName() + "]进行回滚";
                throw new Exception(message);
            }
            return errorInfoEntity;
        } catch (Exception e) {
            e.printStackTrace();
            log.error(e.toString());
            rollbackFlag.set(true);
            barrier.await();
            throw e;
        }
    }

    protected abstract List<T> handle(List<T> list);


}
import lombok.Data;
import java.util.List;

/**
 * Error-information entity: a holder for the rows that were rejected during an import.
 * <p>
 * Accessors are generated by Lombok's {@code @Data}.
 *
 * @param <T> row type
 */
@Data
public class ErrorInfoEntity<T> {

    /**
     * Rows that the business checks judged to be erroneous.
     */
    private List<T> errorList;


}
import com.hanxiaozhang.sourcecode.executor.MyThreadPoolExecutor;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.concurrent.Executors.defaultThreadFactory;

/**
 * Excel import executor: splits the rows into batches and saves each batch on its own
 * pool thread, with all batches sharing one barrier and one rollback flag.
 * <p>
 * Author's note (2020-04-14): with a fixed thread pool, once more than 7 threads are used
 * the 8th thread fails to start; the cause was not found.
 * NOTE(review): possibly related to a resource held by each worker while it waits at the
 * barrier (for example a pooled database connection) - confirm.
 */
@Slf4j
public class ImportExcelNewExecutor {


    /** Upper bound on the number of batches, and the size of the worker pool. */
    private static final int MAX_THREAD = 10;


    /**
     * Runs the import, creating one worker task per batch.
     *
     * @param saveService service that saves one batch
     * @param lists       rows to import
     * @param groupLen    desired batch length; when it would produce more than
     *                    {@code MAX_THREAD} batches, or is not positive, the rows are
     *                    instead spread over at most {@code MAX_THREAD} batches
     * @return the erroneous rows reported by all batches, or {@code null} when
     *         {@code lists} is {@code null} or empty
     * @throws ExecutionException   when a batch task fails
     * @throws InterruptedException when the calling thread is interrupted while waiting
     */
    public static <T> List<T> execute(SaveExcelNewService<T> saveService, List<T> lists, int groupLen) throws ExecutionException, InterruptedException {

        if (lists == null || lists.isEmpty()) {
            return null;
        }

        int size = lists.size();

        // Math.ceil rounds UP, so a trailing partial batch is counted.
        // A non-positive groupLen cannot describe a batch; treat it as "too many batches"
        // so that the cap below picks a usable length.
        int batches = groupLen > 0 ? (int) Math.ceil(size * 1.0 / groupLen) : Integer.MAX_VALUE;
        // All batches must run concurrently (they meet at the barrier), so cap the batch
        // count at the pool size and enlarge the batch length accordingly.
        if (batches > MAX_THREAD) {
            batches = MAX_THREAD;
            groupLen = (int) Math.ceil(size * 1.0 / batches);
        }

        // Slice first: rounding the batch length up can leave fewer non-empty batches than
        // planned, and the barrier must be sized to the tasks that really get submitted.
        List<List<T>> chunks = new ArrayList<>(batches);
        for (int start = 0; start < size; ) {
            int end = start + Math.min(groupLen, size - start);
            chunks.add(lists.subList(start, end));
            start = end;
        }
        log.info("总条数:[{}],批次数量:[{}],每批数据量:[{}]", size, chunks.size(), groupLen);

        CyclicBarrier barrier = new CyclicBarrier(chunks.size());
        AtomicBoolean rollbackFlag = new AtomicBoolean(false);
        List<T> errorList = new ArrayList<>();

        // Fixed-size pool: core size == max size.
        ExecutorService executorService = new MyThreadPoolExecutor(MAX_THREAD, MAX_THREAD, 0, TimeUnit.MINUTES,
                new LinkedBlockingQueue<Runnable>(), defaultThreadFactory(),
                new MyThreadPoolExecutor.MyAbortPolicy());
        try {
            List<Future<ErrorInfoEntity>> futures = new ArrayList<>(chunks.size());
            for (List<T> chunk : chunks) {
                futures.add(executorService.submit(new ImportExcelNewTask<>(saveService, chunk, barrier, rollbackFlag)));
            }

            // Wait for every worker; a worker failure surfaces here as ExecutionException.
            for (Future<ErrorInfoEntity> future : futures) {
                // ErrorInfoEntity is used raw by the task API; the rows come from a
                // SaveExcelNewService<T>, so they are T by construction.
                @SuppressWarnings("unchecked")
                List<T> batchErrors = future.get().getErrorList();
                if (batchErrors != null) {
                    errorList.addAll(batchErrors);
                }
            }
        } finally {
            // Release the pool on the failure path as well.
            executorService.shutdown();
        }

        return errorList;
    }

}

import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Excel import task: a {@link Callable} that saves one batch of rows through a
 * {@link SaveExcelNewService}.
 *
 * @param <T> row type
 */
@Slf4j
public class ImportExcelNewTask<T> implements Callable<ErrorInfoEntity> {

    /**
     * Service that saves the batch.
     */
    private final SaveExcelNewService<T> excelService;

    /**
     * Rows of this batch.
     */
    private final List<T> list;

    /**
     * Barrier handed through to the service.
     */
    private final CyclicBarrier barrier;

    /**
     * Rollback flag handed through to the service.
     */
    private final AtomicBoolean rollbackFlag;


    /**
     * Creates a task for one batch.
     *
     * @param excelService service that saves the batch
     * @param list         rows of this batch
     * @param barrier      barrier passed to {@code batchSave}
     * @param rollbackFlag rollback flag passed to {@code batchSave}
     */
    public ImportExcelNewTask(SaveExcelNewService<T> excelService, List<T> list, CyclicBarrier barrier, AtomicBoolean rollbackFlag) {
        this.excelService = excelService;
        this.list = list;
        this.barrier = barrier;
        this.rollbackFlag = rollbackFlag;
    }


    /**
     * Delegates to {@code batchSave} with the values given at construction.
     *
     * @return whatever the service returns
     * @throws Exception whatever the service throws
     */
    @Override
    public ErrorInfoEntity call() throws Exception {
        return excelService.batchSave(list, barrier, rollbackFlag);
    }


}
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;


/**
 * Service that saves rows imported from Excel.
 *
 * @param <T> row type
 */

public interface SaveExcelNewService<T> {


    /**
     * Saves one batch of rows.
     *
     * @param list         rows of the batch
     * @param barrier      barrier shared by the worker threads of one import
     * @param rollbackFlag flag shared by the worker threads of one import; the template
     *                     implementation in this codebase sets it to {@code true} when a
     *                     batch fails
     * @return entity holding the rows that were judged erroneous
     * @throws Exception when the batch could not be saved
     */
    ErrorInfoEntity batchSave(List<T> list, CyclicBarrier barrier, AtomicBoolean rollbackFlag) throws Exception;

}
import com.hanxiaozhang.dictonecode.dao.TestDao;
import com.hanxiaozhang.dictonecode.domain.TestDO;
import com.hanxiaozhang.importexcelnew.config.SaveExcelTemplateHandle;
import com.hanxiaozhang.utils.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;

/**
 * 〈一句话功能简述〉<br>
 * 〈数据字典数据导入〉
 *
 */
@Slf4j
@Service
public class TestSaveExcelNewServiceImpl extends SaveExcelTemplateHandle<TestDO> {

    @Resource
    private TestDao testDao;

    /**
     * 处理相关数据
     *
     * @param list
     * @return
     */
    protected List<TestDO> handle(List<TestDO> list) {
        List<TestDO> errorList = new ArrayList<>();
        list.forEach(x -> {
            boolean flag = true;
            List<String> errorMsg = new ArrayList<>();
            //模拟一个业务数据错误,姓名不能为空
            if (StringUtil.isBlank(x.getName())) {
                errorMsg.add("姓名不能为空!");
                flag = false;
                throw new RuntimeException();
            }
            //模拟一个业务数据错误,类型不能为空
            if (StringUtil.isBlank(x.getType())) {
                errorMsg.add("类型不能为空!");
                flag = false;
            }
            if (flag) {
                testDao.save(x);
            } else {
                x.setRemarks(String.join("\n", errorMsg));
                errorList.add(x);
            }
        });

        return errorList;
    }


}