MENU

Flow 入门

January 4, 2022 • 开发

从 LiveData 谈起

LiveData 发布于 2017 年。从名字就能知道它的用途,LiveData 让一个 data 活了过来,成为了一个可被订阅的变量。LiveData 被设计成所有的更新事件都发生在主线程。(非主线程内需要使用 LiveData.postValue 来更新数据)另外它还通过 Lifecycle 组件得到了感知当前页面所处的生命周期的能力,从而实现了只有在前台时才通知变量的更新。这些特点使得 LiveData 成为了刷新 UI 的利器。
现在让我们简单总结下 LiveData 的优缺点:

  • 回调发生在主线程,方便更新 UI,同时也是 LiveData 的局限所在
  • 借助 Lifecycle 可以感知页面生命周期。然而某些情况下却会引入问题:粘性事件
  • LiveData 本身并不防抖,但是可以借助 Transformations.distinctUntilChanged() 实现
public inline fun <X> LiveData<X>.distinctUntilChanged(): LiveData<X> =
    Transformations.distinctUntilChanged(this)

public static <X> LiveData<X> distinctUntilChanged(@NonNull LiveData<X> source) {
    final MediatorLiveData<X> outputLiveData = new MediatorLiveData<>();
    outputLiveData.addSource(source, new Observer<X>() {

        boolean mFirstTime = true;

        @Override
        public void onChanged(X currentValue) {
            final X previousValue = outputLiveData.getValue();
            if (mFirstTime
                    || (previousValue == null && currentValue != null)
                    || (previousValue != null && !previousValue.equals(currentValue))) {
                mFirstTime = false;
                outputLiveData.setValue(currentValue);
            }
        }
    });
    return outputLiveData;
}
  • LiveData 仅能持有单个数据,订阅者只能拿到最新的值,无法获取历史数据
  • LiveData 的值是可空的,虽然这不是什么缺点,但也给编码时增加了一些不必要的判断

再来说说 Flow

Flow 是 Kotlin Coroutines 的一部分,它的调用必须发生在 CoroutineScope 里。这也说明我们无法直接在 Java 中使用 Flow

/**
 * Flow 基本用法
 */
lifecycleScope.launch {
    flow {
        Log.e(TAG, "Start")
        emit("Hello")
        delay(100)
        emit("World")
        delay(100)
        throw Throwable()
    }.flowOn(Dispatchers.IO).catch {
        this.emit("Error Occur")
    }.onCompletion {
        Log.e(TAG, "Done")
    }.collect {
        Log.e(TAG, it)
    }
}

这里有一点值得注意,那就是没调用 collect 之前是没有数据产生的,如果我把这里的 collect 去掉,此时 Log 并不会输出 Start。
这就是冷流 Cold Flow:数据流在没有下游时不会生产数据

The flow being cold means that the [block] is called every time a terminal operator is applied to the resulting flow.

collect 就是其中一个终端操作符:

public final override suspend fun collect(collector: FlowCollector<T>) {
    val safeCollector = SafeCollector(collector, coroutineContext)
    try {
        collectSafely(safeCollector)
    } finally {
        safeCollector.releaseIntercepted()
    }
}

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

从源码可以看出 collect 最终调用了 collectSafely,直时才开始执行我们写的 lambda

与之相对的还有热流 Hot Flow,与冷流不同,热流数据的更新并不依赖下游。下文我们很快就会见到。
在这之前让我们对照上面看看 FlowLiveData 的差异:

  • Flow 可通过 flowOn 切换工作线程,接收线程为调用线程
  • 本身没有感知生命周期的能力,可以通过 lifecycle-ktx 拓展方法 flowWithLifecycle() 实现
lifecycleScope.launch {
    flow {
        for (i in 1..100) {
            delay(100)
            emit(i)
        }
    }.flowWithLifecycle(lifecycle, Lifecycle.State.RESUMED).collect {
        Log.e(TAG, it.toString())
    }
}
  • Flow 不支持防抖,但子类 StateFlow 默认支持防抖
  • Flow 仅保留一个数据,但子类 SharedFlow 支持保留历史数据
  • Flow 没有初始值且可空,但子类 StateFlow 有初始值且空安全

