This commit is contained in:
Jane
2023-12-22 10:59:10 +08:00
parent 751c43e199
commit d1ede2d4aa
2774 changed files with 291509 additions and 0 deletions

View File

@@ -0,0 +1,95 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>service-data-dts-parent</artifactId>
<groupId>com.platform</groupId>
<version>0.4.x</version>
</parent>
<artifactId>service-data-core</artifactId>
<packaging>jar</packaging>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>com.platform</groupId>
<artifactId>service-data-rpc</artifactId>
<version>${project.parent.version}</version>
</dependency>
<!-- groovy-all -->
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy</artifactId>
<version>${groovy.version}</version>
</dependency>
<!-- spring-context -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
<scope>provided</scope>
</dependency>
<!-- junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.java.dev.jna</groupId>
<artifactId>jna</artifactId>
<version>${jna.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool.version}</version>
</dependency>
<dependency>
<groupId>com.github.oshi</groupId>
<artifactId>oshi-core</artifactId>
<version>${oshi.core.version}</version>
<exclusions>
<exclusion>
<artifactId>jna</artifactId>
<groupId>net.java.dev.jna</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,50 @@
package com.platform.core.biz;
import com.platform.core.biz.model.HandleCallbackParam;
import com.platform.core.biz.model.HandleProcessCallbackParam;
import com.platform.core.biz.model.RegistryParam;
import com.platform.core.biz.model.ReturnT;
import java.util.List;
public interface AdminBiz {
// ---------------------- callback ----------------------
/**
* callback
*
* @param callbackParamList
* @return
*/
ReturnT<String> callback(List<HandleCallbackParam> callbackParamList);
/**
* processCallback
*
* @param processCallbackParamList
* @return
*/
ReturnT<String> processCallback(List<HandleProcessCallbackParam> processCallbackParamList);
// ---------------------- registry ----------------------
/**
* registry
*
* @param registryParam
* @return
*/
ReturnT<String> registry(RegistryParam registryParam);
/**
* registry remove
*
* @param registryParam
* @return
*/
ReturnT<String> registryRemove(RegistryParam registryParam);
}

View File

@@ -0,0 +1,49 @@
package com.platform.core.biz;
import com.platform.core.biz.model.LogResult;
import com.platform.core.biz.model.ReturnT;
import com.platform.core.biz.model.TriggerParam;
public interface ExecutorBiz {
/**
* beat
*
* @return
*/
ReturnT<String> beat();
/**
* idle beat
*
* @param jobId
* @return
*/
ReturnT<String> idleBeat(int jobId);
/**
* kill
*
* @param jobId
* @return
*/
ReturnT<String> kill(int jobId);
/**
* log
*
* @param logDateTim
* @param logId
* @param fromLineNum
* @return
*/
ReturnT<LogResult> log(long logDateTim, long logId, int fromLineNum);
/**
* run
*
* @param triggerParam
* @return
*/
ReturnT<String> run(TriggerParam triggerParam);
}

View File

@@ -0,0 +1,49 @@
package com.platform.core.biz.client;
import com.platform.core.biz.model.HandleCallbackParam;
import com.platform.core.biz.model.HandleProcessCallbackParam;
import com.platform.core.biz.model.RegistryParam;
import com.platform.core.util.JobRemotingUtil;
import com.platform.core.biz.AdminBiz;
import com.platform.core.biz.model.ReturnT;
import java.util.List;
public class AdminBizClient implements AdminBiz {
public AdminBizClient() {
}
public AdminBizClient(String addressUrl, String accessToken) {
this.addressUrl = addressUrl;
this.accessToken = accessToken;
// valid
if (!this.addressUrl.endsWith("/")) {
this.addressUrl = this.addressUrl + "/";
}
}
private String addressUrl ;
private String accessToken;
@Override
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
return JobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, callbackParamList, 3);
}
@Override
public ReturnT<String> processCallback(List<HandleProcessCallbackParam> callbackParamList) {
return JobRemotingUtil.postBody(addressUrl + "api/processCallback", accessToken, callbackParamList, 3);
}
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
return JobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, registryParam, 3);
}
@Override
public ReturnT<String> registryRemove(RegistryParam registryParam) {
return JobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, registryParam, 3);
}
}

View File

@@ -0,0 +1,167 @@
package com.platform.core.biz.impl;
import com.platform.core.biz.ExecutorBiz;
import com.platform.core.biz.model.LogResult;
import com.platform.core.biz.model.ReturnT;
import com.platform.core.biz.model.TriggerParam;
import com.platform.core.enums.ExecutorBlockStrategyEnum;
import com.platform.core.glue.GlueFactory;
import com.platform.core.glue.GlueTypeEnum;
import com.platform.core.log.JobFileAppender;
import com.platform.core.thread.JobThread;
import com.platform.core.executor.JobExecutor;
import com.platform.core.handler.IJobHandler;
import com.platform.core.handler.impl.GlueJobHandler;
import com.platform.core.handler.impl.ScriptJobHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
public class ExecutorBizImpl implements ExecutorBiz {
private static Logger logger = LoggerFactory.getLogger(ExecutorBizImpl.class);
@Override
public ReturnT<String> beat() {
return ReturnT.SUCCESS;
}
@Override
public ReturnT<String> idleBeat(int jobId) {
// isRunningOrHasQueue
JobThread jobThread = JobExecutor.loadJobThread(jobId);
if (jobThread != null && jobThread.isRunningOrHasQueue()) {
return new ReturnT<>(ReturnT.FAIL_CODE, "job thread is running or has trigger queue.");
}
return ReturnT.SUCCESS;
}
@Override
public ReturnT<String> kill(int jobId) {
// kill handlerThread, and create new one
JobThread jobThread = JobExecutor.loadJobThread(jobId);
if (jobThread != null) {
JobExecutor.removeJobThread(jobId, "scheduling center kill job.");
return ReturnT.SUCCESS;
}
return new ReturnT<>(ReturnT.SUCCESS_CODE, "job thread already killed.");
}
@Override
public ReturnT<LogResult> log(long logDateTim, long logId, int fromLineNum) {
// log filename: logPath/yyyy-MM-dd/9999.log
String logFileName = JobFileAppender.makeLogFileName(new Date(logDateTim), logId);
LogResult logResult = JobFileAppender.readLog(logFileName, fromLineNum);
return new ReturnT<>(logResult);
}
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
// load oldjobHandler + jobThread
JobThread jobThread = JobExecutor.loadJobThread(triggerParam.getJobId());
IJobHandler jobHandler = jobThread != null ? jobThread.getHandler() : null;
String removeOldReason = null;
// validjobHandler + jobThread
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
if (GlueTypeEnum.BEAN == glueTypeEnum) {
// new jobhandler
IJobHandler newJobHandler = JobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
// valid old jobThread
if (jobThread != null && jobHandler != newJobHandler) {
// change handler, need kill old thread
removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
jobHandler = newJobHandler;
if (jobHandler == null) {
return new ReturnT<>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
}
}
} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
// valid old jobThread
if (jobThread != null &&
!(jobThread.getHandler() instanceof GlueJobHandler
&& ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime() == triggerParam.getGlueUpdatetime())) {
// change handler or gluesource updated, need kill old thread
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
try {
IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
}
}
} else if (glueTypeEnum != null && glueTypeEnum.isScript()) {
// valid old jobThread
if (jobThread != null &&
!(jobThread.getHandler() instanceof ScriptJobHandler
&& ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime() == triggerParam.getGlueUpdatetime())) {
// change script or gluesource updated, need kill old thread
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
}
} else {
return new ReturnT<>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
}
// executor block strategy
if (jobThread != null) {
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
// discard when running
if (jobThread.isRunningOrHasQueue()) {
return new ReturnT<>(ReturnT.FAIL_CODE, "block strategy effect" + ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
// kill running jobThread
if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
jobThread = null;
}
} else {
// just queue trigger
}
}
// replace thread (new or exists invalid)
if (jobThread == null) {
jobThread = JobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}
// push data to queue
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
return pushResult;
}
}

View File

@@ -0,0 +1,53 @@
package com.platform.core.biz.model;
import java.io.Serializable;
public class HandleCallbackParam implements Serializable {
private static final long serialVersionUID = 42L;
private long logId;
private long logDateTim;
private ReturnT<String> executeResult;
public HandleCallbackParam(){}
public HandleCallbackParam(long logId, long logDateTim, ReturnT<String> executeResult) {
this.logId = logId;
this.logDateTim = logDateTim;
this.executeResult = executeResult;
}
public long getLogId() {
return logId;
}
public void setLogId(long logId) {
this.logId = logId;
}
public long getLogDateTim() {
return logDateTim;
}
public void setLogDateTim(long logDateTim) {
this.logDateTim = logDateTim;
}
public ReturnT<String> getExecuteResult() {
return executeResult;
}
public void setExecuteResult(ReturnT<String> executeResult) {
this.executeResult = executeResult;
}
@Override
public String toString() {
return "HandleCallbackParam{" +
"logId=" + logId +
", logDateTim=" + logDateTim +
", executeResult=" + executeResult +
'}';
}
}

View File

@@ -0,0 +1,54 @@
package com.platform.core.biz.model;
import java.io.Serializable;
public class HandleProcessCallbackParam implements Serializable {
private static final long serialVersionUID = 42L;
private long logId;
private String processId;
private long logDateTime;
public HandleProcessCallbackParam(){}
public HandleProcessCallbackParam(long logId,long logDateTime, String processId) {
this.logId = logId;
this.processId = processId;
this.logDateTime=logDateTime;
}
public long getLogId() {
return logId;
}
public void setLogId(long logId) {
this.logId = logId;
}
public String getProcessId() {
return processId;
}
public void setProcessId(String processId) {
this.processId = processId;
}
public long getLogDateTime() {
return logDateTime;
}
public void setLogDateTime(long logDateTime) {
this.logDateTime = logDateTime;
}
@Override
public String toString() {
return "HandleCallbackParam{" +
"logId=" + logId +
", processId=" + processId +
", logDateTime=" + logDateTime +
'}';
}
}

View File

@@ -0,0 +1,52 @@
package com.platform.core.biz.model;
import java.io.Serializable;
public class LogResult implements Serializable {
private static final long serialVersionUID = 42L;
public LogResult(int fromLineNum, int toLineNum, String logContent, boolean isEnd) {
this.fromLineNum = fromLineNum;
this.toLineNum = toLineNum;
this.logContent = logContent;
this.isEnd = isEnd;
}
private int fromLineNum;
private int toLineNum;
private String logContent;
private boolean isEnd;
public int getFromLineNum() {
return fromLineNum;
}
public void setFromLineNum(int fromLineNum) {
this.fromLineNum = fromLineNum;
}
public int getToLineNum() {
return toLineNum;
}
public void setToLineNum(int toLineNum) {
this.toLineNum = toLineNum;
}
public String getLogContent() {
return logContent;
}
public void setLogContent(String logContent) {
this.logContent = logContent;
}
public boolean isEnd() {
return isEnd;
}
public void setEnd(boolean end) {
isEnd = end;
}
}

View File

