Guava Caches和Guava ListenableFuture学习

1.Guava是什么?

Guava是Google的一组核心Java库,其中包括新的集合类型(例如多图和多集),不可变的集合,图形库以及用于并发,I / O,哈希,缓存,原语,字符串等的实用程序!它广泛用于Google的大多数Java项目中,也被许多其他公司广泛使用。

它的github地址为: https://github.com/google/guava

本次主要参照官方文档,对项目中用到的guava caches和guava listenableFuture进行了学习.

2.Guava Caches

2.1.为什么要用Guava Caches?

基于内存的高速本地缓存在各种情况下都很用,我们在编程开发时,一定使用过HashMap或ConcurrentMap对数据进行过缓存.Guava Caches就与ConcurrentMap比较相似,最根本的区别就是ConcurrentMap会保存添加到其中的所有元素,直到调用代码将其明确删除为止。而Guava Caches则可以对存储的元素进行管理和剔除,以限制其内存占用量,而且它提供了LoadingCache,进行缓存的自动加载,非常的方便。

在官方文档上这样介绍Guava:

Generally, the Guava caching utilities are applicable whenever:

  • You are willing to spend some memory to improve speed.
  • You expect that keys will sometimes get queried more than once.
  • Your cache will not need to store more data than what would fit in RAM. (Guava caches are local to a single run of your application. They do not store data in files, or on outside servers. If this does not fit your needs, consider a tool like Memcached.)

If each of these apply to your use case, then the Guava caching utilities could be right for you!

当然如果不需要Guava Caches的这些特性,那么ConcurrentHashMap的内存效率更高——但是用任何旧的ConcurrentMap来复制大多数Guava Cahces的特性是极其困难或不可能的(官网都这么说了,有现成的那就用吧).

2.2.Guava Caches的学习和使用

首先把Guava的包引进来

1
2
3
4
5
6
<!--guava-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>

2.2.1.创建缓存和获取缓存基本用法

1
2
3
4
5
6
7
8
9
LoadingCache<String, String> graphs = CacheBuilder.newBuilder()
.maximumSize(1)
.expireAfterWrite(10, TimeUnit.MINUTES)
.build(
new CacheLoader<String, String>() {
public String load(String s) throws Exception {
return getRealCache(s); // 这个方法可以自己写
}
});

代码中的build中出现了CacheLoader这个东西,他的用处意思就是设置一个默认的方法,在取值的时候如果key对应的value不存在,那么就用这个方法获取value然后自动进行设置.当然我们也可以直接用graphs.put(k,v)来进行缓存的添加,但是建议用CacheLoader来自动加载设置缓存,可以保证缓存加载的统一性.

当我们不想用默认的CacheLoader来进行缓存加载时,我们可以在获取缓存时指定方法对默认加载方法进行覆盖,它使用原子性的“ get-if-absent-compute”语义,传递 Callableget调用,此方法为常规的“如果已缓存,则返回;否则创建,缓存并返回”模式提供了简单的替代方法。如下面代码所示:

1
2
3
4
String noCacheValueCallable = graphs.get("helloCallable", () -> {
return "helloCallableValue:Callable";
});
log.info("获取不存在的缓存调用Callable进行加载: {}", noCacheValueCallable);

其他API简单列举:

  • get: 要么返回已经缓存的值,要么使用CacheLoader向缓存原子地加载新值;

  • getUnchecked: CacheLoader 会抛异常,定义的CacheLoader没有声明任何检查型异常,则可以 getUnchecked 查找缓存;反之不能;

  • getAll: 方法用来执行批量查询;

  • put: 向缓存显式插入值,Cache.asMap()也能修改值,但不具原子性;

  • getIfPresent: 该方法只是简单的把Guava Cache当作Map的替代品,不执行load方法;

2.2.2.清除key:

我们几乎可以肯定没有足够的内存来缓存我们可以缓存的所有内容.所以必须决定:什么情况下不值得保留缓存对?Guava Caches提供了三种缓存清除类型:

  • 基于大小的清除
  • 基于时间的清除
  • 基于引用的清除

基于大小的清除:

使用CacheBuilder.maximumSize(long)进行设置,缓存将清除最近没有使用或者不经常使用的缓存.值得注意的是,缓存会在这个数值接近设置值的时候就对缓存进行清除.除此之外,也可以用CacheBuilder.weigher(Weigher)来设置权重函数来进行清除的规则设置.

直接贴上一个伪代码帮助理解:

