原创

JUC学习笔记


JUC学习笔记

一.进程, 线程, 管程

进程: 进程有自己的系统内存和资源

线程: 线程共享进程的内存和资源

管程: 每个java对象都有一个作为锁的对象, 这个锁对象称为管程.

用户线程: 通过new, 还有线程池创建的线程一般都是用户线程

守护线程: 处理一些系统服务, 比如垃圾回收等的线程. 如果jvm只剩守护线程, 则jvm会关闭, 退出程序.

Thread t1 = new Thread(() -> {

}, "t1");
t1.setDaemon(true);
t1.start();

lambda函数方法速记

Runnable: 无参无返回值

Function: 有参有返回值

Consumer: 有参无返回值

Supplier: 无参, 有返回值

二.Future和CompletableFuture

1.Future

Runnable: 无返回值

Callable: 有返回值

Future: 有返回值

RunnableFuture接口继承了Future和Runnable接口, FutureTask是RunnableFuture的实现类, 并且构造器参数为Callable接口.

public class FutureTaskTest {
    public static void main(String[] args) throws Exception {
        FutureTask<String> task = new FutureTask<>(() -> {
            System.out.println("Future Task Run");
            return "hello";
        });
        new Thread(task, "t1").start();
        System.out.println(task.get());
    }
}

2.FutureTask

优点: 可以执行带有返回结果的异步任务

缺点:

  • get() 获取返回结果的方法是阻塞的
  • isDone() 需要使用循环进行轮询, 才能知道是否有返回结果, 占用CPU资源.

3.CompletableFuture

CompletableFuture实现了Future和CompletionStage接口, 比FutureTask更加强大.

CompletionStage代理异步计算过程中的一个阶段, 一个阶段完成后可能会触发另外一个阶段.

四个创建静态方法

public class CompletableFutureTaskTest {
    public static void main(String[] args) throws Exception {
        // 默认使用ForkJoinPool.commonPool
        CompletableFuture.runAsync(() -> {
            System.out.println("hello");
        });
        ExecutorService pool = Executors.newFixedThreadPool(3);
        // 使用自定义的线程池
        CompletableFuture.runAsync(() -> {
            System.out.println("hello");
        }, pool);
        // 默认使用ForkJoinPool.commonPool
        CompletableFuture.supplyAsync(() -> {
            return "1";
        });
        // 使用自定义的线程池
        CompletableFuture.supplyAsync(() -> {
            return "1";
        }, pool);
    }
}

异步编程示例

    public static void main(String[] args) throws Exception {
        // 使用自定义的线程池
        ExecutorService pool = Executors.newFixedThreadPool(3);
        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        }, pool).whenComplete((r, e) -> {
            if (e == null) {
                System.out.println("获取计算结果: " + r);
            }
        }).exceptionally(e -> {
            e.printStackTrace();
            System.out.println("出现异常," + e.getMessage());
            return null;
        });
        // get和join方法基本一致, join只是不抛出受检异常
        completableFuture.get();
        completableFuture.join();
    }

电商查价格需求示例

public class NetMallPriceTaskTest {

    private static List<NetMall> netMalls = Arrays.asList(
            new NetMall("jd"),
            new NetMall("taobao"),
            new NetMall("dangdang"),
            new NetMall("tmall"),
            new NetMall("raisedt")
    );

    /
     * 一个个遍历获取
     *
     * @param productName 产品名称
     * @return 结果
     */
    public static List<String> getPrice(String productName) {
        return netMalls.stream().map(n -> String.format("%s in %s price is %.2f", productName,
                n.getMallName(), n.price(productName))).collect(Collectors.toList());
    }

    /
     * 使用CompletableFuture
     *
     * @param productName 产品名称
     * @return 结果
     */
    public static List<String> getPriceByCompletableFuture(String productName) {
        return netMalls.stream()
                .map(n -> CompletableFuture.supplyAsync(() -> String.format("%s in %s price is %.2f", productName,
                        n.getMallName(), n.price(productName))))
                .collect(Collectors.toList())
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // 一个个遍历获取
        long costTime = CostTimeUtil.calc(() -> {
            List<String> list = getPrice("mysql");
            for (String s : list) {
                System.out.println(s);
            }
        });
        System.out.println("一个个遍历获取,花费时间: " + costTime + "毫秒");

        // 使用CompletableFuture获取
        long costTime2 = CostTimeUtil.calc(() -> {
            List<String> list = getPriceByCompletableFuture("mysql");
            for (String s : list) {
                System.out.println(s);
            }
        });
        System.out.println("使用CompletableFuture获取,花费时间: " + costTime2 + "毫秒");
    }

    /
     * 电商类
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class NetMall {

        private String mallName;

        /
         * 根据产品名称获取产品价格
         *
         * @param productName 产品名称
         * @return 产品价格
         */
        public double price(String productName) {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
        }
    }
}

常用方法

获取结果

    public T get() throws InterruptedException, ExecutionException {
        Object r;
        return reportGet((r = result) == null ? waitingGet(true) : r);
    }
// 在指定时间内获取结果
    public T get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        Object r;
        long nanos = unit.toNanos(timeout);
        return reportGet((r = result) == null ? timedGet(nanos) : r);
    }

    public T join() {
        Object r;
        return reportJoin((r = result) == null ? waitingGet(false) : r);
    }
// 立即获取结果,如果没有则返回传入的valueIfAbsent
    public T getNow(T valueIfAbsent) {
        Object r;
        return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
    }

主动触发计算

    // 是否打断get方法,如果打断了返回方法参数value值,如果没有打断,则直接返回get方法的值
	public boolean complete(T value) {
        boolean triggered = completeValue(value);
        postComplete();
        return triggered;
    }

对计算结果进行处理

	// thenApply计算结果存在依赖关系,两个线程串行化
	// 由于存在依赖关系, 上一步出现异常,则不能进行下一步
	public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(asyncPool, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn, Executor executor) {
        return uniApplyStage(screenExecutor(executor), fn);
    }
	// handle计算结果存在依赖关系,两个线程串行化
	// 有异常也可以往下一步走,根据带的异常参数可以进一步处理
    public <U> CompletableFuture<U> handle(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(null, fn);
    }

    public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(asyncPool, fn);
    }

    public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
        return uniHandleStage(screenExecutor(executor), fn);
    }

对计算结果进行消费

    // thenAccept接收任务的处理结果,并消费,无返回结果
	public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
        return uniAcceptStage(null, action);
    }

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
        return uniAcceptStage(asyncPool, action);
    }

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,
                                                   Executor executor) {
        return uniAcceptStage(screenExecutor(executor), action);
    }

thenRun(Runnable runnable): 任务A执行完执行B, 并且B不需要A的结果

thenAccept(Consumer action): 任务A执行完执行B, B需要A的结果, 但是任务B无返回值

thenApply(Function fn): 任务A执行完执行B, B需要A的结果, 同时任务B有返回值

    public static void main(String[] args) throws Exception {
        System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {
        }).join());
        System.out.println(CompletableFuture.supplyAsync(() -> "resultB").thenAccept(System.out::println).join());
        System.out.println(CompletableFuture.supplyAsync(() -> "resultC").thenApply(r -> "fn\t" + r).join());
    }

线程池说明

  1. 如果没有传入自定义线程池,则默认使用ForkJoinPool线程池.
  2. 传入了一个自定义线程池, 如果执行第一个任务的时候传入了自定义线程池, 调用thenRun方法执行第二个任务时, 第二个任务和第一个任务是共用同一个线程池.
  3. 传入了一个自定义线程池, 如果执行第一个任务的时候传入了自定义线程池, 调用thenRunAsync方法执行第二个任务时, 如果没有传入线程池, 则第一个任务使用自定义线程池, 第二个任务使用ForkJoinPool线程池.
  4. 有可能处理的太宽, 直接使用main线程处理
    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName());
            return 1;
        }).thenRun(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.MILLISECONDS.sleep(20);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(2);
        }).thenRun(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.MILLISECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(3);
        }).thenRun(() -> {
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.MILLISECONDS.sleep(30);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(4);
        });
        completableFuture.join();
    }

对计算速度进行选用

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> playA = CompletableFuture.supplyAsync(() -> {
            System.out.println("playA");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "playA";
        });
        CompletableFuture<String> playB = CompletableFuture.supplyAsync(() -> {
            System.out.println("playB");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "playB";
        });

        CompletableFuture<String> future = playA.applyToEither(playB, r -> r + " is winner");
        System.out.println(future.join());
    }

计算结果进行合并

两个任务都完成后, 最终能把两个任务的结果一起交给thenCombine来处理, 先完成的先等着, 等待其他分支任务完成.

    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        });
        CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 2;
        });
        CompletableFuture<Integer> future = future1.thenCombine(future2, Integer::sum);
        System.out.println(future.join());
    }

链式调用写法

    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 2;
        }), Integer::sum);
        System.out.println(future.join());
    }

三.多线程锁

1.synchronized锁

通过反编译字节码, 可知道 monitor enter 和monitor exit

管程(Monitors) 是一种程序结构, 结构内的多个子程序(对象或模块)形成的多个线程互斥访问共享资源.

