深入理解Handler原理

一. Handler初始化

Handler–> Looper <– MessageQueue <– Message


Handler需要Looper,Looperl里包含构建的MessageQueue。 可以在构造的时候,通过参数传入。

1. Handler 构造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public Handler(Callback callback, boolean async) {

//检查潜在的泄漏
if (FIND_POTENTIAL_LEAKS) {
final Class<? extends Handler> klass = getClass();
if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
(klass.getModifiers() & Modifier.STATIC) == 0) {
Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
klass.getCanonicalName());
}
}

//获取当前线程的Looper
mLooper = Looper.myLooper();
//Handler需要Looper,如果为null,抛出异常
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread " + Thread.currentThread()
+ " that has not called Looper.prepare()");
}
//设置当前Handler的MessageQueue 等于Looper的MessageQueue
mQueue = mLooper.mQueue;
mCallback = callback;
mAsynchronous = async;
}

2. 上面的mLooper 为什么可能为空?要调用 Looper.prepare()?


先看一下mLooper = Looper.myLooper()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//这里可以看到是通过ThreadLocal线程持有的一个 Looper
public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}

//获取当前线程的map,并返回looper
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
//如果当前线程的map还没有,就在这个方法里面创建
return setInitialValue();
}


再看一下 Looper.prepare(),这里解释了为什么只能调用一次

1
2
3
4
5
6
7
8
private static void prepare(boolean quitAllowed) {
//如果有了,就抛错,一个线程保证了只能有一个Looper
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
//没有就new一个 set
sThreadLocal.set(new Looper(quitAllowed));
}

3. 为什么有什么需要手动调用Looper.prepare() ?

1
当前线程有了looper就不需要手动调用。如果是在主线程ActivityThread,主线程已经默认有了一个Looper了,这个Looper会处理界面的各种事件。


在ActivityThread的main方法中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//这里就对主线程,调用了Looper.prepare(),所以在主线程启动的时候,就拥有了Looper,后面的是在主线程中的Handler默认共享该Looepr
Looper.prepareMainLooper();

//展开就是Looper下的这个方法
public static void prepareMainLooper() {
//这里就是Looper.prepare()
prepare(false);
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();
}
}

总结

1
一个Handler只能对应一个Looepr,一个Looper 持有MessageQueue。一个Looper可以对应多个Hander。那么Message是怎么和Handler联系起来的呢,在加入消息的时候 通过  msg.target = this; this就是handler本身联系起来的。

二. Handder 发送消息 —> 入队


通常我们新建消息的时候 通过 Message obtain = Message.obtain();获取,主要是复用里面的Message对象。
通常我们调用

1
2
3
4
sendMessage(Message msg)//发送消息
——> sendMessageDelayed(msg, 0)//延时为0
——> sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis)//转换成绝对时间
——> enqueueMessage(queue, msg, uptimeMillis)//开始入队


下面的就是MeaageQueue入队的逻辑了,类似链表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
boolean enqueueMessage(Message msg, long when) {
//target 就是发送消息的Hander 如果message的handelr为null,抛错
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}

//msg是不是正在被使用,比如说这个消息已经入队了。就不能再加入
// 连续sendMessage同一个msg对象就会抛错,当然msg复用obtian和new的时候FLAG_IN_USE会重置
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}

//保证线程安全
synchronized (this) {
//当前队列是不是调用了quit
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}

//标记为Use状态
msg.markInUse();
msg.when = when;

//先保留之前的节点,准备插入现在的msg
Message p = mMessages;
boolean needWake;

//p = null 现在队列是空的
// 为什么 会有 when = 0 ?Handler提供了一个sendMessageAtFrontOfQueue 方法,保证某个消息插到消息头,及时处理
//when < p.when 当前插入的消息 比前一个节点的时间更小,意味着要先执行消息
//其实就是头插入 类似 null<--msg 或者 null<--.<--p<--msg
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
//根据当前的MessageQueue 状态,确定是否要唤醒队列起来处理消息
//为什么要唤醒?因为队列为空了,或者队列里面的消息都是明天才要处理的,那么今晚可以睡觉。但是睡梦中又有新的(bug)消息来了,就是这次要插入的消息,必须起来处理(bug)消息
needWake = mBlocked;
} else {

// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
//以上情况不满足,就是要把当前这个消息插入到队列中间去了
// 就按时间when的顺序插入
//当前 消息队列是否阻塞 && 队头的消息target=null && 是否支持异步
//target == null 为什么可以为null?不是不能为null吗?见后面
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
//寻找插入点
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
//插入消息
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}

