import {
	BehaviorSubject,
	ConnectableObservable, Observable,
	defer,
	throwError
} from 'rxjs';
import {
	catchError, finalize, map,
	publishBehavior,
	publishReplay, take, tap
} from 'rxjs/operators';

const connectReplay = <T>(observable: Observable<T>) => {
	const connectedObservable = observable.pipe(
		publishReplay(1)
	) as ConnectableObservable<T>;
	connectedObservable.connect();
	return connectedObservable;
};

const connectBehavior = <T>(
	observable: Observable<T>,
	value: T
): Observable<T> => {
	const connectedObservable = observable.pipe(
		publishBehavior(value)
	) as ConnectableObservable<T>;
	connectedObservable.connect();
	return connectedObservable;
};
const getLatestAndUpdate =
	<T>(subj: BehaviorSubject<T>) =>
	(fn: (t: T) => T) =>
		subj.pipe(take(1), map(fn)).subscribe((x) => subj.next(x));

function cpFinalizeSuccess<T>(callback: (value?: T) => void) {
	return (source: Observable<T>) =>
		defer(() => {
			let hasError = false;
			let result = null;
			return source.pipe(
				tap((res) => {
					result = res;
				}),
				catchError((value) => {
					hasError = true;
					return throwError(value);
				}),
				finalize(() => !hasError && callback(result))
			);
		});
}

export const ObservableUtil = {
	connectReplay,
	connectBehavior,
	getLatestAndUpdate,
	finalizeSuccess: cpFinalizeSuccess,
};
