返回

如何正确地停止线程

前言

通常情况下,我们不会手动停止一个线程,而是允许线程运行到结束,然后让它自然停止。但是依然会有许多特殊的情况需要我们提前停止线程,比如:用户突然关闭程序,或程序运行出错重启等。在这种情况下,即将停止的线程在很多业务场景下仍然很有价值。尤其是我们想写一个健壮性很好,能够安全应对各种场景的程序时,正确停止线程就显得格外重要。

为什么不强制停止?

对于Java来说,最正确的停止线程的方式就是使用 interrupt。但是 interrupt仅仅是起到通知被停止线程的作用。对于被停止的线程来说,它拥有完全的自主权, 既可以选择立即停止,也可以选择过一段时间后再停止,甚至压根不停止。 在Java中,程序通过互相通知、相互协作来管理线程,如果不了解线程的工作贸然停止可能会造成一些安全问题。就比如线程正在写入一个文件,但是收到了终止信号,这时候线程就要根据自身业务情况决定是立即停止还是写完文件再停止,如果选择立即停止可能会造成数据的不完整。

interrupt

如何使用interrupt终止线程?

一般来说,被停止线程总是需要不断的判断线程是否收到了终止信号,常见的循环如下:

while (Thread.currentThread().isInterrupted() && do sth.) {
	// do sth. of thread;
}

接下来用代码来看看Java是怎么实现停止线程的逻辑的。一旦调用某个线程的interrupt方法之后,这个线程的中断标志位就会被置为true。 每个线程都有这样的标志位,当线程执行时应该定期检查这个标志位,如果标志位被置为true,说明有程序想要终止该线程。 示例代码如下:

public class StopThread implements Runnable {
    @Override
    public void run() {
        int cnt = 0;
        while (!Thread.currentThread().isInterrupted() && cnt <= 1000) {
            System.out.println(cnt++);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new StopThread());
        thread.start();
        Thread.sleep(2);
        thread.interrupt();
    }
}

此时会得到如下输出:

在上面的代码中,StopThread类的run()方法首先判断进程是否被中断,之后再判断cnt的值是否小于等于1000。线程的主要工作就是打印0~1000的数字。从最后的输出结果中也可以看到,线程并未完整的打印所有数字,这是因为在主线程中创建了子线程来执行这个任务,之后主线程休眠了2毫秒就使用interrupt方法对子线程打断,子线程的中断标志被置为true,同时在进入循环判断时就会因为标志位的改变而中断。

当然,主线程不能休眠太久,否则等子线程执行完任务之后再打断就没有任何意义了。

sleep期间能否感受到中断?

我们知道Java线程在执行阻塞式IO操作、调用sleep函数等一些情况下会进入休眠状态。那么,如果是在sleep期间,线程是否能够感受到中断呢?所以,我们改写一下之前的代码:

public class StopDuringSleep {
    public static void main(String[] args) throws InterruptedException {
        Runnable run = () -> {
            int cnt = 0;
            while (!Thread.currentThread().isInterrupted() && cnt <= 1000) {
                System.out.println("current cnt: " + cnt);
                cnt++;
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        Thread thread = new Thread(run);
        thread.start();
        Thread.sleep(5);
        thread.interrupt();
    }
}

在上面这段代码中,我们在循环中加入子线程的休眠,模拟线程在工作时进入休眠状态,主线程依旧在5毫秒后中断子线程。此时得到的输出结果如下:

可以看到,处于休眠状态的线程被中断是 **可以感受到中断信号的。**同时会抛出一个 InterruptedException 异常,但是此时子线程并没有像之前那样结束运行,而是继续打印接下来的数。这是因为在抛出异常之后, 子线程会清除中断标志位,也就是此时中断标志位为false,自然再进入下一次循环判断时就不会中断。

这里有两点需要注意,在一开始写代码的时候,将catch代码块放在了循环外面,导致一直无法复现出想要的结果。因为在捕获异常之后循环不会再次检查,这时候自然也就会终止所有程序。

另外一点是,如果你照着上面的代码写,IDEA就会建议你不要在循环中加入sleep函数,这会导致忙等。在看了相关资料后更重要的原因应该是sleep函数并不会释放锁,因此造成死锁的可能性就上升了。(当然,应当是有其他原因的,可能得之后遇到相关场景之后才会有更深刻的理解了)。

合理的实现中断

从之前代码了解到,子线程是因为接收到中断信号后抛出异常,而异常处理后会清除中断信号,导致进入循环时中断标志位为false无法中断。了解了原理之后,自然可以从根源上来解决这个问题。

方法签名中抛出异常

可以考虑在方法中抛出异常,并进行捕获。因为 run() 本身是不能抛出异常的,只能通过try/catch来处理异常,如果在方法中抛出异常,那么就可以将中断信号层层传递到顶层,最终让 run() 方法可以捕获异常进行处理,在这种处理方式中可以考虑将 run() 方法中的逻辑抽象为另一方法,并抛出异常。代码如下所示:

public class StopDuringSleep {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new InterruptTask());
        thread.start();
        Thread.sleep(500);
        thread.interrupt();
    }
    static class InterruptTask implements Runnable {
        @Override
        public void run() {
            try {
                process();
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.out.println("task is interrupted");
            }
        }
        private void process() throws InterruptedException {
            int cnt = 0;
            while (!Thread.currentThread().isInterrupted() && cnt <= 1000) {
                System.out.println("current cnt:" + cnt);
                cnt++;
                Thread.sleep(10000);
            }
        }
    }
}