在HotSpot虚拟机中, monitor使用ObjectMonitor实现, 每个对象天生都带着一个对象监视器, 每一个被锁住的对象都会和monitor关联起来

ObjectMonitor几个关键属性:

_owner: 指向持有ObjectMonitor对象的线程

_WaitSet: 存放处于wait状态的线程队列

_EntryList: 存放处于等待锁block状态的线程队列

_recursions: 锁的重入次数

_count: 用来记录线程获取锁的次数

2.公平锁和非公平锁

公平锁: 多个线程按照申请锁的顺序来获取锁, Lock lock = new ReentrantLock(true);

非公平锁: 多个线程获取锁的顺序不是按照申请锁的顺序.有可能后申请的线程比先申请的线程优先获取锁, 在高并发环境下, 有可能造成优先级翻转或者锁饥饿的状态(某个线程一直获不到锁).

synchronized和ReentrantLock默认都是非公平锁.

为什么使用非公平锁?

  1. 恢复挂起的线程到真正锁的获取还是有时间差的, 非公平锁更能充分利用CPU的时间片,尽量减少CPU的空闲状态时间
  2. 线程切换的开销, 刚释放锁的线程再次获取锁的概率很大, 减少了线程的开销.

3.可重入锁

又名递归锁, 指在同一个线程在外层方法获取锁的时候, 再进入该线程的内层方法会自动获取锁(前提: 锁对象是同一个对象), 不会因为之前已经获取过而没有释放而阻塞.

synchronized和ReentrantLock默认都是可重入锁, 可避免死锁.

实现原理:

  1. 每个锁对象都拥有一个锁计数器和一个指向持有该锁的线程的指针
  2. 当执行monitor enter时, 如果目标锁对象的计数器为零, 那说明锁对象没有被其他线程所持有, Java虚拟机会将该锁对象的持有线程设置为当前线程, 并且将计数器加1.
  3. 在目标锁对象的计数器不为零的情况下, 如果锁对象的持有线程是当前线程, 那么Java虚拟机可以将其计数器加1, 否则需要等待, 直到持有锁的线程释放锁.
  4. 当执行monitor exit时, Java虚拟机需将锁对象的计数器减1, 计数器为零代表锁已释放.

4.死锁和排查

死锁是指两个或两个以上线程在执行过程中, 因争夺锁资源而造成的一种互相等待的现象.

死锁原因: 系统资源不足, 进程运行推进的顺序不合适, 资源分配不当.

排查方法:

  1. 使用 jps -l 查询进程id, jstack 进程id 可查看死锁信息
  2. 使用jconsole图形化界面

四.LockSupport和线程中断

1.线程中断

一个线程不应该由其他线程来强制中断或停止, 而是应该由线程自己自行停止.

Java中没有方法立即停止一条线程, 提供了一种用于停止线程的协商机制--中断, 也即中断标识协商机制.

中断只是一种协作协商机制, Java没有给中断增加任何语法, 中断的过程完全需要程序员自己实现.

若要中断一个线程, 需要手动调用该线程的interrupt方法, 该方法也仅仅是将线程对象的中断标识设置为true.每个线程对象都有一个中断标识位, 用于表示线程是否被中断, 为true表示中断, 为false表示未中断.

Thread类中的三个方法:

  1. void interrupt() 中断此线程, 仅仅是设置线程的中断状态为true, 不会立刻停止线程.中断处于阻塞状态(wait, sleep, join)的线程, 当前线程的中断状态被清除, 立即退出阻塞状态,变为可运行状态, 并抛出异常InterruptedException.不活动的线程不会产生任何影响.
  2. static boolean interrupted() 测试当前线程是否已被中断, 判断线程是否被中断并清除当前中断状态.返回当前线程的中断状态, 将当前线程的中断状态清零并重置为false, 清除线程的中断状态.
  3. boolean isInterrupted() 测试线程是否已经被中断

实现线程中断的方法:

  1. 使用volatile关键字(可见性)来实现

        static volatile boolean isStop = false;
        public static void main(String[] args) throws Exception {
            Thread t1 = new Thread(() -> {
                while (true) {
                    if (isStop) {
                        System.out.println("t1 ----- is interrupted.");
                        break;
                    }
                    System.out.println("t1 ----- is running.");
                }
            }, "t1");
            t1.start();
            TimeUnit.SECONDS.sleep(2);
            new Thread(() -> {
                isStop = true;
            }).start();
        }
    
  2. 使用AtomicBoolean实现

        static AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        public static void main(String[] args) throws Exception {
            Thread t1 = new Thread(() -> {
                while (true) {
                    if (atomicBoolean.get()) {
                        System.out.println("t1 ----- is interrupted.");
                        break;
                    }
                    System.out.println("t1 ----- is running.");
                }
            }, "t1");
            t1.start();
            TimeUnit.SECONDS.sleep(1);
            new Thread(() -> {
                atomicBoolean.set(true);
            }).start();
        }
    
  3. 使用interrupt实现

        public static void main(String[] args) throws Exception {
            Thread t1 = new Thread(() -> {
                while (true) {
                    if (Thread.currentThread().isInterrupted()) {
                        System.out.println("t1 ----- is interrupted.");
                        break;
                    }
                    System.out.println("t1 ----- is running.");
                }
            }, "t1");
            t1.start();
            TimeUnit.SECONDS.sleep(1);
    //        new Thread(() -> {
    //            t1.interrupt();
    //        }).start();
            t1.interrupt();
        }
    

中断阻塞线程问题示例:

    public static void main(String[] args) throws Exception {
        Thread t1 = new Thread(() -> {
            while (true) {
                if (Thread.currentThread().isInterrupted()) {
                    System.out.println("t1 ----- is interrupted.");
                    break;
                }
                System.out.println("before sleep thread interrupt? " + Thread.currentThread().isInterrupted());
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    // 调用中断方法使sleep状态的线程退出阻塞状态,并清除当前中断状态,此时中断标识位为false,所以还是会运行
                    System.out.println("break sleep thread interrupt? " + Thread.currentThread().isInterrupted());
                    // 需再次调用中断方法,将中断标识为设置为true才能退出循环
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
                System.out.println("t1 ----- is running.");
            }
        }, "t1");
        t1.start();
        TimeUnit.SECONDS.sleep(1);
        new Thread(() -> {
            t1.interrupt();
        }).start();
    }

静态方法interrupted问题示例:

// 使用实例方法: isInterrupted
    public static void main(String[] args) {
        System.out.println("before " + Thread.currentThread().getName() + " interrupt");
        System.out.println(Thread.currentThread().getName() + "\t" + Thread.currentThread().isInterrupted());
        System.out.println(Thread.currentThread().getName() + "\t" + Thread.interrupted());
        Thread.currentThread().interrupt();
        System.out.println("after " + Thread.currentThread().getName() + " interrupt");
        System.out.println(Thread.currentThread().getName() + "\t" + Thread.currentThread().isInterrupted());
        System.out.println(Thread.currentThread().getName() + "\t" + Thread.currentThread().isInterrupted());
    }
// 运行结果
before main interrupt
main	false
main	false
after main interrupt
main	true
main	true
    
// 使用静态方法: interrupted
    public static void main(String[] args) {
        System.out.println("before " + Thread.currentThread().getName() + " interrupt");
        System.out.println(Thread.currentThread().getName() + "\t" + Thread.currentThread().isInterrupted());
        System.out.println(Thread.currentThread().getName() + "\t" + Thread.interrupted());
        Thread.currentThread().interrupt();
        System.out.println("after " + Thread.currentThread().getName() + " interrupt");
        System.out.println(Thread.currentThread().getName() + "\t" + Thread.interrupted());
        System.out.println(Thread.currentThread().getName() + "\t" + Thread.currentThread().isInterrupted());
        System.out.println(Thread.currentThread().getName() + "\t" + Thread.interrupted());
    }
// 运行结果
before main interrupt
main	false
main	false
after main interrupt
main	true
main	false
main	false
    
// 因为Thread.interrupted()方法多做了一个操作, 先返回原来的中断状态,然后将中断标识重置.

2.LockSupport

LockSupport是用来创建锁和其他同步类的基本线程阻塞原语.一种新的线程等待唤醒机制.

LockSupport是一个线程阻塞工具类, 所有方法都是静态方法, 可以让线程在任意位置阻塞(park方法), 阻塞之后有对应的唤醒方法(unpark方法).底层调用的是Unsafe中的native代码.

LockSupport提供park()和unpark()方法实现阻塞线程和解除线程阻塞的过程.

LockSupport和每个使用它的线程都有一个许可(permit)关联, permit最多只有一个, 重复调用unpark也不会累积凭证.

线程阻塞需要消耗凭证(permit), 凭证最多只有1个.

当调用park方法时, 如果有凭证, 则直接消耗凭证然后正常退出; 如果没有凭证, 则就必须阻塞等待凭证可用.

当调用unpark方法时, 会给对应的线程增加一个凭证, 但凭证最多只能有一个.

    public static void main(String[] args) throws Exception {
        Thread t1 = new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("t1 ----- come in." + System.currentTimeMillis());
            LockSupport.park();
            System.out.println("t1 ----- 被唤醒." + System.currentTimeMillis());
//            LockSupport.park();
        }, "t1");
        t1.start();
