import { forwardRef, Injectable } from '@angular/core';
import { Message, StompHeaders } from '@stomp/stompjs';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';
import { RxStompService } from './rx-stomp.service';

@Injectable({
  providedIn: 'root',
  useClass: forwardRef(() => CommsSocketService),
})
export abstract class CommsService {
  abstract onMessage<T>(topic: string, handler?: (message: Message) => T): Observable<T>;

  abstract send(topic: string, payload: unknown, config?: StompHeaders): void;

  abstract close(): void;
}

@Injectable({ providedIn: 'root' })
export class CommsSocketService implements CommsService {
  constructor(private stompService: RxStompService) {
    stompService.unhandledMessage$.subscribe((msg) => console.log('unhandled message ', msg));
    stompService.stompErrors$.subscribe((e) => console.error(`stomp error received error ${e}`));
    stompService.webSocketErrors$.subscribe((e) => console.error(`socket error received error ${e}`));
  }

  onMessage<T>(topic: string, handler = CommsSocketService.jsonHandler): Observable<T> {
    return this.stompService.watch(`${topic}`).pipe(map((m): T => handler(m) as T));
  }

  send(topic: string, payload: unknown, headers: StompHeaders = {}): void {
    this.stompService.publish({
      headers,
      destination: `/app${topic}`,
      body: JSON.stringify(payload),
    });
  }

  static jsonHandler(message: Message): unknown {
    return JSON.parse(message.body);
  }

  close(): Promise<void> {
    return this.stompService.stompClient.deactivate({ force: true });
  }
}
