欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

JVM heap dump分析,排查Java线程池误用导致OOM

程序员文章站 2022-04-19 20:26:09
...

问题发现

一次偶然用户反馈自己创建的Push任务数据异常,于是后台查看日志排查发现服务异常重启,没有其他异常信息;观察发现,近期出现此类问题较为频繁,当下发的push任务在千万级别时,会出现较高频率重启:

系统内存free图:

JVM heap dump分析,排查Java线程池误用导致OOM

当出现千万量级的push,free的内存从20g一下子打到只剩5g,最后导致OOM,服务直接重启

利用MAT检查内存泄露

heapdump文件生成

在故障定位(尤其是OOM)和性能分析的时候,经常会用到一些文件辅助我们排除代码问题。这些文件记录了JVM运行期间的内存占用、线程执行等情况,这就是我们常说的dump文件。常用的有heap dump和thread dump(也叫javacore,或java dump)。我们可以这么理解:heap dump记录内存信息的,thread dump记录CPU信息。

为了确定是不是OOM导致服务异常被kill,使用指令dmesg,grep相应的进程号,可以确定是因为OOM导致被系统kill,如下图所示:

JVM heap dump分析,排查Java线程池误用导致OOM

因为题主的系统是LINUX,所以使用如下指令生成相应的HEAP文件:

jmap -dump:format=b,file=文件存放目录/heap.hprof  进程号