@@ -0,0 +1,92 @@
package com.platform.core.biz.model;
import java.io.Serializable;
public class RegistryParam implements Serializable {
private static final long serialVersionUID = 42L;
private String registryGroup;
private String registryKey;
private String registryValue;
private double cpuUsage;
private double memoryUsage;
private double loadAverage;
public RegistryParam() {
}
public RegistryParam(String registryGroup, String registryKey, String registryValue) {
this.registryGroup = registryGroup;
this.registryKey = registryKey;
this.registryValue = registryValue;
}
public RegistryParam(String registryGroup, String registryKey, String registryValue, double cpuUsage, double memoryUsage, double loadAverage) {
this.registryGroup = registryGroup;
this.registryKey = registryKey;
this.registryValue = registryValue;
this.cpuUsage = cpuUsage;
this.memoryUsage = memoryUsage;
this.loadAverage = loadAverage;
}
public String getRegistryGroup() {
return registryGroup;
}
public void setRegistryGroup(String registryGroup) {
this.registryGroup = registryGroup;
}
public String getRegistryKey() {
return registryKey;
}
public void setRegistryKey(String registryKey) {
this.registryKey = registryKey;
}
public String getRegistryValue() {
return registryValue;
}
public void setRegistryValue(String registryValue) {
this.registryValue = registryValue;
}
public double getCpuUsage() {
return cpuUsage;
}
public void setCpuUsage(double cpuUsage) {
this.cpuUsage = cpuUsage;
}
public double getMemoryUsage() {
return memoryUsage;
}
public void setMemoryUsage(double memoryUsage) {
this.memoryUsage = memoryUsage;
}
public double getLoadAverage() {
return loadAverage;
}
public void setLoadAverage(double loadAverage) {
this.loadAverage = loadAverage;
}
@Override
public String toString() {
return "RegistryParam{" +
"registryGroup='" + registryGroup + '\'' +
", registryKey='" + registryKey + '\'' +
", registryValue='" + registryValue + '\'' +
", cpuUsage='" + cpuUsage + '\'' +
", memoryUsage='" + memoryUsage + '\'' +
", loadAverage='" + loadAverage + '\'' +
'}';
}
}

View File

@@ -0,0 +1,53 @@
package com.platform.core.biz.model;
import java.io.Serializable;
public class ReturnT<T> implements Serializable {
public static final long serialVersionUID = 42L;
public static final int SUCCESS_CODE = 200;
public static final int FAIL_CODE = 500;
public static final ReturnT<String> SUCCESS = new ReturnT<>(null);
public static final ReturnT<String> FAIL = new ReturnT<>(FAIL_CODE, null);
private int code;
private String msg;
private T content;
public ReturnT(){}
public ReturnT(int code, String msg) {
this.code = code;
this.msg = msg;
}
public ReturnT(T content) {
this.code = SUCCESS_CODE;
this.content = content;
}
public int getCode() {
return code;
}
public void setCode(int code) {
this.code = code;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public T getContent() {
return content;
}
public void setContent(T content) {
this.content = content;
}
@Override
public String toString() {
return "ReturnT [code=" + code + ", msg=" + msg + ", content=" + content + "]";
}
}

View File

@@ -0,0 +1,257 @@
package com.platform.core.biz.model;
import java.io.Serializable;
import java.util.Date;
public class TriggerParam implements Serializable{
private static final long serialVersionUID = 42L;
private int jobId;
private String executorHandler;
private String executorParams;
private String executorBlockStrategy;
private int executorTimeout;
private long logId;
private long logDateTime;
private String glueType;
private String glueSource;
private long glueUpdatetime;
private int broadcastIndex;
private int broadcastTotal;
private String jobJson;
private String processId;
private String replaceParam;
private String jvmParam;
private Date startTime;
private Date triggerTime;
private String partitionInfo;
private long startId;
private long endId;
private Integer incrementType;
private String replaceParamType;
public int getJobId() {
return jobId;
}
public void setJobId(int jobId) {
this.jobId = jobId;
}
public String getExecutorHandler() {
return executorHandler;
}
public void setExecutorHandler(String executorHandler) {
this.executorHandler = executorHandler;
}
public String getExecutorParams() {
return executorParams;
}
public void setExecutorParams(String executorParams) {
this.executorParams = executorParams;
}
public String getExecutorBlockStrategy() {
return executorBlockStrategy;
}
public void setExecutorBlockStrategy(String executorBlockStrategy) {
this.executorBlockStrategy = executorBlockStrategy;
}
public int getExecutorTimeout() {
return executorTimeout;
}
public void setExecutorTimeout(int executorTimeout) {
this.executorTimeout = executorTimeout;
}
public long getLogId() {
return logId;
}
public void setLogId(long logId) {
this.logId = logId;
}
public long getLogDateTime() {
return logDateTime;
}
public void setLogDateTime(long logDateTime) {
this.logDateTime = logDateTime;
}
public String getGlueType() {
return glueType;
}
public void setGlueType(String glueType) {
this.glueType = glueType;
}
public String getGlueSource() {
return glueSource;
}
public void setGlueSource(String glueSource) {
this.glueSource = glueSource;
}
public long getGlueUpdatetime() {
return glueUpdatetime;
}
public void setGlueUpdatetime(long glueUpdatetime) {
this.glueUpdatetime = glueUpdatetime;
}
public int getBroadcastIndex() {
return broadcastIndex;
}
public void setBroadcastIndex(int broadcastIndex) {
this.broadcastIndex = broadcastIndex;
}
public int getBroadcastTotal() {
return broadcastTotal;
}
public void setBroadcastTotal(int broadcastTotal) {
this.broadcastTotal = broadcastTotal;
}
public String getJobJson() {
return jobJson;
}
public void setJobJson(String jobJson) {
this.jobJson = jobJson;
}
public String getProcessId() {
return processId;
}
public void setProcessId(String processId) {
this.processId = processId;
}
public String getReplaceParam() {
return replaceParam;
}
public void setReplaceParam(String replaceParam) {
this.replaceParam = replaceParam;
}
public String getJvmParam() {
return jvmParam;
}
public void setJvmParam(String jvmParam) {
this.jvmParam = jvmParam;
}
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public Date getTriggerTime() {
return triggerTime;
}
public void setTriggerTime(Date triggerTime) {
this.triggerTime = triggerTime;
}
public String getPartitionInfo() {
return partitionInfo;
}
public void setPartitionInfo(String partitionInfo) {
this.partitionInfo = partitionInfo;
}
public long getStartId() {
return startId;
}
public void setStartId(long startId) {
this.startId = startId;
}
public long getEndId() {
return endId;
}
public void setEndId(long endId) {
this.endId = endId;
}
public Integer getIncrementType() {
return incrementType;
}
public void setIncrementType(Integer incrementType) {
this.incrementType = incrementType;
}
public String getReplaceParamType() {
return replaceParamType;
}
public void setReplaceParamType(String replaceParamType) {
this.replaceParamType = replaceParamType;
}
@Override
public String toString() {
return "TriggerParam{" +
"jobId=" + jobId +
", executorHandler='" + executorHandler + '\'' +
", executorParams='" + executorParams + '\'' +
", executorBlockStrategy='" + executorBlockStrategy + '\'' +
", executorTimeout=" + executorTimeout +
", logId=" + logId +
", logDateTime=" + logDateTime +
", glueType='" + glueType + '\'' +
", glueSource='" + glueSource + '\'' +
", glueUpdatetime=" + glueUpdatetime +
", broadcastIndex=" + broadcastIndex +
", broadcastTotal=" + broadcastTotal +
", jobJson=" + jobJson +
", processId=" + processId +
", replaceParam=" + replaceParam +
", jvmParam=" + jvmParam +
", startTime=" + startTime +
", triggerTime=" + triggerTime +
", partitionInfo=" + partitionInfo +
", replaceParamType=" + replaceParamType +
", startId=" + startId +
", endId=" + endId +
", incrementType=" + incrementType +
'}';
}
}

View File

@@ -0,0 +1,31 @@
package com.platform.core.enums;
public enum ExecutorBlockStrategyEnum {
SERIAL_EXECUTION("Serial execution"),
/*CONCURRENT_EXECUTION("并行"),*/
DISCARD_LATER("Discard Later"),
COVER_EARLY("Cover Early");
private String title;
ExecutorBlockStrategyEnum(String title) {
this.title = title;
}
public void setTitle(String title) {
this.title = title;
}
public String getTitle() {
return title;
}
public static ExecutorBlockStrategyEnum match(String name, ExecutorBlockStrategyEnum defaultItem) {
if (name != null) {
for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) {
if (item.name().equals(name)) {
return item;
}
}
}
return defaultItem;
}
}

View File

@@ -0,0 +1,32 @@
package com.platform.core.enums;
/**
* increment type
*/
public enum IncrementTypeEnum {
/**
* 2 TIME
* 1 ID
* 3 PARTITION
*/
TIME(2, "时间"),
ID(1, "自增主键"),
PARTITION(3, "HIVE分区");
IncrementTypeEnum(int code, String descp){
this.code = code;
this.descp = descp;
}
private final int code;
private final String descp;
public int getCode() {
return code;
}
public String getDescp() {
return descp;
}
}

View File

@@ -0,0 +1,10 @@
package com.platform.core.enums;
public class RegistryConfig {
public static final int BEAT_TIMEOUT = 30;
public static final int DEAD_TIMEOUT = BEAT_TIMEOUT * 3;
public enum RegistType{ EXECUTOR, ADMIN }
}

View File

