cautions!
1. 관례상 stream 변수의 뒤에 $를 붙이곤 합니다.
이 포스트에서는 뗐다가 썼다가 혼용되어 있습니다.
const inputObservable$ = fromEvent(inputDom, 'keyup');
2. 공식문서의 그림(이런 녀석들을 "마블 다이어그램"이라고 부릅니다)을 이해할 수 있어야 합니다!
정보의 흐름을 아는데 그림이 정말 중요합니다.
3. 그 외의 지식들
- 기본적으로 Observable은 cold 합니다. 즉, subscription이 되지 않으면 등록되지 않습니다.
- Observable은 immutable합니다. operator를 거친다고해서 변형되지 않습니다.
무엇보다, 공식 문서 보는게 제일 좋습니다. 거기에 타입이랑 인자 다 써 있어요.
그러니까 이 포스트는 그냥 한 번 쭈욱 읽어보고 버리는게 제일 좋습니다.
Operator?
observable에 전달된 데이터를 조작, 변형하는 친구들입니다.
수 많은 operator가 있는데 6버전의 구분에 따르면 다음과 같이 구분할 수 있습니다.
(문서에 따른 내용입니다. rxjs-dev.firebaseapp.com/guide/operators)
- Creation Operators => 해당 포스트
- Join Creation, Join Operators => darrengwon.tistory.com/1376
- Transformation Operators => darrengwon.tistory.com/1374
- Filtering Operators => darrengwon.tistory.com/1375
- Multicasting Operators
- Error Handling Operators => 해당 포스트
- Utility Operators => 해당 포스트
- Conditional and Boolean Operators
- Mathematical and Aggregate Operators => 해당 포스트
이 많은 operator들을 전부 살피는 것은 무리이므로, 간략하게 살펴보도록하겠습니다.
Create Operator: of, fromEvent, from, interval, timer, range ... etc
뭐 별거 있나요, Observable 생성하는 녀석들입니다.
0) 커스텀 Observable 만들기
이렇게 만드는게 번거로워서 여러가지 create operator를 rxjs에서 제공해주는 겁니다.
import { Observable } from 'rxjs';
// observable
const observable = new Observable(subscriber => {
try {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete(); // 끝
subscriber.next(5); // 끝났음. 출력안됨
}, 1000);
} catch (error) {
subscriber.error(error);
}
});
// observer. 단순히 next, error, complete 핸들링할 함수 객체 덩어리임 ㅋㅋ
observable.subscribe({
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done'),
});
// 옵저버가 옵저버블을 구독하도록 시킴+ 옵저버의 함수를 실행시키는 주체는 옵저버블임.
observable.subscribe(observer);
참고조, 옵저버는 다음과 같이 간략한 방식으로 작성할 수도 있습니다.
다만 저는 선호하지 않는 편이어서, 객체 형태로 명확히 넘겨주는 편을 선호하는 편입니다.
// 객체 형태로
observable.subscribe({
next: v => console.log(v),
error: (err) => console.log(err.message),
complete: () => console.log("done")
})
// 간략하게
observable.subscribe(
v => console.log(v),
(err) => console.log(err.message),
() => console.log("done")
)
unsubscribe까지 작성해보았습니다.
이 코드의 경우에는 2초가 지나기도 전에 unsubscribe되었으므로 complete 메서드가 실행되지 않습니다.
unsubscribe는 메모리 릭은 방지하기 위해서라도 꼭 해줘야 합니다.
예를 들어 특정 페이지에서만 사용했다가, 다른 페이지로 이동했으면 unsubscribe 해줘야 합니다.
* 주의 : unsubscribe 메서드 한 번 썼다고 모든 subscribe가 종료되는 건 아닙니다. 만약, 여러 개의 subscription이 있다면 각각 unsubscribe해주어야 합니다.
import { Observable } from "rxjs";
// observable
const observable = new Observable((subscriber) => {
subscriber.next(1);
const id = setTimeout(() => {
subscriber.next(2);
subscriber.complete();
}, 2000);
return () => {
console.log("clearTimeout");
clearTimeout(id);
};
});
// subscriber
const subscription = observable.subscribe({
next: (v) => console.log(v),
complete: () => console.log("done"),
});
// 참고로, unsubscribe한다고 해서 observer의 complete 메서드가 실행되는 건 아님. 그냥 끝남.
subscription.unsubscribe();
1) 정적인 값으로부터 observable 생성: of
const source$ = of("Hello", [1, 2, 3], 64);
source$.subscribe({
next: (x) => console.log(x),
});
// output
"Hello"
[1, 2, 3]
64
2) dom의 event로 부터 observable 생성 : fromEvent
fromEvent로 생성하면 됩니다. 별도의 파이프 달고 싶으면 달아주면 되구요
우선 가장 기본적인 버튼 클릭 이벤트.
const btn = document.querySelector('.btn');
const btnObservable = fromEvent(btn, 'click'); // observable
// observer
btnObservable.subscribe({
next: e => console.log(e),
err: err => console.log(err),
complete: () => console.log('complete'),
});
Rxjs을 특성을 이용하여 곧바로 input의 값을 다른 DOM에 뿌리는 작업을 더욱 쉽게할 수 있습니다.
const inputDom = $('.input');
const outputDom = $('.output');
const inputObservable = fromEvent(inputDom, 'keyup');
inputObservable.subscribe({
next: e => (outputDom.textContent = e.target.value),
err: err => console.log(err),
complete: () => console.log('complete'),
});
마우스의 좌표를 받아오겠습니다. 그런데 매번 일어나면 브라우저가 펑 하고 터져버릴 것 같은 그런 느낌이죠?
throttle을 걸어놓겠습니다.
import { fromEvent, interval } from 'rxjs';
import { throttle } from 'rxjs/operators';
const docMoveObservable = fromEvent(document, 'mousemove');
const pipeObservable = docMoveObservable.pipe(throttle(() => interval(1000)));
pipeObservable.subscribe({
next: e => console.log(e.clientX),
err: err => console.log(err),
complete: () => console.log('complete'),
});
3) 배열로부터 obsrvable 생성하기 : from
from을 사용합니다.
이벤트는 끝나지 않기 때문에 complete를 위한 unscribe를 해주지 않으면 끝나지 않지만,
배열은 끝이 명확하므로 별다른 작업 없이도 complete 메서드가 실행됩니다.
const numbers = [1, 2, 3, 4, 5];
const numberObservable = from(numbers);
numberObservable.subscribe({
next: x => console.log(x),
error: err => console.log(err),
complete: () => console.log('done'),
});
set, map도 잘 굴러가네요
const mySet = new Set(['hello', 34, { name: 'cineps&good', asset: 'people' }]);
const myMap = new Map([['name', 'darren'],['feeling', 'depression']]);
const mysetObv = from(mySet);
const mymapObv = from(myMap);
mysetObv.subscribe({
next: x => console.log(x),
complete: () => console.log('done'),
});
mymapObv.subscribe({
next: x => console.log(x),
complete: () => console.log('done'),
});
4) Promise로부터 Observable 만들기 : fromPromise 없어졌음 ㅋㅋ from 쓰세요
rxjs 버전 5 때는 fromPromise가 있었는데 6 부터는 from으로 통합되었습니다.
data fetching이 아주 편안해졌습니다.
const promiseSource$ = from(
axios.get("https://jsonplaceholder.typicode.com/todos/1")
);
promiseSource$.subscribe({ next: (x) => console.log(x) });
const promise = new Promise((resolve, reject) => {
try {
setTimeout(() => resolve('resolved'), 1000);
} catch (error) {
reject(error.message);
}
});
const PromiseObsv = from(promise);
PromiseObsv.subscribe({
next: x => console.log(x),
error: err => console.log(err),
complete: () => console.log('done'),
});
5) interval, timer, range
interval
setTimeout, setInterval보다 편리하다.
setInterval의 경우 cleaup을 별도로 해줘야 하지만 interval은 complete하면 즉시 cleanup된다.
take 4를 취했으니, 0부터 3까지 출력될 것이다. 끝이 명확하니 complete 메서드는 자동으로 실행된다.
+
timer랑 다른 점이 무엇이냐면, inteval은 첫 시작이 주어진 시간이 지나야 시작합니다.
반면 timer는 일단 주어진 시간에 시작하고, 그 다음으로 주어진 시간 대로
interval(1000) => 1초 지나고부터 1초 간격으로 반복.
timer(0, 1000) => 0초(즉시) 시작하고, 그 다음부터 1초 간격으로 반복.
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';
const numbersObsv = interval(1000).pipe(take(4));
numbersObsv.subscribe({
next: x => console.log(x),
complete: () => console.log('done'),
});
timer
일정 시간이 지나면 한번에 다 실행된다.
아, 그리고 옵저버는 귀찮아서 그냥 옵저버블에 체인으로 달아버렸다. 이게 편하더라구요.
// 처음에는 2초, 그 다음부터는 5초 간격으로 출력
timer(2000, 5000).subscribe({ next: (x) => console.log(x) });
import { timer, of } from 'rxjs';
import { concatMapTo } from 'rxjs/operators';
// This could be any observable
const source = of(1, 2, 3);
const result = timer(1000)
.pipe(concatMapTo(source))
.subscribe({
next: x => console.log(x),
complete: () => console.log('done'),
});
range
python range랑 똑같다.
index 관련해서 헷갈리기 쉬운데, 정의 자체가 range(start, count, scheduler)로 되어 있다.
아래 코드는 0부터 5개를 출력한다. 즉, 0, 1, 2, 3, 4를 출력한다.
import { range } from 'rxjs';
range(0, 5).subscribe({
next: x => console.log(x),
complete: () => console.log('done'),
});
Error Handling Operators
catchError, retry, retryWhen
이 세 친구는 보통 엮어서 사용된다.
operator의 pipe 라인 중 어디에 catchError를 배치할 것인지에 대해 주의해야 한다. 한 번 에러가 발생하면, 별도의 retry를 하지 않으면 그대로 옵저버의 error 메서드가 실행되며 옵저버블이 끝나기 때문이다.
catchError의 사용례는 다음과 같다. err는 발생한 err 객체이며, caught는 cathError가 발생한 source이다.
catchError((err, caught) => ...)
err와 caught를 출력해보았다. 음.. 뭐 특별한 건 없다.
그렇다면 사용례를 살펴보도록 하자.
가장 간단한 형태의 catchError의 사용례이다. err가 발생되면 err를 리턴해서 observer의 error 메서드를 발생시키고 끝난다.
click$
.pipe(
switchMap(() =>
ajax("https://api.github.com/users/darrenkwondev").pipe(
catchError((err, caught) => {
console.log(err, caught);
return err;
})
)
)
)
.subscribe({
next: (x) => console.log(x),
error: (err) => console.log(err),
complete: () => console.log("done"),
});
여기서 만약, 단순히 에러 메세지를 발생시키지 않고, 재시도하길 원한다면, 에러가 발생한 옵저버블을 다시 리턴하면 됩니다.
그러나 단순이 아래와 같이 작성한다면 무한히 retry하게 됩니다.
click$
.pipe(
switchMap(() =>
ajax("https://api.github.com/users/darrenkwondev").pipe(
catchError((err, caught) => {
console.log(err, caught);
return caught; // 무한히 retry함. 왜? 에러가 발생한 obsv를 다시 반환하기 떄문
})
)
)
)
네트워크 탭을 열어보면 아래처럼 계속해서, 에러가 발생하는 observable을 재실행함을 확인할 수 있다.
이는 명백히 메모리 누수이며 치명적인 결과를 가져온다.
Utility Operators
tap
값을 단순히 확인하고 싶을 때 사용합니다.
값을 도중에 확인하고 싶은 경우에는 map이 아닌 tap을 이용하도록합시다.
tab(observer) 꼴로 작성하기 때문에 도중에 발생한 error, complete를 캐치할 수도 있습니다.
of(1, 2, 3, 4)
.pipe(
tap((v) => console.log("before", v)),
map((v) => v * 2),
tap((v) => console.log("after", v))
)
.subscribe(console.log);
Mathematical and Aggregate Operators
reduce
일반적으로 써 왔던 reduce의 역할입니다.
of(1, 2, 3, 4)
.pipe(
reduce((acc, cur, i) => {
console.log(`${i}번째 acc : ${acc}, cur: ${cur}`);
return acc + cur;
}, 0)
)
.subscribe(console.log);
count, max, min
설명이 필요 없죠
of(1, 2, 3, 4).pipe(count()).subscribe(console.log); // 4 4개니까
of(1, 2, 3, 4).pipe(max()).subscribe(console.log); // 4 제일 크니까
of(1, 2, 3, 4).pipe(min()).subscribe(console.log); // 1 제일 작으니까