Java 线程同步原理探析

现如今,服务器功用日益增长,并发(concurrency)VWIN首页现已“家喻户晓”,但由于冯诺依式计算机“指令存储,次序履行”的特性,使得编写跨过时间维度的并发程序反常困难,所以现代VWIN首页言语都对并发VWIN首页供给了必定程度的支撑,像 Golang 里边的 Goroutines、Clojure 里边的 STM(Software Transactional Memory)、Erlang 里边的 Actor

Java 关于并发VWIN首页的处理计划是多线程(Multi-threaded programming),并且 Java 中的线程 与 native 线程一一对应,多线程也是前期操作体系支撑并发的计划之一(其他计划:多进程、IO多路复用)。

本文偏重介绍 Java 中线程同步的原理、完成机制,更偏重操作体系层面,部分原理参阅 openjdk 源码。阅览本文需求对 CyclicBarrier、CountDownLatch 有根本的运用经历。

JUC

在 Java 1.5 版别中,引进 JUC 并发VWIN首页辅佐包,很大程度上降低了并发VWIN首页的门槛,JUC 里边首要包含:

  • 线程调度的 188bets
  • 缓冲使命的 Queues
  • 超时相关的 TimeUnit
  • 并发调集(如 ConcurrentHashMap)
  • 线程同步类(Synchronizers,如 CountDownLatch )

个人以为其间最重要也是最中心的是线程同步这一块,由于并发VWIN首页的难点就在于怎么确保「同享区域(专业术语:临界区,Critical Section)的拜访时序问题」。

AbstractQueuedSynchronizer

JUC 供给的同步类首要有如下几种:

  • Semaphore is a classic concurrency tool.
  • CountDownLatch is a very simple yet very common utility for blocking until a given number of signals, events, or conditions hold.
  • CyclicBarrier is a resettable multiway synchronization point useful in some styles of parallel programming.
  • Phaser provides a more flexible form of barrier that may be used to control phased computation among multiple threads.
  • An Exchanger allows two threads to exchange objects at a rendezvous(约会) point, and is useful in several pipeline designs.

经过阅览其源码能够发现,其完成都依据 AbstractQueuedSynchronizer 这个笼统类(一般简写 AQS),正如其 javadoc 最初所说:

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state.

也便是说,AQS 经过保护内部的 FIFO 行列和具有原子更新的整型 state 这两个特点来完成各种锁机制,包含:是否公正,是否可重入,是否同享,是否可中止(interrupt),并在这根底上,供给了更便利有用的同步类,也便是一开端提及的 Latch、Barrier 等。

这儿暂时不去介绍 AQS 完成细节与怎么依据 AQS 完成各种同步类(挖个坑),感兴趣的能够移步美团的一篇文章《不可不说的Java“锁”事》 第六部分“独享锁 VS 同享锁”。

在学习 Java 线程同步这一块时,对我来说困扰最大的是「线程唤醒」,试想一个现已 wait/sleep/block 的线程,是怎么呼应 interrupt 的呢?当调用 Object.wait() 或 lock.lock() 时,JVM 终究做了什么事情能够在调用 Object.notify 或 lock.unlock 时从头激活相应线程?

带着上面的问题,咱们从源码中寻觅答案。

Java 怎么完成阻塞、告诉

wait/notify

public final native void wait(long timeout) throws InterruptedException;
public final native void notify();

在 JDK 源码中,上述两个办法均用 native 完成(即 cpp 代码),追寻相关代码

// java.base/share/native/libjava/Object.c
static JNINativeMethod methods[] = {
    {"hashCode",    "()I",                    (void *)&JVM_IHashCode},
    {"wait",        "(J)V",                   (void *)&JVM_MonitorWait},
    {"notify",      "()V",                    (void *)&JVM_MonitorNotify},
    {"notifyAll",   "()V",                    (void *)&JVM_MonitorNotifyAll},
    {"clone",       "()Ljava/lang/Object;",   (void *)&JVM_Clone},
};

经过上面的 cpp 代码,咱们大约能猜出 JVM 是运用 monitor 来完成的 wait/notify 机制,至于这儿的 monitor 是何种机制,这儿暂时越过,接着看 lock 相关完成

lock/unlock

LockSupport 是用来完成阻塞语义模型的根底辅佐类,首要有两个办法:park 与 unpark。(在英文中,park 除了“公园”意义外,还有“泊车”的意思)

// LockSupport.java
    public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }
// Unsafe.java
    /**
     * Unblocks the given thread blocked on {@code park}, or, if it is
     * not blocked, causes the subsequent call to {@code park} not to
     * block.  Note: this operation is "unsafe" solely because the
     * caller must somehow ensure that the thread has not been
     * destroyed. Nothing special is usually required to ensure this
     * when called from Java (in which there will ordinarily be a live
     * reference to the thread) but this is not nearly-automatically
     * so when calling from native code.
     *
     * @param thread the thread to unpark.
     */
    @HotSpotIntrinsicCandidate
    public native void unpark(Object thread);

    /**
     * Blocks current thread, returning when a balancing
     * {@code unpark} occurs, or a balancing {@code unpark} has
     * already occurred, or the thread is interrupted, or, if not
     * absolute and time is not zero, the given time nanoseconds have
     * elapsed, or if absolute, the given deadline in milliseconds
     * since Epoch has passed, or spuriously (i.e., returning for no
     * "reason"). Note: This operation is in the Unsafe class only
     * because {@code unpark} is, so it would be strange to place it
     * elsewhere.
     */
    @HotSpotIntrinsicCandidate
    public native void park(boolean isAbsolute, long time);

// hotspot/share/prims/unsafe.cpp
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) {
  HOTSPOT_THREAD_PARK_BEGIN((uintptr_t) thread->parker(), (int) isAbsolute, time);
  EventThreadPark event;

  JavaThreadParkedState jtps(thread, time != 0);
  thread->parker()->park(isAbsolute != 0, time);
  if (event.should_commit()) {
    post_thread_park_event(&event, thread->current_park_blocker(), time);
  }
  HOTSPOT_THREAD_PARK_END((uintptr_t) thread->parker());
} UNSAFE_END

经过上述 unsafe.cpp 能够看到每个 thread 都会有一个 Parker 目标,所以咱们需求查看 parker 目标的界说

// hotspot/share/runtime/park.hpp
class Parker : public os::PlatformParker
...
public:
  // For simplicity of interface with Java, all forms of park (indefinite,
  // relative, and absolute) are multiplexed into one call.
  void park(bool isAbsolute, jlong time);
  void unpark();

// hotspot/os/posix/os_posix.hpp
class PlatformParker : public CHeapObj<mtInternal> {
 protected:
  enum {
    REL_INDEX = 0,
    ABS_INDEX = 1
  };
  int _cur_index;  // which cond is in use: -1, 0, 1
  pthread_mutex_t _mutex[1];
  pthread_cond_t  _cond[2]; // one for relative times and one for absolute
  ...
};

看到这儿大约就能知道 park 是运用 pthread_mutex_t 与 pthread_cond_t 完成。好了,到目前为止,就引出了 Java 中与阻塞相关的完成,不难想象,都是依靠底层操作体系的功用。

OS 支撑的同步原语

Semaphore

并发VWIN首页范畴的前锋人物 Edsger Dijkstra(没错,也是最短途径算法的作者)在 1965 年初次提出了信号量( Semaphores) 这一概念来处理线程同步的问题。信号量是一种特别的变量类型,为非负整数,只要两个特别操作PV:

  • P(s) 假如 s!=0,将 s-1;不然将当时线程挂起,直到 s 变为非零
  • V(s) 将 s+1,假如有线程阻塞在 P 操作等候 s 变成非零,那么 V 操作会重启这些线程中的恣意一个

注:Dijkstra 为荷兰人,姓名 P 和 V 来历于荷兰单词 Proberen(测验)和Verhogen(添加),为便利了解,后文会用 Wait 与 Signal 来表明。

struct semaphore {
     int val;
     thread_list waiting;  // List of threads waiting for semaphore
}
wait(semaphore Sem):    // Wait until > 0 then decrement
  // 这儿用的是 while 而不是 if
  // 这是由于在 wait 过程中,其他线程还或许持续调用 wait
  while (Sem.val <= 0) {
    add this thread to Sem.waiting;
    block(this thread);
  }
  Sem.val = Sem.val - 1;
return;

signal(semaphore Sem):// Increment value and wake up next thread
     Sem.val = Sem.val + 1;
     if (Sem.waiting is nonempty) {
         remove a thread T from Sem.waiting;
         wakeup(T);
     }

有两点留意事项:

  1. wait 中的「测验和减 1 操作」,signal 中的「加 1 操作」需求确保原子性。一般来说是运用硬件支撑的 read-modify-write 原语,比方 test-and-set/fetch-and-add/compare-and-swap,除了硬件支撑外,还能够用 busy wait 的软件办法来模仿。
  2. signal 中没有界说从头启动的线程次序,也即多个线程在等候同一信号量时,无法猜测重启哪一个线程

运用场景

信号量为操控并发程序的履行供给了强有力东西,这儿罗列两个场景:

互斥

信号量供给了了一种很便利的办法来确保对同享变量的互斥拜访,根本思想是

将每个同享变量(或一组相关的同享变量)与一个信号量 s (初始化为1)联系起来,然后用 wait/signal 操作将相应的临界区包围起来。

