import { fromEvent, Observable, Subject, timer } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import { WritableStream } from 'streamsaver';

import { encodeHexToUint8 } from '../../../utils/string/encode-hex-to-uint';
import { ExportStreamEncoder } from '../encoders/export-stream.encoder';

import { WriteStream } from './write.stream';

/**
 * Write stream that exports records page by page and reports its state
 * through two subjects:
 *
 * - `export$` emits `true` when an export starts and `false` (then completes)
 *   when the stream is closed.
 * - `progress$` emits `0` at start, a rounded percentage after every written
 *   record, and `100` (then completes) when the stream is closed.
 *
 * NOTE(review): `close()`, `closed`, `write()` and `_writer` come from
 * `WriteStream`, which is not visible here; `close()` is presumed to invoke
 * `onClose` — confirm against the base class.
 */
export class ExportStream<T> extends WriteStream<T, Uint8Array> {
  protected _export$ = new Subject<boolean>();
  protected _progress$ = new Subject<number>();

  constructor(
    protected override readonly _stream: WritableStream,
    protected override readonly _encoder: ExportStreamEncoder<T>
  ) {
    super(_stream, _encoder);
  }

  /** Emits `true` while an export is running and `false` once it has ended. */
  get export$() {
    return this._export$;
  }

  /** Emits the export progress as a percentage (0-100). */
  get progress$() {
    return this._progress$;
  }

  /**
   * Close hook: reports 100% progress, signals that the export is over and
   * completes both subjects. Calling it again afterwards has no effect because
   * completed subjects ignore further notifications.
   */
  override onClose = () => {
    this._progress$.next(100);
    this._progress$.complete();
    this._export$.next(false);
    this._export$.complete();
  };

  /**
   * Starts the export. Pages are requested one after another through `obs$`
   * and each record is written sequentially. Does nothing when the stream is
   * already closed.
   *
   * @param total   Expected number of records; drives the progress percentage
   *                and decides when the stream is closed.
   * @param limit   Page size handed to `obs$`.
   * @param timeout Milliseconds allowed between two written records before the
   *                export is considered stalled and the stream is closed.
   * @param obs$    Factory returning the page that starts at `skip` and holds
   *                at most `limit` records; an empty page ends the export.
   */
  export({
    total,
    timeout,
    obs$,
    limit,
  }: {
    total: number;
    limit: number;
    timeout: number;
    obs$: (skip: number, limit: number) => Observable<T[]>;
  }): void {
    if (this.closed) {
      return;
    }
    this._export$.next(true);
    this._progress$.next(0);
    // Prepend the UTF-8 byte order mark (EF BB BF) so Excel opens the csv file
    // with the right encoding.
    this._writer.write(encodeHexToUint8('EFBBBF'));
    this._encoder.setup();
    this.writeBefore();
    let _totalLoaded = 0;
    let _totalWritten = 0;
    // Ask for confirmation when the page is left before everything is written.
    // The listener is removed by the next `_export$` emission (see `onClose`).
    fromEvent(window, 'beforeunload')
      .pipe(takeUntil(this._export$))
      .subscribe((event: BeforeUnloadEvent) => {
        if (_totalWritten < total) {
          event.returnValue = true;
        }
      });
    // Ends the export early (stall or failure). `close()` is intentionally not
    // awaited: a stalled writer may never settle, and the listeners of
    // `export$` / `progress$` must be released regardless.
    const _abort = (): void => {
      if (!this.closed) {
        void this.close();
      }
      this.onClose();
    };
    const _export = (skip: number) => {
      if (this.closed) {
        return;
      }
      obs$(skip, limit)
        .pipe(takeUntil(this._export$))
        .subscribe({
          next: async (result) => {
            try {
              if (!result.length) {
                // No more data than announced by `total`: finish the export.
                await this.close();
                return;
              }
              for (const r of result) {
                // The stream may have been closed in the meantime (total
                // reached within this page, or the stall timer fired while a
                // write was pending): never write to a closed stream.
                if (this.closed) {
                  return;
                }
                await this.write(r);
                _totalWritten++;
                // Guard against `total <= 0` and keep the value within 0-100.
                this._progress$.next(
                  total > 0
                    ? Math.min(100, Math.round((_totalWritten * 100) / total))
                    : 100
                );
                // Stall watchdog: created after the emission above so it is
                // only cancelled by the NEXT progress notification.
                timer(timeout)
                  .pipe(takeUntil(this._progress$))
                  .subscribe(() => _abort());
                if (_totalWritten === total) {
                  await this.close();
                }
              }
              _totalLoaded += result.length;
              if (_totalLoaded < total) {
                _export(_totalLoaded);
              }
            } catch (err) {
              // Do not leave the export hanging in the "running" state, but
              // keep the failure as visible as before by rethrowing it.
              _abort();
              throw err;
            }
          },
          error: (err) => {
            // Same as above for a failing page request; the rethrown error is
            // reported by RxJS as an unhandled error.
            _abort();
            throw err;
          },
        });
    };
    _export(0);
  }

  /**
   * Writes the encoder's leading chunk (`encodeBefore()`) ahead of the
   * records. Best effort: a synchronous failure is deliberately ignored so the
   * export can still proceed without that chunk.
   */
  protected writeBefore(): void {
    try {
      this._writer.write(this._encoder.encodeBefore());
    } catch (_) {}
  }
}
