diff --git a/README.md b/README.md index f220758..e9aa90e 100644 --- a/README.md +++ b/README.md @@ -128,6 +128,7 @@ - 努力编写中... ## 番外篇(JDK 1.8) +- [Executor 线程池组件](docs/JDK/Executor线程池组件.md) - [HashMap 源码赏析]() - [ConcurrentHashMap 源码赏析]() - [String 源码赏析]() diff --git a/docs/JDK/Executor线程池组件.md b/docs/JDK/Executor线程池组件.md new file mode 100644 index 0000000..02df792 --- /dev/null +++ b/docs/JDK/Executor线程池组件.md @@ -0,0 +1,252 @@ +## 线程池核心组件图解 +看源码之前,先了解一下该组件 最主要的几个 接口、抽象类和实现类的结构关系。 + +![avatar](/images/JDK1.8/线程池组件类图.png) + +该组件中,Executor 和 ExecutorService接口 定义了线程池最核心的几个方法,提交任务submit +()、关闭线程池shutdown()。抽象类 AbstractExecutorService 主要对公共行为 submit()系列方法进行了实现,这些 submit()方法 的实现使用了 模板方法模式,其中调用的 execute()方法 是未实现的 来自 Executor接口 的方法。实现类 ThreadPoolExecutor 则对线程池进行了具体而复杂的实现。 + +另外还有一个常见的工具类 Executors,里面为开发者封装了一些可以直接拿来用的线程池。 + +## 源码赏析 +话不多说,直接上源码。(这里只看最主要的代码部分) + +### Executor 和 ExecutorService接口 +```java +public interface Executor { + + /** + * 在将来的某个时间执行给定的 Runnable。该 Runnable 可以在新线程、池线程或调用线程中执行。 + */ + void execute(Runnable command); +} + +public interface ExecutorService extends Executor { + + /** + * 优雅关闭,该关闭会继续执行完以前提交的任务,但不再接受新任务。 + */ + void shutdown(); + + /** + * 提交一个有返回值的任务,并返回该任务的 未来执行完成后的结果。 + * Future的 get()方法 将在成功完成后返回任务的结果。 + */ + Future submit(Callable task); + + Future submit(Runnable task, T result); + + Future submit(Runnable task); +} +``` +### AbstractExecutorService 抽象类 +```java +/** + * 该抽象类最主要的内容就是,实现了 ExecutorService 中的 submit()系列方法 + */ +public abstract class AbstractExecutorService implements ExecutorService { + + /** + * 提交任务 进行执行,返回获取未来结果的 Future对象。 + * 这里使用了 “模板方法模式”,execute()方法来自 Executor接口,该抽象类中并未进行实现, + * 而是交由子类具体实现。 + */ + public Future submit(Runnable task) { + if (task == null) throw new NullPointerException(); + RunnableFuture ftask = newTaskFor(task, null); + execute(ftask); + return ftask; + } + + public Future submit(Runnable task, T result) { + if (task == null) throw new NullPointerException(); + RunnableFuture ftask = newTaskFor(task, result); + execute(ftask); + return ftask; + } + + public Future submit(Callable task) { + if (task == null) throw new NullPointerException(); + RunnableFuture ftask = newTaskFor(task); + execute(ftask); + return ftask; + } +} +``` + +### ThreadPoolExecutor +```java +public class ThreadPoolExecutor extends AbstractExecutorService { + + /** + * ************** + * ** 主要属性 ** + * ************** + */ + + /** 阻塞队列 */ + private final BlockingQueue workQueue; + + /** 用于创建线程的 线程工厂 */ + private volatile ThreadFactory threadFactory; + + /** 核心线程数 */ + private volatile int corePoolSize; + + /** 最大线程数 */ + private volatile int maximumPoolSize; + + + /** + * ************** + * ** 构造方法 ** + * ************** + */ + + /** 最后都使用了最后一个构造方法的实现 */ + public ThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + Executors.defaultThreadFactory(), defaultHandler); + } + + public ThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + threadFactory, defaultHandler); + } + + public ThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + RejectedExecutionHandler handler) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + Executors.defaultThreadFactory(), handler); + } + + public ThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + if (corePoolSize < 0 || + maximumPoolSize <= 0 || + maximumPoolSize < corePoolSize || + keepAliveTime < 0) + throw new IllegalArgumentException(); + if (workQueue == null || threadFactory == null || handler == null) + throw new NullPointerException(); + this.corePoolSize = corePoolSize; + this.maximumPoolSize = maximumPoolSize; + this.workQueue = workQueue; + this.keepAliveTime = unit.toNanos(keepAliveTime); + this.threadFactory = threadFactory; + this.handler = handler; + } + + /** + * ************** + * ** 主要实现 ** + * ************** + */ + + /** 执行 Runnable任务 */ + public void execute(Runnable command) { + if (command == null) + throw new NullPointerException(); + /* + * 分三步进行: + * + * 1、如果运行的线程少于 corePoolSize,尝试开启一个新的线程;否则尝试进入工作队列 + * + * 2. 如果工作队列没满,则进入工作队列;否则 判断是否超出最大线程数 + * + * 3. 如果未超出最大线程数,则尝试开启一个新的线程;否则 按饱和策略处理无法执行的任务 + */ + int c = ctl.get(); + if (workerCountOf(c) < corePoolSize) { + if (addWorker(command, true)) + return; + c = ctl.get(); + } + if (isRunning(c) && workQueue.offer(command)) { + int recheck = ctl.get(); + if (! isRunning(recheck) && remove(command)) + reject(command); + else if (workerCountOf(recheck) == 0) + addWorker(null, false); + } + else if (!addWorker(command, false)) + reject(command); + } + + /** + * 优雅关闭,在其中执行以前提交的任务,但不接受新任务。如果已关闭,则调用没有其他效果。 + */ + public void shutdown() { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + checkShutdownAccess(); + advanceRunState(SHUTDOWN); + interruptIdleWorkers(); + onShutdown(); // hook for ScheduledThreadPoolExecutor + } finally { + mainLock.unlock(); + } + tryTerminate(); + } +} +``` +ThreadPoolExecutor 中的 execute()方法 执行 Runnable任务 的流程逻辑可以用下图表示。 + +![avatar](/images/ConcurrentProgramming/线程池流程.png) + +### 工具类 Executors +看类名也知道,它最主要的作用就是提供 static 的工具方法,为开发者提供各种封装好的 具有各自特性的线程池。 +```java +public class Executors { + + /** + * 创建一个固定线程数量的线程池 + */ + public static ExecutorService newFixedThreadPool(int nThreads) { + return new ThreadPoolExecutor(nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue()); + } + + /** + * 创建一个单线程的线程池 + */ + public static ExecutorService newSingleThreadExecutor() { + return new FinalizableDelegatedExecutorService + (new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue())); + } + + /** + * 创建一个缓存的,可动态伸缩的线程池。 + * 可以看出来:核心线程数为0,最大线程数为Integer.MAX_VALUE,如果任务数在某一瞬间暴涨, + * 这个线程池很可能会把 服务器撑爆。 + * 另外需要注意的是,它们底层都是使用了 ThreadPoolExecutor,只不过帮我们配好了参数 + */ + public static ExecutorService newCachedThreadPool() { + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue()); + } +} +``` \ No newline at end of file diff --git a/docs/Mybatis/基础支持层/2、DataSource及Transaction模块.md b/docs/Mybatis/基础支持层/2、DataSource及Transaction模块.md index a14c52e..133ce0a 100644 --- a/docs/Mybatis/基础支持层/2、DataSource及Transaction模块.md +++ b/docs/Mybatis/基础支持层/2、DataSource及Transaction模块.md @@ -168,7 +168,7 @@ public class UnpooledDataSource implements DataSource { 数据库连接池的设计思路一般为: 1. 连接池初始化时创建一定数量的连接,并添加到连接池中备用; 2. 当程序需要使用数据库连接时,从连接池中请求,用完后会将其返还给连接池,而不是直接关闭; -3. 连接池会控制总连接上限及空闲连接上线,如果连接池中的连接总数已达上限,且都被占用,后续的连接请求会进入阻塞队列等待,直到有连接可用; +3. 连接池会控制总连接上限及空闲连接上线,如果连接池中的连接总数已达上限,且都被占用,后续的连接请求会短暂阻塞后重新尝试获取连接,如此循环,直到有连接可用; 4. 如果连接池中空闲连接较多,已达到空闲连接上限,则返回的连接会被关闭掉,以降低系统开销。 PooledDataSource 实现了简易的数据库连接池功能,其创建数据库连接的功能依赖了上面的 UnpooledDataSource。 diff --git a/docs/Spring/SpringTransaction/Spring事务管理器的设计与实现.md b/docs/Spring/SpringTransaction/Spring事务管理器的设计与实现.md index 9537a32..93d6daa 100644 --- a/docs/Spring/SpringTransaction/Spring事务管理器的设计与实现.md +++ b/docs/Spring/SpringTransaction/Spring事务管理器的设计与实现.md @@ -12,129 +12,129 @@ 上面介绍了使用 DataSourceTransactionManager 实现事务创建、提交和回滚的过程,基本上与单独使用 Connection 实现事务处理是一样的,也是通过设置 autoCommit属性,调用 Connection 的 commit() 和 rollback()方法 来完成的。而我们在声明式事务处理中看到的那些事务处理属性,并不在 DataSourceTransactionManager 中完成,这和我们在前面分析中看到的是一致的。 -![avatar](/images/springTransaction/PlatformTransactionManager组件的设计.png) +![avatar](/images/springTransaction/实现DataSourceTransactionManager的时序图.png) ```java public class DataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean { - /** 持有 javax.sql.DataSource对象 */ - private DataSource dataSource; - - /** - * 这里是产生 Transaction对象 的地方,为 Transaction 的创建提供服务,对数据库而言, - * 事务工作是由 Connection 来完成的。这里把数据库的 Connection对象 放到了 ConnectionHolder 中, - * 然后封装到一个 DataSourceTransactionObject对象 中,在这个封装过程中增加了许多为事务处理服务的 - * 控制数据 - */ - @Override - protected Object doGetTransaction() { - DataSourceTransactionObject txObject = new DataSourceTransactionObject(); - txObject.setSavepointAllowed(isNestedTransactionAllowed()); - // 获取与当前线程绑定的 数据库Connection,这个 Connection 在第一个事务开始 - // 的地方与线程绑定 - ConnectionHolder conHolder = - (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource); - txObject.setConnectionHolder(conHolder, false); - return txObject; - } - - /** - * 判断是否存在活跃的事务,由 ConnectionHolder 的 transactionActive属性 来控制 - */ - @Override - protected boolean isExistingTransaction(Object transaction) { - DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; - return (txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive()); - } - - /** - * 这里是处理事务开始的地方,在这里设置隔离级别,但忽略超时 - */ - @Override - protected void doBegin(Object transaction, TransactionDefinition definition) { - DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; - Connection con = null; - - try { - if (txObject.getConnectionHolder() == null || - txObject.getConnectionHolder().isSynchronizedWithTransaction()) { - Connection newCon = this.dataSource.getConnection(); - if (logger.isDebugEnabled()) { - logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); - } - txObject.setConnectionHolder(new ConnectionHolder(newCon), true); - } - - txObject.getConnectionHolder().setSynchronizedWithTransaction(true); - con = txObject.getConnectionHolder().getConnection(); - - Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); - txObject.setPreviousIsolationLevel(previousIsolationLevel); - - // 这里是 数据库Connection 完成事务处理的重要配置,需要把 autoCommit属性 关掉 - if (con.getAutoCommit()) { - txObject.setMustRestoreAutoCommit(true); - if (logger.isDebugEnabled()) { - logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); - } - con.setAutoCommit(false); - } - txObject.getConnectionHolder().setTransactionActive(true); - - int timeout = determineTimeout(definition); - if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { - txObject.getConnectionHolder().setTimeoutInSeconds(timeout); - } - - // 把当前的 数据库Connection 与线程绑定 - if (txObject.isNewConnectionHolder()) { - TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder()); - } - } - - catch (Throwable ex) { - DataSourceUtils.releaseConnection(con, this.dataSource); - throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); - } - } - - /** - * 事务提交的具体实现 - */ - @Override - protected void doCommit(DefaultTransactionStatus status) { - // 取得 Connection 以后,通过Connection 进行提交 - DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); - Connection con = txObject.getConnectionHolder().getConnection(); - if (status.isDebug()) { - logger.debug("Committing JDBC transaction on Connection [" + con + "]"); - } - try { - con.commit(); - } - catch (SQLException ex) { - throw new TransactionSystemException("Could not commit JDBC transaction", ex); - } - } - - /** - * 事务提交的具体实现,通过 Connection对象 的 rollback()方法 实现 - */ - @Override - protected void doRollback(DefaultTransactionStatus status) { - DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); - Connection con = txObject.getConnectionHolder().getConnection(); - if (status.isDebug()) { - logger.debug("Rolling back JDBC transaction on Connection [" + con + "]"); - } - try { - con.rollback(); - } - catch (SQLException ex) { - throw new TransactionSystemException("Could not roll back JDBC transaction", ex); - } - } + /** 持有 javax.sql.DataSource对象 */ + private DataSource dataSource; + + /** + * 这里是产生 Transaction对象 的地方,为 Transaction 的创建提供服务,对数据库而言, + * 事务工作是由 Connection 来完成的。这里把数据库的 Connection对象 放到了 ConnectionHolder 中, + * 然后封装到一个 DataSourceTransactionObject对象 中,在这个封装过程中增加了许多为事务处理服务的 + * 控制数据 + */ + @Override + protected Object doGetTransaction() { + DataSourceTransactionObject txObject = new DataSourceTransactionObject(); + txObject.setSavepointAllowed(isNestedTransactionAllowed()); + // 获取与当前线程绑定的 数据库Connection,这个 Connection 在第一个事务开始 + // 的地方与线程绑定 + ConnectionHolder conHolder = + (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource); + txObject.setConnectionHolder(conHolder, false); + return txObject; + } + + /** + * 判断是否存在活跃的事务,由 ConnectionHolder 的 transactionActive属性 来控制 + */ + @Override + protected boolean isExistingTransaction(Object transaction) { + DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; + return (txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive()); + } + + /** + * 这里是处理事务开始的地方,在这里设置隔离级别,但忽略超时 + */ + @Override + protected void doBegin(Object transaction, TransactionDefinition definition) { + DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; + Connection con = null; + + try { + if (txObject.getConnectionHolder() == null || + txObject.getConnectionHolder().isSynchronizedWithTransaction()) { + Connection newCon = this.dataSource.getConnection(); + if (logger.isDebugEnabled()) { + logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); + } + txObject.setConnectionHolder(new ConnectionHolder(newCon), true); + } + + txObject.getConnectionHolder().setSynchronizedWithTransaction(true); + con = txObject.getConnectionHolder().getConnection(); + + Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); + txObject.setPreviousIsolationLevel(previousIsolationLevel); + + // 这里是 数据库Connection 完成事务处理的重要配置,需要把 autoCommit属性 关掉 + if (con.getAutoCommit()) { + txObject.setMustRestoreAutoCommit(true); + if (logger.isDebugEnabled()) { + logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); + } + con.setAutoCommit(false); + } + txObject.getConnectionHolder().setTransactionActive(true); + + int timeout = determineTimeout(definition); + if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { + txObject.getConnectionHolder().setTimeoutInSeconds(timeout); + } + + // 把当前的 数据库Connection 与线程绑定 + if (txObject.isNewConnectionHolder()) { + TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder()); + } + } + + catch (Throwable ex) { + DataSourceUtils.releaseConnection(con, this.dataSource); + throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); + } + } + + /** + * 事务提交的具体实现 + */ + @Override + protected void doCommit(DefaultTransactionStatus status) { + // 取得 Connection 以后,通过Connection 进行提交 + DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); + Connection con = txObject.getConnectionHolder().getConnection(); + if (status.isDebug()) { + logger.debug("Committing JDBC transaction on Connection [" + con + "]"); + } + try { + con.commit(); + } + catch (SQLException ex) { + throw new TransactionSystemException("Could not commit JDBC transaction", ex); + } + } + + /** + * 事务提交的具体实现,通过 Connection对象 的 rollback()方法 实现 + */ + @Override + protected void doRollback(DefaultTransactionStatus status) { + DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); + Connection con = txObject.getConnectionHolder().getConnection(); + if (status.isDebug()) { + logger.debug("Rolling back JDBC transaction on Connection [" + con + "]"); + } + try { + con.rollback(); + } + catch (SQLException ex) { + throw new TransactionSystemException("Could not roll back JDBC transaction", ex); + } + } } ``` 上面介绍了使用 DataSourceTransactionManager 实现事务创建、提交和回滚的过程,基本上与单独使用 Connection 实现事务处理是一样的,也是通过设置 autoCommit属性,调用 Connection 的 commit() 和 rollback()方法 来完成的。看到这里,大家一定会觉得非常的熟悉。而我们在声明式事务处理中看到的那些事务处理属性,并不在 DataSourceTransactionManager 中完成,这和我们在前面分析中看到的是一致的。 diff --git a/images/JDK1.8/线程池组件类图.png b/images/JDK1.8/线程池组件类图.png new file mode 100644 index 0000000..b99079b Binary files /dev/null and b/images/JDK1.8/线程池组件类图.png differ