下载安装好Mat工具(可参考链接:https://blog.csdn.net/wanghuiqi2008/article/details/50724676)

启动打开 File - Open Heap Dump... 菜单,然后选择生成的Heap DUmp文件,选择 "Leak Suspects Report",然后点击 "Finish" 按钮

JVM heap dump分析,排查Java线程池误用导致OOM

打开heap文件之后,显示如下:

JVM heap dump分析,排查Java线程池误用导致OOM

 

JVM heap dump分析,排查Java线程池误用导致OOM

界面常用到功能:

a、Overview(视图):概要界面,显示了概要的信息,并展示了MAT常用的一些功能。

* Details 显示了一些统计信息,包括整  个堆内存的大小、类(Class)的数量、对象(Object)的数量、类加载器(Class Loader)的数量。
* Biggest Objects by Retained Size 使用饼图的方式直观地显示了在JVM堆内存中最大的几个对象,当光标移到饼图上的时候会在左边Inspector和Attributes窗口中显示详细的信息。
* Actions 这里显示了几种常用到的操作,算是功能的快捷方式,包括 Histogram、Dominator Tree、Top Consumers、Duplicate Classes,具体的含义和用法见下面;
* Reports 列出了常用的报告信息,包括 Leak Suspects和Top Components,具体的含义和内容见下;
* Step By Step 以向导的方式引导使用功能。

b、Histogram(直方图):可以查看每个类的实例(即对象)的数量和大小。

c、Dominator Tree(支配树):列出Heap Dump中处于活跃状态中的最大的几个对象,默认按 retained size进行排序,因此很容易找到占用内存最多的对象。

在leak suspects页面,列出了两个可能存在内存泄漏的方法,其中第一个是程序代码中用到的线程池方法:

JVM heap dump分析,排查Java线程池误用导致OOM                               

点击Details进入详情页面。在详情页面Shortest Paths To the Accumulation Point表示GC root到内存消耗聚集点的最短路径,如果某个内存消耗聚集点有路径到达GC root,则该内存消耗聚集点不会被当做垃圾被回收。

JVM heap dump分析,排查Java线程池误用导致OOM               

在All Accumulated Objects by Class列举了该对象所存储的所有内容。

JVM heap dump分析,排查Java线程池误用导致OOM

可以基本定位到问题可能是是在线程池使用不当导致OOM;

于是开始找代码,注意到一段这样的代码提交:

 ThreadPoolExecutor executor = new ThreadPoolExecutor(wakeupConfiguration.wakeupTaskThreadNum, wakeupConfiguration.wakeupTaskThreadNum,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
/***
***
***/
executor.execute(() -> {
                        checkAndSendWakeUpMessage1(imeiMd5, fullPushMessage);
                    });

代码大概的意思是定义了一个线程池,然后处理push任务的用户集的分发,代码是读取一个用户信息,然后起一个线程分发push,进行业务逻辑的处理,问题点就在于,定义的线程池没有设置有限的队列大小,并且没有限制任务的qps,当进行checkAndSendWakeupMessage的时候,会涉及到hbae的查询,hbase在任务高峰期时,会出现大量的延迟,一边是千万级别的用户push塞进队列,一边是队列的延迟,不限长度的队列直接导致队列拥堵,导致内存爆掉.

最后使用限制大小的队列,引入调用者模式饱和策略,成功缓解服务oom的异常.下面总结下关于线程池的知识点,避免大家在以后的工作中踩坑.

Java线程池

虽然Java线程池理论,以及构造线程池的各种参数,以及 Executors 提供的默认实现之前研读过,不过线上还没有发生过线程池误用引发的事故,所以有必要把这些参数再仔细琢磨一遍。

优先补充一些线程池的工作理论,有助于展开下面的内容。线程池顾名思义,就是由很多线程构成的池子,来一个任务,就从池子中取一个线程,处理这个任务。这个理解是我在第一次接触到这个概念时候的理解,虽然整体基本切入到核心,但是实际上会比这个复杂。例如线程池肯定不会无限扩大的,否则资源会耗尽;当线程数到达一个阶段,提交的任务会被暂时存储在一个队列中,如果队列内容可以不断扩大,极端下也会耗尽资源,那选择什么类型的队列,当队列满如何处理任务,都有涉及很多内容。线程池总体的工作过程如下图:

JVM heap dump分析,排查Java线程池误用导致OOM

线程池内的线程数的大小相关的概念有两个,一个是核心池大小,还有最大池大小。如果当前的线程个数比核心池个数小,当任务到来,会优先创建一个新的线程并执行任务。当已经到达核心池大小,则把任务放入队列,为了资源不被耗尽,队列的最大容量可能也是有上限的,如果达到队列上限则考虑继续创建新线程执行任务,如果此刻线程的个数已经到达最大池上限,则考虑把任务丢弃。

在 java.util.concurrent 包中,提供了 ThreadPoolExecutor 的实现。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
} 

既然有了刚刚对线程池工作原理对概述,这些参数就很容易理解了:

corePoolSize- 核心池大小,既然如前原理部分所述。需要注意的是在初创建线程池时线程不会立即启动,直到有任务提交才开始启动线程并逐渐时线程数目达到corePoolSize。若想一开始就创建所有核心线程需调用prestartAllCoreThreads方法。

maximumPoolSize-池中允许的最大线程数。需要注意的是当核心线程满且阻塞队列也满时才会判断当前线程数是否小于最大线程数,并决定是否创建新线程。

keepAliveTime - 当线程数大于核心时,多于的空闲线程最多存活时间

unit - keepAliveTime 参数的时间单位。

workQueue - 当线程数目超过核心线程数时用于保存任务的队列。主要有3种类型的BlockingQueue可供选择:*队列,有界队列和同步移交。将在下文中详细阐述。从参数中可以看到,此队列仅保存实现Runnable接口的任务。 别看这个参数位置很靠后,但是真的很重要,因为楼主的坑就因这个参数而起,这些细节有必要仔细了解清楚。

threadFactory - 执行程序创建新线程时使用的工厂。

handler - 阻塞队列已满且线程数达到最大值时所采取的饱和策略。java默认提供了4种饱和策略的实现方式:中止、抛弃、抛弃最旧的、调用者运行。将在下文中详细阐述。

可选择的阻塞队列BlockingQueue详解

在重复一下新任务进入时线程池的执行策略:
如果运行的线程少于corePoolSize,则 Executor始终首选添加新的线程,而不进行排队。(如果当前运行的线程小于corePoolSize,则任务根本不会存入queue中,而是直接运行)
如果运行的线程大于等于 corePoolSize,则 Executor始终首选将请求加入队列,而不添加新的线程。
如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
主要有3种类型的BlockingQueue:

*队列

队列大小无限制,常用的为*的LinkedBlockingQueue,使用该队列做为阻塞队列时要尤其当心,当任务耗时较长时可能会导致大量新任务在队列中堆积最终导致OOM。阅读代码发现,Executors.newFixedThreadPool 采用就是 LinkedBlockingQueue,而楼主踩到的就是这个坑,当QPS很高,发送数据很大,大量的任务被添加到这个*LinkedBlockingQueue 中,导致cpu和内存飙升服务器挂掉。(ps:题主的问题就出现在这个地方,定义了一个无限长度的Queue,)

有界队列

常用的有两类,一类是遵循FIFO原则的队列如ArrayBlockingQueue与有界的LinkedBlockingQueue,另一类是优先级队列如PriorityBlockingQueue。PriorityBlockingQueue中的优先级由任务的Comparator决定。
使用有界队列时队列大小需和线程池大小互相配合,线程池较小有界队列较大时可减少内存消耗,降低cpu使用率和上下文切换,但是可能会限制系统吞吐量。

在我们的修复方案中,选择的就是这个类型的队列,虽然会有部分任务被丢失,但是我们线上是排序日志搜集任务,所以对部分对丢失是可以容忍的。

同步移交队列

如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue作为等待队列。SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。只有在使用*线程池或者有饱和策略时才建议使用该队列。

可选择的饱和策略RejectedExecutionHandler详解

JDK主要提供了4种饱和策略供选择。4种策略都做为静态内部类在ThreadPoolExcutor中进行实现。

AbortPolicy中止策略

该策略是默认饱和策略。

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
 } 