再次中断

之前没能实现中断是因为在捕获异常之后没有进行任何处理,子线程又清除了中断标志,所以就有一种较为暴力的方式,只要在捕获异常之后,在catch方法块中将中断标志再次置为true,待子线程再次进入循环之后便能够收到中断请求,线程也就可以正常退出了。示例代码如下:

public class StopDuringSleep {
    public static void main(String[] args) throws InterruptedException {
        Runnable run = () -> {
            int cnt = 0;
            while (!Thread.currentThread().isInterrupted() && cnt <= 1000) {
                System.out.println("current cnt: " + cnt);
                cnt++;
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        };
        Thread thread = new Thread(run);
        thread.start();
        Thread.sleep(5);
        thread.interrupt();
    }
}

在实际生产环境中,不能盲目的屏蔽中断请求,否则可能导致线程无法正确停止。

volatile

我们知道volatile变量 **能够保证变量修改在多线程中仍保持可见性,强制线程去主存中获取这个变量,**所以,在一些代码中,也能见到使用volatile变量来实现线程停止的案例。如下面的代码所示,就是使用volatile代码来实现线程停止的案例。

public class VolatileStopThread implements Runnable {
    private volatile boolean cancenled = false;
    @Override
    public void run() {
        int cnt = 0;
        while (!cancenled && cnt <= 1000) {
            System.out.println("current cnt " + cnt);
            cnt++;
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        VolatileStopThread run = new VolatileStopThread();
        Thread thread = new Thread(run);
        thread.start();
        Thread.sleep(5);
        run.cancenled = true;
    }
}

代码声明了一个叫作 VolatileStopThread 的类, 它实现了 Runnable 接口,然后在 run() 中进行 while 循环。之后启动主线程,主线程经过5毫秒之后将volatile修饰的变量更改为true。这样不再满足while条件,因此子线程会退出循环。当然,这是volatile停止线程方法适用的情况,如果这种方法是正确的,那么应该在其他情况下也适用才是合理的。接下来就来看看这种方法不适用的场景。代码示例如下:

public class Producer implements Runnable {
    private BlockingQueue<Integer> storage;
    public volatile boolean canceled = false;

    public Producer(BlockingQueue<Integer> storage) {
        this.storage = storage;
    }

    @Override
    public void run() {
        int num = 0;
        try {
            while (!canceled && num <= 100000) {
                if (num % 50 == 0) {
                    storage.put(num);
                    System.out.println("put num into storage");
                }
                ++num;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("producer is stopped");
        }
    }
}

public class Consumer {
    BlockingQueue<Integer> storage;

    public Consumer(BlockingQueue<Integer> storage) {
        this.storage = storage;
    }

    public boolean needMoreNum() {
        if (Math.random() > 0.97) {
            return false;
        }
        return true;
    }
}

public class Main {
    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<Integer> storage = new ArrayBlockingQueue<>(8);
        Producer producer = new Producer(storage);
        Thread producerThread = new Thread(producer);
        producerThread.start();

        Thread.sleep(500);
        Consumer consumer = new Consumer(storage);
        while (consumer.needMoreNum()) {
            System.out.println(consumer.storage.take() + "is consumed");
            Thread.sleep(100);
        }
        System.out.println("consumer don't need numbers");
        producer.canceled = true;
        System.out.println(producer.canceled);
    }
}

这是一个消费者生产者模型,声明了一个生产者 Producer,通过 volatile 标记的初始值为 false 的布尔值 canceled 来停止线程。而在 run() 方法中,while 的判断语句是 num 是否小于等于 100000 及 canceled 是否被标记。while 循环体中判断 num 如果是 50 的倍数就放到 storage 仓库中,storage 是生产者与消费者之间进行通信的存储器,当 num 大于 100000 或被通知停止时,会跳出 while 循环并执行 finally 语句块,告诉大家“生产者结束运行”。

而对于消费者 Consumer,它与生产者共用同一个仓库 storage,并且在方法内通过 needMoreNum() 方法判断是否需要继续使用更多的数字,刚才生产者生产了一些 50 的倍数供消费者使用,消费者是否继续使用数字的判断条件是产生一个随机数并与 0.97 进行比较,大于 0.97 就不再继续使用数字。

在main 函数中,首先创建了生产者/消费者共用的仓库 storage,仓库容量是 8,并且建立生产者并将生产者放入线程后启动线程,启动后进行 500 毫秒的休眠,休眠时间保障生产者有足够的时间把仓库塞满,而仓库达到容量后就不会再继续往里塞,这时生产者会阻塞,500 毫秒后消费者也被创建出来,并判断是否需要使用更多的数字,然后每次消费后休眠 100 毫秒

最后我们来看最终的输出结果,如下图所示:

很明显,生产者并没有像预料的那样终止生产,反而是阻塞住了。这是因为 使用了阻塞队列,消费者的在一开始的消费速度远低于生产者的生产速度,因此阻塞队列中始终是满的状态,而 put() 方法会阻塞生产者线程,在阻塞状态中的线程被唤醒之前无法进行下一次循环的判断,所以这种情况下是无法通过volatile标志变量来终止线程。

你要相信流星划过会带给我们幸运,就像现实告诉你我要心存感激