多线程并发编程

并发基础知识与核心知识

并发与高并发

  • 并发:同时拥有两个或者多个线程,如果程序在单核处理器上运行,多个线程将交替的换入或者换出内存,这些线程是同时存在的,每个线程处于执行过程中的某个状态,如果运行在多核处理器上,此时程序中的每个线程将被分到一个处理器核上,因此可以同时运行。
  • 高并发:high concurrency是互联网分布式系统架构设计中必须考虑的因素之一,它通常是指通过设计保证系统能够同时并行处理很多请求。

CPU多级缓存

CPU多级缓存

  • CPU的频率远远快与主存的速度,在处理器的时钟周期内,CPU需要等待主存,为了最大限度的利用CPU的性能,引入了Cache,环节CPU和内存之间速度的不匹配。
  • 缓存的意义:

    • 时间局部性:如果某个数据被访问,那么在不久的将来它很可能再次被访问

    • 空间局部性:如果某个数据被访问,那么与它相邻的数据很快也可能被访问

  • 缓存一致性-MESI:

    • 用于保证多个CPU cache之间缓存共享数据的一致性

      • M:修改,该缓存行只被缓存在缓存中,与CPU之间的数据是不同的,需要重新写入主存

      • E:独享,该缓存行只被缓存在缓存中,与CPU之间的数据一致

      • S:共享,该缓存行可能被多个CPU进行缓存

      • I:无效,有其他CPU修改了该缓存航

      • local read

      • local write

      • remote read

      • remote write

  • 乱序执行优化:处理器为提高运算速度而做出违背代码原有顺序的优化

  • Java内存模型

    • Heap:运行时数据区,可以动态分配内存大小,GC负责,存取速度相对较慢

    • Stack:存取速度较快,大小固定

    • 当两个线程同时调用同一个对象的同一个方法,两个线程则拥有这个对象的私有拷贝

      内存模型抽象图

    • 同步操作与规则:

      • 操作:

        • lock:作用于主内存的变量,把一个变量标识为一个线程独占状态

        • unlock:作用于主内存的变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。

        • read:作用于主内存的变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的load使用

        • load:作用于工作内存的变量,把read操作从主内存中得到的变量值放入到工作内存的变量副本中

        • use:作用于工作内存的变量,将工作内存中的一个变量值传递给执行引擎

        • assign:作用于工作内存的变量,把一个从执行引擎接收到的值赋值给工作内存的变量

        • store:作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的写入操作

        • write:作用于主内存的变量,把store操作从工作内存的中一个变量的值传送到主内存的变量中

      • 规则:

        • 如果要把一个变量从主内存中复制到工作内存,就需要按顺序的执行read和load操作,如果把变量从工作内存中同步回主内存中,就要按顺序地执行store和write操作。但Java内存模型只要求上述操作必须按顺序执行,而没有保证必须是连续执行

        • 不允许read和load,store和write操作之一单独出现

        • 不允许一个线程丢弃它最近的assign操作,即变量在工作内存中改变了之后必须同步到主内存中

        • 不允许一个线程无原因的把数据从工作内存同步回主内存中

        • 一个新的变量只能在主内存中诞生,不允许在工作内存中直接使用一个未被初始化的变量。也就是对一个变量实施use和store操作之前,必须先执行过了assign和load操作

        • 一个变量在同一时刻只允许一条线程对其进行lock操作,但是咯擦卡操作可以被同一条线程重复多次执行,多次lock之后,只有执行相同次数的unlock操作,变量才会被解锁。

        • 如果对一个变量执行lock操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量前需要重新执行load或assign操作初始化变量的值

        • 如果一个变量事先没有被lock操作锁定,则不允许对它执行unlock操作,也不允许unlock一个被其他线程锁定的变量

        • 对一个变量执行unlock操作之前,必须先把此变量同步到主内存中

  • 并发的优势

    • 速度:同时处理多个请求,相应更快,复杂的操作可以分成多个进程同时进行

    • 设计:程序设计在某些情况下会更简单,也可以有更多的选择

    • 资源利用:CPU能够在等待IO的时候做一些其他事情

  • 并发的风险:

    • 安全性:多个线程共享数据时可能会产生与预期不相符的结果

    • 活跃性:某个操作无法继续进行下去的时候,就会发生活跃性的问题,比如死锁问题,饥饿问题等

    • 性能:线程过多会使得CPU频繁切换线程,调度时间增多,同步机制,消耗过多的内存

并发的线程安全处理