@@ -0,0 +1,277 @@
package com.platform.core.executor;
import com.platform.core.biz.AdminBiz;
import com.platform.core.biz.ExecutorBiz;
import com.platform.core.biz.client.AdminBizClient;
import com.platform.core.biz.impl.ExecutorBizImpl;
import com.platform.core.handler.IJobHandler;
import com.platform.core.log.JobFileAppender;
import com.platform.core.thread.*;
import com.platform.rpc.registry.ServiceRegistry;
import com.platform.rpc.remoting.net.impl.netty_http.server.NettyHttpServer;
import com.platform.rpc.remoting.provider.XxlRpcProviderFactory;
import com.platform.rpc.serialize.Serializer;
import com.platform.rpc.serialize.impl.HessianSerializer;
import com.platform.rpc.util.IpUtil;
import com.platform.rpc.util.NetUtil;
import com.platform.core.thread.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class JobExecutor {
private static final Logger logger = LoggerFactory.getLogger(JobExecutor.class);
// ---------------------- param ----------------------
private String adminAddresses;
private String appName;
private String ip;
private int port;
private String accessToken;
private String logPath;
private int logRetentionDays;
public void setAdminAddresses(String adminAddresses) {
this.adminAddresses = adminAddresses;
}
public void setAppName(String appName) {
this.appName = appName;
}
public void setIp(String ip) {
this.ip = ip;
}
public void setPort(int port) {
this.port = port;
}
public void setAccessToken(String accessToken) {
this.accessToken = accessToken;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public void setLogRetentionDays(int logRetentionDays) {
this.logRetentionDays = logRetentionDays;
}
// ---------------------- start + stop ----------------------
public void start() throws Exception {
// init logpath
JobFileAppender.initLogPath(logPath);
// init invoker, admin-client
initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread
TriggerCallbackThread.getInstance().start();
// init ProcessCallbackThread
ProcessCallbackThread.getInstance().start();
// init executor-server
port = port > 0 ? port : NetUtil.findAvailablePort(9999);
ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp();
initRpcProvider(ip, port, appName, accessToken);
}
public void destroy() {
// destory executor-server
stopRpcProvider();
// destory jobThreadRepository
if (jobThreadRepository.size() > 0) {
for (Map.Entry<Integer, JobThread> item : jobThreadRepository.entrySet()) {
removeJobThread(item.getKey(), "web container destroy and kill the job.");
JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job.");
// wait for job thread push result to callback queue
if (oldJobThread != null) {
try {
oldJobThread.join();
} catch (InterruptedException e) {
logger.error(">>>>>>>>>>> web, JobThread destroy(join) error, jobId:{}", item.getKey(), e);
}
}
}
jobThreadRepository.clear();
}
jobHandlerRepository.clear();
// destory JobLogFileCleanThread
JobLogFileCleanThread.getInstance().toStop();
// destory TriggerCallbackThread
TriggerCallbackThread.getInstance().toStop();
// destory ProcessCallbackThread
ProcessCallbackThread.getInstance().toStop();
}
// ---------------------- admin-client (rpc invoker) ----------------------
private static List<AdminBiz> adminBizList;
private static Serializer serializer = new HessianSerializer();
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
if (adminAddresses != null && adminAddresses.trim().length() > 0) {
for (String address : adminAddresses.trim().split(",")) {
if (address != null && address.trim().length() > 0) {
//实例化AdminBizClient
AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
if (adminBizList == null) {
adminBizList = new ArrayList<>();
}
adminBizList.add(adminBiz);
}
}
}
}
public static List<AdminBiz> getAdminBizList() {
return adminBizList;
}
public static Serializer getSerializer() {
return serializer;
}
// ---------------------- executor-server (rpc provider) ----------------------
private XxlRpcProviderFactory xxlRpcProviderFactory = null;
private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {
// init, provider factory
String address = IpUtil.getIpPort(ip, port);
Map<String, String> serviceRegistryParam = new HashMap<>();
serviceRegistryParam.put("appName", appName);
serviceRegistryParam.put("address", address);
xxlRpcProviderFactory = new XxlRpcProviderFactory();
xxlRpcProviderFactory.setServer(NettyHttpServer.class);
xxlRpcProviderFactory.setSerializer(HessianSerializer.class);
xxlRpcProviderFactory.setCorePoolSize(20);
xxlRpcProviderFactory.setMaxPoolSize(200);
xxlRpcProviderFactory.setIp(ip);
xxlRpcProviderFactory.setPort(port);
xxlRpcProviderFactory.setAccessToken(accessToken);
xxlRpcProviderFactory.setServiceRegistry(ExecutorServiceRegistry.class);
xxlRpcProviderFactory.setServiceRegistryParam(serviceRegistryParam);
// add services
xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());
// start
xxlRpcProviderFactory.start();
}
public static class ExecutorServiceRegistry extends ServiceRegistry {
@Override
public void start(Map<String, String> param) {
// start registry
ExecutorRegistryThread.getInstance().start(param.get("appName"), param.get("address"));
}
@Override
public void stop() {
// stop registry
ExecutorRegistryThread.getInstance().toStop();
}
@Override
public boolean registry(Set<String> keys, String value) {
return false;
}
@Override
public boolean remove(Set<String> keys, String value) {
return false;
}
@Override
public Map<String, TreeSet<String>> discovery(Set<String> keys) {
return null;
}
@Override
public TreeSet<String> discovery(String key) {
return null;
}
}
private void stopRpcProvider() {
// stop provider factory
try {
xxlRpcProviderFactory.stop();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
// ---------------------- job handler repository ----------------------
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler) {
logger.info(">>>>>>>>>>> web register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
return jobHandlerRepository.put(name, jobHandler);
}
public static IJobHandler loadJobHandler(String name) {
return jobHandlerRepository.get(name);
}
// ---------------------- job thread repository ----------------------
private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason) {
JobThread newJobThread = new JobThread(jobId, handler);
newJobThread.start();
logger.info(">>>>>>>>>>> web regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!!
if (oldJobThread != null) {
oldJobThread.toStop(removeOldReason);
oldJobThread.interrupt();
}
return newJobThread;
}
public static JobThread removeJobThread(int jobId, String removeOldReason) {
JobThread oldJobThread = jobThreadRepository.remove(jobId);
if (oldJobThread != null) {
oldJobThread.toStop(removeOldReason);
oldJobThread.interrupt();
return oldJobThread;
}
return null;
}
public static JobThread loadJobThread(int jobId) {
JobThread jobThread = jobThreadRepository.get(jobId);
return jobThread;
}
}

View File

@@ -0,0 +1,81 @@
package com.platform.core.executor.impl;
import cn.hutool.core.collection.CollectionUtil;
import com.platform.core.glue.GlueFactory;
import com.platform.core.handler.IJobHandler;
import com.platform.core.handler.annotation.JobHandler;
import com.platform.core.executor.JobExecutor;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import java.util.Map;
public class JobSpringExecutor extends JobExecutor
implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
// start
@Override
public void afterSingletonsInstantiated() {
// init JobHandler Repository
initJobHandlerRepository(applicationContext);
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// destroy
@Override
public void destroy() {
super.destroy();
}
private void initJobHandlerRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
// init job handler action
Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(JobHandler.class);
if (CollectionUtil.isNotEmpty(serviceBeanMap)) {
for (Object serviceBean : serviceBeanMap.values()) {
if (serviceBean instanceof IJobHandler) {
String name = serviceBean.getClass().getAnnotation(JobHandler.class).value();
IJobHandler handler = (IJobHandler) serviceBean;
if (loadJobHandler(name) != null) {
throw new RuntimeException("web jobhandler[" + name + "] naming conflicts.");
}
registJobHandler(name, handler);
}
}
}
}
// ---------------------- applicationContext ----------------------
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
}

View File

@@ -0,0 +1,89 @@
package com.platform.core.glue;
import com.platform.core.glue.impl.SpringGlueFactory;
import com.platform.core.handler.IJobHandler;
import groovy.lang.GroovyClassLoader;
import java.math.BigInteger;
import java.security.MessageDigest;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class GlueFactory {
private static GlueFactory glueFactory = new GlueFactory();
public static GlueFactory getInstance() {
return glueFactory;
}
public static void refreshInstance(int type) {
if (type == 0) {
glueFactory = new GlueFactory();
} else if (type == 1) {
glueFactory = new SpringGlueFactory();
}
}
/**
* groovy class loader
*/
private GroovyClassLoader groovyClassLoader = new GroovyClassLoader();
private ConcurrentMap<String, Class<?>> CLASS_CACHE = new ConcurrentHashMap<>();
/**
* load new instance, prototype
*
* @param codeSource
* @return
* @throws Exception
*/
public IJobHandler loadNewInstance(String codeSource) throws Exception {
if (codeSource != null && codeSource.trim().length() > 0) {
Class<?> clazz = getCodeSourceClass(codeSource);
if (clazz != null) {
Object instance = clazz.newInstance();
if (instance != null) {
if (instance instanceof IJobHandler) {
this.injectService(instance);
return (IJobHandler) instance;
} else {
throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, "
+ "cannot convert from instance[" + instance.getClass() + "] to IJobHandler");
}
}
}
}
throw new IllegalArgumentException(">>>>>>>>>>> xxl-glue, loadNewInstance error, instance is null");
}
private Class<?> getCodeSourceClass(String codeSource) {
try {
// md5
byte[] md5 = MessageDigest.getInstance("MD5").digest(codeSource.getBytes());
String md5Str = new BigInteger(1, md5).toString(16);
Class<?> clazz = CLASS_CACHE.get(md5Str);
if (clazz == null) {
clazz = groovyClassLoader.parseClass(codeSource);
CLASS_CACHE.putIfAbsent(md5Str, clazz);
}
return clazz;
} catch (Exception e) {
return groovyClassLoader.parseClass(codeSource);
}
}
/**
* inject service of bean field
*
* @param instance
*/
public void injectService(Object instance) {
// do something
}
}

View File

@@ -0,0 +1,52 @@
package com.platform.core.glue;
public enum GlueTypeEnum {
seatunnel("seatunnel", false, null, null),
datax("data", false, null, null),
flinkx("flinkx", false, null, null),
BEAN("BEAN", false, null, null),
GLUE_GROOVY("GLUE(Java)", false, null, null),
GLUE_SHELL("GLUE(Shell)", true, "bash", ".sh"),
GLUE_PYTHON("GLUE(Python)", true, "python", ".py"),
GLUE_PHP("GLUE(PHP)", true, "php", ".php"),
GLUE_NODEJS("GLUE(Nodejs)", true, "node", ".js"),
GLUE_POWERSHELL("GLUE(PowerShell)", true, "powershell", ".ps1");
private String desc;
private boolean isScript;
private String cmd;
private String suffix;
private GlueTypeEnum(String desc, boolean isScript, String cmd, String suffix) {
this.desc = desc;
this.isScript = isScript;
this.cmd = cmd;
this.suffix = suffix;
}
public String getDesc() {
return desc;
}
public boolean isScript() {
return isScript;
}
public String getCmd() {
return cmd;
}
public String getSuffix() {
return suffix;
}
public static GlueTypeEnum match(String name){
for (GlueTypeEnum item: GlueTypeEnum.values()) {
if (item.name().equals(name)) {
return item;
}
}
return null;
}
}

View File

@@ -0,0 +1,78 @@
package com.platform.core.glue.impl;
import com.platform.core.glue.GlueFactory;
import com.platform.core.executor.impl.JobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.annotation.AnnotationUtils;
import javax.annotation.Resource;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
public class SpringGlueFactory extends GlueFactory {
private static Logger logger = LoggerFactory.getLogger(SpringGlueFactory.class);
/**
* inject action of spring
* @param instance
*/
@Override
public void injectService(Object instance){
if (instance==null) {
return;
}
if (JobSpringExecutor.getApplicationContext() == null) {
return;
}
Field[] fields = instance.getClass().getDeclaredFields();
for (Field field : fields) {
if (Modifier.isStatic(field.getModifiers())) {
continue;
}
Object fieldBean = null;
// with bean-id, bean could be found by both @Resource and @Autowired, or bean could only be found by @Autowired
if (AnnotationUtils.getAnnotation(field, Resource.class) != null) {
try {
Resource resource = AnnotationUtils.getAnnotation(field, Resource.class);
if (resource.name()!=null && resource.name().length()>0){
fieldBean = JobSpringExecutor.getApplicationContext().getBean(resource.name());
} else {
fieldBean = JobSpringExecutor.getApplicationContext().getBean(field.getName());
}
} catch (Exception e) {
}
if (fieldBean==null ) {
fieldBean = JobSpringExecutor.getApplicationContext().getBean(field.getType());
}
} else if (AnnotationUtils.getAnnotation(field, Autowired.class) != null) {
Qualifier qualifier = AnnotationUtils.getAnnotation(field, Qualifier.class);
if (qualifier!=null && qualifier.value()!=null && qualifier.value().length()>0) {
fieldBean = JobSpringExecutor.getApplicationContext().getBean(qualifier.value());
} else {
fieldBean = JobSpringExecutor.getApplicationContext().getBean(field.getType());
}
}
if (fieldBean!=null) {
field.setAccessible(true);
try {
field.set(instance, fieldBean);
} catch (IllegalArgumentException e) {
logger.error(e.getMessage(), e);
} catch (IllegalAccessException e) {
logger.error(e.getMessage(), e);
}
}
}
}
}