// We can assume mPtr != 0 because mQuitting is false.
//是否需要唤醒?mPtr used by native code
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}

target == null 为什么可以为null?不是不能为null吗?


因为提供了骚操作,还有这种打开方式。
postSyncBarrier。 以前叫enqueueSyncBarrier 网上找的其它有写到这个
Barrier 可以联想到JUC的CyclicBarrier。
就是插入一下Barrier消息,这个消息之后的所有消息会阻塞在这个消息之后。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

public int postSyncBarrier() {
//添加现在的时间
return postSyncBarrier(SystemClock.uptimeMillis());
}

//消息入队
private int postSyncBarrier(long when) {
// Enqueue a new sync barrier token.
// We don't need to wake the queue because the purpose of a barrier is to stall it.
synchronized (this) {
final int token = mNextBarrierToken++;
final Message msg = Message.obtain();
msg.markInUse();
msg.when = when;
msg.arg1 = token;
//这里没有setTarget ,所以是null的
Message prev = null;
Message p = mMessages;
//寻找插入点
if (when != 0) {
while (p != null && p.when <= when) {
prev = p;
p = p.next;
}
}
//插入这个Barrier消息
if (prev != null) { // invariant: p == prev.next
msg.next = p;
prev.next = msg;
} else {
msg.next = p;
mMessages = msg;
}
return token;
}
}

异步消息?


作者:AngelDevil


出处:https://www.cnblogs.com/angeldevil/p/3340644.html


转载请注明出处!


所谓的异步消息其实就是这样的,我们可以通过enqueueBarrier往消息队列中插入一个Barrier,那么队列中执行时间在这个Barrier以后的同步消息都会被这个Barrier拦截住无法执行,直到我们调用removeBarrier移除了这个Barrier,而异步消息则没有影响,消息默认就是同步消息,除非我们调用了Message的setAsynchronous,这个方法是隐藏的。只有在初始化Handler时通过参数指定往这个Handler发送的消息都是异步的,这样在Handler的enqueueMessage中就会调用Message的setAsynchronous设置消息是异步的,从上面Handler.enqueueMessage的代码中可以看到。


所谓异步消息,其实只有一个作用,就是在设置Barrier时仍可以不受Barrier的影响被正常处理,如果没有设置Barrier,异步消息就与同步消息没有区别,可以通过removeSyncBarrier移除Barrier:


nativeWake唤醒 ? nativePollOnce?睡眠


native方法。轮询利用epoll机制
有点类似wait 与 notify。
具体见后面的native分析

1
2
3
private native static void nativeWake(long ptr);

private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/

三. Handler 处理消息及延时消息 —> 出队


消息是怎么处理的呢?


通过调用Looper.loop()方法,开始处理消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111