并发模拟

  • PostMan

  • Apache Bench

  • JMeter

  • Semaphore、countdownLatch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ConcurrencyTest {
// 请求总数
public static int clientTotal = 5000;
// 线程总数
public static int threadTotal = 200;
public static int count = 0;
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
es.execute(() -> {
try {
semaphore.acquire();
add();
semaphore.release();
} catch (InterruptedException e) {
log.error("exception", e);
e.printStackTrace();
}
countDownLatch.countDown();
});
}
countDownLatch.await();
log.info("count:{}", count);
}
private static void add() {
count++;
}
}

线程安全性

当多个线程访问某个类时,不管运行时环境采用何种调度方式,或者这些进程将如何交替执行,并且在主调代码中不需要任何额外的同步或者协同,这个类都将表现出正确的行为,那么这个类时线程安全的。

  • 原子性:提供了互斥访问,同一个时刻只有一个线程对它进行操作

    • Atomic包:CAS完成原子性

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      @Slf4j
      @ThreadSafe
      public class CountExample2 {
      private static int clientTotal = 5000;
      private static int threadTotal = 10;
      private static AtomicInteger count = new AtomicInteger(0);
      public static void main(String[] args) throws InterruptedException {
      ExecutorService es = Executors.newCachedThreadPool();
      final Semaphore semaphore = new Semaphore(threadTotal);
      final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
      for (int i = 0; i < clientTotal; i++) {
      es.execute(() -> {
      try {
      semaphore.acquire();
      add();
      semaphore.release();
      } catch (InterruptedException e) {
      log.error("exception", e);
      }
      countDownLatch.countDown();
      });
      }
      countDownLatch.await();
      es.shutdown();
      log.info("count: {}", count);
      }
      private static void add() {
      count.incrementAndGet();
      }
      }
      //21:20:18.403 [main] INFO com.imooc.concurentcy.example.count.CountExample2 - count: 5000
      • AtomicXXX:CAS、Unsafe.compareAndSwapInt

      • AtomicLong、LongAddr

      • AtomicReference、AtomicReferenceFieldUpdater

      • AtomicStampReference:CAS的ABA问题

      • synchronized,关键字,依赖JVM,不可中断的锁,适合竞争不激烈,可读性好

        • 修饰代码块,作用于调用的对象

        • 修饰方法,整个方法,作用于对象

        • 修饰静态方法,整个静态方法,作用于所有对象

        • 修饰类,作用于所有对象

      • Lock:依赖特殊的CPU指令,可中断锁,多样化同步,适合竞争激烈时能维持常态

  • 可见性:一个线程对主内存的数据的修改可以及时的被其他线程观察到

    • 导致共享变量在线程间不可见的原因

      • 线程交叉执行

      • 重排序结合线程交叉执行

      • 共享变量更新后的值没有在工作内存与主内存间及时更新

    • JMM关于synchronized的规定:

      • 线程解锁前,必须把共享变量的值,刷新到主内存中

      • 线程加锁时,将清空工作内存中共享变量的值,从而使用共享变量时需要从主内存中重新读取最新的值

    • volatile,通过内存屏障和禁止重排序优化来实现:

      • 对volatile变量写操作时,会在写操作后加入一条store屏障指令,将本地内存中的共享变量值刷新到主内存

      • 对volatile变量读操作时,会在读操作前键入一条load屏障指令,从主内存中读取共享变量

  • 有序性:一个线程观察其他线程中的指令的执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序

    • Java内存模型中,允许编译器和处理器对指令进行重排序,但是重排序过程不会影响到单线程程序的执行,但是会影响多线程并发执行的正确性

    • volatile,synchronized,Lock

    • Happenes-Before原则

      • 程序次序原则:一个程序内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作

      • 锁定规则:一个unlock操作线性发生于后面对同一个锁的Lock操作

      • volatile变量规则:对一个变量的写操作先行发生于后面对这个变量的读操作

      • 传递规则:如果操作A先行发生于操作B,操作B先行发生于操作C,操作A先与操作C

      • 线程启动规则:Thread对象的start方法先行发生于此线程的每一个动作

      • 线程中断规则:对线程的interrupt方法的调用先行发生于被中断线程的代码检测到的中断事件的发生

      • 线程终结规则:线程中所有的操作都先行发生于线程的终止检测

      • 对象终结规则:一个对象的初始化完成先行发生于finalize()方法的开始

安全发布对象

  1. 发布对象:使一个对象能够被当前范围之外的代码所使用

  2. 对象逸出:一种错误的发布,一个对象没有构造完成就被其他线程所见

  3. 安全发布对象

    • 在静态初始化函数中初始化一个对象的引用

    • 将对象的引用保存到volatile类型域或者AtomicReference对象中

    • 将对象的引用保存到某个正确构造对象的final类型域中

    • 将对象的引用保存到一个由锁保护的域中

