
Introduction to chaconne, a distributed task scheduling system

2021/12/23 1:29:25

public class EtlJob implements NotManagedJob {

    @Override
    public Object execute(JobKey jobKey, Object attachment, Logger log) {
        log.info("JobKey: {}, Parameter: {}", jobKey, attachment);
        return null;
    }
}

Task dependencies

Task dependencies are one of chaconne's key features. They come in two flavors: serial dependencies and parallel dependencies.

A serial dependency means task B runs right after task A completes, i.e. task B depends on task A.

A parallel dependency involves, say, three tasks A, B, and C, where C runs only after both A and B have completed, similar to a countersigning workflow.

In both cases, tasks can share context parameters and execution results, and you can supply a custom decision strategy that determines whether a downstream task should be triggered.

DAG (Directed Acyclic Graph)

Building on serial and parallel dependencies, chaconne also provides DAG support with a friendly API for modeling workflow-like business scenarios, further broadening what task dependencies can express.

(For convenience, the examples below all configure tasks with annotations.)

Serial dependency example:

@ChacJob
@ChacTrigger(triggerType = TriggerType.DEPENDENT)
@ChacDependency({ @ChacJobKey(className = "com.chinapex.test.chaconne.job.DemoSchedJob", name = "demoSchedJob") })
public class DemoDependentJob {

    @Run
    public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
        log.info("DemoDependentJob is running at: {}", DateUtils.format(System.currentTimeMillis()));
        return RandomUtils.randomLong(1000000L, 1000000000L);
    }

    @OnSuccess
    public void onSuccess(JobKey jobKey, Object result, Logger log) {
        log.info("DemoDependentJob's return value is: {}", result);
    }

    @OnFailure
    public void onFailure(JobKey jobKey, Throwable e, Logger log) {
        log.error("DemoDependentJob is failed by cause: {}", e.getMessage(), e);
    }
}

Parallel dependency example:

There are three tasks: DemoTask, DemoTaskOne, and DemoTaskTwo.

DemoTask should run only after both DemoTaskOne and DemoTaskTwo have completed, and it can read the values they returned.

DemoTaskOne:

@ChacJob
@ChacTrigger(triggerType = TriggerType.SIMPLE)
public class DemoTaskOne {

    @Run
    public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
        log.info("DemoTaskOne is running at: {}", DateUtils.format(System.currentTimeMillis()));
        return RandomUtils.randomLong(1000000L, 1000000000L);
    }

    @OnSuccess
    public void onSuccess(JobKey jobKey, Object result, Logger log) {
        log.info("DemoTaskOne return value is: {}", result);
    }

    @OnFailure
    public void onFailure(JobKey jobKey, Throwable e, Logger log) {
        log.error("DemoTaskOne is failed by cause: {}", e.getMessage(), e);
    }
}

DemoTaskTwo:

@ChacJob
@ChacTrigger(triggerType = TriggerType.SIMPLE)
public class DemoTaskTwo {

    @Run
    public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
        log.info("DemoTaskTwo is running at: {}", DateUtils.format(System.currentTimeMillis()));
        return RandomUtils.randomLong(1000000L, 1000000000L);
    }

    @OnSuccess
    public void onSuccess(JobKey jobKey, Object result, Logger log) {
        log.info("DemoTaskTwo return value is: {}", result);
    }

    @OnFailure
    public void onFailure(JobKey jobKey, Throwable e, Logger log) {
        log.error("DemoTaskTwo is failed by cause: {}", e.getMessage(), e);
    }
}

DemoTask:

@ChacJob
@ChacTrigger(cron = "0 0/1 * * * ?", triggerType = TriggerType.CRON)
@ChacFork({ @ChacJobKey(className = "com.chinapex.test.chaconne.job.DemoTaskOne", name = "demoTaskOne"),
        @ChacJobKey(className = "com.chinapex.test.chaconne.job.DemoTaskTwo", name = "demoTaskTwo") })
public class DemoTask {

    @Run
    public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {
        log.info("DemoTask is running at: {}", DateUtils.format(System.currentTimeMillis()));
        TaskJoinResult joinResult = (TaskJoinResult) attachment;
        TaskForkResult[] forkResults = joinResult.getTaskForkResults();
        long max = 0;
        for (TaskForkResult forkResult : forkResults) {
            max = Long.max(max, (Long) forkResult.getResult());
        }
        return max;
    }

    @OnSuccess
    public void onSuccess(JobKey jobKey, Object result, Logger log) {
        log.info("DemoTask return max value is: {}", result);
    }

    @OnFailure
    public void onFailure(JobKey jobKey, Throwable e, Logger log) {
        log.error("DemoTask is failed by cause: {}", e.getMessage(), e);
    }
}
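The join step that DemoTask performs, reducing all fork results to their maximum, can be sketched standalone in plain Java. The `ForkResult` record below is a stand-in for illustration, not chaconne's `TaskForkResult` type:

```java
import java.util.Arrays;

public class JoinMaxSketch {

    // Hypothetical stand-in for chaconne's TaskForkResult; only the result payload matters here.
    record ForkResult(Object result) {}

    // Mirrors DemoTask#execute: take the maximum of the Long results of all forked tasks.
    static long joinMax(ForkResult[] forkResults) {
        return Arrays.stream(forkResults)
                .mapToLong(r -> (Long) r.result())
                .max()
                .orElse(0L); // DemoTask starts its accumulator at 0, so an empty join yields 0
    }

    public static void main(String[] args) {
        ForkResult[] results = { new ForkResult(42L), new ForkResult(7L) };
        System.out.println(joinMax(results)); // prints 42
    }
}
```

Because the forked tasks' return values arrive through the `attachment` parameter, the join task stays free of any direct reference to its upstream classes.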

DAG task example

DAG tasks can currently only be created through the API; a UI for building DAG tasks is planned for a later release.

@RequestMapping("/dag")
@RestController
public class DagJobController {

    @Value("${spring.application.cluster.name}")
    private String clusterName;

    @Value("${spring.application.name}")
    private String applicationName;

    @Autowired
    private JobManager jobManager;

    @GetMapping("/create")
    public Map<String, Object> createDagTask() throws Exception {
        Dag dag = new Dag(clusterName, applicationName, "testDag"); // Create a DAG task with cluster name, application name and job name
        dag.setTrigger(new CronTrigger("0 0/1 * * * ?")); // Set the cron expression
        dag.setDescription("This is only a demo of dag job"); // Job description
        DagFlow first = dag.startWith(clusterName, applicationName, "demoDagStart", DemoDagStart.class.getName()); // Define the first node
        DagFlow second = first.flow(clusterName, applicationName, "demoDag", DemoDag.class.getName()); // Define the second node
        // The second node forks two child tasks
        second.fork(clusterName, applicationName, "demoDagOne", DemoDagOne.class.getName());
        second.fork(clusterName, applicationName, "demoDagTwo", DemoDagTwo.class.getName());
        jobManager.persistJob(dag, "123");
        return Collections.singletonMap("ok", 1);
    }
}

A note on the DAG example above: chaconne's DAG model supports serial chaining (flow mode) as well as parallel processing (fork mode). In the example, the demoDag task forks two child tasks ("demoDagOne" and "demoDagTwo"); only after both of them finish is demoDag itself triggered.
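The execution order a DAG scheduler derives from flow/fork edges is a topological sort. As background, here is a generic sketch of Kahn's algorithm applied to the example's topology; this is not chaconne's internal implementation, just the standard idea behind DAG ordering:

```java
import java.util.*;

public class TopoSketch {

    // Kahn's algorithm over an adjacency map: an edge a -> b means b may run only after a.
    static List<String> topoOrder(Map<String, List<String>> edges) {
        Map<String, Integer> indegree = new HashMap<>();
        edges.forEach((from, tos) -> {
            indegree.putIfAbsent(from, 0);
            for (String to : tos) indegree.merge(to, 1, Integer::sum);
        });
        Deque<String> ready = new ArrayDeque<>();
        indegree.forEach((node, deg) -> { if (deg == 0) ready.add(node); });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String node = ready.poll();
            order.add(node);
            for (String to : edges.getOrDefault(node, List.of())) {
                if (indegree.merge(to, -1, Integer::sum) == 0) ready.add(to);
            }
        }
        return order;
    }

    public static void main(String[] args) {
        // demoDagStart runs first; demoDagOne/demoDagTwo run in parallel; demoDag joins them.
        Map<String, List<String>> edges = Map.of(
                "demoDagStart", List.of("demoDagOne", "demoDagTwo"),
                "demoDagOne", List.of("demoDag"),
                "demoDagTwo", List.of("demoDag"));
        System.out.println(topoOrder(edges));
    }
}
```

The sort guarantees demoDagStart comes first and demoDag last; the two forked tasks have no edge between them, so a scheduler is free to run them concurrently.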

Chaconne deployment notes

Besides building on Spring Boot, chaconne by default stores job information in MySQL (currently the only supported database; more will follow) and uses Redis for cluster metadata and message broadcasting.

Whichever deployment mode you choose, your application therefore needs a DataSource and a RedisConnectionFactory.

Sample code:

@Slf4j
@Configuration
public class ResourceConfig {

    @Setter
    @Configuration(proxyBeanMethods = false)
    @ConfigurationProperties(prefix = "spring.datasource")
    public static class DataSourceConfig {

        private String jdbcUrl;
        private String username;
        private String password;
        private String driverClassName;

        private HikariConfig getDbConfig() {
            if (log.isTraceEnabled()) {
                log.trace("DataSourceConfig JdbcUrl: " + jdbcUrl);
                log.trace("DataSourceConfig Username: " + username);
                log.trace("DataSourceConfig Password: " + password);
                log.trace("DataSourceConfig DriverClassName: " + driverClassName);
            }
            final HikariConfig config = new HikariConfig();
            config.setDriverClassName(driverClassName);
            config.setJdbcUrl(jdbcUrl);
            config.setUsername(username);
            config.setPassword(password);
            config.setMinimumIdle(5);
            config.setMaximumPoolSize(50);
            config.setMaxLifetime(60 * 1000);
            config.setIdleTimeout(60 * 1000);
            config.setValidationTimeout(3000);
            config.setReadOnly(false);
            config.setConnectionInitSql("SELECT UUID()");
            config.setConnectionTestQuery("SELECT 1");
            config.setConnectionTimeout(60 * 1000);
            config.setTransactionIsolation("TRANSACTION_READ_COMMITTED");
            config.addDataSourceProperty("cachePrepStmts", "true");
            config.addDataSourceProperty("prepStmtCacheSize", "250");
            config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
            return config;
        }

        @Primary
        @Bean
        public DataSource dataSource() {
            return new HikariDataSource(getDbConfig());
        }
    }

    @Setter
    @Configuration(proxyBeanMethods = false)
    @ConfigurationProperties(prefix = "spring.redis")
    public static class RedisConfig {

        private String host;
        private String password;
        private int port;
        private int dbIndex;

        @Bean
        @ConditionalOnMissingBean(RedisConnectionFactory.class)
        public RedisConnectionFactory redisConnectionFactory() {
            RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration();
            redisStandaloneConfiguration.setHostName(host);
            redisStandaloneConfiguration.setPort(port);
            redisStandaloneConfiguration.setDatabase(dbIndex);
            redisStandaloneConfiguration.setPassword(RedisPassword.of(password));
            JedisClientConfiguration.JedisClientConfigurationBuilder jedisClientConfiguration = JedisClientConfiguration.builder();
            jedisClientConfiguration.connectTimeout(Duration.ofMillis(60000)).readTimeout(Duration.ofMillis(60000)).usePooling()
                    .poolConfig(jedisPoolConfig());
            JedisConnectionFactory factory = new JedisConnectionFactory(redisStandaloneConfiguration, jedisClientConfiguration.build());
            return factory;
        }

        @Bean
        public JedisPoolConfig jedisPoolConfig() {
            JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
            jedisPoolConfig.setMinIdle(1);
            jedisPoolConfig.setMaxIdle(10);
            jedisPoolConfig.setMaxTotal(200);
            jedisPoolConfig.setMaxWaitMillis(-1);
            jedisPoolConfig.setTestWhileIdle(true);
            return jedisPoolConfig;
        }
    }
}
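A matching configuration fragment that the two @ConfigurationProperties classes above would bind, via Spring Boot's relaxed binding (the values are placeholders, adjust them for your environment):

```properties
# Bound by DataSourceConfig (prefix "spring.datasource")
spring.datasource.jdbc-url=jdbc:mysql://localhost:3306/chaconne?useSSL=false&characterEncoding=utf8
spring.datasource.username=root
spring.datasource.password=secret
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# Bound by RedisConfig (prefix "spring.redis"); db-index binds to the dbIndex field
spring.redis.host=localhost
spring.redis.port=6379
spring.redis.password=secret
spring.redis.db-index=0
```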

Chaconne decentralized deployment

Annotate your Spring application's main class with @EnableChaconneEmbeddedMode, then start it.

Sample code:

@EnableChaconneEmbeddedMode
@SpringBootApplication
@ComponentScan
public class YourApplicationMain {

    public static void main(String[] args) {
        final int port = 8088;
        System.setProperty("server.port", String.valueOf(port));
        SpringApplication.run(YourApplicationMain.class, args);
    }
}

Chaconne centralized deployment
  1. Start the scheduling center. This requires a new Spring Boot project: annotate its main class with @EnableChaconneDetachedMode and mark it as the producer side.

Sample code:

@EnableChaconneDetachedMode(DetachedMode.PRODUCER)
@SpringBootApplication
public class ChaconneManagementMain {

    public static void main(String[] args) {
        SpringApplication.run(ChaconneManagementMain.class, args);
    }
}

Don't forget to configure the DataSource and RedisConnectionFactory.
  2. Annotate your Spring application's main class with @EnableChaconneDetachedMode (the consumer side by default), then start it.

@EnableChaconneDetachedMode
@SpringBootApplication
@ComponentScan
public class YourApplicationMain {

    public static void main(String[] args) {
        final int port = 8088;
        System.setProperty("server.port", String.valueOf(port));
        SpringApplication.run(YourApplicationMain.class, args);
    }
}

Using the Chaconne Console

The Chaconne Console is the web application chaconne provides for managing and inspecting jobs. It too supports both decentralized and centralized deployment modes, and listens on port 6140 by default.

It provides the following features:

  1. Save jobs and view job information
  2. Pause and resume jobs
  3. Delete jobs
  4. Run jobs manually
  5. View job statistics (by day)
  6. View job runtime logs

The Chaconne Console is still under active development; some features are a little rough (e.g. the job JSON editor) and some are not yet available.

Like the others, the Chaconne Console is a Spring Boot project.

Source:

@EnableChaconneClientMode
@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
public class ChaconneConsoleMain {

    static {
        System.setProperty("spring.devtools.restart.enabled", "false");
        File logDir = FileUtils.getFile(FileUtils.getUserDirectory(), "logs", "indi", "atlantis", "framework", "chaconne", "console");
        if (!logDir.exists()) {
            logDir.mkdirs();
        }
        System.setProperty("DEFAULT_LOG_BASE", logDir.getAbsolutePath());
    }

    public static void main(String[] args) {