//        TimeUnit.SECONDS.sleep(2);
        System.out.println(Thread.currentThread().getName() + "\t通知线程t1");
        // 提前调用unpark方法,park方法前后时间一致
        LockSupport.unpark(t1);
        // 调用两次unpark方法,如果t1调用两次park方法,仍然会阻塞,因为最多只有一个凭证
//        LockSupport.unpark(t1);
    }

对比synchronized, wait, notify: wait和notify方法必须在同步块或者方法中调用, 且必须成对出现使用, 必须先wait然后notify.

        Object object = new Object();
        Thread t1 = new Thread(() -> {
            synchronized (object) {
                System.out.println("t1 ----- come in.");
                try {
                    // 如果不在synchronized块中调用会抛出IllegalMonitorStateException异常
                    object.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("t1 ----- 被唤醒.");
            }
        }, "t1");
        t1.start();
        TimeUnit.SECONDS.sleep(2);
        new Thread(() -> {
            synchronized (object) {
                System.out.println("t2 ----- 通知t1");
                object.notify();
            }
        }, "t2").start();

对比Lock, Condition, await, signal: Condition中的await和signal方法, 需要先获取锁(lock.lock), 必须先await然后signal.

        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        Thread t1 = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("t1 ----- come in.");
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("t1 ----- 被唤醒.");
            } finally {
                lock.unlock();
            }
        }, "t1");
        t1.start();
        TimeUnit.SECONDS.sleep(2);
        new Thread(() -> {
            lock.lock();
            try {
                System.out.println("t2 ----- 通知t1");
                condition.signal();
            } finally {
                lock.unlock();
            }
        }, "t2").start();

lock: 在锁上等待, 直到获取锁.

tryLock: 立即返回, 获得锁返回true,没获得锁返回false.

lockInterruptibly: 在锁上等待, 直到获取锁, 但是会响应中断, 这个方法优先考虑响应中断, 而不是响应锁的普通获取或重入获取.

分别对应无限制锁, 定时锁(通过while可以作为轮询锁), 可中断锁.

五.JMM和Volatile

1.JMM

Java内存模型(Java Memory Model)简称JMM, 用来屏蔽掉各种硬件和操作系统的内存访问差异, 以实现Java程序在各种平台下都能达到一致内存访问效果.

JMM是一种抽象的概念,并不真实存在,仅仅描述的是一组约定或规范, 通过这组规范定义了程序中(尤其是多线程)访问各个变量的读写访问方式并决定一个线程对共享变量的何时写入以及如何变成对另一个线程可见, 关键技术点都是围绕多线程的原子性, 可见性和有序性展开的.

三大特性:

  1. 原子性: 一个操作不可打断的, 即多线程环境下, 操作不能被其他线程干扰.

  2. 可见性: 指当一个线程修改了某个共享变量的值, 其他线程能否立即知道该变更. JMM规定所有的变量都存储在主内存中.每个线程都有自己的工作内存, 工作内容保存了该线程用到的变量的主内存副本拷贝, 线程对变量的操作都在自己的工作内存中进行.

  3. 有序性: 对于一个线程的执行代码而言, 我们习惯疑问代码的执行是从上到下, 但为了提升性能, 编译器和处理器通常会对指令序列进行重新排序. 只要程序的最终结果和它顺序化执行的结果相等,那么指令的执行顺序可以与代码属性不一致, 此过程叫指令重排序.

    优点: JVM能根据处理器特性适当的对机器指令进行重排序,使机器指令更符合CPU的执行特性, 最大限度的发挥机器性能.

    缺点: 指令重排可保证串行语义一致, 但没有义务保证多线程间的语义也一致.

    源码到最终执行: 源代码->编译器优化的重排->指令并行的重排->内存系统的重排->最终执行的指令.

    处理器在进行重排序时必须考虑指令之间的数据依赖性.

多线程对变量的读写过程:

  1. 定义的所有共享变量都存储在物理主内存中
  2. 每个线程都有自己独立的工作内存, 里面保存该线程使用到的变量的副本.
  3. 线程对共享变量的操作都必须先在线程自己的工作内存中进行后写回主内存,不能直接从主内存中读写
  4. 不同线程之间也无法直接访问其他线程的工作内存中的变量,线程间变量值的传递需要通过主内存来进行

多线程的先行发生原则:

JMM中如果一个操作执行的结果需要对另一个操作可见或者代码重排序, 那么两个操作之间必须存在先行发生(happens-before)原则.本质是一种可见性.

总原则:

如果一个操作先行发生于另一个操作, 那么第一个操作的执行结果将对第二个操作可见, 而且第一个操作的执行顺序排在第二个操作之前.

两个操作之间存在先行发生关系, 并不意味着一定要按照先行发生原则制定顺序来执行, 如果重排序之后的执行结果与按照先行发生原则来执行的结果一致, 那么这种重排序并不非法.

8条原则:

  1. 次序规则: 一个线程内, 按照代码顺序, 写在前面的操作先行发生于写在后面的操作
  2. 锁定规则: 一个unLock操作先行发生于时间后对同一个锁的lock操作. A获取锁后释放锁后, B才能获取锁.
  3. volatile变量规则: 对一个volatile变量的写操作先行发生于后面对这个变量的读操作.前面的写对后面的读是可见的.
  4. 传递规则: 如果操作A先行发生于操作B, 而操作B又先行发生于操作C, 那可以得出操作A先行发生于操作C.
  5. 线程启动规则: Thread对象的start方法优先发生于此线程的每一个操作
  6. 线程中断规则: 对线程interrupt发放的调用先行发生于被中断线程代码检测到中断事件的发生.先设置中断标志位, 才能检测到中断发送.
  7. 线程终止规则: 线程的所有操作都先行发生于对此线程的终止.
  8. 对象终结规则: 一个对象的初始化完成先行发生于它的finalize(垃圾回收时调用的)方法的开始.

2.volatile

被volatile修饰的变量有2大特点: 可见性, 有序性.由内存屏障保证其语义.

代码编译为字节码时, 对于volatile变量中的flags添加了一个ACC_VOLATILE, 然后虚拟机执行代码时在相应的位置插入内存屏障. (使用javap -c class文件 的命令可查看)

内存语义:

  1. 当写一个volatile变量时, JMM会把该线程对应的本地内存中的共享变量值立即刷新回主内存中
  2. 当读一个volatile变量时, JMM会把该线程对应的本地内存变量设置为无效, 重新回到主内存中读取最新的共享变量
  3. 写内存语义是直接刷新到主内存, 读内存语义是直接从主内存中读取.

内存屏障:

内存屏障(内存栅栏, 屏障指令)是一类同步屏障指令, 是CPU或编译器在对内存随机访问的操作中的一个同步点, 使得此点之前的所有读写操作都执行后才可以开始执行此点之后的操作, 避免指令重排序.是一种JVM指令, JMM的重排规则会要求Java编译器在生成JVM指令时插入指定的内存屏障指令,通过这些内存屏障指令, volatile实现了Java内存模型中的可见性和有序性(禁止重排), 但volatile无法保证原子性.

写屏障(Store Memory Barrier): 将存储在缓存中的数据同步到主内存中.在写指令之后插入写屏障, 强制把写缓冲区的数据刷回到主内存中.

读屏障(Load Memory Barrier): 读取主内存中最新的数据. 在读指令之前插入该屏障, 让工作内存或CPU中的高速缓存中的缓存数据失效, 重新回到主内存中获取最新数据.

屏障类型指令示例说明
LoadLoadLoad1;LoadLoad;Load2保证load1读取操作在load2及后续读取操作之前执行
StoreStoreStore1;StoreStore;Store2在store2及其后的写操作执行前,保证store1的写操作已刷新到主内存
LoadStoreLoad1;LoadStore;Store2在store2及其后的写操作执行前,保证load1的读操作已读取结束
StoreLoadStore1;StoreLoad;Load2保证store1的写操作已刷新到主内存之后,在load2及其后的读取操作才能执行

先行发生之volatile变量规则:

第一个操作第二个操作:普通读写第二个操作: volatile读第二个操作: volatile写
普通读写可以重排可以重排不可以重排
volatile读不可以重排不可以重排不可以重排
volatile写可以重排不可以重排不可以重排

当第一个操作是volatile读时, 不论第二个操作是什么, 都不能重排序, 这个操作保证了volatile读之后的操作不会被重排到volatile读之前.

当第二个操作是volatile读时, 不论第一个操作是什么, 都不能重排序, 这个操作保证了volatile写之前的操作不会被重排到volatile写之后.

当第一个操作为volatile写时, 第二个操作为volatile读时, 不能重排.

JMM内存屏障插入策略规则:

  1. 在每一个volatile读操作的后面插入一个LoadLoad屏障, 禁止处理器把上面的volatile读与下面的普通读重排序
  2. 在每一个volatile读操作的后面插入一个LoadStore屏障, 禁止处理器把上面的volatile读与下面的普通写重排序
  3. 在每个volatile写操作之前插入一个StoreStore屏障, 可以保证volatile写之前, 其前面所有的普通写操作已刷新到主内存中.
  4. 在每个volatile写操作之后插入一个StoreLoad屏障, 作用是避免volatile写与后面可能有volatile读/写操作重排序.

