小黄

黄小黄的幸福生活!


  • 首页

  • 标签

  • 分类

  • 归档

  • Java

Java多线程-线程池

发表于 2019-04-21 | 分类于 Java , Multi-Thread

线程池定义

  • 多线程的程序可以最大限度的发挥多核CPU的性能,但是如果不对线程进行合理的管控,会影响整体系统的性能。线程虽然相对进程来说很轻量级,但是也要占据一定的内存,新建和销毁也需要时间,不能无节制的创建线程。
  • 线程池本质上是一组线程实时处理休眠状态,等待唤醒执行,使用线程池中,创建线程变成了从线程池中获取线程。
  • 线程池可以较少创建和销毁线程上所花的时间以及系统资源的开销,同时将当前任务和主线程任务隔离,实现和主线程的异步执行,提高程序的执行效率。

线程池的种类

JDK对线程池的支持如下:

线程池框架
(图片来源:http://www.blogjava.net/xylz/archive/2010/12/21/341281.html)

Java中提供了四个静态方法来创建一个线程池,在线程池中执行任务比为每个任务分配一个线程优势更多,通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊线程的创建和销毁的开销。当请求到达时,线程已经存在,提高了响应速度。通过配置线程池的大小,可以创建爱你足够多的线程充分利用CPU的多核资源,同时防止线程太多耗尽资源。

newFixedThreadPool()

  • 该方法返回一个固定线程数量的线程池。
  • 该线程池中的线程数量始终不变。
  • 当有一个新的任务提交时,线程池中若有空闲线程,则立即执行
  • 如果没有空闲线程,则会将任务保存在一个任务队列中,线程空闲时,处理线程任务队列中的数据

Executors中方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
* If any thread terminates due to a failure during execution
* prior to shutdown, a new one will take its place if needed to
* execute subsequent tasks. The threads in the pool will exist
* until it is explicitly {@link ExecutorService#shutdown shutdown}.
*
* @param nThreads the number of threads in the pool
* @return the newly created thread pool
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue, using the provided
* ThreadFactory to create new threads when needed. At any point,
* at most {@code nThreads} threads will be active processing
* tasks. If additional tasks are submitted when all threads are
* active, they will wait in the queue until a thread is
* available. If any thread terminates due to a failure during
* execution prior to shutdown, a new one will take its place if
* needed to execute subsequent tasks. The threads in the pool will
* exist until it is explicitly {@link ExecutorService#shutdown
* shutdown}.
*
* @param nThreads the number of threads in the pool
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

使用实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo {
public static class MyTask implements Runnable {
@Override
public void run() {
System.out.println(System.currentTimeMillis() + ":Thread ID: " + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
MyTask task = new MyTask();
ExecutorService service = Executors.newFixedThreadPool(5);
// ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
service.execute(task);
}
}
}
/*
1535943816251:Thread ID: 11
1535943816251:Thread ID: 12
1535943816251:Thread ID: 13
1535943816251:Thread ID: 14
1535943816251:Thread ID: 15
1535943817253:Thread ID: 11
1535943817253:Thread ID: 13
1535943817253:Thread ID: 12
1535943817253:Thread ID: 14
1535943817253:Thread ID: 15
*/

newSingleThreadExecutor()

  • 该方法返回一个只有一个线程的线程池
  • 当有一个新的任务提交时,空闲则执行,否则等待执行
  • 确保线程之间的串行执行

Executors中方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue. (Note however that if this single
* thread terminates due to a failure during execution prior to
* shutdown, a new one will take its place if needed to execute
* subsequent tasks.) Tasks are guaranteed to execute
* sequentially, and no more than one task will be active at any
* given time. Unlike the otherwise equivalent
* {@code newFixedThreadPool(1)} the returned executor is
* guaranteed not to be reconfigurable to use additional threads.
*
* @return the newly created single-threaded Executor
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

使用实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo {
public static class MyTask implements Runnable {
@Override
public void run() {
System.out.println(System.currentTimeMillis() + ":Thread ID: " + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
MyTask task = new MyTask();
// ExecutorService service = Executors.newFixedThreadPool(5);
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
service.execute(task);
}
}
}
/*
1535944120085:Thread ID: 11
1535944120086:Thread ID: 12
1535944120086:Thread ID: 13
1535944120086:Thread ID: 14
1535944120087:Thread ID: 15
1535944120087:Thread ID: 16
1535944120088:Thread ID: 17
1535944120088:Thread ID: 18
1535944120088:Thread ID: 19
1535944120088:Thread ID: 20
*/

newCachedThreadPool()

  • 返回一个可根据实际情况调整线程数量的线程池
  • 线程池中线程的数量不确定
  • 任务提交时,复用线程优于新建线程

Executors中方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

使用实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolDemo {
public static class MyTask implements Runnable {
@Override
public void run() {
System.out.println(System.currentTimeMillis() + ":Thread ID: " + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
MyTask task = new MyTask();
// ExecutorService service = Executors.newFixedThreadPool(5);
// ExecutorService service = Executors.newCachedThreadPool();
ExecutorService service = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
service.execute(task);
}
}
}
/*
1535944197414:Thread ID: 11
1535944198425:Thread ID: 11
1535944199425:Thread ID: 11
1535944200425:Thread ID: 11
1535944201426:Thread ID: 11
1535944202426:Thread ID: 11
1535944203426:Thread ID: 11
1535944204427:Thread ID: 11
1535944205427:Thread ID: 11
1535944206428:Thread ID: 11
*/

newScheduleThreadPool()

  • 返回一个ScheduledExecutorService对象
  • 增加了定时执行的功能,如延时执行或者周期执行

Executors中方法

1
2
3
4
5
6
7
8
9
10
11
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

使用实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 计划任务
*/
public class ScheduledExecutorServiceDemo {
public static void main(String[] args) {
ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
ses.scheduleAtFixedRate((Runnable) () -> {
try {
Thread.sleep(2000);
System.out.println(System.currentTimeMillis()/1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 0, 8, TimeUnit.SECONDS);
}
}
/*
1535945140
1535945148
1535945156
1535945164
*/

四种定时执行的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/**
* 延时执行
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
/**
* 带饭后之的延迟执行
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
/**
* 按照指定频率周期执行某个任务
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
/**
* 周期定时执行某个任务/按指定频率间隔执行魔鬼任务
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}

submit

  • submit()与execute()类似,区别在于一个有返回值一个没有
  • submit()适合生产者消费者模式,与Future一起使用,可以在没有返回结果时阻塞当前线程的作用
  • 实现源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* Submits a value-returning task for execution and returns a
* Future representing the pending results of the task. The
* Future's {@code get} method will return the task's result upon
* successful completion.
*
* <p>
* If you would like to immediately block waiting
* for a task, you can use constructions of the form
* {@code result = exec.submit(aCallable).get();}
*
* <p>Note: The {@link Executors} class includes a set of methods
* that can convert some other common closure-like objects,
* for example, {@link java.security.PrivilegedAction} to
* {@link Callable} form so they can be submitted.
*
* @param task the task to submit
* @param <T> the type of the task's result
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Callable<T> task);
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return the given result upon successful completion.
*
* @param task the task to submit
* @param result the result to return
* @param <T> the type of the result
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
<T> Future<T> submit(Runnable task, T result);
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return {@code null} upon <em>successful</em> completion.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
Future<?> submit(Runnable task);

ThreadPooLExecutor类

  • 线程池中最重要的类

基本参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 自动装箱和拆箱
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 线程池状态显示
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
// 线程池缓冲队列:当线程池线程数量超过corePoolSize时,待运行的线程会放入到这个队列中
private final BlockingQueue<Runnable> workQueue;
// 重入锁,更新核心线程池代销,最大线程池大小时进行加锁
private final ReentrantLock mainLock = new ReentrantLock();
// 运行线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
// awaitTermination支持
private final Condition termination = mainLock.newCondition();
// 用来记录线程池中曾经出现过的最大线程数
private int largestPoolSize;
// 完成的线程数量
private long completedTaskCount;
// 线程工厂
private volatile ThreadFactory threadFactory;
// 拒绝Handler
private volatile RejectedExecutionHandler handler;
// 线程执行完毕后在线程池中的缓存时间,超过这个时长,线程会被回收
private volatile long keepAliveTime;
// 是否开启keepAliveTime
private volatile boolean allowCoreThreadTimeOut;
// 核心线程池大小
private volatile int corePoolSize;
// 最大线程池大小
private volatile int maximumPoolSize;

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
  • 缓存队列的类型

    • ArrayBlockingQueue : 有界的数组队列
    • LinkedBlockingQueue : 可支持有界/无界的队列,使用链表实现
    • PriorityBlockingQueue : 优先队列,可以针对任务排序
    • SynchronousQueue : 队列长度为1的队列,和Array有点区别就是:client thread提交到block queue会是一个阻塞过程,直到有一个worker thread连接上来poll task。当线
  • 拒绝策略

    • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

execute

  • 表示向线程池中添加线程,可能会立即执行,也可能不会。
  • 无法预知线程的开始和结束
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 如果运行线程小于corePoolSize,则创建一个新的线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果运行的线程等于corePoolSize,则进入workQueue等待队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 不能进入等待队列,尝试创建一个新的线程,失败则执行拒绝任务
else if (!addWorker(command, false))
reject(command);
}
  • ThreadPoolExecutor中,包含了一个任务缓存队列和若干个执行线程,任务缓存队列是一个大小固定的缓冲区队列,用来缓存待执行的任务,执行线程用来处理待执行的任务。每个待执行的任务,都必须实现Runnable接口,执行线程调用其run()方法,完成相应任务。
  • ThreadPoolExecutor对象初始化时,不创建任何执行线程,当有新任务进来时,才会创建执行线程。
  • 构造ThreadPoolExecutor对象时,需要配置该对象的核心线程池大小和最大线程池大小:
  • 当目前执行线程的总数小于核心线程大小时,所有新加入的任务,都在新线程中处理
  • 当目前执行线程的总数大于或等于核心线程时,所有新加入的任务,都放入任务缓存队列中
  • 当目前执行线程的总数大于或等于核心线程,并且缓存队列已满,同时此时线程总数小于线程池的最大大小,那么创建新线程,加入线程池中,协助处理新的任务。
  • 当所有线程都在执行,线程池大小已经达到上限,并且缓存队列已满时,就rejectHandler拒绝新的任务

shutdown

  • 放在execute后面
  • 表示当前线程池不接受新添加的线程
  • 所有线程执行完毕时,回收线程池资源
  • 不会马上关闭线程池,执行完已经提交的线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException {@inheritDoc}
*/
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

shutdownNow

  • 立刻关闭线程池,会引起系统数据异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution. These tasks are drained (removed)
* from the task queue upon return from this method.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. This implementation
* cancels tasks via {@link Thread#interrupt}, so any task that
* fails to respond to interrupts may never terminate.
*
* @throws SecurityException {@inheritDoc}
*/
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import java.io.Serializable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @program: 03_JDK_Concurence_Package
* @author: cdx
* @create: 2018-09-03 12:34
**/
public class ThreadPoolTaskDemo {
public static final int PRODUCT_TASK_SLEEP_TIME = 2;
public static final int PRODUCT_TASK_MAX_NUMBER = 10;
public static void main(String[] args) {
// 创建一个线程池,核心线程池大小为2,最大线程池大小为4,线程缓存3秒钟,等待队列数组,拒绝侧率直接抛弃
ThreadPoolExecutor executor = new ThreadPoolExecutor(2,
4,
3,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i = 0; i <= PRODUCT_TASK_MAX_NUMBER; i++) {
try {
String task = "task@ " + i;
System.out.println("put " + task);
executor.execute(new ThreadPooLTask(task));
Thread.sleep(PRODUCT_TASK_SLEEP_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class ThreadPooLTask implements Runnable, Serializable {
private Object threadPoolTskData;
public ThreadPooLTask(Object threadPoolTskData) {
this.threadPoolTskData = threadPoolTskData;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "start .." + threadPoolTskData);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
threadPoolTskData = null;
}
public Object getTask() {
return this.threadPoolTskData;
}
}
/*
put task@ 0
pool-1-thread-1start ..task@ 0
put task@ 1
pool-1-thread-2start ..task@ 1
put task@ 2
put task@ 3
put task@ 4
put task@ 5
pool-1-thread-3start ..task@ 5
put task@ 6
pool-1-thread-4start ..task@ 6
put task@ 7
put task@ 8
put task@ 9
put task@ 10
pool-1-thread-1start ..task@ 8
pool-1-thread-2start ..task@ 9
pool-1-thread-3start ..task@ 10
*/

Java多线程-锁机制

发表于 2019-04-21 | 分类于 Java , Multi-Thread

锁与同步

  1. 同步synchronized用来修饰方法,lock是一个实例变量,通过调用lock()方法来取得锁
  2. 同步和锁都是针对方法,不是同步变量和类
  3. 同步无法保证线程取得方法执行的先后顺序,锁可以设置公平锁来确保
  4. 不必同步类中的所有方法,类可以同时拥有同步和非同步方法
  5. 如果线程拥有同步和非同步方法,则非同步方法和同步方法被多个线程自由访问而不受锁的限制
  6. 线程睡眠时,持有的锁不会被释放
  7. 线程可以获得多个锁,比如在一个对象的同步方法里面调用另一个对象的同步方法,则获取了两个对象的锁。
  8. 同步损害并发性,应该尽可能缩小同步的范围,同步不但可以同步整个方法,还可以同步方法中的一部分代码
  9. 在使用同步代码块的时候,应该指定在哪个对象上同步,也就是要获得哪个对象的锁

锁的实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.locks.ReentrantLock;
public class ReenterLock implements Runnable{
public static ReentrantLock lock = new ReentrantLock();
public static int i = 0;
@Override
public void run() {
for (int j = 0; j < 10000000; j++) {
lock.lock();
try {
i++;
}finally {
lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
ReenterLock reenterLock = new ReenterLock();
Thread thread = new Thread(reenterLock);
Thread thread1 = new Thread(reenterLock);
thread.start();
thread1.start();
thread.join();
thread1.join();
System.out.println(i);
}
}

Lock接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface Lock {
// 获得锁,lock会忽视interrupt,拿不到锁会一直阻塞
void lock();
// 获取锁,相应interrupt()并抛出异常,跳出阻塞
void lockInterruptibly() throws InterruptedException;
// 尝试获取锁,成功返回true
boolean tryLock();
// 在指定时间内尝试获取锁,成功则返回true
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 释放锁
void unlock();
// 条件状态,阻塞队列中使用
Condition newCondition();
}

ReentrantLock重入锁

成员变量

1
2
3
4
5
6
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronize提供所有同步机制的实现 */
private final Sync sync;
...
}

只有一个成员变量sync,其抽象类如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
// 获取锁
abstract void lock();
// 尝试获取非公平锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 尝试释放锁
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}

构造函数

1
2
3
4
5
6
7
8
9
// 默认创建非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// 创建公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}

lock方法

1
2
3
4
// sync的lock
public void lock() {
sync.lock();
}

lockInterruptibly方法

1
2
3
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

非公平锁的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 非公平锁的实现是继承Sync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 获取锁的方法
final void lock() {
if (compareAndSetState(0, 1))
// 设置独占模式,也就是一个锁只能被一个线程独占
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// 没有获取锁,尝试使用信号量的方式
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
// compareAndSwapInt这个方法不是Java实现的,而是通过JNI来调用操作系统的原生程序
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

tryLock方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取状态
int c = getState();
if (c == 0) {
// 如果没有获取到锁,CAS设置状态
if (compareAndSetState(0, acquires)) {
// 独占线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 当前线程持有这个锁,设置重入次数
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 未获取到锁,退出
return false;
}

其中getState获取state的值,0表示未获取到锁,1表示已经获取到锁,大于1表示重入数

tryLock(long timeout, TimeUnit unit)方法

1
2
3
4
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

该方法实际上调用的是Sync父类AbstractQueuedSynchronizer的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
// 循环等待获取锁的方法
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
// 放入等待节点,组成一个链表
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
// 当前节点为头结点,尝试获取锁
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
// 超时退出
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
// 未超时
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
// 检测到中断
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

ReadWrite读写锁

  • 读写锁的使用场景
    • 多线程同时读取临界值,单一线程写
    • 阻塞同时读写
读 写
读 非阻塞 阻塞
写 阻塞 阻塞

实例

  • 提交18个读任务,2个写任务,使用重入锁耗时奖金20毫秒,使用读写锁耗时3秒左右
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
* 读写锁,允许多个线程同时读取
*/
public class ReadWriteLockDemo {
private static Lock lock = new ReentrantLock();
private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static Lock readLock = readWriteLock.readLock();
private static Lock writeLock = readWriteLock.writeLock();
private int value;
private static CountDownLatch cd = new CountDownLatch(20);
public Object handleRead(Lock lock) throws InterruptedException {
try {
//模拟读操作
lock.lock();
Thread.sleep(1000);
return value;
} finally {
lock.unlock();
}
}
public void handleWrite(Lock lock, int index) throws InterruptedException {
try {
lock.lock();
Thread.sleep(1000);
value=index;
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
final ReadWriteLockDemo demo = new ReadWriteLockDemo();
Runnable readRunnable = () -> {
try {
//读写锁
// demo.handleRead(readLock);
demo.handleRead(lock);
} catch (InterruptedException e) {
e.printStackTrace();
}
cd.countDown();
};
Runnable writeRunnable = () -> {
try {
demo.handleWrite(writeLock, new Random().nextInt());
} catch (InterruptedException e) {
e.printStackTrace();
}
cd.countDown();
};
long start = System.currentTimeMillis();
for (int i = 0; i < 18; i++) {
Thread thread = new Thread(readRunnable);
thread.start();
}
for (int i = 18; i < 20; i++) {
Thread thread = new Thread(writeRunnable);
thread.start();
}
cd.await();
System.out.println("the program cost : " + (System.currentTimeMillis() - start) + " ms");
}
}
/*
* 读写锁:the program cost : 3020 ms
* 重入锁:the program cost : 18156 ms
*/

源码解读

ReadWriteLock接口

1
2
3
4
5
6
7
public interface ReadWriteLock {
// 读锁
Lock readLock();
// 写锁
Lock writeLock();
}

ReentrantReadWriteLock类

成员变量

1
2
3
4
5
6
7
8
9
10
11
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
// 内部类提供的读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
// 内部类提供的写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
// 实现同步操作的机制
final Sync sync;
//...
}

构造函数

1
2
3
4
5
6
7
8
9
10
11
// 非公平锁构造方法
public ReentrantReadWriteLock() {
this(false);
}
// 公平锁构方法
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

ReadLock内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
// Sync,和重入锁一样,同步机制由Sync来实现
private final Sync sync;
// 构造方法,传入sync的实例
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 获取共享锁
public void lock() {
sync.acquireShared(1);
}
// 相应中断,跳出阻塞
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 尝试获取锁
public boolean tryLock() {
return sync.tryReadLock();
}
// 在指定时间内尝试获取锁
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// 释放锁
public void unlock() {
sync.releaseShared(1);
}
// 条件变量
public Condition newCondition() {
throw new UnsupportedOperationException();
}
public String toString() {
int r = sync.getReadLockCount();
return super.toString() +
"[Read locks = " + r + "]";
}
}

WriteLock内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
// sync同步同步机制的实现
private final Sync sync;
// 构造方法
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 获取锁
public void lock() {
sync.acquire(1);
}
// 响应中断
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
// 尝试获取锁,成功返回true
public boolean tryLock( ) {
return sync.tryWriteLock();
}
// 在指定时间内尝试获取锁
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
// 释放锁
public void unlock() {
sync.release(1);
}
// 条件变量
public Condition newCondition() {
return sync.newCondition();
}
public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}
// 是否被当前线程所占据
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
// 返回写锁被当前线程持有的次数
public int getHoldCount() {
return sync.getWriteHoldCount();
}
}

CountDownLatch

  • 倒计数器,控制线程等待,直到倒计数器为0,线程继续开始执行。

实例

  • 火箭发射中需要进行各个仪器的检查,只有所有的都正常,才可以发射
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 倒计时器,让某一个线程等待倒计时器结束再开始执行
*/
public class CountDownLatchDemo implements Runnable{
private static final CountDownLatch end = new CountDownLatch(10);
private static final CountDownLatchDemo demo = new CountDownLatchDemo();
@Override
public void run(){
try {
Thread.sleep(new Random().nextInt(10) * 100);
System.out.println("check complete");
end.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executorService.submit(demo);
}
end.await();
System.out.println("Fire");
executorService.shutdown();
}
}
/*
check complete
check complete
check complete
check complete
check complete
check complete
check complete
check complete
check complete
check complete
Fire
*/
  • 赛跑或者赛车问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* 赛跑问题countDownLatch解决方法
**/
public class Player implements Runnable{
private int id;
private CountDownLatch begin;
private CountDownLatch end;
public Player(int id, CountDownLatch begin, CountDownLatch end) {
this.id = id;
this.begin = begin;
this.end = end;
}
@Override
public void run() {
try {
begin.await();
long startTime = System.currentTimeMillis();
System.out.printf("Player num %d start to run at %d!\n", id, startTime);
Thread.sleep(new Random().nextInt(100)); //随机分配时间,即运动员完成时间
System.out.printf("Player num %d finish game, time : %d!\n", id, System.currentTimeMillis() - startTime);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
end.countDown();
}
}
public static void main(String[] args) {
CountDownLatch begin = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(5);
Player[] players = new Player[5];
for (int i = 0; i < 5; i++) {
players[i] = new Player(i+1, begin, end);
}
ExecutorService es = Executors.newFixedThreadPool(5);
for (Player player : players) {
es.execute(player);
}
System.out.println("begin race");
begin.countDown();
try {
end.await();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("Race ends!");//宣布比赛结束
}
es.shutdown();
}
}
/*
begin race
Player num 1 start to run at 1536030032314!
Player num 5 start to run at 1536030032315!
Player num 4 start to run at 1536030032315!
Player num 3 start to run at 1536030032315!
Player num 2 start to run at 1536030032315!
Player num 4 finish game, time : 31!
Player num 1 finish game, time : 43!
Player num 2 finish game, time : 75!
Player num 5 finish game, time : 103!
Player num 3 finish game, time : 109!
Race ends!
*/

CyclicBarrier

  • 可以实现线程之间的等待,可以循环使用,如十个一组执行的场景
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/**
* CycleBarrier,循环栅栏,可以反复使用的计数器
*/
public class CyclicBarrierDemo {
public static class Soldier implements Runnable {
private String soldier;
private final CyclicBarrier cyclicBarrier;
public Soldier(String soldier, CyclicBarrier cyclicBarrier) {
this.soldier = soldier;
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
try {
//等待所有人到齐
cyclicBarrier.await();
doWork();
//等待所有完成工作
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
private void doWork() {
try {
Thread.sleep(Math.abs(new Random().nextInt()%10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(soldier + ":任务完成");
}
}
public static class BarrierRun implements Runnable {
boolean flag;
int N;
public BarrierRun(boolean flag, int n) {
this.flag = flag;
N = n;
}
@Override
public void run() {
if (flag) {
System.out.println("司令:【士兵" + N + "个,任务完成!】");
} else {
System.out.println("司令:【士兵" + N + "个,集结完毕!】");
flag = true;
}
}
}
public static void main(String[] args) {
final int N = 10;
Thread[] allSoldier = new Thread[N];
boolean flag = false;
CyclicBarrier cyclic = new CyclicBarrier(N, new BarrierRun(flag, N));
//设置屏障点
System.out.println("集合队伍!");
for (int i = 0; i < N; i++) {
System.out.println("士兵" + i + "报道!");
allSoldier[i] = new Thread(new Soldier("士兵" + i + "报道!", cyclic));
allSoldier[i].start();
}
}
}
/*
集合队伍!
士兵0报道!
士兵1报道!
士兵2报道!
士兵3报道!
士兵4报道!
士兵5报道!
士兵6报道!
士兵7报道!
士兵8报道!
士兵9报道!
司令:【士兵10个,集结完毕!】
士兵9报道!:任务完成
士兵0报道!:任务完成
士兵8报道!:任务完成
士兵3报道!:任务完成
士兵7报道!:任务完成
士兵2报道!:任务完成
士兵1报道!:任务完成
士兵5报道!:任务完成
士兵4报道!:任务完成
士兵6报道!:任务完成
司令:【士兵10个,任务完成!】
*/

LockSupport

  • LockSupport类似于suspend(),可以让一个线程在任意位置阻塞,不用获取锁,也不会抛出异常
  • park方法阻塞当前线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.util.concurrent.locks.LockSupport;
public class LockSupportDemo {
public static Object u = new Object();
static ChangeObjectThread t1 = new ChangeObjectThread("t1");
static ChangeObjectThread t2 = new ChangeObjectThread("t2");
public static class ChangeObjectThread extends Thread {
public ChangeObjectThread(String name) {
super.setName(name);
}
@Override
public void run() {
synchronized (u) {
System.out.println("in " + getName());
LockSupport.park();
}
}
}
public static void main(String[] args) throws InterruptedException {
t1.start();
Thread.sleep(100);
t2.start();
LockSupport.unpark(t1);
LockSupport.unpark(t2);
t1.join();
t2.join();
}
}

Java多线程-ThreadLocal

发表于 2019-04-21 | 分类于 Java , Multi-Thread

ThreadLocal源码解读

基本参数

1
2
3
4
5
6
7
// 标识每一个ThreadLocal的唯一性
private final int threadLocalHashCode = nextHashCode();
// 自增哈希值
private static AtomicInteger nextHashCode =
new AtomicInteger();
// 增量
private static final int HASH_INCREMENT = 0x61c88647;

构造函数

  • 无参构造函数
1
2
3
4
5
6
/**
* Creates a thread local variable.
* @see #withInitial(java.util.function.Supplier)
*/
public ThreadLocal() {
}

get方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 获取线程变量的副本
* @return the current thread's value of this thread-local
*/
public T get() {
Thread t = Thread.currentThread();
// getMap方法从线程t中获取threadLocalMap,threadLocalMap是线程的一个成员变量threadLocals,每一个线程有一个自己的ThreadLocalMap对象来维护自己的变量
ThreadLocalMap map = getMap(t);
if (map != null) {
// 第二次调用map获取Entry
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// 第一次调用get方法会调用setInitialValue方法
return setInitialValue();
}

getMap返回一个ThreadLocalMap类型的值

1
2
3
4
5
6
7
8
9
10
/**
* Get the map associated with a ThreadLocal. Overridden in
* InheritableThreadLocal.
*
* @param t the current thread
* @return the map
*/
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

实际上第一次调用,map为空,调用setInitialValue()方法来初始化map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Variant of set() to establish initialValue. Used instead
* of set() in case user has overridden the set() method.
*
* @return the initial value
*/
private T setInitialValue() {
// 实际上初始化map的地方,需要重写initialvalue方法
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
// 调用createMap来创建map
createMap(t, value);
return value;
}
/**
* 重写之后的返回值作为ThreadLocal的value值
* @return the initial value for this thread-local
*/
protected T initialValue() {
return null;
}

createMap方法实际上完成ThreadLcoalMap的初始化,每个Thread都有一个ThreadLocal.ThreadLocalMap。其中key为ThreadLocal这个实例,value为每次initialValue()得到的变量。

1
2
3
4
5
6
7
8
9
10
11
/**
* Create the map associated with a ThreadLocal. Overridden in
* InheritableThreadLocal.
*
* @param t the current thread
* @param firstValue value for the initial entry of the map
*/
void createMap(Thread t, T firstValue) {
// 创建ThreadLocalMap
t.threadLocals = new ThreadLocalMap(this, firstValue);
}

第二次调用get方法,会进入if里面,调用getEntry方法获取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Get the entry associated with key. This method
* itself handles only the fast path: a direct hit of existing
* key. It otherwise relays to getEntryAfterMiss. This is
* designed to maximize performance for direct hits, in part
* by making this method readily inlinable.
*
* @param key the thread local object
* @return the entry associated with key, or null if no such
*/
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}

set方法

1
2
3
4
5
6
7
8
9
10
11
12
public void set(T value) {
// 取得当前线程
Thread t = Thread.currentThread();
// 获取当前线程的ThreadLocalMap
ThreadLocalMap map = getMap(t);
if (map != null)
// map不为空,直接传值
map.set(this, value);
else
// map为空,则创建map并将初始值传入
createMap(t, value);
}

remove方法

1
2
3
4
5
6
7
8
9
10
11
/**
* Removes the current thread's value for this thread-local
* variable.
*/
public void remove() {
// 获取当前线程的ThreadLocalMap
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
// 移除当前实例的ThreadLocal
m.remove(this);
}

总结

  1. 每个线程都有自己的局部变量,一个线程的本地变量对于其他线程是不可见的
  2. 独立于变量的初始化副本,ThreadLocal可以给一个初始值,而每个线程都会获得到这个初始化值的一个副本,这样能保证不同的线程都有一份拷贝
  3. 状态与某一个线程相关联,ThreadLocal不是用于解决共享变量的问题的,不是为了协调线程同步而存在的,而是为了方便每个线程处理自己的状态而引入的一个机制。

ThreadLocal实例

  • 格式化时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package third;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadLocalDemo {
//SimpleDateFormat线程安全
// private static final SimpleDateFormat sdf= new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private static final ThreadLocal<SimpleDateFormat> t1= new ThreadLocal<>();
public static class ParseDate implements Runnable {
int i = 0;
public ParseDate(int i) {
this.i = i;
}
@Override
public void run() {
try {
// Date t = sdf.parse("2015-03-29 19:59:" + i%60);
if (t1.get() == null)
t1.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
Date t = t1.get().parse("2015-03-29 19:59:" + i%60);
System.out.println(i+ ":" + t);
} catch (ParseException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
ExecutorService ex = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
ex.execute(new ParseDate(i));
}
}
}
  • Gc演示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package third;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadLocalDemo_Gc {
static volatile ThreadLocal<SimpleDateFormat> t1 = new ThreadLocal<SimpleDateFormat>() {
protected void finalize() throws Throwable {
System.out.println(this.toString() + "is gc");
}
};
static volatile CountDownLatch cd = new CountDownLatch(10000);
public static class ParseDate implements Runnable {
int i = 0;
public ParseDate(int i) {
this.i = i;
}
@Override
public void run() {
try {
if (t1.get() == null) {
t1.set(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") {
protected void finalize() throws Throwable {
System.out.println(this.toString() + "is gc");
}
});
System.out.println(Thread.currentThread().getId() + ":create SimpleDateFormat");
}
Date t = t1.get().parse("2015-03-29 19:29:" + i % 60);
} catch (ParseException e) {
e.printStackTrace();
} finally {
cd.countDown();
}
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10000; i++) {
es.execute(new ParseDate(i));
}
cd.await();
System.out.println("mission complete!!");
t1 = null;
System.gc();
System.out.println("first GC compile!!");
t1 = new ThreadLocal<SimpleDateFormat>();
cd = new CountDownLatch(10000);
for (int i = 0; i < 10000; i++) {
es.execute(new ParseDate(i));
}
cd.await();
Thread.sleep(1000);
System.gc();
System.out.println("Second GC compile!");
}
}

Java多线程-Future

发表于 2019-04-21 | 分类于 Java , Multi-Thread

Future模式

  • Future模式是多线程开发中的常见的一种设计模式,核心思想是异步调用。当我们调用一个函数方法时,如果函数执行的很慢,需要等待。很多情况上,我们不需要立刻获取结果,而是让函数方法先返回,后台去计算结果,等到需要使用时,再去尝试获取数据。

Future接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public interface Future<V> {
//试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程。此方法返回后,对 isDone() 的后续调用将始终返回 true。如果此方法返回 true,则对 isCancelled()的后续调用将始终返回 true。
boolean cancel(boolean mayInterruptIfRunning);
// 任务正常完成前取消,返回true
boolean isCancelled();
// 任务是否正常结束
boolean isDone();
// 等待线程结果的返回,阻塞线程
V get() throws InterruptedException, ExecutionException;
// 设置超时时间
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

FutureTask

  • FutureTask类是Future 的一个实现,并实现了RunnableFuture
  • 可通过Excutor(线程池) 来执行,也可传递给Thread对象执行。
  • 如果在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future对象在后台完成,当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。
  • Executor框架利用FutureTask来完成异步任务,并可以用来进行任何潜在的耗时的计算。一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。
  • FutureTask类既可以使用new Thread(Runnable r)放到一个新线程中跑,也可以使用ExecutorService.submit(Runnable r)放到线程池中跑,而且两种方式都可以获取返回结果,但实质是一样的,即如果要有返回结果那么构造函数一定要注入一个Callable对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class FutureTask<V> implements RunnableFuture<V> {
// 任务状态,及状态转换:
// * NEW -> COMPLETING -> NORMAL
// * NEW -> COMPLETING -> EXCEPTIONAL
// * NEW -> CANCELLED
// * NEW -> INTERRUPTING -> INTERRUPTED
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
/**
* 构造方法,传入Calable接口实现类
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
/**
* 构造方法,传入Runnable接口实现类
*/
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
// 。。。。
}

应用实例

Future模式的参与者:

参与者 作用
Main 系统启动,调用Client发出请求
Client 返回Data对象,立即返回FutureData,并开启ClientThread线程装配RealData
FutureData 返回数据的接口
RealData 真实数据

Date接口

1
2
3
public interface Data {
public String getResult();
}

FutureData

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class FutureData implements Data{
protected RealData realData = null;
protected boolean isReady = false;
public synchronized void setRealData(RealData realData) {
if (isReady) {
return;
}
this.realData = realData;
isReady = true;
notifyAll();
}
@Override
public String getResult() {
while (!isReady) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return realData.result;
}
}

RealData

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class RealData implements Data{
protected final String result;
public RealData(String result) {
StringBuffer sb = new StringBuffer();
for (int i = 0; i < 10; i++) {
sb.append(result);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.result = sb.toString();
}
@Override
public String getResult() {
return result;
}
}

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Main {
public static void main(String[] args) {
Client client = new Client();
//会立即返回,得到FutureData
Data data = client.request("name");
System.out.println("请求完毕");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("数据 = " + data.getResult());
}
}
public class Client {
public Data request(final String queryStr) {
final FutureData futureData = new FutureData();
new Thread(() -> {
RealData realData = new RealData(queryStr);
futureData.setRealData(realData);
}).start();
return futureData;
}
}

Java多线程-CAS比较交换

发表于 2019-04-21 | 分类于 Java , Multi-Thread

CAS

  • CAS:compare and swap,比较交换
  • CAS有三个参数CAS(V,E,N)
    • V表示要更新的变量
    • E表示预期值
    • N表示新值
    • 只有V=E相等时,才会将V更新为N,否则说明其他线程进行了更新,什么也不做。
    • 失败会被告知,允许再次尝试
1
2
3
4
5
6
7
8
public int a = 1;
public boolean compareAndSwapInt(int b) {
if (a == 1) {
a = b;
return true;
}
return false;
}
  • 上面的代码中,多个线程同时更改a的值时,无法确定a最后的值,对compareAndSwapInt进行加锁,使其成为一个原子操作,同一个时刻只有一个线程才可以改变a的值。
  • CAS中的比较和替换是一组原子操作,不会被外部打断,先根据paramLong回去到当前内存中的值V,然后将内存值V和原值E做比较,相等则将V替换为N,效率要高于加锁的操作。

AtomicInteger

  • 无锁的线程安全整数类

成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// java通过native来访问底层操作系统,Unsafe类提供了硬件级别的原子操作
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
// 变量值在内存中的偏移地址
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
// value使用volatile修饰,从而多个线程之间可见
private volatile int value;
// ...
}

构造方法

1
2
3
4
5
6
7
public AtomicInteger(int initialValue) {
value = initialValue;
}
public AtomicInteger() {
}

普通方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 读
public final int get() {
return value;
}
// 写
public final void set(int newValue) {
value = newValue;
}
// 延迟写
public final void lazySet(int newValue) {
unsafe.putOrderedInt(this, valueOffset, newValue);
}

原子方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
原子操作,更新值,返回旧值
*/
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}
public final int getAndSetInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var4));
return var5;
}
// 更新值,成功返回true
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
// 原子累加
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
// 原子递减
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}
// 原子增加
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}

实例

  • AtomicInteger累加
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package third;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicIntegerDemo {
private static AtomicInteger integer = new AtomicInteger();
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(10);
CountDownLatch cd = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
service.execute(new AddThread(cd, integer));
}
cd.await();
System.out.println(integer);
}
}
class AddThread implements Runnable {
private CountDownLatch cd;
private AtomicInteger integer;
public AddThread(CountDownLatch cd, AtomicInteger integer) {
this.cd = cd;
this.integer = integer;
}
@Override
public void run() {
for (int k = 0; k < 10000; k++)
integer.incrementAndGet();
cd.countDown();
}
}
// 100000
  • AtomicIntegerArray累加
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class AtomicIntegerArrayDemo {
public static void main(String[] args) throws InterruptedException {
AtomicIntegerArray integers = new AtomicIntegerArray(10);
CountDownLatch latch = new CountDownLatch(10);
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++)
service.execute(new AddArrThread(integers, latch));
latch.await();
System.out.println(integers);
}
}
class AddArrThread implements Runnable {
private AtomicIntegerArray integers;
private CountDownLatch cd;
public AddArrThread(AtomicIntegerArray integers, CountDownLatch cd) {
this.integers = integers;
this.cd = cd;
}
@Override
public void run() {
for (int k = 0; k < 10000; k++)
integers.getAndIncrement(k % (integers.length()-2));
cd.countDown();
}
}

Java 反射

发表于 2019-04-21 | 分类于 Java

什么是反射

Java反射机制是在运行状态中,对于任意一个类,都能够知道这个类的所有属性和方法,对于任意一个对象,都能过调用他的任意一个方法和属性。这种动态获取的信息以及动态调用对象的方法的功能成为Java语言的反射机制。

反射其实就是把Java类中的各种成分映射成一个个Java的对象。比如一个类包括成员变量,方法,构造方法,接口,包等信息,利用反射技术可以对一个类进行解剖,把各个组成部分映射成一个个对象进行操作。首先需要了解类的加载过程,

类加载过程和对象创建过程

类加载过程

  1. JVM会先去方法区中找有没有相应类的.class存在,如果有直接使用就可以,如果没有,需要把相关类的.class文件加载到方法区。

  2. .class文件加载到方法区时,会分成两部分加载,先加载非静态内容,再加载静态内容

  3. 加载非静态内容:把.class中的所有非静态内容加载到方法区下的非静态区域内

  4. 加载静态内容:

    • 把.class文件中的所有静态内容加载到方法区的静态区域内

    • 静态内容加载完成后,对所有静态变量进行默认初始化

    • 所有的静态变量默认初始化完成之后,再进行显示初始化。

    • 当静态区域下的所有静态变量显示初始化完后,执行静态代码段。

  5. 静态区域下的静态代码快执行完毕之后,整个类的加载就完成了。

对象创建过程

  1. 在堆内存中开辟一块空间

  2. 给开辟的空间分配一个地址

  3. 把对象的所有非静态成员变量加载到所开辟的空间下

  4. 所有的非静态成员加载完成之后,对所有的非静态成员变量进行默认初始化

  5. 默认初始化完成之后,调用构造函数

  6. 构造函数入栈执行时,先执行构造函数中的隐式三步,再执行构造函数。

    • 隐式三步:

      • 执行super语句

      • 对开辟空间下所有非静态成员变量进行显示初始化

      • 执行构造代码块

  7. 构造函数执行完毕并弹栈之后,把空间分配的地址赋值给一个引用对象。

Class类

  1. Class类实现了Serializable, AnnotatedElement, GenericDeclaration, Type这四个接口

  2. final函数头,不可继承

    1
    2
    3
    public final class Class<T>
    extends Object
    implements Serializable, GenericDeclaration, Type, AnnotatedElement
  3. Class类实例表示Java程序中运行的类或者接口,枚举是类的一种而注解是接口的一种。同样数组也是一个可以被反射为Class对象,从而被所有数组来共享其维度和数据。8种基本类型也可以看做为class对象。

  4. 需要注意的是Class类没有公有的构造方法,Class对象是有JVM及ClassLoader记载类的过程中自动创建的。

  5. 方法列表

    • 静态方法
返回值 方法及描述
static Class<?> forName(String className) Returns the Class object associated with the class or interface with the given string name.
static Class<?> forName(String name, boolean initialize, ClassLoader loader) Returns the Class object associated with the class or interface with the given string name, using the given class loader.
  • 常用实例方法
返回值 方法及描述
Class<?>[] getClasses() Returns an array containing Class objects representing all the public classes and interfaces that are members of the class represented by this Class object
ClassLoader getClassLoader() Returns the class loader for the class.
Constructor getConstructor(Class<?>… parameterTypes) Returns a Constructor object that reflects the specified public constructor of the class represented by this Class object.
Field getField(String name) Returns a Field object that reflects the specified public member field of the class or interface represented by this Class object.
Class<?>[] getInterfaces() Determines the interfaces implemented by the class or interface represented by this object.
Method getMethod(String name, Class<?>… parameterTypes) Returns a Method object that reflects the specified public member method of the class or interface represented by this Class object.
T newInstance() Creates a new instance of the class represented by this Class object.

反射的使用

实际上,我们获取Class对象的方法有三种:

  1. Object对象的getClass()方法

  2. 任何数据类型都有一个都有一个“静态”的class属性

  3. 通过Class类的静态方法:forName(String className)

实际中我们需要使用反射来动态的生成对象,而如果已经有了对象也就不需要反射了,多以第一种很少用,第二种需要提前有类的包,同样导入了类的包可以直接创建。因此常用的是第三种,通过一个字符串来将要反射的类的信息传递进来动态生成对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package reflect;
/**
* Created by cdx0312
* 2018/4/20
*/
public class GetClassMethod {
public static void main(String[] args) throws ClassNotFoundException {
//通过getClass方法
Student s1 = new Student();
Class stuClass = s1.getClass();
System.out.println(stuClass.getName());
//通过类来获得Class对象
Class stuClass2 = Student.class;
System.out.println(stuClass2.getName());
// 通过forName来获取
Class stuClass3 = Class.forName("reflect.Student");
System.out.println(stuClass3.getName());
System.out.println(stuClass3 == stuClass2);
}
}

通过反射获取构造方法并使用

Student类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package reflect;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
/**
* Created by cdx0312
* 2018/4/20
*/
public class Student {
//(默认的构造方法)
Student(String str){
System.out.println("(默认)的构造方法 s = " + str);
}
//无参构造方法
public Student(){
System.out.println("调用了公有、无参构造方法执行了。。。");
}
//有一个参数的构造方法
public Student(char name){
System.out.println("姓名:" + name);
}
//有多个参数的构造方法
public Student(String name ,int age){
System.out.println("姓名:"+name+"年龄:"+ age);
}
//受保护的构造方法
protected Student(boolean n){
System.out.println("受保护的构造方法 n = " + n);
}
//私有构造方法
private Student(int age){
System.out.println("私有的构造方法 年龄:"+ age);
}
public static void main(String[] args) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
Object obj;
// 1. 加载Class对象
Class<?> aClass = Class.forName("reflect.Student");
// 2. 获取所有的公有构造方法
Constructor<?>[] constructors = aClass.getConstructors();
System.out.println("公有的构造方法:");
for (Constructor constructor : constructors)
System.out.println(constructor);
//3. 获得所有的构造方法
Constructor<?>[] declaredConstructors = aClass.getDeclaredConstructors();
System.out.println("所有声明的构造方法:");
for (Constructor constructor : declaredConstructors)
System.out.println(constructor);
//4. 获得无参构造方法
Constructor constructor1 = aClass.getConstructor(null);
System.out.println("所有公有无参构造方法:");
System.out.println(constructor1);
obj = constructor1.newInstance();
System.out.println("obj = " + obj);
//5. 获得私有的构造方法
Constructor<?> declaredConstructor = aClass.getDeclaredConstructor(int.class);
System.out.println("私有的构造方法:");
System.out.println(declaredConstructor);
declaredConstructor.setAccessible(true);
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
公有的构造方法:
public reflect.Student(java.lang.String,int)
public reflect.Student()
public reflect.Student(char)
所有声明的构造方法:
private reflect.Student(int)
protected reflect.Student(boolean)
public reflect.Student(java.lang.String,int)
reflect.Student(java.lang.String)
public reflect.Student()
public reflect.Student(char)
所有公有无参构造方法:
public reflect.Student()
调用了公有、无参构造方法执行了。。。
obj = reflect.Student@4554617c
私有的构造方法:
private reflect.Student(int)

获得成员变量并调用

  1. 批量的:
    public Method[] getMethods():获取所有”公有方法”;(包含了父类的方法也包含Object类)
    public Method[] getDeclaredMethods():获取所有的成员方法,包括私有的(不包括继承的)
  2. 获取单个的:
    public Method getMethod(String name,Class<?>… parameterTypes),name : 方法名;Class … : 形参的Class类型对象
    public Method getDeclaredMethod(String name,Class<?>… parameterTypes)

  3. 调用方法:
    Method –> public Object invoke(Object obj,Object… args): obj : 要调用方法的对象,args:调用方式时所传递的实参

Student1类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package reflect;
/**
* Created by cdx0312
* 2018/4/20
*/
public class Student1 {
public void show1(String s){
System.out.println("调用了:公有的,String参数的show1(): s = " + s);
}
protected void show2(){
System.out.println("调用了:受保护的,无参的show2()");
}
void show3(){
System.out.println("调用了:默认的,无参的show3()");
}
private String show4(int age){
System.out.println("调用了,私有的,并且有返回值的,int参数的show4(): age = " + age);
return "abcd";
}
}

测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package reflect;
import java.lang.reflect.Method;
/**
* Created by cdx0312
* 2018/4/20
*/
public class MethodInvoke {
public static void main(String[] args) throws Exception{
// 1. 获取Class对象
Class clazz = Class.forName("reflect.Student1");
// 2. 获取所有公有方法并打印
System.out.println("***************获取所有的”公有“方法*******************");
Method[] methods = clazz.getMethods();
for (Method method : methods)
System.out.println(method);
// 3. 获取所有方法并打印
System.out.println("***************获取所有的方法*******************");
methods = clazz.getDeclaredMethods();
for (Method method : methods)
System.out.println(method);
// 4. 获取所有公有show1方法并调用
System.out.println("***************获取show1()方法*******************");
Method show1 = clazz.getMethod("show1", String.class);
System.out.println(show1);
Object o = clazz.getConstructor().newInstance();
show1.invoke(o, "haha");
System.out.println("***************获取私有show4()方法*******************");
Method show4 = clazz.getDeclaredMethod("show4", int.class);
System.out.println(show4);
show4.setAccessible(true);
Object invoke = show4.invoke(o, 188);
System.out.println(invoke);
}
}

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package reflect;
import java.lang.reflect.Method;
/**
* Created by cdx0312
* 2018/4/20
*/
public class MethodInvoke {
public static void main(String[] args) throws Exception{
// 1. 获取Class对象
Class clazz = Class.forName("reflect.Student1");
// 2. 获取所有公有方法并打印
System.out.println("***************获取所有的”公有“方法*******************");
Method[] methods = clazz.getMethods();
for (Method method : methods)
System.out.println(method);
// 3. 获取所有方法并打印
System.out.println("***************获取所有的方法*******************");
methods = clazz.getDeclaredMethods();
for (Method method : methods)
System.out.println(method);
// 4. 获取所有公有show1方法并调用
System.out.println("***************获取show1()方法*******************");
Method show1 = clazz.getMethod("show1", String.class);
System.out.println(show1);
Object o = clazz.getConstructor().newInstance();
show1.invoke(o, "haha");
System.out.println("***************获取私有show4()方法*******************");
Method show4 = clazz.getDeclaredMethod("show4", int.class);
System.out.println(show4);
show4.setAccessible(true);
Object invoke = show4.invoke(o, 188);
System.out.println(invoke);
}
}

反射main方法

Student2类

1
2
3
4
5
6
7
8
9
10
11
package reflect;
/**
* Created by cdx0312
* 2018/4/20
*/
public class Student3 {
public static void main(String[] args) {
System.out.println("Main 方法执行");
}
}

测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package reflect;
import java.lang.reflect.Method;
/**
* Created by cdx0312
* 2018/4/20
*/
public class MainInvoke {
public static void main(String[] args) throws Exception{
Class clazz = Class.forName("reflect.Student3");
Method main = clazz.getMethod("main", String[].class);
main.invoke(null, (Object) new String[]{"a", "b", "c"});
}
}

其中调用main方法时,第一个参数是对象的类型,由于为静态类,则设置为null就可以,第二个参数是方法参数,注意强制转换。

通过反射运行配置文件的内容

Student4类

1
2
3
4
5
6
7
8
9
10
11
package reflect;
/**
* Created by cdx0312
* 2018/4/20
*/
public class Studnet4 {
public void show() {
System.out.println("show");
}
}

a.txt文件

1
2
className = reflect.Studnet4
methodName = show

测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package reflect;
import java.io.FileReader;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Properties;
/**
* Created by cdx0312
* 2018/4/20
*/
public class TxtInvoke {
public static void main(String[] args) throws Exception{
//通过反射获取Class对象
Class clazz = Class.forName(getValue("className"));
//2获取show()方法
Method method = clazz.getMethod(getValue("methodName"));
//3.调用show()方法
method.invoke(clazz.getDeclaredConstructor().newInstance());
}
//此方法接收一个key,在配置文件中获取相应的value
public static String getValue(String key) throws IOException {
//获取配置文件的对象
Properties properties = new Properties();
//获取输入流
FileReader in = new FileReader("E:\\Java_Code\\JavaSE\\ConsistenceHash\\src\\reflect\\a.txt");
//将流加载到配置文件对象中
properties.load(in);
in.close();
//返回根据key获取的value值
return properties.getProperty(key);
}
}

更新要反射的类只操作配置文件就可以,便于后期维护和更新。

反射越过泛型检查

泛型用于编译器,编译过后进行泛型擦除,可以通过反射来越过泛型检查。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package reflect;
import java.lang.reflect.Method;
import java.util.ArrayList;
/**
* Created by cdx0312
* 2018/4/20
*/
public class Test {
public static void main(String[] args) throws Exception{
ArrayList<String> strList = new ArrayList<>();
strList.add("aaa");
strList.add("bbb");
// strList.add(100);
//获取ArrayList的Class对象,反向的调用add()方法,添加数据
Class clazz = strList.getClass(); //得到 strList 对象的字节码 对象
//获取add()方法
Method m = clazz.getMethod("add", Object.class);
//调用add()方法
m.invoke(strList, 100);
//遍历集合
for(Object obj : strList){
System.out.println(obj);
}
}
}

Hibernate总结

发表于 2019-04-21 | 分类于 Java , Hibernate

一、Hibernate简介

  • hibernate是一个开源的ORM框架,Object-Relational Mapping,在关系型数据库和对象之间做了一个映射,Hibernate是对JDBC的进一步封装,将JDBC封装起来,从而在和数据库进行操作的时候不需要直接对数据库进行读写,只需要对Hibernate接口进行调用就可以完成数据库方面的操作。
  • Hibernate位于JPA接口和Native接口和JDBC之间,从而实现对JDBC的封装和对关系型数据库的对象映射。
    Alt text
  • Hibernate核心
    Alt text
    1. Configuration接口负责配置并启动Hibernate
    2. SessionFactory接口负责初始化Hibernate
    3. Session接口负责持久化对象的CRUD操作
    4. Transaction接口负责事务性
    5. Query接口负责执行数据库的查询

Hibernate的优缺点

优点

  1. 对象化,只需要操作对象就可以完成数据库的操作
  2. 一致性,代码可复用程度高
  3. POJO对象,简单的Java对象
  4. 测试方便 JUnit
  5. 效率高

缺点

  1. 使用数据库特性的语句,调优比较困难
  2. 对大批量数据库更新存在问题
  3. 系统中存在大量的攻击查询功能

    优缺点参考了http://blog.csdn.net/jiuqiyuliang/article/details/39078749这篇博客,自己还不是很理解。


二、Hibernate环境搭建

开发环境

Win10 + Intellij2017 + Mysql-Front + jdk1.8 + hibernate5.2,
其中hibernate5.2改动比较大,我学习hibernate的目的在于了解这个经典的持久层框架,现在主流公司的框架为ssm,所以主要参考马士兵老师的hibernate视频学习,版本为hibernate3.2,有一些区别,不过一遍学习一边修正。

Intelij创建项目流程:

  1. file->new Project,勾选hibernate,会自动下载好需要的hibernate的包,但是JDBC和Junit单元测试的包需要手动导入,添加方式如下:
    Alt text
    选中自己的lib文件夹就可以。
    包结构如下:
    Alt text
  2. 创建一个测试文件夹,标记为测试文件夹,用于测试。
  3. 根据提供的或者自己拷贝的hibernate.cfg.xml文件,完成基本的配置,了解基本的参数的意思即可。
  4. 写实体层代码
    • 建立一个Student类
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      package com.hibernate;
      import javax.persistence.Basic;
      import javax.persistence.Column;
      import javax.persistence.Entity;
      import javax.persistence.Id;
      @Entity
      public class Student {
      private int id;
      private String name;
      private int age;
      public void setAge(Integer age) {
      this.age = age;
      }
      @Id
      @Column(name = "id", nullable = false)
      public int getId() {
      return id;
      }
      public void setId(int id) {
      this.id = id;
      }
      @Basic
      @Column(name = "name", nullable = true, length = 20)
      public String getName() {
      return name;
      }
      public void setName(String name) {
      this.name = name;
      }
      @Basic
      @Column(name = "age", nullable = true)
      public Integer getAge() {
      return age;
      }
      public void setAge(int age) {
      this.age = age;
      }
      @Override
      public boolean equals(Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;
      Student student = (Student) o;
      if (id != student.id) return false;
      if (age != student.age) return false;
      if (name != null ? !name.equals(student.name) : student.name != null) return false;
      return true;
      }
      @Override
      public int hashCode() {
      int result = id;
      result = 31 * result + (name != null ? name.hashCode() : 0);
      result = 31 * result + age;
      return result;
      }
      }

这里采用注解的方式完成实体类和数据库表之间的映射。在hibernate.cfg.xml文件中加入对应的mapping语句。
<mapping class="com.hibernate.Student"/>

  1. 测试映射是否正常工作,为了调试方便,推荐将hibernate默认的日志slf4j换成log4j,用JUnit进行单元测试。测试代码如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    public class StudentTest extends TestCase{
    private SessionFactory sessionFactory;
    @Override
    protected void setUp() throws Exception {
    final StandardServiceRegistry registry = new StandardServiceRegistryBuilder().configure().build();
    try {
    sessionFactory = new MetadataSources(registry).buildMetadata().buildSessionFactory();
    } catch (Exception e) {
    e.printStackTrace();
    StandardServiceRegistryBuilder.destroy(registry);
    }
    ServiceRegistry serviceRegistry = new StandardServiceRegistryBuilder().configure().build();
    Metadata metadata = new MetadataSources(serviceRegistry).buildMetadata();
    SchemaExport schemaExport = new SchemaExport();
    schemaExport.create(EnumSet.of(TargetType.DATABASE),metadata);
    }
    @Override
    protected void tearDown() throws Exception {
    if (sessionFactory != null) {
    sessionFactory.close();
    }
    }
    @Test
    public void testInsert() {
    Student student = new Student();
    student.setName("aa");
    student.setId(1);
    student.setAge(12);
    Session session = sessionFactory.getCurrentSession();
    session.beginTransaction();
    session.save(student);
    session.getTransaction().commit();
    }
    @Test
    public void testUpdate() {
    testInsert();
    Session session = sessionFactory.getCurrentSession();
    session.beginTransaction();
    Student student = (Student)session.get(Student.class, new Integer(1));
    student.setName("penny");
    session.getTransaction().commit();
    }
    @Test
    public void testGetById() {
    testInsert();
    Session session = sessionFactory.getCurrentSession();
    session.beginTransaction();
    Student student = session.get(Student.class, new Integer(1));
    session.getTransaction().commit();
    System.out.println("id" + student.getId() + " name = " + student.getName() + " Age = " + student.getAge());
    }
    @Test
    public void testDelete() {
    testInsert();
    Session session = sessionFactory.getCurrentSession();
    session.beginTransaction();
    Student student = session.get(Student.class, new Integer(1));
    session.delete(student);
    session.getTransaction().commit();
    }
    }

需要注意的是在hibe.cfg.xml文件中,我们配置了<property name="hbm2ddl.auto">update</property>,从而每次运行的时候会再数据库中删除原有的表,再执行后续操作,所有后面测试查找和删除功能时,需要首先插入数据才可以进行正常操作,不然会报空指针错误。

  • NOTE: Junit的测试类开头需要用testXXX, 不然会检测不到测试文件。
    1. 测试结果:
    • 插入:
      Alt text
    • 更新
      Alt text
    • ID查找
      Alt text
    • 删除
      Alt text

三、Hibernate核心接口

  1. Configuration : 配置并启动Hibernate,创建SessionFactory对象
  2. SessionFactory :初始化Hibernate,创建Session对象
  3. Session : 持久化对象的CRUD操作
  4. Transaction : 管理事务
  5. Query,Criteria : 执行数据库的查询

1、Configuration

一个Configuration实例允许应用指定在创建一个SessionFactory时使用的属性和映射文件,一个应用一般创建一个SessionFactory,Configuration是一个厨师时的对象,一个Configuration实例代表Hibernate所有Java类到SQL映射的集合。
Configuration配置SessionFactory的方法为:

1
2
StandardServiceRegistry registry = new StandardServiceRegistryBuilder().configure().build();
sessionFactory = new MetadataSources(registry).buildMetadata().buildSessionFactory();

2、sessionFactory

  • SessionFactory的作用是用于产生和管理Session的,创建SessionFactory成本比较大,一般一个程序只创建一个。SessionFactory一旦被创建,与Configuration对象就不再关联,内部状态是不可变的。
  • SessionFactory可以通过两种方式创建Session:
    • openSession: 每次打开都是新的Session,需要调用close方法关闭
    • getCurrentSession: 从上下文中获取Session并绑定到当前线程,第一次调用会自动创建一个Session实例,没有手动关闭的时候获取的是同一个Session,事务提交或者回滚时会自动关闭Session,不需要调用close方法。
  • 使用getCurrentSession的时候注意区分本地事务和全局事务:
    • 本地事务:JDBC事务,也就是一个数据库的事务<property name="current_session_context_class">thread</property>
    • 全局事务 JTA事务,分布式数据库的事务,跨越多个数据库<property name="current_session_context_class">jta</property>

3、Session

Session用于管理一个数据库的CRUD操作,是Java应用和Hibernate之间主要运行接口。
Session的生命周期是以一个逻辑事务的开始和结束为边界,Session的主要功能是提供创建,读取和删除映射的实体类操作。

  • 实体对象的三种状态:
    Alt text
    1. Transient:瞬时的,没有内置ID,只是内存中的一个对象,在缓存和数据库里面都找不到这个对象
    2. Persistent:持久化的,缓存中有ID,数据库中有,内存中有,缓存中有
    3. Detached:游离的,数据库中有ID,数据库和内存里面都有,缓存中没有
  • Session中存在一个缓存,成为Hibernate一级缓存,存放了当前单元加载的对象,缓存中存在一个Map,在执行save方法后,会对改对象生成一个id,并存储在Map中,提交之后,数据库就同步了这条数据,此时实例就处于持久化状态,关闭Session之后处于游离状态。

  • Session接口的方法:

    • save():将Transient状态的实体转换为Persistent状态,并给其分配一个ID标识符。
    • Delete():从数据库中删除持久化的实体
    • load()/get():将给定标识符的实体有Detached状态转变为Persistent状态。

      load返回的是代理对象,真正需要对象的内容时才会发出SQL语句
      get直接从数据库中加载对象,不会产生延迟
      当对象不存在时,get会立即报错,load只有需要使用该对象时才报错。

    • update():
      1. 更新指定标识符的Deta状态为Persistent状态
      2. 更新transient对象会报错
        1. 更新自己设定ID的transient对象可以(数据库中需要有相应的ID)
        2. persistent状态的对象只要设定不同对象就会发生更新
        3. 更新部分更改的字段
          • xml设定property标签的update属性,annotation设定@Column的updateable属性,很少用,不灵活
          • 使用xml中的dynamic-update,annotation在实体类上设定@DynamicUpdate
            • 使用HQL(EJBQL)语句–建议
    • clear:无论是load还是get,都会优先查找缓存,没找到才会去数据库中查找,调用clear可以强制清楚session缓存。

      4、Transaction

      Hibernate中的Transaction是对JDBC或者JTA的Transaction的封装,一个典型的事务会在创建完Session之后启动session.beginTransaction()连启动事务,commit提交事务:
      1
      2
      3
      4
      5
      Session session = sessionFactory.getCurrentSession();
      session.beginTransaction();
      Teacher t = (Teacher)session.get(Teacher.class, 1);
      t.setName("zhangsan2");
      session.getTransaction().commit();

5、Query

通过SessionFactory获取session对象,可以通过get方法来获取相应的对象,也可以通过获取Query对象来获得需要的对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Test
public void testHQL_01() {
testSave();
Session session = sessionFactory.openSession();
session.beginTransaction();
Query q = session.createQuery("from Category");
List<Category> categories = (List<Category>)q.list();
for(Category c : categories) {
System.out.println(c.getName());
}
session.getTransaction().commit();
session.close();
}

四、Hibernate关系映射

Hibernate之间的关系是指对象之间的关系,不是数据库之间的关系。ORM的思想是将关系型数据库中表单的数据映射成对象,以对象的形式展现,这样开发就可以把对数据库的操作转变为对对象的操作。
Hibernate实现ORM功能的主要文件有:

  • 映射类(xx.java):描述数据库表的结构,表中的字段在类中被描述成属性,可以实现把表中的记录映射成该类的对象。
  • 映射文件(xx.hbm.xml):指定数据库表和映射类之间的关系,包括映射类和数据库表的对应关系,表字段和类属性类型的对应关系以及表子弹和类属性名称的对应关系等
  • 数据库配置文件(hibernate.cfg.xml):指定与数据库连接时需要的连接信息,比如数据库类型,用户名,密码等。

1. 单向一对一关联

两个实体对象之间是一对一的关联映射,也就是一个对象只能与另外一个唯一的对象相对应。比如实体Student和StudentCard是典型的一对一的关系。一个学生有唯一的一个学号。一个学号对应唯一的一个学生。完成关系模型映射的方式有两种:

  • 主键关联:让两个对象具有相同的主键值,不需要其余的外键字段来维护关系。

    • Annotation : @OneToOne
      @PrimaryKeyJoinColumn
      
    • XML:
  • 外键关联:两个实体对象用一个外键来关联,本质上是多对一关联映射的特例,多的一端加上唯一的限制之后就成为了一对一的关联映射。

1.1 XML实现方式

StudentCard.java:

1
2
3
4
5
6
7
public class StudentCard {
private int id;
private String num;
private Student student;
//Getter and Setter are omitted
}

Student.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Student {
private int id;
private String name;
private int age;
private String sex;
private boolean good;
public Student() {
}
public Student(int age, String sex, boolean good) {
this.age = age;
this.sex = sex;
this.good = good;
}
public Student(String name, int age, String sex, boolean good) {
this.name = name;
this.age = age;
this.sex = sex;
this.good = good;
}
//Getter and Setter are omitted
}

Student.hbm.xml:

1
2
3
4
5
6
7
8
9
10
11
12
<hibernate-mapping>
<class name="com.hibernate.Student">
<id name="id" >
<generator class="native"/>
</id>
<property name="name"/>
<property name="age" />
<property name="sex" />
<property name="good" type="yes_no"></property>
</class>
</hibernate-mapping>

StudentCard.hbm.xml:

1
2
3
4
5
6
7
8
9
10
<hibernate-mapping>
<class name="com.hibernate.StudentCard">
<id name="id" >
<generator class="native"/>
</id>
<property name="num"/>
<many-to-one name="student" column="studentID" unique="true" />
</class>
</hibernate-mapping>

hibernate.cfg.xml:

1
2
<mapping resource="com/hibernate/Student.hbm.xml"/>
<mapping resource="com/hibernate/StudentCard.hbm.xml"/>

建表结果:
Alt text

1.2 Annotation方式

采用Husband和Wife两个实体对象,其中在Husband里面有对Wife的单向关联。
Wife:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Entity
public class Wife {
private int id;
private String name;
@Id
@GeneratedValue(generator = "increment")
@GenericGenerator(name = "increment", strategy = "increment")
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}

Husband:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Entity
public class Husband {
private int id;
private String name;
private Wife wife;
@Id
@GeneratedValue(generator = "increment")
@GenericGenerator(name = "increment", strategy = "increment")
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@OneToOne
@JoinColumn(name = "wifeid")
public Wife getWife() {
return wife;
}
public void setWife(Wife wife) {
this.wife = wife;
}
}

hibernate.cfg.xml加上映射类关系:

1
2
<mapping class="com.hibernate.Wife"/>
<mapping class="com.hibernate.Husband"/>

2. 双向一对一关联

上面说的一对一单向映射只能从一方加载另一方,双向关联映射可以互相加载。与单向关联映射相比,并不影响其再数据库中的存储方式,只影响其加载方式。

  • 主键

    • 修改StudentCard.hbm.xml文件:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      <hibernate-mapping>
      <class name="com.bjsxt.hibernate.StuIdCard">
      <id name="id">
      <generator class="foreign">
      <param name="property">student</param>
      </generator>
      </id>
      <property name="num"/>
      <one-to-one name="student" constrained="true"></one-to-one>
      </class>
      </hibernate-mapping>
    • Annotation 将添加注解@PrimaryKeyJoinColumn即可

  • 外键
    • Annotation方式在Wife中创建一个Husband的引用, 在get方法上面添加注解@OneToOne(mappedBy = "wife")即可
    • XML方式在Student.hbm.xml中添加:<one-to-one name="studentCard" property-ref="student"/>即可

3. 一对多关联映射

一对多和多对一的关联映射原理一致,都是在多的一端加一个外键,指向一的一端。

  • 与一对多的区别在于维护的关系不同:多对一维护的关系是多指向一的关系,有了此关系,在加载多的时候可以将一加载上来,一对多维护的是多的关系,有了此关系,在加载一的时候可以将多加载上来。

选取的是Group和User类型,显然一个Group拥有多个User,一个User属于一个Group,是典型的一对多的关系。
Group:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Entity
@Table(name = "t_group")
public class Group {
private int id;
private String name;
private Set<User> users = new HashSet<>();
@Id
@GeneratedValue
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@OneToMany
@JoinColumn(name = "groupId")
public Set<User> getUsers() {
return users;
}
public void setUsers(Set<User> users) {
this.users = users;
}
}

User:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Entity
@Table(name = "t_user")
public class User {
private int id;
private String name;
@Id
@GeneratedValue
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}

在hibernate.cfg.xml中注册之后即可进行测试:
Alt text
这里只写了注解实现的,因为考虑到注解是后面的主流,工作后也是注解用的比较多,XML的只写下思路。在Group配置文件下:

1
2
3
4
<set name="users">
<key column="groupid"></key>
<one-to-many class="com.hibernate.User"></one-to-many>
</set>

完成其一对多的单向映射。

4、一对多,多对一双向关联映射

完成一对多多对一的双向关联映射的关键在于在多的哪段添加对一的那端的引用。也就是在User实体里面创建一个Group类型的私有变量,从而持有Group的一个引用。与一对多的区别在于:

  1. Annotation方式下,User实体类对Group的getter方法上添加注解:@ManyToOne(),说明这是多对一的关联。
  2. XML方式下,在User的配置文件下添加:<many-to-one name="group" column="group_Id"/>即可。

5、单向多对多关联映射

多对多映射比较常见,比如一个老师教多个学生,一个学生有多个老师。关联是通过创建一个中间表来实现的。中间表可以很好的解决数据冗余的问题。
Student.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Entity
public class Student {
private int id;
private String name;
@Id
@GeneratedValue
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}

Teacher.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Entity
public class Teacher {
private int id;
private String name;
private Set<Student> students = new HashSet<>();
@Id
@GeneratedValue
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@ManyToMany
@JoinTable(name = "t_s",
joinColumns = {@JoinColumn(name = "teacher_id")},
inverseJoinColumns = {@JoinColumn(name = "student_id")}
)
public Set<Student> getStudents() {
return students;
}
public void setStudents(Set<Student> students) {
this.students = students;
}
}

生成结果:
Alt text

6、双向多对多关联映射

与单向的相比,在Student类中添加了Teacher的引用,从而实现多对多的导航,其余的和5一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Entity
public class Student {
private int id;
private String name;
private Set<Teacher> teachers = new HashSet<>();
@Id
@GeneratedValue
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@ManyToMany(mappedBy = "students")
public Set<Teacher> getTeachers() {
return teachers;
}
public void setTeachers(Set<Teacher> teachers) {
this.teachers = teachers;
}
}

7、关联数据的CRUD操作

  • 设定cascade可以设定在持久化时对关联对象的操作
    • cascade属性指明做什么操作的时候关联对象是绑定在一起的
    • 双向关系在程序中要设定双向关联
    • 双向mappedBy
    • Fetch
      * 双向不要两边设置Eager,会有多余的查询语句发出
      * 对多方设置fetch时要谨慎吗,结合具体的应用,一般用Lazy。
      
    • ORMapping编程模型
      * 映射模型
              * JPA annotation
                * Hibernate annotation extension
                * Hibernate xml
                * Jpa xml
          * 编程接口
                * Jpa
                *  Hibernate
          *  数据查询语言
                * HQL
      * EJBQL
      
  • TIps:
    * *删除和更新之前需要load*
    * *如果想要删除关联关系,先设定关系为null,再删除对应记录,如果不删除,记录变为垃圾数据*
    
    测试程序:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    public class HibernateORMTest extends TestCase{
    private SessionFactory sessionFactory;
    @Override
    protected void setUp() throws Exception {
    try {
    sessionFactory = new Configuration().configure().buildSessionFactory();
    } catch (Exception e) {
    e.printStackTrace();
    }
    ServiceRegistry serviceRegistry = new StandardServiceRegistryBuilder().configure().build();
    Metadata metadata = new MetadataSources(serviceRegistry).buildMetadata();
    SchemaExport schemaExport = new SchemaExport();
    schemaExport.create(EnumSet.of(TargetType.DATABASE),metadata);
    }
    @Override
    protected void tearDown() throws Exception {
    if (sessionFactory != null) {
    sessionFactory.close();
    }
    }
    @Test
    public void testSaveUser() {
    User user = new User();
    user.setName("g1");
    Group g = new Group();
    g.setName("g1");
    user.setGroup(g);
    Session session = sessionFactory.openSession();
    session.beginTransaction();
    // session.save(g);
    session.save(user);
    session.getTransaction().commit();
    session.close();
    }
    @Test
    public void testSaveGroup() {
    User u1 = new User();
    u1.setName("u1");
    User u2 = new User();
    u2.setName("u2");
    Group g = new Group();
    g.setName("g1");
    g.getUsers().add(u1);
    g.getUsers().add(u2);
    u1.setGroup(g);
    u2.setGroup(g);
    Session session = sessionFactory.openSession();
    session.beginTransaction();
    // session.save(g);
    session.save(g);
    session.getTransaction().commit();
    session.close();
    }
    @Test
    public void testGetUser() {
    testSaveGroup();
    Session session = sessionFactory.openSession();
    session.beginTransaction();
    User user = session.get(User.class, 1);
    System.out.println(user.getGroup().getName());
    session.getTransaction().commit();
    session.close();
    }
    @Test
    public void testGetGroup() {
    testSaveGroup();
    Session session = sessionFactory.openSession();
    session.beginTransaction();
    Group g = session.get(Group.class, 1);
    session.getTransaction().commit();
    session.close();
    }
    @Test
    public void testLoadUser() {
    testSaveGroup();
    Session session = sessionFactory.openSession();
    session.beginTransaction();
    User user = session.load(User.class, 1);
    System.out.println(user.getGroup().getName());
    session.getTransaction().commit();
    session.close();
    }
    @Test
    public void testUpdateUser() {
    testSaveGroup();
    Session session = sessionFactory.getCurrentSession();
    session.beginTransaction();
    User user = new User();
    user.setName("hhhhhh");
    Group g = new Group();
    g.setName("g2");
    user.setGroup(g);
    // user.getGroup().setName("ggggg");
    session.persist(user);
    session.getTransaction().commit();
    }
    @Test
    public void testDeleteUser() {
    testSaveGroup();
    Session session = sessionFactory.getCurrentSession();
    session.beginTransaction();
    // User user = session.load(User.class, 1);
    // user.setGroup(null);
    // session.delete(user);
    session.createQuery("delete from User u where u.id = 1").executeUpdate();
    session.getTransaction().commit();
    }
    @Test
    public void testDeleteGroup() {
    testSaveGroup();
    Session session = sessionFactory.getCurrentSession();
    session.beginTransaction();
    Group g = session.load(Group.class, 1);
    session.delete(g);
    // session.createQuery("delete from User u where u.id = 1").executeUpdate();
    session.getTransaction().commit();
    }
    }

8、集合映射

对象之间存储的容器可以选择Set, List, Map三种,根据实际中的具体需求来选取不同的容器,其中Map需要设定键值对,相对来说要复杂一点。采用的是User和Group的双向的一对多的关系,在User中设置好ID生成策略和多对一的导航,在Group中设置装载user的容器为map:

1
2
3
4
5
6
7
private Map<Integer, User> users = new HashMap<>();
@OneToMany(mappedBy = "group", cascade = CascadeType.ALL)
@MapKey(name = "id")
public Map<Integer, User> getUsers() {
return users;
}

单元测试如下:

1
2
3
4
5
6
7
8
9
10
11
@Test
public void testMap() {
Session session = sessionFactory.getCurrentSession();
session.beginTransaction();
Group g = session.load(Group.class, 1);
for (Map.Entry<Integer, User> entry : g.getUsers().entrySet()) {
System.out.println(entry.getValue().getName());
}
session.getTransaction().commit();
}

9、继承关系

hibernate中有三种继承关系:

  • 一张总表 single_table
  • 每个类分别一张表 table_per_class
  • 每个子类一张表 joined

找到了一个说的很详细的博客,可以参考:http://blog.csdn.net/pursuer211/article/details/17318379


五、Hibernate查询方式

Hibernate支持多种查询方式:

  • NativeSQL查询语言
  • HQL Query q = session.createQuery("from Category");
  • EJBQL
  • QBC –query by Cretiral Criteria c = session.createCriteria(Topic.class)
  • QBE – query by Example
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    Topic tExample = new Topic();
    tExample.setTitle("T_");
    Example e = Example.create(tExample)
    .ignoreCase().enableLike();
    Criteria c = session.createCriteria(Topic.class)
    .add(Restrictions.gt("id", 2))
    .add(Restrictions.lt("id", 8))
    .add(e)
    ;

六、Hibernate性能优化

  1. 注意session.clear的运用,清除不用的缓存空间。

  2. 1+N问题:hibernate默认表和表的关联方式为fetchType.EAGER,这时候hibernate在一对多关系中查询多的一方的数据时,会额外发出N条SQL语句。原因在于设置了多到一的导航,而一那方的fetch默认为Eager,从而会取出一那方的所有的值。

    解决方案:

    • 多对一那边的fetch属性设置为LAZY。但是这样会影响级联查询。
    • 设置BatchSize(),发出的SQL会减少,但是没有在根本上解决问题。
    • HQL语句中使用join fetch,推荐使用:
      1
      List<Topic> topics = (List<Topic>)session.createQuery("from Topic t left join fetch t.category c").list();
  3. list和iterate的区别

    • list取出所有的元素
    • iterate先取ID,等到用的时候再根据ID来取对象
    • session中list第二次发出仍然回去数据库查询
    • iterate第二次发出,首先查找session缓存
  4. 缓存机制:

    • 缓存的作用:Hibernate是一个持久层框架,经常访问物理数据库,为了降低应用程序对物理数据源访问的频次,从而提高应用程序的运行性能。缓存内的数据是对物理数据源中的数据的复制,应用程序在运行时从缓存读写数据,在特定的时刻或事件会同步缓存和物理数据源的数据。
    • 缓存的分类:Hibernate一级缓存和Hibernate二级缓存Hibernate一级缓存又称为“Session的缓存”,它是内置的,不能被卸载(不能被卸载的意思就是这种缓存不具有可选性,必须有的功能,不可以取消session缓存)。由于Session对象的生命周期通常对应一个数据库事务或者一个应用事务,因此它的缓存是事务范围的缓存。第一级缓存是必需的,不允许而且事实上也无法卸除。在第一级缓存中,持久化类的每个实例都具有唯一的OID。 Hibernate二级缓存又称为“SessionFactory的缓存”,由于SessionFactory对象的生命周期和应用程序的整个过程对应,因此Hibernate二级缓存是进程范围或者集群范围的缓存,有可能出现并发问题,因此需要采用适当的并发访问策略,该策略为被缓存的数据提供了事务隔离级别。第二级缓存是可选的,是一个可配置的插件,在默认情况下,SessionFactory不会启用这个插件。
    • 第二级缓存存放的数据:
      • 很少被修改的数据
      • 不是很重要的数据,
      • 不会被并发访问的数据
      • 常量数据
      • 经常被访问的数据
    • 不适合存放在第二级缓存的数据:

      • 经常修改的数据
      • 不允许出现并发访问的数据
      • 与其他应用共享的数据
    • HIbernate查找对象中缓存的应用:

      • 当Hibernate根据ID访问数据对象的时候,首先从Session一级缓存中查;查不到,如果配置了二级缓存,那么从二级缓存中查;如果都查不到,再查询数据库,把结果按照ID放入到缓存,删除、更新、增加数据的时候,同时更新缓存。Hibernate管理缓存实例无论何时,当你给save()、update()或saveOrUpdate()方法传递一个对象时,或使用load()、 get()、list()、iterate() 或scroll()方法获得一个对象时, 该对象都将被加入到Session的内部缓存中。 当随后flush()方法被调用时,对象的状态会和数据库取得同步。 如果你不希望此同步操作发生,或者你正处理大量对象、需要对有效管理内存时,你可以调用evict() 方法,从一级缓存中去掉这些对象及其集合。
    • load默认使用二级缓存,iterate默认使用二级缓存,list默认往二级缓存中加数据,Query需要设置才可以使用二级缓存。

    • 缓存算法:

      • LRU Least Recently Used
      • LFU Least Frequently Used
      • FIFO
  5. 事务并发处理

    • 事务 :ACID
      • Atomic 原子性
      • Consistency 一致性
      • Isolation 隔离性
      • Durability 持久性
    • 事务并发可能出现的问题:
      • 第一类丢失更新, 把后启动事务的更新丢弃
      • 脏读 读到了其他事务还没有提交的数据
      • 第二类丢失更新 不可重复读,多次读取一个事务读出了不同的值
      • 幻读 读到了不存在的事务
    • 数据库的隔离机制
      • Serializable (串行化):可避免脏读、不可重复读、幻读的发生。

       * Repeatable read (可重复读):可避免脏读、不可重复读的发生。

       * Read committed (读已提交):可避免脏读的发生。

       * Read uncommitted (读未提交):最低级别,任何情况都无法保证。

    • Hibernate的事务隔离机制:
      1:读操作未提交(Read Uncommitted)
      2:读操作已提交(Read Committed)
      4:可重读(Repeatable Read)
      8:可串行化(Serializable)
      设置方式为:
      <property name=" hibernate.connection.isolation">4</property>

HTTP协议小结

发表于 2019-04-21 | 分类于 HTTP , TCP

简介

HTTP协议是基于TCP/IP协议来传递数据的应用层协议,规定了客户端和服务器之间的通信格式,其默认端口为80,HTTP是无连接无状态的。

主要特点:

  1. 简单快速:客户向服务器请求服务时只需要传送请求方法和路径,请求方法常用的有GET,POST,不同方法规定了客户和服务器不同的联系类型。协议通信速度很快。
  2. 灵活:HTTP协议允许传输任意类型的数据对象,正在传输的类型由Content-Type加以标记。
  3. 无连接:限制每次连接只处理一个请求,服务端确定用户端收到了处理后的结果之后,立刻断开连接。节省传输时间。
  4. 无状态:HTTP协议是无状态协议。无状态指的是协议对于事务处理没有记忆能力。
  5. 支持BS及CS模式

URL

http://host[:port][abs_path]

例如:

https://www.baidu.com/s?wd=HttP%E5%8D%8F%E8%AE%AE%E6%80%BB%E7%BB%93&rsv_spt=1&rsv_iqid=0xbb6ac0d200022f6c&issp=1&f=8&rsv_bp=1&rsv_idx=2&ie=utf-8&rqlang=cn&tn=baiduhome_pg&rsv_enter=1&oq=HttP%25E6%2580%25BB%25E7%25BB%2593&inputT=7363&rsv_t=089bmiWk56%2BnKt5Ljs7CZihPEekxaufgYcpdefPyy4J6qS3KjJEt6Qzyg5XhLwQPRZTF&rsv_pq=c43d53970003ea1e&rsv_sug3=58&rsv_sug1=42&rsv_sug7=100&rsv_sug2=0&rsv_sug4=8625&rsv_sug=2

  • 协议部分:如http,https,tcp,其后加://
  • 域名部分:www.baidu.com,合法的Internet主机域名或者IP地址
  • 端口部分:指定一个端口,拥有被请求资源的服务器主机监听该端口的TCP连接。如果port为空,HTTP协议默认为80,HTTPS协议默认为443。
  • 虚拟目录部分:从域名的第一个/开始到最后一个/为止。
  • 文件名部分:从域名后的最后一个/开始到?为止,是文件名部分,如果没有?文件名部分:从域名后的最后一个“/”开始到“?”为止,是文件名部分,如果没有“?”,则是从域名后的最后一个“/”开始到“#”为止,是文件部分,如果没有“?”和“#”,那么从域名后的最后一个“/”开始到结束,都是文件名部分。
  • 锚部分:从“#”开始到最后,都是锚部分。
  • 参数部分:从“?”开始到“#”为止之间的部分为参数部分,又称搜索部分、查询部分。

URL和URI的区别:

URL URI
全名 Uniform resource locator Uniform resource identifier
用途 url可以用来标识一个资源,还指明了如何locate这个资源 定位web上可用的各种资源文件
组成 协议+IP地址(端口)+主机资源的具体地址 访问资源的命名机制+存放资源的主机名+资源自身的名称

请求消息Request

客户端发送一个HTTP请求到服务器的请求消息包含:

  • 请求行
  • 请求头部
  • 空行
  • 请求数据

    Alt text

  1. 请求行:说明请求类型,要访问的资源以及使用的HTTP版本
  2. 请求头部:说明服务器要使用的附加信息,host将指出请求的目的地,User-Agent。
  3. 空行:强制空行划分请求头和请求数据
  4. 请求数据:也叫请求主题,可以添加任意的其他数据。

请求头的常用字段:

  • Accept:告诉服务器,客户机支持的数据格式
  • Accept-Charset:告诉服务器,客户机采用的编码
  • Accept-Encoding:告诉服务器,客户机支持的数据压缩格式
  • Accept-language:用于告诉服务器,资源缓存的时间
  • Host:用于告诉服务器,客户机向访问的主机名
  • If-Modified-Since:用于告诉客户机,资源缓存的时间
  • Referer: 用于告诉服务器,客户机是从哪个资源链接到本资源的
  • User-Agent:客户机通过这个头可以向服务器带数据。
1
2
3
4
5
6
7
8
9
Accept: text/javascript, application/javascript, application/ecmascript, application/x-ecmascript, */*; q=0.01
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,ja;q=0.6
Connection: keep-alive
Cookie: BAIDUID=CDFD2D15954E6F99667163D726B3A3E4:FG=1; BIDUPSID=CDFD2D15954E6F99667163D726B3A3E4; PSTM=1500866515; BDUSS=hEYVF-Wmpka0lRLXMwWGRQZmtiVkViUDB5OVJnblNrQVpEM29EcHJPZ2JMS1paSUFBQUFBJCQAAAAAAAAAAAEAAAD0yYIxX8biX9fTAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABufflkbn35ZWj; MCITY=-131%3A; BD_UPN=12314753; H_PS_PSSID=1445_21099_20928; BD_CK_SAM=1; PSINO=1; H_PS_645EC=b5cdqANCiMatErji8Z8BouSUOvY79iH5o%2Fjy%2FUKJM0yMzqrgjyQy2ee4g9p7aR60t86a
Host: www.baidu.com
Referer: https://www.baidu.com/s?ie=utf-8&f=8&rsv_bp=1&rsv_idx=2&tn=baiduhome_pg&wd=http%E8%AF%B7%E6%B1%82%E5%A4%B4%E4%B8%AD%E7%9A%84%E5%AD%97%E6%AE%B5&rsv_spt=1&oq=http%25E8%25AF%25B7%25E6%25B1%2582&rsv_pq=c189685100052af0&rsv_t=596djY6Nc%2B2XuiZI2%2BAjz0DgMcM0Kn2EZBS1dtxl4a3ykgijQGu4DuQ3OjfSPXh6SjPu&rqlang=cn&rsv_enter=1&rsv_sug3=5&rsv_sug1=4&rsv_sug7=100&rsv_sug2=0&inputT=5807&rsv_sug4=6646
User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36
X-Requested-With: XMLHttpRequest

消息相应Response

消息相应由四部分组成:

Alt text

  • 状态行:由HTTP协议版本号,状态码,状态消息三部分组成
  • 消息报头:说明客户端要使用的一些附加信息
  • 空行:分割用,必须
  • 响应正文:服务器返回给客户端的文本信息

状态码:

  1. 1XX:指示信息,表示请求已经接收,继续处理
  2. 2XX:成功,表示请求已经被成功接收
  3. 3XX:重定向,要完成请求需要进行更进一步的操作
  4. 4XX:客户端错误,请求有语法错误或者请求无法实现
  5. 5XX:服务器内部错误。

常见状态码:

  • 200 OK
  • 400 bad Request
  • 401 Unauthorized
  • 403 Forbidden
  • 404 Not Found
  • 500 Internal Server Error
  • 503 Server Unavailable

常见响应头

  • Location:这个响应头配合302状态码使用,用于告诉客户机资源的地址
  • Server:服务器通过这个头,告诉浏览器服务器的类型
  • Content-Encoding:服务器通过这个头,告诉浏览器数据压缩的格式
  • Content-Length:服务器通过这个头,告诉浏览器回送数据的长度
  • Content-Type:服务器通过这个头,告诉浏览器回送数据的类型
  • Last-Modified:服务器通过这个头,告诉浏览器当前资源缓存时间
  • Refresh:服务器通过这个头,告诉浏览器隔多长时间刷新一次
  • Content-Disposition:服务器通过这个头,告诉浏览器以下载方式打开数据
  • Transfer-Encoding:服务器通过这个头,告诉浏览器数据传送的格式
  • ETag:缓存相关的头,和Last-Modified功能一样,不过实时性更强(Last-Modified是一秒内即使内容更新也让浏览器找缓存)
  • Expires:服务器通过这个头,告诉浏览器把回送的资源缓存多长时间,0或-1表示不缓存
  • Cache-Control:no-cache
  • Pragma: no-cache

HTTP请求方法

HTTP1.0定义了三种请求方式,get、post、head,HTTP1.1新增了五种请求方法,options、put、delete、trace、connect。

  • GET:请求指定的页面信息,并返回实体主体
  • HEAD:类似于GET请求,只不过返回的响应中没有具体的内容,用于获取报头。
  • POST:向指定资源提交数据进行处理请求,数据被包含在请求体中,POST请求可能会导致新的资源的尽力或者已有资源的修改。
  • PUT:从客户端向服务器传送的数据取代指定的文档的内容。
  • DELETE:请求服务器删除指定的页面。
  • CONNECT:HTTP1.1协议中预留给能够将连接改为管道方式的代理服务器。
  • OPTIONS:允许客户端查看服务器性能。
  • TRACE:回显服务器收到的请求,主要用于测试或诊断。

HTTP工作原理

HTTP协议采用了请求/响应模型,客户端向服务器发送一个请求报文,请求报文包含请求的方法,URL,协议版本,请求头部和请求数据。服务器以一个状态行作为相应,相应的内容包括协议的版本,成功或者错误代码,服务器信息,响应头部和响应数据。

HTTP请求响应的步骤:

  1. 客户端连接到Web服务器,HTTP客户端,比如浏览器,与Web服务器的HTTP端口建立一个TCP套接字连接。

  2. 发送HTTP请求,通过TCP套接字,客户端向Web服务器发送一个文本的请求报文,一个请求报文由请求行,请求头部,空行和请求数据四部分组成。

  3. 服务器接收请求并返回HTTP响应,Web服务器解析请求,定位请求资源。服务器将资源副本写到TCP套接字,由客户端读取。一个响应由状态行、响应头部,空行和响应数据4部分组成。

  4. 释放TCP连接,如果Connection模式为close,则服务器主动关闭TCP连接,客户端被动关闭连接,释放TCP连接;若connection模式为keepalive,则该连接会保持一段时间,在该时间内可以继续接受请求。

  5. 客户端浏览器解析HTML内容。浏览器解析状态行,查看表明请求是否成功的状态代码,然后解析每一个响应头,响应头告知一下若干字节的HTML文档和文档的字符集。客户端浏览器读取响应数据HTML,根据HTML语法对其进行格式化,并在浏览器窗口中显示。

GET和POST请求的区别

GET POST
提交 GET提交,请求的数据会附在URL之后,也就是讲数据放在HTTP协议头之内,以?分隔URL和传输数据,多个参数用&连接 POST提交,把提交的数据放置在HTTP包的包体中,不像get提交会在地址栏上显示出来
传输数据大小 特定浏览器和服务器对URL长度有限制,get请求会收到URL长度的限制 理论上不受到任何限制
安全性 get提交数据,例如用户名和密码将明文出现在URL上 安全性较高
  • HTTP get,post,soap协议都是在HTTP上运行的

    • get:请求参数是作为一个key-value键值对的序列附加到URL上,查询字符串长度受到浏览器和服务器的限制,不适合传输大型数据集且其极不安全。

    • psot:请求参数是在HTTP标题的一个不同部分传输的,这一部分用来传输表单信息,因此必须将contentType设置为application/x-www-form- urlencoded。POST参数也是key-value键值对。但是它不支持复杂类型数据,因为POST没有定义传输数据结构的语义和规则。

    • soap:http POST的一个专用版本,遵循一种特殊的XML消息格式,contentType设置为text/xml 任何数据都可以xml化。

TCP连接建立过程

Alt text

  1. 两端TCP进程都是处于关闭状态,A是主动打开连接,B是被动打开连接。
  2. A的TCP客户端进程向B发出连接请报文段,这时首部中的同步位SYN=1,同时选择一个初始序号seq=x。TCP规定,SYN报文段不能携带数据但要消耗一个序号。这时,A的客户进程就进入SYN-SENT状态(同步已发送)。
  3. B收到连接请求报文之后,向A发送确认。在确认报文段中把SYN和ACK位都设置为1,确认号是ack=x+1,同时也为自己选择一个初始序号seq=y。这个报文段也不携带信息但是要消耗掉一个序号。这时B的TCP进程就进入SYN-RCVD(已收到同步)状态。
  4. A的TCP客户端收到B的确认后,还要向B给出确认,确认报文段的ACK设置为1,确认号ack=y+1,而自己的序号seq=x+1。这时TCP连接已经建立,A进入ESTABLISHED(已建立连接)状态。
  5. B收到A的确认后,也会进入ESTABLISHED状态。
  • 为什么A还要再发送一次确认呢?
    是为了防止已失效的连接请求报文段突然又回传到了B,而产生错误。(该报文段可能因为网络延时而失效)

TCP连接断开的过程

Alt text

  1. 断开连接时,发起方可以是客户端也可以是服务端。发起方会先发送一个FIN包,就进入FIN_WAIT状态,因此就不能发送用户数据,等待响应方回复ACK。

  2. 响应方收到FIN包后,就给发起方回复ACK。此时,响应方进入CLOSE_WAIT状态,因此响应方还有待发送的数据,需要等待发送完毕,才能关闭连接(发起方在发送FIN前,会保证已经发送完数据)。

  3. 发起方收到ACK后,就进入FIN_WAIT_2。此时还能继续接受数据,直到收到响应方的FIN。

  4. 响应方等数据发送完毕后,就给发起方发送FIN包。

  5. 发起方收到响应方的FIN包后,就进入TIME_WAIT状态,并向响应方发送ACK。

  6. 响应方收到ACK后,就进入CLOSE状态,完成了断开流程。

  7. 发起方的TIME_WAIT状态会持续2MSL(最大报文生存时间)。然后进入到CLOSED。

  • 为何是四次握手?
    响应方还有待发送端 数据,需要数据发送完才能发SYN包。

  • 为什么需要TIME_WAIT状态?
    因为在发起方发送ACK之后,不保证响应方一定能收到ACK。响应方在发送FIN后,就等待发起方的ACK,在等待一段时间之后,如果没有收到ACK,就会重发FIN包。发起方在TIME_WAIT的时间内如果又收到了FIN包,就会在发送ACK包。

Java Enum用法总结

发表于 2019-04-21 | 分类于 Java

常量

枚举类可以用来当做常量使用,非常的方便。

1
2
3
public enum Color {
RED, GREEN, BLACK, YELLOW
}

枚举类可以做switch中的判定类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Aweek {
Day day = Day.MONDAY;
public void getDay() {
switch (day) {
case MONDAY:
day = Day.MONDAY;
break;
case TUESDAY:
day = Day.TUESDAY;
break;
case WEDSDAY:
day = Day.WEDSDAY;
break;
case THURSDAY:
day = Day.THURSDAY;
break;
case SATURDAY:
day = Day.SATURDAY;
break;
case SUNDAY:
day = Day.SUNDAY;
break;
case FRIDAY:
day = Day.FRIDAY;
break;
}
System.out.println(day);
}
public static void main(String[] args) {
new Aweek().getDay();
}
}
enum Day {
MONDAY, TUESDAY, WEDSDAY, THURSDAY, FRIDAY, SATURDAY, SUNDAY
}

枚举中可以添加方法,也可以覆盖方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public enum Colors{
RED("red", 1), GREEN("green", 2), BLANK("blank", 3), YELLOW("yellow", 4);
//成员变量
private String name;
private int index;
// 构造方法
Colors(String name, int index) {
this.name = name;
this.index = index;
}
// 覆盖方法
@Override
public String toString() {
return this.index + "" + this.name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getIndex() {
return index;
}
public void setIndex(int index) {
this.index = index;
}
//普通方法
public static String getName(int index) {
for (Colors colors : Colors.values()) {
if (colors.getIndex() == index)
return colors.getName();
}
return null;
}
}

枚举类可以实现接口,但是不能继承类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public interface Print {
void print();
}
public enum Colors implements Print{
RED("red", 1), GREEN("green", 2), BLANK("blank", 3), YELLOW("yellow", 4);
//成员变量
private String name;
private int index;
// 构造方法
Colors(String name, int index) {
this.name = name;
this.index = index;
}
@Override
public void print() {
System.out.println("haha");
}
}

使用接口来组织枚举的结构

1
2
3
4
5
6
7
8
9
public interface Food {
enum Coffee implements Food {
BLACK_COFFEE, DECAF_CAFFEE, LATTE, CAPPUCCINO
}
enum Dessert implements Food{
FRUIT, CAKE, GELATO
}
}

ActiveMQ学习总结(二)

发表于 2019-04-21 | 分类于 JMS , ActiveMQ

上一节总结了ActiveMQ的使用,主要的还是总结了在项目中直接使用和用Spring集成使用。这一节要进一步深入ActiveMQ,学习其内部的原理性的东西。

Session

ActiveMQ消息的发送和接收到离不开Session的建立,首先查看Session的源码:

1
2
3
4
5
6
7
8
9
public interface Session extends Runnable {
int AUTO_ACKNOWLEDGE = 1;
int CLIENT_ACKNOWLEDGE = 2;
int DUPS_OK_ACKNOWLEDGE = 3;
int SESSION_TRANSACTED = 0;
BytesMessage createBytesMessage() throws JMSException;
//omit
}

而ActiveMQ的Session的创建需要通过Connection的createSession方法,该方法需要设置两个参数,第一个参数表示是否支持事务,第二个参数表示签收模式。

1
2
3
4
public interface Connection {
Session createSession(boolean var1, int var2) throws JMSException;
//omit
}

所谓的签收模式也就是消费者在收到消息之后,需要通知消息服务器收到了消息。当消息服务器收到回执之后,本条消息将失效。而如果消费者收到了消息却并不签收,则本条消息继续有效,可能会被其他消费者消费。

  • AUTO_ACKNOWLEDGE :表示消费者接收到消息时自动签收
  • CLIENT_ACKNOWLEDGE :消费者接收消息之后需要手动签收
  • DUPS_OK_ACKNOWLEDGE :签收不签收都可以,要求消费者可以容忍重复消费。

实际中更加推荐采用手动签收,理论上说,消费者收到消息不代表消息传递的结束,只有当消费者正确处理了消息才是整个消息传递流程的终点。因此如果自动签却没有成功处理收会导致消息丢失。

消息顺序消费

消息优先级

MessageProvider的send方法存在多个重载方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package javax.jms;
public interface MessageProducer {
//omit above
void send(Message var1) throws JMSException;
void send(Message var1, int var2, int var3, long var4) throws JMSException;
void send(Destination var1, Message var2) throws JMSException;
void send(Destination var1, Message var2, int var3, int var4, long var5) throws JMSException;
}
  • 而我们在创建生产者时指定了Destination,也可以在send的时候指定。而且实际工程中的业务逻辑会更加复杂,可能会存在各种判断决定消息发往哪个地址,因此不推荐在创建MessageProducer的时候创建Destination。

  • 消息优先级,0-9。其中0-4为普通消息,5-9为加急消息,消息的默认级别是4。但是实际上,优先级只是个理论上的概念,ActiveMQ并不能保证消费的顺序性。

顺序消费

当我们需要对传入的消息设定一个固定的顺序的时候,比如商城项目中用户下单,支付,发货就是有严格的先后顺序的,不可能先发货在支付。这是或我们需要保证ActiveMQ的顺序执行。

  • 一个简单的思路是根据用户ID做一个哈希表,将消息定位到不同的队列上,从而可以使得同一个用户的消息将发往同一个队列。 然后对于同一个队列三个消息,比如订单消息,支付消息,发货消息,将其先后交付给订单系统,支付系统,物流系统进行处理。这个处理过程是同步的,但是在分布式场景下,并不会降低系统的处理性能。

消息的同步与异步

接收消息,可以通过消费者的receive方法,这种方法是client端主动接收消息,也就是同步接收。需要写一个死循环来不停的接受消息。而ActiveMQ提供了异步接收的方法。其实在上一节已经使用了,但是这次单独写出来。原理很简单,我们设置一个消息监听的机制,当队列上有消息了,则回调追星messageListener接口的onMessage方法。这次贴一个我商城里面使用的Listener实现类,实现后台商品添加和索引库的同步,简单逻辑是收到商品添加事件消息之后,根据id去数据库里面查询,然后添加到document中,加到索引库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* 监听商品添加事件,同步索引库
* Created by cdx0312
* 2018/3/9
*/
public class ItemAddMessageListener implements MessageListener{
/**
* 注入DAO
*/
@Autowired
private SearchItemMapper searchItemMapper;
/**
* 注入SolrServer对象
*/
@Autowired
private SolrServer solrServer;
@Override
public void onMessage(Message message) {
try {
//从消息中取商品id
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
long itemId = Long.parseLong(text);
//根据商品id查询数据库,取商品信息,需要设置等待时间,等待事务提交
Thread.sleep(1000);
SearchItem searchItem = searchItemMapper.getItemById(itemId);
//创建文档对象
SolrInputDocument document = new SolrInputDocument();
//向文档对象中添加域
document.addField("id", searchItem.getId());
document.addField("item_title", searchItem.getTitle());
document.addField("item_sell_point", searchItem.getSell_point());
document.addField("item_price", searchItem.getPrice());
document.addField("item_image", searchItem.getImage());
document.addField("item_category_name", searchItem.getCategory_name());
document.addField("item_desc", searchItem.getItem_desc());
//把文档对象写入索引库
solrServer.add(document);
//提交
solrServer.commit();
} catch (JMSException | InterruptedException | SolrServerException | IOException e) {
e.printStackTrace();
}
}
}

P2P 和 Pub/Sub

两种消息模式,去网上找了两张图,上一节其实已经介绍了,但是没有图还是不直观

  • 一对一通信,一个生产者一个消费者

Alt text

  • 发布订阅模式,发布一条消息,所有订阅了该目标的消费者都会收到消息。

Alt text

如果消费者重启了,这个消费者会丢失一些消息,为了避免消息丢失,ActiveMQ采用了持久化机制来保存消息。

消息化订阅

持久化订阅就是如果消费者宕机,则将消息暂存在ActiveMQ中,等待消费者正常工作再发送给消费者。首先为消费者设定一个标识ID,然后创建爱你消费者的时候调用session的createDurableSubscriber方法来进行持久化订阅。

ActiveMQ持久化机制

为了避免意外宕机丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都会采用持久化机制。ActiveMQ的持久化消息机制有JDBC、AMQ、KahaDB和LevelDB。在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库,然后试图将消息发送给接受者,发送成功则将消息从存储中删除,失败则继续尝试。

消息中心启动之后首先要检查指定的存储位置,如果有未发送成功的消息,则需要把消息发送出去。

JDBC持久化方式

使用JDBC持久化方式,数据库会创建三个表:activemq_msgs,activemq_acks,activem_lock。其中activemq_msgs用来存储消息,Queue和Topic都存储在这个表中。

  • 配置方式

配置持久化的方式,需要修改conf/activemq.xml文件,首先定义一个mysql-ds的MySQL数据源,然后在persistenceAdapter节点中配置JDBCPersistenceAdapter并且引用刚才的数据源。dataSource指定持久化数据库的Bean,createTablesOnStartup的核定是否在启动时创建数据库表,默认值为true,一般第一次启动设置为true,后面改成false。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<beans>
<broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core">
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/>
</persistenceAdapter>
</broker>
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="activemq"/>
<property name="password" value="activemq"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
</beans>
  • 数据库表信息

activemq_msgs用于存储消息,Queue和Topic都存储在这个表中:

列明 内容
ID 自增的数据库主键
CONTAINER 消息的Destination
MSGID_PORD 消息发送者客户端的主键
MSG_SEQ 发送消息的顺序,MSGID_PORD+MSG_SEQ可以组成JMS的MessageID
EXPIRATION 消息的过期时间, 存储的是从1970-01-01到现在的毫秒数
MSG 消息本体的Java序列化对象的二进制数据
PRIORITY 优先级 0-9

activemq_acks用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存:

列明 内容
CONTAINER 消息的Destination
SUB_DEST 如果是使用Static集群,这个字段会有集群其他系统的信息
CLIENT_ID 每个订阅者都必须有一个唯一的客户端ID用以区分
SUB_NAME 订阅者名称
SELECTOR 选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性AND和OR操作
LAST_ACKED_ID 记录消费过的消息的ID

表activemq_lock在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker。

AMQ方式

  • 性能高于JDBC,写入消息时,会将消息写入日志文件,由于是顺序追加写,性能很高。为了提升性能,创建消息主键索引,并且提供缓存机制,进一步提升性能。

  • 每个日志文件的大小是有限制的,默认为32M,可以自行配置。当超过这个大小,系统会重新建立一个文件,当所有消息都消费完成,系统会删除这个文件或者归档。

  • 主要缺点在于AMQ Message会为每一个Destination创建一个索引,如果使用了大量的Queue,索引文件的大小会占用很多磁盘空间,而由于索引巨大,Broker崩溃,重建索引的速度非常慢。

  • 配置方法:

1
2
3
<persistenceAdapter>
<amqPersistenceAdapter directory="${activemq.data}/activemq-data" maxFileLength="32mb"/>
</persistenceAdapter>

虽然AMQ性能高于KahaDb方式,但是由于重建索引时间过长,索引占用磁盘空间过大,实际项目中并不推荐使用,了解即可。

KahaDB方式

KahaDB是从ActiveMQ5.4开始默认的持久化插件。KahaDB恢复时间远远小于其前身AMQ并且使用更少的数据文件,可以完全替代AMQ。

KahaDB的持久化同样是基于日志文件,索引和缓存的。

  • 配置方式如下,其中directory用来指定持久化消息的存储目录,journalMaxFileLength用来指定保存消息的日志文件大小,具体根据你的实际应用配置。
1
2
3
<persistenceAdapter>
<kahaDB directory="${activemq.data}/activemq-data" journalMaxFileLength="16mb"/>
</persistenceAdapter>
  • KahaDB主要特性:

    • 日志形式存储消息
    • 消息索引以B-Tree结构存储
    • 完全支持JMS事务
    • 支持多种恢复机制
  • KahaDB的结构

消息存储在基于文件的的数据日志中。如果消息发送成功,变标记为可删除。系统会周期性的清除或者归档日志文件。消息文件的位置索引存储在内存中,这样能快速定位到文件。定期将内存中的消息索引保存到metadata store中,避免大量消息未发送时,消息索引占用过多内存。

Alt text

  • Data logs:Data logs用于存储消息日志,消息的去哪补内容都在Data logs中。同AMQ一样,一个Data logs文件大小超过规定的最大值,会新建一个文件,同样是在文件尾部追加,写入性能很快。每个消息在Data logs中有计数引用,所以当一个文件里所有的消息都不需要了,系统会自动删除文件或放入归档文件夹。

  • Metadata cache:缓存用于存放在线消费者消息。如果消费者已经快速的消费完成,name这些消息就不需要写入磁盘了。Btree索引会根据MessageID创建索引,用于快速的查找消息,这个索引同样维护持久化订阅者与Destination的关系,以及每个消费者消费消息的指针。

  • Metadata store:在db.data文件中保存消息日志中消息的元数据,也是一B-Tree结构存储的,定时从Metadata cache更新数据。Metadata store也会备份一些在消息日志中存在的信息,这样可以让broker实例快速启动。几遍metadata store文件被破坏或者删除了,broker可以读取data logs恢复过来,只是速度回很慢。

LevelDB

  • ActiveMQ5.6之后推出的持久化引擎LevelDB。
  • 默认方式仍然是KahaDB,LevelDB的持久化性能要高于KahaDB。
  • LevelDB主要用于Master-Slave方式的主从复制数据。
1…891011
cdx

cdx

Be a better man!

110 日志
36 分类
31 标签
GitHub E-Mail
© 2020 cdx
由 Hexo 强力驱动
|
主题 — NexT.Mist v5.1.2