[리엑티브 연습] AsyncSubject, BehaviorSubject, PublishSubject, ReplaySubject
안녕하세요. 블랙진입니다.
리엑티브 연습 포스팅 입니다.
AsyncSubject
onComplete() 호출 되기 직전의 아이템만을 받습니다. 즉 마지막 데이터만 받아옵니다.
val asyncSubject = AsyncSubject.create<String>().apply {
onNext("0")
subscribe { data -> println("Subscriber #1 -> $data") }
onNext("1")
onNext("2")
subscribe { data -> println("Subscriber #2 -> $data") }
onNext("3")
onNext("4")
}
asyncSubject.onComplete()
결과
Subscriber #1 -> 4
Subscriber #2 -> 4
BehaviorSubject
가장 최근에 발행된 아이템으로 구독을 시작합니다. 또한 초기값을 설정 할 수 있습니다.
val behaviorSubject = BehaviorSubject.createDefault("BlackJin").apply {
//onNext("0")
subscribe { data -> println("Subscriber #1 -> $data") }
onNext("1")
subscribe { data -> println("Subscriber #2 -> $data") }
onNext("2")
}
결과
Subscriber #1 -> BlackJin
Subscriber #1 -> 1
Subscriber #2 -> 1
Subscriber #1 -> 2
Subscriber #2 -> 2
publishSubject
구독 시점 이후부터 발행된 아이템을 받는다.
val publishSubject = PublishSubject.create<String>().apply {
onNext("0")
subscribe { data -> println("Subscriber #1 -> $data") }
onNext("1")
subscribe { data -> println("Subscriber #2 -> $data") }
onNext("2")
}
결과
Subscriber #1 -> 1
Subscriber #1 -> 2
Subscriber #2 -> 2
활용법
구독한 이후 부터 데이터가 필요한 이벤트 버스로 활용할 수 있다.
object RxEventBusHelper {
/**
* Test Count
*/
val testCountBus = PublishSubject.create<Int>()
fun sendTestCount(testCnt: Int) {
testCountBus.onNext(testCnt)
}
}
disposable +=
RxEventBusHelper.testCountBus
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
if (it > 0) {
...
} else {
...
}
}) {
Dlog.e("testCountBus ${it.message}")
}
ReplaySubject
구독 이전의 모든 아이템들을 반환합니다.
val replaySubject = ReplaySubject.create<String>().apply {
onNext("0")
subscribe { data -> println("Subscriber #1 -> $data") }
onNext("1")
subscribe { data -> println("Subscriber #2 -> $data") }
onNext("2")
}
결과
Subscriber #1 -> 0
Subscriber #1 -> 1
Subscriber #2 -> 0
Subscriber #2 -> 1
Subscriber #1 -> 2
Subscriber #2 -> 2