当多线程执行某一业务时(特别是对数据的更新、新增)等操作,可能就会出现多个线程对同一条数据进行修改。其最终的结果一
定与你期望的结果“不太一样”,这就与需要一把锁来控制线程排排队了 – java内部为我们提供了解决方案,可以使用synchronized
或Lock等方式来实现。
  但是在生产过程中,因为性能的关系,多数公司都会采用多台服务器来搭建”分布式”。一条请求过来之后,不一定会打到哪台服务
器上,这就保证不了多台服务器的某一”关键业务”同一时间只会有一条线程进行执行。这时就需要一个“媒介”来充当”锁”这个角色,这
个“媒介”需要满足一些特性才能胜任这个工作:
(1)各个服务器都可以对其进行获取和操作(基于数据库的实现方式);
(2)高性能的获取和释放锁;
(3)具备非阻塞锁特性,如果获取不到锁则立即返回失败;
(4)具备锁等待的特性,即没有获取到锁将继续等待获取锁;
(5)具有锁失效机制,防止死锁;
(6)具备可重入特性;

  高性能、失效时间、可重入、非阻塞等特性,Redis都能满足,所以基于Redis实现是解决分布式锁的一种方式。基于Redis的分布式
锁的实现思想:
  A、加锁
    1、计算开始时间,防止一直获取不到锁而死循环;
    2、根据系统名称:业务名称:主键ID来创建一个分布式锁的key;
    3、循环获取分布式锁(尝试时间=当前时间-开始时间<5000毫秒);
    4、使用setnx方法获取分布式锁(key,value – 当前系统时间+”$TRUE”,失效时间 – 10毫秒);
    5、如果设置成功则获取分布式锁成功,返回true;
    6、如果获取失败则睡5毫秒,继续尝试,直到过了时间;
    7、如果时间之内没有获取到分布式所,则返回失败;
  B、解锁
    1、根据系统号,业务名称,主键拼接分布式锁的key;
    2、根据key获取到真实的key(使用keys方法);
    3、判断真实的key是否为存在,是否唯一,如果否则为空;
    4、如果key唯一则进行删除;
  栗子:

  1. /**
  2. * Jedis的工具类.
  3. */
  4. public class JedisUtil {
  5. private static final Logger LOG = LoggerFactory.getLogger(JedisUtil.class);
  6. private static Map<String, JedisUtil> uniqueInstance = new HashMap();
  7. private JedisPool jedisPool;
  8. private static JedisUtil jedisUtil = null;
  9. public JedisUtil() {
  10. super();
  11. }
  12. /**
  13. * 选择需要链接哪个库
  14. * @param serverName
  15. */
  16. public JedisUtil(String serverName) {
  17. if (StringUtils.isBlank(serverName)) {
  18. throw new RuntimeException("请指定Redis主或从库!");
  19. } else {
  20. //实际使用需从配置文件中获取
  21. String ip = "127.0.0.1";
  22. Integer port = 6379;
  23. String pwd = "test";
  24. //仅仅是测试用
  25. String redisPoolMaxActive = "30";
  26. if ("master".equals(serverName)) {
  27. //链接到主库
  28. this.initialPool(ip, port, pwd, redisPoolMaxActive);
  29. }else{
  30. //可以链接为别的Redis库
  31. }
  32. }
  33. }
  34. /**
  35. * 初始化jedisPool
  36. * @param ip
  37. * @param port
  38. * @param pwd
  39. * @param redisPoolMaxActive
  40. */
  41. private void initialPool(String ip, int port, String pwd, String redisPoolMaxActive) {
  42. JedisPoolConfig config = new JedisPoolConfig();
  43. //最大连接数,默认8个
  44. config.setMaxTotal(256);
  45. //最大空闲连接数,默认8个
  46. config.setMaxIdle(256);
  47. if (StringUtils.isNotBlank(redisPoolMaxActive)) {
  48. Integer redisPoolMaxActiveInt = Integer.valueOf(Integer.parseInt(redisPoolMaxActive));
  49. if (redisPoolMaxActiveInt.intValue() > 512) {
  50. redisPoolMaxActiveInt = Integer.valueOf(512);
  51. }
  52. config.setMaxTotal(redisPoolMaxActiveInt.intValue());
  53. config.setMaxIdle(redisPoolMaxActiveInt.intValue());
  54. }
  55. //获取连接时的最大等待毫秒数
  56. config.setMaxWaitMillis(1000L);
  57. //在获取连接的时候检查有效性,默认false
  58. config.setTestOnBorrow(true);
  59. config.setTestOnReturn(true);
  60. //在空闲时检查有效性,默认false
  61. config.setTestWhileIdle(true);
  62. //每次逐出检查时 逐出的最大数目。如果为负数就是: 1/abs(n), 默认3
  63. config.setNumTestsPerEvictionRun(-1);
  64. //逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1
  65. config.setTimeBetweenEvictionRunsMillis(30000L);
  66. //逐出连接的最小空闲时间,默认1800000毫秒(30分钟)
  67. config.setMinEvictableIdleTimeMillis(720000L);
  68. //对象空闲多久后逐出,当空闲时间>该值且空闲连接 > 最大空闲数时直接逐出,不再根据MinEvictableIdleTimeMillis判断 (默认逐出策略)
  69. config.setSoftMinEvictableIdleTimeMillis(360000L);
  70. this.jedisPool = new JedisPool(config, ip, port, 2000, pwd);
  71. LogUtils.info(LOG, "jedisPool init finish", new Object[]{"jedisPool", this.jedisPool});
  72. }
  73. /**
  74. * 单例获取jedisPool
  75. * @param serverName Redis库的名称
  76. * @return
  77. */
  78. public static JedisUtil getInstance(String serverName) {
  79. jedisUtil = (JedisUtil)uniqueInstance.get(serverName);
  80. if(jedisUtil == null) {
  81. Class var1 = JedisUtil.class;
  82. synchronized(JedisUtil.class) {
  83. jedisUtil = (JedisUtil)uniqueInstance.get(serverName);
  84. if(jedisUtil == null) {
  85. jedisUtil = new JedisUtil(serverName);
  86. uniqueInstance.put(serverName, jedisUtil);
  87. }
  88. }
  89. }
  90. return jedisUtil;
  91. }
  92. public Long setnx(String key, String value, int expireTime, int dbIndex) {
  93. Jedis jedis = null;
  94. Long var1;
  95. try {
  96. jedis = this.jedisPool.getResource();
  97. jedis.select(dbIndex);
  98. Long ret = jedis.setnx(key, value);
  99. if(ret.longValue() == 1L && expireTime > 0) {
  100. jedis.expire(key, expireTime);
  101. }
  102. var1 = ret;
  103. } catch (Exception var11) {
  104. LogUtils.error(LOG, "redis setnx error. ", var11, new Object[0]);
  105. throw var11;
  106. } finally {
  107. if(jedis != null) {
  108. jedis.close();
  109. }
  110. }
  111. return var1;
  112. }
  113. /**
  114. * 查找key
  115. * @param sKey
  116. * @param index
  117. * @return
  118. */
  119. public Set<String> key(String sKey, int index) {
  120. Jedis jedis = null;
  121. Set var2;
  122. try {
  123. jedis = this.jedisPool.getResource();
  124. jedis.select(index);
  125. var2 = jedis.keys(sKey);
  126. } catch (Exception var8) {
  127. LogUtils.error(LOG, "redis key error. ", var8, new Object[0]);
  128. throw var8;
  129. } finally {
  130. if(jedis != null) {
  131. jedis.close();
  132. }
  133. }
  134. return var2;
  135. }
  136. /**
  137. * 删除key
  138. * @param sKey
  139. * @param index
  140. * @return
  141. */
  142. public Long del(String sKey, int index) {
  143. Jedis jedis = null;
  144. Long var3;
  145. try {
  146. jedis = this.jedisPool.getResource();
  147. jedis.select(index);
  148. var3 = jedis.del(sKey);
  149. } catch (Exception var8) {
  150. LogUtils.error(LOG, "redis del error. ", var8, new Object[0]);
  151. throw var8;
  152. } finally {
  153. if(jedis != null) {
  154. jedis.close();
  155. }
  156. }
  157. return var3;
  158. }
  159. }

  分布式锁的工具类:

  1. public class LockRedisService {
  2. private static final Logger LOG = LoggerFactory.getLogger(LockRedisService.class);private static final Integer EXPIRE_SECOND = Integer.valueOf(10);public LockRedisService() {
  3. }
  4. /**
  5. * 尝试获取分布式锁
  6. * @param systemName 系统名称
  7. * @param business 业务名称
  8. * @param keyfix 主键
  9. * @return
  10. */
  11. public static Boolean tryLock(String systemName, String business, String keyfix) {
  12. long start = System.currentTimeMillis();
  13. String key = String.format("lock:%s:%s:%s", new Object[]{systemName, business, keyfix});
  14. LogUtils.info(LOG, "尝试获取分布式锁", new Object[]{"key", key});
  15. do {
  16. try {
  17. long ret = JedisUtil.getInstance("master").setnx(key, System.currentTimeMillis() + "$TRUE", EXPIRE_SECOND, 1).longValue();
  18. if(ret == 1L) {
  19. LogUtils.info(LOG, "成功获得分布式锁", new Object[]{"key", key});
  20. return Boolean.TRUE;
  21. }
  22. Thread.sleep(5L);
  23. } catch (Exception var8) {
  24. LogUtils.error(LOG, "获取锁失败", var8, new Object[0]);
  25. }
  26. } while(System.currentTimeMillis() - start < 5000L);
  27. LogUtils.warn(LOG, "获取分布式锁失败", new Object[]{"key", key});
  28. return Boolean.FALSE;
  29. }
  30. /**
  31. * 解锁
  32. * @param systemName 系统名称
  33. * @param business 业务名称
  34. * @param keyfix 主键
  35. * @return
  36. */
  37. public static Boolean unlock(String systemName, String business, String keyfix) {
  38. String key = String.format("lock:%s:%s:%s", new Object[]{systemName, business, keyfix});
  39. try {
  40. //1为使用哪个Redis库,正常应该写为枚举类
  41. Set<String> realKeySet = JedisUtil.getInstance("master").key(key,1);
  42. if(CollectionUtils.isEmpty(realKeySet)) {
  43. return null;
  44. } else if(realKeySet.size() > 1) {
  45. LogUtils.warn(LOG, "lock key 通配符存在多个", new Object[]{"key", key});
  46. return null;
  47. } else {
  48. String realKey = (String)realKeySet.iterator().next();
  49. JedisUtil.getInstance("master").del(realKey, 1);
  50. }
  51. } catch (Exception var5) {
  52. LogUtils.error(LOG, "解锁失败", var5, new Object[0]);
  53. }
  54. LogUtils.info(LOG, "解锁成功", new Object[]{"key", key});
  55. return Boolean.valueOf(true);
  56. }
  57. }

  测试类:

  1. public class Test {
  2. public static void main(String[] args) {
  3. //获取cpu的核心数
  4. int count = Runtime.getRuntime().availableProcessors();
  5. //创建线程池
  6. ThreadPoolExecutor threadPool = new ThreadPoolExecutor(count, count * 10,
  7. 60L, TimeUnit.MILLISECONDS,
  8. new LinkedBlockingQueue<>(40960), new ThreadPoolExecutor.AbortPolicy());
  9. //多线程执行业务
  10. threadPool.execute(()->{
  11. String bussiness = "测试分布式锁";
  12. String key = "TestLock:" + 123;
  13. //尝试获取分布式锁
  14. if (LockRedisService.tryLock("测试系统", bussiness, key)) {
  15. System.out.println("---获取分布式锁成功---");
  16. try {
  17. //可能会抛出异常
  18. System.out.println("执行逻辑");
  19. }catch (Exception e){
  20. System.out.println("执行逻辑错误: "+e);
  21. }finally {
  22. //执行完毕后解锁
  23. LockRedisService.unlock("测试系统", bussiness, key);
  24. }
  25. }else {
  26. System.out.println("获取分布式锁失败");
  27. }
  28. });
  29. }
  30. }

 

版权声明:本文为0813lichenyu原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/0813lichenyu/p/10376631.html