public static void loop() {
//获取当前线程的Looper
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
final MessageQueue queue = me.mQueue;

// Make sure the identity of this thread is that of the local process,
// and keep track of what that identity token actually is.
//清空远程调用端的uid和pid,用当前本地进程的uid和pid替代;
Binder.clearCallingIdentity();
final long ident = Binder.clearCallingIdentity();

// Allow overriding a threshold with a system prop. e.g.
// adb shell 'setprop log.looper.1000.main.slow 1 && stop && start'
final int thresholdOverride =
SystemProperties.getInt("log.looper."
+ Process.myUid() + "."
+ Thread.currentThread().getName()
+ ".slow", 0);

boolean slowDeliveryDetected = false;

//这里开始处理消息,死循环
for (;;) {
//这里可能阻塞,现在没有消息需要处理的消息
Message msg = queue.next(); // might block
if (msg == null) {
// No message indicates that the message queue is quitting.
//msg = null,消息队列退出了,退出死循环
return;
}

// This must be in a local variable, in case a UI event sets the logger
//每次更新log,防止被set了最新的了,打印分发log
final Printer logging = me.mLogging;
if (logging != null) {
logging.println(">>>>> Dispatching to " + msg.target + " " +
msg.callback + ": " + msg.what);
}

final long traceTag = me.mTraceTag;
long slowDispatchThresholdMs = me.mSlowDispatchThresholdMs;
long slowDeliveryThresholdMs = me.mSlowDeliveryThresholdMs;
if (thresholdOverride > 0) {
slowDispatchThresholdMs = thresholdOverride;
slowDeliveryThresholdMs = thresholdOverride;
}
final boolean logSlowDelivery = (slowDeliveryThresholdMs > 0) && (msg.when > 0);
final boolean logSlowDispatch = (slowDispatchThresholdMs > 0);

final boolean needStartTime = logSlowDelivery || logSlowDispatch;
final boolean needEndTime = logSlowDispatch;

if (traceTag != 0 && Trace.isTagEnabled(traceTag)) {
Trace.traceBegin(traceTag, msg.target.getTraceName(msg));
}

final long dispatchStart = needStartTime ? SystemClock.uptimeMillis() : 0;
final long dispatchEnd;
try {
//这里开始调用 target来处理消息,tartget就是Hander
// 转而调用到我们重写的dispatchMessage 方法里面去
msg.target.dispatchMessage(msg);
dispatchEnd = needEndTime ? SystemClock.uptimeMillis() : 0;
} finally {
if (traceTag != 0) {
Trace.traceEnd(traceTag);
}
}

//下面主要是打印log,消息处理可能延时
if (logSlowDelivery) {
if (slowDeliveryDetected) {
if ((dispatchStart - msg.when) <= 10) {
Slog.w(TAG, "Drained");
slowDeliveryDetected = false;
}
} else {
if (showSlowLog(slowDeliveryThresholdMs, msg.when, dispatchStart, "delivery",
msg)) {
// Once we write a slow delivery log, suppress until the queue drains.
slowDeliveryDetected = true;
}
}
}
if (logSlowDispatch) {
showSlowLog(slowDispatchThresholdMs, dispatchStart, dispatchEnd, "dispatch", msg);
}

if (logging != null) {
logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
}

// Make sure that during the course of dispatching the
// identity of the thread wasn't corrupted.
final long newIdent = Binder.clearCallingIdentity();
if (ident != newIdent) {
Log.wtf(TAG, "Thread identity changed from 0x"
+ Long.toHexString(ident) + " to 0x"
+ Long.toHexString(newIdent) + " while dispatching to "
+ msg.target.getClass().getName() + " "
+ msg.callback + " what=" + msg.what);
}

//回收消息
msg.recycleUnchecked();
}
}


回收消息,标记清空,添加缓存池


android.os.Message#recycleUnchecked

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* Recycles a Message that may be in-use.
* Used internally by the MessageQueue and Looper when disposing of queued Messages.
*/
void recycleUnchecked() {
// Mark the message as in use while it remains in the recycled object pool.
// Clear out all other details.
flags = FLAG_IN_USE;
what = 0;
arg1 = 0;
arg2 = 0;
obj = null;
replyTo = null;
sendingUid = -1;
when = 0;
target = null;
callback = null;
data = null;

synchronized (sPoolSync) {
if (sPoolSize < MAX_POOL_SIZE) {
next = sPool;
sPool = this;
sPoolSize++;
}
}
}


主要来看一下怎么取的消息?


