import com.hanxiaozhang.importexcelnew.ErrorInfoEntity;
import com.hanxiaozhang.importexcelnew.SaveExcelNewService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 〈一句话功能简述〉<br>
* 〈数据字典数据导入〉
*/
@Slf4j
public abstract class SaveExcelTemplateHandle<T> implements SaveExcelNewService<T> {
@Transactional(rollbackFor = Exception.class)
public com.hanxiaozhang.importexcelnew.ErrorInfoEntity batchSave(List<T> list, CyclicBarrier barrier, AtomicBoolean rollbackFlag) throws Exception {
log.info("save(),[{}]", Thread.currentThread().getName());
// 其他线程异常手工回滚
if (rollbackFlag.get()) {
String message = "子线程未全部执行成功,对线程[" + Thread.currentThread().getName() + "]进行回滚";
throw new Exception(message);
}
try {
//创建返回错误信息实体
com.hanxiaozhang.importexcelnew.ErrorInfoEntity errorInfoEntity = new ErrorInfoEntity();
//业务操作
List<T> errorList = handle(list);
//赋值错误数据
errorInfoEntity.setErrorList(errorList);
barrier.await();
if (rollbackFlag.get()) {
String message = "子线程未全部执行成功,对线程[" + Thread.currentThread().getName() + "]进行回滚";
throw new Exception(message);
}
return errorInfoEntity;
} catch (Exception e) {
e.printStackTrace();
log.error(e.toString());
rollbackFlag.set(true);
barrier.await();
throw e;
}
}
protected abstract List<T> handle(List<T> list);
}
import lombok.Data;
import java.util.List;
/**
* 〈一句话功能简述〉<br>
* 〈错误信息实体〉
*
*/
@Data
public class ErrorInfoEntity<T> {
/**
* 业务上判断有错误的数据集合
*/
private List<T> errorList;
}
import com.hanxiaozhang.sourcecode.executor.MyThreadPoolExecutor;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.concurrent.Executors.defaultThreadFactory;
/**
* 〈一句话功能简述〉<br>
* 〈导入Excel执行器〉
* <p>
* 2020-04-14 问题:使用固定线程池,线程数大于7之后,第8个线程就启动不了,也没有找了原因
*
*/
@Slf4j
public class ImportExcelNewExecutor {
private final static int MAX_THREAD = 10;
/**
* 执行方法(分批创建子线程)
*
* @param saveService 保存的服务
* @param lists 数据List
* @param groupLen 分组的长度
* @return
* @throws ExecutionException
* @throws InterruptedException
*/
public static <T> List<T> execute(SaveExcelNewService<T> saveService, List<T> lists, int groupLen) throws ExecutionException, InterruptedException {
if (lists == null || lists.size() == 0) {
return null;
}
List<T> errorList = new ArrayList<>();
//创建一个固定线程池
ExecutorService executorService = new MyThreadPoolExecutor(MAX_THREAD, MAX_THREAD, 0, TimeUnit.MINUTES,
new LinkedBlockingQueue<Runnable>(), defaultThreadFactory(),
new MyThreadPoolExecutor.MyAbortPolicy());
//创建一个Future集合
List<Future<ErrorInfoEntity>> futures = new ArrayList<>();
//集合的元素个数
int size = lists.size();
//适配线程池数与分组长度
//Math.ceil()对小数向下“”取整”
int batches = (int) Math.ceil(size * 1.0 / groupLen);
//分组超长最大线程限制,则设置分组数为最大线程限制,计算分组集合尺寸
if (batches > MAX_THREAD) {
batches = MAX_THREAD;
groupLen = (int) Math.ceil(size * 1.0 / batches);
}
log.info("总条数:[{}],批次数量:[{}],每批数据量:[{}]", size, batches, groupLen);
CyclicBarrier barrier = new CyclicBarrier(MAX_THREAD);
AtomicBoolean rollbackFlag= new AtomicBoolean(false);
int startIndex, toIndex, maxIndex = lists.size();
for (int i = 0; i < batches; i++) {
//开始索引位置
startIndex = i * groupLen;
//截止索引位置
toIndex = startIndex + groupLen;
//如果截止索引大于最大索引,截止索引等于最大索引
if (toIndex > maxIndex) {
toIndex = maxIndex;
}
//截取数组
List<T> temp = lists.subList(startIndex, toIndex);
if (temp == null || temp.size() == 0) {
continue;
}
futures.add(executorService.submit(new ImportExcelNewTask(saveService, temp, barrier,rollbackFlag)));
}
//子线程全部等待返回(存在异常,则直接抛向主线程)
for (Future<ErrorInfoEntity> future : futures) {
errorList.addAll(future.get().getErrorList());
}
//所有线程返回后,关闭线程池
executorService.shutdown();
return errorList;
}
}
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 〈功能描述〉<br>
* 〈导入Excel任务〉
*
*/
@Slf4j
public class ImportExcelNewTask<T> implements Callable<ErrorInfoEntity> {
/**
* 保存Excel服务
*/
private SaveExcelNewService excelService;
/**
* 数据集合
*/
private List<T> list;
/**
* 屏障
*/
private CyclicBarrier barrier;
/**
* 回滚标识
*/
private AtomicBoolean rollbackFlag;
/**
* 构造函数
*
* @param excelService
* @param list
* @param barrier
* @param rollbackFlag
*/
public ImportExcelNewTask(SaveExcelNewService<T> excelService, List<T> list, CyclicBarrier barrier, AtomicBoolean rollbackFlag) {
this.excelService = excelService;
this.list = list;
this.barrier = barrier;
this.rollbackFlag = rollbackFlag;
}
@Override
public ErrorInfoEntity call() throws Exception {
return excelService.batchSave(list, barrier,rollbackFlag);
}
}
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 〈一句话功能简述〉<br>
* 〈保存ExcelService〉
*
*/
public interface SaveExcelNewService<T> {
/**
* 批量保存
*
* @param list
* @param barrier
* @param rollbackFlag
* @return
* @throws Exception
*/
ErrorInfoEntity batchSave(List<T> list, CyclicBarrier barrier, AtomicBoolean rollbackFlag) throws Exception;
}
import com.hanxiaozhang.dictonecode.dao.TestDao;
import com.hanxiaozhang.dictonecode.domain.TestDO;
import com.hanxiaozhang.importexcelnew.config.SaveExcelTemplateHandle;
import com.hanxiaozhang.utils.StringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
/**
* 〈一句话功能简述〉<br>
* 〈数据字典数据导入〉
*
*/
@Slf4j
@Service
public class TestSaveExcelNewServiceImpl extends SaveExcelTemplateHandle<TestDO> {
@Resource
private TestDao testDao;
/**
* 处理相关数据
*
* @param list
* @return
*/
protected List<TestDO> handle(List<TestDO> list) {
List<TestDO> errorList = new ArrayList<>();
list.forEach(x -> {
boolean flag = true;
List<String> errorMsg = new ArrayList<>();
//模拟一个业务数据错误,姓名不能为空
if (StringUtil.isBlank(x.getName())) {
errorMsg.add("姓名不能为空!");
flag = false;
throw new RuntimeException();
}
//模拟一个业务数据错误,类型不能为空
if (StringUtil.isBlank(x.getType())) {
errorMsg.add("类型不能为空!");
flag = false;
}
if (flag) {
testDao.save(x);
} else {
x.setRemarks(String.join("\n", errorMsg));
errorList.add(x);
}
});
return errorList;
}
}