java入门篇13 -- 多线程
多线程是java并发的基础,我们先来学习一下吧:
首先,让我们来起一个多线程,看看
- public class HelloWorld {
- public static void main(String[] args) throws Exception {
- // lambda 写法
- Thread t = new Thread(() -> {
- System.out.println("thread start");
- try {
- Thread.sleep(100); // 模拟IO操作,让线程等100毫秒
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- System.out.println("thread end");
- }
- });
- t.start();
- System.out.println("main end");
- }//thread start
- // main end
- // thread end mianend与thread start 打印顺序并非一定的,这个是并发,不一定谁会先执行
- }
线程一般会存在几个状态,New 新建的线程对象,Runnable 正在运行中,Block 被阻塞,Waitting 等待中, Timeed Waittding 被sleep计时等待, Terminated 执行完毕
- public class HelloWorld {
- public static void main(String[] args) throws Exception {
- // lambda 写法
- Thread t = new Thread(() -> {
- System.out.println("thread start");
- try {
- Thread.sleep(100); // 模拟IO操作,让线程等100毫秒
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- System.out.println("thread end");
- }
- });
- System.out.println(t.getState()); // NEW 未执行线程
- t.start();
- System.out.println(t.getState()); // RUNNABLE 正在运行的线程
- System.out.println("main end");
- Thread.sleep(10);
- System.out.println(t.getState()); // TIMED_WAITING 被sleep阻塞住的线程
- Thread.sleep(100);
- System.out.println(t.getState()); // TERMINATED 线程退出
- }
- }
在学习python的时候我们知道一个线程可以等待另外一个线程,那java中也是一样的,都使用join
- public class HelloWorld {
- public static void main(String[] args) throws Exception {
- // lambda 写法
- Thread t = new Thread(() -> {
- System.out.println("thread start");
- try {
- Thread.sleep(100); // 模拟IO操作,让线程等100毫秒
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- System.out.println("thread end");
- }
- });
- t.start();
- t.join(); // 等待 t 结束再往下走
- System.out.println("main end");
- } // thread start
- // thread end
- // main end 这个main end会最终执行
- }
注意join中也是可以传参数的,传入的int值,意思是等待多少毫秒
进程如何中断呢,有两种方法
方法一
- class MyThread extends Thread{
- @Override
- public void run(){
- // 判断进程是否被终端
- while(! isInterrupted()){
- System.out.println("i am alive");
- }
- System.out.println("end");
- }
- }
- public class HelloWorld {
- public static void main(String[] args) throws Exception {
- // lambda 写法
- Thread t = new MyThread();
- t.start();
- Thread.sleep(1000);
- t.interrupt(); // 中断进程
- System.out.println("main end");
- }
- }
还有一种是设置标识位
当我们使用public时,线程间是可以访问变量的,java虚拟机是将变量保存在主内存上,当某一个线程访问变量时,会复制一份走,然后保存在自己的工作空间,如果发生改写,虚拟机会在某个时间之后将修改后的变量值改写主内存,但是这个时间是不确定的,因此我们需要使用volatile来声明这个变量,这样就会做到下述两点:
- 每次访问变量,都会去主内存中取复制一份
- 每次修改变量,会立即写会主内存
- class MyThread extends Thread {
- public volatile boolean isExit = false;
- @Override
- public void run() {
- // 判断标志位
- while (!this.isExit) {
- System.out.println("i am alive");
- }
- System.out.println("end");
- }
- }
- public class HelloWorld {
- public static void main(String[] args) throws Exception {
- // lambda 写法
- MyThread t = new MyThread();
- t.start();
- Thread.sleep(1);
- t.isExit = true; // 中断进程
- System.out.println("main end");
- }
- }
我们在使用的如果不设置volatile中断时间感觉也很快,这个是因为JVM的回写主内存非常快,所以不要爆侥幸信息,一定要记得声明volatile
接下来看一下守护线程,守护线程就是当所有非守护线程结束时,他也会结束,我记得在pyton中又叫做傀儡线程,设置守护需要在启动线程之前。
- class MyThread extends Thread {
- @Override
- public void run() {
- // 判断进程是否被终端
- try {
- Thread.sleep(1000000);
- // 当线程被推出时 会抛出 InterruptedException 这个错误
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("end");
- }
- }
- public class HelloWorld {
- public static void main(String[] args) throws Exception {
- // lambda 写法
- MyThread t = new MyThread();
- t.setDaemon(true);
- t.start();
- Thread.sleep(10);
- System.out.println("main end");
- }
- }
t这个线程是主线程的守护线程,会在主线程退出时退出。
说了这个,那么多线程对于修改数据,是不是安全的呢?我们来看一个例子
- class Num{
- public static int num = 0;
- }
- class AddThread extends Thread {
- @Override
- public void run() {
- int i = 0;
- for(;i<100000;i++){
- Num.num += 1;
- }
- System.out.println("AddThread end");
- }
- }
- class DecThread extends Thread{
- @Override
- public void run(){
- int i = 0;
- for(;i<100000;i++){
- Num.num -=1;
- }
- System.out.println("DecThread end");
- }
- }
- public class HelloWorld {
- public static void main(String[] args) throws Exception {
- // lambda 写法
- Thread t = new AddThread();
- Thread t1 = new DecThread();
- System.out.println(Num.num); // 0
- t.start();
- t1.start();
- t.join();
- t1.join();
- System.out.println(Num.num); // -803
- }
- }
这个结果并非我们预期的0,因此需要咋办,这个是因为所有的线程调度都是由操作系统做的,如果add正在主内存中取值就被挂起,然后dec去主内存中取值并作减法,然后退回给主系统,之后add才调用,这个时候就会造成数据混乱,那么怎么办呢?加锁,也就是说,某个线程在操作数据时,另外一个线程是不能去碰这个数据
- class Num {
- public static int num = 0;
- public static final Object lock = new Object();
- }
- class AddThread extends Thread {
- @Override
- public void run() {
- int i = 0;
- for (; i < 100000; i++) {
- // synchronized 这个就锁住了Num.lock
- synchronized (Num.lock) {
- Num.num += 1;
- }// 释放锁
- }
- System.out.println("AddThread end");
- }
- }
- class DecThread extends Thread {
- @Override
- public void run() {
- int i = 0;
- for (; i < 100000; i++) {
- synchronized (Num.lock) {
- Num.num -= 1;
- }
- }
- System.out.println("DecThread end");
- }
- }
- public class HelloWorld {
- public static void main(String[] args) throws Exception {
- // lambda 写法
- Thread t = new AddThread();
- Thread t1 = new DecThread();
- System.out.println(Num.num); // 0
- t.start();
- t1.start();
- t.join();
- t1.join();
- System.out.println(Num.num); // 0
- }
- }
另外对于一些复制操作(基础类型除了long,double,还有引用类型)是原子操作,不需要加锁
我们来看一下如何进行加锁
- class Num {
- public static int num = 0;
- public static final Object lock = new Object();
- private int nn = 0;
- // 同步方法,这条语句相当于 synchronized(this){this.nn += 1;} 就是把整个实例都锁住了
- public synchronized void add() {
- this.nn += 1;
- }
- public synchronized void dec() {
- this.nn -= 1;
- }
- public void printNn() {
- System.out.println(this.nn);
- }
- }
- class AddThread extends Thread {
- private Num n = null;
- public AddThread(Num n) {
- super();
- this.n = n;
- }
- @Override
- public void run() {
- int i = 0;
- for (; i < 100000; i++) {
- // synchronized 这个就锁住了Num.lock
- synchronized (Num.lock) {
- Num.num += 1;
- }// 释放锁
- this.n.add();
- }
- System.out.println("AddThread end");
- }
- }
- class DecThread extends Thread {
- private Num n = null;
- public DecThread(Num n) {
- super();
- this.n = n;
- }
- @Override
- public void run() {
- int i = 0;
- for (; i < 100000; i++) {
- synchronized (Num.lock) {
- Num.num -= 1;
- }
- this.n.dec();
- }
- System.out.println("DecThread end");
- }
- }
- public class HelloWorld {
- public static void main(String[] args) throws Exception {
- Num n = new Num();
- // lambda 写法
- Thread t = new AddThread(n);
- Thread t1 = new DecThread(n);
- System.out.println(Num.num); // 0
- t.start();
- t1.start();
- t.join();
- t1.join();
- System.out.println(Num.num); // 0
- n.printNn(); // 0
- }
- }
java中的锁是可重入锁,什么意思,就是可以重复获取,获取一次锁计数+1,释放一次锁计数-1,最终释放完毕所有锁就归零
因此如果我们将锁住的信息定义为final标识符,那么他就不能重复获取锁,这样一旦锁的顺序不对,就会产生死锁,如下面的例子
- class Num {
- public static int num = 0;
- public static final Object lock = new Object();
- public static final Object lock2 = new Object();
- }
- class AddThread extends Thread {
- @Override
- public void run() {
- int i = 0;
- for (; i < 100000; i++) {
- // synchronized 这个就锁住了Num.lock
- synchronized (Num.lock2) {
- Num.num += 1;
- synchronized (Num.lock) {
- Num.num -= 1;
- }
- }// 释放锁
- }
- System.out.println("AddThread end");
- }
- }
- class DecThread extends Thread {
- @Override
- public void run() {
- int i = 0;
- for (; i < 100000; i++) {
- synchronized (Num.lock) {
- Num.num -= 1;
- synchronized (Num.lock2) {
- Num.num += 1;
- }
- }
- }
- System.out.println("DecThread end");
- }
- }
- public class HelloWorld {
- public static void main(String[] args) throws Exception {
- Thread t = new AddThread();
- Thread t1 = new DecThread();
- System.out.println(Num.num); // 0
- t.start();
- t1.start();
- t.join();
- t1.join();
- System.out.println(Num.num); // 0
- }
- }
t线程先锁住lock然后在去锁住lock2,t1线程先锁住lock2然后在去锁住lock,因为lock都是不可变的,有final标识符,这样t跟t1就产生了冲突,谁也无法获取对方的未释放的锁,产生了死锁,程序进入等待,因此一定要注意加锁的顺序
那么接下来我们需要思考对于队列来讲,如何进行多线程协同呢,也就是一些放任务,一些取任务,我们可能想到取任务时使用poll,但是我们要求必须取到任务,
- import java.util.ArrayList;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.Queue;
- class Task {
- Queue<String> q = new LinkedList<>();
- public synchronized void add(String s) {
- q.add(s);
- }
- public synchronized String get() throws InterruptedException {
- while (q.isEmpty()) {
- System.out.println("wait me");
- }
- return q.remove();
- }
- }
- public class HelloWorld {
- public static void main(String[] args) throws Exception {
- Task t = new Task();
- List<Thread> tt = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- Thread ts = new Thread(() -> {
- while (true) {
- String s = null;
- try {
- s = t.get();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(s);
- }
- });
- ts.start();
- tt.add(ts);
- }
- Thread add = new Thread() {
- @Override
- public void run() {
- while (true) {
- for (int i = 0; i < 100; i++) {
- t.add("task" + i);
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- };
- add.start();
- }
- }
我看上述代码,这个就是我们之前看的加锁,但是看输出结果确实无限的wait me,这是因为在获取任务时,因为为空就会陷入while的无限循环中,这个这个方法锁住的是this,这个实例,add也就无法进行添加任务,形成了死循环,那么这个就需要wait跟notifyall这两个方法,他们必须在synchorized中使用,wait就是释放当前锁,并休眠,notifyall就是通知所有休眠的进行起来抢锁并执行,上述代码修改为下面这样
- import java.util.ArrayList;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.Queue;
- class Task {
- Queue<String> q = new LinkedList<>();
- public synchronized void add(String s) {
- q.add(s);
- this.notifyAll();
- }
- public synchronized String get() throws InterruptedException {
- while (q.isEmpty()) {
- this.wait();
- System.out.println("wait me");
- }
- return q.remove();
- }
- }
- public class HelloWorld {
- public static void main(String[] args) throws Exception {
- Task t = new Task();
- List<Thread> tt = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- Thread ts = new Thread(() -> {
- while (true) {
- String s = null;
- try {
- s = t.get();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(s);
- }
- });
- ts.start();
- tt.add(ts);
- }
- Thread add = new Thread() {
- @Override
- public void run() {
- while (true) {
- for (int i = 0; i < 100; i++) {
- t.add("task" + i);
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- };
- add.start();
- }
- }
增加两行代码,就实现了我们的功能。
synchronized锁在锁比较多的情况下容易造成死锁,而且在想要获取锁的时候必须等待,没有任何尝试机制,接下来我们看一下另外一种锁的ReentrantLock
- import java.util.ArrayList;
- import java.util.LinkedList;
- import java.util.List;
- import java.util.Queue;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.Lock;
- import java.util.concurrent.locks.ReentrantLock;
- class Task {
- Queue<String> q = new LinkedList<>();
- private final Lock lock = new ReentrantLock(); // 实例化锁
- private final Condition condition = lock.newCondition(); // 通过该锁,实例化一个与之匹配的condition
- public void add(String s) throws InterruptedException {
- // 这个就是尝试获取锁,如果1s获取不到就不再等待,跳过这里面的代码块
- if (lock.tryLock(1, TimeUnit.SECONDS)) {
- try {
- q.add(s);
- condition.signalAll(); // 等同于notifyall
- } finally {
- lock.unlock();
- }
- }
- }
- public String get() throws InterruptedException {
- // 普通的锁,会等待
- lock.lock();
- try {
- while (q.isEmpty()) {
- condition.await(); // 等同于wait
- }
- return q.remove();
- } finally {
- lock.unlock();
- }
- }
- }
- public class HelloWorld {
- public static void main(String[] args) throws Exception {
- Task t = new Task();
- List<Thread> tt = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- Thread ts = new Thread(() -> {
- while (true) {
- String s = null;
- try {
- s = t.get();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(s);
- }
- });
- ts.start();
- tt.add(ts);
- }
- Thread add = new Thread() {
- @Override
- public void run() {
- while (true) {
- for (int i = 0; i < 100; i++) {
- try {
- t.add("task" + i);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- };
- add.start();
- }
- }
加下来看一下读写锁,上面的例子其实所有的读操作也会被锁,但对于读操作来讲,不影响我们的数据,我们希望在进行写的时候锁住,不写的时候,读没有限制,因此可以使用读写锁
- import java.util.*;
- import java.util.concurrent.locks.*;
- class Task {
- Queue<String> q = new LinkedList<>();
- private final ReadWriteLock lock = new ReentrantReadWriteLock(); // 实例化读写锁
- private final Lock rlock = lock.readLock(); // 实例化读锁
- private final Lock wlock = lock.writeLock(); // 实例化写锁
- private int[] n = new int[100];
- public void add(int i) throws InterruptedException {
- // 写入锁
- wlock.lock();
- try {
- n[i] += 1;
- } finally {
- wlock.unlock();
- }
- }
- public int[] get() throws InterruptedException {
- // 读锁
- rlock.lock();
- try {
- return Arrays.copyOf(n, n.length);
- } finally {
- rlock.unlock();
- }
- }
- }
- public class HelloWorld {
- public static void main(String[] args) throws Exception {
- Task t = new Task();
- List<Thread> tt = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- Thread ts = new Thread(() -> {
- while (true) {
- int[] s = null;
- try {
- s = t.get();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(s);
- }
- });
- ts.start();
- tt.add(ts);
- }
- Thread add = new Thread() {
- @Override
- public void run() {
- while (true) {
- for (int i = 0; i < 100; i++) {
- try {
- t.add(i);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- };
- add.start();
- }
- }
上面的读写锁是一种悲观锁,比如说,如果换成队列,也就是说,写的时候,读锁必须全部释放完毕,否则无法进行下一步操作,可以使用之前队列的例子试试,下面看一下乐观锁
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.locks.*;
- class Task {
- private final StampedLock lock = new StampedLock(); // 实例化锁
- private Double s = 0.00;
- public void add(Double d) {
- // 写入锁
- long stamp = lock.writeLock();
- try {
- s += d;
- } finally {
- lock.unlockWrite(stamp);
- }
- }
- public Double get() {
- // 获取乐观锁
- long stamp = lock.tryOptimisticRead();
- Double d = s; // double 赋值非原子级操作
- System.out.println("du");
- // 检查当前所是否是最新的
- if (!lock.validate(stamp)) {
- System.out.println("not new");
- // 如果不是则获取悲观锁
- stamp = lock.readLock();
- try {
- d = s;
- } finally {
- lock.unlockRead(stamp);
- }
- }
- return d;
- }
- }
- public class HelloWorld {
- public static void main(String[] args) throws Exception {
- Task t = new Task();
- List<Thread> tt = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- Thread ts = new Thread(() -> {
- while (true) {
- Double s;
- s = t.get();
- System.out.println(s);
- }
- });
- ts.start();
- tt.add(ts);
- }
- Thread add = new Thread() {
- @Override
- public void run() {
- while (true) {
- for (int i = 0; i < 100; i++) {
- t.add(1.0000001);
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- };
- add.start();
- }
- }
说完线程,那接下来我们看一下线程池
- import java.util.ArrayList;
- import java.util.concurrent.*;
- class Task implements Callable<String> {
- public String call() throws Exception {
- Thread.sleep(10000);
- return "mmm";
- }
- }
- class Task1 implements Runnable {
- private int i;
- public Task1(int d) {
- this.i = d;
- }
- public void run() {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("end" + this.i);
- }
- }
- class MyThread extends Thread {
- private Future<String> f;
- public MyThread(Future<String> f) {
- super();
- this.f = f;
- }
- @Override
- public void run() {
- try {
- System.out.println(f.get());
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- }
- }
- }
- public class HelloWorld {
- public static void main(String[] args) throws Exception {
- ExecutorService es = Executors.newFixedThreadPool(4); // 固定大小的线程池
- // <T> Future<T> submit(Callable<T> task); 这个方法是往线程池中塞任务,看到接受的是Callable类型,返回的是Future类型,那就按这个借口来定义
- Future<String> f = es.submit(new Task()); // 获取一个未来产生的Future对象
- try {
- String m = f.get(1, TimeUnit.SECONDS); // 获取结果只等待指定时间,如果没有等到,会报出下面的错误
- System.out.println(m);
- } catch (Exception e) {
- System.out.println(e); // java.util.concurrent.TimeoutException
- } finally {
- System.out.println("not recv");
- }
- System.out.println(f.isDone()); // 判断是否完成
- System.out.println(f.get()); // 等待获取结果,会一直阻塞
- es.shutdown(); // 关闭线程池
- ExecutorService es1 = Executors.newCachedThreadPool(); // 动态变化的线程池
- var f1 = new ArrayList<Future<String>>();
- for (int i = 0; i < 10; i++) {
- Future<String> ff = es1.submit(new Task()); // 在这里尽量不要用定值的线程池,如果超出线程池大小,会报错
- f1.add(ff);
- } // 十个 mmm 同时打印
- for (Future<String> ffff : f1) {
- var mt = new MyThread(ffff);
- mt.start();
- mt.join();
- }
- Thread.sleep(10100); // 记得晚点关闭线程池
- es1.shutdown();
- ExecutorService es2 = Executors.newFixedThreadPool(2); // 固定大小的线程池
- for (int i = 0; i < 6; i++) {
- es2.submit(new Task1(i));
- } // 两个两个的打印
- Thread.sleep(10000);
- es2.shutdown();
- // 定时反复执行任务的线程池
- ScheduledExecutorService es3 = Executors.newScheduledThreadPool(4);
- es3.schedule(new Task1(1), 4, TimeUnit.SECONDS); // 4 s后执行这个任务
- es3.scheduleAtFixedRate(new Task1(2), 2, 1, TimeUnit.SECONDS); // 两秒后,每隔1秒执行一次任务
- es3.scheduleWithFixedDelay(new Task1(3), 2, 1, TimeUnit.SECONDS); // 两秒后,上一次任务结束,隔1秒执行一次任务
- Thread.sleep(10000);
- es3.shutdown();
- }
- }
使用feature总是得用get等待,或者轮询看看是否isDone,来看一下自动调用回掉对象
- import java.util.concurrent.*;
- public class HelloWorld {
- public static void main(String[] args) throws Exception {
- // 这个是串行
- CompletableFuture<String> f = CompletableFuture.supplyAsync(HelloWorld::call); // 第一个任务
- // 上面任务完成后执行这个任务,s是上一个的返回值
- CompletableFuture<String> f1 = f.thenApplyAsync((s) -> {
- System.out.println(s); // mmm
- return HelloWorld.call();
- });
- // f1成功之后打印
- f1.thenAccept((res) -> {
- System.out.println(res); // mmm
- });
- // f1失败打印
- f1.exceptionally(e -> {
- e.printStackTrace();
- return null;
- });
- Thread.sleep(4000);
- System.out.println("开始并行");
- // 接下来看一下并行的
- CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
- return call1(2);
- });
- CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> {
- return call1(3);
- });
- CompletableFuture<String> f4 = CompletableFuture.supplyAsync(() -> {
- return call1(4);
- });
- CompletableFuture<String> f5 = CompletableFuture.supplyAsync(() -> {
- return call1(5);
- });
- CompletableFuture<Object> ff = CompletableFuture.anyOf(f2, f3, f4, f5);
- f2.thenAccept(System.out::println);
- f3.thenAccept(System.out::println);
- f4.thenAccept(System.out::println);
- f5.thenAccept(System.out::println);
- ff.thenAccept(System.out::println); // 感觉默认的线程应该是3个,因为第四个总感觉执行的慢一点,ff就是谁的快接受那个线程
- Thread.sleep(4000);
- }
- static String call() {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return "mmm";
- }
- static String call1(int n) {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return "mmm" + n;
- }
- }