android.os.MessageQueue#next

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
Message next() {
// Return here if the message loop has already quit and been disposed.
// This can happen if the application tries to restart a looper after quit
// which is not supported.
final long ptr = mPtr;
//mPtr 退出了, return null,前面的 Looper.loop 收到null消息就退出死循环了
if (ptr == 0) {
return null;
}

//什么是 IdleHandler ?见后面
//这个现在主要做标记
int pendingIdleHandlerCount = -1; // -1 only during first iteration
// 休息时间
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
//本地方法
nativePollOnce(ptr, nextPollTimeoutMillis);

synchronized (this) {
// Try to retrieve the next message. Return if found.
//取消息
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
//msg != null,但是target = null,前面说了这个是barrier消息
if (msg != null && msg.target == null) {
// Stalled by a barrier. Find the next asynchronous message in the queue.
//继续寻找下一个barrier消息,找到一个不是异步的消息退出
//为什么不处理这个barrier?barrier 就是一个时间界限,且没有Handler,消息队列就是按时间排序的,处理到了当前这个barrier消息,说明时间满足
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
//若果现在的时间还小于这个消息设置的执行时间
if (now < msg.when) {
// Next message is not ready. Set a timeout to wake up when it is ready.
//计算一下可以休息多久时间
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
//如果时间满足,就是这个消息了,返回这个消息给loop
// Got a message.
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
// No more messages.
nextPollTimeoutMillis = -1;
}

// Process the quit message now that all pending messages have been handled.
//可能在这个时间,退出了
if (mQuitting) {
dispose();
return null;
}

// If first time idle, then get the number of idlers to run.
// Idle handles only run if the queue is empty or if the first message
// in the queue (possibly a barrier) is due to be handled in the future.

// pendingIdleHandlerCount = -1 标记没有被改。有空闲了,处理IdleHandler
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
//居然没有IdleHandler?那就可以休息了
if (pendingIdleHandlerCount <= 0) {
// No idle handlers to run. Loop and wait some more.
//标记我要休息了,发送消息的时候,可以判断是否需要唤醒我
mBlocked = true;
// 直接continue ,在上面的 nativePollOnce(ptr, nextPollTimeoutMillis)就是睡眠了
continue;
}

//把 mIdleHandlers 转化到 mPendingIdleHandlers
//mIdleHandlers 是会变化的
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}


// Run the idle handlers.
// We only ever reach this code block during the first iteration.
//下面就是处理IdleHandler了
// 显然这里面不能做耗时操作,不然会影响消息的及时处理
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler

boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}

if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}

//处理完了,标记复位
// Reset the idle handler count to 0 so we do not run them again.
pendingIdleHandlerCount = 0;
// While calling an idle handler, a new message could have been delivered
// so go back and look again for a pending message without waiting.
//在处理IdleHandler的过程中,没有睡眠,可能有新的消息来了,不能睡眠
nextPollTimeoutMillis = 0;
}
}

什么是 IdleHandler ?


IdleHandler 可以用来提升性能,主要用在我们希望能够在当前线程消息队列空闲时做些事情(譬如 UI 线程在显示完成后,如果线程空闲我们就可以提前准备其他内容)的情况下,不过最好不要做耗时操作。具体用法如下。

1
2
3
4
5
6
7
8
//getMainLooper().myQueue()或者Looper.myQueue()
Looper.myQueue().addIdleHandler(new IdleHandler() {
@Override
public boolean queueIdle() {
//你要处理的事情
return false;
}
});

可能存在的问题?

  • 消息不是及时处理的,及时性不保证。

  • 我发送了一个延时MSG,这个MSG在没有被处理的情况下,这个MSG可能持有了其它引用导致内存泄漏。所以在在Activity退出的时候,clear一下msg,清掉handler。

  • 类似 Dialog引发的内存泄漏

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    for (;;) {
    //假设这是循环的第10次,然后这里阻塞了。然后其它地方刚好从线程池里面取走了一个消息,但是没有被发送。
    // 想一想这个msg依然持有对第9次消息的引用,并在读到消息的时候覆盖。间接的这个msg就保持其它引用。
    Message msg = queue.next();
    if (msg == null) {
    return;
    }
    msg.target.dispatchMessage(msg);
    //第9次取的消息,在这里标记复位了,表示回收了
    msg.recycleUnchecked();

    //这里可以加一句
    msg = null;

    }
  • 最后用一张图,来表示整个消息机制


四. Native层


主要分析 nativeWake() nativePollOnce() 方法 实现


够着MessageQueue的时候调用了nativeInit();

1
2
3
4
5
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
// 通过JNI调用了Native层的相关函数,导致了NativeMessageQueue的创建
mPtr = nativeInit();
}


对应的 C++ 层的实现

1
2
3
4
5
6
7
8
9
10
11
12
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
// 在Native层又创建了NativeMessageQueue
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}

nativeMessageQueue->incStrong(env);
// 这里返回值是Java层的mPtr,因此mPtr实际上是Java层MessageQueue与NativeMessesageQueue的桥梁
return reinterpret_cast<jlong>(nativeMessageQueue);
}
1
此时Java层和Native层的MessageQueue被mPtr连接起来了,NativeMessageQueue只是Java层MessageQueue在Native层的体现,其本身并没有实现Queue的数据结构,而是从其父类MessageQueue中继承mLooper变量。与Java层类型,这个Looper也是线程级别的单列。


