import { Observable, ReplaySubject, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
/*
 * This operator exists because the standard `shareReplay` prevents ajax requests from being aborted
 * (for example, when an outer `switchMap` switches to a new inner observable),
 * and the `share()` operator behaves unexpectedly after an ajax abort.
 */
/**
 * `shareReplayAbortable` is an RxJS custom operator that shares one subscription to the
 * source Observable between all current subscribers and replays the latest emitted value
 * to subscribers that join while that subscription is still live. It unsubscribes from
 * the source as soon as the last subscriber leaves, so an in-flight request can be aborted.
 *
 * @template T The type of elements in the source Observable.
 *
 * @returns {function(Observable): Observable} - A function that returns an `Observable`
 * which mirrors the source `Observable` with the described sharing and resetting behavior.
 * The returned function keeps separate state for every source it is applied to.
 *
 * @example
 * import { shareReplayAbortable } from './path-to-your-operator';
 *
 * yourObservable$.pipe(
 *   shareReplayAbortable()
 * );
 *
 * @description
 * - **Caching:** Retains the last emitted value of the live connection, so subscribers that
 * join while the source is still active immediately receive the most recent value.
 *
 * - **Error Handling:** A source error is forwarded to every current subscriber. The
 * connection is discarded *before* the error is forwarded, so a subscriber that
 * re-subscribes from its error handler triggers a new subscription to the source
 * instead of receiving the same error again.
 *
 * - **Completion:** Source completion is forwarded to every current subscriber and the
 * connection is discarded. Future subscribers trigger a new subscription to the source.
 *
 * - **Unsubscription:** When all subscribers of the live connection have unsubscribed
 * (reference count drops to zero), the source subscription is stopped and the cached value
 * is dropped. The subsequent subscriber causes a fresh subscription to the source.
 */
export function shareReplayAbortable() {
    return (source) => {
        // The connection shared by the current subscribers, or null while idle.
        // It is declared per source so that reusing the operator function on several
        // sources never mixes their caches or reference counts.
        let connection = null;

        // Marks a connection as finished and, if it is still the live one, clears it so
        // that the next subscriber starts from scratch. Subscribers of an older
        // connection can therefore never tear down a newer one.
        const detach = (conn) => {
            conn.done = true;
            if (connection === conn) {
                connection = null;
            }
        };

        return new Observable((observer) => {
            const isFirstSubscriber = connection === null;
            if (isFirstSubscriber) {
                connection = {
                    // Number of subscribers currently attached to this connection
                    refCount: 0,
                    // True once the source terminated or the connection was aborted
                    done: false,
                    // Caches and replays the last emission of this connection
                    replaySubject: new ReplaySubject(1),
                    // Signals takeUntil to stop the source subscription
                    stop$: new Subject(),
                    // Subscription to the source, assigned once subscribe() returns
                    sourceSubscription: null
                };
            }
            // Pin the connection this subscriber belongs to; `connection` may be replaced
            // while this subscriber is still alive.
            const conn = connection;
            conn.refCount++;

            // Attach to the subject BEFORE connecting to the source, otherwise a source
            // that emits synchronously would be finished before anyone is listening.
            const subscription = conn.replaySubject.subscribe({
                next: (value) => observer.next(value),
                error: (err) => observer.error(err),
                complete: () => observer.complete()
            });

            if (isFirstSubscriber) {
                conn.sourceSubscription = source.pipe(takeUntil(conn.stop$)).subscribe({
                    next: (value) => conn.replaySubject.next(value),
                    error: (err) => {
                        if (conn.done) {
                            return;
                        }
                        // Detach first so re-entrant subscribers get a fresh connection.
                        detach(conn);
                        conn.replaySubject.error(err);
                    },
                    complete: () => {
                        // takeUntil also completes when stop$ fires; in that case the
                        // connection is already marked done and nobody is listening.
                        if (conn.done) {
                            return;
                        }
                        detach(conn);
                        conn.replaySubject.complete();
                    }
                });
            }

            return () => {
                subscription.unsubscribe();
                conn.refCount--;
                if (conn.refCount === 0 && !conn.done) {
                    // Last subscriber left while the source is still running: abort it.
                    detach(conn);
                    conn.stop$.next();
                    if (conn.sourceSubscription) {
                        conn.sourceSubscription.unsubscribe();
                    }
                }
            };
        });
    };
}
/*
Question: why doesn't the following `share` configuration behave the same way as `shareReplayAbortable`?

 share({
     connector: () => new ReplaySubject(1),
     resetOnError: false,
     resetOnComplete: false,
     resetOnRefCountZero: true
   }),

Answer: Let's compare and contrast the behavior of the `shareReplayAbortable` operator defined in this file with the behavior of the built-in `share` operator in RxJS when using the configuration shown above.

1. **Functionality**:

   - `shareReplayAbortable` is designed to share a source observable while caching its last emission, and it should reset its internal state upon errors, completions, and when the reference count drops to zero.

   - The configuration provided for the `share` operator is attempting to do something similar. It uses a `ReplaySubject(1)` to replay the last value, and it resets when the reference count goes to zero.

2. **Reference Count Behavior**:

   - `shareReplayAbortable`: When the number of subscribers goes from 1 to 0, it will unsubscribe from the source observable and cleanup resources, then if a new subscriber comes after that, it will start a fresh subscription to the source observable.

   - `share` with `resetOnRefCountZero: true`: It has similar behavior regarding the reference count. If the reference count goes to zero (no subscribers), it disconnects from the source. If a new subscription occurs after that, it will reconnect.

3. **Error & Completion Behavior**:

   - `shareReplayAbortable`: On error or completion from the source observable, the operator resets its internal state and cleans up resources, effectively behaving like the sequence ended.

   - `share` with `resetOnError: false` and `resetOnComplete: false`: The operator will _not_ reset its internal state on errors or completions. This means if the source completes or errors and another subscriber comes in afterward, it will immediately receive the completion or error notification without the source being resubscribed to.

Given the above contrasts, the main difference in behavior will be seen during errors and completions:

- If you use `shareReplayAbortable` and the source observable completes (or errors), and then you get a new subscription, the source observable will be resubscribed to, effectively starting it fresh.

- If you use the `share` operator with the given configuration, after the source observable completes (or errors), new subscribers will immediately receive the completion or error notification, and the source will not be resubscribed to.

These nuances might cause different behaviors in your application, especially if the source observable has side effects or if its execution behavior changes between subscriptions (e.g., network requests). It's crucial to consider the specific requirements of your application when choosing which operator or configuration to use.
* */
/*
* The `share` operator in RxJS provides a configurable mechanism for multicasting with reference counting. However, the specific reset behaviors you want from `shareReplayAbortable` are intricate and do not map 1:1 to the default configurations provided by the `share` operator, especially in the case of error handling and complete.

Your `shareReplayAbortable` has a specific logic:
- It resets its internal state and starts over upon error or completion.
- It retains a history of only the last value (due to the `ReplaySubject` with a buffer size of 1).
- It cleans up and starts from scratch when refCount goes to zero.

To try to achieve this behavior using `share`, you'd use:

```typescript
share({
  connector: () => new ReplaySubject(1),
  resetOnError: true,
  resetOnComplete: true,
  resetOnRefCountZero: true
})
```

However, there's a crucial difference in behavior:

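- With `resetOnError: true`, the claim is that the source observable will be resubscribed immediately upon error, without waiting for a new subscriber. This might not be desirable for many scenarios because it can lead to unexpected and frequent resubscriptions in case of a repeated error source. NOTE(review): the RxJS `share` documentation describes `resetOnError` as resetting the shared observable to a "cold" state, with the source resubscribed only when a new subscriber arrives — verify this claim against the RxJS version in use before relying on it.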

This difference makes the out-of-the-box `share` operator not fully identical in behavior to `shareReplayAbortable`.

The `shareReplayAbortable` provides better control over the error flow, ensuring that after an error, the observable doesn't immediately resubscribe, but waits until there's a new subscriber. This behavior is important for scenarios where you don't want unintended side effects (like HTTP requests) being triggered without explicit demand.

In short, while you can get close with the `share` operator, for exact behavior to match `shareReplayAbortable`, you'd still need the custom operator.
* */
