import { Inject, Injectable } from '@angular/core';
import { WINDOW } from '@recall2/globals';
import type { Observable } from 'rxjs';
import { of, race, Subject, throwError } from 'rxjs';
import { concatMap, delay, first, retryWhen, takeUntil, tap } from 'rxjs/operators';
import type { WebSocketSubject } from 'rxjs/webSocket';
import { webSocket } from 'rxjs/webSocket';

import type { IncomingMessage, Notification } from '../protocol/messages/incoming';
import type { ChannelSubscription, OutgoingMessage } from '../protocol/messages/outgoing';
import type { WebSocketMessage } from '../protocol/messages/websocket-message.model';

/**
 * Application-wide client for the notifications WebSocket endpoint.
 *
 * A connection is opened with {@link connect}, channels are subscribed with
 * {@link subscribeToChannel}, and the connection is torn down with
 * {@link disconnect}. The service is a root singleton and may be connected and
 * disconnected repeatedly.
 */
@Injectable({
  providedIn: 'root',
})
export class NotificationsService {
  // Emits (and intentionally never completes) every time `disconnect()` is called.
  // `takeUntil` ignores a notifier that completes without emitting, so completing
  // this subject would make every later connect/disconnect cycle impossible to stop.
  private readonly disconnect$ = new Subject<void>();

  private readonly notificationsSubject$ = new Subject<Notification>();
  /** Messages received from the socket; the stream completes for current subscribers on `disconnect()`. */
  notifications$ = this.notificationsSubject$.asObservable().pipe(takeUntil(this.disconnect$));

  private readonly openConnectionSubject$ = new Subject<void>();
  /** Emits each time the underlying socket reports that it is open. */
  openConnection$ = this.openConnectionSubject$.asObservable();

  private readonly closeConnectionSubject$ = new Subject<void>();
  /** Emits each time the underlying socket reports that it is closed. */
  closeConnection$ = this.closeConnectionSubject$.asObservable();

  private readonly errorSubject$ = new Subject<void>();
  /**
   * Errors (via the error channel, not `next`) once the socket stream fails
   * after all retries. Being a plain Subject it stays errored afterwards.
   */
  error$ = this.errorSubject$.asObservable();

  private readonly baseUrl = '/api-notifications/subscribe';
  private readonly retryDelayMs = 3000;
  private readonly retryMaxAttempts = 2;

  // True from the first successful open until the stream terminates or `disconnect()` is called.
  private connected = false;
  // True only while the underlying socket is actually open (false during a retry back-off).
  private socketOpen = false;
  private userId = '';
  private websocket$!: WebSocketSubject<WebSocketMessage>;

  constructor(@Inject(WINDOW) private window: Window) {}

  /**
   * Opens the WebSocket for the given user and starts forwarding incoming
   * messages to `notifications$`.
   *
   * Socket errors are retried up to `retryMaxAttempts` times, `retryDelayMs`
   * apart. The attempt index is counted across the whole subscription, not per outage.
   *
   * @param userId Identifier sent to the server as the `user` query parameter.
   * @returns A promise that resolves on the first open event and rejects with
   *   the socket error if the stream fails before that.
   */
  connect(userId: string): Promise<void> {
    if (this.connected) {
      return Promise.reject(new Error('WebSocket channel already created'));
    }

    this.userId = userId;

    this.websocket$ = this.getWebsocketClient();

    // Failure signal scoped to this call. The shared `error$` stays errored after
    // the first failure, so racing against it would reject every later `connect()`.
    const connectionFailed$ = new Subject<void>();

    (this.websocket$.asObservable() as Observable<IncomingMessage>)
      .pipe(
        tap(notification => {
          this.notificationsSubject$.next(notification);
        }),
        retryWhen(errors$ =>
          errors$.pipe(
            concatMap((socketError, attempt) =>
              attempt === this.retryMaxAttempts ? throwError(() => socketError) : of(socketError),
            ),
            delay(this.retryDelayMs),
          ),
        ),
        // Must be last: placed before `retryWhen`, a disconnect issued during the
        // retry delay would be missed and the retry would reopen the socket.
        takeUntil(this.disconnect$),
      )
      .subscribe({
        error: error => {
          // Retries are exhausted: allow a fresh `connect()` later on.
          this.connected = false;
          connectionFailed$.error(error);
          this.errorSubject$.error(error);
        },
        complete: () => {
          // Reached on `disconnect()` and when the server closes the socket cleanly.
          this.connected = false;
        },
      });

    return race([this.openConnection$, connectionFailed$]).pipe(first()).toPromise();
  }

  /**
   * Sends a `subscribe` message for the given channel over the socket.
   *
   * @param channel Name of the channel to subscribe to.
   * @throws Error if the service is not connected.
   */
  subscribeToChannel(channel: string): void {
    if (!this.connected) {
      throw new Error('WebSocket channel not created');
    }

    const channelSubscriptionMessage: ChannelSubscription = {
      action: 'subscribe',
      channel,
    };
    this.sendMessage(channelSubscriptionMessage);
  }

  /**
   * Stops the socket stream and completes `notifications$` for current subscribers.
   *
   * @returns A promise that resolves once the socket reports it is closed, or
   *   immediately when no socket is open (e.g. while waiting to retry). Rejects
   *   if the service is not connected.
   */
  disconnect(): Promise<void> {
    if (!this.connected) {
      return Promise.reject(new Error('WebSocket channel already destroyed'));
    }

    this.connected = false;

    // Start listening for the close event before triggering it. When no socket is
    // open there is nothing to wait for and no close event would ever arrive.
    const closed: Promise<void> = this.socketOpen
      ? this.closeConnection$.pipe(first()).toPromise()
      : Promise.resolve();

    this.disconnect$.next();
    return closed;
  }

  /** Pushes an outgoing message onto the current WebSocket subject. */
  private sendMessage(message: OutgoingMessage): void {
    this.websocket$.next(message);
  }

  /** Builds the WebSocket subject and wires its open/close events into the service state. */
  private getWebsocketClient(): WebSocketSubject<WebSocketMessage> {
    const url = this.getConnectionUrl();

    return webSocket({
      url,
      openObserver: {
        next: () => {
          this.socketOpen = true;
          this.connected = true;
          this.openConnectionSubject$.next();
        },
      },
      closeObserver: {
        next: () => {
          this.socketOpen = false;
          this.closeConnectionSubject$.next();
        },
      },
    });
  }

  /** Derives the ws/wss URL from the current page location and the stored user id. */
  private getConnectionUrl(): string {
    const protocol = this.window.location.protocol === 'https:' ? 'wss:' : 'ws:';
    // Encode the id so reserved characters (&, #, +, spaces) cannot corrupt the query string.
    const user = encodeURIComponent(this.userId);
    return `${protocol}//${this.window.location.host}${this.baseUrl}/?user=${user}`;
  }
}