二元信号量也被称为互斥锁(mutex,mutual exclusve, 也称为 binary semaphore),wait 操作相当于加锁,signal 相当于解锁。
一个被用作一组可用资源的计数器的信号量称为计数信号量(counting semaphore)

调度同享资源

除了互斥外,信号量的另一个重要作用是调度对同享资源的拜访,比较经典的事例是生产者顾客,伪代码如下:

emptySem = N
fullSem = 0
// Producer
while(whatever) {
    locally generate item
    wait(emptySem)
    fill empty buffer with item
    signal(fullSem)
}
// Consumer
while(whatever) {
    wait(fullSem)
    get item from full buffer
    signal(emptySem)
    use item
}

POSIX 完成

POSIX 规范中有界说信号量相关的逻辑,在 semaphore.h 中,为 sem_t 类型,相关 API:

// Intialize: 
sem_init(&theSem, 0, initialVal);
// Wait: 
sem_wait(&theSem);
// Signal: 
sem_post(&theSem);
// Get the current value of the semaphore:       
sem_getvalue(&theSem, &result);

信号量首要有两个缺陷:

  • Lack of structure,在规划大型体系时,很难确保 wait/signal 能以正确的次序成对呈现,次序与成对缺一不可,不然就会呈现死锁!
  • Global visiblity,一旦程序呈现死锁,整个程序都需求去查看

处理上述两个缺陷的新计划是监控器(monitor)

Monitors

C. A. R. Hoare 在 1974 年的论文 Monitors: an operating system structuring concept 初次提出了「监控器」概念,它供给了对信号量互斥和调度才能的更高等级的笼统,运用起来愈加便利,一般办法如下:

monitor1 . . . monitorM
process1 . . . processN

咱们能够以为监控器是这么一个目标:

  • 全部拜访同一监控器的线程经过条件变量(condition variables)直接通讯
  • 某一个时间,只能有一个线程拜访监控器

Condition variables

上面说到监控器经过条件变量(简写 cv)来和谐线程间的通讯,那么条件变量是什么呢?它其实是一个 FIFO 的行列,用来保存那些因等候某些条件建立而被阻塞的线程,关于一个条件变量 c 来说,会相关一个断语(assertion) P。线程在等候 P 建立的过程中,该线程不会锁住该监控器,这样其他线程就能够进入监控器,修正监控器状况;在 P 建立时,其他线程会告诉阻塞的线程,因而条件变量上首要有三个操作:

  1. wait(cv, m) 等候 cv 建立,m 表明与监控器相关的一 mutex 锁
  2. signal(cv) 也称为 notify(cv) 用来告诉 cv 建立,这时会唤醒等候的线程中的一个履行。依据唤醒战略,监控器分为两类:Hoare vs. Mesa,后面会介绍
  3. broadcast(cv) 也称为 notifyAll(cv) 唤醒全部等候 cv 建立的线程
POSIX 完成

在 pthreads 中,条件变量的类型是 pthread_cond_t,首要有如下几个办法:

// initialize
pthread_cond_init() 
pthread_cond_wait(&theCV, &someLock);
pthread_cond_signal(&theCV);
pthread_cond_broadcast(&theCV);

运用办法

在 pthreads 中,全部运用条件变量的当地都必须用一个 mutex 锁起来,这是为什么呢?看下面一个比如:

pthread_mutex_t myLock;
pthread_cond_t myCV;
int count = 0;

// Thread A
pthread_mutex_lock(&myLock);
while(count < 0) {
    pthread_cond_wait(&myCV, &myLock);
}
pthread_mutex_unlock(&myLock);

// Thread B

pthread_mutex_lock(&myLock);
count ++;
while(count == 10) {
    pthread_cond_signal(&myCV);
}
pthread_mutex_unlock(&myLock);

假如没有锁,那么

  • 线程 A 或许会在其他线程将 count 赋值为10后持续等候
  • 线程 B 无法确保加一操作与测验 count 是否为零 的原子性

这儿的要害点是,在进行条件变量的 wait 时,会开释该锁,以确保其他线程能够将之唤醒。不过需求留意的是,在线程 B 告诉(signal) myCV 时,线程 A 无法马上康复履行,这是由于 myLock 这个锁还被线程 B 持有,只要在线程 B unlock(&myLock) 后,线程 A 才可康复。总结一下:

  1. wait 时会开释锁
  2. signal 会唤醒等候同一 cv 的线程
  3. 被唤醒的线程需求从头获取锁,然后才能从 wait 中回来

Hoare vs. Mesa 监控器语义

