欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

深入理解Handler(二) --- 从源码角度来理解Android线程间消息传递机制

程序员文章站 2022-05-13 22:09:22
...

一、概述

本篇通过源码分析Android线程间消息传递机制

消息机制主要包含:

  • Message:消息分为硬件产生的消息(如按钮、触摸)和软件生成的消息;
  • MessageQueue:消息队列的主要功能向消息池投递消息(MessageQueue.enqueueMessage)和取走消息池的消息(MessageQueue.next);
  • Handler:消息辅助类,主要功能向消息池发送各种消息事件(Handler.sendMessage)和处理相应消息事件(Handler.handleMessage);
  • Looper:不断循环执行(Looper.loop),按分发机制将消息分发给目标处理者。

注 Activity 系列framework 源码使用 android10 release 分支

frameworks/base/core/java/android/os
- Looper.java
- Message.java
- MessageQueue.java
- Handler.java

frameworks/base/core/jni
- android_os_MessageQueue.cpp

system/core/libutils
- Looper.cpp
- include/utils/Looper.h

本文将从以下四个方面进行分析

  • 消息循环过程是怎样的 -> Looper::loop
  • 消息是如何分发处理的 -> Handler:: dispatchMessage
  • 消息是如何获取的-> MessageQueue:: next
  • 消息是如何发送的-> Handler:: sendMeessage

二、消息循环过程是怎样的

当使用Looper时,会先调用prepare再去调用loop

2.1 Looper::prepare

    public static void prepare() {
        prepare(true);
    }

    private static void prepare(boolean quitAllowed) {
       //每个线程只允许执行一次该方法,第二次执行时线程的TLS已有数据,则会抛出异常。
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        //创建Looper对象,并保存到当前线程的TLS区域
        sThreadLocal.set(new Looper(quitAllowed));
    }

  • ThreadLocal: 线程本地存储区,每个线程都有自己的私有的本地存储区域,不同线程之间彼此不能访问对方的TLS区域。

