import { BehaviorSubject, defer, from, Observable, of, Subject, throwError, timer } from 'rxjs';
import {
  catchError,
  delay,
  distinctUntilChanged,
  filter,
  first,
  map,
  repeat,
  repeatWhen,
  retryWhen,
  switchMap,
  takeUntil,
  takeWhile,
  timeout
} from 'rxjs/operators';
import { QueuingClientConnectionState, QueuingConnectionState } from './queuing-context-state';

import { HubConnection, HubConnectionBuilder, ILogger, LogLevel } from '@microsoft/signalr';
import { LogLevel as SLogLevel } from '@models/log/const';
import { Logger } from '@services/logger';
import { ILoginResult, IRefreshTokenStoreItem } from '@techwan/security';
import { CancellationTokenSource } from './cancellation-token-source';

/**
 * Wraps a SignalR {@link HubConnection} used to receive queued push messages.
 *
 * The context exposes the connection state and the incoming messages as observables,
 * acknowledges every received message, and (when started with retries) reconnects and
 * re-registers after the connection is closed.
 *
 * It also implements SignalR's `ILogger`, so it is used as the connection's logger when no
 * explicit logger is passed to the constructor.
 */
export class QueuingContext implements ILogger {
  private connection: HubConnection;
  /** True once `registerClientAsync` was requested; a successful reconnect then re-registers. */
  private isRegistrationRequired = false;
  /** Set while the context is started; cancelled and cleared by `stopAsync`. */
  private startedCts: CancellationTokenSource;

  private _state = new BehaviorSubject(QueuingClientConnectionState.Initialized);
  /** Connection state; consecutive identical states are emitted only once. */
  get state(): Observable<any> {
    return this._state.asObservable().pipe(distinctUntilChanged());
  }

  private _messages = new Subject();
  /** Message containers received through the hub's `HandleMessage` event. */
  get messages(): Observable<any> {
    return this._messages.asObservable();
  }

  private _queueDeleted = new Subject();
  /** Emits when the hub sends `SessionExpired` (the queue was deleted on the backend). */
  get queueDeleted(): Observable<any> {
    return this._queueDeleted.asObservable();
  }

  private _sessionExpired = new Subject();
  /** Emits when starting the connection is rejected with HTTP status 401. */
  get sessionExpired(): Observable<any> {
    return this._sessionExpired.asObservable();
  }

  private _tokenExpired = new Subject();
  /** Emits when the refresh token is cleared while an access token is still held. */
  get tokenExpired(): Observable<any> {
    return this._tokenExpired.asObservable();
  }

  private _onError = new Subject();
  /** Emits when the context stops itself because reconnecting or re-registering failed. */
  get onError(): Observable<any> {
    return this._onError.asObservable();
  }

  /** Whether a closed connection should be re-established (set by `startAsync`). */
  private withRetries = false;
  /** Access token handed to SignalR through `accessTokenFactory`. */
  private token = null;

  /**
   * @param baseUrl Hub URL passed to the SignalR connection builder.
   * @param connectionName Name used as context in every log entry.
   * @param maximumPendingReconnectInSeconds Time budget for a (re)connect / registration with retries.
   * @param retryDelay Delay in milliseconds between two attempts.
   * @param logger Optional SignalR logger; this instance is used when omitted.
   */
  constructor(
    private baseUrl: string,
    private connectionName: string,
    private maximumPendingReconnectInSeconds: number = 60,
    private retryDelay: number = 1000,
    private logger: ILogger = null
  ) {
    this.connection = this.createConnection();
  }

  /** Context string attached to every log entry written by this instance. */
  private get logContext(): string {
    return `QueuingContext - ${this.connectionName}`;
  }

  /** Returns a printable reason for an error value that may be missing or lack a message. */
  private static reasonOf(err: { message?: string } | null | undefined): string {
    return err && err.message ? err.message : String(err);
  }

  /** Builds the hub connection and wires the server events to their handlers. */
  private createConnection(): HubConnection {
    const connection: HubConnection = new HubConnectionBuilder()
      .withUrl(this.baseUrl, {
        // Read lazily so a token updated by onRefreshTokenChanged is picked up.
        accessTokenFactory: () => {
          return this.token;
        }
      })
      .configureLogging(this.logger || this)
      .build();

    connection.on('RenewRegistration', () => this.onRenewRegistration());
    connection.on('HandleMessage', messageContainer => this.onHandleMessage(messageContainer));
    connection.on('SessionExpired', () => this.onQueueDeleted()); // Queue was deleted on backend
    connection.on('Disconnect', () => this.onDisconnect());
    connection.onclose(() => this.onClose());

    return connection;
  }

