MOOC 个人学习笔记
# 1. 并发框架 Executor
- JDK 5 开始提供 Executor FrameWork (java.util.concurrent.*)
- 分离任务的创建和执行者的创建
- 线程重复利用 (new 线程代价很大)
# 共享线程池
- 预设好的多个 Thread,可弹性增加
- 多次执行很多很小的任务
- 任务创建和执行过程解耦
- 程序员无需关心线程池执行任务过程
# 主要类
- Executors.newCachedThreadPool/newFixedThreadPool 创建线程池
- ExecutorService 线程池服务
- Callable 具体的逻辑对象 (线程类)
- (Runnable 的 run 方法没有返回值,而 Callable 的 call 方法可以有返回值)
- Future 返回结果
// 创建可变线程池 | |
private ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newCachedThreadPool(); | |
// 创建固定线程池 | |
private ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(5); | |
// 关闭线程池 | |
executor.shutdown(); | |
// 执行任务,无返回值 | |
executor.execute(task); | |
// 执行任务,有返回值 | |
Future<Integer> result=executor.submit(task); | |
/* | |
public class Task implements Callable<Integer> { | |
@Override | |
public Integer call() throws Exception { | |
return null; | |
} | |
} | |
*/ | |
// 线程池基础信息 | |
System.out.println(executor.getPoolSize()); | |
System.out.println(executor.getActiveCount()); | |
System.out.println(executor.getCompletedTaskCount()); | |
//Future 返回类基本操作 | |
result.isDone() // 判断任务是否完成 | |
int sum=result.get(); // 获得结果 |
# 2. 并发框架 Fork-join
- Java 7 提供另一种并行框架:分解、治理、合并 (分治编程)
- 适合用于整体任务量不好确定的场合 (最小任务可确定)
# 主要类
- ForkJoinPool 任务池
- RecursiveAction
- RecursiveTask
// 创建执行线程池 | |
ForkJoinPool pool = new ForkJoinPool(); | |
ForkJoinPool pool = new ForkJoinPool(4); // 固定线程池 | |
// 创建任务 | |
Task task = new Task(1, 10000000); | |
/* | |
public class Task extends RecursiveTask<Integer> { | |
@Override | |
protected Integer compute () { | |
// 任务足够小,则直接执行 | |
//TODO | |
// 任务大于阈值,分裂为 2 个任务 | |
invokeAll (subTask1,subTask2); | |
Integer sum1 = subTask1.join (); | |
Integer sum2 = subTask2.join (); | |
// 结果合并 | |
sum = sum1 + sum2; | |
return sum; | |
} | |
} | |
*/ | |
// 提交任务 | |
ForkJoinTask<Integer> result = pool.submit(task); | |
// 输出结果 | |
System.out.println(result.get().toString()); |
# 3. 并发数据结构
- 常用的数据结构是线程不安全的
- ArrayList, HashMap, HashSet 非同步的
- 多个线程同时读写,可能会抛出异常或数据错误
- 传统 Vector,Hashtable 等同步集合性能过差
- 并发数据结构:数据添加和删除
- 阻塞式集合:当集合为空或者满时,等待
- 非阻塞式集合:当集合为空或者满时,不等待,返回 null 或异常
- List
- Vector 同步安全,写多读少
- ArrayList 不安全
- Collections.synchronizedList (List list) 基于 synchronized,效率差
- CopyOnWriteArrayList 读多写少,基于复制机制,非阻塞
- Set
- HashSet 不安全
- Collections.synchronizedSet (Set set) 基于 synchronized,效率差
- CopyOnWriteArraySet (基于 CopyOnWriteArrayList 实现) 读多写少,非阻塞
- Map
- Hashtable 同步安全,写多读少
- HashMap 不安全
- Collections.synchronizedMap (Map map) 基于 synchronized,效率差
- ConcurrentHashMap 读多写少,非阻塞
- Queue & Deque (队列,JDK 1.5 提出)
- ConcurrentLinkedQueue 非阻塞
- ArrayBlockingQueue/LinkedBlockingQueue 阻塞
# 4. 并发协作控制
# Lock
- Lock 也可以实现同步的效果
- 实现更复杂的临界区结构
- tryLock 方法可以预判锁是否空闲
- 允许分离读写的操作,多个读,一个写
- 性能更好
- ReentrantLock 类,可重入的互斥锁
- ReentrantReadWriteLock 类,可重入的读写锁
- lock 和 unlock 函数
// 构造锁 | |
private static final ReentrantLock queueLock = new ReentrantLock(); // 可重入锁 | |
private static final ReentrantReadWriteLock orderLock = new ReentrantReadWriteLock(); // 可重入读写锁 | |
// 基本函数 | |
queueLock.tryLock()// 尝试加锁 (包含 lock),返回 boolean | |
queueLock.lock() // 加锁 | |
queueLock.unlock()// 解锁 | |
orderLock.writeLock().lock()/unlock() // 写加锁 / 解锁 (写锁只能一个线程拥有) | |
orderLock.readLock().lock()/unlock() // 读加锁 / 解锁 (读锁多个线程共享) |
# Semaphore
- 信号量:本质上是一个计数器
- 计数器大于 0,可以使用,等于 0 不能使用
- 可以设置多个并发量,例如限制 10 个访问
- Semaphore
- acquire 获取
- release 释放
- 比 Lock 更进一步,可以控制多个同时访问关键区
// 构造信号量 | |
private final Semaphore placeSemaphore = new Semaphore(5); | |
// 获取(信号量 - 1) | |
placeSemaphore.acquire(); | |
// 尝试获取(包含 acquire) | |
placeSemaphore.tryAcquire() | |
// 释放(信号量 + 1) | |
placeSemaphore.release(); |
# Latch
- 等待锁,是一个同步辅助类
- 用来同步执行任务的一个或者多个线程
- 不是用来保护临界区或者共享资源
- CountDownLatch
- countDown () 计数减 1
- await () 等待 latch 变成 0
// 构造等待锁 | |
CountDownLatch startSignal = new CountDownLatch(1); | |
// 计数减 1 | |
startSignal.countDown(); | |
// 等待 latch 变成 0 | |
startSignal.await(); |
# Barrier
- 集合点,也是一个同步辅助类
- 允许多个线程在某一个点上进行同步
- CyclicBarrier
- 构造函数是需要同步的线程数量
- await 等待其他线程,到达数量后,就放行
// 当有 3 个线程在 barrier 上 await,就执行 finalResultCalculator 中的 run 方法 | |
CalculateFinalResult finalResultCalculator = new CalculateFinalResult(results); | |
CyclicBarrier barrier = new CyclicBarrier(3, finalResultCalculator); | |
//class CalculateFinalResult implements Runnable {} | |
// 等待 | |
barrier.await(); |
# Phaser
- 允许执行并发多阶段任务,同步辅助类
- 在每一个阶段结束的位置对线程进行同步,当所有的线程都到达这步,再进行下一步
- Phaser
- arrive()
- arriveAndAwaitAdvance()
// 构造 | |
Phaser phaser = new Phaser(5); | |
// 等到 5 个线程都到了,才放行 | |
phaser.arriveAndAwaitAdvance(); |
# Exchanger
- 允许在并发线程中互相交换消息
- 允许在 2 个线程中定义同步点,当两个线程都到达同步点,它们交换数据结构
- Exchanger
- exchange (), 线程双方互相交互数据
- 交换数据是双向的
// 构造,交换类型为 String | |
Exchanger<String> exchanger = new Exchanger<String>(); | |
// 对外交换 null, 获得内容放入 item 中 | |
String item = exchanger.exchange(null); |
# 5. 定时任务执行
# 简单定时器机制
- 设置计划任务,也就是在指定的时间开始执行某一个任务。
- TimerTask 封装任务
- Timer 类 定时器
/* | |
class Task extends TimerTask { | |
public void run() { | |
//TODO | |
} | |
} | |
*/ | |
// 当前时间 1 秒后,每 2 秒执行一次 | |
timer.schedule(task, 1000, 2000); | |
// 取消当前的任务 | |
task.cancel(); | |
// 取消定时器 | |
timer.cancel(); |
# Executor + 定时器机制
- ScheduledExecutorService
- 定时任务
- 周期任务
// 固定时间(1s 后)启动 | |
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); | |
executor.schedule(new MyTask(),1,TimeUnit.SECONDS); | |
// 周期任务 固定速率 是以上一个任务开始的时间计时,period 时间过去后,检测上一个任务是否执行完毕 | |
// 如果上一个任务执行完毕,则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行 | |
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); | |
executor.scheduleAtFixedRate(new MyTask(),1,3000,TimeUnit.MILLISECONDS); | |
// 周期任务 固定延时 是以上一个任务结束时开始计时,period 时间过去后,立即执行 | |
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); | |
executor.scheduleWithFixedDelay(new MyTask(),1,3000,TimeUnit.MILLISECONDS); |
# Quartz
- Quartz 是一个较为完善的任务调度框架
- 解决程序中 Timer 零散管理的问题
- 功能更加强大
- Timer 执行周期任务,如果中间某一次有异常,整个任务终止执行
- Quartz 执行周期任务,如果中间某一次有异常,不影响下次任务执行
- ……
import org.quartz.JobDetail; | |
import org.quartz.Scheduler; | |
import org.quartz.Trigger; | |
import org.quartz.impl.StdSchedulerFactory; | |
import static org.quartz.JobBuilder.newJob; | |
import static org.quartz.SimpleScheduleBuilder.simpleSchedule; | |
import static org.quartz.TriggerBuilder.newTrigger; | |
// 创建 scheduler | |
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler(); | |
// 定义一个 Trigger | |
Trigger trigger = newTrigger().withIdentity("trigger1", "group1") // 定义 name/group | |
.startNow()// 一旦加入 scheduler,立即生效 | |
.withSchedule(simpleSchedule() // 使用 SimpleTrigger | |
.withIntervalInSeconds(2) // 每隔 2 秒执行一次 | |
.repeatForever()) // 一直执行 | |
.build(); | |
// 定义一个 JobDetail | |
JobDetail job = newJob(HelloJob.class) // 定义 Job 类为 HelloQuartz 类 | |
.withIdentity("job1", "group1") // 定义 name/group | |
.usingJobData("name", "quartz") // 定义属性 | |
.build(); | |
/* | |
public class HelloJob implements Job { | |
@Override | |
public void execute(JobExecutionContext context) throws JobExecutionException { | |
//TODO | |
} | |
} | |
*/ | |
// 加入这个调度 | |
scheduler.scheduleJob(job, trigger); | |
// 启动 | |
scheduler.start(); | |
// 运行一段时间后关闭 | |
Thread.sleep(10000); | |
scheduler.shutdown(true); |