View File

@@ -0,0 +1,45 @@
package com.platform.core.handler;
import com.platform.core.biz.model.ReturnT;
import com.platform.core.biz.model.TriggerParam;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public abstract class IJobHandler {
/** success */
public static final ReturnT<String> SUCCESS = new ReturnT<>(200, null);
/** fail */
public static final ReturnT<String> FAIL = new ReturnT<>(500, null);
/** fail timeout */
public static final ReturnT<String> FAIL_TIMEOUT = new ReturnT<>(502, null);
public static final ConcurrentMap<String, String> jobTmpFiles = new ConcurrentHashMap<>();
/**
* execute handler, invoked when executor receives a scheduling request
*
* @param tgParam
* @return
* @throws Exception
*/
public abstract ReturnT<String> execute(TriggerParam tgParam) throws Exception;
/**
* init handler, invoked when JobThread init
*/
public void init() {
// do something
}
/**
* destroy handler, invoked when JobThread destroy
*/
public void destroy() {
// do something
}
}

View File

@@ -0,0 +1,15 @@
package com.platform.core.handler.annotation;
import java.lang.annotation.*;
/**
* annotation for job handler
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface JobHandler {
String value() default "";
}

View File

@@ -0,0 +1,25 @@
package com.platform.core.handler.impl;
import com.platform.core.biz.model.ReturnT;
import com.platform.core.biz.model.TriggerParam;
import com.platform.core.log.JobLogger;
import com.platform.core.handler.IJobHandler;
public class GlueJobHandler extends IJobHandler {
private long glueUpdatetime;
private IJobHandler jobHandler;
public GlueJobHandler(IJobHandler jobHandler, long glueUpdatetime) {
this.jobHandler = jobHandler;
this.glueUpdatetime = glueUpdatetime;
}
public long getGlueUpdatetime() {
return glueUpdatetime;
}
@Override
public ReturnT<String> execute(TriggerParam tgParam) throws Exception {
JobLogger.log("----------- glue.version:"+ glueUpdatetime +" -----------");
return jobHandler.execute(tgParam);
}
}

View File

@@ -0,0 +1,89 @@
package com.platform.core.handler.impl;
import com.platform.core.biz.model.ReturnT;
import com.platform.core.biz.model.TriggerParam;
import com.platform.core.glue.GlueTypeEnum;
import com.platform.core.handler.IJobHandler;
import com.platform.core.log.JobFileAppender;
import com.platform.core.log.JobLogger;
import com.platform.core.util.ScriptUtil;
import com.platform.core.util.ShardingUtil;
import java.io.File;
public class ScriptJobHandler extends IJobHandler {
private int jobId;
private long glueUpdatetime;
private String gluesource;
private GlueTypeEnum glueType;
public ScriptJobHandler(int jobId, long glueUpdatetime, String gluesource, GlueTypeEnum glueType){
this.jobId = jobId;
this.glueUpdatetime = glueUpdatetime;
this.gluesource = gluesource;
this.glueType = glueType;
// clean old script file
File glueSrcPath = new File(JobFileAppender.getGlueSrcPath());
if (glueSrcPath.exists()) {
File[] glueSrcFileList = glueSrcPath.listFiles();
if (glueSrcFileList!=null && glueSrcFileList.length>0) {
for (File glueSrcFileItem : glueSrcFileList) {
if (glueSrcFileItem.getName().startsWith(jobId +"_")) {
glueSrcFileItem.delete();
}
}
}
}
}
public long getGlueUpdatetime() {
return glueUpdatetime;
}
@Override
public ReturnT<String> execute(TriggerParam tgParam) throws Exception {
if (!glueType.isScript()) {
return new ReturnT<>(IJobHandler.FAIL.getCode(), "glueType[" + glueType + "] invalid.");
}
// cmd
String cmd = glueType.getCmd();
// make script file
String scriptFileName = JobFileAppender.getGlueSrcPath()
.concat(File.separator)
.concat(String.valueOf(jobId))
.concat("_")
.concat(String.valueOf(glueUpdatetime))
.concat(glueType.getSuffix());
File scriptFile = new File(scriptFileName);
if (!scriptFile.exists()) {
ScriptUtil.markScriptFile(scriptFileName, gluesource);
}
// log file
String logFileName = JobFileAppender.contextHolder.get();
// script params0=param、1=分片序号、2=分片总数
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
String[] scriptParams = new String[3];
scriptParams[0] = tgParam.getExecutorParams();
scriptParams[1] = String.valueOf(shardingVO.getIndex());
scriptParams[2] = String.valueOf(shardingVO.getTotal());
// invoke
JobLogger.log("----------- script file:"+ scriptFileName +" -----------");
int exitValue = ScriptUtil.execToFile(cmd, scriptFileName, logFileName,tgParam.getLogId(),tgParam.getLogDateTime(), scriptParams);
if (exitValue == 0) {
return IJobHandler.SUCCESS;
} else {
return new ReturnT<>(IJobHandler.FAIL.getCode(), "script exit value(" + exitValue + ") is failed");
}
}
}

View File

@@ -0,0 +1,222 @@
package com.platform.core.log;
import com.platform.core.biz.model.LogResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.text.SimpleDateFormat;
import java.util.Date;
public class JobFileAppender {
private static Logger logger = LoggerFactory.getLogger(JobFileAppender.class);
// for JobThread (support log for child thread of job handler)
//public static ThreadLocal<String> contextHolder = new ThreadLocal<String>();
public static final InheritableThreadLocal<String> contextHolder = new InheritableThreadLocal<>();
/**
* log base path
*
* strut like:
* ---/
* ---/gluesource/
* ---/gluesource/10_1514171108000.js
* ---/gluesource/10_1514171108000.js
* ---/2017-12-25/
* ---/2017-12-25/639.log
* ---/2017-12-25/821.log
*
*/
private static String logBasePath = "/data/applogs/executor/jobhandler";
private static String glueSrcPath = logBasePath.concat("/gluesource");
public static void initLogPath(String logPath){
// init
if (logPath!=null && logPath.trim().length()>0) {
logBasePath = logPath;
}
// mk base dir
File logPathDir = new File(logBasePath);
if (!logPathDir.exists()) {
logPathDir.mkdirs();
}
logBasePath = logPathDir.getPath();
// mk glue dir
File glueBaseDir = new File(logPathDir, "gluesource");
if (!glueBaseDir.exists()) {
glueBaseDir.mkdirs();
}
glueSrcPath = glueBaseDir.getPath();
}
public static String getLogPath() {
return logBasePath;
}
public static String getGlueSrcPath() {
return glueSrcPath;
}
/**
* log filename, like "logPath/yyyy-MM-dd/9999.log"
*
* @param triggerDate
* @param logId
* @return
*/
public static String makeLogFileName(Date triggerDate, long logId) {
// filePath/yyyy-MM-dd
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); // avoid concurrent problem, can not be static
File logFilePath = new File(getLogPath(), sdf.format(triggerDate));
if (!logFilePath.exists()) {
logFilePath.mkdir();
}
// filePath/yyyy-MM-dd/9999.log
String logFileName = logFilePath.getPath()
.concat(File.separator)
.concat(String.valueOf(logId))
.concat(".log");
return logFileName;
}
/**
* append log
*
* @param logFileName
* @param appendLog
*/
public static void appendLog(String logFileName, String appendLog) {
// log file
if (logFileName==null || logFileName.trim().length()==0) {
return;
}
File logFile = new File(logFileName);
if (!logFile.exists()) {
try {
logFile.createNewFile();
} catch (IOException e) {
logger.error(e.getMessage(), e);
return;
}
}
// log
if (appendLog == null) {
appendLog = "";
}
appendLog += "\r\n";
// append file content
FileOutputStream fos = null;
try {
fos = new FileOutputStream(logFile, true);
fos.write(appendLog.getBytes("utf-8"));
fos.flush();
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
if (fos != null) {
try {
fos.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
}
/**
* support read log-file
*
* @param logFileName
* @return log content
*/
public static LogResult readLog(String logFileName, int fromLineNum){
// valid log file
if (logFileName==null || logFileName.trim().length()==0) {
return new LogResult(fromLineNum, 0, "readLog fail, logFile not found", true);
}
File logFile = new File(logFileName);
if (!logFile.exists()) {
return new LogResult(fromLineNum, 0, "readLog fail, logFile not exists", true);
}
// read file
StringBuffer logContentBuffer = new StringBuffer();
int toLineNum = 0;
LineNumberReader reader = null;
try {
//reader = new LineNumberReader(new FileReader(logFile));
reader = new LineNumberReader(new InputStreamReader(new FileInputStream(logFile), "utf-8"));
String line;
while ((line = reader.readLine())!=null) {
toLineNum = reader.getLineNumber(); // [from, to], start as 1
if (toLineNum >= fromLineNum) {
logContentBuffer.append(line).append("\n");
}
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
// result
LogResult logResult = new LogResult(fromLineNum, toLineNum, logContentBuffer.toString(), false);
return logResult;
/*
// it will return the number of characters actually skipped
reader.skip(Long.MAX_VALUE);
int maxLineNum = reader.getLineNumber();
maxLineNum++; // 最大行号
*/
}
/**
* read log data
* @param logFile
* @return log line content
*/
public static String readLines(File logFile){
BufferedReader reader = null;
try {
reader = new BufferedReader(new InputStreamReader(new FileInputStream(logFile), "utf-8"));
if (reader != null) {
StringBuilder sb = new StringBuilder();
String line = null;
while ((line = reader.readLine()) != null) {
sb.append(line).append("\n");
}
return sb.toString();
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
return null;
}
}

View File

@@ -0,0 +1,77 @@
package com.platform.core.log;
import com.platform.core.util.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.FormattingTuple;
import org.slf4j.helpers.MessageFormatter;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Date;
public class JobLogger {
private static Logger logger = LoggerFactory.getLogger("web logger");
/**
* append log
*
* @param call
* @param appendLog
*/
private static void logDetail(StackTraceElement call, String appendLog) {
// "yyyy-MM-dd HH:mm:ss [fileName.MethodName-LineNumber] log";
StringBuffer buffer = new StringBuffer();
buffer.append(DateUtil.formatDateTime(new Date())).append(" ")
.append("[" + call.getFileName().replace("java", "") + call.getMethodName())
.append("-" + call.getLineNumber() + "]").append(" ")
.append(appendLog != null ? appendLog : "");
String formatAppendLog = buffer.toString();
String logFileName = JobFileAppender.contextHolder.get();
if (logFileName != null && logFileName.trim().length() > 0) {
JobFileAppender.appendLog(logFileName, formatAppendLog);
} else {
logger.info(">>> {}", formatAppendLog);
}
}
/**
* append log with pattern
*
* @param appendLogPattern like "aaa {} bbb {} ccc"
* @param appendLogArguments like "111, true"
*/
public static void log(String appendLogPattern, Object... appendLogArguments) {
FormattingTuple ft = MessageFormatter.arrayFormat(appendLogPattern, appendLogArguments);
String appendLog = ft.getMessage();
/*appendLog = appendLogPattern;
if (appendLogArguments!=null && appendLogArguments.length>0) {
appendLog = MessageFormat.format(appendLogPattern, appendLogArguments);
}*/
StackTraceElement callInfo = new Throwable().getStackTrace()[1];
logDetail(callInfo, appendLog);
}
/**
* append exception stack
*
* @param e
*/
public static void log(Throwable e) {
StringWriter stringWriter = new StringWriter();
e.printStackTrace(new PrintWriter(stringWriter));
String appendLog = stringWriter.toString();
StackTraceElement callInfo = new Throwable().getStackTrace()[1];
logDetail(callInfo, appendLog);
}
}

View File

@@ -0,0 +1,121 @@
package com.platform.core.thread;
import com.platform.core.biz.AdminBiz;
import com.platform.core.biz.model.RegistryParam;
import com.platform.core.biz.model.ReturnT;
import com.platform.core.enums.RegistryConfig;
import com.platform.core.executor.JobExecutor;
import com.platform.core.util.OSUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
public class ExecutorRegistryThread {
private static Logger logger = LoggerFactory.getLogger(ExecutorRegistryThread.class);
private static ExecutorRegistryThread instance = new ExecutorRegistryThread();
public static ExecutorRegistryThread getInstance(){
return instance;
}
private Thread registryThread;
private volatile boolean toStop = false;
public void start(final String appName, final String address){
// valid
if (appName==null || appName.trim().length()==0) {
logger.warn(">>>>>>>>>>> web, executor registry config fail, appName is null.");
return;
}
if (JobExecutor.getAdminBizList() == null) {
logger.warn(">>>>>>>>>>> web, executor registry config fail, adminAddresses is null.");
return;
}
registryThread = new Thread(() -> {
// registry
while (!toStop) {
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address, OSUtils
.cpuUsage(),OSUtils.memoryUsage(),OSUtils.loadAverage());
for (AdminBiz adminBiz: JobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.debug(">>>>>>>>>>> web registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> web registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> web registry error, registryParam:{}", registryParam, e);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {
if (!toStop) {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
}
} catch (InterruptedException e) {
if (!toStop) {
logger.warn(">>>>>>>>>>> web, executor registry thread interrupted, error msg:{}", e.getMessage());
}
}
}
// registry remove
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, address);
for (AdminBiz adminBiz: JobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.info(">>>>>>>>>>> web registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> web registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
if (!toStop) {
logger.info(">>>>>>>>>>> web registry-remove error, registryParam:{}", registryParam, e);
}
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>>>> web, executor registry thread destory.");
});
registryThread.setDaemon(true);
registryThread.setName("web, executor ExecutorRegistryThread");
registryThread.start();
}
public void toStop() {
toStop = true;
// interrupt and wait
registryThread.interrupt();
try {
registryThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
}

View File

@@ -0,0 +1,112 @@
package com.platform.core.thread;
import com.platform.core.log.JobFileAppender;
import com.platform.core.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.TimeUnit;
public class JobLogFileCleanThread {
private static Logger logger = LoggerFactory.getLogger(JobLogFileCleanThread.class);
private static JobLogFileCleanThread instance = new JobLogFileCleanThread();
public static JobLogFileCleanThread getInstance() {
return instance;
}
private Thread localThread;
private volatile boolean toStop = false;
public void start(final long logRetentionDays) {
// limit min value
if (logRetentionDays < 3) {
return;
}
localThread = new Thread(() -> {
while (!toStop) {
try {
// clean log dir, over logRetentionDays
File[] childDirs = new File(JobFileAppender.getLogPath()).listFiles();
if (childDirs != null && childDirs.length > 0) {
// today
Calendar todayCal = Calendar.getInstance();
todayCal.set(Calendar.HOUR_OF_DAY, 0);
todayCal.set(Calendar.MINUTE, 0);
todayCal.set(Calendar.SECOND, 0);
todayCal.set(Calendar.MILLISECOND, 0);
Date todayDate = todayCal.getTime();
for (File childFile : childDirs) {
// valid
if (!childFile.isDirectory() || childFile.getName().indexOf("-") == -1) {
continue;
}
// file create date
Date logFileCreateDate = null;
try {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
logFileCreateDate = simpleDateFormat.parse(childFile.getName());
} catch (ParseException e) {
logger.error(e.getMessage(), e);
}
// valid
if (logFileCreateDate == null) {
continue;
}
if ((todayDate.getTime() - logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000)) {
FileUtil.deleteRecursively(childFile);
}
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {
TimeUnit.DAYS.sleep(1);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> web, executor JobLogFileCleanThread thread destory.");
});
localThread.setDaemon(true);
localThread.setName("web, executor JobLogFileCleanThread");
localThread.start();
}
public void toStop() {
toStop = true;
if (localThread == null) {
return;
}
// interrupt and wait
localThread.interrupt();
try {
localThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
}

View File

@@ -0,0 +1,214 @@
package com.platform.core.thread;
import com.platform.core.biz.model.HandleCallbackParam;
import com.platform.core.biz.model.ReturnT;
import com.platform.core.biz.model.TriggerParam;
import com.platform.core.executor.JobExecutor;
import com.platform.core.handler.IJobHandler;
import com.platform.core.log.JobFileAppender;
import com.platform.core.log.JobLogger;
import com.platform.core.util.ShardingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class JobThread extends Thread {
private static Logger logger = LoggerFactory.getLogger(JobThread.class);
private int jobId;
private IJobHandler handler;
private LinkedBlockingQueue<TriggerParam> triggerQueue;
private Set<Long> triggerLogIdSet; // avoid repeat trigger for the same TRIGGER_LOG_ID
private volatile boolean toStop = false;
private String stopReason;
private boolean running = false; // if running job
private int idleTimes = 0; // idel times
public JobThread(int jobId, IJobHandler handler) {
this.jobId = jobId;
this.handler = handler;
this.triggerQueue = new LinkedBlockingQueue<>();
this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<>());
}
public IJobHandler getHandler() {
return handler;
}
/**
* new trigger to queue
*
* @param triggerParam
* @return
*/
public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
// avoid repeat
if (triggerLogIdSet.contains(triggerParam.getLogId())) {
logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
return new ReturnT<>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
}
triggerLogIdSet.add(triggerParam.getLogId());
triggerQueue.add(triggerParam);
return ReturnT.SUCCESS;
}
/**
* kill job thread
*
* @param stopReason
*/
public void toStop(String stopReason) {
/**
* Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep)
* 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身;
* 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式;
*/
this.toStop = true;
this.stopReason = stopReason;
}
/**
* is running job
*
* @return
*/
public boolean isRunningOrHasQueue() {
return running || triggerQueue.size() > 0;
}
@Override
public void run() {
// init
try {
handler.init();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
// execute
while (!toStop) {
running = false;
idleTimes++;
TriggerParam tgParam = null;
ReturnT<String> executeResult = null;
try {
// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
tgParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
if (tgParam != null) {
running = true;
idleTimes = 0;
triggerLogIdSet.remove(tgParam.getLogId());
// log filename, like "logPath/yyyy-MM-dd/9999.log"
String logFileName = JobFileAppender.makeLogFileName(new Date(tgParam.getLogDateTime()), tgParam.getLogId());
JobFileAppender.contextHolder.set(logFileName);
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(tgParam.getBroadcastIndex(), tgParam.getBroadcastTotal()));
// execute
JobLogger.log("<br>----------- web job execute start -----------<br>----------- Param:" + tgParam.getExecutorParams());
if (tgParam.getExecutorTimeout() > 0) {
// limit timeout
Thread futureThread = null;
try {
final TriggerParam tgParamT = tgParam;
FutureTask<ReturnT<String>> futureTask = new FutureTask<>(() -> handler.execute(tgParamT));
futureThread = new Thread(futureTask);
futureThread.start();
executeResult = futureTask.get(tgParam.getExecutorTimeout(), TimeUnit.MINUTES);
} catch (TimeoutException e) {
JobLogger.log("<br>----------- web job execute timeout");
JobLogger.log(e);
executeResult = new ReturnT<>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
} finally {
futureThread.interrupt();
}
} else {
// just execute
executeResult = handler.execute(tgParam);
}
if (executeResult == null) {
executeResult = IJobHandler.FAIL;
} else {
executeResult.setMsg(
(executeResult != null && executeResult.getMsg() != null && executeResult.getMsg().length() > 50000)
? executeResult.getMsg().substring(0, 50000).concat("...")
: executeResult.getMsg());
executeResult.setContent(null); // limit obj size
}
JobLogger.log("<br>----------- web job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);
} else {
if (idleTimes > 30) {
if (triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost
JobExecutor.removeJobThread(jobId, "executor idel times over limit.");
}
}
}
} catch (Throwable e) {
if (toStop) {
JobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
}
StringWriter stringWriter = new StringWriter();
e.printStackTrace(new PrintWriter(stringWriter));
String errorMsg = stringWriter.toString();
executeResult = new ReturnT<>(ReturnT.FAIL_CODE, errorMsg);
JobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- web job execute end(error) -----------");
} finally {
// 终止操作暂不监控状态
if (tgParam != null && tgParam.getJobId() != -1) {
// callback handler info
if (!toStop) {
// commonm
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(tgParam.getLogId(), tgParam.getLogDateTime(), executeResult));
} else {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(tgParam.getLogId(), tgParam.getLogDateTime(), stopResult));
}
}
}
}
// callback trigger request in queue
while (triggerQueue != null && triggerQueue.size() > 0) {
TriggerParam triggerParam = triggerQueue.poll();
if (triggerParam != null) {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
}
}
// destroy
try {
handler.destroy();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
logger.info(">>>>>>>>>>> web JobThread stoped, hashCode:{}", Thread.currentThread());
}
}

View File

@@ -0,0 +1,238 @@
package com.platform.core.thread;
import com.platform.core.biz.AdminBiz;
import com.platform.core.biz.model.HandleProcessCallbackParam;
import com.platform.core.biz.model.ReturnT;
import com.platform.core.enums.RegistryConfig;
import com.platform.core.executor.JobExecutor;
import com.platform.core.log.JobFileAppender;
import com.platform.core.log.JobLogger;
import com.platform.core.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class ProcessCallbackThread {
private static Logger logger = LoggerFactory.getLogger(ProcessCallbackThread.class);
private static ProcessCallbackThread instance = new ProcessCallbackThread();
public static ProcessCallbackThread getInstance() {
return instance;
}
/**
* job results callback queue
*/
private LinkedBlockingQueue<HandleProcessCallbackParam> callBackQueue = new LinkedBlockingQueue<>();
public static void pushCallBack(HandleProcessCallbackParam callback) {
getInstance().callBackQueue.add(callback);
logger.debug(">>>>>>>>>>> web, push process callback request, logId:{}", callback.getLogId());
}
/**
* callback thread
*/
private Thread processCallbackThread;
private Thread processRetryCallbackThread;
private volatile boolean toStop = false;
public void start() {
// valid
if (JobExecutor.getAdminBizList() == null) {
logger.warn(">>>>>>>>>>> web, executor callback config fail, adminAddresses is null.");
return;
}
// callback
processCallbackThread = new Thread(() -> {
// normal callback
while (!toStop) {
try {
HandleProcessCallbackParam callback = getInstance().callBackQueue.take();
// callback list param
List<HandleProcessCallbackParam> callbackParamList = new ArrayList<HandleProcessCallbackParam>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
callbackParamList.add(callback);
// callback, will retry if error
if (callbackParamList.size() > 0) {
doCallback(callbackParamList);
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
// last callback
try {
List<HandleProcessCallbackParam> callbackParamList = new ArrayList<HandleProcessCallbackParam>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
if (callbackParamList != null && callbackParamList.size() > 0) {
doCallback(callbackParamList);
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>>>> web, executor callback thread destory.");
});
processCallbackThread.setDaemon(true);
processCallbackThread.setName("web, executor TriggerCallbackThread");
processCallbackThread.start();
// retry
processRetryCallbackThread = new Thread(() -> {
while (!toStop) {
try {
retryFailCallbackFile();
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> web, executor retry callback thread destory.");
});
processRetryCallbackThread.setDaemon(true);
processRetryCallbackThread.start();
}
public void toStop() {
toStop = true;
// stop callback, interrupt and wait
if (processCallbackThread != null) { // support empty admin address
processCallbackThread.interrupt();
try {
processCallbackThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
// stop retry, interrupt and wait
if (processRetryCallbackThread != null) {
processRetryCallbackThread.interrupt();
try {
processRetryCallbackThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
}
/**
* do callback, will retry if error
*
* @param callbackParamList
*/
private void doCallback(List<HandleProcessCallbackParam> callbackParamList) {
boolean callbackRet = false;
// callback, will retry if error
for (AdminBiz adminBiz : JobExecutor.getAdminBizList()) {
try {
ReturnT<String> callbackResult = adminBiz.processCallback(callbackParamList);
if (callbackResult != null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
callbackLog(callbackParamList, "<br>----------- web job callback finish.");
callbackRet = true;
break;
} else {
callbackLog(callbackParamList, "<br>----------- web job callback fail, callbackResult:" + callbackResult);
}
} catch (Exception e) {
callbackLog(callbackParamList, "<br>----------- web job callback error, errorMsg:" + e.getMessage());
}
}
if (!callbackRet) {
appendFailCallbackFile(callbackParamList);
}
}
/**
* callback log
*/
private void callbackLog(List<HandleProcessCallbackParam> callbackParamList, String logContent) {
for (HandleProcessCallbackParam callbackParam : callbackParamList) {
String logFileName = JobFileAppender.makeLogFileName(new Date(callbackParam.getLogDateTime()), callbackParam.getLogId());
JobFileAppender.contextHolder.set(logFileName);
JobLogger.log(logContent);
}
}
// ---------------------- fail-callback file ----------------------
private static String failCallbackFilePath = JobFileAppender.getLogPath().concat(File.separator).concat("processcallbacklog").concat(File.separator);
private static String failCallbackFileName = failCallbackFilePath.concat("web-processcallback-{x}").concat(".log");
private void appendFailCallbackFile(List<HandleProcessCallbackParam> handleProcessCallbackParams) {
// valid
if (handleProcessCallbackParams == null || handleProcessCallbackParams.size() == 0) {
return;
}
// append file
byte[] callbackParamList_bytes = JobExecutor.getSerializer().serialize(handleProcessCallbackParams);
File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis())));
if (callbackLogFile.exists()) {
for (int i = 0; i < 100; i++) {
callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i))));
if (!callbackLogFile.exists()) {
break;
}
}
}
FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes);
}
private void retryFailCallbackFile() {
// valid
File callbackLogPath = new File(failCallbackFilePath);
if (!callbackLogPath.exists()) {
return;
}
if (callbackLogPath.isFile()) {
callbackLogPath.delete();
}
if (!(callbackLogPath.isDirectory() && callbackLogPath.list() != null && callbackLogPath.list().length > 0)) {
return;
}
// load and clear file, retry
List<HandleProcessCallbackParam> params;
for (File f : callbackLogPath.listFiles()) {
byte[] ps = FileUtil.readFileContent(f);
params = (List<HandleProcessCallbackParam>) JobExecutor.getSerializer().deserialize(ps, HandleProcessCallbackParam.class);
f.delete();
doCallback(params);
}
}
}

View File

@@ -0,0 +1,239 @@
package com.platform.core.thread;
import com.platform.core.biz.AdminBiz;
import com.platform.core.biz.model.HandleCallbackParam;
import com.platform.core.biz.model.ReturnT;
import com.platform.core.enums.RegistryConfig;
import com.platform.core.executor.JobExecutor;
import com.platform.core.log.JobFileAppender;
import com.platform.core.log.JobLogger;
import com.platform.core.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class TriggerCallbackThread {
private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class);
private static TriggerCallbackThread instance = new TriggerCallbackThread();
public static TriggerCallbackThread getInstance() {
return instance;
}
/**
* job results callback queue
*/
private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<>();
public static void pushCallBack(HandleCallbackParam callback) {
getInstance().callBackQueue.add(callback);
logger.debug(">>>>>>>>>>> web, push callback request, logId:{}", callback.getLogId());
}
/**
* callback thread
*/
private Thread triggerCallbackThread;
private Thread triggerRetryCallbackThread;
private volatile boolean toStop = false;
public void start() {
// valid
if (JobExecutor.getAdminBizList() == null) {
logger.warn(">>>>>>>>>>> web, executor callback config fail, adminAddresses is null.");
return;
}
// callback
triggerCallbackThread = new Thread(() -> {
// normal callback
while (!toStop) {
try {
HandleCallbackParam callback = getInstance().callBackQueue.take();
// callback list param
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
callbackParamList.add(callback);
// callback, will retry if error
if (callbackParamList.size() > 0) {
doCallback(callbackParamList);
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
// last callback
try {
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
if (callbackParamList != null && callbackParamList.size() > 0) {
doCallback(callbackParamList);
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>>>> web, executor callback thread destory.");
});
triggerCallbackThread.setDaemon(true);
triggerCallbackThread.setName("web, executor TriggerCallbackThread");
triggerCallbackThread.start();
// retry
triggerRetryCallbackThread = new Thread(() -> {
while (!toStop) {
try {
retryFailCallbackFile();
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> web, executor retry callback thread destory.");
});
triggerRetryCallbackThread.setDaemon(true);
triggerRetryCallbackThread.start();
}
public void toStop() {
toStop = true;
// stop callback, interrupt and wait
if (triggerCallbackThread != null) { // support empty admin address
triggerCallbackThread.interrupt();
try {
triggerCallbackThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
// stop retry, interrupt and wait
if (triggerRetryCallbackThread != null) {
triggerRetryCallbackThread.interrupt();
try {
triggerRetryCallbackThread.join();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
}
/**
* do callback, will retry if error
*
* @param callbackParamList
*/
private void doCallback(List<HandleCallbackParam> callbackParamList) {
boolean callbackRet = false;
// callback, will retry if error
for (AdminBiz adminBiz : JobExecutor.getAdminBizList()) {
try {
ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);
if (callbackResult != null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
callbackLog(callbackParamList, "<br>----------- web job callback finish.");
callbackRet = true;
break;
} else {
callbackLog(callbackParamList, "<br>----------- web job callback fail, callbackResult:" + callbackResult);
}
} catch (Exception e) {
callbackLog(callbackParamList, "<br>----------- web job callback error, errorMsg:" + e.getMessage());
}
}
if (!callbackRet) {
appendFailCallbackFile(callbackParamList);
}
}
/**
* callback log
*/
private void callbackLog(List<HandleCallbackParam> callbackParamList, String logContent) {
for (HandleCallbackParam c : callbackParamList) {
String logFileName = JobFileAppender.makeLogFileName(new Date(c.getLogDateTim()), c.getLogId());
JobFileAppender.contextHolder.set(logFileName);
JobLogger.log(logContent);
}
}
// ---------------------- fail-callback file ----------------------
private static String failCallbackFilePath = JobFileAppender.getLogPath().concat(File.separator).concat("callbacklog").concat(File.separator);
private static String failCallbackFileName = failCallbackFilePath.concat("web-callback-{x}").concat(".log");
private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList) {
// valid
if (callbackParamList == null || callbackParamList.size() == 0) {
return;
}
// append file
byte[] callbackParamList_bytes = JobExecutor.getSerializer().serialize(callbackParamList);
File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis())));
if (callbackLogFile.exists()) {
for (int i = 0; i < 100; i++) {
callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i))));
if (!callbackLogFile.exists()) {
break;
}
}
}
FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes);
}
private void retryFailCallbackFile() {
// valid
File callbackLogPath = new File(failCallbackFilePath);
if (!callbackLogPath.exists()) {
return;
}
if (callbackLogPath.isFile()) {
callbackLogPath.delete();
}
if (!(callbackLogPath.isDirectory() && callbackLogPath.list() != null && callbackLogPath.list().length > 0)) {
return;
}
// load and clear file, retry
List<HandleCallbackParam> params;
for (File f : callbackLogPath.listFiles()) {
byte[] ps = FileUtil.readFileContent(f);
params = (List<HandleCallbackParam>) JobExecutor.getSerializer().deserialize(ps, HandleCallbackParam.class);
f.delete();
doCallback(params);
}
}
}

View File

@@ -0,0 +1,65 @@
package com.platform.core.util;
public final class Constants {
public static final String MYSQL_DATABASE = "Unknown database";
public static final String MYSQL_CONNEXP = "Communications link failure";
public static final String MYSQL_ACCDENIED = "Access denied";
public static final String MYSQL_TABLE_NAME_ERR1 = "Table";
public static final String MYSQL_TABLE_NAME_ERR2 = "doesn't exist";
public static final String MYSQL_SELECT_PRI = "SELECT command denied to user";
public static final String MYSQL_COLUMN1 = "Unknown column";
public static final String MYSQL_COLUMN2 = "field list";
public static final String MYSQL_WHERE = "where clause";
public static final String ORACLE_DATABASE = "ORA-12505";
public static final String ORACLE_CONNEXP = "The Network Adapter could not establish the connection";
public static final String ORACLE_ACCDENIED = "ORA-01017";
public static final String ORACLE_TABLE_NAME = "table or view does not exist";
public static final String ORACLE_SELECT_PRI = "insufficient privileges";
public static final String ORACLE_SQL = "invalid identifier";
public static final String CMDWINDOW ="cmd /c python";
public static final String CMDLINUX ="python";
public static final String CMDWINDOWTASKKILL ="taskkill /pid ";
public static final String CMDLINUXTASKKILL ="kill -9 ";
public static final String SPLIT_COMMA = ",";
public static final String SPLIT_AT = "@";
public static final String SPLIT_COLON = ";";
public static final String SPLIT_POINT = ".";
public static final String SPLIT_SCOLON=":";
public static final String SPLIT_HYPHEN = "-";
public static final String SPLIT_DIVIDE = "/";
public static final String SPLIT_STAR = "*";
public static final String SPLIT_QUESTION = "?";
public static final String EQUAL = "=";
public static final String SPLIT_AMPERSAND = "&";
public static final String AND = "AND";
public static final String SPACE = " ";
public static final String STRING_BLANK = "";
public static final String MONGO_URL_PREFIX = "mongodb://";
/**
* UTF-8 字符集
*/
public static final String UTF8CODE = "UTF-8";
/**
* GBK 字符集
*/
public static final String GBKCODE = "GBK";
/**
* http请求
*/
public static final String HTTPCODE = "http://";
/**
* https请求
*/
public static final String HTTPSCODE = "https://";
}

View File

@@ -0,0 +1,143 @@
package com.platform.core.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class DateUtil {
// ---------------------- format parse ----------------------
private static Logger logger = LoggerFactory.getLogger(DateUtil.class);
private static final String DATE_FORMAT = "yyyy-MM-dd";
private static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
private static final ThreadLocal<Map<String, DateFormat>> dateFormatThreadLocal = new ThreadLocal<Map<String, DateFormat>>();
private static DateFormat getDateFormat(String pattern) {
if (pattern==null || pattern.trim().length()==0) {
throw new IllegalArgumentException("pattern cannot be empty.");
}
Map<String, DateFormat> dateFormatMap = dateFormatThreadLocal.get();
if(dateFormatMap!=null && dateFormatMap.containsKey(pattern)){
return dateFormatMap.get(pattern);
}
synchronized (dateFormatThreadLocal) {
if (dateFormatMap == null) {
dateFormatMap = new HashMap<>();
}
dateFormatMap.put(pattern, new SimpleDateFormat(pattern));
dateFormatThreadLocal.set(dateFormatMap);
}
return dateFormatMap.get(pattern);
}
/**
* format datetime. like "yyyy-MM-dd"
*
* @param date
* @return
* @throws ParseException
*/
public static String formatDate(Date date) {
return format(date, DATE_FORMAT);
}
/**
* format date. like "yyyy-MM-dd HH:mm:ss"
*
* @param date
* @return
* @throws ParseException
*/
public static String formatDateTime(Date date) {
return format(date, DATETIME_FORMAT);
}
/**
* format date
*
* @param date
* @param patten
* @return
* @throws ParseException
*/
public static String format(Date date, String patten) {
return getDateFormat(patten).format(date);
}
/**
* parse date string, like "yyyy-MM-dd HH:mm:s"
*
* @param dateString
* @return
* @throws ParseException
*/
public static Date parseDate(String dateString){
return parse(dateString, DATE_FORMAT);
}
/**
* parse datetime string, like "yyyy-MM-dd HH:mm:ss"
*
* @param dateString
* @return
* @throws ParseException
*/
public static Date parseDateTime(String dateString) {
return parse(dateString, DATETIME_FORMAT);
}
/**
* parse date
*
* @param dateString
* @param pattern
* @return
* @throws ParseException
*/
public static Date parse(String dateString, String pattern) {
try {
Date date = getDateFormat(pattern).parse(dateString);
return date;
} catch (Exception e) {
logger.warn("parse date error, dateString = {}, pattern={}; errorMsg = {}", dateString, pattern, e.getMessage());
return null;
}
}
// ---------------------- add date ----------------------
public static Date addYears(final Date date, final int amount) {
return add(date, Calendar.YEAR, amount);
}
public static Date addMonths(final Date date, final int amount) {
return add(date, Calendar.MONTH, amount);
}
public static Date addDays(final Date date, final int amount) {
return add(date, Calendar.DAY_OF_MONTH, amount);
}
private static Date add(final Date date, final int calendarField, final int amount) {
if (date == null) {
return null;
}
final Calendar c = Calendar.getInstance();
c.setTime(date);
c.add(calendarField, amount);
return c.getTime();
}
}

View File

@@ -0,0 +1,174 @@
package com.platform.core.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
public class FileUtil {
private static Logger logger = LoggerFactory.getLogger(FileUtil.class);
/**
* delete recursively
*
* @param root
* @return
*/
public static boolean deleteRecursively(File root) {
if (root != null && root.exists()) {
if (root.isDirectory()) {
File[] children = root.listFiles();
if (children != null) {
for (File child : children) {
deleteRecursively(child);
}
}
}
return root.delete();
}
return false;
}
public static void deleteFile(String fileName) {
// file
File file = new File(fileName);
if (file.exists()) {
file.delete();
}
}
public static void writeFileContent(File file, byte[] data) {
// file
if (!file.exists()) {
file.getParentFile().mkdirs();
}
// append file content
FileOutputStream fos = null;
try {
fos = new FileOutputStream(file);
fos.write(data);
fos.flush();
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
if (fos != null) {
try {
fos.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
}
public static byte[] readFileContent(File file) {
Long filelength = file.length();
byte[] filecontent = new byte[filelength.intValue()];
FileInputStream in = null;
try {
in = new FileInputStream(file);
in.read(filecontent);
in.close();
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
if (in != null) {
try {
in.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
return filecontent;
}
/*public static void appendFileLine(String fileName, String content) {
// file
File file = new File(fileName);
if (!file.exists()) {
try {
file.createNewFile();
} catch (IOException e) {
logger.error(e.getMessage(), e);
return;
}
}
// content
if (content == null) {
content = "";
}
content += "\r\n";
// append file content
FileOutputStream fos = null;
try {
fos = new FileOutputStream(file, true);
fos.write(content.getBytes("utf-8"));
fos.flush();
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
if (fos != null) {
try {
fos.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
}
public static List<String> loadFileLines(String fileName){
List<String> result = new ArrayList<>();
// valid log file
File file = new File(fileName);
if (!file.exists()) {
return result;
}
// read file
StringBuffer logContentBuffer = new StringBuffer();
int toLineNum = 0;
LineNumberReader reader = null;
try {
//reader = new LineNumberReader(new FileReader(logFile));
reader = new LineNumberReader(new InputStreamReader(new FileInputStream(file), "utf-8"));
String line = null;
while ((line = reader.readLine())!=null) {
if (line!=null && line.trim().length()>0) {
result.add(line);
}
}
} catch (IOException e) {
logger.error(e.getMessage(), e);
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}
return result;
}*/
}

View File

@@ -0,0 +1,119 @@
package com.platform.core.util;
import com.platform.core.biz.model.ReturnT;
import com.platform.rpc.util.json.BasicJson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Map;
public class JobRemotingUtil {
private static Logger logger = LoggerFactory.getLogger(JobRemotingUtil.class);
public static String XXL_RPC_ACCESS_TOKEN = "XXL-RPC-ACCESS-TOKEN";
/**
* post
*
* @param url
* @param accessToken
* @param requestObj
* @return
*/
public static ReturnT<String> postBody(String url, String accessToken, Object requestObj, int timeout) {
HttpURLConnection connection = null;
BufferedReader bufferedReader = null;
try {
// connection
URL realUrl = new URL(url);
connection = (HttpURLConnection) realUrl.openConnection();
// connection setting
connection.setRequestMethod("POST");
connection.setDoOutput(true);
connection.setDoInput(true);
connection.setUseCaches(false);
connection.setReadTimeout(timeout * 1000);
connection.setConnectTimeout(3 * 1000);
connection.setRequestProperty("connection", "Keep-Alive");
connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
if(accessToken!=null && accessToken.trim().length()>0){
connection.setRequestProperty(XXL_RPC_ACCESS_TOKEN, accessToken);
}
// do connection
connection.connect();
// write requestBody
String requestBody = BasicJson.toJson(requestObj);
DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
dataOutputStream.writeBytes(requestBody);
dataOutputStream.flush();
dataOutputStream.close();
/*byte[] requestBodyBytes = requestBody.getBytes("UTF-8");
connection.setRequestProperty("Content-Length", String.valueOf(requestBodyBytes.length));
OutputStream outwritestream = connection.getOutputStream();
outwritestream.write(requestBodyBytes);
outwritestream.flush();
outwritestream.close();*/
// valid StatusCode
int statusCode = connection.getResponseCode();
if (statusCode != 200) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-rpc remoting fail, StatusCode("+ statusCode +") invalid. for url : " + url);
}
// result
bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
StringBuilder result = new StringBuilder();
String line;
while ((line = bufferedReader.readLine()) != null) {
result.append(line);
}
String resultJson = result.toString();
// parse returnT
try {
Map<String, Object> resultMap = BasicJson.parseMap(resultJson);
ReturnT<String> returnT = new ReturnT<String>();
if (resultMap==null) {
returnT.setCode(ReturnT.FAIL_CODE);
returnT.setMsg("AdminBizClient Remoting call fail.");
} else {
returnT.setCode(Integer.valueOf(String.valueOf(resultMap.get("code"))));
returnT.setMsg(String.valueOf(resultMap.get("msg")));
returnT.setContent(String.valueOf(resultMap.get("content")));
}
return returnT;
} catch (Exception e) {
logger.error("xxl-rpc remoting (url="+url+") response content invalid("+ resultJson +").", e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-rpc remoting (url="+url+") response content invalid("+ resultJson +").");
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-rpc remoting error("+ e.getMessage() +"), for url : " + url);
} finally {
try {
if (bufferedReader != null) {
bufferedReader.close();
}
if (connection != null) {
connection.disconnect();
}
} catch (Exception e2) {
logger.error(e2.getMessage(), e2);
}
}
}
}

View File

@@ -0,0 +1,11 @@
package com.platform.core.util;
import com.sun.jna.Library;
import com.sun.jna.Native;
public interface Kernel32 extends Library {
Kernel32 INSTANCE = (Kernel32) Native.loadLibrary("kernel32", Kernel32.class);
long GetProcessId(Long hProcess);
}

View File

@@ -0,0 +1,128 @@
package com.platform.core.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import oshi.SystemInfo;
import oshi.hardware.CentralProcessor;
import oshi.hardware.GlobalMemory;
import oshi.hardware.HardwareAbstractionLayer;
import java.math.RoundingMode;
import java.text.DecimalFormat;
/**
* os utils
*/
public class OSUtils {
private static final Logger logger = LoggerFactory.getLogger(OSUtils.class);
private static final SystemInfo SI = new SystemInfo();
public static final String TWO_DECIMAL = "0.00";
private static HardwareAbstractionLayer hal = SI.getHardware();
private OSUtils() {
}
/**
* get memory usage
* Keep 2 decimal
*
* @return percent %
*/
public static double memoryUsage() {
GlobalMemory memory = hal.getMemory();
double memoryUsage = (memory.getTotal() - memory.getAvailable()) * 1.0 / memory.getTotal();
DecimalFormat df = new DecimalFormat(TWO_DECIMAL);
df.setRoundingMode(RoundingMode.HALF_UP);
return Double.parseDouble(df.format(memoryUsage * 100));
}
/**
* get available physical memory size
* <p>
* Keep 2 decimal
*
* @return available Physical Memory Size, unit: G
*/
public static double availablePhysicalMemorySize() {
GlobalMemory memory = hal.getMemory();
double availablePhysicalMemorySize = (memory.getAvailable() + memory.getSwapUsed()) / 1024.0 / 1024 / 1024;
DecimalFormat df = new DecimalFormat(TWO_DECIMAL);
df.setRoundingMode(RoundingMode.HALF_UP);
return Double.parseDouble(df.format(availablePhysicalMemorySize));
}
/**
* get total physical memory size
* <p>
* Keep 2 decimal
*
* @return available Physical Memory Size, unit: G
*/
public static double totalMemorySize() {
GlobalMemory memory = hal.getMemory();
double availablePhysicalMemorySize = memory.getTotal() / 1024.0 / 1024 / 1024;
DecimalFormat df = new DecimalFormat(TWO_DECIMAL);
df.setRoundingMode(RoundingMode.HALF_UP);
return Double.parseDouble(df.format(availablePhysicalMemorySize));
}
/**
* load average
*
* @return load average
*/
public static double loadAverage() {
double loadAverage = hal.getProcessor().getSystemLoadAverage();
DecimalFormat df = new DecimalFormat(TWO_DECIMAL);
df.setRoundingMode(RoundingMode.HALF_UP);
return Double.parseDouble(df.format(loadAverage));
}
/**
* get cpu usage
*
* @return cpu usage
*/
public static double cpuUsage() {
CentralProcessor processor = hal.getProcessor();
double cpuUsage = processor.getSystemCpuLoad();
DecimalFormat df = new DecimalFormat(TWO_DECIMAL);
df.setRoundingMode(RoundingMode.HALF_UP);
return Double.parseDouble(df.format(cpuUsage*100));
}
/**
* check memory and cpu usage
*
* @return check memory and cpu usage
*/
public static Boolean checkResource(double systemCpuLoad, double systemReservedMemory) {
// judging usage
double loadAverage = OSUtils.loadAverage();
//
double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
if (loadAverage > systemCpuLoad || availablePhysicalMemorySize < systemReservedMemory) {
logger.warn("load or availablePhysicalMemorySize(G) is too high, it's availablePhysicalMemorySize(G):{},loadAvg:{}", availablePhysicalMemorySize, loadAverage);
return false;
} else {
return true;
}
}
}

View File

@@ -0,0 +1,93 @@
package com.platform.core.util;
import com.platform.core.thread.JobThread;
import com.platform.core.log.JobLogger;
import com.sun.jna.Platform;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
/**
* ProcessUtil
*/
public class ProcessUtil {
private static Logger logger = LoggerFactory.getLogger(JobThread.class);
public static String getProcessId(Process process) {
long pid = -1;
Field field;
if (Platform.isWindows()) {
try {
field = process.getClass().getDeclaredField("handle");
field.setAccessible(true);
pid = Kernel32.INSTANCE.GetProcessId((Long) field.get(process));
} catch (Exception ex) {
logger.error("get process id for windows error {0}", ex);
}
} else if (Platform.isLinux() || Platform.isAIX()) {
try {
Class<?> clazz = Class.forName("java.lang.UNIXProcess");
field = clazz.getDeclaredField("pid");
field.setAccessible(true);
pid = (Integer) field.get(process);
} catch (Throwable e) {
logger.error("get process id for unix error {0}", e);
}
}
return String.valueOf(pid);
}
/**
* 关闭Linux进程
*
* @param pid 进程的PID
*/
public static boolean killProcessByPid(String pid) {
if (StringUtils.isEmpty(pid) || "-1".equals(pid)) {
throw new RuntimeException("Pid ==" + pid);
}
Process process = null;
BufferedReader reader = null;
String command = "";
boolean result;
if (Platform.isWindows()) {
command = "cmd.exe /c taskkill /PID " + pid + " /F /T ";
} else if (Platform.isLinux() || Platform.isAIX()) {
command = "kill " + pid;
}
try {
//杀掉进程
process = Runtime.getRuntime().exec(command);
reader = new BufferedReader(new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8));
String line;
while ((line = reader.readLine()) != null) {
JobLogger.log(line);
}
result = true;
} catch (Exception e) {
logger.error("kill pid error {0}", e);
result = false;
} finally {
if (process != null) {
process.destroy();
}
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
logger.error("reader close error {0}", e);
}
}
}
return result;
}
}

View File

@@ -0,0 +1,229 @@
package com.platform.core.util;
import com.platform.core.thread.ProcessCallbackThread;
import com.platform.core.biz.model.HandleProcessCallbackParam;
import com.platform.core.log.JobLogger;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
/**
* 1、内嵌编译器如"PythonInterpreter"无法引用扩展包因此推荐使用java调用控制台进程方式"Runtime.getRuntime().exec()"来运行脚本(shell或python)
* 2、因为通过java调用控制台进程方式实现需要保证目标机器PATH路径正确配置对应编译器
* 3、暂时脚本执行日志只能在脚本执行结束后一次性获取无法保证实时性因此为确保日志实时性可改为将脚本打印的日志存储在指定的日志文件上
* 4、python 异常输出优先级高于标准输出体现在Log文件中因此推荐通过logging方式打日志保持和异常信息一致否则用prinf日志顺序会错乱
* <p>
*/
public class ScriptUtil {
/**
* make script file
*
* @param scriptFileName
* @param content
* @throws IOException
*/
public static void markScriptFile(String scriptFileName, String content) throws IOException {
// make file, filePath/gluesource/666-123456789.py
FileOutputStream fileOutputStream = null;
try {
fileOutputStream = new FileOutputStream(scriptFileName);
fileOutputStream.write(content.getBytes("UTF-8"));
fileOutputStream.close();
} catch (Exception e) {
throw e;
} finally {
if (fileOutputStream != null) {
fileOutputStream.close();
}
}
}
/**
* 脚本执行,日志文件实时输出
*
* @param command
* @param scriptFile
* @param logFile
* @param params
* @return
*/
public static int execToFile(String command, String scriptFile, String logFile,long logId,long logDateTime, String... params) {
FileOutputStream fileOutputStream = null;
Thread inputThread = null;
Thread errThread = null;
try {
// file
fileOutputStream = new FileOutputStream(logFile, true);
// command
List<String> cmdarray = new ArrayList<>();
cmdarray.add(command);
cmdarray.add(scriptFile);
if (params != null && params.length > 0) {
for (String param : params) {
cmdarray.add(param);
}
}
String[] cmdarrayFinal = cmdarray.toArray(new String[cmdarray.size()]);
// process-exec
final Process process = Runtime.getRuntime().exec(cmdarrayFinal);
String prcsId = ProcessUtil.getProcessId(process);
JobLogger.log("------------------Process id: " + prcsId);
//update task process id
HandleProcessCallbackParam prcs = new HandleProcessCallbackParam(logId, logDateTime, prcsId);
ProcessCallbackThread.pushCallBack(prcs);
// log-thread
final FileOutputStream finalFileOutputStream = fileOutputStream;
inputThread = new Thread(() -> {
try {
copy(process.getInputStream(), finalFileOutputStream, new byte[1024]);
} catch (IOException e) {
JobLogger.log(e);
}
});
errThread = new Thread(() -> {
try {
copy(process.getErrorStream(), finalFileOutputStream, new byte[1024]);
} catch (IOException e) {
JobLogger.log(e);
}
});
inputThread.start();
errThread.start();
// process-wait
int exitValue = process.waitFor(); // exit code: 0=success, 1=error
// log-thread join
inputThread.join();
errThread.join();
return exitValue;
} catch (Exception e) {
JobLogger.log(e);
return -1;
} finally {
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
JobLogger.log(e);
}
}
if (inputThread != null && inputThread.isAlive()) {
inputThread.interrupt();
}
if (errThread != null && errThread.isAlive()) {
errThread.interrupt();
}
}
}
/**
* 数据流CopyInput自动关闭Output不处理
*
* @param inputStream
* @param outputStream
* @param buffer
* @return
* @throws IOException
*/
private static long copy(InputStream inputStream, OutputStream outputStream, byte[] buffer) throws IOException {
try {
long total = 0;
for (; ; ) {
int res = inputStream.read(buffer);
if (res == -1) {
break;
}
if (res > 0) {
total += res;
if (outputStream != null) {
outputStream.write(buffer, 0, res);
}
}
}
outputStream.flush();
//out = null;
inputStream.close();
inputStream = null;
return total;
} finally {
if (inputStream != null) {
inputStream.close();
}
}
}
/**
* 脚本执行,日志文件实时输出
*
* 优点:支持将目标数据实时输出到指定日志文件中去
* 缺点:
* 标准输出和错误输出优先级固定,可能和脚本中顺序不一致
* Java无法实时获取
*
* <!-- commons-exec -->
* <dependency>
* <groupId>org.apache.commons</groupId>
* <artifactId>commons-exec</artifactId>
* <version>${commons-exec.version}</version>
* </dependency>
*
* @param command
* @param scriptFile
* @param logFile
* @param params
* @return
* @throws IOException
*/
/*public static int execToFileB(String command, String scriptFile, String logFile, String... params) throws IOException {
// 标准输出print null if watchdog timeout
// 错误输出logging + 异常 still exists if watchdog timeout
// 标准输入
FileOutputStream fileOutputStream = null; //
try {
fileOutputStream = new FileOutputStream(logFile, true);
PumpStreamHandler streamHandler = new PumpStreamHandler(fileOutputStream, fileOutputStream, null);
// command
CommandLine commandline = new CommandLine(command);
commandline.addArgument(scriptFile);
if (params!=null && params.length>0) {
commandline.addArguments(params);
}
// exec
DefaultExecutor exec = new DefaultExecutor();
exec.setExitValues(null);
exec.setStreamHandler(streamHandler);
int exitValue = exec.execute(commandline); // exit code: 0=success, 1=error
return exitValue;
} catch (Exception e) {
JobLogger.log(e);
return -1;
} finally {
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
JobLogger.log(e);
}
}
}
}*/
}

View File

@@ -0,0 +1,43 @@
package com.platform.core.util;
public class ShardingUtil {
private static InheritableThreadLocal<ShardingVO> contextHolder = new InheritableThreadLocal<>();
public static class ShardingVO {
private int index; // sharding index
private int total; // sharding total
public ShardingVO(int index, int total) {
this.index = index;
this.total = total;
}
public int getIndex() {
return index;
}
public void setIndex(int index) {
this.index = index;
}
public int getTotal() {
return total;
}
public void setTotal(int total) {
this.total = total;
}
}
public static void setShardingVo(ShardingVO shardingVo) {
contextHolder.set(shardingVo);
}
public static ShardingVO getShardingVo() {
return contextHolder.get();
}
}