1. For scenarios that require strong consistency, use a real-time (cache-aside) scheme: on reads, query the cache first and fall back to the DB on a miss, then store the result in the cache; on writes, update the database first and then expire the corresponding cache entry (it is recommended to expire the entry rather than update the cached content directly). A sketch of this read/write flow follows the list below.

2. For highly concurrent scenarios, synchronize the cache asynchronously through a message queue; message middleware such as Kafka can handle message production and consumption.

3. Use Alibaba's synchronization tool canal. Canal emulates the MySQL master/slave replication protocol: it watches the DB binlog for updates and triggers cache refreshes accordingly. This approach frees developers from writing synchronization code by hand and reduces workload, but it has some limitations in practice.

4. Use MySQL UDFs (user-defined functions): program against MySQL's API and rely on triggers to synchronize the cache. UDFs are mostly written in C/C++, however, so the learning cost is high.
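As a concrete illustration of approach 1, here is a minimal cache-aside sketch (not from the original post; the class name, the RedisTemplate usage and the TTL are assumptions):

// Cache-aside sketch for approach 1 (illustrative only).
@Service
public class UserQueryService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private UserMapper userMapper;

    // Read: try the cache first, fall back to the DB on a miss, then populate the cache.
    public User getUser(Long userId) {
        String key = "user:" + userId;
        User cached = (User) redisTemplate.opsForValue().get(key);
        if (cached != null) {
            return cached;
        }
        User user = userMapper.selectByPrimaryKey(userId);
        if (user != null) {
            redisTemplate.opsForValue().set(key, user, 30, TimeUnit.MINUTES);
        }
        return user;
    }

    // Write: update the database first, then expire (delete) the cache entry
    // instead of rewriting its content.
    public User updateUser(User user) {
        userMapper.updateByPrimaryKeySelective(user);
        redisTemplate.delete("user:" + user.getUserId());
        return user;
    }
}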

@Cacheable(key = "caches[0].name + T(String).valueOf(#userId)", unless = "#result eq null")
@CachePut(key = "caches[0].name + T(String).valueOf(#user.userId)")
@CacheEvict(key = "caches[0].name + T(String).valueOf(#userId)")
@Caching(evict = {@CacheEvict(key = "caches[0].name + T(String).valueOf(#userId)"),
    @CacheEvict(key = "caches[0].name + #result.name")})

1. @Cacheable: used on queries. Note that a Long key must be converted to String, otherwise an exception is thrown.
2. @CachePut: used on updates. A method with this annotation always executes, so the data is always read from the DB.
3. @CacheEvict: used on deletes.
4. @Caching: combines several cache annotations. See the official documentation for details on each annotation.

Note: although annotations keep the code concise, they have limitations: key construction is constrained, and the annotations have no effect on nested (self-invoked) calls, as shown below.
public class User {
    private Long userId;
    private String name;
    private Integer age;
    private String sex;
    private String addr;
    // getters and setters ...
}

The service interface:

public interface UserService {
    User getUser(Long userId);
    User updateUser(User user);
    User getUserByName(String name);
    int insertUser(User user);
    User delete(Long userId);
}

// Implementation class
// Suppose we need to look up a user by name: we usually map name -> id first and then id -> user,
// which reduces redundant information in the Redis cache.
@Service(value = "userSerivceImpl")
@CacheConfig(cacheNames = "user")
public class UserServiceImpl implements UserService {

    private static Logger log = LoggerFactory.getLogger(UserServiceImpl.class);

    @Autowired
    UserMapper userMapper;

    @Cacheable(key = "caches[0].name + T(String).valueOf(#userId)", unless = "#result eq null")
    public User getUser(Long userId) {
        User user = userMapper.selectByPrimaryKey(userId);
        return user;
    }

    @Cacheable(key = "caches[0].name + #name")
    public String getIdByName(String name) {
        Long userId = userMapper.getIdByName(name);
        return String.valueOf(userId);
    }

    // getUserByName combines getIdByName and getUser. Note that if a controller calls
    // getUserByName directly, the caching does not take effect (self-invocation bypasses
    // the proxy); it only works when getIdByName and getUser are invoked directly.
    public User getUserByName(String name) {
        // Look up the primary key by name, then load the entity by its primary key.
        return getUser(Long.valueOf(getIdByName(name)));
    }
}
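The limitation above is Spring's usual self-invocation issue: internal calls do not pass through the caching proxy. One commonly used workaround, shown here only as a hedged sketch (the bean name is hypothetical, not from the original post), is to inject the bean into itself lazily and call through the proxy:

@Service(value = "userSerivceImplProxyDemo") // hypothetical bean name for this sketch
@CacheConfig(cacheNames = "user")
public class UserServiceProxyDemo {

    @Autowired
    UserMapper userMapper;

    // Lazily inject the proxied instance of this bean so that internal calls
    // still go through the caching interceptor.
    @Lazy
    @Autowired
    private UserServiceProxyDemo self;

    @Cacheable(key = "caches[0].name + T(String).valueOf(#userId)", unless = "#result eq null")
    public User getUser(Long userId) {
        return userMapper.selectByPrimaryKey(userId);
    }

    @Cacheable(key = "caches[0].name + #name")
    public String getIdByName(String name) {
        return String.valueOf(userMapper.getIdByName(name));
    }

    // Calling through "self" makes the nested calls hit the cache as well.
    public User getUserByName(String name) {
        return self.getUser(Long.valueOf(self.getIdByName(name)));
    }
}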

1. First define a RedisCacheConfig class that creates the RedisTemplate and manages the CacheManager.

@Configuration
public class RedisCacheConfig extends CachingConfigurerSupport {

    /* Bean defining the cache key generation strategy:
     * package name + class name + method name + all parameters
     */
    @Bean
    public KeyGenerator keyGenerator() {
        return new KeyGenerator() {
            @Override
            public Object generate(Object target, Method method, Object... params) {
                StringBuilder sb = new StringBuilder();
                sb.append(target.getClass().getName());
                sb.append(method.getName());
                for (Object obj : params) {
                    sb.append(obj.toString());
                }
                return sb.toString();
            }
        };
    }

    // This CacheManager bean is left disabled (@Bean commented out) in the original.
    //@Bean
    public CacheManager cacheManager(
            @SuppressWarnings("rawtypes") RedisTemplate redisTemplate) {
        RedisCacheManager cacheManager = new RedisCacheManager(redisTemplate);
        cacheManager.setDefaultExpiration(60); // cache retention time in seconds
        return cacheManager;
    }

    // 1. At startup this method is registered as a bean managed by Spring.
    @Bean
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate template = new StringRedisTemplate(factory);
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);

        // Use Jackson2JsonRedisSerializer to serialize and deserialize Redis values.
        Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class);

        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        serializer.setObjectMapper(mapper);

        template.setValueSerializer(serializer);
        // Use StringRedisSerializer to serialize and deserialize Redis keys.
        template.setKeySerializer(new StringRedisSerializer());
        template.afterPropertiesSet();
        return template;
    }
}
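One detail the listing above does not show: the cache annotations only take effect if annotation-driven caching is enabled somewhere in the configuration, typically with a single @EnableCaching annotation, for example:

@Configuration
@EnableCaching // enables Spring's annotation-driven cache management
public class RedisCacheConfig extends CachingConfigurerSupport {
    // ... beans as above ...
}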

2. Define a RedisCacheUtil class for reading and writing cache values.

@Component
public class RedisCacheUtil {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Store a string value.
     * @param key   String key
     * @param value String value
     */
    public void set(String key, String value) {
        stringRedisTemplate.opsForValue().set(key, value);
    }

    /**
     * Store an object.
     * @param key   String key
     * @param value Object value
     */
    public void set(String key, Object value) {
        redisTemplate.opsForValue().set(key, value);
    }

    /**
     * Store an object with an expiration time (in seconds).
     * @param key   String key
     * @param value Object value
     */
    public void set(String key, Object value, Long timeOut) {
        redisTemplate.opsForValue().set(key, value, timeOut, TimeUnit.SECONDS);
    }

    /**
     * Get a string value by key.
     * @param key
     * @return
     */
    public String getValue(String key) {
        return stringRedisTemplate.opsForValue().get(key);
    }

    // public Object getValue(String key) {
    //     return redisTemplate.opsForValue().get(key);
    // }

    /**
     * Get an object by key.
     * @param key
     * @return
     */
    public Object getValueOfObject(String key) {
        return redisTemplate.opsForValue().get(key);
    }

    /**
     * Delete a cache entry by key.
     * @param key
     */
    public void delete(String key) {
        redisTemplate.delete(key);
    }

    /**
     * Check whether a key exists.
     * @param key
     * @return
     */
    @SuppressWarnings("unchecked")
    public boolean exists(String key) {
        return redisTemplate.hasKey(key);
    }
}
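A short usage sketch of the helper above (the key names and TTL are arbitrary examples):

@Autowired
private RedisCacheUtil redisCacheUtil;

public void demo(User user) {
    // cache a plain string
    redisCacheUtil.set("greeting", "hello");
    // cache an object for 300 seconds
    redisCacheUtil.set("user" + user.getUserId(), user, 300L);
    // read it back and check existence
    User cached = (User) redisCacheUtil.getValueOfObject("user" + user.getUserId());
    boolean present = redisCacheUtil.exists("user" + user.getUserId());
    // remove it
    redisCacheUtil.delete("user" + user.getUserId());
}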

3. The implementation class

/**
 * Created by yexin on 2017/9/8.
 *
 * Builds on the basic implementation and adds protection against cache avalanche and cache penetration.
 */
@Service(value = "userServiceImpl4")
public class UserServiceImpl4 implements UserService {

    @Autowired
    UserMapper userMapper;

    @Autowired
    RedisCacheUtil redisCacheUtil;

    @Value("${timeOut}")
    private long timeOut;

    @Override
    public User getUser(Long userId) {

        String key = "user" + userId;
        User user = (User) redisCacheUtil.getValueOfObject(key);
        String keySign = key + "_sign";
        String valueSign = redisCacheUtil.getValue(keySign);
        if (user == null) { // first lookup, or a cached empty result
            // guard against cache penetration: if the key already exists (cached empty result), return null directly
            if (redisCacheUtil.exists(key)) {
                return null;
            }
            user = userMapper.selectByPrimaryKey(userId);

            redisCacheUtil.set(key, user);
            redisCacheUtil.set(keySign, "1", timeOut * (new Random().nextInt(10) + 1));
            // redisCacheUtil.set(keySign, "1", 0L); // the expiration time cannot be 0; it must be greater than 0
            return user;
        }

        if (valueSign != null) {
            return user;
        } else {
            // the sign key has expired: set a new expiration for it
            Long tt = timeOut * (new Random().nextInt(10) + 1);
            System.out.println("tt:" + tt);
            redisCacheUtil.set(keySign, "1", tt);
            // refresh the cache asynchronously; under high concurrency this can serve slightly stale data
            ThreadPoolUtil.getExecutorService().execute(new Runnable() {
                public void run() {
                    System.out.println("----- running async refresh -----");
                    User user1 = userMapper.selectByPrimaryKey(userId);
                    redisCacheUtil.set(key, user1);
                }
            });

            // new Thread() {
            //     public void run() { // under high concurrency this can serve slightly stale data
            //         System.out.println("----- running async refresh -----");
            //         User user1 = userMapper.selectByPrimaryKey(userId);
            //         redisCacheUtil.set(key, user1);
            //     }
            // }.start();
        }
        return user;
    }
}

The asynchronous implementation uses Kafka as the message queue. Only update operations go through the queue; queries do not need to be asynchronous. The implementation is as follows.

1. Add the dependency to the pom file:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

2. Producer code

@EnableBinding(Source.class)
public class SendService {

    @Autowired
    private Source source;

    public void sendMessage(String msg) {
        try {
            source.output().send(MessageBuilder.withPayload(msg).build());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Accepts an entity object; the binding details are configured in application.yml.
    public void sendMessage(TransMsg msg) {
        try {
            //MessageBuilder.withPayload(msg).setHeader(KafkaHeaders.TOPIC, "111111").build();
            source.output().send(MessageBuilder.withPayload(msg).build());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
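The TransMsg type used by the producer and by the consumer below is not shown in the original post. Judging from how it is constructed and read, a minimal sketch could look like the following; the field names are assumptions:

// Hypothetical message wrapper carrying the cache key, the payload, and enough
// metadata (class/method name) for the consumer to decide how to apply the change.
public class TransMsg<T> implements java.io.Serializable {

    private String key;        // cache key affected by the change
    private T body;            // the updated entity
    private String className;  // service class that produced the message
    private String methodName; // method to apply on the consumer side
    private T params;          // parameters for that method

    public TransMsg() { }

    public TransMsg(String key, T body, String className, String methodName, T params) {
        this.key = key;
        this.body = body;
        this.className = className;
        this.methodName = methodName;
        this.params = params;
    }

    public T getParams() { return params; }
    // remaining getters and setters omitted
}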

3. Consumer code

@EnableBinding(Sink.class)
public class MsgSink {

    @Resource(name = "userSerivceImpl3")
    UserService userService;

    @StreamListener(Sink.INPUT)
    public void process(TransMsg<?> msg) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, ClassNotFoundException {
        System.out.println("sink......" + msg);
        System.out.println("db update start ----");
        userService.updateUser((User) msg.getParams());
        System.out.println("db update finished ------");
    }
}

4. application.yml configuration

spring:
  application:
    name: demo-provider
  redis:
    database: 0
    host: 192.168.252.128
    #host: localhost
    port: 6379
    password:
    pool:
      max-active: 50
      max-wait: -1
      max-idle: 50
    timeout: 0
  #kafka
  cloud:
    stream:
      kafka:
        binder:
          brokers: 192.168.252.128:9092
          zk-nodes: 192.168.252.128:2181
          minPartitionCount: 1
          autoCreateTopics: true
          autoAddPartitions: true
      bindings:
        input:
          destination: topic-02
          # content-type: application/json
          content-type: application/x-java-object # with this content type the consumer receives an entity object
          group: t1
          consumer:
            concurrency: 1
            partitioned: false
        output:
          destination: topic-02
          content-type: application/x-java-object
          producer:
            partitionCount: 1
      instance-count: 1
      instance-index: 0

5. The implementation class

@Service(value = "userServiceImpl2")
public class UserServiceImpl2 implements UserService {

    @Autowired
    UserMapper userMapper;
    @Autowired
    RedisCacheUtil redisCacheUtil;
    private static Logger log = LoggerFactory.getLogger(UserServiceImpl.class);
    @Autowired
    SendService sendService;

    public User updateUser(User user) {
        System.out.println(" impl2 active ");
        String key = "user" + user.getUserId();
        System.out.println("key:" + key);
        // if the key is not cached, just update the DB
        if (!redisCacheUtil.exists(key)) {
            return userMapper.updateByPrimaryKeySelective(user) == 1 ? user : null;
        }
        /* Update the cached value for the key,
           then push the update onto the queue. */
        User user1 = (User) redisCacheUtil.getValueOfObject(key);
        try {
            redisCacheUtil.set(key, user);
            TransMsg<User> msg = new TransMsg<User>(key, user, this.getClass().getName(), "updateUser", user);
            sendService.sendMessage(msg);
        } catch (Exception e) {
            // roll the cache back to the previous value if sending fails
            redisCacheUtil.set(key, user1);
        }
        return user;
    }
}

Note: the Kafka and ZooKeeper setup is not covered here.

Canal must be installed first and its example instance configured; that configuration is not covered here.
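The original post also does not list the client-side dependencies for the canal example below; assuming Maven, the coordinates are roughly as follows (the versions are placeholders):

<!-- canal client (version is a placeholder) -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.x</version>
</dependency>
<!-- fastjson, used to build the JSON written to Redis -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.x</version>
</dependency>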

package org.example.canal;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import org.example.canal.util.RedisUtil;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {

    public static void main(String[] args) {
        // create the connection
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;

        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // acknowledge the batch
                // connector.rollback(batchId); // on failure, roll the batch back
            }
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChage = null;
            try {
                System.out.println("tablename:" + entry.getHeaderOrBuilder().getTableName());
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    redisDelete(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    redisInsert(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    redisUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }

    private static void redisInsert(List<Column> columns) {
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            RedisUtil.stringSet("user:" + columns.get(0).getValue(), json.toJSONString());
        }
    }

    private static void redisUpdate(List<Column> columns) {
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            RedisUtil.stringSet("user:" + columns.get(0).getValue(), json.toJSONString());
        }
    }

    private static void redisDelete(List<Column> columns) {
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            RedisUtil.delKey("user:" + columns.get(0).getValue());
        }
    }
}
package org.example.canal.util;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisUtil {

    // Redis server IP
    private static String ADDR = "192.168.252.128";
    // Redis port
    private static int PORT = 6379;
    // Access password
    //private static String AUTH = "admin";
    // Maximum number of connection instances; the default is 8.
    // A value of -1 means no limit; once the pool has handed out maxActive jedis instances it becomes exhausted.
    private static int MAX_ACTIVE = 1024;
    // Maximum number of idle jedis instances in the pool; the default is also 8.
    private static int MAX_IDLE = 200;
    // Maximum time to wait for an available connection, in milliseconds; the default -1 means wait forever.
    // If the wait time is exceeded, a JedisConnectionException is thrown.
    private static int MAX_WAIT = 10000;
    // Expiration time
    protected static int expireTime = 60 * 60 * 24;
    // Connection pool
    protected static JedisPool pool;

    static {
        JedisPoolConfig config = new JedisPoolConfig();
        // maximum number of connections
        config.setMaxTotal(MAX_ACTIVE);
        // maximum number of idle instances
        config.setMaxIdle(MAX_IDLE);
        // maximum wait time
        config.setMaxWaitMillis(MAX_WAIT);
        config.setTestOnBorrow(false);
        pool = new JedisPool(config, ADDR, PORT, 1000);
    }

    /**
     * Get a jedis instance.
     */
    protected static synchronized Jedis getJedis() {
        Jedis jedis = null;
        try {
            jedis = pool.getResource();
        } catch (Exception e) {
            e.printStackTrace();
            if (jedis != null) {
                pool.returnBrokenResource(jedis);
            }
        }
        return jedis;
    }

    /**
     * Release a jedis resource.
     * @param jedis
     * @param isBroken
     */
    protected static void closeResource(Jedis jedis, boolean isBroken) {
        try {
            if (isBroken) {
                pool.returnBrokenResource(jedis);
            } else {
                pool.returnResource(jedis);
            }
        } catch (Exception e) {

        }
    }

    /**
     * Check whether a key exists.
     * @param key
     */
    public static boolean existKey(String key) {
        Jedis jedis = null;
        boolean isBroken = false;
        try {
            jedis = getJedis();
            jedis.select(0);
            return jedis.exists(key);
        } catch (Exception e) {
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
        return false;
    }

    /**
     * Delete a key.
     * @param key
     */
    public static void delKey(String key) {
        Jedis jedis = null;
        boolean isBroken = false;
        try {
            jedis = getJedis();
            jedis.select(0);
            jedis.del(key);
        } catch (Exception e) {
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
    }

    /**
     * Get the value of a key.
     * @param key
     */
    public static String stringGet(String key) {
        Jedis jedis = null;
        boolean isBroken = false;
        String lastVal = null;
        try {
            jedis = getJedis();
            jedis.select(0);
            lastVal = jedis.get(key);
            jedis.expire(key, expireTime);
        } catch (Exception e) {
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
        return lastVal;
    }

    /**
     * Set a string value.
     * @param key
     * @param value
     */
    public static String stringSet(String key, String value) {
        Jedis jedis = null;
        boolean isBroken = false;
        String lastVal = null;
        try {
            jedis = getJedis();
            jedis.select(0);
            lastVal = jedis.set(key, value);
            jedis.expire(key, expireTime);
        } catch (Exception e) {
            e.printStackTrace();
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
        return lastVal;
    }

    /**
     * Set a hash field.
     * @param key
     * @param field
     * @param value
     */
    public static void hashSet(String key, String field, String value) {
        boolean isBroken = false;
        Jedis jedis = null;
        try {
            jedis = getJedis();
            if (jedis != null) {
                jedis.select(0);
                jedis.hset(key, field, value);
                jedis.expire(key, expireTime);
            }
        } catch (Exception e) {
            isBroken = true;
        } finally {
            closeResource(jedis, isBroken);
        }
    }

}
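For completeness, a small usage sketch of the utility above (the key and value are arbitrary examples):

public class RedisUtilDemo {
    public static void main(String[] args) {
        RedisUtil.stringSet("user:1", "{\"userId\":\"1\",\"name\":\"tom\"}");
        System.out.println(RedisUtil.stringGet("user:1")); // prints the JSON string
        System.out.println(RedisUtil.existKey("user:1"));  // true while the key is present
        RedisUtil.delKey("user:1");
    }
}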

Penetration: a key that does not exist in the DB is queried frequently; since the cache never hits, every request goes to the persistence layer and the cache loses its purpose.

Solution: when the persistence layer finds nothing, cache an empty result. On a query, first check exists(key) in the cache: if the key is present, return the empty result directly; otherwise query the DB, cache the result and return it.

Note that an insert must evict the corresponding query key, otherwise the row will not be found even though it now exists in the DB (alternatively, give the cached empty result an expiration time). A minimal sketch follows.
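A sketch of the null-result caching idea, reusing the RedisCacheUtil and UserMapper fields from the earlier listings (the "NULL" marker value and the TTLs are assumptions):

public User getUserWithNullCaching(Long userId) {
    String key = "user" + userId;
    // the key exists: either a real user or a previously cached empty result
    if (redisCacheUtil.exists(key)) {
        Object cached = redisCacheUtil.getValueOfObject(key);
        return "NULL".equals(cached) ? null : (User) cached;
    }
    User user = userMapper.selectByPrimaryKey(userId);
    if (user == null) {
        // cache a marker for the missing row, with a short TTL so later inserts become visible
        redisCacheUtil.set(key, "NULL", 60L);
        return null;
    }
    redisCacheUtil.set(key, user, 300L);
    return user;
}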

Avalanche: a large number of cache entries expire at the same time, triggering a flood of database queries.
Solutions: ① serialize access with a lock / distributed lock or a queue;

② spread cache expiration times out evenly, e.g. by adding a random offset (a small sketch follows).
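A tiny sketch of spreading expiration times with a random offset (the base TTL and the offset range are arbitrary), again using the RedisCacheUtil from above:

public void cacheUser(String key, User user) {
    // base expiration of 30 minutes plus a random offset of up to 10 minutes,
    // so entries written at the same moment do not all expire together
    long baseTtlSeconds = 30 * 60;
    long ttlSeconds = baseTtlSeconds + new Random().nextInt(10 * 60);
    redisCacheUtil.set(key, user, ttlSeconds);
}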

Hot key: a single key is accessed extremely frequently; when it expires, a large number of threads try to rebuild the cache at the same time, the load spikes and the system can go down.

Solutions:

① Use a lock: synchronized, Lock, etc. on a single node; a distributed lock in a distributed setup (a sketch follows this list).

② Do not set an expiration time on the key itself; store the logical expiration time inside the value. When the stored time is found to have passed, refresh the cache asynchronously.

③ Store in the value an expiration time t1 smaller than the real expiration time t0; when t1 passes, extend t1 and refresh the cache.

④ Use a companion flag (sign) cache with its own expiration time; when the flag expires, rebuild the actual cache asynchronously. See the handling in userServiceImpl4 above.
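A minimal single-node sketch of option ①, using synchronized with a double check so that only one thread rebuilds an expired entry (field names follow the earlier listings and are assumptions):

private final Object rebuildLock = new Object();

public User getHotUser(Long userId) {
    String key = "user" + userId;
    User user = (User) redisCacheUtil.getValueOfObject(key);
    if (user != null) {
        return user;
    }
    // only one thread rebuilds the cache; the others wait and then re-read it
    synchronized (rebuildLock) {
        user = (User) redisCacheUtil.getValueOfObject(key);
        if (user == null) {
            user = userMapper.selectByPrimaryKey(userId);
            redisCacheUtil.set(key, user, 300L);
        }
    }
    return user;
}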

 

1. When querying the Redis cache by something other than the id, it is recommended to resolve the condition to an id first and then load the pojo by id.

2. With asynchronous Kafka, once the consumer receives a message it still needs a way to tell which table the change belongs to and which method to call; this problem is not solved here yet.

3. For relatively simple Redis caching, canal is recommended.


Copyright notice: this is an original article by ruiati, licensed under CC 4.0 BY-SA. Please include the original source link and this notice when reposting.
Original link: https://www.cnblogs.com/ruiati/p/8820687.html