JAVA concurrent

本文主要讲解Java并发相关的内容,包括锁、信号量、堵塞队列、线程池等主要内容。

并发的优点和缺点

在讲述怎么利用多线程的情况下,我们先看一下采用多线程并发的优缺点。

优点

  • 提高资源利用率
    如读取一个目录下的所有文件,如果采用单线程模型,则从磁盘读取文件的时候,大部分CPU用于等待磁盘去读取数据。如果是采用多线程并发执行,则CPU可以在等待IO的时候去做其他的事情,以提高CPU的使用率,减少资源的浪费。

  • 程序响应速度好
    单线程模型下,假设一个http请求需要占用大量的时间来处理,则其他的请求无法发送请求给服务端。而多线程模式下,监听线程把请求传递给工作者线程,然后立刻返回去监听,可以去接收新的请求,而工作者线程则能够处理这个请求并发送一个回复给客户端。明显响应速度比单线程模型要好得多。

    缺点

  • 程序设计复杂度
    多线程情况下,需要考虑线程间的通信、共享资源的访问,相对而言要比单线程程序负责一些。
  • 上下文切换开销大
    CPU从执行一个线程切换到执行另外一个线程的时候,它需要先存储当前线程的本地的数据,程序指针等,然后载入另一个线程的本地数据,程序指针等,最后才开始执行。这种切换称为“上下文切换”。CPU会在一个上下文中执行一个线程,然后切换到另外一个上下文中执行另外一个线程。尤其是当线程数量较多时,这种开销很明显。
  • 资源消耗
    线程在运行的时候需要从计算机里面得到一些资源。除了CPU,线程还需要一些内存来维持它本地的堆栈。它也需要占用操作系统中一些资源来管理线程

    并发模型

    并发系统可以采用多种并发编程模型来实现。并发模型指定了系统中的线程如何通过协作来完成分配给它们的作业。不同的并发模型采用不同的方式拆分作业,同时线程间的协作和交互方式也不相同。

    并行工作者

    在并行工作者模型中,委派者(Delegator)将传入的作业分配给不同的工作者。每个工作者完成整个任务。工作者们并行运作在不同的线程上,甚至可能在不同的CPU上。

    假设电商系统中的秒杀活动采用了并行工作者模型,订单->财务->仓储->物流,工作者A拿到订单请求,然后负责支付流程,查询仓储情况,直到发货。
    在Java应用系统中,并行工作者模型是最常见的并发模型,java.util.concurrent包中的许多并发实用工具都是设计用于这个模型的。

优点
易于理解,可以添加更多的工作者来提高系统的并行度
缺点

  • 共享状态可能会很复杂
    在上面的电商系统中,由于共享的工作者经常需要访问一些共享数据,无论是内存中的或者共享的数据库中的。
    在等待访问共享数据结构时,线程之间的互相等待将会丢失部分并行性。许多并发数据结构是阻塞的,意味着在任何一个时间只有一个或者很少的线程能够访问。这样会导致在这些共享数据结构上出现竞争状态。在执行需要访问共享数据结构部分的代码时,高竞争基本上会导致执行时出现一定程度的串行化。
  • 无状态的工作者
    每次都重读需要的数据,将会导致速度变慢,特别是状态保存在外部数据库中的时候。
  • 任务顺序是不确定的
    作业执行顺序是不确定的,无法保证哪个作业最先或者最后被执行。如A先下单,B后下单,不根据时间进行业务逻辑的判断,不能有可能B先于A收到货。

流水线模式

流水线模式中,每个工作者只负责作业中的部分工作。当完成了自己的这部分工作时工作者会将作业转发给下一个工作者。每个工作者在自己的线程中运行,并且不会和其他工作者共享状态。也称反应器系统,或事件驱动系统。

以秒杀为例,工作者A执行订单的处理,工作者B执行支付,工作者C检查仓储,工作者D负责物流,分工明确,各司其职。
在实际应用中,作业有可能不会沿着单一流水线进行。由于大多数系统可以执行多个作业,作业从一个工作者流向另一个工作者取决于作业需要做的工作。在实际中可能会有多个不同的虚拟流水线同时运行。