1
2
3
4
5
6
7
8
9
10
11
12
13
LoadingCache<Key, Graph> graphs = CacheBuilder.newBuilder()
.maximumWeight(100000)
.weigher(new Weigher<Key, Graph>() {
public int weigh(Key k, Graph g) {
return g.vertices().size();
}
})
.build(
new CacheLoader<Key, Graph>() {
public Graph load(Key key) { // no checked exception
return createExpensiveGraph(key);
}
});

基于时间的清除:

请注意: 定时到期是在写入过程中进行定期维护的,偶尔在读取过程中进行维护,并不是实时的哦.

基于引用的清除:

Guava允许您设置缓存,以通过对键或值使用弱引用,对值使用软引用来对条目进行垃圾回收。

  • CacheBuilder.weakKeys()使用弱引用存储密钥。如果没有其他(强或软)键引用,则可以垃圾回收条目。由于垃圾回收仅取决于身份相等性,因此导致整个缓存使用身份(==)相等性来比较键,而不是equals()
  • CacheBuilder.weakValues()使用弱引用存储值。如果没有其他(强或软)值引用,则可以垃圾回收这些条目。由于垃圾回收仅取决于身份相等性,因此这导致整个缓存使用身份(==)相等性来比较值,而不是equals()
  • CacheBuilder.softValues()将值包装在软引用中。响应内存需求,以全局最近最少使用的方式软引用的对象进行垃圾回收。由于使用软引用会对性能产生影响,因此我们通常建议使用更可预测的最大高速缓存大小。使用softValues()会导致使用identity(==)相等而不是来比较值equals()

其他的清除 API:

guava cache 自带 清除机制,但仍旧可以手动清除:

  • 个别清除:Cache.invalidate(key)

  • 批量清除:Cache.invalidateAll(keys)

  • 清除所有缓存项:Cache.invalidateAll()

缓存清除的监听器:

可以通过CacheBuilder.removalListener(RemovalListener)为缓存指定删除侦听器,以便在删除条目时执行某些操作。向 RemovalListener传递一个RemovalNotification,其中指定了 RemovalCause、键和值。

注意,RemovalListener抛出的任何异常都会被记录(使用日志记录器)并被吞噬。

然后直接上一下我写的一个测试用例代码,Guava Caches的最基本的使用就在这块代码里了,通过代码可以更直观的看到缓存的基本操作用法.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.bestqiang.guava.cache;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
* GuavaCacheStudyTest
*
* @author bestqiang
* @date 2020/7/27 10:51
*/
@Slf4j
public class GuavaCacheStudyTest {

@Test
public void testGuavaCaches() throws ExecutionException {
LoadingCache<String, String> graphs = CacheBuilder.newBuilder()
.maximumSize(1)
.removalListener((cache) -> {
log.info("key为{},value为{}的缓存被清除了...", cache.getKey(), cache.getValue());
})
.expireAfterWrite(10, TimeUnit.MINUTES)
.build(
new CacheLoader<String, String>() {
public String load(String s) throws Exception {
return getRealCache(s);
}
});
// 可以直接用put,但是建议使用自动缓存加载,这样可以保证缓存加载的统一性
graphs.put("hello", "helloValue");
String hello = graphs.get("hello");
log.info("获取已有的缓存: {}", hello);

String noCacheValue = graphs.get("helloCacheLoader");
log.info("获取不存在的缓存会自动调用load进行加载: {}", noCacheValue);

// get时指定加载方法
String noCacheValueCallable = graphs.get("helloCallable", () -> {
return "helloCallableValue:Callable";
});
log.info("获取不存在的缓存调用Callable进行加载: {}", noCacheValueCallable);
}

private String getRealCache(String s) {
return s + ":RealCache";
}

}

代码运行结果:

1
2
3
4
5
2020-07-27 21:07:23.631 INFO [main] com.bestqiang.guava.cache.GuavaCacheStudyTest:38 say: 获取已有的缓存: helloValue
2020-07-27 21:07:23.699 INFO [main] com.bestqiang.guava.cache.GuavaCacheStudyTest:26 say: key为hello,value为helloValue的缓存被清除了...
2020-07-27 21:07:23.700 INFO [main] com.bestqiang.guava.cache.GuavaCacheStudyTest:41 say: 获取不存在的缓存会自动调用load进行加载: helloCacheLoader:RealCache
2020-07-27 21:07:23.701 INFO [main] com.bestqiang.guava.cache.GuavaCacheStudyTest:26 say: key为helloCacheLoader,value为helloCacheLoader:RealCache的缓存被清除了...
2020-07-27 21:07:23.701 INFO [main] com.bestqiang.guava.cache.GuavaCacheStudyTest:47 say: 获取不存在的缓存调用Callable进行加载: helloCallableValue:Callable

应该什么时候进行缓存清理:

使用CacheBuilder构建的缓存不会“自动”执行清理和逐出值,也不会在值过期后立即执行此类操作。相反,它在写操作期间执行少量维护,或者在偶尔的读操作(如果写操作很少)期间执行少量维护。

这样做的原因如下:如果我们想要持续地执行缓存维护,我们将需要创建一个线程,而它的操作将与用户操作竞争共享锁。此外,一些环境限制线程的创建,这将使CacheBuilder在该环境中不可用。

相反,我们把选择权放在你的手中。如果您的缓存是高吞吐量的,那么您就不必担心执行缓存维护来清除过期的条目等等。如果您的缓存写入很少,并且您不希望清理阻塞缓存读取,您可能希望创建自己的维护线程,定期调用cache.cleanup()。

如果您想为很少写入的缓存安排常规的缓存维护,只需使用ScheduledExecutorService来安排维护。

总结就是一句话:高吞吐量不用担心,缓存在写入或读取的时候就会自动完成清理操作.如果缓存的写操作很少,不希望清理操作阻塞缓存读取,那么开一个线程定期调用cache.cleanup()进行缓存清理就ok.

Refresh操作:

CacheBuilder.refreshAfterWrite(1, TimeUnit.MINUTES) 刷新操作,对于刷新不完全等同于清除。如LoadingCache.refresh(K)中所指定的,刷新一个键将为该键加载一个新值,可能是异步操作的.在键值刷新时,仍然返回旧的值(如果有的话),不会阻塞.这与清除相反,清除的话会进行强制检索等待,直到重新加载该值然后返回.如果在刷新时抛出异常,则保留旧值,并记录并排除异常.CacheLoader可以通过覆盖CacheLoader来指定刷新时使用的特定方法 reload(K, V).

举一个简单的测试用例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Test
public void testRefresh() throws ExecutionException, InterruptedException {
LoadingCache<String, String> graphs = CacheBuilder.newBuilder()
.maximumSize(10)
.removalListener((cache) -> {
log.info("key为{},value为{}的缓存被清除了...", cache.getKey(), cache.getValue());
})
.refreshAfterWrite(1, TimeUnit.SECONDS)
.build(
new CacheLoader<String, String>() {
public String load(String s) throws Exception {
log.info("load进行重新加载key为: " + s + " 的value");
return getRealCache(s);
}
public ListenableFuture<String> reload(String key, String value) {
log.info("reload进行重新加载key为: " + key + " 的value");
ListenableFutureTask<String> task = ListenableFutureTask.create(new Callable<String>() {
public String call() {
return "刷新方法异步执行了,key为: " + key;
}
});
new Thread(task).start();
return task;
}

});
String key1 = graphs.get("key1");
String key2 = graphs.get("key2");
Thread.sleep(2000);
String key11 = graphs.get("key1");// 此时符合缓存刷新条件,缓存会进行刷新
log.info(key1);
}

输出结果:

1
2
3
4
5
020-07-27 22:23:15.641 INFO [main] com.bestqiang.guava.cache.GuavaCacheStudyTest$1:36 say: load进行重新加载key为: key1 的value
2020-07-27 22:23:15.658 INFO [main] com.bestqiang.guava.cache.GuavaCacheStudyTest$1:36 say: load进行重新加载key为: key2 的value
2020-07-27 22:23:17.659 INFO [main] com.bestqiang.guava.cache.GuavaCacheStudyTest$1:40 say: reload进行重新加载key为: key1 的value
2020-07-27 22:23:17.877 INFO [main] com.bestqiang.guava.cache.GuavaCacheStudyTest:30 say: key为key1,value为key1:RealCache的缓存被清除了...
2020-07-27 22:23:17.881 INFO [main] com.bestqiang.guava.cache.GuavaCacheStudyTest:55 say: key1:RealCache

那么,我们在真实使用时用expire还是refresh呢?其实我们应该都用,这样,当缓存对符合刷新条件时,就不会盲目重置缓存对上的过期计时器,如果缓存对在符合刷新条件后没有查询,那么就让它过期。

2.3.统计:

CacheBuilder.recordStats()用来开启Guava Cache的统计功能。统计打开后Cache.stats()方法返回如下统计信息:

  • hitRate():缓存命中率;
  • hitMiss(): 缓存失误率;
  • loadcount() ; 加载次数;
  • averageLoadPenalty():加载新值的平均时间,单位为纳秒;
  • evictionCount():缓存项被回收的总数,不包括显式清除。

唯一值得注意的一点是:当通过asmap()方法查询key时,stat项是不作任何变化的,修改值时会有影响。此外,还有其他很多统计信息。这些统计信息对于调整缓存设置是至关重要的,在性能监控时可以依据的重要指标。