  /** Re-registers the client when the server asks for it and the context is started and connected. */
  private onRenewRegistration() {
    Logger.debug(
      'Received request to renew the registration. This usually happens after push service is recycled or after reconnecting to a different server.',
      this.logContext
    );

    if (this.startedCts && !this.startedCts.isCancellationRequested() && this._state.getValue().isConnected) {
      this._state.next(QueuingClientConnectionState.Connected);
      // Subscribe so a failed invoke is logged instead of becoming an unhandled rejection.
      this.registerAsync().subscribe({
        error: err => Logger.error(`Renewing the registration failed (${QueuingContext.reasonOf(err)}).`, this.logContext)
      });
    }
  }

  /** Publishes a received message and acknowledges it to the server. */
  private onHandleMessage(messageContainer) {
    this._messages.next(messageContainer);
    this.acknowledgeAsync(messageContainer.deliveryTag);
  }

  /**
   * Acknowledges a message by its delivery tag. The invoke starts immediately; a failure is
   * logged and never propagated to the returned observable.
   */
  private acknowledgeAsync(deliveryTag) {
    return from(
      this.connection
        .invoke('Acknowledge', deliveryTag)
        .catch(err =>
          Logger.error(
            `ACK of the message with delivery tag "${deliveryTag}" failed (${QueuingContext.reasonOf(err)}).`,
            this.logContext
          )
        )
    );
  }

  /** Notifies listeners that the queue was deleted and stops the context. */
  private onQueueDeleted() {
    this._queueDeleted.next(undefined);
    this.stopAsync();
  }

  /** Stops the context when the server requests a disconnect. */
  private onDisconnect() {
    this.stopAsync();
  }

  /** Handles a closed connection: reconnects when retries are enabled, otherwise stops. */
  private onClose() {
    Logger.warn('SignalR connection has been closed.', this.logContext);

    // A close that follows stopAsync() finds no active token and is ignored.
    if (this.startedCts && !this.startedCts.isCancellationRequested()) {
      if (this.withRetries) {
        this.reconnectAsync(this.startedCts);
      } else {
        this.stopAsync();
      }
    }
  }

  /**
   * Updates the access token from a refresh-token store item. A falsy item while a token is
   * held is reported through `tokenExpired`.
   */
  onRefreshTokenChanged(refreshToken: IRefreshTokenStoreItem) {
    if (refreshToken) {
      const loginResult: ILoginResult = refreshToken.login;
      if (loginResult) {
        this.token = loginResult.TokenString;
      }
    } else if (this.token) {
      this._tokenExpired.next(undefined);
    }
  }

  /** Whether the current state reports itself as connected. */
  isConnected() {
    return this._state.getValue().isConnected;
  }

  /** The current connection state. */
  currentState(): QueuingConnectionState {
    return this._state.getValue();
  }

  /** `ILogger` implementation: forwards SignalR log entries to the application logger. */
  log(logLevel: LogLevel, message: string): void {
    Logger.output(message, this.logContext, this.signalrLogToMobile(logLevel));
  }

  /** Maps a SignalR log level to the application log level; unknown levels map to DEBUG. */
  private signalrLogToMobile(level: LogLevel): SLogLevel {
    switch (level) {
      case LogLevel.Critical:
      case LogLevel.Error:
        return SLogLevel.ERROR;
      case LogLevel.Warning:
        return SLogLevel.WARN;
      case LogLevel.Information:
        return SLogLevel.INFO;
      case LogLevel.Debug:
      case LogLevel.Trace:
      case LogLevel.None:
      default:
        return SLogLevel.DEBUG;
    }
  }

  /**
   * Starts the connection. Does nothing (emits `undefined`) when the context is already started.
   *
   * @param withRetries When true, failed connects are retried within the configured time
   *   budget and a closed connection is re-established.
   */
  startAsync(withRetries: boolean = true): Observable<any> {
    if (this.startedCts) {
      return of(undefined);
    }

    this.startedCts = new CancellationTokenSource();
    // Always take the caller's choice so a previous start with retries does not leak into this one.
    this.withRetries = withRetries;

    return withRetries
      ? this.connectWithRetriesAsync(this.getTimeout(), this.startedCts)
      : this.connectAsync(this.startedCts);
  }