NativeMessageQueue无参构造函数

1
2
3
4
5
6
7
8
9
10
NativeMessageQueue::NativeMessageQueue() : mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
// 获取TLS中的Looper对象
mLooper = Looper::getForThread();
if (mLooper == NULL) {
// 创建native层的Looper对象
mLooper = new Looper(false);
// 保存native 层的Looper到TLS中(线程级单例)
Looper::setForThread(mLooper);
}
}
  • Looper::getForThread():功能类比于Java层的Looper.myLooper();
  • Looper::setForThread(mLooper):功能类比于Java层的ThreadLocal.set()


通过上述代码我们知道:

  • 1、Java层的Looper的创建导致了MessageQueue的创建,而在Native层则刚刚相反,NativeMessageQueue的创建导致了Looper的创建
  • 2、MessageQueue是在Java层与Native层有着紧密的联系,但是此次Native层的Looper与Java层的Looper没有任何关系。
  • 3、Native层的Looper创建和Java层的也完全不一样,它利用了Linux的epoll机制检测了Input的fd和唤醒fd。从功能上来讲,这个唤醒fd才是真正处理Java Message和Native Message的钥匙。


所以创建MessageQueue的流程如图

为什么要关联一个Native MessageQueue ???


首先Native 层也提供了 sendMesage,其它地方可以向这个消息队列发送消息。比如响应点击事件,WMS中接收到消息后,会调用ViewRootImpl中的dispatchInputEvent

1
2
3
4
   void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
sendMessageAtTime(now, handler, message);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
   void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ sendMessageAtTime - uptime=%" PRId64 ", handler=%p, what=%d",
this, uptime, handler.get(), message.what);
#endif

size_t i = 0;
{ // acquire lock
// 请求锁
AutoMutex _l(mLock);

size_t messageCount = mMessageEnvelopes.size();
// 找到message应该插入的位置i
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i += 1;
}

MessageEnvelope messageEnvelope(uptime, handler, message);
mMessageEnvelopes.insertAt(messageEnvelope, i, 1);

// Optimization: If the Looper is currently sending a message, then we can skip
// the call to wake() because the next thing the Looper will do after processing
// messages is to decide when the next wakeup time should be. In fact, it does
// not even matter whether this code is running on the Looper thread.
// 如果当前正在发送消息,那么不再调用wake(),直接返回
if (mSendingMessage) {
return;
}
} // release lock
// 释放锁
// Wake the poll loop only when we enqueue a new message at the head.
// 当消息加入到消息队列的头部时,需要唤醒poll循环
if (i == 0) {
wake();
}
}
  • sendMessage()和sendMessageDelayed()都是调用sendMessageAtTime()来完成消息插
  • sendMessageAtTime() 和上面Java层类似
  • MessageQueue成为Java层和Native层的枢纽,既能处理上层消息,也能处理Native消息,当然Native层消息是Native层Hanlder处理。

感觉自己写一个Handler 根本用不到native 的 MessageQueue ???

为什么主线程android.app.ActivityThread#main 里面明明有个 Looper.loop() 却不会卡UI主线程?


因为UI事件本身也是通过主线程的Handelr机制去处理的。当前主线程loop了,还有消息从其它地方过来。当前Activity的可以响应回调事件和 广播事件。
引用 https://stackoverflow.com/questions/35931899/why-main-threads-looper-loop-doesnt-block-ui-thread

1
2
3
If there is no message, it means that no updates are required. All of our code is just a callback, such as Application onCreate, Activit onCreate, BroadcastReceiver onReceive.

All update callbacks are caused by the message, and these messages are from the system services, such as ActivityManagerService, InputManagerService, WindowMangerService. If you need to update the UI, the android service will send a message to the loop via the IPC.

nativePollOnce()

