import { ActionsObservable, ofType, StateObservable } from 'redux-observable';
import { RootState } from 'redux/reducers';
import { set } from 'redux/rest/actions';
import { defer, from, ObservableInput, ObservedValueOf, of, OperatorFunction } from 'rxjs';
import {
  catchError,
  concatMap,
  delay,
  filter as filterOperator,
  map,
  mergeMap,
  repeatWhen,
  switchMap,
  takeUntil,
} from 'rxjs/operators';
import { Action, AsyncActionCreators } from 'typescript-fsa';
import { ActionCreator } from 'typescript-fsa/src/index';

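/**
 * Flattening operator that only runs `project` for actions accepted by `filterFn`.
 *
 * @param filterFn the filter function. If it returns false, the action is skipped: `project` is not run and a
 *   `_DONE` action with an undefined result is emitted instead.
 * @param project The mergeMap's project function. The logic will be executed only if filterFn returns true
 */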

export function mergeMapFiltered<T extends Action<any>, O extends ObservableInput<any>>(
  filterFn: (value: T) => boolean,
  project: (value: T, index: number) => O,
): OperatorFunction<T, ObservedValueOf<O>> {
  // Delegate to the shared implementation, using mergeMap as the flattening operator.
  const operator = filtered<T, O>(filterFn, project, mergeMap);
  return operator;
}

/**
 * Same contract as `mergeMapFiltered`, but inner observables are flattened with concatMap.
 *
 * @param filterFn predicate evaluated for every action; `project` runs only when it returns true
 * @param project the concatMap project function applied to accepted actions
 */
export function concatMapFiltered<T extends Action<any>, O extends ObservableInput<any>>(
  filterFn: (value: T) => boolean,
  project: (value: T, index: number) => O,
): OperatorFunction<T, ObservedValueOf<O>> {
  const operator = filtered<T, O>(filterFn, project, concatMap);
  return operator;
}

/**
 * Shared implementation behind `mergeMapFiltered` and `concatMapFiltered`.
 *
 * Every incoming action is first offered to `filterFn`. Accepted actions (true) are handed to `project`.
 * Rejected actions (false) never reach `project`; instead a single value is emitted, built by `set` from a
 * descriptor whose type is the incoming type with `_STARTED` replaced by `_DONE` and whose result is undefined.
 *
 * @param filterFn predicate deciding whether `project` runs (true) or is skipped (false)
 * @param project produces the inner observable input for accepted actions
 * @param mappingFunction the flattening operator to apply (typed after concatMap)
 */
export function filtered<T extends Action<any>, O extends ObservableInput<any>>(
  filterFn: (value: T) => boolean,
  project: (value: T, index: number) => O,
  mappingFunction: typeof concatMap,
): OperatorFunction<T, ObservedValueOf<O>> {
  return mappingFunction((action, index) => {
    if (filterFn(action)) {
      return project(action, index);
    }

    // Rejected action: short-circuit with a synthetic "done" descriptor so the request does not stay pending.
    const skipped = set({
      type: action.type.replace('_STARTED', '_DONE'),
      payload: { params: action.payload, result: undefined! },
    });
    return of(skipped) as unknown as O;
  });
}

/** Pause, in milliseconds, between the completion of one poll and the start of the next. */
const POLLING_INTERVAL = 5 * 1000;

/**
 * Convenience wrapper around `genericEpic` that always uses `mergeMapFiltered` as the flattening strategy.
 *
 * @param params the async action creators, the API call and an optional filter (everything but `mapFunction`)
 * @returns the epic produced by `genericEpic`
 */
export const mergeMapEpic = <S, T, U>(params: Omit<GenericEpicParams<S, T, U>, 'mapFunction'>) => {
  const { asyncAction, api, filter } = params;
  return genericEpic({ asyncAction, api, filter, mapFunction: mergeMapFiltered });
};

/**
 * Options accepted by `genericEpic`.
 *
 * `S` is the payload type of the started action, `T` the value the API promise resolves to, and `U` the error
 * type carried by the failed action.
 */
export interface GenericEpicParams<S, T, U> {
  /** Async action creators: `started` triggers the epic, `done` / `failed` are what it emits. */
  asyncAction: AsyncActionCreators<S, T, U>;
  /** Called with the started action; its resolved value becomes the `result` of the done action. */
  api: (action: Action<S>) => Promise<T>;
  /** Flattening strategy used to run the `api` calls. */
  mapFunction: typeof mergeMapFiltered | typeof concatMapFiltered;
  /** Optional gate evaluated per started action; `api` runs only when it returns true. Omitted means always run. */
  filter?: (action: Action<S>, state$: StateObservable<RootState>) => boolean;
}

/**
 * Builds an epic that turns every `asyncAction.started` action into an `api` call.
 *
 * For each started action that passes `filter` (or for every started action when no filter is given), `api` is
 * invoked and the epic emits `asyncAction.done` with the resolved value, or `asyncAction.failed` with the error.
 * Actions rejected by `filter` are handled by `mapFunction`, which does not call `api` for them.
 *
 * @param asyncAction async action creators whose `started` type is listened for
 * @param api performs the request for a started action
 * @param mapFunction flattening strategy (`mergeMapFiltered` or `concatMapFiltered`)
 * @param filter optional predicate receiving the action and the state observable; true means "call the api"
 * @returns an epic taking the action and state streams
 */
const genericEpic =
  <S, T, U>({ asyncAction, api, mapFunction, filter }: GenericEpicParams<S, T, U>) =>
  (action$: ActionsObservable<ReturnType<(typeof asyncAction)['started']>>, state$: StateObservable<RootState>) =>
    action$.pipe(
      ofType(asyncAction.started.type),
      mapFunction(
        action => (filter ? filter(action, state$) : true),
        action =>
          // `defer` moves the `api` call inside the inner observable, so a synchronous throw from `api` is
          // routed through `catchError` below instead of erroring (and terminating) the whole epic stream.
          defer(() => api(action)).pipe(
            map((result: T) =>
              asyncAction.done({
                params: action.payload,
                result,
              }),
            ),
            catchError(error =>
              of(
                asyncAction.failed({
                  params: action.payload,
                  error,
                }),
              ),
            ),
          ),
      ),
    );

/**
 * Builds an epic that repeatedly calls `api` between a start action and a matching stop action.
 *
 * On every `startPollingAction`, `api` is called with that action. Each resolved value is emitted as
 * `fetchAction.done`, each error as `fetchAction.failed`, and the call is repeated `POLLING_INTERVAL` after the
 * previous attempt completes. Polling ends when a `stopPollingAction` arrives whose payload is strictly equal
 * (`===`) to the start action's payload, so object payloads match only by reference.
 *
 * @param api performs one fetch for the given start action
 * @param startPollingAction action creator whose type starts a poller
 * @param stopPollingAction action creator whose type stops the poller with the same payload
 * @param fetchAction async action creators used to report each poll result
 * @param options `singleFetch: true` uses switchMap (a new start action replaces the running poller);
 *   otherwise mergeMap is used and pollers run side by side
 * @returns an epic taking the action stream
 */
export const genericPolling =
  <V, R, E>(
    api: (arg: ReturnType<ActionCreator<V>>) => Promise<R>,
    startPollingAction: ActionCreator<V>,
    stopPollingAction: ActionCreator<V>,
    fetchAction: AsyncActionCreators<V, R, E>,
    { singleFetch } = { singleFetch: false },
  ) =>
  (action$: ActionsObservable<ReturnType<ActionCreator<V>>>) => {
    const flatten = singleFetch ? switchMap : mergeMap;

    return action$.pipe(
      ofType(startPollingAction.type),
      flatten(startAction => {
        // Emits when a stop action carrying the very same payload as this start action is dispatched.
        const stop$ = action$
          .ofType(stopPollingAction.type)
          .pipe(filterOperator(stopAction => startAction.payload === stopAction.payload));

        return defer(() => api(startAction)).pipe(
          map((result: R) => fetchAction.done({ params: startAction.payload, result })),
          // Errors are converted to a failed action before repeatWhen, so a failing poll is retried as well.
          catchError(error => of(fetchAction.failed({ params: startAction.payload, error }))),
          repeatWhen(completed => completed.pipe(delay(POLLING_INTERVAL))),
          takeUntil(stop$),
        );
      }),
    );
  };