此外关于asmap还有几点注意事项:

您可以使用它的asMap视图将任何缓存看作是一个ConcurrentMap,但是asMap视图如何与缓存交互需要一些解释。

  • cache.asMap()包含当前在缓存中加载的所有条目。因此,例如,cache.asMap().keySet()包含所有当前加载的键。
  • asMap().get(key)本质上等效于cache.getIfPresent(key),并且从不导致值被加载。这与Map 合同一致。
  • 所有缓存读取和写入操作(包括Cache.asMap().get(Object)Cache.asMap().put(K, V))都会重置访问时间,但不会通过 containsKey(Object),也不会通过对的集合视图进行的操作来重置访问时间 Cache.asMap()。因此,例如,遍历 cache.asMap().entrySet()不会重置您检索的条目的访问时间。

3.ListenableFuture

3.1.为什么要用ListenableFuture?

官方文档这样介绍它:

并发是一个困难的问题,但是通过使用功能强大且简单的抽象可以大大简化并发。为了简化问题,Guava使用扩展Future了JDK 的 接口ListenableFuture

强烈建议您始终使用ListenableFuture而不是Future 在所有代码中使用,因为:

  • 大多数Futures方法都需要它。
  • 这比更改为ListenableFuture以后要容易。
  • 实用方法提供商将不再需要提供FutureListenableFuture他们的方法变种。

其实它就是jdk的Future的一个扩展,在Java8里,汲取了Guava ListenableFuture的优点,CompleteFuture类诞生了.ListenableFuture在项目用到了,在这里主要简单看一下如何去使用它,之后如果有新的需求,可以尝试使用CompleteFuture来完成.

3.2.ListenableFuture的学习和使用

ListenableFuture比原始的Future其实就是增加了一个回调操作,在执行完成后可以增加listener对执行进行监听,执行完毕后执行对应的方法,或者使用Futures.addCallback对不同的执行结果进行监听操作,根据执行结果的不同来进行不同的操作.

官方操作文档: https://github.com/google/guava/wiki/ListenableFutureExplained (操作相对全面,建议以此为准,但是测试用例有时不是很清晰)

对于这个工具方法的学习参考了博客: https://www.jianshu.com/p/9c57aa5e34af (说的比较通俗一点)

3.2.1.创建和使用ListenableFuture的方法

  • 使用create方法返回实例,这种就是简单的创建一个task,然后添加一个监听,当task执行完毕后监听执行后续方法,真实使用执行task的时候一定要用线程池,这里为了测试方便直接新开了一个线程执行task.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Test
public void test() {
//ListenableFutureTask通过静态create方法返回实例,还有一个重载方法,不太常用
ListenableFutureTask<String> task = ListenableFutureTask.create(new Callable<String>() {
@Override
public String call() throws Exception {
return "任务执行完毕";
}
});
//启动任务(当然,这里为了测试方便,真实使用要用线程池)
new Thread(task).start();
// 增加回调方法,MoreExecutors.directExecutor()返回guava默认的Executor,
// 执行回调方法不会新开线程,所有回调方法都在当前线程做(可能是主线程或者执行ListenableFutureTask的线程,具体可以看最后面的代码)。
// guava异步模块中参数有Executor的方法,一般还会有一个没有Executor参数的重载方法,使用的就是MoreExecutors.directExecutor()
task.addListener(new Runnable() {
@SneakyThrows
@Override
public void run() {
log.info(task.get());
log.info("回调方法执行");
}
}, MoreExecutors.directExecutor());

}

//MoreExecutors.directExecutor()源码,execute方法就是直接运行,没有新开线程
public static Executor directExecutor() {
return DirectExecutor.INSTANCE;
}

private enum DirectExecutor implements Executor {
INSTANCE;

@Override
public void execute(Runnable command) {
command.run();
}

@Override
public String toString() {
return "MoreExecutors.directExecutor()";
}
}

运行结果:

1
2
2020-07-28 10:54:33.350 INFO [main] com.bestqiang.guava.future.ListenableFutureStudyTest$2:61 say: 任务执行完毕
2020-07-28 10:54:33.356 INFO [main] com.bestqiang.guava.future.ListenableFutureStudyTest$2:62 say: 回调方法执行
  • 使用guava对ExecutorService的增强来创建线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//真正干活的线程池
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
5,
5,
0,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new CustomizableThreadFactory("demo"),
new ThreadPoolExecutor.DiscardPolicy());
//guava的接口ListeningExecutorService继承了jdk原生ExecutorService接口,重写了submit方法,修改返回值类型为ListenableFuture
ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(poolExecutor);

