Redis监听过期的key实现流程是什么

作者:有用网 阅读量:179 发布时间:2024-01-12
关键字 redis

本篇内容介绍了“Redis监听过期的key实现流程是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

    一、简介

    我们来个最简单的集群架构,如下图:

    Redis监听过期的key实现流程是什么

    我们上面图中看到是服务A和服务B就是同一个服务的不同实例。

    二、maven依赖

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.6.0</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.alian</groupId>
        <artifactId>expiration</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>expiration</name>
        <description>redis-key-expiration-listener</description>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <project.package.directory>target</project.package.directory>
            <java.version>1.8</java.version>
            <!--com.fasterxml.jackson 版本-->
            <jackson.version>2.9.10</jackson.version>
            <!--阿里巴巴fastjson 版本-->
            <fastjson.version>1.2.68</fastjson.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
            </dependency>
            <!--redis依赖-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
                <version>${parent.version}</version>
            </dependency>
            <!--用于序列化-->
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>${jackson.version}</version>
            </dependency>
            <!--java 8时间序列化-->
            <dependency>
                <groupId>com.fasterxml.jackson.datatype</groupId>
                <artifactId>jackson-datatype-jsr310</artifactId>
                <version>${jackson.version}</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.68</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.16.14</version>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.13.2</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    </project>

    三、编码实现

    3.1、application.properties

    # 端口
    server.port=8090
    # 上下文路径
    server.servlet.context-path=/expiration

    # Redis数据库索引(默认为0)
    spring.redis.database=0
    # Redis服务器地址
    spring.redis.host=192.168.0.193
    #spring.redis.host=127.0.0.1
    # Redis服务器连接端口
    spring.redis.port=6379
    # Redis服务器连接密码(默认为空)
    spring.redis.password=
    # 连接池最大连接数(使用负值表示没有限制)
    spring.redis.jedis.pool.max-active=20
    # 连接池中的最小空闲连接
    spring.redis.jedis.pool.min-idle=10
    # 连接池中的最大空闲连接
    spring.redis.jedis.pool.max-idle=10
    # 连接池最大阻塞等待时间(使用负值表示没有限制)
    spring.redis.jedis.pool.max-wait=20000
    # 读时间(毫秒)
    spring.redis.timeout=10000
    # 连接超时时间(毫秒)
    spring.redis.connect-timeout=10000

    3.2、Redis配置类

    RedisConfig

    package com.alian.expiration.config;
    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.SerializationFeature;
    import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
    import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;
    import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
    import com.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer;
    import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;
    import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
    import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
    import org.springframework.data.redis.serializer.RedisSerializer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    import java.time.LocalDate;
    import java.time.LocalDateTime;
    import java.time.LocalTime;
    import java.time.format.DateTimeFormatter;
    @Configuration
    public class RedisConfig {
        /**
         * redis配置
         *
         * @param redisConnectionFactory
         * @return
         */
        @Bean
        public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
            // 实例化redisTemplate
            RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
            //设置连接工厂
            redisTemplate.setConnectionFactory(redisConnectionFactory);
            // key采用String的序列化
            redisTemplate.setKeySerializer(keySerializer());
            // value采用jackson序列化
            redisTemplate.setValueSerializer(valueSerializer());
            // Hash key采用String的序列化
            redisTemplate.setHashKeySerializer(keySerializer());
            // Hash value采用jackson序列化
            redisTemplate.setHashValueSerializer(valueSerializer());
            // 支持事务
            // redisTemplate.setEnableTransactionSupport(true);
            //执行函数,初始化RedisTemplate
            redisTemplate.afterPropertiesSet();
            return redisTemplate;
        }
        /**
         * key类型采用String序列化
         *
         * @return
         */
        private RedisSerializer<String> keySerializer() {
            return new StringRedisSerializer();
        }
        /**
         * value采用JSON序列化
         *
         * @return
         */
        private RedisSerializer<Object> valueSerializer() {
            //设置jackson序列化
            Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
            //设置序列化对象
            jackson2JsonRedisSerializer.setObjectMapper(getMapper());
            return jackson2JsonRedisSerializer;
        }
        /**
         * 使用com.fasterxml.jackson.databind.ObjectMapper
         * 对数据进行处理包括java8里的时间
         *
         * @return
         */
        private ObjectMapper getMapper() {
            ObjectMapper mapper = new ObjectMapper();
            //设置可见性
            mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            //默认键入对象
            mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            //设置Java 8 时间序列化
            JavaTimeModule timeModule = new JavaTimeModule();
            timeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
            timeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
            timeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
            timeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
            timeModule.addDeserializer(LocalDate.class, new LocalDateDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));
            timeModule.addDeserializer(LocalTime.class, new LocalTimeDeserializer(DateTimeFormatter.ofPattern("HH:mm:ss")));
            //禁用把时间转为时间戳
            mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
            mapper.registerModule(timeModule);
            return mapper;
        }
        @Bean
        RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            return container;
        }
    }

    和我们之前整合redis差不多,只不过在最后增加了一个redis消息监听监听容器RedisMessageListenerContainer

    3.3、监听器

    RedisKeyExpirationListener

    package com.alian.expiration.listener;
    import com.alian.expiration.service.RedisExpirationService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.stereotype.Component;
    @Slf4j
    @Component
    public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
        @Autowired
        private RedisExpirationService redisExpirationService;
    	// 把我们上面一步配置的bean注入进去
        public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
            super(listenerContainer);
        }
        /**
         * 针对redis数据失效事件,进行数据处理
         *
         * @param message
         * @param pattern
         */
        @Override
        public void onMessage(Message message, byte[] pattern) {
            // 用户做自己的业务处理即可,注意message.toString()可以获取失效的key
            String expiredKey = message.toString();
            log.info("onMessage --> redis 过期的key是:{}", expiredKey);
            try {
                // 对过期key进行处理
                redisExpirationService.processingExpiredKey(expiredKey);
                log.info("过期key处理完成:{}", expiredKey);
            } catch (Exception e) {
                e.printStackTrace();
                log.error("处理redis 过期的key异常:{}", expiredKey, e);
            }
        }
    }

    实现的步骤如下:

    • 继承KeyExpirationEventMessageListener

    • 把redis消息监听监听容器RedisMessageListenerContainer 注入到密钥空间事件消息侦 听器中

    • 重写onMessage方法

    • 通过Message 的 toString() 方法就可以获取到过期的key

    • 对key中关键信息进行业务处理,比如 id

    3.4、服务类

    RedisExpirationService

    package com.alian.expiration.service;
    import com.alian.expiration.util.SignUtils;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.stereotype.Service;
    import java.util.concurrent.TimeUnit;
    @Slf4j
    @Service
    public class RedisExpirationService {
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
        public void processingExpiredKey(String expiredKey) {
            // 如果是优惠券的key(一定要规范命名)
            if (expiredKey.startsWith("com.mall.coupon.id")) {
                // 临时key,此key可以在业务处理完,然后延迟一定时间删除,或者不处理
                String tempKey = SignUtils.md5(expiredKey, "UTF-8");
                // 临时key不存在才设置值,key超时时间为10秒(此处相当于分布式锁的应用)
                Boolean exist = redisTemplate.opsForValue().setIfAbsent(tempKey, "1", 10, TimeUnit.SECONDS);
                if (Boolean.TRUE.equals(exist)) {
                    log.info("Business Handing...");
                    // 比如截取里面的id,然后关联数据库进行处理
                } else {
                    log.info("Other service is handing...");
                }
            } else {
                log.info("Expired keys without processing");
            }
        }
    }

    基本流程如下:

    • 判断是否是需要处理的key,一般这种key通过命名规范加以处理

    • 以当前key生成一个新的key作为分布式key

    • 如果redis中不存在这个新的key,则为新的key设置一个值,达到分布式服务处理(核心)

    • 设置成功的,进行业务处理;设置失败了,说明其他服务正在处理这个key

    • 根据 key 的关键信息(比如截取id),进行业务处理

    3.5、工具类

    SignUtils

    package com.alian.expiration.util;
    import java.security.MessageDigest;
    public class SignUtils {
        public static final String md5(String s, String charset) {
            char[] hexDigits = new char[]{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};
            try {
                byte[] btInput = s.getBytes(charset);
                MessageDigest mdInst = MessageDigest.getInstance("MD5");
                mdInst.update(btInput);
                byte[] md = mdInst.digest();
                int j = md.length;
                char[] str = new char[j * 2];
                int k = 0;
                for (byte byte0 : md) {
                    str[k++] = hexDigits[byte0 >>> 4 & 15];
                    str[k++] = hexDigits[byte0 & 15];
                }
                return new String(str);
            } catch (Exception var11) {
                return "";
            }
        }
    }

    四、测试

    4.1、测试类

    简单模拟下发送一个优惠券数据到redis,然后设置超时时间

    package com.alian.expiration;
    import lombok.extern.slf4j.Slf4j;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    @Slf4j
    @RunWith(SpringJUnit4ClassRunner.class)
    @SpringBootTest
    public class RedisKeyExpirationTest {
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
        @Test
        public void keyExpiration() {
            // 优惠券信息
            String id = "2023021685264735";
            Map<String, String> map = new HashMap<>();
            map.put("id", id);
            map.put("amount", "1000");
            map.put("type", "1001");
            map.put("describe", "满减红包");
            // 缓存到redis
            redisTemplate.opsForHash().putAll("com.mall.coupon.id." + id, map);
            // 设置过期时间
            redisTemplate.expire("com.mall.coupon.id." + id, 10, TimeUnit.SECONDS);
        }
    }

    4.2、单实例

    单实例就是服务只部署了一份,我们启动一份,端口是8090,然后通过上面的测试类,发送一个消息,结果如下:

    10:23:39 701 INFO [container-2]:onMessage --> redis 过期的key是:com.mall.coupon.id.2023021685264735
    10:23:39 988 INFO [container-2]:Business Handing...
    10:23:39 989 INFO [container-2]:过期key处理完成:com.mall.coupon.id.2023021685264735
    10:23:50 005 INFO [container-3]:onMessage --> redis 过期的key是:450FCC35415BADC16805962CA5BC7E12
    10:23:50 005 INFO [container-3]:Expired keys without processing
    10:23:50 005 INFO [container-3]:过期key处理完成:450FCC35415BADC16805962CA5BC7E12

    4.3、多实例

    多实例就是服务部署了多份,比如我们启动两份,端口分别为8090和8091,然后通过上面的测试类,发送一个消息,8090端口的服务结果如下(Business Handing&hellip;):

    11:39:06 691 INFO [container-2]:onMessage --> redis 过期的key是:com.mall.coupon.id.2023021685264735
    11:39:06 707 INFO [container-2]:Business Handing...
    11:39:06 707 INFO [container-2]:过期key处理完成:com.mall.coupon.id.2023021685264735
    11:39:16 796 INFO [container-3]:onMessage --> redis 过期的key是:450FCC35415BADC16805962CA5BC7E12
    11:39:16 796 INFO [container-3]:Expired keys without processing
    11:39:16 796 INFO [container-3]:过期key处理完成:450FCC35415BADC16805962CA5BC7E12

    8091端口的服务结果如下(Other service is handing&hellip;):

    11:39:06 691 INFO [container-2]:onMessage --> redis 过期的key是:com.mall.coupon.id.2023021685264735
    11:39:06 707 INFO [container-2]:Other service is handing...
    11:39:06 707 INFO [container-2]:过期key处理完成:com.mall.coupon.id.2023021685264735
    11:39:16 796 INFO [container-3]:onMessage --> redis 过期的key是:450FCC35415BADC16805962CA5BC7E12
    11:39:16 796 INFO [container-3]:Expired keys without processing
    11:39:16 796 INFO [container-3]:过期key处理完成:450FCC35415BADC16805962CA5BC7E12

    结果分析:

    • 多实例的情况下,每个实例都会收到过期key通知

    • 通过redis分布式锁,实现只有一个实例会进行业务处理,防止重复

    • 使用分布式锁会有一个新的key过期,并且收到该key的通知,你可以业务执行完延迟一定时间(避免重复执行),再删除,也可以不处理(因为本就不是要处理业务的key)


    #发表评论
    提交评论