并发基础知识与核心知识
并发与高并发
- 并发:同时拥有两个或者多个线程,如果程序在单核处理器上运行,多个线程将交替的换入或者换出内存,这些线程是同时存在的,每个线程处于执行过程中的某个状态,如果运行在多核处理器上,此时程序中的每个线程将被分到一个处理器核上,因此可以同时运行。
- 高并发:high concurrency是互联网分布式系统架构设计中必须考虑的因素之一,它通常是指通过设计保证系统能够同时并行处理很多请求。
CPU多级缓存
- CPU的频率远远快与主存的速度,在处理器的时钟周期内,CPU需要等待主存,为了最大限度的利用CPU的性能,引入了Cache,环节CPU和内存之间速度的不匹配。
缓存的意义:
时间局部性:如果某个数据被访问,那么在不久的将来它很可能再次被访问
空间局部性:如果某个数据被访问,那么与它相邻的数据很快也可能被访问
缓存一致性-MESI:
用于保证多个CPU cache之间缓存共享数据的一致性
M:修改,该缓存行只被缓存在缓存中,与CPU之间的数据是不同的,需要重新写入主存
E:独享,该缓存行只被缓存在缓存中,与CPU之间的数据一致
S:共享,该缓存行可能被多个CPU进行缓存
I:无效,有其他CPU修改了该缓存航
local read
local write
remote read
remote write
乱序执行优化:处理器为提高运算速度而做出违背代码原有顺序的优化
Java内存模型
Heap:运行时数据区,可以动态分配内存大小,GC负责,存取速度相对较慢
Stack:存取速度较快,大小固定
当两个线程同时调用同一个对象的同一个方法,两个线程则拥有这个对象的私有拷贝
同步操作与规则:
操作:
lock:作用于主内存的变量,把一个变量标识为一个线程独占状态
unlock:作用于主内存的变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
read:作用于主内存的变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的load使用
load:作用于工作内存的变量,把read操作从主内存中得到的变量值放入到工作内存的变量副本中
use:作用于工作内存的变量,将工作内存中的一个变量值传递给执行引擎
assign:作用于工作内存的变量,把一个从执行引擎接收到的值赋值给工作内存的变量
store:作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的写入操作
write:作用于主内存的变量,把store操作从工作内存的中一个变量的值传送到主内存的变量中
规则:
如果要把一个变量从主内存中复制到工作内存,就需要按顺序的执行read和load操作,如果把变量从工作内存中同步回主内存中,就要按顺序地执行store和write操作。但Java内存模型只要求上述操作必须按顺序执行,而没有保证必须是连续执行
不允许read和load,store和write操作之一单独出现
不允许一个线程丢弃它最近的assign操作,即变量在工作内存中改变了之后必须同步到主内存中
不允许一个线程无原因的把数据从工作内存同步回主内存中
一个新的变量只能在主内存中诞生,不允许在工作内存中直接使用一个未被初始化的变量。也就是对一个变量实施use和store操作之前,必须先执行过了assign和load操作
一个变量在同一时刻只允许一条线程对其进行lock操作,但是咯擦卡操作可以被同一条线程重复多次执行,多次lock之后,只有执行相同次数的unlock操作,变量才会被解锁。
如果对一个变量执行lock操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量前需要重新执行load或assign操作初始化变量的值
如果一个变量事先没有被lock操作锁定,则不允许对它执行unlock操作,也不允许unlock一个被其他线程锁定的变量
对一个变量执行unlock操作之前,必须先把此变量同步到主内存中
并发的优势
速度:同时处理多个请求,相应更快,复杂的操作可以分成多个进程同时进行
设计:程序设计在某些情况下会更简单,也可以有更多的选择
资源利用:CPU能够在等待IO的时候做一些其他事情
并发的风险:
安全性:多个线程共享数据时可能会产生与预期不相符的结果
活跃性:某个操作无法继续进行下去的时候,就会发生活跃性的问题,比如死锁问题,饥饿问题等
性能:线程过多会使得CPU频繁切换线程,调度时间增多,同步机制,消耗过多的内存
并发的线程安全处理
并发模拟
PostMan
Apache Bench
JMeter
Semaphore、countdownLatch
|
|
线程安全性
当多个线程访问某个类时,不管运行时环境采用何种调度方式,或者这些进程将如何交替执行,并且在主调代码中不需要任何额外的同步或者协同,这个类都将表现出正确的行为,那么这个类时线程安全的。
原子性:提供了互斥访问,同一个时刻只有一个线程对它进行操作
Atomic包:CAS完成原子性
12345678910111213141516171819202122232425262728293031323334354jpublic class CountExample2 {private static int clientTotal = 5000;private static int threadTotal = 10;private static AtomicInteger count = new AtomicInteger(0);public static void main(String[] args) throws InterruptedException {ExecutorService es = Executors.newCachedThreadPool();final Semaphore semaphore = new Semaphore(threadTotal);final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);for (int i = 0; i < clientTotal; i++) {es.execute(() -> {try {semaphore.acquire();add();semaphore.release();} catch (InterruptedException e) {log.error("exception", e);}countDownLatch.countDown();});}countDownLatch.await();es.shutdown();log.info("count: {}", count);}private static void add() {count.incrementAndGet();}}//21:20:18.403 [main] INFO com.imooc.concurentcy.example.count.CountExample2 - count: 5000AtomicXXX:CAS、Unsafe.compareAndSwapInt
AtomicLong、LongAddr
AtomicReference、AtomicReferenceFieldUpdater
AtomicStampReference:CAS的ABA问题
锁
synchronized,关键字,依赖JVM,不可中断的锁,适合竞争不激烈,可读性好
修饰代码块,作用于调用的对象
修饰方法,整个方法,作用于对象
修饰静态方法,整个静态方法,作用于所有对象
修饰类,作用于所有对象
Lock:依赖特殊的CPU指令,可中断锁,多样化同步,适合竞争激烈时能维持常态
可见性:一个线程对主内存的数据的修改可以及时的被其他线程观察到
导致共享变量在线程间不可见的原因
线程交叉执行
重排序结合线程交叉执行
共享变量更新后的值没有在工作内存与主内存间及时更新
JMM关于synchronized的规定:
线程解锁前,必须把共享变量的值,刷新到主内存中
线程加锁时,将清空工作内存中共享变量的值,从而使用共享变量时需要从主内存中重新读取最新的值
volatile,通过内存屏障和禁止重排序优化来实现:
对volatile变量写操作时,会在写操作后加入一条store屏障指令,将本地内存中的共享变量值刷新到主内存
对volatile变量读操作时,会在读操作前键入一条load屏障指令,从主内存中读取共享变量
有序性:一个线程观察其他线程中的指令的执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序
Java内存模型中,允许编译器和处理器对指令进行重排序,但是重排序过程不会影响到单线程程序的执行,但是会影响多线程并发执行的正确性
volatile,synchronized,Lock
Happenes-Before原则
程序次序原则:一个程序内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作
锁定规则:一个unlock操作线性发生于后面对同一个锁的Lock操作
volatile变量规则:对一个变量的写操作先行发生于后面对这个变量的读操作
传递规则:如果操作A先行发生于操作B,操作B先行发生于操作C,操作A先与操作C
线程启动规则:Thread对象的start方法先行发生于此线程的每一个动作
线程中断规则:对线程的interrupt方法的调用先行发生于被中断线程的代码检测到的中断事件的发生
线程终结规则:线程中所有的操作都先行发生于线程的终止检测
对象终结规则:一个对象的初始化完成先行发生于finalize()方法的开始
安全发布对象
发布对象:使一个对象能够被当前范围之外的代码所使用
对象逸出:一种错误的发布,一个对象没有构造完成就被其他线程所见
安全发布对象
在静态初始化函数中初始化一个对象的引用
将对象的引用保存到volatile类型域或者AtomicReference对象中
将对象的引用保存到某个正确构造对象的final类型域中
将对象的引用保存到一个由锁保护的域中
线程安全策略
不可变对象–String类型
条件
对象创建以后其状态不能修改
对象所有的域都是final类型
对象时正确创建的,创建过程中this引用没有逸出
final关键字
修饰类:不能被继承,String,Integer,Long
修饰方法:锁定方法不能被继承类修改,早期效率更高
修饰变量:
基本数据类型变量,数值在初始化之后不可被修改
引用类型变量,初始化之后不能再指向另外一个对象,对象里面的值时可以进行修改的
12345678910111213141516171819202122232425262728public class ImmutableExample1 {private final static Integer a = 1;private final static String b = "2";private final static Map<Integer, Integer> map = Maps.newHashMap();static {map.put(1,2);map.put(3,4);map.put(5,6);}public static void main(String[] args) {// a = 2;// b = "a";// map = Maps.newTreeMap();log.info("{}", map.get(1));map.put(1,4);log.info("{}", map.get(1));}private void test(final int a) {// a = 1;}}/*** 17:04:10.511 [main] INFO com.imooc.concurentcy.example.immutable.ImmutableExample1 - 2* 17:04:10.517 [main] INFO com.imooc.concurentcy.example.immutable.ImmutableExample1 - 4*/
Collections.unmodifiableXXX:Collection, List, Set, Map…
123456789101112131415161718192021222324252627public class ImmutableExample2 {private final static Integer a = 1;private final static String b = "2";private static Map<Integer, Integer> map = Maps.newHashMap();static {map.put(1,2);map.put(3,4);map.put(5,6);map = Collections.unmodifiableMap(map);}public static void main(String[] args) {// a = 2;// b = "a";// map = Maps.newTreeMap();log.info("{}", map.get(1));map.put(1,4);log.info("{}", map.get(1));}}/*** Exception in thread "main" java.lang.UnsupportedOperationException* 17:02:41.546 [main] INFO com.imooc.concurentcy.example.immutable.ImmutableExample2 - 2* at java.util.Collections$UnmodifiableMap.put(Collections.java:1457)* at com.imooc.concurentcy.example.immutable.ImmutableExample2.main(ImmutableExample2.java:34)*/Guava:ImmutableXXX:Collection, List, Set, Map…
12345678910111213141516171819public class ImmutableExample3 {private final static ImmutableList list = ImmutableList.of(1,2,3);private final static ImmutableSet set = ImmutableSet.copyOf(list);private final static ImmutableMap<Integer, Integer> map = ImmutableMap.of(1,2,3,4);private final static ImmutableMap<Integer, Integer> map2 = ImmutableMap.<Integer, Integer>builder().put(1,2).put(3,4).put(5,6).build();public static void main(String[] args) {// list.add(4);// Exception in thread "main" java.lang.UnsupportedOperationException// at com.google.common.collect.ImmutableCollection.add(ImmutableCollection.java:222)// at com.imooc.concurentcy.example.immutable.ImmutableExample3.main(ImmutableExample3.java:26)map.put(6,7);map2.put(7,8);}}
线程封闭
Ad-hoc线程封闭:程序控制是实现
堆栈封闭:局部变量,无并发问题
ThreadLocal线程封闭
123456789101112131415public class RequestHolder {private final static ThreadLocal<Long> requesetHolder = new ThreadLocal<>();public static void add(Long id) {requesetHolder.set(id);}public static Long getId() {return requesetHolder.get();}public static void remove() {requesetHolder.remove();}}
线程不安全的类与写法
StringBuilder->StringBuffer
12345678910111213141516171819202122232425262728293031323334353637384jpublic class StringExample1 {// 请求总数public static int clientTotal = 5000;// 同时并发执行的线程数public static int threadTotal = 200;// public static StringBuilder sb = new StringBuilder();public static StringBuffer sb = new StringBuffer();public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();final Semaphore semaphore = new Semaphore(threadTotal);final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);for (int i = 0; i < clientTotal ; i++) {executorService.execute(() -> {try {semaphore.acquire();update();semaphore.release();} catch (Exception e) {log.error("exception", e);}countDownLatch.countDown();});}countDownLatch.await();executorService.shutdown();log.info("count:{}", sb.length());}private static void update() {sb.append(1);}}SimpleDateFormate-JodaTime
ArrayList,HashSet,HashMap等集合类
同步容器
Vector,Stack
HashTable:key和value不能为null
Collections.synchronizedXXX(List, Set, Map)
并发容器
ArrayList->CopyOnWriteArrayList
读写分离
保证最终一致性
通过另外开辟空间来解决并发冲突
HashSet,TreeSet->CopyOnWriteArraySet,ConcurrentSkipListSet
HashMap,TreeMap->ConcurrentHashMap,ConcurrentSkipListMap
JUC的实际构成:
J.U.C之AQS
AQS-AbstractQueuedSynchronizer
使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架
利用一个int类型表示状态
使用方法是继承
子类通过继承并通过实现其方法管理其状态,也就是acquire方法和release方法
可以同时实现排他锁和共享锁模式
CountDownLatch
Semaphore
CyclicBarrier
ReentrantLock
ReentrantLock和synchronized的区别
可重入性,都可以
锁的实现,synchronized是依赖于JVM
性能:优化之后差不多
功能区别
ReenTrantLock独有的功能:
可以执行公平锁还是非公平锁
提供一个Condition类,可以分组唤醒需要唤醒的线程
可以终端等待锁的线程的机制
StampLock–读,写,乐观读三种模式
Condition
FutureTask
Callable和Runnable接口对比
run,call
返回值
Future接口–获取线程运行状态和结果
FutureTask
Runnable
Future
BlockingQueue
阻塞队列中的操作:
ArrayBlockingQueue
DelayQueue
LinkedBlockingQueue
PriorityBlockingQueue
SynchronousQueue
线程池
new Thread的弊端
性能差
线程缺乏统一管理,可能无限制的新建线程,相互竞争,有可能OOM
缺少更多功能,
线程池的好处
重用存在的线程,减少对象的创建,销毁的开销,性能好
可以有效的控制最大并发线程数,提高系统资源利用率,同时可以避免过多资源竞争,避免阻塞
提供定期执行,定时执行,单线程,并发数控制等功能
ThreadPoolExecutor
corePoolSize
maximumPoolSize
workQueue-阻塞队列
keepAliveTime
unit
threadFactory
rejectHandler
线程池的状态
running
shutdown
stop
tidying
terminated
提供的方法
execute
submit 有返回值 execute+future
shutdown
shutdownNow
getTaskCount:获取一致性和未执行的任务总数
getCompletedTaskCount
getPoolSize
getActiveCount
四种线程池
Executors.newCachedThreadPool
Executors.newFixedThreadPool
Executors.newScheduledhreadPool
Executors.newSingleThreadExecutor
合理配置
CPU密集型任务,需要尽量压榨CPU,参考值可以设置为CPU的核数+1
IO密集型任务,参考值可以设置为2*CPU的核数
并发扩展
死锁
必要条件
互斥条件
请求和保持条件:对已经获取的资源不释放
不剥夺条件
环路等待条件:循环等待
死锁实例
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647public class DeadLock implements Runnable{public int flag = 1;private static Object o1 = new Object();private static Object o2 = new Object();public void run() {log.info("flag : {}", flag);if (flag == 1) {synchronized (o1) {try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}synchronized (o2) {log.info("1");}}}if (flag == 0) {synchronized (o2) {try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}synchronized (o1) {log.info("0");}}}}public static void main(String[] args) {DeadLock td1 = new DeadLock();DeadLock td2 = new DeadLock();td1.flag = 1;td2.flag = 0;//td1,td2都处于可执行状态,但JVM线程调度先执行哪个线程是不确定的。//td2的run()可能在td1的run()之前运行new Thread(td1).start();new Thread(td2).start();}}死锁避免方法:
加锁顺序
加锁时限,超时释放机制
死锁检测
多线程并发的最佳实践
使用本地变量
使用不可变类
最小化锁的作用于范围:s = 1 / (1 - a + a / n);
使用线程池的Executor,而不是直接new Thread
宁可使用同步也不要使用线程的wait和notify方法
使用blockingQueue实现生产-消费模式
使用并发集合而不是使用加了锁的同步集合
使用Semaphore创建有界的访问
宁可使用同步代码块,也不使用同步的方法
避免使用静态变量
高并发处理思路及手段
扩容
垂直扩容:提高系统部件能力,如加大内存
水平扩容:增加更多系统成员来实现,如增加机器
数据库扩容
读操作扩展:memcache,redis,CDN缓存等
写操作扩展:Cassandra,Hbase等
缓存
缓存特征
缓存命中率:命中数 / (命中数+未命中数)
最大空间:缓存清空策略
FIFO:淘汰最先创建的数据
LFU:最少使用次数,清除使用次数最少的数据
LRU:淘汰最远使用的数据的时间戳-热点数据
过期时间:过期时间最长的
Random:随机淘汰
影响缓存命中率的因素
业务场景和业务需求-读写
缓存的设计(粒度和过期策略)
缓存的容量和基础设施
缓存节点故障等问题–一致性哈希算法
缓存分类和应用场景:
本地缓存:编程实现,Guava Cache
分布式缓存:Memcached,Redis
Redis
高并发场景下存在的问题
缓存一致性
缓存并发问题
缓存穿透问题
- 缓存空对象
缓存雪崩现象
消息队列
消息队列特性
业务无关:只做消息分发
FIFO:先到先出
容灾:节点的动态增删和消息的持久化
性能:吞吐量提升,系统内部通信效率提高
消息队列的好处
业务解耦
最终一致性
广播
错峰与流控
队列实例
Kafka
高性能,跨语言,分布式发布订阅消息队列系统
快速持久化,O(1)
高吞吐
分布式系统
支持Hadoop数据加载
RabbitMQ
应用拆分-解决单个服务器处理上限的问题
应用拆分原则
业务优先
循序渐进
兼顾技术:重构,分层
可靠测试
拆分过程中的关注点
应用之间的通信:RPC or 消息队列 or …
应用之间的数据库设计:每个应用单独库
避免事务操作跨应用
应用拆分
服务化-Dubbo
微服务-springcloud
应用限流
算法
计数器法-滑动窗口的低精度情况
滑动窗口
漏桶算法-leaky Bucket
令牌桶算法-token Bucket-允许流量一定程度的突发
服务降级
服务降级:设置默认返回,解决处理不了的请求。
自动降级:超时,失败次数,故障,限流
人工降级:秒杀,双11大促等
服务熔断:过载保护
数据库分库分表
数据库瓶颈
单个库的数据量上限(1-2T)
单个数据库服务压力过大,读写瓶颈:多个库
单个表的数据量过大:分表
数据库切库
- 读写分离,降低主库的压力,多个从库通过负载均衡来调节
数据库分表-单个表的数据量过大
横向分表:同样结构的不同数据的表,id取模划分
纵向分表:将本来可以在一个表的内容认为的划分为多个表,根据不同列的数据的活跃数量等进行分配
高可用
任务调度系统分布式:elastic-job + zookeeper
主备切换:Apache curator + zookeeper分布式锁实现
监控报警机制