上一篇文章,我介绍了Kotlin协程的创建,使用,协作等内容。本篇将引入更多的使用场景,继续带你走进协程世界。
(相关资料图)
使用协程处理异步数据流
常用编程语言都会内置对同一类型不同对象的数据集表示,我们通常称之为容器类。不同的容器类适用于不同的使用场景。Kotlin的Flow
就是在异步计算的需求下引入的,用于表示异步的数据流。
Flow
“问渠哪得清如许,为有源头活水来”,异步数据流的基本就是以某种方式获得异步数据。Kotlin提供了多种种方式,比较常用的就是Kotlin协程包的asFlow
扩展和flow
构造器。前者是对普通数据集的Flow
化封装,没有更多可言,我们着重来看后者。flow
构造器的主要目标就是产生一个异步数据流,它是一个泛型函数,参数是一个挂起函数,并且是FlowCollector
是扩展函数。这个接口只有一个emit
方法,就是为创建的Flow
提供异步计算的数据的,因为它是挂起函数,所以我们能在里面使用其他挂起函数计算异步值,然后通过emit
方法将值发送出去,如此反复就能为下游操作提供源源不断的数据流了。事情还没完,上面的步骤我们只是规定了创建数据的方式,并没有真正执行,也就是建好了道路,但是还没有车上路。那么,怎样才能让车在路上跑呢,查看Flow
的接口会发现,它提供了collect
方法来处理数据。collect
接收一个挂起函数作为处理逻辑,但是同时,collect
方法本身也是挂起函数,所以,这个方法只能在挂起函数中运行。有了这些知识,我们就可以写出最简单的异步数据流了。
1uspend fun compute():Int{ delay(123) return 1024 } viewModelScope.launch { val flow=flow { emit(9527) emit(compute()) delay(256) emit(256) } flow.collect { println(it) }}
在flow
构造器里面随意做各种操作,只要在必要的时候传递结果就行了,但是需要注意的是,emit
方法只能运行在同一个协程里。乍一看,这样分开写和写在一起并没有本质上的差别,但Flow
还能做到更多。
该给Flow换个工作环境了
上一节,我们那个简单的示例,假如把构造器里面的数据获取方法换成网络请求,应用就歇菜了。因为它们都是运行在主线程里面的。那么这个时候,看过上一篇文章的小伙伴马上就会反应过来,用withContext
方法在构造器里面切换线程就行了哇。思路是很对,因为Flow
的默认配置就是构造器和collect
方法工作在同一线程,既然现在主线程不让运行,那就把构造器的线程切换一下就行了呗。然后事实并不是这样,这样写出来的代码根本无法运行。因为官方提供了唯一的flowOn
方法来切换构造器的执行线程。使用也很简单,就是对创建好的Flow
对象配置一次flowOn
方法就行了。
val flow=["1.jpg","2.jpg"].asFlow()flow.map { decode(it) } .flowOn(Dispatchers.IO)viewModelScope.launch { flow.collect{ adapter.add(it) }
有些中间处理逻辑
熟悉RxJava的小伙伴可能有疑问了,这些操作RxJava也能完成,甚至还有更多的操作符来支持中间状态的处理,那么异步数据流能做到这些吗。毫无疑问,它可以。普通的数据集有map
,filter
等操作方法,对于异步数据流来说,这些方法同样适用。而且这些方法参数都是挂起函数,都可以执行异步操作。而且它还有个更灵活的transform
方法,这个方法可以定制自己的操作符,实现更灵活的数据操作。
当然,上面那些操作符都只能实现单一异步流的操作,对于多数据流的支持,它也同样不在话下。zip
可以将两个两个数据源两两合并起来,合成的数据流长度为两个数据流中最短的那个数据流的长度。combine
则与zip
不同,它会将两个数据流最近的发送数据作为输入,也就是说,假如一块一慢的两个数据源,慢的数据源的元素可能会被多次取到,从而最终的数据流比最短的那个都长。
val flow = flowOf(1, 2).delayEach(10)val flow2 = flowOf("a", "b", "c").delayEach(15)flow.combine(flow2) { i, s -> i.toString() + s }.collect { println(it) // Will print "1a 2a 2b 2c"}
结束状态跟踪
上一节提到,由于数据源和处理逻辑不在同一个地方,所以很难确定最终的数据流大小,进而不知道数据流什么时候处理结束。而且中间操作也可能会改变数据流的大小,由此就更加难以确定数据处理结束的时机了。但是我们有的时候却需要在数据处理完成后做一些操作,该怎么办呢?这个时候当然是该onCompletion
方法上场了。这个方法有一个可为空的Throwable
类型参数,很显然,这可以同时指示两种处理结果,成功或者失败,失败就会将异常对象传递进来。
多个协程共同工作
很多时候,避免不了让多个协程共同工作。对于返回单个值的协程,上一篇我们也提到过了,可以传递async
构造器的返回对象Deferred
,但是局限性就是这个对象只能传递一个值。针对多值传递的情况,Kotlin提供了Channel
的解决方法。Channel
类似于阻塞队列,数据通过send
方法发送出去,在另外的地方使用receive
方法接收。通过这种方法,我们可以极大提供协程的工作效率。利用它就可以轻松实现生产者和消费者模型。
val chanel=Channel() viewModelScope.launch(Dispatchers.IO) { for (i in 1..5){ delay(1000) chanel.send(i) } } viewModelScope.launch { for (i in chanel){ println("Handle ${i}") }}
当然,这只是最简单的用法,还可以加入更多的生产者,或者不再需要数据时取消,甚至还有专门的product
构造器,直接获得返回多个值的协程对象。
总结
Kotlin协程有很多有用的API,这些API覆盖了大部分异步使用的场景。所以在使用协程的时候,我们首先需要明确使用场景,再根据使用场景确定使用哪一套API,这可以使我们避免陷入API恐惧症。为此,我根据这两篇文章的内容,整理出了一份情景表格,实际开发中可以参照使用。Kotlin协程构造器
API | 使用场景 |
---|---|
launch | 执行耗时操作,不需要返回值 |
async | 需要获取耗时操作的单个返回值 |
produce | 需要获取耗时操作的多个返回值 |
Kotlin协程协同工具
API | 使用场景 |
---|---|
Flow | 操作异步数据流 |
Channel | 协程间通信 |
青山不改,绿水长流,咱们下期见!