2.1 Looper::loop

    public static void loop() {
        final Looper me = myLooper(); //获取ThreadLocal存储的Looper对象 
        if (me == null) { //Looper.prepare() 必须在loop调用前执行
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
        final MessageQueue queue = me.mQueue;//获取Looper对象中的消息队列

        //确保在权限检查时基于本地进程,而不是调用进程。
        Binder.clearCallingIdentity();
        final long ident = Binder.clearCallingIdentity();

        // Allow overriding a threshold with a system prop. e.g.
        // adb shell 'setprop log.looper.1000.main.slow 1 && stop && start'
        final int thresholdOverride =
                SystemProperties.getInt("log.looper."
                        + Process.myUid() + "."
                        + Thread.currentThread().getName()
                        + ".slow", 0);

        boolean slowDeliveryDetected = false;

        for (;;) {
            Message msg = queue.next(); // //可能会阻塞
            if (msg == null) {
                // 没有消息表明消息队列退出。
                return;
            }

            // This must be in a local variable, in case a UI event sets the logger
            final Printer logging = me.mLogging;
            if (logging != null) {
                logging.println(">>>>> Dispatching to " + msg.target + " " +
                        msg.callback + ": " + msg.what);
            }
            // Make sure the observer won't change while processing a transaction.
            final Observer observer = sObserver;

            final long traceTag = me.mTraceTag;
            long slowDispatchThresholdMs = me.mSlowDispatchThresholdMs;
            long slowDeliveryThresholdMs = me.mSlowDeliveryThresholdMs;
            if (thresholdOverride > 0) {
                slowDispatchThresholdMs = thresholdOverride;
                slowDeliveryThresholdMs = thresholdOverride;
            }
            final boolean logSlowDelivery = (slowDeliveryThresholdMs > 0) && (msg.when > 0);
            final boolean logSlowDispatch = (slowDispatchThresholdMs > 0);

            final boolean needStartTime = logSlowDelivery || logSlowDispatch;
            final boolean needEndTime = logSlowDispatch;

            if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {
                Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
            }

            final long dispatchStart = needStartTime ? SystemClock.uptimeMillis() : 0;
            final long dispatchEnd;
            Object token = null;
            if (observer != null) {
                token = observer.messageDispatchStarting();
            }
            long origWorkSource = ThreadLocalWorkSource.setUid(msg.workSourceUid);
            try {
                msg.target.dispatchMessage(msg); //用于分发Message 
                if (observer != null) {
                    observer.messageDispatched(token, msg);
                }
                dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
            } catch (Exception exception) {
                if (observer != null) {
                    observer.dispatchingThrewException(token, msg, exception);
                }
                throw exception;
            } finally {
                ThreadLocalWorkSource.restore(origWorkSource);
                if (traceTag != 0) {
                    Trace.traceEnd(traceTag);
                }
            }
    
            ...

     
            final long newIdent = Binder.clearCallingIdentity();//确保在分派线程的过程中没有损坏线程的标识。
            ...
            msg.recycleUnchecked();//将Message放入消息池
        }
    }
 

loop()进入循环模式,不断重复下面的操作,直到没有消息时退出循环

  • 首先获取Looper对象 再获取MessageQueue对象
  • 读取MessageQueue的下一条Message;
  • 把Message分发给相应的target(target其实就是一个Handler对象);
  • 再把分发后的Message回收到消息池,以便重复利用。

Message msg = queue.next();这段代表没有消息的时候,是会被阻塞住的。
这里我们看到这边的处理流程先获取下一条消息,并做分发。

我们这边先看看消息是如何分发的 msg.target.dispatchMessage(msg); //用于分发Message

三、消息怎么分发的

3.1 Handler:: dispatchMessage

public void dispatchMessage(Message msg) {
    if (msg.callback != null) {
        //当Message存在回调方法,回调msg.callback.run()方法;
        handleCallback(msg);
    } else {
        if (mCallback != null) {
            //当Handler存在Callback成员变量时,回调方法handleMessage();
            if (mCallback.handleMessage(msg)) {
                return;
            }
        }
        //Handler自身的回调方法handleMessage()
        handleMessage(msg);
    }
}

分发消息流程:

  • 当Message的回调方法不为空时,则回调方法msg.callback.run(),其中callBack数据类型为Runnable,否则进入步骤2;
  • 当Handler的mCallback成员变量不为空时,则回调方法mCallback.handleMessage(msg),否则进入步骤3;
  • 调用Handler自身的回调方法handleMessage(),该方法默认为空,Handler子类通过覆写该方法来完成具体的逻辑。

对于很多情况下,消息分发后的处理方法是第3种情况,即Handler.handleMessage(),一般地往往通过覆写该方法从而实现自己的业务逻辑。

四、消息是如何获取的

4.1 MessageQueue::next()

    @UnsupportedAppUsage
    Message next() {
        final long ptr = mPtr;
        if (ptr == 0) { //当消息循环已经退出,则直接返回
            return null;
        }

        int pendingIdleHandlerCount = -1; // 循环迭代的首次为-1
        int nextPollTimeoutMillis = 0;
        for (;;) {
            if (nextPollTimeoutMillis != 0) {
                Binder.flushPendingCommands();
            }
           //阻塞操作,当等待nextPollTimeoutMillis时长,或者消息队列被唤醒,都会返回
            nativePollOnce(ptr, nextPollTimeoutMillis);
             //如果阻塞操作结束,则去获取消息 
            synchronized (this) {
                // 去获取下一条消息
                final long now = SystemClock.uptimeMillis();
                Message prevMsg = null;
                Message msg = mMessages;
                 //当消息的Handler为空时,则查询异步消息
                if (msg != null && msg.target == null) {
                    // 查找队列中的下一个异步消息
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }
                if (msg != null) {
                    if (now < msg.when) {
                        //当异步消息触发时间大于当前时间,则设置下一次轮询的超时时长
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                          // 获取一条消息,并返回
                        mBlocked = false;
                        if (prevMsg != null) {
                            prevMsg.next = msg.next;
                        } else {
                            mMessages = msg.next;
                        }
                        msg.next = null;
                        if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                        //设置消息的使用状态,即flags |= FLAG_IN_USE
                        msg.markInUse();
                        return msg;  //成功地获取MessageQueue中的下一条即将要执行的消息
                    }
                } else {
                    //没有消息
                    nextPollTimeoutMillis = -1;
                }

               // 现在,所有挂起的消息都已处理完毕,请处理退出消息
                if (mQuitting) {
                    dispose();
                    return null;
                }
               //当消息队列为空,或者是消息队列的第一个消息时
                if (pendingIdleHandlerCount < 0
                        && (mMessages == null || now < mMessages.when)) {
                    pendingIdleHandlerCount = mIdleHandlers.size();
                }
                if (pendingIdleHandlerCount <= 0) {
                    //没有idle handlers 需要运行,则循环并等待。
                    mBlocked = true;
                    continue;
                }

                if (mPendingIdleHandlers == null) {
                    mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
                }
                mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
            }

            //只有第一次循环时,会运行idle handlers,执行完成后,重置pendingIdleHandlerCount为0.
            for (int i = 0; i < pendingIdleHandlerCount; i++) {
                final IdleHandler idler = mPendingIdleHandlers[i];
                mPendingIdleHandlers[i] = null; //去掉handler的引用

                boolean keep = false;
                try {
                    keep = idler.queueIdle();//idle时执行的方法
                } catch (Throwable t) {
                    Log.wtf(TAG, "IdleHandler threw exception", t);
                }

                if (!keep) {
                    synchronized (this) {
                        mIdleHandlers.remove(idler);
                    }
                }
            }

               //重置idle handler个数为0,以保证不会再次重复运行
            pendingIdleHandlerCount = 0;

            //当调用一个空闲handler时,一个新message能够被分发,因此无需等待可以直接查询pending message.
            nextPollTimeoutMillis = 0;
        }
    }

nativePollOnce是阻塞操作,其中nextPollTimeoutMillis代表下一个消息到来前,还需要等待的时长;当nextPollTimeoutMillis = -1时,表示消息队列中无消息,会一直等待下去。

当处于空闲时,往往会执行IdleHandler中的方法。当nativePollOnce()返回后,next()从mMessages中提取一个消息
我们先看下native层的nativePollOnce是如何实现的

4.2 android_os_MessageQueue.cpp::nativePollOnce

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

4.3 android_os_MessageQueue.cpp:: pollOnce

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;

    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}

