从零开始的高并发之路(二)

  1. 多线程安全
    1. 同步关键字Synchronized
    2. 对象锁
    3. this锁
    4. 类锁
    5. 总结
  2. 线程间通信
    1. 线程等待与线程通知

多线程安全

同步关键字Synchronized

  同步代码块可以解决多线程时,操作统一个类变量时的线程安全问题。他可以用来修饰方法和代码块 ,他有类锁和对象锁,一个类可以有多个对象,他们直接的锁是互不打扰的,而一个类只有一个类锁。我们看下面两个例子。第一个例子在对i进行自加时,多个线程同时获得一个数据,导致数据出现问题,我们看第二个,通过对自加操作进行加锁,这样,其他线程在要对i进行自加时,就要等正在做自加的线程完成后获得锁才能再进行自加。这样,数据就不会出现错乱的问题了。

public class ThreadSync {
    static int i;

    public static void main(String[] args) {
        for (int j = 0; j < 10; j++) {
        Thread thread = new Thread(() -> {
            for (int k = 0; k < 100; k++) {
                add();
                System.out.println(i + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        thread.start();
         }
    }

    static void add() {
        //System.out.println("加之前"+i);
           ++i;
       // System.out.println("加之后"+i);
    }
}
结果: 我们看到结果输出了多个15  多个16 这样的 
1Thread-0
3Thread-2
2Thread-3
4Thread-1
5Thread-4
6Thread-5
7Thread-6
8Thread-7
9Thread-9
10Thread-8
11Thread-0
12Thread-5
16Thread-7
16Thread-1
15Thread-4
15Thread-9
15Thread-3
13Thread-2
......
public class ThreadSync {
    static Object object = new Object();
    static int i;

    public static void main(String[] args) {
        for (int j = 0; j < 10; j++) {
        Thread thread = new Thread(() -> {
            for (int k = 0; k < 100; k++) {
                add();
                System.out.println(i + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        thread.start();
         }
    }

    static void add() {
        //System.out.println("加之前"+i);
        synchronized (object) {
           ++i;
        }
       // System.out.println("加之后"+i);
    }
}
结果: 可以看到虽然不是按照顺序输出,但是,他不会出现重复的两个数字了
1Thread-0
3Thread-2
2Thread-1
4Thread-3
5Thread-4
6Thread-9
7Thread-6
8Thread-7
9Thread-8
10Thread-5
12Thread-0
17Thread-4
19Thread-2
18Thread-9
18Thread-3
15Thread-7
14Thread-1
13Thread-8
......

  造成这样的原因是在多个线程同时对i进行加操作,第一个线程先把i加载进了工作内存,自加操作完成还未返回时,第二个线程又把i加载进了自己的工作内存,此时,两个线程中的i都是相同的,所有,他们自加是对同一个值进行的自加。

对象锁

  对比下面两段代码,可以清晰看出 ,对象锁仅仅对同一个对象生效,第一个是在线程1输出之后在输出线程2 ,而第二个则是两个线程同时输出。他们消耗的时间几乎相同。

// 使用同一个对象作为监视器
public class ThreadSynchronizedObjLock {
    private final Object object= new Object();
    public static void main(String[] args) {
        ThreadSynchronizedObjLock objlock = new ThreadSynchronizedObjLock();
        Thread t1 = new Thread(()->{
            try {
                while (true)
                {objlock.A();Thread.sleep(500);}
            } catch (InterruptedException e) {e.printStackTrace();}
        });
        Thread t2 = new Thread(()->{
            try {
                while (true)
                {objlock.B();Thread.sleep(500);}
            } catch (InterruptedException e) {e.printStackTrace();}
        });
        t1.start();t2.start();
    }
     void A() throws InterruptedException {
        synchronized (object){
        long starttm = System.currentTimeMillis();
        Thread.sleep(2000);
        Optional.of(""+Thread.currentThread().getName()+"耗时:"+(System.currentTimeMillis()-starttm)).ifPresent(System.out::println);}
    }
     void B() throws InterruptedException {
         synchronized (object) {
             long starttm = System.currentTimeMillis();
             Thread.sleep(2000);
             Optional.of("" + Thread.currentThread().getName() + "耗时:" + (System.currentTimeMillis() - starttm)).ifPresent(System.out::println);
         }
    }
}
public class ThreadSynchronizedObjLock {
    private final Object object= new Object();
    public static void main(String[] args) {
        Thread t1 = new Thread(()->{
            try {
                while (true)
                {new ThreadSynchronizedObjLock().A();Thread.sleep(500);}
            } catch (InterruptedException e) {e.printStackTrace();}
        });
        Thread t2 = new Thread(()->{
            try {
                while (true)
                {new ThreadSynchronizedObjLock().B();Thread.sleep(500);}
            } catch (InterruptedException e) {e.printStackTrace();}
        });
        t1.start();t2.start();
    }
     void A() throws InterruptedException {
        synchronized (object){
        long starttm = System.currentTimeMillis();
        Thread.sleep(2000);
        Optional.of(""+Thread.currentThread().getName()+"耗时:"+(System.currentTimeMillis()-starttm)).ifPresent(System.out::println);}
    }
     void B() throws InterruptedException {
         synchronized (object) {
             long starttm = System.currentTimeMillis();
             Thread.sleep(2000);
             Optional.of("" + Thread.currentThread().getName() + "耗时:" + (System.currentTimeMillis() - starttm)).ifPresent(System.out::println);
         }
    }
}
//两个结果几乎同时输出,说明锁住的不是同一个对象 两个锁之间相互毫无关系

this锁

this锁是一种特殊的对象锁 它指的是用当前实例来加锁。

public class ThreadSynchronizedThisLock {
    private static int i = 0;
    public static void main(String[] args) {
        ThreadSynchronizedThisLock lock = new ThreadSynchronizedThisLock();
        new Thread(() -> {
            while (true) {
                lock.add();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(() -> {
            while (true) {
                lock.dec();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }


    void add() {
        synchronized (this) {
            long startme = System.currentTimeMillis();
            i++;
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Optional.of("i: "+i+ "当前线程为"+ Thread.currentThread().getName()+"耗时: "+ (System.currentTimeMillis()-startme)).ifPresent(System.out::println);
        }
    }

    void dec() {
        synchronized (this) {
            long startme = System.currentTimeMillis();
            i--;
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Optional.of("i: "+i+ "当前线程为"+ Thread.currentThread().getName()+"耗时: "+ (System.currentTimeMillis()-startme)).ifPresent(System.out::println);
        }
    }
}

// 当add运行时 dec被阻塞 dec运行时 add被阻塞

直接加在(实例方法)非静态方法前面是表示的是对当前对象加锁,和this代码块意义相同

类锁

对比下面两个例子 无论是在对象本类中调用还是在对象中调用的方式,类锁都会对add 和dec的代码块进行加锁。

public class ThreadSynchronizedClassLock {
    private final  Object object = new Object();
    private static int i = 0;

    public static void main(String[] args) {
        new Thread(() -> {
            while (true) {
                add();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(() -> {
            while (true) {
                dec();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
    static void add() {//一般会锁要操作的共享变量的类
        synchronized (ThreadSynchronizedThisLock.class) {
            long startme = System.currentTimeMillis();
            i++;
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Optional.of("i: "+i+ "当前线程为"+ Thread.currentThread().getName()+"耗时: "+ (System.currentTimeMillis()-startme)).ifPresent(System.out::println);
        }
    }
    static void dec() {
        synchronized (ThreadSynchronizedThisLock.class) {
            long startme = System.currentTimeMillis();
            i--;
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Optional.of("i: "+i+ "当前线程为"+ Thread.currentThread().getName()+"耗时: "+ (System.currentTimeMillis()-startme)).ifPresent(System.out::println);
        }
    }
}
public class ThreadSynchronizedClassLock {
    private final  Object object = new Object();
    private static int i = 0;

    public static void main(String[] args) {
        new Thread(() -> {
            while (true) {
                new ThreadSynchronizedClassLock().add();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(() -> {
            while (true) {
                new ThreadSynchronizedClassLock().dec();
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
     void add() {//一般会锁要操作的共享变量的类
        synchronized (ThreadSynchronizedThisLock.class) {
            long startme = System.currentTimeMillis();
            i++;
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Optional.of("i: "+i+ "当前线程为"+ Thread.currentThread().getName()+"耗时: "+ (System.currentTimeMillis()-startme)).ifPresent(System.out::println);
        }
    }
     void dec() {
        synchronized (ThreadSynchronizedThisLock.class) {
            long startme = System.currentTimeMillis();
            i--;
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Optional.of("i: "+i+ "当前线程为"+ Thread.currentThread().getName()+"耗时: "+ (System.currentTimeMillis()-startme)).ifPresent(System.out::println);
        }
    }
}

***NOTE:**加载静态方法上的同步关键字也表示类锁

总结

synchronized 会在代码块前后添加 monitorenter,monitorexit 指令 线程执行到enter获得锁,exit后就会释放锁。 在使用对象锁和this锁时,他只对持有相同对象的代码块进行阻塞,在使用类锁时, 会对所有获取该类锁的代码块进行阻塞。

线程间通信

线程等待与线程通知

  • sleep和wait的区别
      sleep: 1.sleep是Thread的方法。2.sleep可以在任意位置使用。3.sleep只会让出cpu,不会释放资源锁,在时间结束后重新开始执行。
      wait: 1.wait是object的方法。 2. wait只能在同步块中使用。3.wait方法会使当前线程让出同步资源的锁。并放入一个等待队列中 ,使其他资源可以获得该锁,被通知后重新获得锁。

我们看下面两个例子,第一个例子中,我们先让一个线程进入等待状态,这时他让出obj锁,第二个线程获得了该锁 wait后 他也释放的该锁,第三个线程开始执行,会通知等待队列的第一个正在等待的线程重新开始获得锁。第二个例子,首先5个线程启动后,都进入的等待状态,当我们调用notifyall后,会通知所有的等待队列所有的线程开始进行锁的竞争。

  • wait set

waitset.png

1.所有对象都会有一个wait set,用于存放调用了该对象wait方法之后进入block状态的线程,2.线程被notify之后,不一定会立即执行,线程会进入就绪状态准备抢锁。


  • notify
public class ThreadNotify {
    private static final Object obj = new Object();
    public static void main(String[] args) throws InterruptedException {

        for(int i=0;i<2;i++) {
            Thread thread1 = new Thread(() ->
            {
                mywait();
            });
            thread1.start();
        }
        Thread.sleep(5000);
        Thread thread2 = new Thread(() ->
        {
            mynotify();
        });
        thread2.start();
    }
    static void mywait() {
        synchronized (obj) {
            try {
                Optional.of("我要开始等待了。。。。--"+Thread.currentThread().getName()).ifPresent(System.out::println);
                obj.wait();
                Optional.of("我被通知不等了。。。。"+Thread.currentThread().getName()).ifPresent(System.out::println);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    static void mynotify() {
        synchronized (obj) {
            obj.notify();
            Optional.of("我告诉自己不等了。。。。").ifPresent(System.out::println);
        }
    }
}
  • notifyall
public class ThreadNotify {
    public static final Object obj = new Object();
    public static void main(String[] args) throws InterruptedException {

        for(int i=0 ;i<5 ;i++) {
            Thread thread1 = new Thread(() ->
            {
               new ThreadNotify().mywait();
            });
            thread1.start();
        }
        Thread.sleep(5000);
        Thread thread2 = new Thread(() ->
        {
            new ThreadNotify().mynotify();
        });
        thread2.start();
    }

     void mywait() {
        synchronized (obj) {
            try {
                Optional.of("我要开始等待了。。。。--"+Thread.currentThread().getName()).ifPresent(System.out::println);
                obj.wait();
                Optional.of("我被通知不等了。。。。"+Thread.currentThread().getName()).ifPresent(System.out::println);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
     void mynotify() {
        synchronized (obj) {
            obj.notifyAll();
            Optional.of("我告诉所有人别等了。。。。").ifPresent(System.out::println);
        }
    }
}