1. 简单 redis stream 的 StreamMessage 和对应的消费者

2. 跑通 Redis Stream 的流程
This commit is contained in:
YunaiV
2021-03-20 20:39:01 +08:00
parent f5331ce6ac
commit be3fac7542
13 changed files with 458 additions and 3 deletions

View File

@@ -1,15 +1,21 @@
package cn.iocoder.dashboard.framework.redis.config;
import cn.hutool.core.net.NetUtil;
import cn.iocoder.dashboard.framework.redis.core.pubsub.AbstractChannelMessageListener;
import cn.iocoder.dashboard.framework.redis.core.stream.AbstractStreamMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.util.ErrorHandler;
import java.time.Duration;
import java.util.List;
/**
@@ -48,4 +54,52 @@ public class RedisConfig {
return container;
}
@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
RedisConnectionFactory factory, List<AbstractStreamMessageListener<?>> listeners) {
// 创建配置对象
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>>
streamMessageListenerContainerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
// 一次性最多拉取多少条消息
.batchSize(10)
// 执行消息轮询的执行器
// .executor(this.threadPoolTaskExecutor)
// 消息消费异常的handler
.errorHandler(new ErrorHandler() {
@Override
public void handleError(Throwable t) {
// throw new RuntimeException(t);
t.printStackTrace();
}
})
// 超时时间设置为0表示不超时超时后会抛出异常
.pollTimeout(Duration.ZERO)
// 序列化器
.serializer(RedisSerializer.string())
.targetType(String.class)
.build();
// 根据配置对象创建监听容器对象
StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer
.create(factory, streamMessageListenerContainerOptions);
RedisTemplate<String, Object> redisTemplate = redisTemplate(factory);
// 使用监听容器对象开始监听消费(使用的是手动确认方式)
String consumerName = NetUtil.getLocalHostName(); // TODO 需要优化下,晚点参考下 rocketmq consumer 的
for (AbstractStreamMessageListener<?> listener : listeners) {
try {
redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
} catch (Exception ignore) {
// ignore.printStackTrace();
}
container.receive(Consumer.from(listener.getGroup(), consumerName),
StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed()), listener);
}
return container;
}
}

View File

@@ -4,6 +4,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
/**
* Redis Channel Message 接口
*
* @author 芋道源码
*/
public interface ChannelMessage {
@@ -12,7 +14,7 @@ public interface ChannelMessage {
*
* @return Channel
*/
@JsonIgnore // 必须序列化
@JsonIgnore // 避免序列化
String getChannel();
}

View File

@@ -0,0 +1,83 @@
package cn.iocoder.dashboard.framework.redis.core.stream;
import cn.hutool.core.util.ArrayUtil;
import lombok.Getter;
import lombok.SneakyThrows;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.stream.StreamListener;
import sun.reflect.generics.reflectiveObjects.ParameterizedTypeImpl;
import java.lang.reflect.Type;
/**
* Redis Stream 监听器抽象类,用于实现集群消费
*
* @param <T> 消息类型。一定要填写噢,不然会报错
*
* @author 芋道源码
*/
public abstract class AbstractStreamMessageListener<T extends StreamMessage>
implements StreamListener<String, ObjectRecord<String, String>> {
/**
* 消息类型
*/
private final Class<T> messageType;
/**
* Redis Channel
*/
@Getter
private final String streamKey;
/**
* Redis 消费者分组,默认使用 spring.application.name 名字
*/
@Value("spring.application.name")
@Getter
private String group;
@SneakyThrows
protected AbstractStreamMessageListener() {
this.messageType = getMessageClass();
this.streamKey = messageType.newInstance().getStreamKey();
}
@Override
public void onMessage(ObjectRecord<String, String> message) {
System.out.println(message);
}
/**
* 处理消息
*
* @param message 消息
*/
public abstract void onMessage(T message);
/**
* 通过解析类上的泛型,获得消息类型
*
* @return 消息类型
*/
@SuppressWarnings("unchecked")
// TODO 芋艿:稍后重构
private Class<T> getMessageClass() {
Class<?> targetClass = getClass();
while (targetClass.getSuperclass() != null) {
// 如果不是 AbstractMessageListener 父类,继续向上查找
if (targetClass.getSuperclass() != AbstractStreamMessageListener.class) {
targetClass = targetClass.getSuperclass();
continue;
}
// 如果是 AbstractMessageListener 父类,则解析泛型
Type[] types = ((ParameterizedTypeImpl) targetClass.getGenericSuperclass()).getActualTypeArguments();
if (ArrayUtil.isEmpty(types)) {
throw new IllegalStateException(String.format("类型(%s) 需要设置消息类型", getClass().getName()));
}
return (Class<T>) types[0];
}
throw new IllegalStateException(String.format("类型(%s) 找不到 AbstractStreamMessageListener 父类", getClass().getName()));
}
}

View File

@@ -0,0 +1,20 @@
package cn.iocoder.dashboard.framework.redis.core.stream;
import com.fasterxml.jackson.annotation.JsonIgnore;
/**
* Redis Stream Message 接口
*
* @author 芋道源码
*/
public interface StreamMessage {
/**
* 获得 Redis Stream Key
*
* @return Channel
*/
@JsonIgnore // 避免序列化
String getStreamKey();
}

View File

@@ -1,7 +1,10 @@
package cn.iocoder.dashboard.framework.redis.core.util;
import cn.iocoder.dashboard.framework.redis.core.pubsub.ChannelMessage;
import cn.iocoder.dashboard.framework.redis.core.stream.StreamMessage;
import cn.iocoder.dashboard.util.json.JsonUtils;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.RedisTemplate;
/**
@@ -17,8 +20,21 @@ public class RedisMessageUtils {
* @param redisTemplate Redis 操作模板
* @param message 消息
*/
public static <T extends ChannelMessage> void sendChannelMessage(RedisTemplate<?, ?> redisTemplate, T message) {
public static <T extends ChannelMessage> void sendChannelMessage(RedisTemplate<?, ?> redisTemplate, T message) {
redisTemplate.convertAndSend(message.getChannel(), JsonUtils.toJsonString(message));
}
/**
* 发送 Redis 消息,基于 Redis Stream 实现
*
* @param redisTemplate Redis 操作模板
* @param message 消息
* @return 消息记录的编号对象
*/
public static <T extends StreamMessage> RecordId sendStreamMessage(RedisTemplate<String, String> redisTemplate, T message) {
return redisTemplate.opsForStream().add(StreamRecords.newRecord()
.ofObject(JsonUtils.toJsonString(message)) // 设置内容
.withStreamKey(message.getStreamKey())); // 设置 stream key
}
}

View File

@@ -0,0 +1,15 @@
package cn.iocoder.dashboard.modules.system.mq.consumer.mail;
import cn.iocoder.dashboard.framework.redis.core.stream.AbstractStreamMessageListener;
import cn.iocoder.dashboard.modules.system.mq.message.mail.SysMailSendMessage;
import org.springframework.stereotype.Component;
@Component
public class SysMailSendConsumer extends AbstractStreamMessageListener<SysMailSendMessage> {
@Override
public void onMessage(SysMailSendMessage message) {
}
}

View File

@@ -0,0 +1,15 @@
package cn.iocoder.dashboard.modules.system.mq.consumer.sms;
import cn.iocoder.dashboard.framework.redis.core.stream.AbstractStreamMessageListener;
import cn.iocoder.dashboard.modules.system.mq.message.sms.SysSmsSendMessage;
import org.springframework.stereotype.Component;
@Component
public class SysSmsSendConsumer extends AbstractStreamMessageListener<SysSmsSendMessage> {
@Override
public void onMessage(SysSmsSendMessage message) {
System.out.println(message);
}
}

View File

@@ -0,0 +1,46 @@
package cn.iocoder.dashboard.modules.system.mq.message.mail;
import cn.iocoder.dashboard.framework.redis.core.stream.StreamMessage;
import lombok.Data;
import javax.validation.constraints.NotNull;
import java.util.Map;
/**
* 邮箱发送消息
*
* @author 芋道源码
*/
@Data
public class SysMailSendMessage implements StreamMessage {
/**
* 邮箱地址
*/
@NotNull(message = "邮箱地址不能为空")
private String address;
/**
* 短信模板编号
*/
@NotNull(message = "短信模板编号不能为空")
private String templateCode;
/**
* 短信模板参数
*/
private Map<String, Object> templateParams;
/**
* 用户编号,允许空
*/
private Integer userId;
/**
* 用户类型,允许空
*/
private Integer userType;
@Override
public String getStreamKey() {
return "system.mail.send";
}
}

View File

@@ -0,0 +1,46 @@
package cn.iocoder.dashboard.modules.system.mq.message.sms;
import cn.iocoder.dashboard.framework.redis.core.stream.StreamMessage;
import lombok.Data;
import javax.validation.constraints.NotNull;
import java.util.Map;
/**
* 短信发送消息
*
* @author 芋道源码
*/
@Data
public class SysSmsSendMessage implements StreamMessage {
/**
* 手机号
*/
@NotNull(message = "手机号不能为空")
private String mobile;
/**
* 短信模板编号
*/
@NotNull(message = "短信模板编号不能为空")
private String templateCode;
/**
* 短信模板参数
*/
private Map<String, Object> templateParams;
/**
* 用户编号,允许空
*/
private Integer userId;
/**
* 用户类型,允许空
*/
private Integer userType;
@Override
public String getStreamKey() {
return "system.sms.send";
}
}