这里调用转发给了Looper::pollOnce

4.4 Looper.cpp:: pollOnce

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
     ....
        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != nullptr) *outFd = 0;
            if (outEvents != nullptr) *outEvents = 0;
            if (outData != nullptr) *outData = nullptr;
            return result;
        }

        result = pollInner(timeoutMillis);
    }
}

这里又是一个for循环,首先判断这个result是不是等于0,这个第一次不会为0,继续往下走,pollInner里也有一个超时的参数,我们继续往下看

4.5 Looper.cpp:: pollInner

这个函数呢,是整个消息循环的核心

int Looper::pollInner(int timeoutMillis) {
       ...
    // Poll.
    int result = POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;
    mPolling = true;//即将处于idle状态

    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);//等待事件发生或者超时,在nativeWake()方法,向管道写端写入字符,则该方法会返回;
    // 不再处于idle状态
    mPolling = false;
    mLock.lock(); //请求锁
    if (mEpollRebuildRequired) {
        mEpollRebuildRequired = false;
        rebuildEpollLocked();
        goto Done; // epoll重建,直接跳转Done;
    }
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
        result = POLL_ERROR;
        goto Done;
    }

    // Check for poll timeout.
    if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - timeout", this);
#endif
        result = POLL_TIMEOUT;
        goto Done;
    }
 //循环遍历,处理所有的事件
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd.get()) {
            if (epollEvents & EPOLLIN) {
                awoken();//已经唤醒了,则读取并清空管道数据,可理解成消化事件
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex));//处理request,生成对应的reponse对象,push到响应数组
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
Done: ;
    //再处理Native的Message,调用相应回调方法
    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            { // obtain handler
                sp<MessageHandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();//释放锁
                handler->handleMessage(message); // 处理消息事件
            } // release handler

            mLock.lock(); //请求锁
            mSendingMessage = false;
            result = POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }
    mLock.unlock(); //释放锁

   //处理带有Callback()方法的Response事件,执行Reponse相应的回调方法
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
            // 处理请求的回调方法
            int callbackResult = response.request.callback->handleEvent(fd, events, data);
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }
            response.request.callback.clear();//清除reponse引用的回调方法
            result = POLL_CALLBACK;
        }
    }
    return result;
}

pollInner()方法的处理流程:

  1. 先调用epoll_wait(),这是阻塞方法,用于等待事件发生或者超时;
  2. 对于epoll_wait()返回,当且仅当以下3种情况出现:对应着eventCount不同的返回值
  • POLL_ERROR,发生错误,直接跳转到Done;
  • POLL_TIMEOUT,发生超时,直接跳转到Done;
  • 检测到管道有事件发生,则再根据情况做相应处理:

如果是管道读端产生事件,则直接读取管道的数据;
如果是其他事件,则处理request,生成对应的reponse对象,push到reponse数组;

  1. 进入Done标记位的代码段:
  • 先处理Native的Message,调用Native 的Handler来处理该Message;
  • 再处理Response数组,POLL_CALLBACK类型的事件;

讲了如何获取消息和处理消息,回过头来我们看下如何发送消息的

五、消息如何发送的

5.1 Handler::sendMessage

 public final boolean sendMessage(@NonNull Message msg) {
        return sendMessageDelayed(msg, 0);
    }

5.2 Handler:: sendMessageDelayed

   public final boolean sendMessageDelayed(@NonNull Message msg, long delayMillis) {
        if (delayMillis < 0) {
            delayMillis = 0;
        }
        return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
    }

5.3 Handler:: sendMessageAtTime

    public boolean sendMessageAtTime(@NonNull Message msg, long uptimeMillis) {
        MessageQueue queue = mQueue;
        if (queue == null) {
            RuntimeException e = new RuntimeException(
                    this + " sendMessageAtTime() called with no mQueue");
            Log.w("Looper", e.getMessage(), e);
            return false;
        }
        return enqueueMessage(queue, msg, uptimeMillis);
    }

5.4 Handler::enqueueMessage

  private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,
            long uptimeMillis) {
        msg.target = this;
        msg.workSourceUid = ThreadLocalWorkSource.getUid();

        if (mAsynchronous) {
            msg.setAsynchronous(true);
        }
        return queue.enqueueMessage(msg, uptimeMillis);
    }

5.5 MessageQueue::enqueueMessage

    boolean enqueueMessage(Message msg, long when) {
       // 每一个普通Message必须有一个target
        if (msg.target == null) {
            throw new IllegalArgumentException("Message must have a target.");
        }
        if (msg.isInUse()) { //检查信息是否有使用过
            throw new IllegalStateException(msg + " This message is already in use.");
        }

        synchronized (this) {
            if (mQuitting) {  //正在退出时,回收msg,加入到消息池
                msg.recycle();
                return false;
            }

            msg.markInUse();
            msg.when = when;
            Message p = mMessages;
            boolean needWake;
            if (p == null || when == 0 || when < p.when) {
            //p为null(代表MessageQueue没有消息) 或者msg的触发时间是队列中最早的, 则进入该该分支
                msg.next = p;
                mMessages = msg;
                needWake = mBlocked;
            } else {
                 //将消息按时间顺序插入到MessageQueue。一般地,不需要唤醒事件队列,除非
               //消息队头存在阻塞,并且同时Message是队列中最早的异步消息
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
            }
            // We can assume mPtr != 0 because mQuitting is false.
            if (needWake) {
            //去唤醒消息队列所在的线程
                nativeWake(mPtr);
            }
        }
        return true;
    }

MessageQueue是按照Message触发时间的先后顺序排列的,队头的消息是将要最早触发的消息。
当有消息需要加入消息队列时,会从队列头开始遍历,直到找到消息应该插入的合适位置,以保证所有消息的时间顺序。

我们来看下nativeWake的实现

5.6 android_os_MessageQueue.cpp::android_os_MessageQueue_nativeWake

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->wake();
}
void NativeMessageQueue::wake() {
    mLooper->wake();
}

5.7 Looper.cpp::wake()

void Looper::wake() {
    uint64_t inc = 1;
    // 向管道mWakeEventFd写入字符1
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
                             mWakeEventFd.get(), nWrite, strerror(errno));
        }
    }
}

mWakeEventFd就是一个计数器,线程收到这个可读事件就会被唤醒

六 、小结

深入理解Handler(二) --- 从源码角度来理解Android线程间消息传递机制

  • Handler通过sendMessage 发送Message到MessageQueue队列
  • Looper通过loop(),不断提取出到达触发条件的Message,并将message交给target来处理
  • 经过dispatchMessage后,交回给handler的handlerMessage来进行相应的处理
  • 将message加入MessageQueue时,往管道写入字符,可以唤醒loop线程;如果MessageQueue中没有Message,并处于idle状态,则会执行idelHandler接口中的方法,往往用于做一些请理性地工作。