Java代理模式与动态代理原理

本文介绍代理模式的应用和Java中动态代理的原理

代理模式

动态代理


2022-04-17 对象行为型模式

Kotlin中suspend和async源码解读

本文主要记录Kotlin中suspend关键字和async函数的使用及源码解读

书接上回Kotlin协程的使用以及launch源码解读

前置知识

对于suspend函数系统会自动为其生成一个Continuation参数,Continuation参数里会携带函数原来定义的返回值,Continuation接口定义如下:

public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}

譬如定义如下testSuspend函数

```kotlin
fun testSuspend():String{
	delay(400)
	return ""
}

这个函数在编译后会变为

public static Object testSuspend(Continuation $complete){
	 //....
}

$complete里会携带String信息,这里由于泛型擦除导致泛型被擦除掉。

suspend关键字和withContext函数

suspend和withContext最常见的用法如下:

fun main(args: Array<String>) {
     println("Main Start")
    GlobalScope.launch {
        val startStamp = System.currentTimeMillis()
        println("scope start")
        val userId = getUser()
        println("userid:$userId,当前时间:${System.currentTimeMillis()},耗时:${System.currentTimeMillis()-startStamp}")
        val avatarUrl = getAvatarUrl(userId)
        println("avatar:$avatarUrl,当前时间:${System.currentTimeMillis()},耗时:${System.currentTimeMillis()-startStamp}")
    }
    println("Main end")
    Thread.sleep(4000)
}

suspend fun getUser():Int = withContext(Dispatchers.IO){
    delay(1000)
    1
}

suspend fun getAvatarUrl(userId:Int):String = withContext(Dispatchers.IO){
    delay(2000)
    "https://avatar"
}

输出结果为:
Main Start
Main end
scope start
userid1,当前时间:1649999052854,耗时:1015
avatarhttps://avatar,当前时间:1649999054861,耗时:3022

通过输出结果可以看出getAvatarUrl是在getUser执行完毕后才开始执行的,按Java常理异步请求如果用这种同步的方法去写异步请求那请求是会同时启动的,那协程是如何实现这种同步的方式代码串联异步请求的呢?通过把Kotlin代码转换为字节码我们来研究一下getUser函数是如何实现的,getUser字节码如下:

public static final Object getUser(@NotNull Continuation $completion) {
      return BuildersKt.withContext((CoroutineContext)Dispatchers.getIO(), (Function2)(new Function2((Continuation)null) {
         int label;//状态机流转的标签,初始为0

         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               this.label = 1;
               if (DelayKt.delay(1000L, this) == var2) {
                  return var2;
               }
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               break;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            return Boxing.boxInt(1);
         }

         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }

         public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      }), $completion);
   }

通过字节码和源码对比可以看出

  1. 虚拟机自动为getUser方法加上了一个名为$completion的Continuation参数
  2. 把源码中的分发器作为参数传给了BuildersKt.withContext方法
  3. 虚拟机自动生成了Function2的对象,Function2中有一个label

所以关键要看BuildersKt.withContext里做了啥,withContext源码如下:

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        // compute new context
        val oldContext = uCont.context
        val newContext = oldContext + context
        // always check for cancellation of new context
        newContext.ensureActive()
        // FAST PATH #1 -- new context is the same as the old one
        if (newContext === oldContext) {
            val coroutine = ScopeCoroutine(newContext, uCont)
            return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
        }
        // FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
        // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
        if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
            val coroutine = UndispatchedCoroutine(newContext, uCont)
            // There are changes in the context, so this thread needs to be updated
            withCoroutineContext(newContext, null) {
                return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
            }
        }
        // SLOW PATH -- use new dispatcher
        // 在我们的例子中都会调用到这里                                            
        val coroutine = DispatchedCoroutine(newContext, uCont)
        // 会调用Function2中的create方法,然后用拦截器包一层,最后调用resumeWith进行分发                                              
        block.startCoroutineCancellable(coroutine, coroutine)
        // DispatchedCoroutine在resumeWith调用最后会调uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))恢复上一层的协程                                      
        coroutine.getResult()
    }
}

block.startCoroutineCancellable(coroutine, coroutine)和上书Kotlin协程的使用以及launch源码解读中是一样的

参考文章

Kotlin协程实现原理:挂起与恢复


2022-04-15 协程

Kotlin中线程池处理过程

本文主要记录Kotlin中线程池的调度过程

书接上回Kotlin协程的使用以及launch源码解读

###

参考文章

Kotlin协程实现原理:挂起与恢复


2022-04-15 协程 , 线程池

Kotlin中异常的处理和传播

本文主要记录Kotlin中suspend关键字和async函数的使用及源码解读

书接上回Kotlin协程的使用以及launch源码解读

代码示例

下文中代码均以此示例为例,不再赘述。

初始源码为:

fun main(args: Array<String>) {
    GlobalScope.launch {
        println("scope start")
        val job = withContext(Dispatchers.IO) {
            delay(1000)
            println("with end")
            "1"
        }
        val asyncJob  = async{
            println("async start")
            delay(2000)
            "async end"
        }
        val res = asyncJob.await()
        println("scope end $res")
    }
    println("Main end")
}

输出结果:
Main end
scope start
with end
async start
scope end async end

这里有几个问题:

  1. Main end为什么会最先打印?(launch源码中已经解释了为啥)
  2. 为什么协程能实现asyncJob中的异步调用一定在job中的异步调用执行完毕后再开始执行呢?
  3. 为什么协程最后的scope end打印一定会在asyncJob调用await完毕后再执行?

为了搞懂第2个问题我们先看下suspend源码里是怎么实现的

suspend源码

参考文章

Kotlin协程实现原理:挂起与恢复


2022-04-15 协程 , 异常

整数转字符串

本文记录2022年四月份字节跳动三面的算法题目。

题目描述

把一个整数转成字符串(不考虑负数,整数单位最大为亿),举个例子:

123->一百二十三

1200345->一百二十万零三百四十五

思路

当时做的时候没仔细考虑,妈的上来就开始写,下面贴一下错误思路:

错误思路

建立两个字典,第一个字典存储0-10的数字,第二个字典存储(十、百、千、万、亿)的单位,通过不断取余拿到最后一个数字后查询字典然后进行字符串拼接,这种思路问题有两个:

  1. 不好处理取余后数字为0的情况(需要加一堆if)
  2. 对于高位譬如12000000这种数字不能处理高位的数量,这个例子1200就取不出来

正确思路

先进行字典建立,字典建立过程和上述错误思路是一样的

然后把数字看成多个部分:

  1. 大于1亿的
  2. 大于1万但小于1亿的
  3. 大于1千但小于1万的
  4. 大于1百但小于1千的
  5. 大于20但小于100的
  6. 大于10但小于20的
  7. 0到10

通过对不同部分的单位进行取整得到当前单位的数量,然后在对余数部分进行递归操作,中间当然也要处理取余和取整结果为0的情况。

举个例子:

对于120103206789 这个数字

先对亿取整后为1201,取余后为3206789,字符串为1201亿

再对3206789进行万取整后为320、取余后为6789

6789对千取整为6、取余为789

789对百取整为7、取余为89

89对10取整为8、取余为9

代码

代码更新在我的Github中

整数转字符串源码


2022-04-13 整数、字符串

Kotlin协程的使用以及launch源码解读

本文主要记录Kotlin中协程的使用以及launch方法的源码解读。

代码示例

下文中代码均以此示例为例,不再赘述。

初始源码为:

fun main(args: Array<String>) {
    GlobalScope.launch {
        println("scope start")
        val job = withContext(Dispatchers.IO) {
            delay(1000)
            println("with end")
            "1"
        }
        val asyncJob  = async{
            println("async start")
            delay(2000)
            "async end"
        }
        val res = asyncJob.await()
        println("scope end $res")
    }
    println("Main end")
}

输出结果:
Main end
scope start
with end
async start
scope end async end

这里有几个问题:

  1. Main end为什么会最先打印?
  2. 为什么协程能实现asyncJob中的异步调用一定在job中的异步调用执行完毕后再开始执行呢?
  3. 为什么协程最后的scope end打印一定会在asyncJob调用await完毕后再执行?

为了搞懂这几个问题我们先看下launch源码里是怎么实现的

launch函数

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)//协程上下文,注意这里默认会使用Dispatchs.default作为分发器
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

从源码可知以下几点:

  1. launch是CoroutineScope的扩展函数,在参数里可以设置协程运行时的上下文、协程启动类型以及协程体(就是我们写的那段代码)
  2. 我们写的代码块默认封装在CoroutineScope扩展的一个suspend函数内
  3. launch时会调用start函数启动协程

按默认情况走的话launch函数使用CoroutineStart.DEFAULT,下面看看start函数里做了啥

public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {

    /**
     * Completes execution of this with coroutine with the specified result.
     */
    public final override fun resumeWith(result: Result<T>) {
        val state = makeCompletingOnce(result.toState())
        if (state === COMPLETING_WAITING_CHILDREN) return
        afterResume(state)
    }
   
    /**
     * Starts this coroutine with the given code [block] and [start] strategy.
     * This function shall be invoked at most once on this coroutine.
     * 
     * * [DEFAULT] uses [startCoroutineCancellable].
     * * [ATOMIC] uses [startCoroutine].
     * * [UNDISPATCHED] uses [startCoroutineUndispatched].
     * * [LAZY] does nothing.
     */
    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        start(block, receiver, this)
    }
}

start方法调用了CoroutineStart里的方法

public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            ATOMIC -> block.startCoroutine(receiver, completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            LAZY -> Unit // will start lazily
        }

之后调用block的扩展函数startCoroutineCancellable

/**
 * Use this function to start coroutine in a cancellable way, so that it can be cancelled
 * while waiting to be dispatched.
 */
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }

在startCoroutineCancellable里会调用createCoroutineUnintercepted和intercepted方法

public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
  	//probeCoroutineCreated直接返回completion
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
  			//launch函数的代码体默认会生成SuspendLambda对象,SuspendLambda继承自BaseContinuationImpl,所以会调用到这里
        create(receiver, probeCompletion)
    else {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}

SuspendLambda类如下

//注意这里实现了ContinuationImpl和FunctionBase接口
internal abstract class SuspendLambda(
    public override val arity: Int,
    completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
    constructor(arity: Int) : this(arity, null)
}

这里可以看到SuspendLambda里是没有create方法的,往上找到BaseContinuationImpl类里有create方法

internal abstract class BaseContinuationImpl(
    public open fun create(completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Continuation) has not been overridden")
    }

    public open fun create(value: Any?, completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Any?;Continuation) has not been overridden")
    }
}

这里create方法直接抛出异常,其实这里create方法用的是字节码里的create方法,再看看字节码里launch是咋样的

BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
         // $FF: synthetic field
         private Object L$0;
         int label;

         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            //省略其他代码
         }

         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);//这里生成的是SuspendLambda对象
            var3.L$0 = value;
            return var3;
         }

         public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      }), 3, (Object)null);

这里会生成一个新的Function2函数(前面说过SuspendLambda继承自FunctionBase)并传入Continuation对象,在launch中这个completion对象就是一开始调用协程时生成的StandaloneCoroutine对象,到这里createCoroutineUnintercepted方法就算完事了,接下来看看intercepted方法是咋回事

//默认是ContinuationImpl,所以调用ContinuationImpl里的intercepted方法
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this

再看看intercepted方法

internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)

    private var intercepted: Continuation<Any?>? = null

    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }

}

interceptContinuation方法在CoroutineDispatcher类里,interceptContinuation最后会用DispatchedContinuation包装一下生成一个新的Continuation对象,到这里intercepted方法执行完毕

public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)

intercepted调用完后接着会调用Continuation的resumeCancellableWith函数

public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation) //默认是走这里
    else -> resumeWith(result)
}

这里由于前面的层层包装最后调用的是DispatchedContinuation中的resumeCancellableWith方法

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
  
  inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
  			//launch方法默认的是Dispatchers.Default,这里isDispatchNeeded默认是true
        if (dispatcher.isDispatchNeeded(context)) {
          //需要切换线程
            _state = state
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this)//这里可以看成是把block传入线程里去运行
        } else {
          //不需要切换线程
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) {
                  //这里直接调用continuation.resumeWith(result)方法
                    resumeUndispatchedWith(result)
                }
            }
        }
    }
  
}

这里会判断当前的任务是否需要切换线程再进行分发,如果需要分发则进入线程池的分发流程(线程池具体实现可以看Kotlin协程中的线程池这篇文章),否则直接调用resumeWith方法。在线程池里最后会调用DispatchedContinuation中的run方法,DispatchedContinuation本身没重写run方法,所以往上找到DispatchedTask类,DispatchedTask的run方法如下:

internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
  
   public final override fun run() {
        assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
        val taskContext = this.taskContext
        var fatalException: Throwable? = null
        try {
            val delegate = delegate as DispatchedContinuation<T>
            val continuation = delegate.continuation
            withContinuationContext(continuation, delegate.countOrElement) {
                val context = continuation.context
                val state = takeState() // NOTE: Must take state in any case, even if cancelled
                val exception = getExceptionalResult(state)
                /*
                 * Check whether continuation was originally resumed with an exception.
                 * If so, it dominates cancellation, otherwise the original exception
                 * will be silently lost.
                 */
                val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
                if (job != null && !job.isActive) {
                    val cause = job.getCancellationException()
                    cancelCompletedResult(state, cause)
                    continuation.resumeWithStackTrace(cause)
                } else {
                    if (exception != null) {
                        continuation.resumeWithException(exception)
                    } else {
                        continuation.resume(getSuccessfulResult(state))
                    }
                }
            }
        } catch (e: Throwable) {
            // This instead of runCatching to have nicer stacktrace and debug experience
            fatalException = e
        } finally {
            val result = runCatching { taskContext.afterTask() }
            handleFatalException(fatalException, result.exceptionOrNull())
        }
    }

}

执行run方法时最后一定会调用resumeWith函数,所以不管是需要切换线程还是不需要切换线程最后走的都是resumeWith方法,下面看看resumeWith里是如何实现的

internal abstract class BaseContinuationImpl(
    // This is `public val` so that it is private on JVM and cannot be modified by untrusted code, yet
    // it has a public getter (since even untrusted code is allowed to inspect its call stack).
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    // This implementation is final. This fact is used to unroll resumeWith recursion.
    public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
}

这里会用一个循环去调用invokeSuspend函数,invokeSuspend函数是在字节码里重写了的,内部会有一个状态机,当碰到COROUTINE_SUSPENDED时invokeSuspend函数就会return,此时整个launch函数不再运行,等同于被挂起了,此时调用launch的线程就可以往下运行,这就是非阻塞挂起。这里也解释了开篇第一个问题,Main end第一个打印的原因是因为launch默认使用的Dispatchs.default去进行分发,此时我们写的block会分发到Dispatchs.default中的线程池里运行,所以理所应当Main end第一个打印。

剩余的两个问题由于篇幅原因放到下回Kotlin中suspend和async源码解读中去解释

参考文章

Kotlin协程实现原理:挂起与恢复


2022-04-03 协程

Android事件分发机制浅析

本文主要结合源码记录Android中的事件分发机制。


2022-03-22 源码 , 事件分发

Kotlin委托模式以及by关键字用法及原理

本文主要记录Kotlin中委托的使用(类委托、属性委托)以及by关键字的用法和原理。


2022-03-16 委托、by

RxJava的原理以及flatMap和map的使用

本文主要记录RxJava中flatMap和map操作符的使用以及RxJava切换线程的底层原理。

flatMap和map操作符

map的使用方法

Flowable.just("a","ab")
            .map {
                it.length
            }
            .subscribe {

            }

map在这里的作用是把一个String类型的Flowable转化为Int类型的Flowable。

map源码

public final <@NonNull R> Flowable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
        Objects.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new FlowableMap<>(this, mapper));
}

源码中可以看出map把Flowable<T>转化为Flowable<R>,这个R也可以是Flowable类型,在map的方法体内我们必须直接返回R类型的对象,而flatMap则不一样。

flatMap的使用方法

Flowable.just("a","b")
            .flatMap {
                Flowable.just(it.length)
            }
            .subscribe {
            }

flatMap在这里的作用也是把String类型的Flowable转化为Int类型的Flowable,但它与map又有些区别。

flatMap源码

public final <@NonNull R> Flowable<R> flatMap(@NonNull Function<? super T, @NonNull ? extends Publisher<? extends R>> mapper) {
        return flatMap(mapper, false, bufferSize(), bufferSize());
}

源码中可以看出flatMap把Flowable<T>转化为Flowable<R>,这个R也可以是Flowable类型,**在flatMap的方法体内我们必须直接返回一个Flowable类型的对象**。

RxJava原理以及其是如何切换线程的

RxJava切换线程主要依靠subscribeOn(Scheduler)observeOn(Scheduler)两个操作符实现,举个例子:

Flowable.just("a","ab")
            .map {
              	// 运行在io线程
                it.length
            }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe {
            	//运行在主线程
            }

通过查阅源码可知RxJava在组装事件时是通过装饰者模式嵌套对象来组装的,譬如上述例子中在subscribe函数调用前就组装了个:

FlowableObserveOn(FlowableSubscribeOn(FlowableMap(FlowableFromArray)))

类型的对象,其中FlowableObserveOn、FlowableSubscribeOn、FlowableMap、FlowableFromArray都是接口 Publisher的实现类(Flowable的子类),Publisher的源码如下:

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

Publisher就是被观察者(发布者),专门用来发布事件的。事件组装完毕后调用subscribe方法进行订阅,subscribe方法的源码在Flowable里,核心逻辑如下:

public final void subscribe(@NonNull FlowableSubscriber<? super T> subscriber) {
        try {
            Subscriber<? super T> flowableSubscriber = RxJavaPlugins.onSubscribe(this, subscriber);//这行是为了RxJava中一个hook方法,如果不调用RxJavaPlugins.setOnFlowableSubscribe方法默认返回subscriber自身
            subscribeActual(flowableSubscriber);//抽象方法,此时走到子类具体调用中
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            throw npe;
        }
    }

以上面的例子为例,FlowableObserveOn中的subscribeActual如下所示:

public void subscribeActual(Subscriber<? super T> s) {
        Worker worker = scheduler.createWorker();//在observeOn中定义的工作线程,在这里是主线程

        if (s instanceof ConditionalSubscriber) {
            source.subscribe(new ObserveOnConditionalSubscriber<>(
                    (ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
        } else {
            //默认会走到这里,source是它的上游,在例子中是FlowableSubscribeOn
            source.subscribe(new ObserveOnSubscriber<>(s, worker, delayError, prefetch));
        }
    }

再来看看FlowableSubscribeOn中的subscribe方法(其实也是看subscribeActual方法):

public void subscribeActual(final Subscriber<? super T> s) {
        Scheduler.Worker w = scheduler.createWorker();//subscribeOn定义的工作线程,这里指io线程
        final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<>(s, w, source, nonScheduledRequests);
        s.onSubscribe(sos);//直接执行ObserveOnSubscriber的onSubscribe方法

    		w.schedule(sos);//工作线程处理sos的内容,会调用SubscribeOnSubscriber中的run方法
    }

  1. RxJava在组装事件流时使用的装饰者模式(俗称套娃)
  2. 在进行订阅和发布事件时则是用的观察者模式
  3. observeOn处理下游事件的线程,每用一次会切一次,subscribeOn处理上游事件的线程,只有第一个会生效

2022-03-16 RxJava

let、with、run、apply、also函数的用法

本文主要记录Kotlin中let、with、run、apply和also这几个内联函数的使用方法和底层原理。以下代码都以下列的User类的对象user为例。

class User(var name: String, var age: Int)

let

let源码如下

@kotlin.internal.InlineOnly
public inline fun <T, R> T.let(block: (T) -> R): R {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    return block(this)
}

通过源码可以看出let函数是类型T的扩展函数,参数需要传入一个闭包,闭包内返回R类型,最后整个let函数返回R类型对象。

使用方法如下

val res: String = user.let {
        it.name = "越活越年轻"
        it.age = 27
        "123" //处理完后返回的值,可以是任意类型,如果不写默认是Unit对象
    }
 val res1:Unit = user.let { } // 默认返回Unit对象

通过例子可看出let函数内部可以通过it.的方式设置属性,最后还可以选择返回一个值(如果不返回值则默认是unit对象)

with

with源码如下

@kotlin.internal.InlineOnly
public inline fun <T, R> with(receiver: T, block: T.() -> R): R {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    return receiver.block()
}

源码中with是个顶层函数,参数有两个,第一个参数需要传入我们需要处理的对象,第二个参数是一个T的扩展函数的闭包,闭包返回R类型,最后整个函数返回R类型。

使用方法如下

val with = with(user){
        this.name = "with来了"
        this.age = 26
        "1234" //处理完后返回的值,可以是任意类型,如果不写默认是Unit对象
    }
val with1:Unit = with(user){ } //默认返回Unit对象

通过例子看出with函数和let功能差不多,但是with是一个顶层函数,可以直接调用。

run

源码如下

@kotlin.internal.InlineOnly
public inline fun <T, R> T.run(block: T.() -> R): R {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    return block()
}

run方法是T的一个扩展函数,参数需要传入一个T类型的闭包,闭包内需要返回R类型,最后整个run返回R类型。

用法如下

val run = user.run {
        this.name = "run run run"
        this.age = 3
        "123455" //处理完后返回的值,可以是任意类型,如果不写默认是Unit对象
    }
val run1:Unit = user.run {  } //默认返回Unit对象

run方法基本与let差不多,区别在于run方法传入的参数是T的扩展函数的闭包,而let传入的是参数为T的闭包。

apply

apply源码如下

@kotlin.internal.InlineOnly
public inline fun <T> T.apply(block: T.() -> Unit): T {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    block()
    return this
}

apply方法是T的一个扩展函数,需要传入一个T的扩展函数的闭包,闭包返回Unit类型,最后apply方法返回T类型。

apply的使用方法如下

val apply = user.apply {
        this.name = "apply apply apply"
        this.age = 5
    }
val apply1 = user.apply {  } //返回的还是当前调用的User对象

apply返回的还是自身对象

also

源码如下

@kotlin.internal.InlineOnly
@SinceKotlin("1.1")
public inline fun <T> T.also(block: (T) -> Unit): T {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    block(this)
    return this
}

also方法是T的一个扩展函数,需要传入一个参数为T的闭包,闭包返回Unit对象,also函数最后返回的还是调用对象。

用法如下

    val also = user.also {
        it.name = "also also alse "
        it.age = 7
    }

also返回的和apply一样,都是调用者自身

使用场景

Kotlin系列之let、with、run、apply、also函数的使用

Kotlin 操作符:run、with、let、also、apply 的差异与选择


2022-03-14 内联函数

最近文章