无原子性:

对于volatile变量具备可见性, JVM只是保证从主内存加载到线程工作内存的值是最新的, 在一个线程A把volatile变量进行修改后(操作完成, 写入主内存中), 而在另一个线程B中也操作完成, 但是还没写入, 此时线程B读取了最新的volatile变量的值, 作废原存在线程B的工作内存中的volatile变量, 然后线程B将最新的volatile变量的值写入主内存中, 这样线程B丢失了一次操作.

volatile变量不适合参与到依赖当前值的运算.适合: 运算结果并不依赖变量的当前值, 或者能够确保只有单一的线程修改变量的值, 变量不需要与其他状态变量共同参与不变约束的场景.单一赋值可以, 复合运算不行.

禁止重排序情况:

名称代码说明
写后读a = 1; b = a;写一个变量后, 再读这个变量
写后写a = 1; a = 2;写一个变量后, 再写这个变量
读后写a = b; b = 1;读一个变量后, 再写这个变量

DCL双端锁:

由于重排序, 多线程环境下对象的可能未完成初始化就被其他线程读取.

创建对象字节码包含4部分:

  1. new 新建一个实例
  2. 复制这个实例的引用
  3. 通过该引用调用构造方法
  4. 引用用来进行赋值操作putstatic

JIT即时编译器进行指令重排, 将4排到3的前面,那么线程t1还未完全将构造方法执行完毕, 如果在构造方法中要执行很多初始化, 线程t2操作拿到的instance类变量是非空的.但是一个未初始化完毕的单例, 那么就会出现问题,解决办法: 就是对instance使用volatile修饰

    public static class Singleton {
        private Singleton() {
        }

        private static volatile Singleton instance = null;

        public static Singleton getInstance() {
            if (instance == null) {
                synchronized (Singleton.class) {
                    if (instance == null) {
                        instance = new Singleton();
                    }
                }
            }
            return instance;
        }
    }

总结:

volatile写之前的操作, 都禁止重排序到volatile之后; volatile读之后的操作, 都禁止重排序到volatile之前, volatile写之后volatile读, 禁止重排序.

六.CAS和原子类

1.CAS

Compare and swap 的缩写, 比较并交换.包含三个操作数, 内存位置, 预期原值, 更新值.只需CAS操作时, 将内存位置的当时值和预期原值比较, 如果相匹配, 那么处理器会自动将该位置值更新为更新值.如果不匹配,则不做任何操作.多个线程执行同一个CAS操作只有一个会成功.

CAS是JDK提供的非阻塞原子性操作, 通过硬件保证了比较-更新的原子性.CAS是一条CPU的原子指令(cmpxchg指令).Unsafe提供的CAS方法底层实现为CPU指令cmpxchg.CAS原子性是CPU实现独占的,比synchronized重量级锁时间短很多, 在多线程情况下性能会比较好.CAS是一条CPU并发原语.通过它实现了原子操作, 原语的执行必须是连续的, 在执行过程中不允许被中断,不会造成数据不一致的问题.

Unsafe是CAS的核心类, 基于该类(native方法)可直接操作特定内存的数据.存在于sun.misc包中,其内部方法操作可以像C的指针一样操作内存.直接调用操作系统底层资源执行相应的任务.

    // 变量使用volatile修饰保证多线程之间的内存可见性
	private volatile int value;
	public final boolean compareAndSet(int expect, int update) {
        // valueOffset是变量值在内存的偏移地址
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

    public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
            // 这里如果CAS操作不成功,则再次重试,成为自旋SpinLock
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

        return var5;
    }

AtomicReference, 原子引用示例:

    public static void main(String[] args) {
        AtomicReference<User> reference = new AtomicReference<>();
        User u1 = new User();
        u1.setName("张三");
        User u2 = new User();
        u2.setName("李四");
        reference.set(u1);
        System.out.println(reference.compareAndSet(u1, u2) + "\t" + reference.get());
    }

    @Data
    @ToString
    public static class User {
        private String name;
    }

自旋锁: SpinLock

CAS是实现自旋锁的基础, 采用循环的方式去尝试获取锁, 当线程发现锁被占用时,会不断循环获取锁的状态,直到获取.好处是减少线程上下文切换的消耗, 缺点是循环会消耗CPU.

    public static void main(String[] args) {
        SpinLock lock = new SpinLock();
        new Thread(() -> {
            lock.lock();
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            lock.unlock();
        }, "t1").start();

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(() -> {
            lock.lock();
            System.out.println(Thread.currentThread().getName() + "\t获取到锁");
            lock.unlock();
        }, "t2").start();
    }

    public static class SpinLock {

        private AtomicReference<Thread> threadReference = new AtomicReference<>();

        public void lock() {
            Thread thread = Thread.currentThread();
            System.out.println(thread.getName() + "\t" + "进行锁定");
            while (!threadReference.compareAndSet(null, thread)) {

            }
        }

        public void unlock() {
            Thread thread = Thread.currentThread();
            System.out.println(thread.getName() + "\t" + "进行解锁");
            threadReference.compareAndSet(thread, null);
        }
    }

CAS两大缺点

  1. 循环时间长, CPU开销大, 如果CAS失败, 会一直进行尝试, 如果CAS长时间一直不成功, 可能会给CPU带来很大的开销.
  2. ABA问题: CAS算法在提前内存中的某时刻数据在当下时刻比较并替换, 这个时间差会导致数据的变化.一个线程1从内存位置V读取值A, 一个线程2也从内存位置V读取值A, 线程2执行一些操作将值A变为值B, 然后线程2又将内存位置V的数据变为值A, 线程1进行CAS操作, 发现内存位置V中还是值A, 预期OK, 然后线程1操作成功.
        AtomicInteger atomicInteger = new AtomicInteger(100);
        new Thread(() -> {
            // 当前线程atomicInteger中的值
            System.out.println(Thread.currentThread().getName() + "\t" + atomicInteger.get());
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 比较并交换为101
            System.out.println(atomicInteger.compareAndSet(100, 101) + "\t" + atomicInteger.get());
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 改为原来的值100
            System.out.println(atomicInteger.compareAndSet(101, 100) + "\t" + atomicInteger.get());

        }, "t1").start();

        new Thread(() -> {
            // 当前线程atomicInteger中的值
            System.out.println(Thread.currentThread().getName() + "\t" + atomicInteger.get());
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(atomicInteger.compareAndSet(100, 2000) + "\t" + atomicInteger.get());
        }, "t2").start();

ABA问题的解决 使用AtomicStampedReference带有流水版本号的原子引用进行解决.

        User u1 = new User();
        u1.setName("张三");
        AtomicStampedReference<User> stampedReference = new AtomicStampedReference<>(u1, 0);
        User u2 = new User();
        u2.setName("李四");
        boolean flag1 = stampedReference.compareAndSet(u1, u2, 0, 1);
        System.out.println(flag1 + "\t" + stampedReference.getReference() + "\t" + stampedReference.getStamp());
        boolean flag2 = stampedReference.compareAndSet(u2, u1, 1, 2);
        System.out.println(flag2 + "\t" + stampedReference.getReference() + "\t" + stampedReference.getStamp());

2.原子操作类

分类
基本类型原子类型AtomicInteger, AtomicBoolean, AtomicLong
数组类型原子类AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray
引用类型原子类AtomicReference, AtomicStampedReference, AtomicMarkableReference
对象的属性修改原子类AtomicIntegerFieldUpdater, AtomicLongFieldUpdater, AtomicReferenceFieldUpdater
原子操作增强类DoubleAccumulator, DoubleAdder, LongAccumulator, LongAdder

AtomicStampedReference: 携带版本号的引用类型原子类, 可解决ABA问题, 解决修改过几次的问题.

AtomicMarkableReference: 原子更新带有标记位的引用类型原子类, 不可解决ABA问题, 将状态戳简化为true, false.

        AtomicMarkableReference<Integer> markReference = new AtomicMarkableReference<>(100, false);
        new Thread(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + "\t" + markReference.getReference() + "\t" + markReference.isMarked());
            try {
                TimeUnit.MILLISECONDS.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            boolean flag = markReference.compareAndSet(100, 123, false, true);
            System.out.println(threadName + "\t" + flag + "\t" + markReference.getReference() + "\t" + markReference.isMarked());
        }, "t1").start();

        new Thread(() -> {
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + "\t" + markReference.getReference() + "\t" + markReference.isMarked());
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            boolean flag = markReference.compareAndSet(100, 2233, false, true);
            System.out.println(threadName + "\t" + flag + "\t" + markReference.getReference() + "\t" + markReference.isMarked());
        }, "t2").start();

对象的属性修改原子类示例:

以一种线程安全的方式操作非线程安全对象内的某些字段.

  1. 字段必须是public volatile修饰
  2. 使用静态方法newUpdater创建更新器,并传入类对象和更新的字段名称
    public static void main(String[] args) throws Exception {
        int threadNumbers = 100;
        CountDownLatch latch = new CountDownLatch(threadNumbers);
        BankAccount bankAccount = new BankAccount();

        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                try {
                    for (int j = 0; j < 10000; j++) {
                        bankAccount.transfer();
                    }
                } finally {
                    latch.countDown();
                }
            }).start();
        }
        latch.await();
        System.out.println("money: " + bankAccount.money);
    }

    public static class BankAccount {

        public volatile int money = 0;

        public void add() {
            money++;
        }

        private AtomicIntegerFieldUpdater<BankAccount> updater = AtomicIntegerFieldUpdater
                .newUpdater(BankAccount.class, "money");

        public void transfer() {
            updater.incrementAndGet(this);
        }
    }

LongAdder

特点: 当多个线程更新用于收集统计信息但不用于细粒度同步控制的目的的公众和时, LongAdder此类通常优于AtomicLong, 在低并发下, AtomicLong和LongAdder有相似的特征,但在高并发的情况下LongAdder的吞吐量明显更高, 但代价是空间消耗更高.

其中的sum方法, 在没有并发更新的情况下, sum会返回一个精确值,存在并发的情况下, sum不保证能实时返回精确值, 但最终会返回正确的计算结果.

    public static class Click {
        long count = 0;

        // 使用synchronized作为并发控制
        public synchronized void add() {
            count++;
        }

        AtomicLong atomicLong = new AtomicLong(0);

        // 使用AtomicLong原子类作为并发控制
        public void addByAtomicLong() {
            atomicLong.getAndIncrement();
        }

        LongAdder longAdder = new LongAdder();

        // 使用LongAdder原子增强类作为并发控制
        public void addByLongAdder() {
            longAdder.increment();
        }

        LongAccumulator longAccumulator = new LongAccumulator(Long::sum, 0);

        // 使用LongAccumulator原子增强类作为并发控制
        public void addByLongAccumulator() {
            longAccumulator.accumulate(1);
        }

    }


    public static void main(String[] args) throws Exception {
        Click click1 = new Click();
        long costTime1 = costThreadAddTime(() -> click1.add());
        printResult("synchronized", costTime1, click1.count);

        Click click2 = new Click();
        long costTime2 = costThreadAddTime(() -> click2.addByAtomicLong());
        printResult("AtomicLong", costTime2, click2.atomicLong.get());

        Click click3 = new Click();
        long costTime3 = costThreadAddTime(() -> click3.addByLongAdder());
        printResult("LongAdder", costTime3, click3.longAdder.sum());

        Click click4 = new Click();
        long costTime4 = costThreadAddTime(() -> click4.addByLongAccumulator());
        printResult("LongAccumulator", costTime4, click4.longAccumulator.get());
    }

    private static void printResult(String name, long costTime, long result) {
        System.out.println("使用" + name + "花费时间:" + costTime + "毫秒,结果:" + result);
    }

    private static long costThreadAddTime(Runnable task) throws Exception {
        int threadNumber = 500;
        int _100w = 100 * 10000;
        long startTime = System.currentTimeMillis();
        CountDownLatch latch = new CountDownLatch(threadNumber);
        for (int i = 0; i < threadNumber; i++) {
            new Thread(() -> {
                try {
                    for (int j = 0; j < _100w; j++) {
                        task.run();
                    }
                } finally {
                    latch.countDown();
                }
            }).start();
        }
        latch.await();
        long endTime = System.currentTimeMillis();
        return (endTime - startTime);
    }

LongAdder是Striped64的子类, Striped64中的字段:

	// cell内部类
	@sun.misc.Contended static final class Cell {
	}
	// CPU内核数量, cells数组的最大长度
    static final int NCPU = Runtime.getRuntime().availableProcessors();

	// cell数组, 长度为2的幂, 方便以后位运算
    transient volatile Cell[] cells;

	// 基础value值, 当并发较低时, 只累加该值, 用于没有竞争的情况, 通过CAS更新
    transient volatile long base;
	
	// 创建或扩容cell数组时使用的自旋锁变量
    transient volatile int cellsBusy;

base: 类似于AtomicLong中全局的value值, 在没有竞争情况下数据直接累加到base上, 或者cells创建或扩容时, 没有抢到自旋锁的线程也将数据累积到base上, 作为保底.

collide: 表示扩容意向, false一定不会扩容, true可能会扩容.
    
cellsBusy: 初始化cells数组或扩容cells数组时需要获取锁, 0表示无锁状态, 1表示其他线程已经持有了锁.
 
casCellsBusy(): 通过CAS操作修改cellsBusy的值, 修改成功代表获取锁, 返回true
    
NCPU: 当前计算机CPU核心数, cells数组扩容时会用到
    
getProbe(): 获取当前线程的hash值

advanceProbe(): 重置当前线程的hash值

LongAdder快的原理

解决了AtomicLong中value在高并发情况下CAS操作时的自旋时间长的问题.CAS操作只能一个线程进行, 在高并发情况下, 除了在做CAS操作的线程外, 其余线程都在自旋等待, 造成CPU消耗大.

LongAdder将AtomicLong中value的CAS操作造成的自旋压力分散到多个value中(cells数组), sum方法会将所有cells数组中的value和base累积作为返回值.

    public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

base变量: 低并发, 直接累积到该变量上.

cells数组: 高并发, 累积进各个线程自己的槽cell[hash / (n-1)] 中.

源码解读

LongAdder在无竞争或低竞争的情况下, 跟AtomicLong一样, 对同一个base进行操作, 当出现高并发情况时则采用化整为零分散热点的做法,用空间换时间, 用一个cells数组, 将累加数据拆分到不同的槽中, 多个线程需要同时CAS操作时, 可以对线程id进行hash得到hash值, 再根据hash值映射到这个数组cells的某个下标, 再对该下标对应的值进行累加操作. 当所有操作完毕, 将数组cells所有值和base都加起来作为最终的结果.

// LongAdder.add() 方法源码
// =====下面是因为竞争, cell数组初始化的情况=====
// x是要累加的值
public void add(long x) {
    // C语言写法, 枚举所有用到的变量
    Cell[] as; long b, v; int m; Cell a;
    // 1.第一次进入, cells数组比为空, 则进行判断!casBase(b = base, b + x)
    // 2.casBase是一次对base变量的CAS操作, 更新值: base + x
    // 在低并发或没有并发时, CAS操作基本一直成功
    // 所以casBase(b = base, b + x)一直为true
    // 3.在高并发情况下, casBase操作会失败, 于是进入下面的判断
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        // uncontended为true表示有竞争
        boolean uncontended = true;
        // 由于第一次进入, 所以Cell[] as一定为null, 于是进入方法longAccumulate
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null ||
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    // h是当前线程的hash值
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current();
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            // 此处代码省略......
        }
        // 4.此时会进入此条件中,进行初始化cells数组中
        // cellsBusy是初始化锁时的自旋锁条件, 0表示无锁状态, 1表示其他线程已经持有了锁
        // 通过casCellsBusy方法将cellsBusy变为1
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            // 标识是否已经初始化
            boolean init = false;
            try {
                // 双重检查,避免多个线程同时创建多个Cell[]数组
                if (cells == as) {
                    // 初始化是创建长度为2的数组
                    Cell[] rs = new Cell[2];
                    // 通过hash 和 (cells数组长度 - 1) 进行与运算
                    // 获取当前线程对应的槽位, 并初始化位当前累积的值x
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    // 标识初始化成功
                    init = true;
                }
            } finally {
                // 自旋锁解锁
                cellsBusy = 0;
            }
            // 如果初始化成功,则跳出循环
            if (init)
                break;
        }
        // 兜底方法, 防止因为另一个线程因为cells数组的创建而丢失此次累加x的值
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;
    }
}

// =====上面是因为竞争, cell数组初始化的情况=====


// =====下面是因为竞争, cell数组创建之后的情况=====
public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    // cells数组已经创建, 不为空, 直接进入下一个判断
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        // 标识为无竞争
        boolean uncontended = true;
        // as == null 判断为false, (m = as.length - 1) < 0 主要是给m赋值并判断
        if (as == null || (m = as.length - 1) < 0 ||
            // (a = as[getProbe() & m]) 别的线程可能没有找到对应的槽位, 此次下标为0
            // 因为上次只是初始化了cells数组中的一个cell对象, 比如下标为1的cell对象
            // 然后进入longAccumulate方法
            (a = as[getProbe() & m]) == null ||
            // (a = as[getProbe() & m]) 线程找到了槽位,比如上次初始化使用的线程id值
            // hash和(数组长度-1)的值为下标0
            // 对cell中的value进行CAS操作, 如果操作成功,则不进入longAccumulate方法
            // 如果CAS操作不成功,则uncontended为false, 表明竞争激烈
            // cell数组已经满足不了, 需要进行扩容
            !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}
