import _ from 'lodash/fp';
import { EMPTY, Observable, of } from 'rxjs';
import { take } from 'rxjs/operators';
import { EEntityStatus } from '../../../redux/redux-entity';
import { ofValue } from './of-value';
import { reject } from './reject';
/**
 * Pipeable operator that holds every value of the source until a "gate"
 * observable produced by `callback(value)` emits, and then replays that source
 * value downstream (once per gate emission).
 *
 * Only the most recent source value is kept: a new source value cancels the gate
 * of the previous one. While the stream is closed (after `closer` emitted and
 * before `opener` emits again) source values are dropped.
 *
 * @example
 * source$.pipe(toggleStream(() => observable$));
 *
 * @warning
 * If the source completes before the pending gate has emitted (e.g. `of(1)` with
 * an asynchronous gate), the pending value is dropped together with the gate,
 * unless `ignoreSourceCompletion` is set.
 *
 * @param {function(*): Observable} callback - receives the source value and
 *   returns the gate observable to wait for.
 * @param {Object} [options]
 * @param {Observable} [options.closer] - every emission closes the stream and
 *   cancels the pending gate. Omitted: the stream is never closed.
 * @param {Observable} [options.opener] - every emission re-opens the stream.
 *   Omitted: nothing re-opens it (it starts open).
 * @param {boolean} [options.ignoreSourceCompletion] - when truthy, completion of
 *   the source neither completes the output nor releases the subscriptions.
 * @returns {function(Observable): Observable} operator to be used in `pipe`.
 */
export const toggleStream = (callback, { closer: closeStream$, opener: openStream$, ignoreSourceCompletion } = {}) => (source$) => {
    // we return an artificial observable replacing the source in pipeline
    return new Observable((observer) => {
        let subscriptionGuard;
        // declared up-front (not `const` at the subscribe call) so that a source
        // which completes/errors synchronously can run the cleanup safely
        let subscriptionSource;
        let blocked = false;
        let tornDown = false;

        // cancel the gate we are currently waiting on, if any
        const releaseGuard = () => {
            if (subscriptionGuard) {
                subscriptionGuard.unsubscribe();
                subscriptionGuard = undefined;
            }
        };

        const subscriptionClose = closeStream$
            ? closeStream$.subscribe({
                next: () => {
                    releaseGuard();
                    blocked = true;
                }
            })
            : undefined;
        const subscriptionOpen = openStream$
            ? openStream$.subscribe({
                next: () => {
                    blocked = false;
                }
            })
            : undefined;

        // idempotent: runs on source completion/error AND when downstream unsubscribes
        const teardown = () => {
            if (tornDown) {
                return;
            }
            tornDown = true;
            releaseGuard();
            if (subscriptionClose) {
                subscriptionClose.unsubscribe();
            }
            if (subscriptionOpen) {
                subscriptionOpen.unsubscribe();
            }
            if (subscriptionSource) {
                subscriptionSource.unsubscribe();
            }
        };

        subscriptionSource = source$.subscribe({
            // when source emits next
            next: (value) => {
                if (blocked || tornDown) {
                    return;
                }
                // latest value wins: drop the gate of the previous value
                releaseGuard();
                const bufferStream$ = callback(value);
                subscriptionGuard = bufferStream$.subscribe(() => {
                    // unlock pipeline and replay the value emitted by the source
                    observer.next(value);
                });
                // the gate may have fired synchronously and caused a teardown
                // before the subscription above was stored
                if (tornDown) {
                    releaseGuard();
                }
            },
            // when source emits an error
            error: (err) => {
                observer.error(err);
                teardown();
            },
            // when source emits complete
            complete: () => {
                if (ignoreSourceCompletion) {
                    // source completion is deliberately ignored: keep waiting on the gate
                    return;
                }
                observer.complete();
                teardown();
            }
        });

        // source finished synchronously while subscribing: release what we just got
        if (tornDown) {
            subscriptionSource.unsubscribe();
        }

        // executed when the consumer unsubscribes
        return teardown;
    });
};
/**
 * Builds a `toggleStream` operator whose gate is derived from `state$`.
 *
 * For every source value the gate is `ofValue(state$, selector)` passed through
 * `reject(...)` with a predicate matching entities whose `status` is idle,
 * loading or error, limited to its first emission by `take(1)`.
 * NOTE(review): `ofValue` and `reject` are defined elsewhere — presumably they
 * select a slice of the state and filter out matching values; confirm there.
 *
 * @param state$ - state observable handed to `ofValue`.
 * @param selector - selector handed to `ofValue`.
 * @param [config] - optional `opener` (defaults to `of(true)`), `closer`
 *   (defaults to `EMPTY`) and `ignoreSourceCompletion` (defaults to `false`).
 * @returns the operator produced by `toggleStream`.
 */
export const waitForState = (state$, selector, config) => {
    // read one option from `config`, falling back when it is not set
    const readOption = (name, fallback) => _.pathOr(fallback, [name], config);

    const opener = readOption('opener', of(true));
    const closer = readOption('closer', EMPTY);
    const ignoreSourceCompletion = readOption('ignoreSourceCompletion', false);

    // true for entities that are not loaded yet or whose loading failed
    const isPendingOrFailed = (entity) => {
        const blockedStatuses = [EEntityStatus.idle, EEntityStatus.loading, EEntityStatus.error];
        return blockedStatuses.includes(_.path('status', entity));
    };
    const rejectPendingAndFailedReduxEntities = reject(isPendingOrFailed);

    // gate factory: first state value that passes the rejection above
    const createGate = () => {
        const selected$ = ofValue(state$, selector);
        return selected$.pipe(rejectPendingAndFailedReduxEntities, take(1));
    };

    return toggleStream(createGate, { opener, closer, ignoreSourceCompletion });
};
