import {Inject, Injectable} from '@angular/core';
import {RxStomp, RxStompConfig, RxStompState} from '@stomp/rx-stomp';
import {Observable, timer} from 'rxjs';
import {StompjsConnectionStatus} from '../models/stompjs-connection-status.enum';
import {finalize, map, switchMap, tap} from 'rxjs/operators';

const SOCKET_DISCONNECTION_TIME = 1000; // in milliseconds
@Injectable()
export class StompjsService {
	private readonly rxStomp: RxStomp;
	private connectionStatus;
	private manuallyDisconnected;

	constructor(@Inject('rxStompConfig') private rxStompConfig: RxStompConfig) {
		this.rxStomp = new RxStomp();
		this.rxStomp.configure(rxStompConfig);
	}

	getConnectionStatus(): Observable<StompjsConnectionStatus> {
		return (this.rxStomp.connectionState$.asObservable()).pipe(map((connectionState: RxStompState) => {
			if (connectionState === RxStompState.OPEN) {
				if (!this.connectionStatus) {
					this.connectionStatus = StompjsConnectionStatus.Connected;
				} else if (this.connectionStatus === StompjsConnectionStatus.Disconnected && this.manuallyDisconnected) {
					this.connectionStatus = StompjsConnectionStatus.Connected;
					this.manuallyDisconnected = false;
				} else if (this.connectionStatus === StompjsConnectionStatus.Disconnected) {
					this.connectionStatus = StompjsConnectionStatus.Reconnected;
				}
			}
			if (connectionState === RxStompState.CLOSED && this.connectionStatus) {
				setTimeout(() => {
					this.connectionStatus = StompjsConnectionStatus.Disconnected;
				}, SOCKET_DISCONNECTION_TIME);
			}
			return this.connectionStatus;
		}));
	}

	watch(path: string): Observable<Object> {
		const disconnectionWaitTime = this.manuallyDisconnected ? SOCKET_DISCONNECTION_TIME : 0;
		return timer(disconnectionWaitTime).pipe(
			tap(() => this.rxStomp.activate()),
			switchMap(() => this.rxStomp.watch(path).pipe(
				map((message: any) => JSON.parse(message.body)),
				finalize(() => {
					this.rxStomp.deactivate();
					this.manuallyDisconnected = true;
				}))
			)
		);
	}

	publish(destination: string, payload: Object): void {
		if (this.connectionStatus !== StompjsConnectionStatus.Disconnected) {
			this.rxStomp.publish({destination: destination, body: JSON.stringify(payload)});
		}
	}

	getNativeRxStomp(): RxStomp {
		return this.rxStomp;
	}
}