作业甚至也有可能被转发到超过一个工作者上并发处理。比如说,作业有可能被同时转发到作业执行器和作业日志器。下图说明了三条流水线是如何通过将作业转发给同一个工作者(中间流水线的最后一个工作者)来完成作业:

优点

  • 无需共享的状态
    工作者之间无需共享状态,意味着实现的时候无需考虑所有因并发访问共享对象而产生的并发性问题

  • 较好的硬件整合
    单线程代码在整合底层硬件的时候往往具有更好的优势。首先,当能确定代码只在单线程模式下执行的时候,通常能够创建更优化的数据结构和算法。

  • 合理的作业顺序
    基于流水线并发模型实现的并发系统,在某种程度上是有可能保证作业的顺序的。作业的有序性使得它更容易地推出系统在某个特定时间点的状态

缺点

  • 编写难度大
    好在有一些平台框架可以直接使用,如Akka,Node.JS

  • 跟踪困难
    流水线并发模型最大的缺点是作业的执行往往分布到多个工作者上,并因此分布到项目中的多个类上。这样导致在追踪某个作业到底被什么代码执行时变得困难。

函数式并行

函数式并行的基本思想是采用函数调用实现程序。函数可以看作是代理人agents或者actor,函数之间可以像流水线模型(反应器或者事件驱动系统)那样互相发送消息。
函数都是通过拷贝来传递参数的,所以除了接收函数外没有实体可以操作数据。这对于避免共享数据的竞态来说是很有必要的。同样也使得函数的执行类似于原子操作。每个函数调用的执行独立于任何其他函数的调用。

Runnable、Callable、Future、Thread、FutureTask

Java并发中主要以RunnableCallableFuture三个接口作为基础。

Runnable

实例想要被线程执行,可以通过实现Runnable接口。
。通过实例化某个 Thread 实例并将自身作为运行目标,就可以运行实现 Runnable 的类而无需创建 Thread 的子类。大多数情况下,如果只想重写 run() 方法,而不重写其他 Thread 方法,那么应使用 Runnable 接口。这很重要,因为除非程序员打算修改或增强类的基本行为,否则不应为该类创建子类。

Callable

Callable 接口类似于 Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是 Runnable不会返回结果,并且无法抛出经过检查的异常。

Future

Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。还提供了其他方法,以确定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。如果为了可取消性而使用 Future 但又不提供可用的结果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。
主要方法如下:

  • cancel(boolean mayInterruptIfRunning)
    试图取消对此任务的执行。
  • get()
    如有必要,等待计算完成,然后获取其结果。
  • get(long timeout, TimeUnit unit)
    如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
  • isCancelled()
    如果在任务正常完成前将其取消,则返回 true。
  • isDone()
    如果任务已完成,则返回 true。

Thread

线程的创建

Java中,我们有2个方式创建线程:

  • 通过直接继承thread类,然后覆盖run()方法。
  • 构建一个实现Runnable接口的类, 然后创建一个thread类对象并传递Runnable对象作为构造参数

    线程的运行流程

    我们在主线程中创建5个子线程,每个子线程通过构造函数初始化number的值,来实现1-5内的乘法表:

      package com.molyeo.java.concurrent;
      public class ThreadTest {
          public static void main(String[] args) {
              System.out.println("main thread start");
              for (int i = 1; i <= 5; i++) {
                  Calculator calculator = new Calculator(i);
                  Thread thread = new Thread(calculator);
                  thread.start();
              }
              System.out.println("main thread end");
          }
      }   
      class Calculator implements Runnable {  
          private int number; 
          public Calculator(int number) {
              this.number = number;
          }   
          @Override
          public void run() {
              for (int i = 1; i <= 5; i++) {
                  System.out.printf("%s: %d * %d = %d \n", Thread.currentThread().getName(), number, i, i * number);
              }
          }
      }

程序输出如下:

main thread start
Thread-0: 1 * 1 = 1 
Thread-0: 1 * 2 = 2 
Thread-0: 1 * 3 = 3 
Thread-0: 1 * 4 = 4 
Thread-0: 1 * 5 = 5 
Thread-4: 5 * 1 = 5 
Thread-4: 5 * 2 = 10 
Thread-4: 5 * 3 = 15 
Thread-4: 5 * 4 = 20 
Thread-4: 5 * 5 = 25 
Thread-3: 4 * 1 = 4 
Thread-3: 4 * 2 = 8 
Thread-2: 3 * 1 = 3 
Thread-2: 3 * 2 = 6 
Thread-2: 3 * 3 = 9 
Thread-2: 3 * 4 = 12 
Thread-1: 2 * 1 = 2 
Thread-1: 2 * 2 = 4 
Thread-1: 2 * 3 = 6 
main thread end
Thread-1: 2 * 4 = 8 
Thread-3: 4 * 3 = 12 
Thread-3: 4 * 4 = 16 
Thread-3: 4 * 5 = 20 
Thread-2: 3 * 5 = 15 
Thread-1: 2 * 5 = 10 

在Java中,每个应用程序最少有一个执行线程,运行程序时,JVM负责调用main()方法的执行线程。
当全部的非守护线程执行结束时,Java程序才算结束。从输出中也可以看到,主程序输出main thread end后,其他程序还是继续执行,直到执行结束。
需要注意的是,如果某个线程调用System.exit()指示终结程序,那么全部的线程都会结束执行。

线程中断、睡眠、设置优先级

下面的示例中,NumberGenerator中首先创建numberGenetorThread线程,并设置优先级,启动线程后,一直循环运行,打印出number的值,直到5毫秒后主线程调用interrupt()方法让其中断,numberGenetorThread线程其跳出while循环。首次调用方法isInterrupted()返回值为true,表示线程已中断。
需要注意的是,interrupt()方法测试当前线程是否已经中断,线程的中断状态也由该方法清除。换句话说,如果连续两次调用该方法,则第二次调用将返回 false。大家可以打开下面的注释去测试。

package com.molyeo.java.concurrent;

/**
 * Created by zhangkh on 2018/8/23.
 */
public class ThreadTest2 {
    public static void main(String[] args) throws InterruptedException {
        Thread numberGenetorThread = new NumberGenerator(0);
        numberGenetorThread.setPriority(Thread.MAX_PRIORITY);
        numberGenetorThread.start();
        Thread.sleep(5);
        numberGenetorThread.interrupt();
        System.out.println("first interrupt,isInterrupted=" + numberGenetorThread.isInterrupted());

//        Thread.sleep(5);
//        numberGenetorThread.interrupt();
//        System.out.println("second interrupt,isInterrupted=" + numberGenetorThread.isInterrupted());

    }
}

class NumberGenerator extends Thread {
    private int number;

    public NumberGenerator(int number) {
        this.number = number;
    }

    @Override
    public void run() {
        while (!isInterrupted()) {
            System.out.println("number is " + number);
            number++;
        }
        System.out.println("NumberGenerator thread,isInterrupted= " + this.isInterrupted());
    }
}

程序部分输出如下:

number is 96
number is 97
NumberGenerator thread,isInterrupted= true
first interrupt,isInterrupted=true

ThreadLocal

定义和作用
ThreadLocal称线程本地变量,并不是为了解决共享对象的多线程访问的问题的,因为如果ThreadLocal.set()放进去的本来就是多线程共享的同一个对象的话,线程通过ThreadLocal.get()方法得到的还是共享对象本身,依旧存在并发访问的问题。其是每个线程所单独持有的,主要是提供了保持对象的方法和避免参数传递,以方便对象的访问。

  • 每个线程中都有一个自己的ThreadLocalMap类对象,可以将线程自己的对象保持到其中,各管各的,线程可以正确的访问到自己的对象。
  • 将一个共用的ThreadLocal静态实例作为key,将不同对象的引用保存到不同线程的ThreadLocalMap中,然后在线程执行的各处通过这个静态ThreadLocal实例的get()方法取得自己线程保存的那个对象,避免了将这个对象作为参数传递的麻烦。

