This commit is contained in:
Jane
2024-01-02 19:38:11 +08:00
parent fc6d88a783
commit 58df49f91c
22 changed files with 438 additions and 17 deletions

View File

@@ -61,6 +61,10 @@ public class CheckReportEntity implements Serializable {
*/
private String checkBatch;
private String checkReportType;
private Integer checkTimeConsuming;
/**
* 规则名称
*/

View File

@@ -25,6 +25,8 @@ public class CheckReportVo implements Serializable {
private LocalDateTime checkDate;
private String checkResult;
private Integer checkTotalCount;
private String checkReportType;
private String checkTimeConsuming;
private Integer checkErrorCount;
private String ruleName;
private String ruleType;

View File

@@ -88,7 +88,6 @@
<groupId>com.platform</groupId>
<artifactId>data-metadata-service</artifactId>
<version>0.4.x</version>
<scope>compile</scope>
</dependency>
</dependencies>

View File

@@ -47,7 +47,7 @@ public class StartedUpRunner implements ApplicationRunner {
List<ScheduleJobEntity> list = scheduleJobService.list(Wrappers.<ScheduleJobEntity>lambdaQuery().eq(ScheduleJobEntity::getStatus, DataConstant.TrueOrFalse.TRUE.getKey()));
if (CollUtil.isNotEmpty(list)) {
list.forEach(job -> {
SchedulingRunnable task = new SchedulingRunnable(job.getId(), job.getBeanName(), job.getMethodName(), job.getMethodParams());
SchedulingRunnable task = new SchedulingRunnable(job.getId(), job.getBeanName(), job.getMethodName(), job.getMethodParams(), job.getJobType());
cronTaskRegistrar.addCronTask(task, job.getCronExpression());
});
}

View File

@@ -78,8 +78,6 @@ public class CheckReportController extends BaseController {
queryWrapper.like(StrUtil.isNotBlank(checkReportQuery.getRuleSource()), "r.rule_source", checkReportQuery.getRuleSource());
queryWrapper.like(StrUtil.isNotBlank(checkReportQuery.getRuleTable()), "r.rule_table", checkReportQuery.getRuleTable());
queryWrapper.like(StrUtil.isNotBlank(checkReportQuery.getRuleColumn()), "r.rule_column", checkReportQuery.getRuleColumn());
// 确定唯一核查报告
queryWrapper.apply("c.check_batch = r.last_check_batch");
IPage<CheckReportEntity> page = checkReportService.page(new Page<>(checkReportQuery.getPageNum(), checkReportQuery.getPageSize()), queryWrapper);
List<CheckReportVo> collect = page.getRecords().stream().map(checkReportMapper::toVO).collect(Collectors.toList());
JsonPage<CheckReportVo> jsonPage = new JsonPage<>(page.getCurrent(), page.getSize(), page.getTotal(), collect);

View File

@@ -25,11 +25,14 @@ public class SchedulingRunnable implements Runnable {
private String params;
public SchedulingRunnable(String id, String beanName, String methodName, String params) {
private String jobType;
public SchedulingRunnable(String id, String beanName, String methodName, String params, String jobType) {
this.id = id;
this.beanName = beanName;
this.methodName = methodName;
this.params = params;
this.jobType = jobType;
}
@Override
@@ -47,6 +50,7 @@ public class SchedulingRunnable implements Runnable {
}
batch = DateUtil.format(LocalDateTime.now(), DatePattern.PURE_DATETIME_PATTERN);
map.put("batch", batch);
map.put("jobType", jobType);
ReflectionUtils.makeAccessible(method);
method.invoke(target, map);
} catch (Exception ex) {

View File

@@ -65,6 +65,8 @@ public class QualityTask {
tasks.add(task);
});
List<Future<CheckReportEntity>> futures;
long checkConsumeTime = 0L;
long startTime = System.currentTimeMillis();
try {
futures = threadPoolExecutor.invokeAll(tasks);
// 处理线程返回结果
@@ -76,13 +78,18 @@ public class QualityTask {
} catch (Exception e) {
e.printStackTrace();
}
checkConsumeTime = System.currentTimeMillis() - startTime;
// 关闭线程池
threadPoolExecutor.shutdown();
// 核查报告
long finalCheckConsumeTime = checkConsumeTime;
result.forEach(s -> {
// 插入核查结果正常的数据
String status = StrUtil.isBlank(s.getCheckResult()) ? DataConstant.TrueOrFalse.TRUE.getKey() : DataConstant.TrueOrFalse.FALSE.getKey();
if (StrUtil.isBlank(s.getCheckResult())) {
s.setCheckTimeConsuming((int) finalCheckConsumeTime);
s.setCheckResult(DataConstant.TrueOrFalse.TRUE.getKey());
s.setCheckReportType((String) map.get("jobType"));
s.setCheckBatch((String) map.get("batch"));
checkReportService.save(s);
// 更新最近核查批次号

View File

@@ -5,7 +5,6 @@ import cn.datax.common.core.RedisConstant;
import cn.datax.common.database.constants.DbType;
import cn.datax.common.redis.service.RedisService;
import cn.datax.service.data.metadata.api.entity.MetadataTableEntity;
import cn.datax.service.data.metadata.dao.MetadataTableDao;
import cn.datax.service.data.quality.api.dto.*;
import cn.datax.service.data.quality.api.entity.CheckRuleEntity;
import cn.datax.service.data.quality.api.enums.RuleItem;
@@ -46,8 +45,8 @@ public class CheckRuleServiceImpl extends BaseServiceImpl<CheckRuleDao, CheckRul
@Autowired
private RedisService redisService;
@Autowired
private MetadataTableDao metadataTableDao;
/*@Autowired
private MetadataTableDao metadataTableDao;*/
private static String BIND_GB_CODE = "gb_code";
private static String BIND_GB_NAME = "gb_name";
@@ -65,10 +64,10 @@ public class CheckRuleServiceImpl extends BaseServiceImpl<CheckRuleDao, CheckRul
QueryWrapper<MetadataTableEntity> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("source_id", checkRuleDto.getRuleSourceId());
queryWrapper.eq("table_name",currentString);
MetadataTableEntity metadataTableEntity = metadataTableDao.selectOne(queryWrapper);
//MetadataTableEntity metadataTableEntity = metadataTableDao.selectOne(queryWrapper);
checkRule = checkRuleMapper.toEntity(checkRuleDto);
checkRule.setRuleTable(currentString);
checkRule.setRuleTableId(metadataTableEntity.getId());
//checkRule.setRuleTableId(metadataTableEntity.getId());
checkRuleDao.insert(checkRule);
}
return checkRule;

View File

@@ -73,7 +73,7 @@ public class ScheduleJobServiceImpl extends BaseServiceImpl<ScheduleJobDao, Sche
@Override
public void pauseScheduleJobById(String id) {
ScheduleJobEntity scheduleJobEntity = super.getById(id);
SchedulingRunnable task = new SchedulingRunnable(id, scheduleJobEntity.getBeanName(), scheduleJobEntity.getMethodName(), scheduleJobEntity.getMethodParams());
SchedulingRunnable task = new SchedulingRunnable(id, scheduleJobEntity.getBeanName(), scheduleJobEntity.getMethodName(), scheduleJobEntity.getMethodParams(), scheduleJobEntity.getJobType());
cronTaskRegistrar.removeCronTask(task);
scheduleJobEntity.setStatus(DataConstant.TrueOrFalse.FALSE.getKey());
scheduleJobDao.updateById(scheduleJobEntity);
@@ -82,7 +82,7 @@ public class ScheduleJobServiceImpl extends BaseServiceImpl<ScheduleJobDao, Sche
@Override
public void resumeScheduleJobById(String id) {
ScheduleJobEntity scheduleJobEntity = super.getById(id);
SchedulingRunnable task = new SchedulingRunnable(id, scheduleJobEntity.getBeanName(), scheduleJobEntity.getMethodName(), scheduleJobEntity.getMethodParams());
SchedulingRunnable task = new SchedulingRunnable(id, scheduleJobEntity.getBeanName(), scheduleJobEntity.getMethodName(), scheduleJobEntity.getMethodParams(), scheduleJobEntity.getJobType());
cronTaskRegistrar.addCronTask(task, scheduleJobEntity.getCronExpression());
scheduleJobEntity.setStatus(DataConstant.TrueOrFalse.TRUE.getKey());
scheduleJobDao.updateById(scheduleJobEntity);
@@ -92,7 +92,7 @@ public class ScheduleJobServiceImpl extends BaseServiceImpl<ScheduleJobDao, Sche
@Async("taskExecutor")
public void runScheduleJobById(String id) {
ScheduleJobEntity scheduleJobEntity = super.getById(id);
SchedulingRunnable task = new SchedulingRunnable(id, scheduleJobEntity.getBeanName(), scheduleJobEntity.getMethodName(), scheduleJobEntity.getMethodParams());
SchedulingRunnable task = new SchedulingRunnable(id, scheduleJobEntity.getBeanName(), scheduleJobEntity.getMethodName(), scheduleJobEntity.getMethodParams(), scheduleJobEntity.getJobType());
task.run();
}
}

View File

@@ -11,6 +11,7 @@
<result column="check_total_count" property="checkTotalCount" />
<result column="check_error_count" property="checkErrorCount" />
<result column="check_batch" property="checkBatch" />
<result column="check_report_type" property="checkReportType" />
</resultMap>
<resultMap id="ExtendResultMap" type="cn.datax.service.data.quality.api.entity.CheckReportEntity" extends="BaseResultMap">
@@ -24,12 +25,12 @@
<!-- 通用查询结果列 -->
<sql id="Base_Column_List">
id,
check_rule_id, check_date, check_result, check_total_count, check_error_count, check_batch
check_rule_id, check_date, check_result, check_total_count, check_error_count, check_batch, check_report_type, check_time_consuming
</sql>
<sql id="Report_Column_List">
${alias}.id,
${alias}.check_rule_id, ${alias}.check_date, ${alias}.check_result, ${alias}.check_total_count, ${alias}.check_error_count, ${alias}.check_batch
${alias}.check_rule_id, ${alias}.check_date, ${alias}.check_result, ${alias}.check_total_count, ${alias}.check_error_count, ${alias}.check_batch, ${alias}.check_report_type, ${alias}.check_time_consuming
</sql>
<select id="selectPage" resultMap="ExtendResultMap">
@@ -51,6 +52,8 @@
<result column="rule_level_id" property="ruleLevelId" />
<result column="rule_level_name" property="ruleLevelName" />
<result column="check_error_count" property="checkErrorCount" />
<result column="check_report_type" property="checkReportType" />
<result column="check_time_consuming" property="checkTimeConsuming" />
</resultMap>
<resultMap id="ExtendReportResultMap" type="cn.datax.service.data.quality.api.entity.DataReportEntity" extends="ReportResultMap">
@@ -60,6 +63,8 @@
<result column="rule_column_name" property="ruleColumnName" />
<result column="rule_column_comment" property="ruleColumnComment" />
<result column="check_total_count" property="checkTotalCount" />
<result column="check_report_type" property="checkReportType" />
<result column="check_time_consuming" property="checkTimeConsuming" />
</resultMap>
<select id="getReportBySource" resultMap="ReportResultMap">