在上面条件变量中,咱们说到 signal 在调用时,会去唤醒等候同一 cv 的线程,依据唤醒战略的不同,监控器也分为两类:

  • Hoare 监控器(1974),最早的监控器完成,在调用 signal 后,会马上运转等候的线程,这时调用 signal 的线程会被阻塞(由于锁被等候线程占有了)
  • Mesa 监控器(Xerox PARC, 1980),signal 会把等候的线程从头放回到监控的 ready 行列中,一起调用 signal 的线程持续履行。这种办法是现如今 pthreads/Java/C# 选用的

这两类监控器的要害差异在于等候线程被唤醒时,需求从头查看 P 是否建立。

监控器作业示意图

上图表明蓝色的线程在调用监控器的 get 办法时,数据为空,因而开端等候 emptyFull 条件;紧接着,赤色线程调用监控器的 set 办法改动 emptyFull 条件,这时

  • 依照 Hoare 思路,蓝色线程会马上履行,并且赤色线程阻塞
  • 依照 Mesa 思路,赤色线程会持续履行,蓝色线程会从头与绿色线程竞赛与监控器相关的锁

Java 中的监控器

在 Java 中,每个目标都是一个监控器(因而具有一个 lock 与 cv),调用目标 o 的 synchronized 办法 m 时,会首先去获取 o 的锁,除此之外,还能够调用 o 的 wait/notify/notify 办法进行并发操控

Big Picture

操作体系并发相关 API 归纳图

Interruptible

经过介绍操作体系支撑的同步原语,咱们知道了 park/unpark、wait/notify 其实便是运用信号量( pthread_mutex_t)、条件变量( pthread_cond_t)完成的,其实监控器也能够用信号量来完成。在查看 AQS 中,发现有这么一个特点:

/**
 * The number of nanoseconds for which it is faster to spin
 * rather than to use timed park. A rough estimate suffices
 * to improve responsiveness with very short timeouts.
 */
static final long spinForTimeoutThreshold = 1000L;

也便是说,在小于 1000 纳秒时,await 条件变量 P 时,会运用一个循环来替代条件变量的阻塞与唤醒,这是由于阻塞与唤醒自身的操作开支或许就远大于 await 的 timeout。相关代码:

// AQS 的 doAcquireNanos 办法节选
for (;;) {
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return true;
    }
    nanosTimeout = deadline - System.nanoTime();
    if (nanosTimeout <= 0L)
        return false;
    if (shouldParkAfterFailedAcquire(p, node) &&
        nanosTimeout > spinForTimeoutThreshold)
        LockSupport.parkNanos(this, nanosTimeout);
    if (Thread.interrupted())
        throw new InterruptedException();
}

在 JUC 供给的高档同步类中,acquire 对应 park,release 对应 unpark,interrupt 其实便是个布尔的 flag 位,在 unpark 被唤醒时,查看该 flag ,假如为 true,则会抛出咱们了解的 InterruptedException。

Selector.select() 呼应中止反常的逻辑有些特别,由于关于这类阻塞 IO 操作来说,没有条件变量的阻塞唤醒机制,咱们能够再看下 Thread.interrupt 的完成

public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();

    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();           // Just to set the interrupt flag
            b.interrupt(this);
            return;
        }
    }
    interrupt0();
}

OpenJDK 运用了这么一个技巧来完成阻塞 IO 的中止唤醒:在一个线程被阻塞时,会相关一个 Interruptible 目标。
关于 Selector 来说,在开端时,会相关这么一个Interruptible 目标

protected final void begin() {
    if (interruptor == null) {
        interruptor = new Interruptible() {
                public void interrupt(Thread target) {
                    synchronized (closeLock) {
                        if (closed)
                            return;
                        closed = true;
                        interrupted = target;
                        try {
                            AbstractInterruptibleChannel.this.implCloseChannel();
                        } catch (IOException x) { }
                    }
                }};
    }
    blockedOn(interruptor);
    Thread me = Thread.currentThread();
    if (me.isInterrupted())
        interruptor.interrupt(me);
}

当调用 interrupt 办法时,会封闭该 channel,这样就会封闭掉这个阻塞线程,可见为了完成这个功用,价值也是比较大的。LockSupport.park 中选用了相似技巧。

总结

或许依据多线程的并发VWIN首页不是最好的(或许是最杂乱的,Clojure 大法好 :-),但却是最悠长的。
即使咱们自己不去写往往也需求阅览他人的多线程代码,并且能够写出“正确”(who knows?)的多线程程序往往也是区别 senior 与 junior 程序员的标志,期望这篇文章能协助我们了解 Java 是怎么完成线程操控,有疑问欢迎留言指出,谢谢!

参阅

vwin娱乐场

宣布我的谈论

撤销谈论
表情 插代码

Hi,您需求填写昵称和邮箱!

  • 必填项
  • 必填项