import {Observable} from 'rxjs/internal/Observable';
import {StompConnectionStatus} from '../ws/stomp-connection-status.enum';
import {BehaviorSubject} from 'rxjs';
import {StompSubscription} from '../ws/stomp-subscription';
import {StompFrame} from '../ws/stomp-frame';
import {OnDestroy} from '@angular/core';
import {StompClose, StompEvent, StompEventType} from './stomp-event';
import {fromWorker} from 'observable-webworker';
import {Subject} from 'rxjs/internal/Subject';
import {StompWorker} from './stomp.worker';
import {take, tap} from 'rxjs/operators';
import {trimNull} from '../helpers/kluh';

export abstract class StompService implements OnDestroy {
    protected _connectionStatus$: BehaviorSubject<StompConnectionStatus> = new BehaviorSubject<StompConnectionStatus>(StompConnectionStatus.DISCONNECTED);
    protected _username$: BehaviorSubject<string | null> = new BehaviorSubject<string | null>(null);
    private stompSubscriptions: StompSubscription<any>[] = [];
    private isConnected: boolean;
    private isConnecting: boolean;
    private stompSubject: Subject<StompEvent>;
    private heartbeatInterval = 5000;
    private connectionTimeout: number;
    private connectionDelay = 0;
    private connectionTry = 0;
    private ob: Observable<StompEvent> | null;

    // private subscription: Subscription | null;

    protected constructor() {
        this.isConnected = false;
        this.isConnecting = false;
        this.stompSubject = new Subject<StompEvent>();
    }

    private initWorker(): void {
        if (this.ob) {
            return;
        }
        const input$ = this.stompSubject.asObservable()
            .pipe(tap((stompEvent) => {
                console.log('sending event');
                console.log(stompEvent);
            }));
        if (typeof Worker !== 'undefined') {
            console.log('using worker');
            this.ob = fromWorker<StompEvent, StompEvent>(() => new Worker('./stomp.worker', {type: 'module'}), input$);
        } else {
            console.log('worker not available');
            this.ob = new StompWorker().work(input$);
        }
        this.ob
            .pipe(tap((stompEvent) => {
                // console.log('received event');
                // console.log(stompEvent);
            })).subscribe((stompEvent) => {
            this.onStompEvent(stompEvent);
        }, (error) => {
        }, () => {
            console.log('worker complete');
            this.ob = null;
        });
    }

    protected abstract url(): Observable<string>;

    get connectionStatus$(): Observable<StompConnectionStatus> {
        return this._connectionStatus$.asObservable();
    }

    get username$(): Observable<string | null> {
        return this._username$;
    }

    stompSubscribe<T>(destination: string): Observable<T> {
        return this.stompTopic(destination);
    }

    publish<T>(destination: string, object: T): void {
        this.send(destination, object);
    }

    stompTopic<T>(destination: string): Observable<T> {
        console.log(`stomp topic: ${destination}`);
        let stompSubscription = this.stompSubscriptions.find((o) => o.destination === destination);
        if (!stompSubscription) {
            console.log(`creating new stomp subscription for ${destination}, isConnected ${this.isConnected}`);
            stompSubscription = new StompSubscription<T>(destination, (o => {
                this.sendUnsubscribe(o);
                const index = this.stompSubscriptions.findIndex((item) => {
                    return item.id === o.id;
                });
                if (index > -1) {
                    this.stompSubscriptions.splice(index, 1);
                }
                console.log('subscriptions length: ' + this.stompSubscriptions.length);
                if (this.stompSubscriptions.length === 0) {
                    this.disconnect();
                }
            }));
            this.stompSubscriptions.push(stompSubscription);
            if (this.isConnected) {
                this.sendSubscribe(stompSubscription);
            } else {
                this.connect();
            }
        }
        return stompSubscription.observable;
    }

    send<T>(destination: string, object: T): void {
        if (this.isConnected) {
            const message = JSON.stringify(object);
            this.stompSubject.next({
                type: StompEventType.FRAME,
                frame: {command: 'SEND', headers: {destination: destination}, body: message}
            });
        }
    }

    protected connect(): void {
        if (this.connectionTry > 0) {
            if (this.connectionDelay === 0) {
                this.connectionDelay = 1000;
            }
            if (this.connectionDelay > 10000) {
                this.connectionDelay = 10000;
            } else {
                this.connectionDelay = this.connectionDelay * 2;
            }
        }
        if (!this.isConnected && !this.isConnecting) {
            this.connectionTimeout = window.setTimeout(() => {
                console.log('get url');
                this.url().pipe(take(1)).subscribe((url) => {
                    this.initWorker();
                    console.log(`get url done, ${url}`);
                    this.isConnecting = true;
                    this.connectionTry++;
                    this.stompSubject.next({
                        type: StompEventType.OPEN,
                        connect: {
                            url: url,
                            heartbeat: this.heartbeatInterval
                        }
                    });
                });
            }, this.connectionDelay);
        }
        console.log(`stomp connect, isConnected: ${this.isConnected}, isConnecting: ${this.isConnecting}`);

    }