  /** Returns the point in time at which a retry sequence started now has to give up. */
  private getTimeout() {
    const time = new Date();
    time.setSeconds(time.getSeconds() + this.maximumPendingReconnectInSeconds);
    return time;
  }

  /**
   * Connects, retrying after `retryDelay` ms until `time` is reached. Cancellation and
   * HTTP 401 are never retried.
   */
  private connectWithRetriesAsync(time: Date, cts: CancellationTokenSource) {
    return defer(() => this.connectAsync(cts)).pipe(
      // first() below ends the sequence on the first successful connect, so repeat() never re-runs it.
      repeat(),
      retryWhen(errors => {
        return errors.pipe(
          switchMap(err => {
            const isCancelled = !!(err && err.isCancelled);
            const isUnauthorized = !!(err && err.statusCode === 401);

            if (!isCancelled && !isUnauthorized && new Date() < time) {
              Logger.debug('Connecting to server failed. Retrying in ' + this.retryDelay + 'ms ...', this.logContext);
              return timer(this.retryDelay);
            }

            return throwError(err);
          })
        );
      }),
      first()
    );
  }

  /**
   * Starts the hub connection once. Fails with an `isCancelled` error when `cts` is cancelled
   * before or while connecting, and reports HTTP 401 through `sessionExpired`.
   */
  private connectAsync(cts: CancellationTokenSource) {
    if (cts.isCancellationRequested()) {
      return throwError({ message: 'Cancelled by caller.', isCancelled: true });
    }
    Logger.debug('Connecting ...', this.logContext);

    return from(
      this.connection.start().then(
        () => {
          if (cts.isCancellationRequested()) {
            return Promise.reject({ message: 'Cancelled by caller.', isCancelled: true });
          }
          this._state.next(QueuingClientConnectionState.Connected);
        },
        err => {
          if (err && err.statusCode === 401) {
            Logger.error('Error http statusCode 401', this.logContext);
            this._sessionExpired.next(undefined);
          }

          return Promise.reject(err);
        }
      )
    );
  }

  /**
   * Re-establishes a closed connection and, if registration was requested before, the
   * registration. When that fails, the context is stopped and `onError` emits.
   *
   * @returns The subscription running the reconnect, or an observable emitting `undefined`
   *   when `cts` is already cancelled.
   */
  protected reconnectAsync(cts: CancellationTokenSource) {
    if (!cts.isCancellationRequested()) {
      this._state.next(QueuingClientConnectionState.Reconnecting);

      const time = this.getTimeout();

      return this.connectWithRetriesAsync(time, cts)
        .pipe(
          // switchMap (not map) so the registration observable is actually subscribed.
          switchMap(() => (this.isRegistrationRequired ? this.registerWithRetriesAsync(time, cts) : of(true))),
          map(isRegistered => !!isRegistered),
          catchError(err => {
            Logger.warn(`Reconnecting failed (${QueuingContext.reasonOf(err)}).`, this.logContext);
            return of(false);
          })
        )
        .subscribe(succeeded => {
          // A cancelled token means stopAsync() already ran; stopping again could hit a newer session.
          if (!succeeded && !cts.isCancellationRequested()) {
            this.stopAsync(true);
          }
        });
    } else {
      this._state.next(QueuingClientConnectionState.Disconnected);
    }

    return of(undefined);
  }

  /**
   * Registers the client, repeating every `retryDelay` ms until the server accepts. Gives up
   * with an error when the connection is lost, `cts` is cancelled, or `time` passes.
   */
  private registerWithRetriesAsync(time: Date, cts: CancellationTokenSource) {
    // The first attempt starts right away; every repetition issues a new invoke.
    let pendingAttempt: Observable<any> | null = this.registerAsync();
    const attempts = defer(() => {
      const attempt = pendingAttempt || this.registerAsync();
      pendingAttempt = null;
      return attempt;
    });

    return attempts.pipe(
      repeatWhen(registrationAttempts => registrationAttempts.pipe(delay(this.retryDelay))),
      takeUntil(this.state.pipe(filter(s => !s.isConnected))),
      filter(success => !!success),
      takeWhile(() => !cts.isCancellationRequested()),
      timeout(time),
      first()
    );
  }