线程安全策略

  1. 不可变对象–String类型

    • 条件

      • 对象创建以后其状态不能修改

      • 对象所有的域都是final类型

      • 对象时正确创建的,创建过程中this引用没有逸出

    • final关键字

      • 修饰类:不能被继承,String,Integer,Long

      • 修饰方法:锁定方法不能被继承类修改,早期效率更高

      • 修饰变量:

        • 基本数据类型变量,数值在初始化之后不可被修改

        • 引用类型变量,初始化之后不能再指向另外一个对象,对象里面的值时可以进行修改的

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      public class ImmutableExample1 {
      private final static Integer a = 1;
      private final static String b = "2";
      private final static Map<Integer, Integer> map = Maps.newHashMap();
      static {
      map.put(1,2);
      map.put(3,4);
      map.put(5,6);
      }
      public static void main(String[] args) {
      // a = 2;
      // b = "a";
      // map = Maps.newTreeMap();
      log.info("{}", map.get(1));
      map.put(1,4);
      log.info("{}", map.get(1));
      }
      private void test(final int a) {
      // a = 1;
      }
      }
      /**
      * 17:04:10.511 [main] INFO com.imooc.concurentcy.example.immutable.ImmutableExample1 - 2
      * 17:04:10.517 [main] INFO com.imooc.concurentcy.example.immutable.ImmutableExample1 - 4
      */
    • Collections.unmodifiableXXX:Collection, List, Set, Map…

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      public class ImmutableExample2 {
      private final static Integer a = 1;
      private final static String b = "2";
      private static Map<Integer, Integer> map = Maps.newHashMap();
      static {
      map.put(1,2);
      map.put(3,4);
      map.put(5,6);
      map = Collections.unmodifiableMap(map);
      }
      public static void main(String[] args) {
      // a = 2;
      // b = "a";
      // map = Maps.newTreeMap();
      log.info("{}", map.get(1));
      map.put(1,4);
      log.info("{}", map.get(1));
      }
      }
      /**
      * Exception in thread "main" java.lang.UnsupportedOperationException
      * 17:02:41.546 [main] INFO com.imooc.concurentcy.example.immutable.ImmutableExample2 - 2
      * at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)
      * at com.imooc.concurentcy.example.immutable.ImmutableExample2.main(ImmutableExample2.java:34)
      */
    • Guava:ImmutableXXX:Collection, List, Set, Map…

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      public class ImmutableExample3 {
      private final static ImmutableList list = ImmutableList.of(1,2,3);
      private final static ImmutableSet set = ImmutableSet.copyOf(list);
      private final static ImmutableMap<Integer, Integer> map = ImmutableMap.of(1,2,3,4);
      private final static ImmutableMap<Integer, Integer> map2 = ImmutableMap.<Integer, Integer>builder().put(1,2)
      .put(3,4).put(5,6).build();
      public static void main(String[] args) {
      // list.add(4);
      // Exception in thread "main" java.lang.UnsupportedOperationException
      // at com.google.common.collect.ImmutableCollection.add(ImmutableCollection.java:222)
      // at com.imooc.concurentcy.example.immutable.ImmutableExample3.main(ImmutableExample3.java:26)
      map.put(6,7);
      map2.put(7,8);
      }
      }
  2. 线程封闭

    • Ad-hoc线程封闭:程序控制是实现

    • 堆栈封闭:局部变量,无并发问题

    • ThreadLocal线程封闭

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      public class RequestHolder {
      private final static ThreadLocal<Long> requesetHolder = new ThreadLocal<>();
      public static void add(Long id) {
      requesetHolder.set(id);
      }
      public static Long getId() {
      return requesetHolder.get();
      }
      public static void remove() {
      requesetHolder.remove();
      }
      }
  3. 线程不安全的类与写法

    • StringBuilder->StringBuffer

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      @Slf4j
      public class StringExample1 {
      // 请求总数
      public static int clientTotal = 5000;
      // 同时并发执行的线程数
      public static int threadTotal = 200;
      // public static StringBuilder sb = new StringBuilder();
      public static StringBuffer sb = new StringBuffer();
      public static void main(String[] args) throws Exception {
      ExecutorService executorService = Executors.newCachedThreadPool();
      final Semaphore semaphore = new Semaphore(threadTotal);
      final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
      for (int i = 0; i < clientTotal ; i++) {
      executorService.execute(() -> {
      try {
      semaphore.acquire();
      update();
      semaphore.release();
      } catch (Exception e) {
      log.error("exception", e);
      }
      countDownLatch.countDown();
      });
      }
      countDownLatch.await();
      executorService.shutdown();
      log.info("count:{}", sb.length());
      }
      private static void update() {
      sb.append(1);
      }
      }
    • SimpleDateFormate-JodaTime

    • ArrayList,HashSet,HashMap等集合类

  4. 同步容器

    • Vector,Stack

    • HashTable:key和value不能为null

    • Collections.synchronizedXXX(List, Set, Map)

  5. 并发容器

    • ArrayList->CopyOnWriteArrayList

      • 读写分离

      • 保证最终一致性

      • 通过另外开辟空间来解决并发冲突

    • HashSet,TreeSet->CopyOnWriteArraySet,ConcurrentSkipListSet

    • HashMap,TreeMap->ConcurrentHashMap,ConcurrentSkipListMap

    • JUC的实际构成:

J.U.C之AQS

AQS-AbstractQueuedSynchronizer

  • 使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架

  • 利用一个int类型表示状态

  • 使用方法是继承

  • 子类通过继承并通过实现其方法管理其状态,也就是acquire方法和release方法

  • 可以同时实现排他锁和共享锁模式

CountDownLatch

Semaphore

CyclicBarrier

ReentrantLock

  1. ReentrantLock和synchronized的区别

    • 可重入性,都可以

    • 锁的实现,synchronized是依赖于JVM

    • 性能:优化之后差不多

    • 功能区别

  2. ReenTrantLock独有的功能:

    • 可以执行公平锁还是非公平锁

    • 提供一个Condition类,可以分组唤醒需要唤醒的线程

    • 可以终端等待锁的线程的机制

  3. StampLock–读,写,乐观读三种模式

Condition

FutureTask

  • Callable和Runnable接口对比

    • run,call

    • 返回值

  • Future接口–获取线程运行状态和结果

  • FutureTask

    • Runnable

    • Future

BlockingQueue

阻塞队列中的操作:

  • ArrayBlockingQueue

  • DelayQueue

  • LinkedBlockingQueue

  • PriorityBlockingQueue

  • SynchronousQueue

线程池

  • new Thread的弊端

    • 性能差

    • 线程缺乏统一管理,可能无限制的新建线程,相互竞争,有可能OOM

    • 缺少更多功能,

  • 线程池的好处

    • 重用存在的线程,减少对象的创建,销毁的开销,性能好

    • 可以有效的控制最大并发线程数,提高系统资源利用率,同时可以避免过多资源竞争,避免阻塞

    • 提供定期执行,定时执行,单线程,并发数控制等功能

  • ThreadPoolExecutor

    • corePoolSize

    • maximumPoolSize

    • workQueue-阻塞队列

    • keepAliveTime

    • unit

    • threadFactory

    • rejectHandler

  • 线程池的状态

    • running

    • shutdown

    • stop

    • tidying

    • terminated

    线程池状态

  • 提供的方法

    • execute

    • submit 有返回值 execute+future

    • shutdown

    • shutdownNow

    • getTaskCount:获取一致性和未执行的任务总数

    • getCompletedTaskCount

    • getPoolSize

    • getActiveCount

  • 四种线程池

    • Executors.newCachedThreadPool

    • Executors.newFixedThreadPool

    • Executors.newScheduledhreadPool

    • Executors.newSingleThreadExecutor

  • 合理配置

    • CPU密集型任务,需要尽量压榨CPU,参考值可以设置为CPU的核数+1

    • IO密集型任务,参考值可以设置为2*CPU的核数

并发扩展