使用该策略时在饱和时会抛出RejectedExecutionException(继承自RuntimeException),调用者可捕获该异常自行处理。

Demo:

public class AbortPolicyDemo {  

    private static final int THREADS_SIZE = 1;  
    private static final int CAPACITY = 1;  

    public static void main(String[] args) throws Exception {  

        // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。  
        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,  
                new ArrayBlockingQueue<Runnable>(CAPACITY));  
        // 设置线程池的拒绝策略为"抛出异常"  
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());  

        try {  

            // 新建10个任务,并将它们添加到线程池中。  
            for (int i = 0; i < 10; i++) {  
                Runnable myrun = new MyRunnable("task-"+i);  
                pool.execute(myrun);  
            }  
        } catch (RejectedExecutionException e) {  
            e.printStackTrace();  
            // 关闭线程池  
            pool.shutdown();  
        }  
    }  
}  

DiscardPolicy抛弃策略

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}

如代码所示,不做任何处理直接抛弃任务

Demo:

public class DiscardPolicyDemo {  

    private static final int THREADS_SIZE = 1;  
    private static final int CAPACITY = 1;  

    public static void main(String[] args) throws Exception {  

        // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。  
        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(CAPACITY));  
        // 设置线程池的拒绝策略为"丢弃"  
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());  

        // 新建10个任务,并将它们添加到线程池中。  
        for (int i = 0; i < 10; i++) {  
            Runnable myrun = new MyRunnable("task-"+i);  
            pool.execute(myrun);  
        }  
        // 关闭线程池  
        pool.shutdown();  
    }  
}  

DiscardOldestPolicy抛弃旧任务策略

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
} 

如代码,先将阻塞队列中的头元素出队抛弃,再尝试提交任务。如果此时阻塞队列使用PriorityBlockingQueue优先级队列,将会导致优先级最高的任务被抛弃,因此不建议将该种策略配合优先级队列使用。

Demo:

public class DiscardOldestPolicyDemo {  

    private static final int THREADS_SIZE = 1;  
    private static final int CAPACITY = 1;  

    public static void main(String[] args) throws Exception {  

        // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。  
        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,  
                new ArrayBlockingQueue<Runnable>(CAPACITY));  
        // 设置线程池的拒绝策略为"DiscardOldestPolicy"  
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());  

        // 新建10个任务,并将它们添加到线程池中。  
        for (int i = 0; i < 10; i++) {  
            Runnable myrun = new MyRunnable("task-"+i);  
            pool.execute(myrun);  
        }  
        // 关闭线程池  
        pool.shutdown();  
    }  
} 