// 此次进入longAccumulate方法的情况
// 1.(a = as[getProbe() & m]) 别的线程可能没有找到对应的槽位
// 2.竞争过于激烈, cell数组已经满足不了, 需要扩容
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    // h是当前线程的hash值
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current();
        h = getProbe();
        wasUncontended = true;
    }
    // collide表示扩容意向, false一定不会扩容, true可能会扩容.
    boolean collide = false;
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        // 此次进入的情况, cell数组不为空, 且长度大于0
        if ((as = cells) != null && (n = as.length) > 0) {
            // 解决 别的线程可能没有找到对应的槽位 问题
            // 注意这里有个赋值操作: a = as[(n - 1) & h]
            // 重新计算线程hash后线程对应的槽位有可能不为空
            if ((a = as[(n - 1) & h]) == null) {
                // cellsBusy是初始化锁时的自旋锁条件
                // 0表示无锁状态, 1表示其他线程已经持有了锁
                if (cellsBusy == 0) {
                    // 创建对应槽位的cell对象
                    Cell r = new Cell(x);
                    // 双重检查和自旋锁,防止多线程创建的cell不同对象赋值给对应的槽位
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {
                            Cell[] rs; int m, j;
                            // (rs = cells) != null && (m = rs.length) > 0
                            // 上面两个判断主要为了赋值
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                // 再次检查对应槽位的对象为空
                                rs[j = (m - 1) & h] == null) {
                                // 将创建的cell对象赋值到对应的槽位
                                rs[j] = r;
                                // 标识已创建
                                created = true;
                            }
                        } finally {
                            // 清除自旋锁
                            cellsBusy = 0;
                        }
                        // 如果已经创建,则跳出循环
                        if (created)
                            break;
                        continue;
                    }
                }
                // 本次判断是创建槽位对应的cell对象,所以不进行扩容
                collide = false;
            }
            // CAS操作已经知道失败了,所以会重新计算hash: h = advanceProbe(h)
            // 下次循环时,再次进行CAS操作
            else if (!wasUncontended)
                wasUncontended = true;
            // wasUncontended为false,变为true后重新计算hash后再次CAS操作
            // 如果成功直接跳出循环,否则会判断是否扩容
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            // cell数组长度大于等于CPU核心数,或cells != as(表示已经扩容过了)
            // 不再进行扩容,重新计算线程id的hash值,找到对应的槽位
            else if (n >= NCPU || cells != as)
                collide = false;
            // 如果前面判断都不成立,则标识为扩容,下次循环时进行扩容
            // 注意此次重新计算了线程id对应的hash,方便在扩容后找到对应的槽位
            else if (!collide)
                collide = true;
            // 通过自旋锁进行扩容,注意这里和补充cell数组槽位因为自旋锁互斥
            // 防止在扩容操作和补充cell数组槽位操作造成数据不一致
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    // 再次检查,防止多个线程重复进行扩容
                    if (cells == as) {
                        // 新建Cell数组,长度为原数组的2倍
                        // 注意这里n值没有变,还是原数组的长度
                        Cell[] rs = new Cell[n << 1];
                        // 遍历原数组把原数据赋值给新的cell数组
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        // 把新创建的cell数组给到cells变量
                        cells = rs;
                    }
                } finally {
                    // 扩容结束,自旋锁消除
                    cellsBusy = 0;
                }
                // 标识为false不再扩容
                collide = false;
                // 重新循环,注意这里不会再次计算hash h = advanceProbe(h);
                // 尝试使用新数组来进行累加
                continue;
            }
            h = advanceProbe(h);
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
           // 初始化cell数组代码省略......
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;
    }
}
// =====上面是因为竞争, cell数组创建之后的情况=====

七.ThreadLocal

ThreadLocal提供线程局部变量, 每一个线程在访问ThreadLocal时都有自己的, 独立初始化的副本, ThreadLocal实例通常是类中的私有静态实例.

让每个线程都有一份自己的数据, 解决了线程安全问题.

常用API

  1. get() 返回当前线程的局部变量副本的值
  2. protected initialValue() 返回当前线程的局部变量初始值, ThreadLocal的子类需实现该方法
  3. remove() 删除当前线程的局部变量值
  4. set() 设置当前线程的局部变量值
  5. static withInitial(Supplier supplier) 创建一个线程局部变量

ThreadLocal的创建方式

  1. 使用匿名内部类, 比较繁琐
  2. 使用static 的 withInitial 方法, Java8

必须回收自定义的ThreadLocal变量, 尤其在线程池场景下, 线程经常会复用, 如果不清理, 会影响后续业务逻辑和造成内存泄漏的问题, 使用try-finally块中调用remove()方法进行回收.

public class TestThreadLocal {

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 5; i++) {
            Seller seller = new Seller();
            seller.name = "销售" + i;
            pool.execute(() -> {
                try {
                    System.out.println("线程:" + Thread.currentThread().getName() + ",当前卖出数量:" + seller.getSellCount());
                    seller.sellProduct();
                    // 这里理应每个销售只卖出1个产品,由于使用了线程池,线程被复用,使用了上次销售的数据,造成卖出产品数量不对
                    System.out.println(seller.name + "卖出了" + seller.getSellCount() + "件产品");
                } finally {
                    seller.remove();
                }
            });
        }
        pool.shutdown();
    }

    private static class Seller {

        private String name;

        private static ThreadLocal<Integer> sellCount = ThreadLocal.withInitial(() -> 0);

        public void sellProduct() {
            Integer count = sellCount.get();
            count += 1;
            sellCount.set(count);
        }

        public int getSellCount() {
            return sellCount.get();
        }

        public void remove() {
            sellCount.remove();
        }
    }
}

Thread, ThreadLocal, ThreadLocalMap之间的关系:

public class Thread implements Runnable {
    // 每个线程内部都有一个ThreadLocal.ThreadLocalMap变量
    ThreadLocal.ThreadLocalMap threadLocals = null;
}

public class ThreadLocal<T> {
    // ThreadLocal类中有ThreadLocalMap静态内部类
    static class ThreadLocalMap {
        // Entry键值对, 注意Entry继承了弱引用
        static class Entry extends WeakReference<ThreadLocal<?>> {
            
            Object value;
            // 注意Entry的k是ThreadLocal, 而v是存储的变量数据副本值
            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }
    }
}

为什么Entry是弱引用? 为什么要调用remove()方法?

内存泄漏: 不再被使用的对象或者变量占用的内存不能被回收.

强引用: Java中最常见的, 把一个对象赋给一个引用变量, 这个引用变量就是一个强引用, 对象被强引用变量引用时, 处于可达状态, 不可能被垃圾回收的, 即使对象以后永远不会用到, JVM也不会回收.

软引用: 当系统内存充足时不会被回收, 当内存不足时, 会被回收. 通常用于高速缓存中.

弱引用: 只要垃圾回收机制一运行, 都会回收弱引用的对象占用的内存.

虚引用: 如果对象仅持有虚引用, 和没有任何引用一样, 随时被垃圾回收, 虚引用必须和引用队列联合使用.跟踪对象被垃圾回收的状态, 仅仅是提供了一种确保对象被finalize以后, 做某些事情的通知机制.PhantomReference的get方法总是返回null.使用虚引用关联对象的唯一目的, 就是在对象垃圾回收的时候收到一个系统通知或后续添加进一步的处理, 用来实现比finalize机制更灵活的回收操作.

为什么Entry是弱引用?

当栈中的方法执行完成后, 栈帧销毁, 栈帧中对ThreadLocal对象的强引用也消耗了, 但此时线程的ThreadLocalMap里某个entry的key引用还指向ThreadLocal对象:

  1. 如果key是强引用, 就会导致key指向的ThreadLocal对象和v指向的对象不能被gc回收

  2. 如果key是弱引用, 就大概率会减少内存泄漏的问题, 因为弱引用只要垃圾回收机制一运行, 都会回收弱引用的对象占用的内存, 使ThreadLocal对象在方法执行完毕后顺利被回收且entry的key引用指向null.注意此时entry的value强引用对象还没回收

  3. 此后我们调用get, set或remove方法时, 就会尝试删除key为null的key, 可以释放value对象所占用的内存.调用的底层方法是expungeStaleEntry用于清除key为null时value对象占用的内存

            private int expungeStaleEntry(int staleSlot) {
                Entry[] tab = table;
                int len = tab.length;
    
                tab[staleSlot].value = null;
                tab[staleSlot] = null;
                size--;
    
                Entry e;
                int i;
                for (i = nextIndex(staleSlot, len);
                     (e = tab[i]) != null;
                     i = nextIndex(i, len)) {
                    ThreadLocal<?> k = e.get();
                    // 注意,如果k为null, 则把value也置为null
                    if (k == null) {
                        e.value = null;
                        tab[i] = null;
                        size--;
                    } else {
                        int h = k.threadLocalHashCode & (len - 1);
                        if (h != i) {
                            tab[i] = null;
                            while (tab[h] != null)
                                h = nextIndex(h, len);
                            tab[h] = e;
                        }
                    }
                }
                return i;
            }
    

八.对象内存布局和对象头

对象在堆内存中的布局: 在HotSpot虚拟机里, 对象在堆内存中的存储分布可以分为: 对象头(Header), 实例数据(Instance Data) 和对齐填充(Padding, 保证8个字节的倍数).

对象头包括: 对象标记Mark Word, 类元信息(类型指针 Class Pointer 指向该对象类元数据klass的首地址), 数组对象对象头中多了一个数组长度(length)信息. 64位系统中, 对象标记Mark Word占用8个字节, 类型指针占了8个字节, 总共16个字节(没有启用压缩的情况下).