    protected disconnect(): void {
        console.log(`stomp disconnect, isConnected: ${this.isConnected}, isConnecting: ${this.isConnecting}`);
        if (this.isConnected || this.isConnecting) {
            this.stompSubject.next({
                type: StompEventType.CLOSE
            });
        }
    }

    private onClose(closeEvent: StompClose): void {
        console.log(`stomp close, code: ${closeEvent.code}, reason: ${closeEvent.reason}, wasClean: ${closeEvent.wasClean}`);
        this.isConnected = false;
        this.isConnecting = false;
        console.debug('stomp CloseEvent:');
        console.debug(closeEvent);
        if (closeEvent.code === 1000) {
            switch (closeEvent.reason) {
                case 'Duplicate login':
                    this._connectionStatus$.next(StompConnectionStatus.DUPLICATE_LOGIN);
                    break;
                case 'Exceeded Maximum Users':
                    this._connectionStatus$.next(StompConnectionStatus.EXCEEDED_MAXIMUM_USERS);
                    break;
                default:
                    // volta o delay inicial caso a conexão seja encerrada por motivo X
                    this.connectionDelay = 0;
                    this._connectionStatus$.next(StompConnectionStatus.CONNECTION_ENDED);
                    this.connect();
            }
        } else {
            this._connectionStatus$.next(StompConnectionStatus.DISCONNECTED);
            this.connect();
        }
    }

    private onStompConnect(): void {
        this.isConnected = true;
        this.isConnecting = false;
        if (this.connectionTimeout) {
            clearTimeout(this.connectionTimeout);
            this.connectionTimeout = null;
        }
        this.connectionDelay = 0;
        this.connectionTry = 0;
        this._connectionStatus$.next(StompConnectionStatus.OK);
        for (const observer of this.stompSubscriptions) {
            this.sendSubscribe(observer);
        }
    }

    private onOpen(): void {
        this.stompSubject.next({
            type: StompEventType.FRAME,
            frame: {command: 'CONNECT', headers: {'accept-version': '1.2', 'heart-beat': `${this.heartbeatInterval},${this.heartbeatInterval}`}}
        });
    }

    private onError(error: string): void {
        console.debug('stomp onError:');
        console.error(error);
        // if (this.subjectMap) {
        //   this.subjectMap.forEach((value, key) => {
        //     value.error(error);
        //     this.client.unsubscribe(key);
        //   });
        // }
    }

    private onStompFrame(frame: StompFrame): void {
        // if (frame.heartbeat) {
        //     this.sendHeartbeat();
        // }
        switch (frame.command) {
            case 'CONNECTED':
                this._username$.next(frame.headers['user-name']);
                this.onStompConnect();
                break;
            case 'MESSAGE':
                console.debug('received message');
                console.debug(frame);
                const subscriptionId = frame.headers['subscription'];
                const stompSubscription = this.stompSubscriptions.find((observer) => {
                    return observer.id === subscriptionId;
                });
                if (stompSubscription) {
                    console.debug('emitting message to observer:');
                    console.debug(stompSubscription);
                    stompSubscription.emit(trimNull(frame.body));
                }
                break;
            case 'ERROR':
                this.onError(JSON.stringify(frame));
                break;
            case 'DISCONNECT':
                this.disconnect();
                break;
        }
    }

    private sendSubscribe(stompSubscription: StompSubscription<any>): void {
        console.log('sendSubscribe');
        console.log(stompSubscription);
        if (this.isConnected) {
            this.stompSubject.next({
                type: StompEventType.FRAME,
                frame: {
                    command: 'SUBSCRIBE', headers: {
                        id: stompSubscription.id,
                        destination: stompSubscription.destination
                    }
                }
            });
        }
    }

    private sendUnsubscribe(stompSubscription: StompSubscription<any>): void {
        if (this.isConnected) {
            this.stompSubject.next({
                type: StompEventType.FRAME,
                frame: {command: 'UNSUBSCRIBE', headers: {id: stompSubscription.id}}
            });
        }
    }

    ngOnDestroy(): void {
        this._connectionStatus$.next(StompConnectionStatus.DESTROYED);
        this._connectionStatus$.complete();
        for (const stompSubscription of this.stompSubscriptions) {
            stompSubscription.complete();
        }
        this.stompSubject.next({
            type: StompEventType.CLOSE
        });
    }

    private onStompEvent(stompEvent: StompEvent): void {
        switch (stompEvent.type) {
            case StompEventType.OPEN:
                this.onOpen();
                break;
            case StompEventType.CLOSE:
                this.onClose(stompEvent.close);
                break;
            case StompEventType.FRAME:
                this.onStompFrame(stompEvent.frame);
                break;
            case StompEventType.ERROR:
                this.onError(stompEvent.error);
                break;
        }
    }
}
