import { Injectable } from '@angular/core';
import { defer, from, iif, Observable, of } from 'rxjs';
import { concatMap, delay, finalize, retryWhen, switchMap, tap } from 'rxjs/operators';
import { KeycloakService } from '@aviatar/sso';
import { EventSourcePolyfill } from 'event-source-polyfill';

@Injectable({
  providedIn: 'root'
})
export class StreamService {

  constructor(private keycloakService: KeycloakService) {
  }

  public subscribe<T>(url: string): Observable<T> {
    return defer(() => from(this.keycloakService.getToken())).pipe(
      switchMap(token => {
        const eventSource = new EventSourcePolyfill(`${url}`, {
          withCredentials: false,
          heartbeatTimeout: 20 * 1000, // reconnect if within 20 seconds no update was streamed (heartbeat is expected every 10 seconds)
          headers: {
            authorization: 'Bearer ' + token
          }
        });
        return new Observable<T>(observer => {
          eventSource.onmessage = event => {
            const data: T = JSON.parse(event.data);
            if (!!data['id'] && data['id'] === 'Heartbeat') {
              return;
            }
            observer.next(data);
          };
          eventSource.onerror = error => {
            eventSource.close();
            observer.error(error);
          };
        }).pipe(
          finalize(() => eventSource.close())
        )
      }),
      retryWhen(error => error.pipe(
        tap(err => console.log('event source stream was closed with error. reopening connection.', err)),
        concatMap((e, i) =>
          iif(
            () => i > 10,
            of(e).pipe(delay(15000)), // throttle reconnect attempts after 10 retries to every 15 seconds
            of(e).pipe(delay(500)) // retry every 500ms for the first attempts
          ))
      ))
    );
  }
}