对象标记

默认存储对象的hashCode(只有调用hashCode方法才生成), 分代年龄, 锁标志位等信息.

锁状态56bit1bit(未使用)4bit(对象分代年龄)1bit(是否偏向锁)2bit(锁标志位)
无锁25bit未使用, 31bit对象hashCode未使用对象分代年龄001
偏向锁54bit偏向锁的线程id,2bit Epoch偏向时间戳未使用对象分代年龄101
轻量级锁指向栈中锁的记录指针指向栈中锁的记录指针指向栈中锁的记录指针指向栈中锁的记录指针00
重量级锁指向重量级锁的指针指向重量级锁的指针指向重量级锁的指针指向重量级锁的指针10
GC标志11

实例数据

存放类的属性(Field)数据信息, 包括父类的属性信息.

对齐填充

虚拟机要求对象起始地址必须是8字节的整数倍, 填充数据不是必须存在的, 仅仅为了字节对齐.

一个对象大小

new Object() 在默认情况下占用16个字节, 8个字节对象头, 4个字节类型指针, 4个字节对齐填充.

默认开启压缩类型指针(-XX:+UseCompressedClassPointers)

java -XX:+PrintCommandLineFlags -version

-XX:InitialHeapSize=523889472 -XX:MaxHeapSize=8382231552 -XX:+PrintCommandLineFlags -XX:+UseCompressedClassPointers -XX:+UseCompressedOops -
XX:-UseLargePagesIndividualAllocation -XX:+UseParallelGC

九.Synchronized和锁升级

锁升级过程: 无锁-> 偏向锁-> 轻量级锁-> 重量级锁

偏向锁: Mark Word存储的是偏向的线程id

轻量级锁: Mark Word存储的是指向线程栈中Lock Record的指针

重量级锁: Mark Word存储的是指向堆中monitor对象的指针

偏向锁

当线程A竞争到锁, 通过CAS操作修改Mark Word中的偏向线程ID, 如果不存在其他线程竞争, 那么持有偏向锁的线程将永远不需要进行同步.

当一段同步代码一直被同一个线程多次访问, 由于只有一个线程, 那么线程在后续访问时便会自动获得锁.

实际应用运行过程中, 锁总是同一个线程持有, 很少发生竞争, 锁总是被第一个占用他的线程拥有, 这个线程就是锁的偏向线程.那只需要在锁第一次被拥有的时候, 记录下偏向线程id, 这样偏向线程一直持有锁,再次进入同步代码块时, 不需要再次加锁和释放锁, 而是检查锁的Mark Word中是不是放的自己的线程id.

如果相等, 表示偏向锁是偏向于当前线程的, 不需要再尝试获得锁, 直到竞争发生才释放锁.以后每次同步检查锁的偏向线程id是否和当前线程id一致, 如果一致直接进入同步块, 无需每次加锁解锁都去CAS更新对象头.

如果不等, 表示发生了竞争, 锁不是总是偏向同一个线程, 尝试使用CAS来替换Mark Word里面的线程ID为新线程的ID, 如果竞争成功, Mark Word里的线程id为新线程的id, 锁不会升级, 仍然为偏向锁; 如果竞争失败, 可能需要升级变为轻量级锁, 才能保证线程间公平竞争锁.

偏向锁只有遇到其他线程尝试竞争偏向锁时, 持有偏向锁的线程才会释放锁, 线程是不会主动释放偏向锁的.

偏向锁Java6之后默认开启, 但是启动时间有延迟.

偏向锁使用一种等到竞争出现才释放锁的机制, 只有当其他线程竞争锁时, 持有偏向锁的原来线程才会被撤销, 撤销需要等待全局安全点(该时间点上没有字节码正在执行), 同时检查持有偏向锁的线程是否还在执行:

  1. 第一个线程正在执行synchronized方法, 还没执行完, 其他线程来抢夺, 该偏向锁会被取消掉并出现锁升级, 此时轻量级锁有原持有偏向锁的线程持有, 继续执行同步代码, 而正在竞争的线程会进入自旋等待获得该轻量级锁.
  2. 第一个线程执行完成synchronized方法, 则将对象头设置成无锁状态并撤销偏向锁, 重新偏向.

Java15后废除偏向锁, 因为维护成本高.

轻量级锁

多线程竞争, 但是任意时刻最多只有一个线程竞争, 不存在锁竞争太过激烈的情况, 也就没有线程阻塞.

本质是自旋锁CAS

轻量级锁是为了在线程近乎交替执行同步块时提高性能, 通过CAS自旋减少重量级锁使用操作系统互斥量产生的性能消耗, 当关闭偏向锁或多线程竞争偏向锁会导致偏向锁升级为轻量级锁.

如果线程A已拿到锁, 线程B来竞争, 当前对象的锁偏向线程A, 线程B在竞争时发下对象头Mark Word中的线程id不上线程B自己的id, 线程B会CAS操作来获取锁:

  1. 获取锁成功, 直接替换Mark Word的线程id为线程B的id, 锁继续保持偏向状态.
  2. 获取锁失败, 偏向锁升级为轻量级锁, 轻量级锁由原持有偏向锁的线程A持有继续执行代码, 正在竞争的线程B进入自旋等待获取轻量级锁.

轻量级锁加锁: JVM会为每个线程在当前线程的栈帧中创建用于存储锁记录的空间(Displaced Mark Word).若一个线程获得锁时发现是轻量级锁, 会把锁的Mark Word复制到自己的Displaced Mark Word里, 然后尝试用CAS将锁的Mark Word替换为指向存储锁记录的指针, 如果成功, 当前线程获得锁. 如果失败, 表示Mark Word已经被替换成了其他线程的锁记录, 说明在与其他线程竞争锁, 当前线程尝试使用自旋来获取锁.

轻量级锁释放: 释放锁时, 当前线程会CAS操作将Displaced Mark Word的内容复制回锁的Mark Word里, 如果没有竞争, 那么这个复制的操作成功, 如果有其他线程因为自旋多次导致轻量级锁升级成重量级锁, 那么CAS操作回失败, 此时会释放锁, 并唤醒会阻塞的线程.

Java6之后自旋锁次数自适应, 线程如果自旋成功了, 那下次自旋的最大次数会增加, JVM认为上次成功了, 这次成功的概率很大. 反之, 如果很少会自旋成功, 那下次会减少自旋的次数甚至不自旋, 避免CPU空转.

重量级锁

重量级锁是基于进入和退出Monitor对象实现的, 在编译时会将同步块的开始位置插入monitor enter指令, 结束位置插入monitor exit指令.

当线程执行到monitor enter指令时, 会尝试获取对象所对应的monitor所有权, 如果获取到了, 会在monitor的owner中存放当前线程的id, 这样它处于锁定状态, 除非退出同步块, 否则其他线程无法获取到这个monitor.

HashCode

无锁状态下, Mark Word中可以存储对象的hash code值, 当对象的hashCode()方法第一次调用时,JVM会生成对应的hash code并将值存储到Mark Word中.

偏向锁时, 获得锁的线程会用线程id和epoch覆盖 Mark Word中hash code所在的位置. 如果一个对象的hashCode()方法被调用一次之后, 这个对象不能被设置偏向锁.

轻量级锁时, JVM会在当前线程的栈帧中创建一个锁记录(Lock Record)空间, 用于存储锁对象的Mark Word拷贝.释放锁后, 会将信息写回对象头.

重量级锁时, Mark Word保存的重量级锁指针, 代表重量级锁的ObjectMonitor类里有字段记录非加锁状态下的Mark Word信息, 释放锁后, 会将信息写回对象头.

当一个对象已经计算过hash code值时, 无法进入偏向锁状态, 直接升级轻量级锁.

偏向锁过程中, 遇到hash code计算请求, 立马撤销偏向模式, 膨胀为重量级锁.

锁的优缺点

优点缺点适用场景
偏向锁加锁和解锁无需额外的开销,和执行非同步方法相比仅存在纳秒级的差距如果线程减存在锁竞争,会代来额外的锁撤销的消耗适用于只有一个线程访问的同步代码块
轻量级锁竞争的线程不会阻塞, 提高了程序的响应速度如果始终得不到锁竞争的线程,使用自旋会消耗CPU追求响应时间,同步块执行速度非常快
重量级锁线程竞争不使用自旋, 不会消耗CPU空转线程阻塞, 响应时间缓慢追求吞吐量, 同步块执行速度较长

锁消除

JIT(Just In Time Compiler) 即时编译器会无视没有意义的加锁代码, 进行锁消除.

锁粗化

JIT(Just In Time Compiler) 即时编译器会根据情况把几个synchronized代码块合并, 避免反复的加锁解锁, 提升性能.

十.AQS

AbstractQueuedSynchronizer

用来实现锁或者其他同步器组件的公共基础部分的抽象实现, 是重量级基础框架及整个JUC体系的基石, 主要解决锁分配给谁的问题.

AQS使用一个volatile的int类型的成员变量来表示同步状态(state), 通过内置的FIFO双向队列来完成资源获取的排队工作, 每个要去抢占资源的线程封装成一个Node节点来实现锁的分配, 通过CAS完成对同步状态值的修改.

