Just like that: quickly building two-level caching with Guava
First, we can all agree on one thing: caching is how we speed up access to a system.
Distributed caches are so capable these days that, most of the time, we barely think about local caching at all.
And in some high-concurrency scenarios, leaning entirely on a NoSQL-style cache such as redis does work well enough.
But one question is worth asking: redis is fast, yet it still has a ceiling of its own. If we push everything into it, then under high concurrency the application's throughput ends up capped by the cache's throughput.
So, for that kind of problem, in some scenarios we can keep part of the data in a local cache and avoid punching every request through to the redis layer.
In this article the local cache exists as a second-level cache working alongside redis, not as a replacement for the remote cache.
Using a local cache properly, though, means getting two things right:
1. cache consistency;
2. concurrency safety.
By cache consistency we mean whether the local cache stays in line with the data in redis (or whatever cache middleware sits behind it). If the divergence exceeds what the business can tolerate, the extra speed is meaningless.
By concurrency safety we mean the thread safety of the local cache itself: if concurrent access can corrupt its contents, we have a serious problem.
What do we gain from a local cache?
1. It removes the network I/O of calling the remote cache, so reads are naturally faster;
2. It reduces the number of concurrent requests hitting the remote cache, so the system as a whole can take on more concurrency.
Where does a local cache make sense?
1. Single-node deployments, obviously;
2. Read-heavy, write-light workloads (which is the classic caching scenario anyway);
3. Workloads that can tolerate a window of cache inconsistency (with per-node local caches, different machines in a cluster will inevitably see slightly different data);
4. Workloads that put a very large request volume on the cache (sending all of it straight to redis would put redis under enormous pressure and slow the application down).
So if your use case looks like this, it is worth thinking about how a local cache could improve your response times.
If we were asked to implement this two-level caching ourselves, it should be entirely doable — there are only two problems to solve:
1. a cache expiration policy;
2. cache safety.
The most direct answer to the first is a scheduled refresh thread that removes entries once their deadline has passed; the safety problem can be handled with synchronized or the lock utilities from java.util.concurrent, roughly as sketched below.
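To make that concrete, here is a minimal hand-rolled sketch along those lines. Everything in it (the NaiveLocalCache name, the sweep interval, the TTL handling) is made up for illustration and has nothing to do with Guava yet:
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.TimeUnit;
- public class NaiveLocalCache {
-     private static class Entry {
-         final String value;
-         final long expireAtMillis;
-         Entry(String value, long expireAtMillis) {
-             this.value = value;
-             this.expireAtMillis = expireAtMillis;
-         }
-     }
-     // thread-safe container, so plain reads need no extra locking
-     private final ConcurrentHashMap<String, Entry> store = new ConcurrentHashMap<>();
-     private final ScheduledExecutorService sweeper = Executors.newSingleThreadScheduledExecutor();
-     public NaiveLocalCache(long sweepIntervalSeconds) {
-         // the "expiration policy": a scheduled thread drops entries whose deadline has passed
-         sweeper.scheduleAtFixedRate(() -> {
-             long now = System.currentTimeMillis();
-             store.entrySet().removeIf(e -> e.getValue().expireAtMillis <= now);
-         }, sweepIntervalSeconds, sweepIntervalSeconds, TimeUnit.SECONDS);
-     }
-     public void put(String key, String value, long ttlMillis) {
-         store.put(key, new Entry(value, System.currentTimeMillis() + ttlMillis));
-     }
-     public String get(String key) {
-         Entry e = store.get(key);
-         // entries that have expired but not yet been swept count as misses
-         return (e == null || e.expireAtMillis <= System.currentTimeMillis()) ? null : e.value;
-     }
- }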
But doing this properly is rarely that simple, and rolling our own is not really the point here.
What we mainly want to see is how Guava solves these problems, and use its approach to sharpen our own thinking.
How do we use Guava as our second-level cache?
1. First, pull in the Guava dependency:
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>18.0</version>
- </dependency>
2. Create the Guava cache.
First, the Guava cache instance should be shared globally; a per-request instance would defeat the purpose of caching.
Second, the expiration parameters should be configurable.
For example:
- @Component
- @Slf4j
- public class LocalEnhancedCacheHolder {
- @Value("${guava.cache.max.size}")
- private Integer maxCacheSize;
- @Value("${guava.cache.timeout}")
- private Integer guavaCacheTimeout;
- /**
- * String-valued cache: k -> v supports String keys and values only, to keep things simple
- */
- private LoadingCache<String, String> stringDbCacheContainer;
- /**
- * Hash-structure cache, demonstrating a composite key used as the Guava cache key
- */
- private LoadingCache<HashDbItemEntry, byte[]> hashDbCacheContainer;
- @Resource
- private RedisTemplate redisTemplate;
- /**
- * Sentinel marking an empty String value
- */
- public static final String EMPTY_VALUE_STRING = "";
- /**
- * Sentinel marking an empty byte[] value
- */
- public static final byte[] EMPTY_VALUE_BYTES = new byte[0];
- @PostConstruct
- public void init() {
- Integer dbCount = 2;
- stringDbCacheContainer = CacheBuilder.newBuilder()
- .expireAfterWrite(guavaCacheTimeout, TimeUnit.SECONDS)
- .maximumSize(maxCacheSize)
- .build(new CacheLoader<String, String>() {
- @Override
- public String load(String key) throws Exception {
- log.info("[cache] loading key from redis: {}", key);
- String value = redisTemplate.get(key);
- return StringUtils.defaultIfBlank(value, EMPTY_VALUE_STRING);
- }
- });
- hashDbCacheContainer = CacheBuilder.newBuilder()
- .expireAfterWrite(guavaCacheTimeout, TimeUnit.SECONDS)
- .maximumSize(maxCacheSize / dbCount)
- .build(new CacheLoader<HashDbItemEntry, byte[]>() {
- @Override
- public byte[] load(HashDbItemEntry keyHolder) throws Exception {
- log.info("[cache] loading hash entry from redis: {}", keyHolder);
- byte[] valueBytes = redisTemplate.hgetValue(
- keyHolder.getBucketKey(), keyHolder.getSlotKey());
- if(valueBytes == null) {
- valueBytes = EMPTY_VALUE_BYTES;
- }
- return valueBytes;
- }
- });
- }
- /**
- * Get a value from the k-v cache
- *
- * @param key cache key
- * @return the cached value; the empty-string sentinel when redis holds no value
- */
- public String getCache(String key) {
- try {
- return stringDbCacheContainer.get(key);
- } catch (ExecutionException e) {
- log.error("[cache] failed to read cache, key: {}", key, e);
- throw new RuntimeException(e);
- }
- }
- /**
- * Put a value into the cache. For now this only writes through to redis.
- *
- * @param key   cache key
- * @param value cache value
- */
- public void putCache(String key, String value) {
- redisTemplate.set(key, value, 0L);
- }
- /**
- * Put a value into the cache with a timeout. For now this only writes through to redis.
- *
- * @param key     cache key
- * @param value   cache value
- * @param timeout expiry, in seconds
- */
- public void putCache(String key, String value, Long timeout) {
- redisTemplate.set(key, value, timeout);
- }
- /**
- * Remove a single k-v cache entry
- *
- * @param key cache key
- */
- public void removeCache(String key) {
- redisTemplate.remove(key);
- }
- /**
- * Remove k-v cache entries in batch
- *
- * @param keyList cache keys, deleted through a pipeline for better performance
- */
- public void removeCache(Collection<String> keyList) {
- redisTemplate.remove(keyList);
- }
- /**
- * Get a value from the hash-structure cache
- *
- * @param bucketKey bucket key, mapping to a group of k -> v pairs
- * @param slotKey   slot key, mapping to the concrete cached value
- * @return the cached value
- */
- public byte[] getCacheFromHash(String bucketKey, String slotKey) {
- HashDbItemEntry entry = new HashDbItemEntry(bucketKey, slotKey);
- try {
- return hashDbCacheContainer.get(entry);
- } catch (ExecutionException e) {
- log.error("[cache] failed to read cache, entry: {}", entry, e);
- throw new RuntimeException(e);
- }
- }
- /**
- * Key holder for the hash data structure.
- *
- * value is not populated for now; this class is only used for lookups.
- */
- class HashDbItemEntry {
- private String bucketKey;
- private String slotKey;
- private Object value;
- public HashDbItemEntry(String bucketKey, String slotKey) {
- this.bucketKey = bucketKey;
- this.slotKey = slotKey;
- }
- public String getBucketKey() {
- return bucketKey;
- }
- public String getSlotKey() {
- return slotKey;
- }
- public Object getValue() {
- return value;
- }
- // equals & hashCode must be overridden, otherwise cached entries can never be reused
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- HashDbItemEntry that = (HashDbItemEntry) o;
- return Objects.equals(bucketKey, that.bucketKey) &&
- Objects.equals(slotKey, that.slotKey) &&
- Objects.equals(value, that.value);
- }
- @Override
- public int hashCode() {
- return Objects.hash(bucketKey, slotKey, value);
- }
- @Override
- public String toString() {
- return "HashDbItemEntry{" +
- "bucketKey='" + bucketKey + '\'' +
- ", slotKey='" + slotKey + '\'' +
- ", value=" + value +
- '}';
- }
- }
- }
The example above shows two kinds of caches: a plain string -> string cache and a (string, string) -> byte[] cache. The point is simply that there is more than one way to key and value a cache.
Let's walk through the simple string -> string case.
- stringDbCacheContainer = CacheBuilder.newBuilder()
- .expireAfterWrite(guavaCacheTimeout, TimeUnit.SECONDS)
- .maximumSize(maxCacheSize)
- .build(new CacheLoader<String, String>() {
- @Override
- public String load(String key) throws Exception {
- log.info("[cache] loading key from redis: {}", key);
- String value = redisTemplate.get(key);
- return StringUtils.defaultIfBlank(value, EMPTY_VALUE_STRING);
- }
- });
As shown above, we create a cache container holding at most maxCacheSize entries; each key expires guavaCacheTimeout seconds after it is written, and once expired the next read reloads the value through redisTemplate.
With that, a complete two-level cache component is done and you can use it in your project right away. Simple, isn't it?
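As a usage illustration only — the ProductConfigService name and its methods are invented here, not part of the component above — a caller reads through the Guava layer first and only reaches redis on a local miss or expiry:
- @Service
- public class ProductConfigService {
-     @Resource
-     private LocalEnhancedCacheHolder localEnhancedCacheHolder;
-     public String getProductConfig(String configKey) {
-         // served from the guava layer; only a local miss or expiry touches redis
-         String value = localEnhancedCacheHolder.getCache(configKey);
-         // the loader caches an empty-string sentinel when redis has no value
-         return StringUtils.isEmpty(value) ? null : value;
-     }
-     public void updateProductConfig(String configKey, String value) {
-         // writes currently go straight to redis; local copies converge only
-         // after guava.cache.timeout elapses on each node
-         localEnhancedCacheHolder.putCache(configKey, value);
-     }
- }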
Digging deeper: how does the Guava cache actually work?
1. How is the CacheBuilder created?
- @GwtCompatible(emulated = true)
- public final class CacheBuilder<K, V> {
- private static final int DEFAULT_INITIAL_CAPACITY = 16;
- private static final int DEFAULT_CONCURRENCY_LEVEL = 4;
- private static final int DEFAULT_EXPIRATION_NANOS = 0;
- private static final int DEFAULT_REFRESH_NANOS = 0;
- static final Supplier<? extends StatsCounter> NULL_STATS_COUNTER = Suppliers.ofInstance(
- new StatsCounter() {
- @Override
- public void recordHits(int count) {}
- @Override
- public void recordMisses(int count) {}
- @Override
- public void recordLoadSuccess(long loadTime) {}
- @Override
- public void recordLoadException(long loadTime) {}
- @Override
- public void recordEviction() {}
- @Override
- public CacheStats snapshot() {
- return EMPTY_STATS;
- }
- });
- static final CacheStats EMPTY_STATS = new CacheStats(0, 0, 0, 0, 0, 0);
- static final Supplier<StatsCounter> CACHE_STATS_COUNTER =
- new Supplier<StatsCounter>() {
- @Override
- public StatsCounter get() {
- return new SimpleStatsCounter();
- }
- };
- enum NullListener implements RemovalListener<Object, Object> {
- INSTANCE;
- @Override
- public void onRemoval(RemovalNotification<Object, Object> notification) {}
- }
- enum OneWeigher implements Weigher<Object, Object> {
- INSTANCE;
- @Override
- public int weigh(Object key, Object value) {
- return 1;
- }
- }
- static final Ticker NULL_TICKER = new Ticker() {
- @Override
- public long read() {
- return 0;
- }
- };
- private static final Logger logger = Logger.getLogger(CacheBuilder.class.getName());
- static final int UNSET_INT = -1;
- boolean strictParsing = true;
- int initialCapacity = UNSET_INT;
- int concurrencyLevel = UNSET_INT;
- long maximumSize = UNSET_INT;
- long maximumWeight = UNSET_INT;
- Weigher<? super K, ? super V> weigher;
- Strength keyStrength;
- Strength valueStrength;
- long expireAfterWriteNanos = UNSET_INT;
- long expireAfterAccessNanos = UNSET_INT;
- long refreshNanos = UNSET_INT;
- Equivalence<Object> keyEquivalence;
- Equivalence<Object> valueEquivalence;
- RemovalListener<? super K, ? super V> removalListener;
- Ticker ticker;
- Supplier<? extends StatsCounter> statsCounterSupplier = NULL_STATS_COUNTER;
- // TODO(fry): make constructor private and update tests to use newBuilder
- CacheBuilder() {}
- /**
- * Constructs a new {@code CacheBuilder} instance with default settings, including strong keys,
- * strong values, and no automatic eviction of any kind.
- */
- public static CacheBuilder<Object, Object> newBuilder() {
- return new CacheBuilder<Object, Object>();
- }
- /**
- * Sets the minimum total size for the internal hash tables. For example, if the initial capacity
- * is {@code 60}, and the concurrency level is {@code 8}, then eight segments are created, each
- * having a hash table of size eight. Providing a large enough estimate at construction time
- * avoids the need for expensive resizing operations later, but setting this value unnecessarily
- * high wastes memory.
- *
- * @throws IllegalArgumentException if {@code initialCapacity} is negative
- * @throws IllegalStateException if an initial capacity was already set
- */
- public CacheBuilder<K, V> initialCapacity(int initialCapacity) {
- checkState(this.initialCapacity == UNSET_INT, "initial capacity was already set to %s",
- this.initialCapacity);
- checkArgument(initialCapacity >= 0);
- this.initialCapacity = initialCapacity;
- return this;
- }
- int getInitialCapacity() {
- return (initialCapacity == UNSET_INT) ? DEFAULT_INITIAL_CAPACITY : initialCapacity;
- }
- /**
- * Guides the allowed concurrency among update operations. Used as a hint for internal sizing. The
- * table is internally partitioned to try to permit the indicated number of concurrent updates
- * without contention. Because assignment of entries to these partitions is not necessarily
- * uniform, the actual concurrency observed may vary. Ideally, you should choose a value to
- * accommodate as many threads as will ever concurrently modify the table. Using a significantly
- * higher value than you need can waste space and time, and a significantly lower value can lead
- * to thread contention. But overestimates and underestimates within an order of magnitude do not
- * usually have much noticeable impact. A value of one permits only one thread to modify the cache
- * at a time, but since read operations and cache loading computations can proceed concurrently,
- * this still yields higher concurrency than full synchronization.
- *
- * <p> Defaults to 4. <b>Note:</b>The default may change in the future. If you care about this
- * value, you should always choose it explicitly.
- *
- * <p>The current implementation uses the concurrency level to create a fixed number of hashtable
- * segments, each governed by its own write lock. The segment lock is taken once for each explicit
- * write, and twice for each cache loading computation (once prior to loading the new value,
- * and once after loading completes). Much internal cache management is performed at the segment
- * granularity. For example, access queues and write queues are kept per segment when they are
- * required by the selected eviction algorithm. As such, when writing unit tests it is not
- * uncommon to specify {@code concurrencyLevel(1)} in order to achieve more deterministic eviction
- * behavior.
- *
- * <p>Note that future implementations may abandon segment locking in favor of more advanced
- * concurrency controls.
- *
- * @throws IllegalArgumentException if {@code concurrencyLevel} is nonpositive
- * @throws IllegalStateException if a concurrency level was already set
- */
- public CacheBuilder<K, V> concurrencyLevel(int concurrencyLevel) {
- checkState(this.concurrencyLevel == UNSET_INT, "concurrency level was already set to %s",
- this.concurrencyLevel);
- checkArgument(concurrencyLevel > 0);
- this.concurrencyLevel = concurrencyLevel;
- return this;
- }
- int getConcurrencyLevel() {
- return (concurrencyLevel == UNSET_INT) ? DEFAULT_CONCURRENCY_LEVEL : concurrencyLevel;
- }
- /**
- * Specifies the maximum number of entries the cache may contain. Note that the cache <b>may evict
- * an entry before this limit is exceeded</b>. As the cache size grows close to the maximum, the
- * cache evicts entries that are less likely to be used again. For example, the cache may evict an
- * entry because it hasn't been used recently or very often.
- *
- * <p>When {@code size} is zero, elements will be evicted immediately after being loaded into the
- * cache. This can be useful in testing, or to disable caching temporarily without a code change.
- *
- * <p>This feature cannot be used in conjunction with {@link #maximumWeight}.
- *
- * @param size the maximum size of the cache
- * @throws IllegalArgumentException if {@code size} is negative
- * @throws IllegalStateException if a maximum size or weight was already set
- */
- public CacheBuilder<K, V> maximumSize(long size) {
- checkState(this.maximumSize == UNSET_INT, "maximum size was already set to %s",
- this.maximumSize);
- checkState(this.maximumWeight == UNSET_INT, "maximum weight was already set to %s",
- this.maximumWeight);
- checkState(this.weigher == null, "maximum size can not be combined with weigher");
- checkArgument(size >= 0, "maximum size must not be negative");
- this.maximumSize = size;
- return this;
- }
- /**
- * Specifies the maximum weight of entries the cache may contain. Weight is determined using the
- * {@link Weigher} specified with {@link #weigher}, and use of this method requires a
- * corresponding call to {@link #weigher} prior to calling {@link #build}.
- *
- * <p>Note that the cache <b>may evict an entry before this limit is exceeded</b>. As the cache
- * size grows close to the maximum, the cache evicts entries that are less likely to be used
- * again. For example, the cache may evict an entry because it hasn't been used recently or very
- * often.
- *
- * <p>When {@code weight} is zero, elements will be evicted immediately after being loaded into
- * cache. This can be useful in testing, or to disable caching temporarily without a code
- * change.
- *
- * <p>Note that weight is only used to determine whether the cache is over capacity; it has no
- * effect on selecting which entry should be evicted next.
- *
- * <p>This feature cannot be used in conjunction with {@link #maximumSize}.
- *
- * @param weight the maximum total weight of entries the cache may contain
- * @throws IllegalArgumentException if {@code weight} is negative
- * @throws IllegalStateException if a maximum weight or size was already set
- * @since 11.0
- */
- @GwtIncompatible("To be supported")
- public CacheBuilder<K, V> maximumWeight(long weight) {
- checkState(this.maximumWeight == UNSET_INT, "maximum weight was already set to %s",
- this.maximumWeight);
- checkState(this.maximumSize == UNSET_INT, "maximum size was already set to %s",
- this.maximumSize);
- this.maximumWeight = weight;
- checkArgument(weight >= 0, "maximum weight must not be negative");
- return this;
- }
- /**
- * Specifies the weigher to use in determining the weight of entries. Entry weight is taken
- * into consideration by {@link #maximumWeight(long)} when determining which entries to evict, and
- * use of this method requires a corresponding call to {@link #maximumWeight(long)} prior to
- * calling {@link #build}. Weights are measured and recorded when entries are inserted into the
- * cache, and are thus effectively static during the lifetime of a cache entry.
- *
- * <p>When the weight of an entry is zero it will not be considered for size-based eviction
- * (though it still may be evicted by other means).
- *
- * <p><b>Important note:</b> Instead of returning <em>this</em> as a {@code CacheBuilder}
- * instance, this method returns {@code CacheBuilder<K1, V1>}. From this point on, either the
- * original reference or the returned reference may be used to complete configuration and build
- * the cache, but only the "generic" one is type-safe. That is, it will properly prevent you from
- * building caches whose key or value types are incompatible with the types accepted by the
- * weigher already provided; the {@code CacheBuilder} type cannot do this. For best results,
- * simply use the standard method-chaining idiom, as illustrated in the documentation at top,
- * configuring a {@code CacheBuilder} and building your {@link Cache} all in a single statement.
- *
- * <p><b>Warning:</b> if you ignore the above advice, and use this {@code CacheBuilder} to build
- * a cache whose key or value type is incompatible with the weigher, you will likely experience
- * a {@link ClassCastException} at some <i>undefined</i> point in the future.
- *
- * @param weigher the weigher to use in calculating the weight of cache entries
- * @throws IllegalArgumentException if {@code size} is negative
- * @throws IllegalStateException if a maximum size was already set
- * @since 11.0
- */
- @GwtIncompatible("To be supported")
- public <K1 extends K, V1 extends V> CacheBuilder<K1, V1> weigher(
- Weigher<? super K1, ? super V1> weigher) {
- checkState(this.weigher == null);
- if (strictParsing) {
- checkState(this.maximumSize == UNSET_INT, "weigher can not be combined with maximum size",
- this.maximumSize);
- }
- // safely limiting the kinds of caches this can produce
- @SuppressWarnings("unchecked")
- CacheBuilder<K1, V1> me = (CacheBuilder<K1, V1>) this;
- me.weigher = checkNotNull(weigher);
- return me;
- }
- // Make a safe contravariant cast now so we don't have to do it over and over.
- @SuppressWarnings("unchecked")
- <K1 extends K, V1 extends V> Weigher<K1, V1> getWeigher() {
- return (Weigher<K1, V1>) MoreObjects.firstNonNull(weigher, OneWeigher.INSTANCE);
- }
- /**
- * Specifies that each entry should be automatically removed from the cache once a fixed duration
- * has elapsed after the entry's creation, or the most recent replacement of its value.
- *
- * <p>When {@code duration} is zero, this method hands off to
- * {@link #maximumSize(long) maximumSize}{@code (0)}, ignoring any otherwise-specificed maximum
- * size or weight. This can be useful in testing, or to disable caching temporarily without a code
- * change.
- *
- * <p>Expired entries may be counted in {@link Cache#size}, but will never be visible to read or
- * write operations. Expired entries are cleaned up as part of the routine maintenance described
- * in the class javadoc.
- *
- * @param duration the length of time after an entry is created that it should be automatically
- * removed
- * @param unit the unit that {@code duration} is expressed in
- * @throws IllegalArgumentException if {@code duration} is negative
- * @throws IllegalStateException if the time to live or time to idle was already set
- */
- public CacheBuilder<K, V> expireAfterWrite(long duration, TimeUnit unit) {
- checkState(expireAfterWriteNanos == UNSET_INT, "expireAfterWrite was already set to %s ns",
- expireAfterWriteNanos);
- checkArgument(duration >= 0, "duration cannot be negative: %s %s", duration, unit);
- this.expireAfterWriteNanos = unit.toNanos(duration);
- return this;
- }
- long getExpireAfterWriteNanos() {
- return (expireAfterWriteNanos == UNSET_INT) ? DEFAULT_EXPIRATION_NANOS : expireAfterWriteNanos;
- }
- /**
- * Specifies that each entry should be automatically removed from the cache once a fixed duration
- * has elapsed after the entry's creation, the most recent replacement of its value, or its last
- * access. Access time is reset by all cache read and write operations (including
- * {@code Cache.asMap().get(Object)} and {@code Cache.asMap().put(K, V)}), but not by operations
- * on the collection-views of {@link Cache#asMap}.
- *
- * <p>When {@code duration} is zero, this method hands off to
- * {@link #maximumSize(long) maximumSize}{@code (0)}, ignoring any otherwise-specificed maximum
- * size or weight. This can be useful in testing, or to disable caching temporarily without a code
- * change.
- *
- * <p>Expired entries may be counted in {@link Cache#size}, but will never be visible to read or
- * write operations. Expired entries are cleaned up as part of the routine maintenance described
- * in the class javadoc.
- *
- * @param duration the length of time after an entry is last accessed that it should be
- * automatically removed
- * @param unit the unit that {@code duration} is expressed in
- * @throws IllegalArgumentException if {@code duration} is negative
- * @throws IllegalStateException if the time to idle or time to live was already set
- */
- public CacheBuilder<K, V> expireAfterAccess(long duration, TimeUnit unit) {
- checkState(expireAfterAccessNanos == UNSET_INT, "expireAfterAccess was already set to %s ns",
- expireAfterAccessNanos);
- checkArgument(duration >= 0, "duration cannot be negative: %s %s", duration, unit);
- this.expireAfterAccessNanos = unit.toNanos(duration);
- return this;
- }
- long getExpireAfterAccessNanos() {
- return (expireAfterAccessNanos == UNSET_INT)
- ? DEFAULT_EXPIRATION_NANOS : expireAfterAccessNanos;
- }
- /**
- * Specifies that active entries are eligible for automatic refresh once a fixed duration has
- * elapsed after the entry's creation, or the most recent replacement of its value. The semantics
- * of refreshes are specified in {@link LoadingCache#refresh}, and are performed by calling
- * {@link CacheLoader#reload}.
- *
- * <p>As the default implementation of {@link CacheLoader#reload} is synchronous, it is
- * recommended that users of this method override {@link CacheLoader#reload} with an asynchronous
- * implementation; otherwise refreshes will be performed during unrelated cache read and write
- * operations.
- *
- * <p>Currently automatic refreshes are performed when the first stale request for an entry
- * occurs. The request triggering refresh will make a blocking call to {@link CacheLoader#reload}
- * and immediately return the new value if the returned future is complete, and the old value
- * otherwise.
- *
- * <p><b>Note:</b> <i>all exceptions thrown during refresh will be logged and then swallowed</i>.
- *
- * @param duration the length of time after an entry is created that it should be considered
- * stale, and thus eligible for refresh
- * @param unit the unit that {@code duration} is expressed in
- * @throws IllegalArgumentException if {@code duration} is negative
- * @throws IllegalStateException if the refresh interval was already set
- * @since 11.0
- */
- @Beta
- @GwtIncompatible("To be supported (synchronously).")
- public CacheBuilder<K, V> refreshAfterWrite(long duration, TimeUnit unit) {
- checkNotNull(unit);
- checkState(refreshNanos == UNSET_INT, "refresh was already set to %s ns", refreshNanos);
- checkArgument(duration > 0, "duration must be positive: %s %s", duration, unit);
- this.refreshNanos = unit.toNanos(duration);
- return this;
- }
- long getRefreshNanos() {
- return (refreshNanos == UNSET_INT) ? DEFAULT_REFRESH_NANOS : refreshNanos;
- }
- /**
- * Specifies a nanosecond-precision time source for use in determining when entries should be
- * expired. By default, {@link System#nanoTime} is used.
- *
- * <p>The primary intent of this method is to facilitate testing of caches which have been
- * configured with {@link #expireAfterWrite} or {@link #expireAfterAccess}.
- *
- * @throws IllegalStateException if a ticker was already set
- */
- public CacheBuilder<K, V> ticker(Ticker ticker) {
- checkState(this.ticker == null);
- this.ticker = checkNotNull(ticker);
- return this;
- }
- Ticker getTicker(boolean recordsTime) {
- if (ticker != null) {
- return ticker;
- }
- return recordsTime ? Ticker.systemTicker() : NULL_TICKER;
- }
- /**
- * Specifies a listener instance that caches should notify each time an entry is removed for any
- * {@linkplain RemovalCause reason}. Each cache created by this builder will invoke this listener
- * as part of the routine maintenance described in the class documentation above.
- *
- * <p><b>Warning:</b> after invoking this method, do not continue to use <i>this</i> cache
- * builder reference; instead use the reference this method <i>returns</i>. At runtime, these
- * point to the same instance, but only the returned reference has the correct generic type
- * information so as to ensure type safety. For best results, use the standard method-chaining
- * idiom illustrated in the class documentation above, configuring a builder and building your
- * cache in a single statement. Failure to heed this advice can result in a {@link
- * ClassCastException} being thrown by a cache operation at some <i>undefined</i> point in the
- * future.
- *
- * <p><b>Warning:</b> any exception thrown by {@code listener} will <i>not</i> be propagated to
- * the {@code Cache} user, only logged via a {@link Logger}.
- *
- * @return the cache builder reference that should be used instead of {@code this} for any
- * remaining configuration and cache building
- * @throws IllegalStateException if a removal listener was already set
- */
- @CheckReturnValue
- public <K1 extends K, V1 extends V> CacheBuilder<K1, V1> removalListener(
- RemovalListener<? super K1, ? super V1> listener) {
- checkState(this.removalListener == null);
- // safely limiting the kinds of caches this can produce
- @SuppressWarnings("unchecked")
- CacheBuilder<K1, V1> me = (CacheBuilder<K1, V1>) this;
- me.removalListener = checkNotNull(listener);
- return me;
- }
- // Make a safe contravariant cast now so we don't have to do it over and over.
- @SuppressWarnings("unchecked")
- <K1 extends K, V1 extends V> RemovalListener<K1, V1> getRemovalListener() {
- return (RemovalListener<K1, V1>)
- MoreObjects.firstNonNull(removalListener, NullListener.INSTANCE);
- }
- /**
- * Enable the accumulation of {@link CacheStats} during the operation of the cache. Without this
- * {@link Cache#stats} will return zero for all statistics. Note that recording stats requires
- * bookkeeping to be performed with each operation, and thus imposes a performance penalty on
- * cache operation.
- *
- * @since 12.0 (previously, stats collection was automatic)
- */
- public CacheBuilder<K, V> recordStats() {
- statsCounterSupplier = CACHE_STATS_COUNTER;
- return this;
- }
- boolean isRecordingStats() {
- return statsCounterSupplier == CACHE_STATS_COUNTER;
- }
- Supplier<? extends StatsCounter> getStatsCounterSupplier() {
- return statsCounterSupplier;
- }
- /**
- * Builds a cache, which either returns an already-loaded value for a given key or atomically
- * computes or retrieves it using the supplied {@code CacheLoader}. If another thread is currently
- * loading the value for this key, simply waits for that thread to finish and returns its
- * loaded value. Note that multiple threads can concurrently load values for distinct keys.
- *
- * <p>This method does not alter the state of this {@code CacheBuilder} instance, so it can be
- * invoked again to create multiple independent caches.
- *
- * @param loader the cache loader used to obtain new values
- * @return a cache having the requested features
- */
- public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(
- CacheLoader<? super K1, V1> loader) {
- checkWeightWithWeigher();
- return new LocalCache.LocalLoadingCache<K1, V1>(this, loader);
- }
- /**
- * Builds a cache which does not automatically load values when keys are requested.
- *
- * <p>Consider {@link #build(CacheLoader)} instead, if it is feasible to implement a
- * {@code CacheLoader}.
- *
- * <p>This method does not alter the state of this {@code CacheBuilder} instance, so it can be
- * invoked again to create multiple independent caches.
- *
- * @return a cache having the requested features
- * @since 11.0
- */
- public <K1 extends K, V1 extends V> Cache<K1, V1> build() {
- checkWeightWithWeigher();
- checkNonLoadingCache();
- return new LocalCache.LocalManualCache<K1, V1>(this);
- }
- }
As shown above, the builder pattern is used to create the LoadingCache<K, V>, setting the maximum size, the expiration time and the other parameters along the way.
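The snippet below is only a configuration sketch (the sizes, durations and loader are arbitrary); it exercises a few more of the options visible in the source above — concurrencyLevel, recordStats and size-based eviction:
- LoadingCache<String, String> demoCache = CacheBuilder.newBuilder()
-         .maximumSize(10_000)                    // size-based eviction; cannot be combined with maximumWeight
-         .expireAfterWrite(60, TimeUnit.SECONDS) // entries become stale 60s after they were written
-         .concurrencyLevel(4)                    // number of internal segments (the default is also 4)
-         .recordStats()                          // hit/miss counters are no-ops unless this is called
-         .build(new CacheLoader<String, String>() {
-             @Override
-             public String load(String key) {
-                 return "value-for-" + key;      // placeholder loader, standing in for a redis lookup
-             }
-         });
- CacheStats stats = demoCache.stats();           // all zeros if recordStats() was omitted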
2. How is a value fetched from the Guava cache?
On the surface it is nothing more than a get call: stringDbCacheContainer.get(key). Under the hood:
- // com.google.common.cache.LocalCache
- // LoadingCache methods
- @Override
- public V get(K key) throws ExecutionException {
- // two possible outcomes: return the value already cached, or call load() to fetch it
- return localCache.getOrLoad(key);
- }
- // com.google.common.cache.LocalCache
- V getOrLoad(K key) throws ExecutionException {
- return get(key, defaultLoader);
- }
- V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
- int hash = hash(checkNotNull(key));
- // remember ConcurrentHashMap? locate the segment first, then the entry
- return segmentFor(hash).get(key, hash, loader);
- }
- Segment<K, V> segmentFor(int hash) {
- // TODO(fry): Lazily create segments?
- return segments[(hash >>> segmentShift) & segmentMask];
- }
- // the core read logic lives in this get
- // loading
- V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
- checkNotNull(key);
- checkNotNull(loader);
- try {
- if (count != 0) { // read-volatile
- // don't call getLiveEntry, which would ignore loading values
- ReferenceEntry<K, V> e = getEntry(key, hash);
- if (e != null) {
- // if an entry exists, the ticker decides whether it has expired; if not, return it directly (expiry details come later)
- long now = map.ticker.read();
- V value = getLiveValue(e, now);
- if (value != null) {
- recordRead(e, now);
- statsCounter.recordHits(1);
- return scheduleRefresh(e, key, hash, value, now, loader);
- }
- ValueReference<K, V> valueReference = e.getValueReference();
- if (valueReference.isLoading()) {
- return waitForLoadingValue(e, key, valueReference);
- }
- }
- }
- // first load, or the entry has expired: enter the loading path -- this is the important part
- // at this point e is either null or expired;
- return lockedGetOrLoad(key, hash, loader);
- } catch (ExecutionException ee) {
- Throwable cause = ee.getCause();
- if (cause instanceof Error) {
- throw new ExecutionError((Error) cause);
- } else if (cause instanceof RuntimeException) {
- throw new UncheckedExecutionException(cause);
- }
- throw ee;
- } finally {
- postReadCleanup();
- }
- }
- // static class Segment<K, V> extends ReentrantLock
- // Segment extends ReentrantLock, so LocalCache's locking is built on ReentrantLock
- V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader)
- throws ExecutionException {
- ReferenceEntry<K, V> e;
- ValueReference<K, V> valueReference = null;
- LoadingValueReference<K, V> loadingValueReference = null;
- boolean createNewEntry = true;
- lock();
- try {
- // re-read ticker once inside the lock
- long now = map.ticker.read();
- // before writing, purge expired entries first
- preWriteCleanup(now);
- int newCount = this.count - 1;
- AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
- int index = hash & (table.length() - 1);
- ReferenceEntry<K, V> first = table.get(index);
- // walk the bucket's linked list to handle hash collisions
- for (e = first; e != null; e = e.getNext()) {
- K entryKey = e.getKey();
- if (e.getHash() == hash && entryKey != null
- && map.keyEquivalence.equivalent(key, entryKey)) {
- valueReference = e.getValueReference();
- if (valueReference.isLoading()) {
- createNewEntry = false;
- } else {
- V value = valueReference.get();
- if (value == null) {
- enqueueNotification(entryKey, hash, valueReference, RemovalCause.COLLECTED);
- } else if (map.isExpired(e, now)) {
- // This is a duplicate check, as preWriteCleanup already purged expired
- // entries, but let's accomodate an incorrect expiration queue.
- enqueueNotification(entryKey, hash, valueReference, RemovalCause.EXPIRED);
- } else {
- recordLockedRead(e, now);
- statsCounter.recordHits(1);
- // we were concurrent with loading; don't consider refresh
- return value;
- }
- // immediately reuse invalid entries
- writeQueue.remove(e);
- accessQueue.remove(e);
- this.count = newCount; // write-volatile
- }
- break;
- }
- }
- // on the first load, create the Entry and enter the load() path
- if (createNewEntry) {
- loadingValueReference = new LoadingValueReference<K, V>();
- if (e == null) {
- e = newEntry(key, hash, first);
- e.setValueReference(loadingValueReference);
- table.set(index, e);
- } else {
- e.setValueReference(loadingValueReference);
- }
- }
- } finally {
- unlock();
- postWriteCleanup();
- }
- if (createNewEntry) {
- try {
- // Synchronizes on the entry to allow failing fast when a recursive load is
- // detected. This may be circumvented when an entry is copied, but will fail fast most
- // of the time.
- // synchronously load the source value through the loader
- synchronized (e) {
- return loadSync(key, hash, loadingValueReference, loader);
- }
- } finally {
- // record a miss; the default stats counter is a no-op
- statsCounter.recordMisses(1);
- }
- } else {
- // The entry already exists. Wait for loading.
- // another thread is already loading this key, so just wait for its result (essentially a Future.get())
- return waitForLoadingValue(e, key, valueReference);
- }
- }
- // load the original value
- // at most one of loadSync.loadAsync may be called for any given LoadingValueReference
- V loadSync(K key, int hash, LoadingValueReference<K, V> loadingValueReference,
- CacheLoader<? super K, V> loader) throws ExecutionException {
- // loadingValueReference holds the loader callback used to fetch the original value
- ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
- // store the result in the cache so the next read can use it
- return getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
- }
- // load the value from the loader
- public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {
- stopwatch.start();
- V previousValue = oldValue.get();
- try {
- // if there was no previous value, load it and return straight away
- if (previousValue == null) {
- V newValue = loader.load(key);
- return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
- }
- // otherwise (typically entries without expiry) go through reload(); if reload() returns null, return immediately
- // a custom reload() implementation is required for this path to be useful
- ListenableFuture<V> newValue = loader.reload(key, previousValue);
- if (newValue == null) {
- return Futures.immediateFuture(null);
- }
- // To avoid a race, make sure the refreshed value is set into loadingValueReference
- // *before* returning newValue from the cache query.
- return Futures.transform(newValue, new Function<V, V>() {
- @Override
- public V apply(V newValue) {
- LoadingValueReference.this.set(newValue);
- return newValue;
- }
- });
- } catch (Throwable t) {
- if (t instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- return setException(t) ? futureValue : fullyFailedFuture(t);
- }
- }
- // com.google.common.util.concurrent.Uninterruptibles
- /**
- * Waits uninterruptibly for {@code newValue} to be loaded, and then records loading stats.
- */
- V getAndRecordStats(K key, int hash, LoadingValueReference<K, V> loadingValueReference,
- ListenableFuture<V> newValue) throws ExecutionException {
- V value = null;
- try {
- // wait synchronously for the load result; note that null is not allowed here and triggers an exception, presumably to guard against cache-penetration holes
- value = getUninterruptibly(newValue);
- if (value == null) {
- throw new InvalidCacheLoadException("CacheLoader returned null for key " + key + ".");
- }
- // record the successful load; this is an extension point and a no-op by default
- statsCounter.recordLoadSuccess(loadingValueReference.elapsedNanos());
- // finally store the loaded value into the cache container and return it (hashing at work again)
- storeLoadedValue(key, hash, loadingValueReference, value);
- return value;
- } finally {
- if (value == null) {
- statsCounter.recordLoadException(loadingValueReference.elapsedNanos());
- removeLoadingValue(key, hash, loadingValueReference);
- }
- }
- }
- /**
- * Invokes {@code future.}{@link Future#get() get()} uninterruptibly.
- * To get uninterruptibility and remove checked exceptions, see
- * {@link Futures#getUnchecked}.
- *
- * <p>If instead, you wish to treat {@link InterruptedException} uniformly
- * with other exceptions, see {@link Futures#get(Future, Class) Futures.get}
- * or {@link Futures#makeChecked}.
- *
- * @throws ExecutionException if the computation threw an exception
- * @throws CancellationException if the computation was cancelled
- */
- public static <V> V getUninterruptibly(Future<V> future)
- throws ExecutionException {
- boolean interrupted = false;
- try {
- while (true) {
- try {
- return future.get();
- } catch (InterruptedException e) {
- interrupted = true;
- }
- }
- } finally {
- if (interrupted) {
- Thread.currentThread().interrupt();
- }
- }
- }
That is the complete path of a cache read. In short:
1. hash the key to locate a segment, then try to read the entry straight from that segment's table;
2. if the entry is missing or has expired, call the client-supplied load() to fetch the original data;
3. store the result back into the segment's table, so the local cache takes effect;
4. record the hit/miss statistics and the read count.
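One practical consequence of getAndRecordStats rejecting null, seen above, is worth spelling out: a CacheLoader that returns null makes the get call fail, which is exactly why the earlier LocalEnhancedCacheHolder caches an empty-string sentinel instead. A small sketch (the key name is arbitrary; imports from com.google.common.cache and java.util.concurrent are assumed):
- LoadingCache<String, String> nullableCache = CacheBuilder.newBuilder()
-         .maximumSize(100)
-         .build(new CacheLoader<String, String>() {
-             @Override
-             public String load(String key) {
-                 return null; // pretend the backing store has nothing for this key
-             }
-         });
- try {
-     nullableCache.get("missing-key");
- } catch (CacheLoader.InvalidCacheLoadException e) {
-     // propagates as a RuntimeException: "CacheLoader returned null for key missing-key."
-     System.out.println(e.getMessage());
- } catch (ExecutionException e) {
-     // checked exceptions thrown from load() arrive here instead
-     throw new RuntimeException(e);
- }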
3. How is expiration handled?
We already caught a glimpse of this while reading get().
Two things need confirming: 1. is there an asynchronous cleanup thread removing expired data? 2. while cleanup happens, what becomes of the existing entries?
In fact Guava starts no cleanup thread of its own: the cleanup is done inline, right before data is loaded.
- // com.google.common.cache.LocalCache
- // static class Segment<K, V> extends ReentrantLock
- // Segment extends ReentrantLock, so LocalCache's locking is built on ReentrantLock
- V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader)
- throws ExecutionException {
- ReferenceEntry<K, V> e;
- ValueReference<K, V> valueReference = null;
- LoadingValueReference<K, V> loadingValueReference = null;
- boolean createNewEntry = true;
- lock();
- try {
- // re-read ticker once inside the lock
- long now = map.ticker.read();
- // before writing, purge expired entries first
- preWriteCleanup(now);
- int newCount = this.count - 1;
- AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
- int index = hash & (table.length() - 1);
- ReferenceEntry<K, V> first = table.get(index);
- // walk the bucket's linked list to handle hash collisions
- for (e = first; e != null; e = e.getNext()) {
- K entryKey = e.getKey();
- if (e.getHash() == hash && entryKey != null
- && map.keyEquivalence.equivalent(key, entryKey)) {
- valueReference = e.getValueReference();
- if (valueReference.isLoading()) {
- createNewEntry = false;
- } else {
- V value = valueReference.get();
- if (value == null) {
- enqueueNotification(entryKey, hash, valueReference, RemovalCause.COLLECTED);
- } else if (map.isExpired(e, now)) {
- // This is a duplicate check, as preWriteCleanup already purged expired
- // entries, but let's accomodate an incorrect expiration queue.
- enqueueNotification(entryKey, hash, valueReference, RemovalCause.EXPIRED);
- } else {
- recordLockedRead(e, now);
- statsCounter.recordHits(1);
- // we were concurrent with loading; don't consider refresh
- return value;
- }
- // immediately reuse invalid entries
- writeQueue.remove(e);
- accessQueue.remove(e);
- this.count = newCount; // write-volatile
- }
- break;
- }
- }
- // on the first load, create the Entry and enter the load() path
- if (createNewEntry) {
- loadingValueReference = new LoadingValueReference<K, V>();
- if (e == null) {
- e = newEntry(key, hash, first);
- e.setValueReference(loadingValueReference);
- table.set(index, e);
- } else {
- e.setValueReference(loadingValueReference);
- }
- }
- } finally {
- unlock();
- postWriteCleanup();
- }
- if (createNewEntry) {
- try {
- // Synchronizes on the entry to allow failing fast when a recursive load is
- // detected. This may be circumvented when an entry is copied, but will fail fast most
- // of the time.
- // synchronously load the source value through the loader
- synchronized (e) {
- return loadSync(key, hash, loadingValueReference, loader);
- }
- } finally {
- // record a miss; the default stats counter is a no-op
- statsCounter.recordMisses(1);
- }
- } else {
- // The entry already exists. Wait for loading.
- return waitForLoadingValue(e, key, valueReference);
- }
- }
- // now let's look closely at how preWriteCleanup(now) removes expired data
- /**
- * Performs routine cleanup prior to executing a write. This should be called every time a
- * write thread acquires the segment lock, immediately after acquiring the lock.
- *
- * <p>Post-condition: expireEntries has been run.
- */
- @GuardedBy("this")
- void preWriteCleanup(long now) {
- runLockedCleanup(now);
- }
- void runLockedCleanup(long now) {
- // clean only if the lock can be taken; if another thread holds it, skip this round
- if (tryLock()) {
- try {
- // when weak/soft keys or values are in use, drain their reference queues first
- drainReferenceQueues();
- // then remove the entries that have expired, judged by time
- expireEntries(now); // calls drainRecencyQueue
- // reset the read counter
- readCount.set(0);
- } finally {
- unlock();
- }
- }
- }
- /**
- * Drain the key and value reference queues, cleaning up internal entries containing garbage
- * collected keys or values.
- */
- @GuardedBy("this")
- void drainReferenceQueues() {
- if (map.usesKeyReferences()) {
- drainKeyReferenceQueue();
- }
- if (map.usesValueReferences()) {
- drainValueReferenceQueue();
- }
- }
- @GuardedBy("this")
- void expireEntries(long now) {
- // drain the recency queue so the access order is up to date
- drainRecencyQueue();
- ReferenceEntry<K, V> e;
- // peek entries from the head of each queue and remove them while they are expired
- // expired in the write queue: remove; expired in the access queue: remove
- while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) {
- if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
- throw new AssertionError();
- }
- }
- while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {
- if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
- throw new AssertionError();
- }
- }
- }
- @Override
- public ReferenceEntry<K, V> peek() {
- ReferenceEntry<K, V> next = head.getNextInAccessQueue();
- return (next == head) ? null : next;
- }
- // remove an entry for a given cause, e.g. an expired one
- @GuardedBy("this")
- boolean removeEntry(ReferenceEntry<K, V> entry, int hash, RemovalCause cause) {
- int newCount = this.count - 1;
- AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
- int index = hash & (table.length() - 1);
- ReferenceEntry<K, V> first = table.get(index);
- for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
- if (e == entry) {
- ++modCount;
- // delegate to removeValueFromChain to remove the concrete entry
- ReferenceEntry<K, V> newFirst = removeValueFromChain(
- first, e, e.getKey(), hash, e.getValueReference(), cause);
- newCount = this.count - 1;
- table.set(index, newFirst);
- this.count = newCount; // write-volatile
- return true;
- }
- }
- return false;
- }
- @GuardedBy("this")
- @Nullable
- ReferenceEntry<K, V> removeValueFromChain(ReferenceEntry<K, V> first,
- ReferenceEntry<K, V> entry, @Nullable K key, int hash, ValueReference<K, V> valueReference,
- RemovalCause cause) {
- enqueueNotification(key, hash, valueReference, cause);
- // take the entry out of both the write queue and the access queue
- writeQueue.remove(entry);
- accessQueue.remove(entry);
- if (valueReference.isLoading()) {
- valueReference.notifyNewValue(null);
- return first;
- } else {
- return removeEntryFromChain(first, entry);
- }
- }
- @GuardedBy("this")
- @Nullable
- ReferenceEntry<K, V> removeEntryFromChain(ReferenceEntry<K, V> first,
- ReferenceEntry<K, V> entry) {
- int newCount = count;
- // in the simple case, just return the chain starting right after the removed entry
- // when first != entry, the entries before it are copied one by one onto the new chain head before returning
- ReferenceEntry<K, V> newFirst = entry.getNext();
- for (ReferenceEntry<K, V> e = first; e != entry; e = e.getNext()) {
- // copy each preceding entry onto newFirst (that prefix ends up re-linked in reverse)
- ReferenceEntry<K, V> next = copyEntry(e, newFirst);
- if (next != null) {
- newFirst = next;
- } else {
- removeCollectedEntry(e);
- newCount--;
- }
- }
- this.count = newCount;
- return newFirst;
- }
At this point we have seen the complete expiration path for a key. To summarize:
1. cleanup is triggered on access, as part of reads and writes;
2. updates are made thread-safe with ReentrantLock;
3. the read counter is reset and the entry count is decremented.
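Because the cleanup piggy-backs on reads and writes rather than running on a background thread, an expired entry can linger in Cache#size() until something triggers maintenance. A small sketch using a controllable Ticker — the hook CacheBuilder exposes precisely for this kind of test (guava-testlib ships a similar FakeTicker) — makes that lazy behavior visible; the helper class below is an illustration, not production code:
- // assumes imports from com.google.common.cache, com.google.common.base.Ticker,
- // java.util.concurrent.TimeUnit and java.util.concurrent.atomic.AtomicLong
- class FakeTicker extends Ticker {
-     private final AtomicLong nanos = new AtomicLong();
-     void advance(long duration, TimeUnit unit) { nanos.addAndGet(unit.toNanos(duration)); }
-     @Override public long read() { return nanos.get(); }
- }
- FakeTicker ticker = new FakeTicker();
- Cache<String, String> cache = CacheBuilder.newBuilder()
-         .expireAfterWrite(10, TimeUnit.SECONDS)
-         .ticker(ticker)
-         .build();
- cache.put("k", "v");
- ticker.advance(11, TimeUnit.SECONDS);
- System.out.println(cache.size());            // may still report 1: nothing has triggered cleanup yet
- System.out.println(cache.getIfPresent("k")); // null: a read never returns an expired entry
- cache.cleanUp();                             // force the routine maintenance
- System.out.println(cache.size());            // 0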
4. How do we put a value into the cache ourselves?
It works just like put on an ordinary map; a quick look is enough.
- // com.google.common.cache.LocalCache$LocalManualCache
- @Override
- public void put(K key, V value) {
- localCache.put(key, value);
- }
- // com.google.common.cache.LocalCache
- @Override
- public V put(K key, V value) {
- checkNotNull(key);
- checkNotNull(value);
- int hash = hash(key);
- return segmentFor(hash).put(key, hash, value, false);
- }
- // com.google.common.cache.LocalCache$Segment
- @Nullable
- V put(K key, int hash, V value, boolean onlyIfAbsent) {
- lock();
- try {
- long now = map.ticker.read();
- preWriteCleanup(now);
- int newCount = this.count + 1;
- if (newCount > this.threshold) { // ensure capacity
- expand();
- newCount = this.count + 1;
- }
- AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
- int index = hash & (table.length() - 1);
- ReferenceEntry<K, V> first = table.get(index);
- // Look for an existing entry.
- for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
- K entryKey = e.getKey();
- if (e.getHash() == hash && entryKey != null
- && map.keyEquivalence.equivalent(key, entryKey)) {
- // We found an existing entry.
- ValueReference<K, V> valueReference = e.getValueReference();
- V entryValue = valueReference.get();
- if (entryValue == null) {
- ++modCount;
- if (valueReference.isActive()) {
- enqueueNotification(key, hash, valueReference, RemovalCause.COLLECTED);
- setValue(e, key, value, now);
- newCount = this.count; // count remains unchanged
- } else {
- setValue(e, key, value, now);
- newCount = this.count + 1;
- }
- this.count = newCount; // write-volatile
- evictEntries();
- return null;
- } else if (onlyIfAbsent) {
- // Mimic
- // "if (!map.containsKey(key)) ...
- // else return map.get(key);
- recordLockedRead(e, now);
- return entryValue;
- } else {
- // clobber existing entry, count remains unchanged
- ++modCount;
- enqueueNotification(key, hash, valueReference, RemovalCause.REPLACED);
- setValue(e, key, value, now);
- evictEntries();
- return entryValue;
- }
- }
- }
- // Create a new entry.
- ++modCount;
- ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
- setValue(newEntry, key, value, now);
- table.set(index, newEntry);
- newCount = this.count + 1;
- this.count = newCount; // write-volatile
- evictEntries();
- return null;
- } finally {
- unlock();
- postWriteCleanup();
- }
- }
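Since LocalManualCache.put writes straight into the local segments, it also suggests a small optional tweak to the earlier component: the original putCache only writes through to redis, so the node that performed the write keeps serving its stale local copy until guava.cache.timeout elapses. Updating (or invalidating) the local entry as well shortens that window on the writing node — a hypothetical variant, not part of the original code:
- public void putCache(String key, String value) {
-     redisTemplate.set(key, value, 0L);
-     // keep the writing node's local copy in step with redis;
-     // alternatively: stringDbCacheContainer.invalidate(key);
-     stringDbCacheContainer.put(key, value);
- }
Other nodes still converge only through expiry, which is exactly the consistency trade-off called out at the start.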
And that's it: a two-level cache built on Guava. Nothing too mysterious after all.
As the old saying goes: be grateful to the people who put you through the wringer!