MOOC 个人学习笔记

# 1. 并发框架 Executor

  • JDK 5 开始提供 Executor FrameWork (java.util.concurrent.*)
    • 分离任务的创建和执行者的创建
    • 线程重复利用 (new 线程代价很大)

# 共享线程池

  • 预设好的多个 Thread,可弹性增加
  • 多次执行很多很小的任务
  • 任务创建和执行过程解耦
  • 程序员无需关心线程池执行任务过程

# 主要类

  • Executors.newCachedThreadPool/newFixedThreadPool 创建线程池
  • ExecutorService 线程池服务
  • Callable 具体的逻辑对象 (线程类)
    • (Runnable 的 run 方法没有返回值,而 Callable 的 call 方法可以有返回值)
  • Future 返回结果
// 创建可变线程池
private ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newCachedThreadPool();
// 创建固定线程池
private ThreadPoolExecutor executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(5);
// 关闭线程池
executor.shutdown();
// 执行任务,无返回值
executor.execute(task);
// 执行任务,有返回值
Future<Integer> result=executor.submit(task);
/*
public class Task implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        return null;
    }
}
*/
// 线程池基础信息
System.out.println(executor.getPoolSize());
System.out.println(executor.getActiveCount());
System.out.println(executor.getCompletedTaskCount());
//Future 返回类基本操作
result.isDone() // 判断任务是否完成
int sum=result.get(); // 获得结果

# 2. 并发框架 Fork-join

  • Java 7 提供另一种并行框架:分解、治理、合并 (分治编程)
  • 适合用于整体任务量不好确定的场合 (最小任务可确定)

# 主要类

  • ForkJoinPool 任务池
  • RecursiveAction
  • RecursiveTask
// 创建执行线程池
ForkJoinPool pool = new ForkJoinPool();
ForkJoinPool pool = new ForkJoinPool(4); // 固定线程池
// 创建任务
Task task = new Task(1, 10000000);
/*
public class Task extends RecursiveTask<Integer> {
    @Override
    protected Integer compute () {
        // 任务足够小,则直接执行
        //TODO
        // 任务大于阈值,分裂为 2 个任务
        invokeAll (subTask1,subTask2);
        Integer sum1 = subTask1.join ();
        Integer sum2 = subTask2.join ();
        // 结果合并
        sum = sum1 + sum2;
        return sum;
    }
}
*/
// 提交任务
ForkJoinTask<Integer> result = pool.submit(task);
// 输出结果
System.out.println(result.get().toString());

# 3. 并发数据结构

  • 常用的数据结构是线程不安全的
    • ArrayList, HashMap, HashSet 非同步的
    • 多个线程同时读写,可能会抛出异常或数据错误
  • 传统 Vector,Hashtable 等同步集合性能过差
  • 并发数据结构:数据添加和删除
    • 阻塞式集合:当集合为空或者满时,等待
    • 非阻塞式集合:当集合为空或者满时,不等待,返回 null 或异常

  • List
    • Vector 同步安全,写多读少
    • ArrayList 不安全
    • Collections.synchronizedList (List list) 基于 synchronized,效率差
    • CopyOnWriteArrayList 读多写少,基于复制机制,非阻塞
  • Set
    • HashSet 不安全
    • Collections.synchronizedSet (Set set) 基于 synchronized,效率差
    • CopyOnWriteArraySet (基于 CopyOnWriteArrayList 实现) 读多写少,非阻塞
  • Map
    • Hashtable 同步安全,写多读少
    • HashMap 不安全
    • Collections.synchronizedMap (Map map) 基于 synchronized,效率差
    • ConcurrentHashMap 读多写少,非阻塞
  • Queue & Deque (队列,JDK 1.5 提出)
    • ConcurrentLinkedQueue 非阻塞
    • ArrayBlockingQueue/LinkedBlockingQueue 阻塞

# 4. 并发协作控制

# Lock

  • Lock 也可以实现同步的效果
    • 实现更复杂的临界区结构
    • tryLock 方法可以预判锁是否空闲
    • 允许分离读写的操作,多个读,一个写
    • 性能更好
  • ReentrantLock 类,可重入的互斥锁
  • ReentrantReadWriteLock 类,可重入的读写锁
  • lock 和 unlock 函数
// 构造锁
private static final ReentrantLock queueLock = new ReentrantLock(); // 可重入锁
private static final ReentrantReadWriteLock orderLock = new ReentrantReadWriteLock(); // 可重入读写锁
// 基本函数
queueLock.tryLock()// 尝试加锁 (包含 lock),返回 boolean
queueLock.lock() // 加锁
queueLock.unlock()// 解锁
orderLock.writeLock().lock()/unlock() // 写加锁 / 解锁 (写锁只能一个线程拥有)
orderLock.readLock().lock()/unlock() // 读加锁 / 解锁 (读锁多个线程共享)