Node中volatile的int类型的成员变量表示等待状态waitStatus :

  1. 值为0(初始值), 当一个Node初始化时候的默认值
  2. CANCELLED 值为1, 表示线程获取锁的请求已经取消了
  3. CONDITION 值为-2, 表示节点在等待队列中, 节点线程等待唤醒
  4. PROPAGATE 值为-3, 当前线程处于SHARED(共享模式)情况下, 该字段才会使用
  5. SIGNAL 值为-1, 表示线程已经准备好了, 等待资源释放.

Node中有两种模式: SHARED: 线程以共享模式等待锁. EXCLUSIVE: 线程以独占的方式等待锁.

AQS源码分析:

以ReentrantLock中的非公平锁为例

// 通过ReentrantLock构造方法可知, 默认非公平锁, 可传入fair使用公平锁
public ReentrantLock() {
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
    // lock方法调用sync的lock方法
    sync.lock();
}
// 非公平锁的lock实现
final void lock() {
    // 非公平先用CAS操作修改state状态, 看是否能抢到锁
    if (compareAndSetState(0, 1))
        // 如果抢到,则设置独占线程exclusiveOwnerThread为当线程,当前线程方法执行
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 如果没有抢到,则进入acquire方法
        acquire(1);
}
// 公平锁的lock实现
final void lock() {
    acquire(1);
}
// AQS中设置独占线程exclusiveOwnerThread为当线程
protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}
// AQS中的acquire方法
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
// 先判断tryAcquire
// 这是公平锁的tryAcquire实现
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
// 非公平锁的tryAcquire实现
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 如果抢占成功,直接设置当前线程为独占线程exclusiveOwnerThread并运行线程代码
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
// 可以看出公平和非公平的tryAcquire实现方法的不同是
// 公平锁需判断是否有等待的处理线程hasQueuedPredecessors
// 而非公平锁直接CAS操作再次抢占
public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

// 如果线程竞争激烈的情况下, tryAcquire 方法返回false
// 然后进入acquireQueued中的嵌套方法addWaiter
private Node addWaiter(Node mode) {
    // 给当前线程封装为node节点
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // 第一次进入,Node还没初始化,所以未null,进入enq方法
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    // 注意这里是循环
    for (;;) {
        // 第一次进入,tail为null
        Node t = tail;
        if (t == null) {
            // 初始化队列
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 第二次循环tail不再为空,把新节点的前节点指向尾节点tail
            node.prev = t;
            // 再通过cas操作将尾节点替换为新节点
            if (compareAndSetTail(t, node)) {
                // 再将新尾节点(已是新节点)的后节点指向旧尾节点
                t.next = node;
                return t;
            }
        }
    }
}
// 然后进入acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // node.predecessor() 节点的上一个节点
            final Node p = node.predecessor();
            // 当前节点的上个节点是头节点,但tryAcquire又竞争失败
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                // 假如tryAcquire竞争成功,证明下次运行的就是当前线程
                // 然后当前线程进行运行代码
                p.next = null;
                failed = false;
                return interrupted;
            }
            // 进行状态的判断,第一次进入shouldParkAfterFailedAcquire返回false
            // 此时pred.waitStatus被改为-1
            // 第二次循环,进入shouldParkAfterFailedAcquire返回true
            // 然后进入parkAndCheckInterrupt方法,当前线程阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
// 前一个节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 前一个节点是头节点,刚初始化,此时pred.waitStatus为0
    int ws = pred.waitStatus;
    // 
    if (ws == Node.SIGNAL)
       
        return true;
    if (ws > 0) {
        
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
       // 将pred.waitStatus为状态改为-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
// 使用LockSupport.park方法让当前线程阻塞
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

十一.读写锁

ReentrantReadWriteLock

读写锁定义为: 一个资源能够被多个读线程访问, 或被一个写线程访问, 但是不能同时存在读写线程.允许读读共存, 读写互斥, 写写互斥, 在读多写少的情况下, 读写锁有较高的性能体现.

public class TestReadWriteLock {

    public static void main(String[] args) {
        Account account = new Account();

        for (int i = 0; i < 5; i++) {
            final String d = i + "";
            new Thread(() -> {
                account.read(d);
            }, "THREAD-READ-" + i).start();
        }

        for (int i = 0; i < 5; i++) {
            final String d = i + "";
            final String v = i + 1 + "";
            new Thread(() -> {
                account.write(d, v);
            }, "THREAD-WRITE-" + i).start();
        }
    }

    private static class Account {

        Map<String, String> map = new HashMap<>();

        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

        public void write(String key, String value) {
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            lock.readLock().lock();
            try {
                System.out.println(Thread.currentThread().getName() + "正在写入: " + key + "--" + value);
                map.put(key, value);
            } finally {
                lock.readLock().unlock();
            }
        }

        public void read(String key) {
            try {
                TimeUnit.MILLISECONDS.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            lock.writeLock().lock();
            try {
                String value = map.get(key);
                System.out.println(Thread.currentThread().getName() + "读取: " + key + "--" + value);
            } finally {
                lock.writeLock().unlock();
            }
        }
    }
}

锁降级, 写锁可降级为读锁(保证写入的数据给读线程可见). 读锁升级为写锁是不可能的.

StampedLock

邮戳锁, 票据锁 对 ReentrantReadWriteLock 读写锁的优化, 也是读写锁.

锁饥饿问题: 一般情况下读线程比写线程多, 读锁存在时间长, 造成写线程一直获取不到写锁.

ReentrantReadWriteLock 读锁占用时, 其他线程尝试获取写锁的时候会被阻塞. 而StampedLock则不会被阻塞, 对读锁的优化, 获取乐观读锁后, 需要对结果进行校验.

StampedLock特点:

  1. 所有获取锁的方法, 都返回一个邮戳stamp, stamp为0表示获取失败, 其余都表示成功.
  2. 所有释放锁的方法, 都需要一个邮戳stamp, 这个stamp必须是和成功获取锁时得到的stamp一致
  3. StampedLock是不可重入的, 悲观读和写锁都不支持Condition
  4. 使用StampedLock一定不要调用中断操作, 即interrupt()方法
  5. 有三种访问模式: 读模式悲观, 写模式, 乐观读模式

读模式悲观: 功能和ReentrantReadWriteLock 的读锁类似

写模式: 功能和ReentrantReadWriteLock 的写锁类似

public class TestStampedLock {
    public static void main(String[] args) {
        Data data = new Data();
        new Thread(() -> {
            data.write();
        }, "write").start();
        new Thread(() -> {
            data.read();
        }, "read").start();
    }

    private static class Data {
        int number = 23;
        StampedLock lock = new StampedLock();

        public void read() {
            for (int i = 0; i < 5; i++) {
                long stamp = lock.readLock();
                try {
                    System.out.println(i + "进入开始读取");
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    System.out.println(i + "读取完成:" + number);
                } finally {
                    lock.unlockRead(stamp);
                }
            }
        }

        public void write() {
            for (int i = 0; i < 5; i++) {
                long stamp = lock.writeLock();
                try {
                    System.out.println(i + "进入开始写入");
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    number += 7;
                    System.out.println(i + "写入完成:" + number);
                } finally {
                    lock.unlockWrite(stamp);
                }
            }
        }
    }
}

乐观读模式: 无锁机制, 类似于数据库中的乐观锁, 支持读写并发, 很乐观认为读取时没人修改,假如被修改再实现升级为悲观读模式.

public class TestStampedLock {
    public static void main(String[] args) {
        Data data = new Data();
        new Thread(() -> {
            data.readOptimistic();
        }, "read").start();
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(() -> {
            data.write();
        }, "write").start();
    }

    private static class Data {
        int number = 23;
        StampedLock lock = new StampedLock();

        public void readOptimistic() {
            long stamp = 0;
            for (int i = 0; i < 5; i++) {
                stamp = lock.tryOptimisticRead();
                System.out.println(i + "进入开始读取\t验证stamp:" + lock.validate(stamp));
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(i + "读取完成\t验证stamp:" + lock.validate(stamp));
            }
            if (!lock.validate(stamp)) {
                stamp = lock.readLock();
                try {
                    System.out.println("变为悲观读,读取完成:" + number);
                } finally {
                    lock.unlockRead(stamp);
                }
            }
        }

        public void read() {
            for (int i = 0; i < 5; i++) {
                long stamp = lock.readLock();
                try {
                    System.out.println(i + "进入开始读取");
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    System.out.println(i + "读取完成:" + number);
                } finally {
                    lock.unlockRead(stamp);
                }
            }
        }

        public void write() {
            for (int i = 0; i < 5; i++) {
                long stamp = lock.writeLock();
                try {
                    System.out.println(i + "进入开始写入");
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    number += 7;
                    System.out.println(i + "写入完成:" + number);
                } finally {
                    lock.unlockWrite(stamp);
                }
            }
        }
    }
}
程序员内功
码出好代码
  • 作者:lzlg520
  • 发表时间:2022-05-17 03:33
  • 版权声明:自由转载-非商用-非衍生-保持署名
  • 公众号转载:请在文末添加作者公众号二维码