  /**
   * Registers the client and remembers that registration is required after a reconnect.
   * Fails when the context is not started.
   *
   * @param withRetries When true, the registration is repeated within the configured time budget.
   */
  registerClientAsync(withRetries: boolean = true): Observable<any> {
    if (!this.startedCts || this.startedCts.isCancellationRequested()) {
      return throwError(new Error('Cancelled by caller'));
    }

    this.isRegistrationRequired = true;

    if (withRetries) {
      return this.registerWithRetriesAsync(this.getTimeout(), this.startedCts);
    }

    return this.registerAsync();
  }

  /**
   * Invokes `RegisterClient` once. The invoke starts immediately; the returned observable
   * emits the server's result and fails when the context is stopped before it arrives.
   */
  registerAsync(): Observable<any> {
    // Captured because stopAsync() clears the field while the invoke may still be pending.
    const cts = this.startedCts;
    if (!cts || cts.isCancellationRequested()) {
      return throwError(new Error('Cancelled by caller'));
    }

    Logger.debug('Registering ...', this.logContext);

    return from(
      this.connection.invoke('RegisterClient').then(success => {
        if (cts.isCancellationRequested()) {
          // Reject the promise; returning an observable here would look like a successful result.
          throw new Error('Cancelled by caller');
        }
        if (success) {
          Logger.debug('Registered', this.logContext);
          this._state.next(QueuingClientConnectionState.Registered);
        } else {
          Logger.debug('Registration failed.', this.logContext);
        }
        return success;
      })
    );
  }

  /**
   * Stops the connection and marks the context as stopped. Does nothing (emits `undefined`)
   * when the context is not started.
   *
   * @param emitOnError When true, `onError` emits after the context was stopped.
   * @returns An observable of the pending `connection.stop()` call.
   */
  stopAsync(emitOnError: boolean = false) {
    if (this.startedCts) {
      this.isRegistrationRequired = false;

      const stopping = this.connection.stop();
      // Most callers ignore the result; log on a side branch so a failure is never unhandled.
      stopping.catch(err =>
        Logger.warn(`Stopping the SignalR connection failed (${QueuingContext.reasonOf(err)}).`, this.logContext)
      );
      const obs = from(stopping);
      this._state.next(QueuingClientConnectionState.Disconnected);

      this.startedCts.cancel();
      this.startedCts = null;

      if (emitOnError) {
        this._onError.next(undefined);
      }

      return obs;
    }
    return of(undefined);
  }

  /** Emits `undefined` when connected, otherwise fails with an explanatory error. */
  protected ensureValidConnectionAsync(): Observable<any> {
    if (!this.isConnected()) {
      return throwError(new Error('QueuingClient not connected to push service - call connect() methods first.'));
    }
    return of(undefined);
  }

  /** Registers to a channel (fire and forget); the outcome is only logged. */
  registerChannelAsync(channelName) {
    this.invokeChannelOperation(
      'RegisterChannel',
      channelName,
      'Registered to channel: ' + channelName + '.',
      'Registration to channel "' + channelName + '" failed.'
    );
  }

  /** Unregisters from a channel (fire and forget); the outcome is only logged. */
  unregisterChannelAsync(channelName) {
    this.invokeChannelOperation(
      'UnregisterChannel',
      channelName,
      'Unregistered from channel: ' + channelName + '.',
      'Unregistration from channel "' + channelName + '" failed.'
    );
  }

  /** Invokes a channel hub method when connected and logs its result or failure. */
  private invokeChannelOperation(method: string, channelName, successMessage: string, failureMessage: string): void {
    this.ensureValidConnectionAsync()
      // switchMap unwraps the invoke promise so the subscriber sees the server's answer.
      .pipe(switchMap(() => from(this.connection.invoke(method, channelName))))
      .subscribe({
        next: succeeded => {
          if (succeeded) {
            Logger.debug(successMessage, this.logContext);
          } else {
            Logger.warn(failureMessage, this.logContext);
          }
        },
        error: err =>
          Logger.error(`${method} for channel "${channelName}" failed (${QueuingContext.reasonOf(err)}).`, this.logContext)
      });
  }
}
