Merge branch 'master' of https://gitee.com/zhijiantianya/ruoyi-vue-pro into sms_temp_zzf_0127
This commit is contained in:
@@ -1,18 +1,35 @@
|
||||
package cn.iocoder.dashboard.framework.jackson.config;
|
||||
|
||||
import cn.iocoder.dashboard.framework.jackson.deser.LocalDateTimeDeserializer;
|
||||
import cn.iocoder.dashboard.framework.jackson.ser.LocalDateTimeSerializer;
|
||||
import cn.iocoder.dashboard.util.json.JsonUtils;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
@Configuration
|
||||
public class JacksonConfig {
|
||||
|
||||
@Bean
|
||||
@SuppressWarnings("InstantiationOfUtilityClass")
|
||||
public JsonUtils jsonUtils(ObjectMapper objectMapper) {
|
||||
SimpleModule simpleModule = new SimpleModule();
|
||||
/*
|
||||
* 1. 新增Long类型序列化规则,数值超过2^53-1,在JS会出现精度丢失问题,因此Long自动序列化为字符串类型
|
||||
* 2. 新增LocalDateTime序列化、反序列化规则
|
||||
*/
|
||||
simpleModule
|
||||
// .addSerializer(Long.class, ToStringSerializer.instance)
|
||||
// .addSerializer(Long.TYPE, ToStringSerializer.instance)
|
||||
.addSerializer(LocalDateTime.class, LocalDateTimeSerializer.INSTANCE)
|
||||
.addDeserializer(LocalDateTime.class, LocalDateTimeDeserializer.INSTANCE);
|
||||
|
||||
objectMapper.registerModules(simpleModule);
|
||||
|
||||
JsonUtils.init(objectMapper);
|
||||
return new JsonUtils();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,26 @@
|
||||
package cn.iocoder.dashboard.framework.jackson.deser;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.DeserializationContext;
|
||||
import com.fasterxml.jackson.databind.JsonDeserializer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
|
||||
/**
|
||||
* LocalDateTime反序列化规则
|
||||
* <p>
|
||||
* 会将毫秒级时间戳反序列化为LocalDateTime
|
||||
*/
|
||||
public class LocalDateTimeDeserializer extends JsonDeserializer<LocalDateTime> {
|
||||
|
||||
public static final LocalDateTimeDeserializer INSTANCE = new LocalDateTimeDeserializer();
|
||||
|
||||
@Override
|
||||
public LocalDateTime deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, JsonProcessingException {
|
||||
return LocalDateTime.ofInstant(Instant.ofEpochMilli(p.getValueAsLong()), ZoneId.systemDefault());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
package cn.iocoder.dashboard.framework.jackson.ser;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonGenerator;
|
||||
import com.fasterxml.jackson.databind.JsonSerializer;
|
||||
import com.fasterxml.jackson.databind.SerializerProvider;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
|
||||
/**
|
||||
* LocalDateTime序列化规则
|
||||
* <p>
|
||||
* 会将LocalDateTime序列化为毫秒级时间戳
|
||||
*/
|
||||
public class LocalDateTimeSerializer extends JsonSerializer<LocalDateTime> {
|
||||
|
||||
public static final LocalDateTimeSerializer INSTANCE = new LocalDateTimeSerializer();
|
||||
|
||||
@Override
|
||||
public void serialize(LocalDateTime value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
|
||||
gen.writeNumber(value.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
|
||||
}
|
||||
}
|
||||
@@ -81,7 +81,7 @@ public class ApiAccessLogFilter extends OncePerRequestFilter {
|
||||
Map<String, String> queryString, String requestBody, Exception ex) {
|
||||
// 处理用户信息
|
||||
accessLog.setUserId(WebFrameworkUtils.getLoginUserId(request));
|
||||
accessLog.setUserType(WebFrameworkUtils.getUesrType(request));
|
||||
accessLog.setUserType(WebFrameworkUtils.getUserType(request));
|
||||
// 设置访问结果
|
||||
CommonResult<?> result = WebFrameworkUtils.getCommonResult(request);
|
||||
if (result != null) {
|
||||
|
||||
@@ -1,14 +1,21 @@
|
||||
package cn.iocoder.dashboard.framework.redis.config;
|
||||
|
||||
import cn.hutool.system.SystemUtil;
|
||||
import cn.iocoder.dashboard.framework.redis.core.pubsub.AbstractChannelMessageListener;
|
||||
import cn.iocoder.dashboard.framework.redis.core.stream.AbstractStreamMessageListener;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.connection.stream.Consumer;
|
||||
import org.springframework.data.redis.connection.stream.ObjectRecord;
|
||||
import org.springframework.data.redis.connection.stream.ReadOffset;
|
||||
import org.springframework.data.redis.connection.stream.StreamOffset;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.listener.ChannelTopic;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
import org.springframework.data.redis.serializer.RedisSerializer;
|
||||
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@@ -19,6 +26,9 @@ import java.util.List;
|
||||
@Slf4j
|
||||
public class RedisConfig {
|
||||
|
||||
/**
|
||||
* 创建 RedisTemplate Bean,使用 JSON 序列化方式
|
||||
*/
|
||||
@Bean
|
||||
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
|
||||
// 创建 RedisTemplate 对象
|
||||
@@ -27,11 +37,16 @@ public class RedisConfig {
|
||||
template.setConnectionFactory(factory);
|
||||
// 使用 String 序列化方式,序列化 KEY 。
|
||||
template.setKeySerializer(RedisSerializer.string());
|
||||
template.setHashKeySerializer(RedisSerializer.string());
|
||||
// 使用 JSON 序列化方式(库是 Jackson ),序列化 VALUE 。
|
||||
template.setValueSerializer(RedisSerializer.json());
|
||||
template.setHashValueSerializer(RedisSerializer.json());
|
||||
return template;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建 Redis Pub/Sub 广播消费的容器
|
||||
*/
|
||||
@Bean
|
||||
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory,
|
||||
List<AbstractChannelMessageListener<?>> listeners) {
|
||||
@@ -48,4 +63,57 @@ public class RedisConfig {
|
||||
return container;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建 Redis Stream 集群消费的容器
|
||||
*
|
||||
* Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html
|
||||
*/
|
||||
@Bean(initMethod = "start", destroyMethod = "stop")
|
||||
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(RedisTemplate<String, Object> redisTemplate,
|
||||
List<AbstractStreamMessageListener<?>> listeners) {
|
||||
// 第一步,创建 StreamMessageListenerContainer 容器
|
||||
// 创建 options 配置
|
||||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions =
|
||||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
|
||||
.batchSize(10) // 一次性最多拉取多少条消息
|
||||
.targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化
|
||||
.build();
|
||||
// 创建 container 对象
|
||||
StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer.create(
|
||||
redisTemplate.getRequiredConnectionFactory(), containerOptions);
|
||||
|
||||
// 第二步,注册监听器,消费对应的 Stream 主题
|
||||
String consumerName = buildConsumerName();
|
||||
// String consumerName = "110";
|
||||
listeners.forEach(listener -> {
|
||||
// 创建 listener 对应的消费者分组
|
||||
try {
|
||||
redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
|
||||
} catch (Exception ignore) {}
|
||||
// 设置 listener 对应的 redisTemplate
|
||||
listener.setRedisTemplate(redisTemplate);
|
||||
// 创建 Consumer 对象
|
||||
Consumer consumer = Consumer.from(listener.getGroup(), consumerName);
|
||||
// 设置 Consumer 消费进度,以最小消费进度为准
|
||||
StreamOffset<String> streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());
|
||||
// 设置 Consumer 监听
|
||||
StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest
|
||||
.builder(streamOffset).consumer(consumer)
|
||||
.autoAcknowledge(false) // 不自动 ack
|
||||
.cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false
|
||||
container.register(builder.build(), listener);
|
||||
});
|
||||
return container;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建消费者名字,使用本地 IP + 进程编号的方式。
|
||||
* 参考自 RocketMQ clientId 的实现
|
||||
*
|
||||
* @return 消费者名字
|
||||
*/
|
||||
private static String buildConsumerName() {
|
||||
return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
package cn.iocoder.dashboard.framework.redis.core.pubsub;
|
||||
|
||||
import cn.hutool.core.util.ArrayUtil;
|
||||
import cn.hutool.core.util.TypeUtil;
|
||||
import cn.iocoder.dashboard.util.json.JsonUtils;
|
||||
import lombok.SneakyThrows;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
import org.springframework.data.redis.connection.MessageListener;
|
||||
import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
|
||||
@@ -62,21 +61,11 @@ public abstract class AbstractChannelMessageListener<T extends ChannelMessage> i
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Class<T> getMessageClass() {
|
||||
Class<?> targetClass = getClass();
|
||||
while (targetClass.getSuperclass() != null) {
|
||||
// 如果不是 AbstractMessageListener 父类,继续向上查找
|
||||
if (targetClass.getSuperclass() != AbstractChannelMessageListener.class) {
|
||||
targetClass = targetClass.getSuperclass();
|
||||
continue;
|
||||
}
|
||||
// 如果是 AbstractMessageListener 父类,则解析泛型
|
||||
Type[] types = ((ParameterizedTypeImpl) targetClass.getGenericSuperclass()).getActualTypeArguments();
|
||||
if (ArrayUtil.isEmpty(types)) {
|
||||
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
|
||||
}
|
||||
return (Class<T>) types[0];
|
||||
Type type = TypeUtil.getTypeArgument(getClass(), 0);
|
||||
if (type == null) {
|
||||
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
|
||||
}
|
||||
throw new IllegalStateException(String.format("类型(%s) 找不到 AbstractMessageListener 父类", getClass().getName()));
|
||||
return (Class<T>) type;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -4,6 +4,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
|
||||
/**
|
||||
* Redis Channel Message 接口
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public interface ChannelMessage {
|
||||
|
||||
@@ -12,7 +14,7 @@ public interface ChannelMessage {
|
||||
*
|
||||
* @return Channel
|
||||
*/
|
||||
@JsonIgnore // 必须序列化
|
||||
@JsonIgnore // 避免序列化
|
||||
String getChannel();
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,88 @@
|
||||
package cn.iocoder.dashboard.framework.redis.core.stream;
|
||||
|
||||
import cn.hutool.core.util.TypeUtil;
|
||||
import cn.iocoder.dashboard.util.json.JsonUtils;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.SneakyThrows;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.data.redis.connection.stream.ObjectRecord;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.stream.StreamListener;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
|
||||
/**
|
||||
* Redis Stream 监听器抽象类,用于实现集群消费
|
||||
*
|
||||
* @param <T> 消息类型。一定要填写噢,不然会报错
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public abstract class AbstractStreamMessageListener<T extends StreamMessage>
|
||||
implements StreamListener<String, ObjectRecord<String, String>> {
|
||||
|
||||
/**
|
||||
* 消息类型
|
||||
*/
|
||||
private final Class<T> messageType;
|
||||
/**
|
||||
* Redis Channel
|
||||
*/
|
||||
@Getter
|
||||
private final String streamKey;
|
||||
|
||||
/**
|
||||
* Redis 消费者分组,默认使用 spring.application.name 名字
|
||||
*/
|
||||
@Value("${spring.application.name}")
|
||||
@Getter
|
||||
private String group;
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Setter
|
||||
private RedisTemplate<String, ?> redisTemplate;
|
||||
|
||||
@SneakyThrows
|
||||
protected AbstractStreamMessageListener() {
|
||||
this.messageType = getMessageClass();
|
||||
this.streamKey = messageType.newInstance().getStreamKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(ObjectRecord<String, String> message) {
|
||||
// 消费消息
|
||||
T messageObj = JsonUtils.parseObject(message.getValue(), messageType);
|
||||
this.onMessage(messageObj);
|
||||
// ack 消息消费完成
|
||||
redisTemplate.opsForStream().acknowledge(group, message);
|
||||
// TODO 芋艿:需要额外考虑以下几个点:
|
||||
// 1. 处理异常的情况
|
||||
// 2. 发送日志;以及事务的结合
|
||||
// 3. 消费日志;以及通用的幂等性
|
||||
// 4. 消费失败的重试,https://zhuanlan.zhihu.com/p/60501638
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理消息
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
public abstract void onMessage(T message);
|
||||
|
||||
/**
|
||||
* 通过解析类上的泛型,获得消息类型
|
||||
*
|
||||
* @return 消息类型
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Class<T> getMessageClass() {
|
||||
Type type = TypeUtil.getTypeArgument(getClass(), 0);
|
||||
if (type == null) {
|
||||
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
|
||||
}
|
||||
return (Class<T>) type;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
package cn.iocoder.dashboard.framework.redis.core.stream;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
|
||||
/**
|
||||
* Redis Stream Message 接口
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public interface StreamMessage {
|
||||
|
||||
/**
|
||||
* 获得 Redis Stream Key
|
||||
*
|
||||
* @return Channel
|
||||
*/
|
||||
@JsonIgnore // 避免序列化
|
||||
String getStreamKey();
|
||||
|
||||
}
|
||||
@@ -1,7 +1,10 @@
|
||||
package cn.iocoder.dashboard.framework.redis.core.util;
|
||||
|
||||
import cn.iocoder.dashboard.framework.redis.core.pubsub.ChannelMessage;
|
||||
import cn.iocoder.dashboard.framework.redis.core.stream.StreamMessage;
|
||||
import cn.iocoder.dashboard.util.json.JsonUtils;
|
||||
import org.springframework.data.redis.connection.stream.RecordId;
|
||||
import org.springframework.data.redis.connection.stream.StreamRecords;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
|
||||
/**
|
||||
@@ -17,8 +20,21 @@ public class RedisMessageUtils {
|
||||
* @param redisTemplate Redis 操作模板
|
||||
* @param message 消息
|
||||
*/
|
||||
public static <T extends ChannelMessage> void sendChannelMessage(RedisTemplate<?, ?> redisTemplate, T message) {
|
||||
public static <T extends ChannelMessage> void sendChannelMessage(RedisTemplate<?, ?> redisTemplate, T message) {
|
||||
redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送 Redis 消息,基于 Redis Stream 实现
|
||||
*
|
||||
* @param redisTemplate Redis 操作模板
|
||||
* @param message 消息
|
||||
* @return 消息记录的编号对象
|
||||
*/
|
||||
public static <T extends StreamMessage> RecordId sendStreamMessage(RedisTemplate<String, ?> redisTemplate, T message) {
|
||||
return redisTemplate.opsForStream().add(StreamRecords.newRecord()
|
||||
.ofObject(JsonUtils.toJsonString(message)) // 设置内容
|
||||
.withStreamKey(message.getStreamKey())); // 设置 stream key
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -269,7 +269,7 @@ public class GlobalExceptionHandler {
|
||||
private void initExceptionLog(ApiErrorLogCreateDTO errorLog, HttpServletRequest request, Throwable e) {
|
||||
// 处理用户信息
|
||||
errorLog.setUserId(WebFrameworkUtils.getLoginUserId(request));
|
||||
errorLog.setUserType(WebFrameworkUtils.getUesrType(request));
|
||||
errorLog.setUserType(WebFrameworkUtils.getUserType(request));
|
||||
// 设置异常字段
|
||||
errorLog.setExceptionName(e.getClass().getName());
|
||||
errorLog.setExceptionMessage(ExceptionUtil.getMessage(e));
|
||||
|
||||
@@ -31,7 +31,7 @@ public class WebFrameworkUtils {
|
||||
return (Long) request.getAttribute(REQUEST_ATTRIBUTE_LOGIN_USER_ID);
|
||||
}
|
||||
|
||||
public static Integer getUesrType(HttpServletRequest request) {
|
||||
public static Integer getUserType(HttpServletRequest request) {
|
||||
return UserTypeEnum.ADMIN.getValue(); // TODO 芋艿:等后续优化
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user