# Semaphore

  • 信号量:本质上是一个计数器
  • 计数器大于 0,可以使用,等于 0 不能使用
  • 可以设置多个并发量,例如限制 10 个访问
  • Semaphore
    • acquire 获取
    • release 释放
  • 比 Lock 更进一步,可以控制多个同时访问关键区
// 构造信号量
private final Semaphore placeSemaphore = new Semaphore(5);
// 获取(信号量 - 1)
placeSemaphore.acquire();
// 尝试获取(包含 acquire)
placeSemaphore.tryAcquire()
// 释放(信号量 + 1)
placeSemaphore.release();

# Latch

  • 等待锁,是一个同步辅助类
  • 用来同步执行任务的一个或者多个线程
  • 不是用来保护临界区或者共享资源
  • CountDownLatch
    • countDown () 计数减 1
    • await () 等待 latch 变成 0
// 构造等待锁
CountDownLatch startSignal = new CountDownLatch(1);
// 计数减 1
startSignal.countDown();
// 等待 latch 变成 0
startSignal.await();

# Barrier

  • 集合点,也是一个同步辅助类
  • 允许多个线程在某一个点上进行同步
  • CyclicBarrier
    • 构造函数是需要同步的线程数量
    • await 等待其他线程,到达数量后,就放行
// 当有 3 个线程在 barrier 上 await,就执行 finalResultCalculator 中的 run 方法
CalculateFinalResult finalResultCalculator = new CalculateFinalResult(results);
CyclicBarrier barrier = new CyclicBarrier(3, finalResultCalculator);
//class CalculateFinalResult implements Runnable {}
// 等待
barrier.await();

# Phaser

  • 允许执行并发多阶段任务,同步辅助类
  • 在每一个阶段结束的位置对线程进行同步,当所有的线程都到达这步,再进行下一步
  • Phaser
    • arrive()
    • arriveAndAwaitAdvance()
// 构造
Phaser phaser = new Phaser(5);
// 等到 5 个线程都到了,才放行
phaser.arriveAndAwaitAdvance();

# Exchanger

  • 允许在并发线程中互相交换消息
  • 允许在 2 个线程中定义同步点,当两个线程都到达同步点,它们交换数据结构
  • Exchanger
    • exchange (), 线程双方互相交互数据
    • 交换数据是双向的
// 构造,交换类型为 String
Exchanger<String> exchanger = new Exchanger<String>();
// 对外交换 null, 获得内容放入 item 中
String item = exchanger.exchange(null);

# 5. 定时任务执行

# 简单定时器机制

  • 设置计划任务,也就是在指定的时间开始执行某一个任务。
  • TimerTask 封装任务
  • Timer 类 定时器
/*
class Task extends TimerTask {
    public void run() {
        //TODO
    }
}
*/
// 当前时间 1 秒后,每 2 秒执行一次
timer.schedule(task, 1000, 2000);
// 取消当前的任务
task.cancel();
// 取消定时器
timer.cancel();

# Executor + 定时器机制

  • ScheduledExecutorService
    • 定时任务
    • 周期任务
// 固定时间(1s 后)启动
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.schedule(new MyTask(),1,TimeUnit.SECONDS);
// 周期任务 固定速率 是以上一个任务开始的时间计时,period 时间过去后,检测上一个任务是否执行完毕
// 如果上一个任务执行完毕,则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleAtFixedRate(new MyTask(),1,3000,TimeUnit.MILLISECONDS);
// 周期任务 固定延时 是以上一个任务结束时开始计时,period 时间过去后,立即执行
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
executor.scheduleWithFixedDelay(new MyTask(),1,3000,TimeUnit.MILLISECONDS);

# Quartz

  • Quartz 是一个较为完善的任务调度框架
  • 解决程序中 Timer 零散管理的问题
  • 功能更加强大
    • Timer 执行周期任务,如果中间某一次有异常,整个任务终止执行
    • Quartz 执行周期任务,如果中间某一次有异常,不影响下次任务执行
    • ……
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.Trigger;
import org.quartz.impl.StdSchedulerFactory;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;
// 创建 scheduler
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
// 定义一个 Trigger
Trigger trigger = newTrigger().withIdentity("trigger1", "group1") // 定义 name/group
        .startNow()// 一旦加入 scheduler,立即生效
        .withSchedule(simpleSchedule() // 使用 SimpleTrigger
                .withIntervalInSeconds(2) // 每隔 2 秒执行一次
                .repeatForever()) // 一直执行
        .build();
// 定义一个 JobDetail
JobDetail job = newJob(HelloJob.class) // 定义 Job 类为 HelloQuartz 类
        .withIdentity("job1", "group1") // 定义 name/group
        .usingJobData("name", "quartz") // 定义属性
        .build();
/*
public class HelloJob implements Job {
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        //TODO
    }
}
*/
// 加入这个调度
scheduler.scheduleJob(job, trigger);
// 启动
scheduler.start();
// 运行一段时间后关闭
Thread.sleep(10000);
scheduler.shutdown(true);