//获得一个随着jvm关闭而关闭的线程池,通过Runtime.getRuntime().addShutdownHook(hook)实现
//修改ThreadFactory为创建守护线程,默认jvm关闭时最多等待120秒关闭线程池,重载方法可以设置时间
ExecutorService newPoolExecutor = MoreExecutors.getExitingExecutorService(poolExecutor);

//只增加关闭线程池的钩子,不改变ThreadFactory
MoreExecutors.addDelayedShutdownHook(poolExecutor, 120, TimeUnit.SECONDS);

其实就是调用工具类将原生ExecutorService进行转换,线程池进行submit时,会返回ListenableFuture对象.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//像线程池提交任务,并得到ListenableFuture
ListenableFuture<String> listenableFuture = listeningExecutor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "";
}
});
//可以通过addListener对listenableFuture注册回调,但是通常使用Futures中的工具方法
Futures.addCallback(listenableFuture, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {

}

@Override
public void onFailure(Throwable t) {

}
});

/**
* Futures.addCallback源码,其实就是包装了一层addListener,可以不加executor参数,使用上文说的DirectExecutor
* 需要说明的是不加Executor的情况,只适用于轻型的回调方法,如果回调方法很耗时占资源,会造成线程阻塞
* 因为DirectExecutor有可能在主线程中执行回调
*/
public static <V> void addCallback(final ListenableFuture<V> future, final FutureCallback<? super V> callback, Executor executor) {
Preconditions.checkNotNull(callback);
Runnable callbackListener =
new Runnable() {
@Override
public void run() {
final V value;
try {
value = getDone(future);
} catch (ExecutionException e) {
callback.onFailure(e.getCause());
return;
} catch (RuntimeException e) {
callback.onFailure(e);
return;
} catch (Error e) {
callback.onFailure(e);
return;
}
callback.onSuccess(value);
}
};
future.addListener(callbackListener, executor);
}

3.2.2.异步操作链

使用ListenableFuture最重要的原因是,它可以拥有复杂的异步操作链,方便我们进行链式调用.

不用异步操作链,用addListener进行实现可以是这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
ListenableFutureTask<String> task1 = ListenableFutureTask.create(new Callable<String>() {
@Override
public String call() throws Exception {
return "";
}
});
new Thread(task1).start();
task1.addListener(new Runnable() {
@Override
public void run() {
ListenableFutureTask<String> task2 = ListenableFutureTask.create(new Callable<String>() {
@Override
public String call() throws Exception {
return "";
}
});
task2.addListener(new Runnable() {
@Override
public void run() {
...
}
}, MoreExecutors.directExecutor());
new Thread(task2).start();
}
}, MoreExecutors.directExecutor());

上述代码,其实就是一层监听器套一层,实现了链式调用.使用Guava的异步链式调用,可以用两个API简单的实现:

上述两个API的区别顾名思义就是一个异步链式调用一个同步链式调用.

还有两个API也比较常用,可以对ListenableFuture进行批量的处理:

  • allAsList(Iterable>) 对多个ListenableFuture的合并,返回一个当所有Future成功时返回多个Future返回值组成的List对象。注:当其中一个Future失败或者取消的时候,将会进入失败或者取消。

  • successfulAsList(Iterable>) 和allAsList相似,唯一差别是对于失败或取消的Future返回值用null代替。不会进入失败或者取消流程。

具体的链式调用代码可以参考下面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//当task1执行完毕会回调执行Function的apply方法,如果有task1有异常抛出,则task2也抛出相同异常,不执行apply
ListenableFuture<String> task2 = Futures.transform(task1, new Function<String, String>() {
@Override
public String apply(String input) {
return "";
}
});
ListenableFuture<String> task3 = Futures.transform(task2, new Function<String, String>() {
@Override
public String apply(String input) {
return "";
}
});
//处理最终的异步任务
Futures.addCallback(task3, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {

}

@Override
public void onFailure(Throwable t) {

}
});

上述代码Futures.transform()和Futures.addCallback()都是对addListener做了封装,进行回调的设置,但是transform更适合用在链式处理的中间过程,addCallback更适合用在处理最终的结果上.另外,它们的参数都是可以带上线程池的.具体的使用根据情况来定吧.

4.结语

Guava的特性还有很多,这里主要结合官方文档,对Caches和ListenableFuture进行了学习,方便大家快速了解入门,建议大家学习时多去github看看官方文档,以官方文档为准,可以少走弯路.

-------------本文结束感谢您的阅读-------------
愿你所有幸运,都不期而遇;愿你所有美好,都如约而至。