1
2
3
4
5
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) {
//将Java层传递下来的mPtr转换为nativeMessageQueue
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
  • 首先,将Java层传递下来的mPtr转换为nativeMessageQueue
  • 其次,nativeMessageQueue调用pollOnce(env, obj, timeoutMillis)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
// 重点函数
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;

if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
  • 调用pollOnce(timeoutMillis),最后调用到 Looper::pollOnce(int, int, int, void**)函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
// 对fd对应的Responses进行处理,后面发现Response里都是活动fd
for (;;) {
// 先处理没有Callback的Response事件
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
// ident>=0则表示没有callback,因为POLL_CALLBACK=-2
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
"fd=%d, events=0x%x, data=%p",
this, ident, fd, events, data);
#endif
if (outFd != NULL) *outFd = fd;
if (outEvents != NULL) *outEvents = events;
if (outData != NULL) *outData = data;
return ident;
}
}
// 注意这里处于循环内部,改变result的值在后面的pollInner
if (result != 0) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
if (outFd != NULL) *outFd = 0;
if (outEvents != NULL) *outEvents = 0;
if (outData != NULL) *outData = NULL;
return result;
}
// 再处理内部轮训
result = pollInner(timeoutMillis);
}
}


Looper::pollInner()函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
   int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif

// Adjust the timeout based on when the next message is due.
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - next message in %" PRId64 "ns, adjusted timeout: timeoutMillis=%d",
this, mNextMessageUptime - now, timeoutMillis);
#endif
}

// Poll.
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;

// We are about to idle.
// 即将处于idle状态
mPolling = true;
// fd最大的个数是16
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
// 等待时间发生或者超时,在nativeWake()方法,向管道写端写入字符,则方法会返回。
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

// No longer idling.
// 不再处于idle状态
mPolling = false;
// 请求锁 ,因为在Native Message的处理和添加逻辑上需要同步
// Acquire lock.
mLock.lock();

// Rebuild epoll set if needed.
// 如果需要,重建epoll
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
// epoll重建,直接跳转到Done
rebuildEpollLocked();
goto Done;
}

// Check for poll error.
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error, errno=%d", errno);
// epoll事件个数小于0,发生错误,直接跳转Done
result = POLL_ERROR;
goto Done;
}

// Check for poll timeout.
//如果需要,重建epoll
if (eventCount == 0) {
//epoll事件个数等于0,发生超时,直接跳转Done
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - timeout", this);
#endif
result = POLL_TIMEOUT;
goto Done;
}

// Handle all events.
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
// 循环处理所有的事件
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
//首先处理mWakeEventFd
if (fd == mWakeEventFd) {
//如果是唤醒mWakeEventFd有反应
if (epollEvents & EPOLLIN) {
/**重点代码*/
// 已经唤醒了,则读取并清空管道数据
awoken(); // 该函数内部就是read,从而使FD可读状态被清除
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
// 其他input fd处理,其实就是将活动放入response队列,等待处理
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
// 处理request,生成对应的response对象,push到响应数组
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;
// Invoke pending message callbacks.
// 再处理Native的Message,调用相应回调方法
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
// Remove the envelope from the list.
// We keep a strong reference to the handler until the call to handleMessage
// finishes. Then we drop it so that the handler can be deleted *before*
// we reacquire our lock.
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
// 释放锁
mLock.unlock();

#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
this, handler.get(), message.what);
#endif
// 处理消息事件
handler->handleMessage(message);
} // release handler
// 请求锁
mLock.lock();
mSendingMessage = false;
// 发生回调
result = POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}

// Release lock.
// 释放锁
mLock.unlock();

// Invoke all response callbacks.
// 处理带有Callback()方法的response事件,执行Response相应的回调方法
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
this, response.request.callback.get(), fd, events, data);
#endif
// Invoke the callback. Note that the file descriptor may be closed by
// the callback (and potentially even reused) before the function returns so
// we need to be a little careful when removing the file descriptor afterwards.
// 处理请求的回调方法
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
// 移除fd
removeFd(fd, response.request.seq);
}

// Clear the callback reference in the response structure promptly because we
// will not clear the response vector itself until the next poll.
// 清除response引用的回调方法
response.request.callback.clear();
// 发生回调
result = POLL_CALLBACK;
}
}
return result;
}


pollOnce返回值说明

  • POLL_WAKE: 表示由wake()出发,即pipe写端的write事件触发
  • POLL_CALLBACK:表示某个被监听fd被触发
  • POLL_TIMEOUT:表示等待超时
  • POLL_ERROR:表示等待期间发生错误