CallerRunsPolicy调用者运行

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
} 

既不抛弃任务也不抛出异常,直接运行任务的run方法,换言之将任务回退给调用者来直接运行。使用该策略时线程池饱和后将由调用线程池的主线程自己来执行任务,因此在执行任务的这段时间里主线程无法再提交新任务,从而使线程池中工作线程有时间将正在处理的任务处理完成。

Demo:

public class CallerRunsPolicyDemo {  

    private static final int THREADS_SIZE = 1;  
    private static final int CAPACITY = 1;  

    public static void main(String[] args) throws Exception {  

        // 创建线程池。线程池的"最大池大小"和"核心池大小"都为1(THREADS_SIZE),"线程池"的阻塞队列容量为1(CAPACITY)。  
        ThreadPoolExecutor pool = new ThreadPoolExecutor(THREADS_SIZE, THREADS_SIZE, 0, TimeUnit.SECONDS,  
                new ArrayBlockingQueue<Runnable>(CAPACITY));  
        // 设置线程池的拒绝策略为"CallerRunsPolicy"  
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());  

        // 新建10个任务,并将它们添加到线程池中。  
        for (int i = 0; i < 10; i++) {  
            Runnable myrun = new MyRunnable("task-"+i);  
            pool.execute(myrun);  
        }  

        // 关闭线程池  
        pool.shutdown();  
    }  
}  

Java提供的四种常用线程池解析

简而言之 Executors 工厂方法Executors.newCachedThreadPool() 提供了*线程池,可以进行自动线程回收;Executors.newFixedThreadPool(int) 提供了固定大小线程池,内部使用*队列;Executors.newSingleThreadExecutor() 提供了单个后台线程。

详细介绍一下上述四种线程池。

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
} 

在newCachedThreadPool中如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
初看该构造函数时我有这样的疑惑:核心线程池为0,那按照前面所讲的线程池策略新任务来临时无法进入核心线程池,只能进入 SynchronousQueue中进行等待,而SynchronousQueue的大小为1,那岂不是第一个任务到达时只能等待在队列中,直到第二个任务到达发现无法进入队列才能创建第一个线程?
这个问题的答案在上面讲SynchronousQueue时其实已经给出了,要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。因此即便SynchronousQueue一开始为空且大小为1,第一个任务也无法放入其中,因为没有线程在等待从SynchronousQueue中取走元素。因此第一个任务到达时便会创建一个新线程执行该任务。

newFixedThreadPool

 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
 }

看代码一目了然了,线程数量固定,使用无限大的队列。再次强调,楼主就是踩的这个无限大队列的坑。

newScheduledThreadPool

创建一个定长线程池,支持定时及周期性任务执行。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}

在来看看ScheduledThreadPoolExecutor()的构造函数

 public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    } 

ScheduledThreadPoolExecutor的父类即ThreadPoolExecutor,因此这里各参数含义和上面一样。值得关心的是DelayedWorkQueue这个阻塞对列,在上面没有介绍,它作为静态内部类就在ScheduledThreadPoolExecutor中进行了实现。简单的说,DelayedWorkQueue是一个*队列,它能按一定的顺序对工作队列中的元素进行排列。

newSingleThreadExecutor

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
 } 

首先new了一个线程数目为 1 的ScheduledThreadPoolExecutor,再把该对象传入DelegatedScheduledExecutorService中,看看DelegatedScheduledExecutorService的实现代码:

DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
            super(executor);
            e = executor;
} 

在看看它的父类

DelegatedExecutorService(ExecutorService executor) { 
           e = executor; 
} 

其实就是使用装饰模式增强了ScheduledExecutorService(1)的功能,不仅确保只有一个线程顺序执行任务,也保证线程意外终止后会重新创建一个线程继续执行任务。

参考:https://zhuanlan.zhihu.com/p/32867181

https://www.jianshu.com/p/c34af977ade1

https://blog.csdn.net/wanghuiqi2008/article/details/50724676

https://www.jianshu.com/p/aa420c7df275

相关标签: 项目开发