StateFlow

StateFlow 实现了 SharedFlow 接口,还有一个 MutableStateFlow,实现了 MutableSharedFlow 接口。可以说之前的 LiveData 怎么用 StateFlow 就怎么用。(二者都支持 DataBinding)
StateFlowLiveData 不同在于其必须提供一个初始值,并且 StateFlow 保证空安全,最后 StateFlow 支持防抖

private fun updateState(expectedState: Any?, newState: Any): Boolean {
    ...
    synchronized(this) {
        val oldState = _state.value
        if (expectedState != null && oldState != expectedState) return false // CAS support
        if (oldState == newState) return true // Don't do anything if value is not changing, but CAS -> true
        _state.value = newState
        curSequence = sequence
        ...
    }
    ...
}

SharedFlow

SharedFlow 同样有两个:SharedFlowMutableSharedFlow
StateFlow 不同的是它没有初始值,同时更新数据的方法不再是 setValue 而是用 emit()/tryEmit() 方法。SharedFlow 可以在创建时传入 replay 参数,新的订阅者可以获取最近的一系列数据

StateFlow 与 SharedFlow 的选择

简单来讲就是:状态用 StateFlow,事件用 SharedFlow
因为 StateFlow 的空安全特性保证了它一定有值,因此适合数据作为状态使用的场景;
SharedFlow 则可以用来传递事件。一个 replay = 0 的 SharedFlow 就可以避免粘性事件的发生。已经更新过的值不会在订阅时重复出现。了解了两者的区别,剩下的情况其实都可以根据需要具体分析。

Flow 使用中的一些注意点

  • 在 Lifecyle 中 Flow 的 cancel 问题

    • flowWithLifecycle
    • repeatOnLifecycle
    • Compose 同样存在这个问题,需要使用 flowWithLifecycle:
    @Composable
    fun LocationScreen(locationFlow: Flow<Flow>) {
        val lifecycleOwner = LocalLifecycleOwner.current
        val locationFlowLifecycleAware = remember(locationFlow, lifecycleOwner) {
            locationFlow.flowWithLifecycle(lifecycleOwner.lifecycle,Lifecycle.State.STARTED)
        }
    
        val location by locationFlowLifecycleAware.collectAsState()
    
        // Current location, do something with it
    }
  • 粘性事件
  • stateIn、sharedIn 操作符

官方推荐用法:

  1. Expose a StateFlow, using the WhileSubscribed strategy, with a timeout
  2. Collect with repeatOnLifecycle

下面的做法是不正确的:

  1. Expose using WhileSubscribed and collect inside lifecycleScope.launch/launchWhenX
  2. Expose using Lazily/Eagerly and collect with repeatOnLifecycle
lifecycleScope.launchWhenStarted {
    flow<Any> { Any() }.collect {
        // do something
    }
}

上面这段代码是有问题的,因为 Flow 没有被取消掉,而想要取消一个 Flow 需要这样:

val job = lifecycleScope.launchWhenStarted {
    flow<Any> { Any() }.collect {
        // New location! Update the map
    }
}
job.cancel()

有没有更优雅的办法呢?有。那就是 repeatOnLifecycle。由 lifecycle-ktx 提供:

viewLifecycleOwner.lifecycleScope.launch {
    viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED){
        launch {
            flow<Any> { Any() }.collect {
                // do something
            }
        }
        launch {
            flow<Any> { Any() }.collect {
                // do something
            }
        }
    }
}

如果只有一个 Flow 的话更加简单:

viewLifecycleOwner.lifecycleScope.launch {
    flow<Any> { Any() }
        .flowWithLifecycle(viewLifecycleOwner.lifecycle, Lifecycle.State.STARTED)
        .collect {
            // do something
        }
}

这样一来,Flow 就和 LiveData 一样,拥有了感知生命周期的能力,Flow 会在对应的 State 启动和取消。从而避免了资源的浪费。

Tip:在 Fragment 中,所有与 UI 相关的 Scope 都应该使用 viewLifecycleOwner 的,仅在部分没有 View 的 DialogFragment 中才应该使用 lifecycleOwner