标准的 Java 库不能提供足够的方法来操纵其核心类,所以 Apache Commons Lang 为我们提供了这些额外的方法
本文便介绍 Apache Commons Lang 中 concurrent 包的使用说明
引入 jar 包
JDK 版本需要大于等于 1.7
Maven
1 | <dependency> |
Gradle
1 | compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.7' |
包结构说明
接口摘要
Interface | Description |
---|---|
CircuitBreaker |
描述断路器 Circuit Breaker 的接口 |
Computable<I,O> | 为单个参数的计算提供定义一个包装接口,并返回一个结果 |
ConcurrentInitializer |
定义线程安全的初始化接口 |
类摘要
Class | Description |
---|---|
AbstractCircuitBreaker |
断路器基础类 |
AtomicInitializer |
ConcurrentInitializer 接口基于 AtomicReference 变量的实现类 |
AtomicSafeInitializer |
ConcurrentInitializer 接口基于 AtomicReference 变量的实现类,但是初始化方法 AtomicSafeInitializer.initialize() 只会执行一次 |
BackgroundInitializer |
ConcurrentInitializer 接口实现类,通过后台线程完成初始化 |
BasicThreadFactory | ThreadFactory 接口实现类,提供一些配置获取方法 |
BasicThreadFactory.Builder | BasicThreadFactory 的 Builder 模式实例 |
CallableBackgroundInitializer |
BackgroundInitializer 实现类,通过传入 Callable 进行实现 |
ConcurrentUtils | 提供 java.util.concurrent 包相关功能的工具类 |
ConstantInitializer |
ConcurrentInitializer 接口实现类,直接返回构造器中传入的参数 |
EventCountCircuitBreaker | 计算特定事件的 Circuit Breaker 断路器模式的简单实现 |
LazyInitializer |
ConcurrentInitializer 接口实现类,通过双重检查锁进行实现 |
Memoizer<I,O> | 为单个参数的计算定义一个包装的接口,并返回一个结果 |
MultiBackgroundInitializer | BackgroundInitializer 实现类,可以在后台进行多个初始化 |
MultiBackgroundInitializer.MultiBackgroundInitializerResults | 一个数据类,用于存储MultiBackgroundInitializer 初始化后的结果 |
ThresholdCircuitBreaker | 通过给定阈值开启 Circuit Breaker 断路器模式的简单实现 |
TimedSemaphore | 一个专门的信号量实现,在给定的时间内提供许可 |
枚举摘要
Enum | Description |
---|---|
AbstractCircuitBreaker.State | 表示断路器不同状态的内部枚举 |
异常摘要
Exception | Description |
---|---|
CircuitBreakingException | 用于报告与 Circuit Breaker 断路器相关的运行时错误条件的异常类 |
ConcurrentException | 用于报告与访问后台任务数据相关的错误条件的异常类 |
ConcurrentRuntimeException | 用于报告与访问后台任务数据有关的运行时错误条件的异常类 |
使用说明
CircuitBreaker 及其子类
类层次结构
类详细介绍
CircuitBreaker
描述断路器的接口
一个断路器可用来防止不可靠的服务或意外负载的应用程序。 它通常监视特定的资源。 只要这个资源按照预期工作,它就处于关闭状态,这意味着资源可以被使用。 如果使用该资源时遇到问题时,断路器可以切换到打开状态 。 那么访问这个资源是被禁止的。 根据具体的实施方式,当资源再次可用时,断路器可能切换到关闭状态。
该接口定义了断路器组件的通用协议
AbstractCircuitBreaker
断路器的基础类,实现了 CircuitBreaker
大部分接口
EventCountCircuitBreaker
计算特定事件的断路器模式的简单实现
一个断路器可用来防止不稳定的服务或遭遇意外高峰负载的应用程序。 新创建的EventCountCircuitBreaker
对象最初处于关闭状态,意味着没有检测到任何问题。 当应用程序遇到特定事件(如错误或服务超时)时,它会通知断路器增加一个内部计数器。 如果在一个特定的时间间隔中报告的事件的数量超过配置的阈值时,断路器将被打开。 这意味着相关的子系统有问题; 应用程序不应该再使用它,而是给它一点时间让它安顿下来。 如果接收的事件数量低于阈值,断路器可以配置为在一定的时间之后切换回关闭状态。
当一个EventCountCircuitBreaker
对象被构造时,可以提供下列参数:
- 导致状态转换为打开的事件数量的阈值 。 如果在检查间隔内收到事件的数量等于阈值,则断路器将切换到打开状态。
- 检查断路器是否打开的时间间隔。 所以可以指定 「如果在一分钟内遇到 10 个以上的错误,断路器应该打开」。
- 自动关闭断路器的阈值。 如 「如果请求数量降低到每分钟 100 次,断路器应该再次自动关闭」。根据使用情况,关闭断路器的门槛略低于打开门槛,以避免在收到的事件数量接近门槛时连续转换。
构造方法如下
1 | // threshold 改变断路器状态的阈值; 如果在检查间隔内收到的事件数量大于此值,则断路器打开; 如果它低于这个值,它会再次关闭 |
这个类支持以下典型用例:
防止负载高峰
想象一下你有一个服务器可以每分钟处理一定数量的请求。 突然之间,请求数量显著增加,可能是遭到 DDoS(拒绝服务攻击)。 EventCountCircuitBreaker
可以配置为在检测到突然的高峰负载时停止应用程序处理请求,并在事情平静时再开始请求处理。 以下代码片段显示了这种情况的典型示例。 这里EventCountCircuitBreaker
在断路器打开之前允许每分钟最多 1000 个请求。 当负载再次下降到每秒 800 个请求时,它会切换回关闭状态:
1 | EventCountCircuitBreaker breaker = new EventCountCircuitBreaker(1000, 1, TimeUnit.MINUTE, 800); |
处理不稳定的服务
假如应用程序是一个不稳定的第三方服务。 如果错误太多,服务应被视为关闭,停止服务一段时间。 可以使用以下方式来实现,在这个具体的例子中,我们在 2 分钟内允许 5 个错误; 如果达到这个限制,服务会有 10 分钟的休息时间:
1 | EventCountCircuitBreaker breaker = new EventCountCircuitBreaker(5, 2, TimeUnit.MINUTE, 5, 10, TimeUnit.MINUTE); |
除了自动状态转换,断路器的状态可以使用 open()
和 close()
方法进行手动更改。 同时也可以使用 addChangeListener(final PropertyChangeListener listener)
方法注册监听器,当发生状态转换时,可以得到事件改变对象 PropertyChangeEvent
,此时可以根据状态更改的情况做出反应
实现说明:
- 该实现使用非阻塞算法来更新内部计数器和状态。 如果没有太多的竞争,这应该是非常有效的。
- 这个实现并不打算在非常短的检查间隔中作为高精度定时器来运行。 故意保持简单,以避免复杂和耗时的状态检查。 它应该在几秒到几分钟甚至更长的时间内运行良好。 如果时间间隔太短,可能会因竞争状态导致虚假的状态转换。
- 检查间隔的处理有点简单。 因此,不能保证断路器在特定的时间点被触发; 可能会有一些延迟(小于检查间隔)。
ThresholdCircuitBreaker
如果请求的次数大于给定阈值,则会打开 Circuit Breaker 模式的简单实现。
它包含一个从零开始的内部计数器,每次调用将计数器增加一个给定的数量。 如果阈值是零,断路器将永远处于打开状态。
一个内存断路器示例
1 | long threshold = 10L; |
Computable<I,O> 及其子类
类层次结构
- Computable<I,O>
- Memoizer<I,O>
类详细介绍
Computable<I,O>
为单个参数的计算定义一个包装的接口,并返回一个结果。
Memoizer<I,O>
为单个参数的计算定义一个包装的接口,并返回一个结果。 计算结果将被缓存
这不是一个全功能的缓存,一旦生成结果,就无法限制或删除结果。 但是,如果在上一次计算过程中抛出错误,则可以通过在构造方法中设置一个选项来实现重新生成给定参数的结果。 如果没有设置或设置为 false,将抛出缓存的异常
源码分析
1 | public class Memoizer<I, O> implements Computable<I, O> { |
ConcurrentInitializer 及其子类
类层次结构
类详细介绍
ConcurrentInitializer 及其子类主要是用来进行数据安全初始化的,开头的 Concurrent 表示支持并发操作
它只定义了一个接口
1 | public interface ConcurrentInitializer<T> { |
调用 get()
方法即可获取完成初始化的对象.同时提供了多种实现,可根据需要进行选择
ConstantInitializer
功能描述
ConstantInitializer 是最简单的 ConcurrentInitializer 实现
在构造器中传入一个对象,在它的 get()
方法中直接返回这个对象.适用于单元测试及替换其他实现类
源码分析
1 | public class ConstantInitializer<T> implements ConcurrentInitializer<T> { |
代码示例
1 | ConcurrentInitializer<String> initializer = new ConstantInitializer<>("test"); |
LazyInitializer
功能描述
顾名思义,使用懒加载的方式进行初始化,只有在第一次调用时才进行初始化操作,通过实现 initialize
方法返回初始化后的对象
源码分析
使用双重检查锁进行实现
1 | public abstract class LazyInitializer<T> implements ConcurrentInitializer<T> { |
代码示例
1 | ConcurrentInitializer<Properties> initializer = new LazyInitializer<Properties>() { |
AtomicInitializer
功能描述
与 LazyInitializer 功能,用法一致,不过实现方式不同,同时其 initialize
方法会存在调用多次的问题
源码分析
使用 CAS 方式进行实现
1 | public abstract class AtomicInitializer<T> implements ConcurrentInitializer<T> { |
代码示例
1 | ConcurrentInitializer<Properties> initializer = new AtomicInitializer<Properties>() { |
AtomicSafeInitializer
功能描述
AtomicInitializer 存在一个 initialize
执行多次的问题.而 AtomicSafeInitializer 则是为了解决这个问题而诞生,使用它,initialize
只会被执行一次
源码分析
1 | public abstract class AtomicSafeInitializer<T> implements |
代码示例
1 | ConcurrentInitializer<Properties> initializer = new AtomicSafeInitializer<Properties>() { |
BackgroundInitializer
功能描述
使用后台线程进行初始化,在创建对象后需要先执行 start()
保证后台线程运行,再调用 get()
方法
默认Executors.newFixedThreadPool(1)
创建一个线程数量为 1 的线程池,在 initialize
方法执行完成之后对线程池进行关闭
同时提供一个 protected BackgroundInitializer(final ExecutorService exec) {}
构造器方法,此时使用传入的 ExecutorService
对象执行 initialize
方法,执行完成后不会该关闭线程池
源码分析
1 | public abstract class BackgroundInitializer<T> implements |
代码示例
1 | BackgroundInitializer<Properties> initializer = new BackgroundInitializer<Properties>() { |
CallableBackgroundInitializer
功能描述
BackgroundInitializer 的子类,使用方式为在构造器中传入一个 Callable
对象,在初始化时调用该参数的 call()
方法
源码分析
1 | public class CallableBackgroundInitializer<T> extends BackgroundInitializer<T> { |
代码示例
1 | BackgroundInitializer<Properties> initializer = new CallableBackgroundInitializer<>(new Callable<Properties>() { |
MultiBackgroundInitializer
功能描述
BackgroundInitializer 多后台任务的实现,通过该类,可以并行执行多个 BackgroundInitializer
源码分析
1 | public class MultiBackgroundInitializer |
代码示例
1 | MultiBackgroundInitializer initializer = new MultiBackgroundInitializer(); |
使用选择
如果初始化程序执行时间过长且不希望在第一次调用时等待太久请选择 BackgroundInitializer
与 CallableBackgroundInitializer
否则使用 LazyInitializer
与 AtomicSafeInitializer
BasicThreadFactory
ThreadFactory
接口的实现,,提供一些配置获取方法
ThreadFactory
用于 ExecutorService
创建执行任务的线程。 在许多情况下,用户并不关心,因为 ExecutorService
会使用一个默认的 ThreadFactory
。 但是,如果对线程有特殊要求,则必须创建一个自定义的ThreadFactory
这个类为它创建的线程提供了一些经常需要的配置选项,这些是:
namingPattern
线程的名称模式。 如果希望日志输出或异常跟踪更容易阅读,可以为线程起一个有意义的名称。 在线程命名时使用thread.setName(String.format(namingPattern, count));
方式,count
即线程池当前创建的线程数量,从 1 开始。namingPattern
中的%d
会被替换为count
。 如 :namingPattern
为"My %d. worker thread"
,那么生成的线程名称为"My 1. worker thread"
,"My 2. worker thread"
daemonFlag
守护线程标志。 该工厂创建的线程是否是守护线程。 这会影响当前 Java 应用程序的退出行为,当正在运行的线程都是守护线程时,Java 虚拟机将退出,默认为 false ,即用户线程priority
线程的优先级.Integer
类型,值区间为 [1,10]uncaughtExceptionHandler
线程由于未捕获到异常而突然终止时调用的处理程序。 如果线程内发生未捕获的异常,则调用此处理程序
BasicThreadFactory
不是直接创建实例,而是使用内部类 Builder
类实现此目的,使用 Builder
只需要设置应用程序感兴趣的配置选项。 以下示例显示了 BasicThreadFactory
是如何创建并配置在在ExecutorService
中:
1 | // 创建一个线程工厂,配置名称模式,守护线程标志,优先级 |
ConcurrentUtils
提供 java.util.concurrent
包相关功能的工具方法
该类涉及到 ConcurrentException
的方法存在一个 原方法名+Unchecked
的相同功能方法,用于将受检异常 ConcurrentException
转换为运行时异常 ConcurrentRuntimeException
用法如下
1 | Future<Object> future = ...; |
方法列表
Modifier and Type | Method and Description |
---|---|
static <T> Future<T> |
constantFuture(T value) 创建一个已完成的 Future ,该 Future 的返回值及参数中的 T value |
static <K,V> V |
createIfAbsent(ConcurrentMap<K,V> map, K key, ConcurrentInitializer<V> init) 检查 ConcurrentMap 是否包含 key ,如果未包含,为 ConcurrentMap 设置 key : init.get() |
static <K,V> V |
createIfAbsentUnchecked(ConcurrentMap<K,V> map, K key, ConcurrentInitializer<V> init) createIfAbsent 的非受检异常版本 |
static ConcurrentException |
extractCause(ExecutionException ex) ExecutionException 异常原因转换, RuntimeException 与 Error 进行抛出,否则转换为受检异常 ConcurrentException 并返回 |
static ConcurrentRuntimeException |
extractCauseUnchecked(ExecutionException ex) extractCause 的非受检异常版本 |
static void |
handleCause(ExecutionException ex) ExecutionException 异常处理,RuntimeException 与 Error 进行抛出,否则转换为 ConcurrentException 在进行抛出 |
static void |
handleCauseUnchecked(ExecutionException ex) handleCauseUnchecked 的非受检异常版本 |
static <T> T |
initialize(ConcurrentInitializer<T> initializer) 获取 initializer 的初始化返回值 |
static <T> T |
initializeUnchecked(ConcurrentInitializer<T> initializer) initialize 的非受检异常版本 |
static <K,V> V |
putIfAbsent(ConcurrentMap<K,V> map, K key, V value) 检查 ConcurrentMap 是否包含 key ,如果未包含,为 ConcurrentMap 设置 key : value |
TimedSemaphore
一个专门的信号量实现,在给定的时间内提供许可,到期自动释放
该类的功能与 java.util.concurrent.Semaphore
有点类似,通过调用 acquire()
方法获取许可,但是没有提供 release()
方法进行许可释放,因为它会在时间到期后释放所有的许可
如果在许可已经用尽的情况下调用acquire()
,那么当前线程会一直阻塞,直到时间到期,所有许可被释放.此时再重新获取许可.这意味着可以在规定的时间范围内,限制给定操作的次数
用法示例
假如存在一个通过后台线程查询数据库以进行收集统计信息的应用程序.这种后台处理不应该对数据库产生太多负载,防止影响系统的功能和性能.所以可以使用 TimedSemaphore
来限制该线程每秒只能发出一定数量的数据库查询
执行查询的线程类伪代码可能如下
1 | public class StatisticsThread extends Thread { |
下面的代码片段显示了如何创建一个每秒只允许 10 次的 TimedSemaphore
并传递给统计线程 StatisticsThread
1 | TimedSemaphore sem = new TimedSemaphore(1, TimeUnit.SECONDS, 10); |
构造方法如下
1 | // timePeriod 时间段 |
所以 new TimedSemaphore(1, TimeUnit.SECOND, 10);
含义是在 timePeriod(1) * timeUnit(TimeUnit.SECOND) = 1秒
的时间内只发放 limit(10)
个许可
在使用时需要在限制操作前调用 acquire()
方法。 TimedSemaphore
会统计调用 acquire()
的次数,并在许可达到上限后阻塞当前线程,直到时间周期结束后释放所有许可,此时再进行许可获取
另外提供 tryAcquire()
方法,该方法尝试获取许可,如果获取成功,返回true
,否者返回false
,使用这种方法不会造成当前线程的阻塞
同时在运行过程中你可以随时调用 setLimit(final int limit)
修改许可数量。 如果一个操作的次数限制需要动态调整,比如白天减少许可,晚上增加许可。 如果设置的许可小于原本许可,那么会立即生效,但是设置的许可大于原来的许可,阻塞的线程依然阻塞,直到所有许可释放,阻塞的线程被唤醒.此时按照新设置的许可进行处理.如果许可数量设置小于等于 0,那么 acquire()
操作不会阻塞,直接放行
当TimedSemaphore
不需要时,可以调用 shutdown()
方法,该方法会取消释放所有许可的周期任务,如果执行周期任务的 ScheduledExecutorService
不是外部传入,同时也会结束该线程池