일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- 안드로이드
- 함수형 프로그래밍
- 한단어의힘
- 프래그먼트
- 끝말잇기
- 한달독서
- 목적중심리더십
- 북한살둘레길
- 소프시스
- 슬기로운 온라인 게임
- 좌식테이블
- 테트리스
- T자형인재
- 목적 중심 리더십
- 재택근무
- 한달브런치북만들기
- 커스텀린트
- 아비투스
- 리얼하다
- 면접
- 자취필수템
- 베드테이블
- 한달어스
- 베드트레이
- 1일1커밋
- 소프시스 밤부 좌식 엑슬 테이블
- 캐치마인드
- 지지않는다는말
- 어떻게 나답게 살 것인가
- 브런치작가되기
- Today
- Total
정상에서 IT를 외치다
[리액티브 프로그래밍] 리액티브 연산자의 활용 본문
안녕하세요. 블랙진입니다.
한빛미디어 RxJava프로그래밍을 보며 리액티브를 공부한 내용을 정리하기 위한 세번째 포스팅입니다.
Chapter 4
4.1 생성연산자
4.1.1 interval() 함수
/**
* 일정 시간 간격으로 데이터 흐름을 생성합니다.
* 현재 스레드가 아닌 스케줄러에서 실행
*/
public class IntervalExample {
public void example() {
Observable<String> source = Observable.interval(100L, TimeUnit.MILLISECONDS)
.map(data -> (data + 1) * 100)
.map(Object::toString)
.take(5);
source.subscribe(MyLog::Log);
CommonUtils.sleep(1000);
}
}
(결과)
RxComputationThreadPool-1: 100
RxComputationThreadPool-1: 200
RxComputationThreadPool-1: 300
RxComputationThreadPool-1: 400
RxComputationThreadPool-1: 500
public class CommonUtils {
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
CommonUtils.sleep() 함수가 있습니다. 이는 Thread.sleep(millis) 함수로 millis초 동안 실행중인 쓰레드를 정지하라는 명령입니다.
위 예제 에서는 1000ms 동안 메인 쓰레드가 중지됩니다. 메인 스레드가 중지되면서 스케줄러가 동작되는 원리입니다.
4.1.2 timer() 함수
/**
* interval() 함수와 유사하지만 한 번만 실행하는 함수입니다.
* 일정 시간이 지난 후에 한 개의 데이터를 발행하고 onComplete() 이벤트가 발생합니다.
*
* 현재 스레드가 아닌 스케줄러에서 실행
*/
public class TimerExample {
public void example() {
Observable<String> source = Observable.timer(500L, TimeUnit.MILLISECONDS)
.map(notUser -> {
return new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
.format(new Date());
});
source.subscribe(MyLog::Log);
CommonUtils.sleep(1000);
}
}
4.1.3 range() 함수
/**
* 주어진 값(n)부터 m개의 Integer 객체를 발행합니다.
*/
public void example() {
Observable<Integer> source = Observable.range(1, 10)
.filter(number -> number % 2 == 0);
source.subscribe(System.out::println);
}
4.1.4 intervalRange() 함수
/**
* interval()과 range()를 혼합해놓은 함수입니다. interval() 함수 처럼 일정한 시간 간격으로 값을 출력하지만
* range() 함수처럼 시작 숫자(n)로부터 m개 만큼의 값만 생성하고 onComplete 이벤트가 발생합니다.
*/
public void example() {
Observable<Long> source = Observable.intervalRange(
1,
5,
100L,
100L,
TimeUnit.MILLISECONDS);
source.subscribe(System.out::println);
CommonUtils.sleep(1000);
}
4.1.5 defer() 함수
/**
* timer 함수와 비슷하지만 데이터 흐름 생성을 구독자가 subscribe() 함수를 호출할 때까지 미룰 수 있습니다.
*/
(원형)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier) {
ObjectHelper.requireNonNull(supplier, "supplier is null");
return RxJavaPlugins.onAssembly(new ObservableDefer<T>(supplier));
}
SchedulerSupport(value="none") 이므로 현재 스레드 (main 스레드) 에서 실행되며 인자로는 Callable<Observable<T>>를 받습니다.
4.1.6 repeat() 함수
/**
* 단순히 반복 실행을 합니다.
*/
public void example() {
String[] balls = {"1","3","5"};
Observable<String> source = Observable.fromArray(balls)
.repeat(3); // 3번 반복 실행합니다.
source.subscribe(System.out::println);
}
4.2 변환 연산자
4.2.1 concatMap() 함수
/**
* flatMap() 함수와 매우 비슷합니다. flatMap() 는 먼저 들어온 데이터를 처리하는 도중에 새로운 데이터가 들어오면
* 나중에 들어온 데이터의 처리 결과가 먼저 출력될 수도 있습니다. 이를 이터리밍(interleaving) 이라고 합니다.
*
* concatMap() 함수는 먼저 들어온 데이터 순서대로 처리해서 결과를 낼 수 있도록 보장해줍니다.
*/
public class ConcatMap {
public void example() {
// 시간을 측정하기 위해 호출
CommonUtils.exampleStart();
String[] balls = {"1","3","5"};
//100ms 간격으로 interval() 함수를 호출합니다.
Observable<String> source = Observable.interval(100L, TimeUnit.MILLISECONDS)
// 0 부터 발생하는 Long 객체 값을 Integer 값으로 변환합니다.
.map(Long::intValue)
// 숫자를 "1","3","5" 문자열로 변환합니다.
.map(idx -> balls[idx])
// 3개의 값만은 가져옵니다.
.take(balls.length)
// 입력인 "1","3","5 은 100ms 간격으로 발생하지만 출력인 별은 200ms 간격으로 발생하기 때문에
// 입력과 출력의 순서가 역전될 수 있습니다.
// 그것을 concatMap 함수가 잡아줍니다.
.concatMap(ball -> Observable.interval(200L, TimeUnit.MILLISECONDS)
.map(notUser -> ball + "☆")
.take(2)
);
source.subscribe(MyLog::LogTime);
CommonUtils.sleep(2000);
}
}
4.2.2 switchMap() 함수
/**
* concatMap() 함수가 인터러빙 발생할 수 있는 상황에서 동작의 순서를 보장해준다면
* switchMap() 함수는 순서를 보장하기 위해 기존에 진행 중이던 작업을 바로 중단합니다.
* 여러 개의 값이 발행되었을 때 마지막에 들어온 값만 처리하고 싶을 때 사용합니다.
* 중간에 끊기더라도 마지막 데이터의 처리는 보장하기 때문입니다.
*/
public class SwitchMap {
public void example() {
// 시간을 측정하기 위해 호출
CommonUtils.exampleStart();
String[] balls = {"1","3","5"};
//100ms 간격으로 interval() 함수를 호출합니다.
Observable<String> source = Observable.interval(100L, TimeUnit.MILLISECONDS)
// 0 부터 발생하는 Long 객체 값을 Integer 값으로 변환합니다.
.map(Long::intValue)
// 숫자를 "1","3","5" 문자열로 변환합니다.
.map(idx -> balls[idx])
// 3개의 값만은 가져옵니다.
.take(balls.length)
// 중간 결과 확인용 함수
.doOnNext(MyLog::LogTime)
// SwitchMap 함수로 처리합니다.
.switchMap(ball -> Observable.interval(200L, TimeUnit.MILLISECONDS)
.map(notUser -> ball + "☆")
.take(2)
);
source.subscribe(MyLog::LogTime);
CommonUtils.sleep(2000);
}
}
4.2.3 GroupBy() 함수
/**
* 어떤 기준으로 단일 Observable 을 여러개로 이루어진 Observable 그룹으로 만듭니다.
*
* map() 함수는 1개의 데이터를 다른 값이나 다른 타입으로 변환해줍니다.
* flatMap() 함수는 1개의 값을 받아서 여러 개의 데이터(Observable)로 확장해줍니다.
* groupBy() 함수는 값들을 받아서 어떤 기준에 맞는 새로운 Observable 다수를 생성합니다.
*/
public class GroupBy {
public void example() {
String[] objs = {"6","4","2-T","2","6-T","4-T"};
// GroupedObservable 클래스는 Observable 과 동일하지만 getKey() 라는 메서드를 제공하여
// 구분된 그룹을 알 수 있게 해줍니다.
Observable<GroupedObservable<String, String>> source =
Observable.fromArray(objs).groupBy(CommonUtils::getShape);
source.subscribe(obj ->
obj.subscribe(
val -> System.out.println("GROUP:" + obj.getKey() + "\t Value: " + val)
));
}
}
4.2.3 Scan() 함수
/**
* reduce() 함수와 비슷합니다.
* reduce() 함수는 Observable 에서 모든 데이터가 입력된 후 그것을 종합하여 마지막 1개의 데이터만을 구독자에게 발행했습니다.
* scan() 함수는 실행할 때마다 입력값에 맞는 중간 결가 및 최종 결과를 구독자에게 발행합니다.
*/
public class Scan {
public void example() {
String[] balls = {"1","3","5"};
Observable<String> source = Observable.fromArray(balls)
.scan((ball1, ball2) -> ball2 + "(" + ball1 + ")");
source.subscribe(MyLog::Log);
}
}
4.3 결합 연산자
4.3.1 zip() 함수
/**
* 각각의 Observable 을 모두 활용해 2개 혹은 그 이상의 Observable 을 결합합니다.
*
* Zip() 함수는 최대 9개의 Observable 을 결합할 수 있지만 보통 2개 혹은 3개면 충분합니다.
*/
public class Zip {
public void example() {
String[] shapes = {"BALL","PENTAGON","STAR"};
String[] coloredTriangles = {"2-T","6-T","4-T"};
Observable<String> source = Observable.zip(
// 모양을 가져옵니다.
Observable.fromArray(shapes).map(Shape::getSuffix),
// 색상을 가져옵니다.
Observable.fromArray(coloredTriangles).map(Shape::getColor),
(suffix, color) -> color + suffix);
source.subscribe(MyLog::Log);
}
}
4.3.2 combineLatest() 함수
/**
* 2개 이상의 Observable 을 기반으로 Observable 각각의 값이 변경되었을 때 갱신해주는 함수입니다.
* 마지막 인자로 combiner 가 들어가는데 그것이 각 Observable 을 결합하여 어떤 결과를 만들어주는 역활을 합니다.
*
* 예를 들어 첫 번째 Observable 과 두 번째 Observable 을 결합하는 기능을 만든다고 하면
* 첫 번째 Observable 값 혹은 두 번째 Observable 의 값이 변경되었을 때 그 값을 자동으로 갱신해줍니다.
*/
public class CombineLatest {
public void example() {
String[] data1 = {"6","7","4","2"};
String[] data2 = {"DIAMOND","STAR","PENTAGON"};
Observable<String> source = Observable.combineLatest(
Observable.fromArray(data1)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS),
(shape, notUser) -> Shape.getColor(shape)),
Observable.fromArray(data2)
.zipWith(Observable.interval(150L, 200L, TimeUnit.MILLISECONDS),
(shape, notUser) -> Shape.getSuffix(shape)), (v1, v2) -> v1 + v2
);
source.subscribe(MyLog::Log);
CommonUtils.sleep(1000);
}
}
4.3.3 merge() 함수
/**
* 입력 Observable 의 순서와 모든 Observable 이 데이터를 발행하는지 등에 관여하지 않고
* 어느 것이든 업스트림에서 먼저 입력되는 데이터를 그래로 발행합니다.
*/
public class Merge {
public void example() {
String[] data1 = {"1","3"};
String[] data2 = {"2","4","6"};
Observable<String> source1 = Observable.interval(0L, 100L, TimeUnit.MILLISECONDS)
.map(Long::intValue)
.map(idx -> data1[idx])
.take(data1.length);
Observable<String> source2 = Observable.interval(50, TimeUnit.MILLISECONDS)
.map(Long::intValue)
.map(idx -> data2[idx])
.take(data2.length);
Observable<String> source = Observable.merge(source1, source2);
source.subscribe(MyLog::Log);
CommonUtils.sleep(1000);
}
}
4.3.3 concat() 함수
**
* 2개 이상의 Observable 을 이어 붙여주는 함수입니다.
* 첫 번째 Observable 에 onComplete 이벤트가 발생하야 두 번 째 Observable 을 구독합니다.
*/
public class Concat {
public void example() {
Action onCompleteAction = () -> MyLog.Log("onComplete()");
String[] data1 = {"1","3","5"};
String[] data2 = {"2","4","6"};
Observable<String> source1 = Observable.fromArray(data1)
.doOnComplete(onCompleteAction);
Observable<String> source2 = Observable.interval(100L, TimeUnit.MILLISECONDS)
.map(Long::intValue)
.map(idx -> data2[idx])
.take(data2.length)
.doOnComplete(onCompleteAction);
Observable<String> source= Observable.concat(source1, source2)
.doOnComplete(onCompleteAction);
source.subscribe(MyLog::Log);
CommonUtils.sleep(1000);
}
}
4.4 조건연산자
4.4.1 amb() 함수
/**
* 여러개의 Observable 중에서 1개의 Observable 을 선택하는데,
* 선택 기준을 가장 먼저 데이터를 발행하는 Observable 입니다.
* 이후에 나머지 Observable 에서 발행하는 데이터는 모두 무시합니다.
*/
public class Amb {
public void example() {
String[] data1 = {"1","3","5"};
String[] data2 = {"2-R","4-R"};
List<Observable<String>> sources = Arrays.asList(
Observable.fromArray(data1)
.doOnComplete(() -> MyLog.Log("Observable #1 : onComplete()")),
Observable.fromArray(data2)
.delay(100L, TimeUnit.MILLISECONDS)
.doOnComplete(() -> MyLog.Log("Observable #2 : onComplete()"))
);
Observable.amb(sources)
.doOnComplete(() -> MyLog.Log("Result : onComplete()"))
.subscribe(MyLog::Log);
CommonUtils.sleep(1000);
}
}
4.4.2 takeUntil() 함수
/**
* take() 함수에 조건을 설정할 수 있습니다.
* take() 함수처럼 일정 개수만 값을 발행하되 완료 기준을 다른 Observable 에서 값을 발행하는지고 판단하는 것입니다.
*/
public class TakeUntil {
public void example() {
String[] data = {"1","2","3","4","5","6"};
Observable<String> source = Observable.fromArray(data)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (val, notUsed) -> val)
.takeUntil(Observable.timer(500L, TimeUnit.MILLISECONDS));
source.subscribe(MyLog::Log);
CommonUtils.sleep(1000);
}
}
4.4.3 skipUntil() 함수
/**
* takeUntil() 과 정반대의 함수입니다.
* other Observable 을 인자로 받다는 점은 같지만 Observable 에서 데이터를 발행할 때까지 값을 건너뜁니다.
*/
public class SkipUntil {
public void example() {
String[] data = {"1","2","3","4","5","6"};
Observable<String> source = Observable.fromArray(data)
.zipWith(Observable.interval(100L, TimeUnit.MILLISECONDS), (val, notUser) -> val)
.skipUntil(Observable.timer(500L, TimeUnit.MILLISECONDS));
source.subscribe(MyLog::Log);
CommonUtils.sleep(1000);
}
}
4.4.4 all() 함수
/**
* 주어진 조건에 100% 맞을 때만 true 값을 발행하고 조건에 맞이 않는 데이터가 발행되면
* 바로 false 값을 발행합니다.
*/
public class All {
public void example() {
String[] data = {"1","2","3","4"};
Single<Boolean> source = Observable.fromArray(data)
.map(Shape::getShape)
.all(Shape.BALL::equals);
source.subscribe(MyLog::Log);
}
}
4.5 수학 및 기타 연산자
4.5.1 수학 함수
4.5.2 delay() 함수
/**
* RxJava 에는 시간을 다루는 함수들이 많습니다.
*
* 주기적으로 Observable 에서 값을 발행해주는 interval() 함수
* 일정 시간이 지난 후 값을 발행해주는 timer() 함수
* Callable 을 등록해두고 실행을 지연하는 defer() 함수
*
* 앞서 소개한 세 가지 함수가 Observable 을 생성하는 역활이라면
* delay() 함수는 유틸리티 연산자로서 보조 역활을 합니다.
*/
public class Delay {
public void example() {
String[] data = {"1","7","2","3","4"};
Observable<String> source = Observable.fromArray(data)
.delay(100L, TimeUnit.MILLISECONDS);
source.subscribe(MyLog::Log);
CommonUtils.sleep(1000);
}
}
4.5.3 timeInterval() 함수
/**
* 어떤 값을 발행했을 때 이전 값을 발행한 이후 얼마나 시간이 흘렀는지를 알려줍니다.
*/
public class TimeInterval {
public void example() {
String[] data = {"1","3","7"};
CommonUtils.exampleStart();
Observable<Timed<String>> source = Observable.fromArray(data)
.delay(item -> {
CommonUtils.doSomething();
return Observable.just(item);
})
.timeInterval();
source.subscribe(MyLog::Log);
CommonUtils.sleep(1000);
}
}
'안드로이드' 카테고리의 다른 글
[Android,Material with refresh] 머테리얼 변형 예제 (0) | 2019.01.02 |
---|---|
[Android, MVVM] ViewModel, LiveData, DataBinding, Koin 을 사용한 MVVM (3) | 2018.12.27 |
[리액티브 프로그래밍] 리액티브 연산자 입문 (0) | 2018.12.19 |
[리액티브 프로그래밍] 리액티브 프로그래밍 이란? (0) | 2018.12.18 |
[Android, Databinding] 데이터 바인딩 이벤트 처리 (0) | 2018.12.17 |