pollInner()方法的处理流程:

  • 1、先调用epoll_wait(),这是阻塞方法,用于等待事件发生或者超时。
  • 2、对于epoll_wait()返回,当且仅当以下3种情况出现
    • POLL_ERROR:发生错误,直接跳转Done
    • POLL_TIMEOUT:发生超时,直接跳转到Done
    • 检测到管道有事情发生,则再根据情况做相应处理:
      • 如果检测到管道产生事件,则直接读取管道的数据
      • 如果是其他事件,则处理request,生成对应的response对象,push到response数组
  • 3、进入Done标记位的代码:
    • 先处理Native的Message,调用Native的Handler来处理该Message
    • 再处理Resposne数组,POLL_CALLBACK类型的事件


总结一下 nativePollOnce() 的流程就是

nativeWake()


android_os_MessageQueue_nativeWake()

1
2
3
4
5
6
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
// 将Java层传递下来的mPtr转换为nativeMessageQueue
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
//调用wake函数
nativeMessageQueue->wake();
}
1
2
3
void NativeMessageQueue::wake() {
mLooper->wake();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif

uint64_t inc = 1;
// 向管道mWakeEventFd写入字符1
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}
1
2
Looper类的 wake()函数只是往mWakeEventfd中写了一些内容,这个fd只是通知而已,类似于pipi,最后会把epoll_wai唤醒,线程就不阻塞了继续发送
Native层的消息,然后处理之前的addFd事件,然后处理Java层的消息。


总结一下 nativeWake() 的流程就是

Native与Java的对应关系

1
MessageQueue通过mPtr变量保存了NativeMessageQueue对象,从而使得MessageQueue成为Java层和Native层的枢纽,既能处理上层消息,也能处理Native消息,下图列举了Java层与Native层的对应图



  • 1、红色虚线关系:Java层和Native层的MessageQueue通过JNI建立关联,彼此之间能相互调用,搞明白这个互调关系,也就搞明白Java如何调用C代码,C代码如何调用Java代码
  • 2、蓝色虚线关系:Handler/Looper/Message这三大类Java层与Native层并没有任何真正的关系,只是分别在Java层和Native层的handler消息模型中具有相似的功能。都是彼此独立的,各自实现相应的逻辑。
  • 3、WeakMessageHandler继承与MessageHandler类,NativeMessageQueue继承于MessageQueue类。


另外,消息处理流程是先处理NativeMessage,再处理Native Request,最后处理Java Message。理解了该流程也就明白了有时上层消息很少,但响应时间却比较长的真正原因。

总结

  • 1、NativeMessageQueue:在MessageQueue.java的构造函数中,调用了nativeInit创建了NativeMessageQueue对象,并且把指针变量返回给Java层的mPtr。而在NativeMessageQueue的构造函数中,会在当前线程中创建C++的Looper对象。
  • 2、Looper:控制eventfd的读写,通过epoll监听eventfd的变化,来阻塞调用pollOnce和恢复调用wake当前线程
    • 通过 epoll监听其他文件描述符的变化
    • 通过 epoll处理C++层的消息机制,当调用Looper::sendMessageAtTime后,调用wake触发epoll
    • Looper的构造函数,创建一个eventfd(以前版本是pipe),eventfd它的主要用于进程或者线程间的通信,然后创建epoll来监听该eventfd的变化
    • Looper::pollOnce(int timeoutMillis) 内部调用了pollInner,再调用epoll_wait(mEpollFd, …, timeoutMillis)阻塞timeoutMills时间,并监听文件描述符mEpollFd的变化,当时间到了或者消息到了,即eventfd被写入内容后,从epoll_wait继续往下执行,处理epoll_wait返回的消息,该消息既有可能是eventfd产生的,也可能是其他文件描述符产生的。处理顺序是,先处理普通的C++消息队列mMessageEnvelopes,然后处理之前addFd的事件,最后从pollOnce返回,会继续MessageQueue.java的next()函数,取得Java层的消息来处理;
    • Looper类的wake,函数只是往mWakeEventfd中写了一些内容,这个fd只是通知而已,类似pipe,最后会把epoll_wait唤醒,线程就不阻塞了,继续先发送C层消息,然后处理之前addFd事件,然后处理Java层消息

五 . 感谢

1
2
3
4
5
6
7
8
9
10
11
http://gityuan.com/2015/12/26/handler-message-framework/

作者:隔壁老李头
链接:https://www.jianshu.com/p/91a4b797553d
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

作者:Little丶Jerry
链接:https://www.jianshu.com/p/a1d945c4f5a6
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。