RxJava的一些线程问题

RxJava上提供了一系列的线程池,用起来确实很方便,不过随便用的话很容易遇到坑。

坑1:Schedulers.computation()

Schedulers.computation()在RxJava中定义为用于计算任务的线程池,其线程数等于当前运行环境的CPU核心数。比如在我的华为Mate9 Pro中,其线程数为8。

事实上RxJava中很多操作符都默认使用computation()线程池,比如timer()interval()intervalRange()等。

最近有同事跟我说,使用timer()interval()的时候,有几率不执行。其实根本原因是其他地方也使用了computation()中的线程,并且其线程池中的所有线程都被占用了,必须等到computation()中有空闲的线程后才执行后面的任务。

所以使用到默认computation()线程的操作符时,最好直接设置io()或者newThread()线程。

坑2:Schedulers.trampoline()

Schedulers.trampoline()在RxJava中定义为在当前线程中执行,如果同时存在多个任务,则根据FIFO原则执行任务。

上面说到有同事遇到timer()不执行的情况,所以他实际操作的时候使用这种写法:

1
2
3
4
5
6
Observable.timer(5, TimeUnit.MINUTES, Schedulers.trampoline())
.subscribeOn(Schedulers.io())
.observerOn(AndroidSchedulers.mainThread())
.subscribe(() -> {
//TODO
});

同时由于项目中设置了RxJavaPlugins.setErrorHandler(),所以一开始执行没什么问题。后来我去掉了这个设置之后,就开始频繁崩溃了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
io.reactivex.exceptions.UndeliverableException: java.lang.InterruptedException
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:349)
at io.reactivex.internal.schedulers.TrampolineScheduler.scheduleDirect(TrampolineScheduler.java:64)
at io.reactivex.internal.operators.observable.ObservableTimer.subscribeActual(ObservableTimer.java:38)
at io.reactivex.Observable.subscribe(Observable.java:11040)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:463)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:301)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1162)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:636)
at java.lang.Thread.run(Thread.java:764)
Caused by: java.lang.InterruptedException
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:373)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:395)
at io.reactivex.internal.schedulers.TrampolineScheduler.scheduleDirect(TrampolineScheduler.java:60)
... 11 more
java.lang.InterruptedException
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:373)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:395)
at io.reactivex.internal.schedulers.TrampolineScheduler.scheduleDirect(TrampolineScheduler.java:60)
at io.reactivex.internal.operators.observable.ObservableTimer.subscribeActual(ObservableTimer.java:38)
at io.reactivex.Observable.subscribe(Observable.java:11040)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:463)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:301)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1162)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:636)
at java.lang.Thread.run(Thread.java:764)

要理解为什么会崩溃,首先要从timer()的实现去入手:

1
2
3
4
5
6
public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");

return RxJavaPlugins.onAssembly(new ObservableTimer(Math.max(delay, 0L), unit, scheduler));
}

其实现为ObservableTimer,只需要看其中的subscribeActual()方法实现即可:

1
2
3
4
5
6
7
8
9
@Override
public void subscribeActual(Observer<? super Long> s) {
TimerObserver ios = new TimerObserver(s);
s.onSubscribe(ios);

Disposable d = scheduler.scheduleDirect(ios, delay, unit);

ios.setResource(d);
}

可以看到,其实现为调用传入的SchedulerscheduleDirect()方法。上面代码中传入的为Schedulers.trampoline(),其scheduleDirect()的实现:

1
2
3
4
5
6
7
8
9
10
public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
try {
unit.sleep(delay);
RxJavaPlugins.onSchedule(run).run();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
RxJavaPlugins.onError(ex);
}
return EmptyDisposable.INSTANCE;
}

上面的代码表示当timer()传入Schedulers.trampoline()时,其延时效果通过当前线程的sleep()实现。

作为对比,看一下Schedulers.computation()scheduleDirect()实现:

1
2
3
4
public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
PoolWorker w = pool.get().getEventLoop();
return w.scheduleDirect(run, delay, unit);
}

PoolWorker继承了NewThreadWorker,其scheduleDirect()的实现为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
try {
Future<?> f;
if (delayTime <= 0L) {
f = executor.submit(task);
} else {
f = executor.schedule(task, delayTime, unit);
}
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}

代码里的executor为线程池的实例,所以当timer()传入Schedulers.computation()时,其延时效果通过线程池的任务计划调度执行。

再回头看出现的崩溃,经试验,其崩溃的时机在于timer()还未执行,然后手动调用dispose()取消其执行时出现的。所以还需要看下dispose()的具体实现,其被调用时实际是调用ScheduledRunnabledispose()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void dispose() {
for (;;) {
Object o = get(FUTURE_INDEX);
if (o == DONE || o == SYNC_DISPOSED || o == ASYNC_DISPOSED) {
break;
}
boolean async = get(THREAD_INDEX) != Thread.currentThread();
if (compareAndSet(FUTURE_INDEX, o, async ? ASYNC_DISPOSED : SYNC_DISPOSED)) {
if (o != null) {
((Future<?>)o).cancel(async);
}
break;
}
}

for (;;) {
Object o = get(PARENT_INDEX);
if (o == DONE || o == PARENT_DISPOSED || o == null) {
return;
}
if (compareAndSet(PARENT_INDEX, o, PARENT_DISPOSED)) {
((DisposableContainer)o).delete(this);
return;
}
}
}

即最终的实现是通过调用线程池中对应任务的Futurecancel()方法,在当前例子中其cancel()方法传入的为true,即会执行中断线程的操作。

回到开头的例子,在传入Schedulers.trampoline()后,又通过subscribeOn()将任务放入Schdulers.io()的线程中执行,即其在io()线程中调用了sleep()方法。紧接着在调用dispose()方法时,将当前io()线程的Future取出并执行cancel(true)。如果当前线程中正在sleep(),则会直接抛出InterruptedException异常来中断操作。而在trampoline()的内部实现中,InterruptedException不会被截获到原来的Observable的onError中,而是直接抛到RxJavaPluginsonError()中处理,所以在取消设置RxJavaPlugins.setErrorHandler()后,就会出现崩溃。

总结

  1. 使用默认computation()线程池实现的操作符时,最后另外设置线程进去。
  2. 尽量不要使用trampoline()来实现延时操作
  3. 日常开发时不要设置RxJavaPlugins.setErrorHandler()
  4. 线上正式包中最好设置RxJavaPlugins.setErrorHandler()
MediaCodec编码摄像头数据时控制帧率的方法 Flutter先导篇

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×