死锁

  • 必要条件

    • 互斥条件

    • 请求和保持条件:对已经获取的资源不释放

    • 不剥夺条件

    • 环路等待条件:循环等待

  • 死锁实例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    public class DeadLock implements Runnable{
    public int flag = 1;
    private static Object o1 = new Object();
    private static Object o2 = new Object();
    @Override
    public void run() {
    log.info("flag : {}", flag);
    if (flag == 1) {
    synchronized (o1) {
    try {
    Thread.sleep(500);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    synchronized (o2) {
    log.info("1");
    }
    }
    }
    if (flag == 0) {
    synchronized (o2) {
    try {
    Thread.sleep(500);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    synchronized (o1) {
    log.info("0");
    }
    }
    }
    }
    public static void main(String[] args) {
    DeadLock td1 = new DeadLock();
    DeadLock td2 = new DeadLock();
    td1.flag = 1;
    td2.flag = 0;
    //td1,td2都处于可执行状态,但JVM线程调度先执行哪个线程是不确定的。
    //td2的run()可能在td1的run()之前运行
    new Thread(td1).start();
    new Thread(td2).start();
    }
    }
  • 死锁避免方法:

    • 加锁顺序

    • 加锁时限,超时释放机制

    • 死锁检测

多线程并发的最佳实践

  • 使用本地变量

  • 使用不可变类

  • 最小化锁的作用于范围:s = 1 / (1 - a + a / n);

  • 使用线程池的Executor,而不是直接new Thread

  • 宁可使用同步也不要使用线程的wait和notify方法

  • 使用blockingQueue实现生产-消费模式

  • 使用并发集合而不是使用加了锁的同步集合

  • 使用Semaphore创建有界的访问

  • 宁可使用同步代码块,也不使用同步的方法

  • 避免使用静态变量

高并发处理思路及手段

扩容

  • 垂直扩容:提高系统部件能力,如加大内存

  • 水平扩容:增加更多系统成员来实现,如增加机器

  • 数据库扩容

    • 读操作扩展:memcache,redis,CDN缓存等

    • 写操作扩展:Cassandra,Hbase等

缓存

  • 缓存特征

    • 缓存命中率:命中数 / (命中数+未命中数)

    • 最大空间:缓存清空策略

      • FIFO:淘汰最先创建的数据

      • LFU:最少使用次数,清除使用次数最少的数据

      • LRU:淘汰最远使用的数据的时间戳-热点数据

      • 过期时间:过期时间最长的

      • Random:随机淘汰

  • 影响缓存命中率的因素

    • 业务场景和业务需求-读写

    • 缓存的设计(粒度和过期策略)

    • 缓存的容量和基础设施

    • 缓存节点故障等问题–一致性哈希算法

  • 缓存分类和应用场景:

    • 本地缓存:编程实现,Guava Cache

    • 分布式缓存:Memcached,Redis

  • Redis

    redis核心对象

  • 高并发场景下存在的问题

    • 缓存一致性

    • 缓存并发问题

    • 缓存穿透问题

      • 缓存空对象
    • 缓存雪崩现象

消息队列

  • 消息队列特性

    • 业务无关:只做消息分发

    • FIFO:先到先出

    • 容灾:节点的动态增删和消息的持久化

    • 性能:吞吐量提升,系统内部通信效率提高

  • 消息队列的好处

    • 业务解耦

    • 最终一致性

    • 广播

    • 错峰与流控

  • 队列实例

    • Kafka

      • 高性能,跨语言,分布式发布订阅消息队列系统

      • 快速持久化,O(1)

      • 高吞吐

      • 分布式系统

      • 支持Hadoop数据加载

    • RabbitMQ

应用拆分-解决单个服务器处理上限的问题

  • 应用拆分原则

    • 业务优先

    • 循序渐进

    • 兼顾技术:重构,分层

    • 可靠测试

  • 拆分过程中的关注点

    • 应用之间的通信:RPC or 消息队列 or …

    • 应用之间的数据库设计:每个应用单独库

    • 避免事务操作跨应用

  • 应用拆分

    • 服务化-Dubbo

      dubbo架构

    • 微服务-springcloud

      微服务架构

应用限流

  • 算法

    • 计数器法-滑动窗口的低精度情况

      计数器法

    • 滑动窗口

      滑动窗口

    • 漏桶算法-leaky Bucket

      漏桶算法

    • 令牌桶算法-token Bucket-允许流量一定程度的突发

      令牌桶算法

服务降级

  • 服务降级:设置默认返回,解决处理不了的请求。

    • 自动降级:超时,失败次数,故障,限流

    • 人工降级:秒杀,双11大促等

  • 服务熔断:过载保护

数据库分库分表

  • 数据库瓶颈

    • 单个库的数据量上限(1-2T)

    • 单个数据库服务压力过大,读写瓶颈:多个库

    • 单个表的数据量过大:分表

  • 数据库切库

    • 读写分离,降低主库的压力,多个从库通过负载均衡来调节
  • 数据库分表-单个表的数据量过大

    • 横向分表:同样结构的不同数据的表,id取模划分

    • 纵向分表:将本来可以在一个表的内容认为的划分为多个表,根据不同列的数据的活跃数量等进行分配

高可用

  • 任务调度系统分布式:elastic-job + zookeeper

  • 主备切换:Apache curator + zookeeper分布式锁实现

  • 监控报警机制

Donate comment here