程序运行时,每个线程都保持对其线程局部变量副本的隐式引用,只要线程是活动的并且 ThreadLocal 实例是可访问的;在线程消失之后,其线程局部实例的所有副本都会被垃圾回收(除非存在对这些副本的其他引用)。

使用示例
如下我们创建ThreadLocal的实例stringLocal,分别在主线程和子线程中设置其值为当前线程名字。查看输出的结果可以看到线程间彼此不干扰,各自输出自己设置的值。

package com.molyeo.java.concurrent;

/**
 * Created by zhangkh on 2018/8/24.
 */
public class ThreadLocalDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadLocal<String> stringLocal = new ThreadLocal<String>();
        stringLocal.set(Thread.currentThread().getName());
        System.out.println(String.format("threadName=%10s,threadLocal valaue=%10s",Thread.currentThread().getName(),stringLocal.get()) );

        Thread thread1 = new Thread() {
            public void run() {
                stringLocal.set(Thread.currentThread().getName());
                System.out.println(String.format("threadName=%10s,threadLocal valaue=%10s",Thread.currentThread().getName(),stringLocal.get()) );
            }
        };
        thread1.start();
        thread1.join();

        System.out.println(String.format("threadName=%10s,threadLocal valaue=%10s",Thread.currentThread().getName(),stringLocal.get()) );
    }
}

程序输出如下:

threadName=      main,threadLocal valaue=      main
threadName=  Thread-0,threadLocal valaue=  Thread-0
threadName=      main,threadLocal valaue=      main

源码实现
ThreadLocal有3个成员变量

private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode = new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
    return nextHashCode.getAndAdd(HASH_INCREMENT);
}

nextHashCodeThreadLocal的静态变量,HASH_INCREMENT是静态常量,只有threadLocalHashCodeThreadLocal实例的变量。
在创建ThreadLocal类实例的时候,将ThreadLocal类的下一个hashCode值即nextHashCode的值赋给实例的threadLocalHashCode,然后nextHashCode的值增加HASH_INCREMENT这个值。而实例变量threadLocalHashCodefinal的,用来区分不同的ThreadLocal实例。
ThreadLocal实例stringLocal创建完成后,调用set()方法时,

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

先获取当前线程,即main线程,然后根据线程实例调用getMap()方法获取ThreadLocalMap
其中getMap()方法如下:

ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}

getMap()方法直接返回线程的成员变量threadLocals,其中threadLocals变量是ThreadLocalMap类的实例,而ThreadLocalMapThreadLocal的内部类。
如果map(当前线程的成员变量threadLocals)存在,则将数据写入到ThreadLoclMap用于存储数据的Entry中。
ThreadLocalMapset方法如下:

private void set(ThreadLocal<?> key, Object value) {
    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);

    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get();

        if (k == key) {
            e.value = value;
            return;
        }

        if (k == null) {
            replaceStaleEntry(key, value, i);
            return;
        }
    }

    tab[i] = new Entry(key, value);
    int sz = ++size;
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
}

其中Entry定义如下

static class Entry extends WeakReference<ThreadLocal<?>> {
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

keyThreadLocal实例,值是用户定义的具体对象值。

如果map(当前线程的成员变量threadLocals)不存在,则创建一个ThreadLocalMap实例,并和线程的成员变量threadLocals关联起来。其中ThreadLocalMap实例的keythis,即ThreadLocal实例stringLocal,值是用户定义的具体对象值。

void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

总的来说,ThreadLocal的作用是提供线程内的局部变量,这种变量在线程的生命周期内起作用。作用:提供一个线程内公共变量(比如本次请求的用户信息),减少同一个线程内多个函数或者组件之间一些公共变量的传递的复杂度,或者为线程提供一个私有的变量副本,这样每一个线程都可以随意修改自己的变量副本,而不会对其他线程产生影响。

其他内容待续……

本文参考

Java 7 Concurrency Cookbook

http://ifeve.com/concurrency-modle-seven-week-1/

http://tutorials.jenkov.com/java-concurrency/concurrency-models.html

版权声明:本文为molyeo原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/molyeo/p/9530427.html