import { Injectable } from '@angular/core';
import { IotMessage, types } from 'app/common/domain/iotmessage';
import { Observable, Subject } from 'rxjs';
import { filter, map, share, takeWhile } from 'rxjs/operators';
import { environment } from '../../environments/environment';
import { connect, IConnackPacket, MqttClient } from 'mqtt';
import { Buffer } from 'buffer';
import { StorageService } from "../shared/storage-service";
import { setInterval, clearInterval } from 'worker-timers'; // To avoid throttling of setInterval on inactive tabs
import { IMachine } from './domain-model';
import { id } from '@swimlane/ngx-datatable';
import Utils from 'app/common/Utils';

/**
 * Raw message as delivered by the MQTT client's `message` event,
 * before the payload is decoded into an `IotMessage`.
 */
interface IMqttMsg {
	/** Undecoded payload bytes; converted with `toString()` and JSON-parsed downstream. */
	payload: Buffer;
	/** Topic the message was published on. */
	topic: string;
}

/**
 * Retain-handling choice for a subscription.
 *
 * The numeric value of a member is forwarded unchanged as the `rh` option of
 * the MQTT client's `subscribe` call, so the numbering (0, 1, 2) must not
 * change: keep the member order and do not insert members in between.
 */
export enum SEND_RETAINED {
	/** Value 0. */
	ALWAYS,
	/** Value 1. */
	ONLY_ON_FIRST_SUB,
	/** Value 2. */
	NEVER
}

/**
 * Bookkeeping entry stored per topic in the service's subscription map.
 */
interface ISubData {
	/** Retain-handling option recorded when the entry was first created. */
	rh: SEND_RETAINED;
	/** Running sum of subscribe (+1) and unsubscribe (-1) requests for the topic. */
	count: number;
}

/**
 * Application-wide wrapper around a single MQTT client.
 *
 * Responsibilities:
 * - connect / periodically re-connect using the token from `StorageService`,
 * - reference-count topic subscriptions so that several consumers can share a topic,
 * - re-subscribe the active topics after a (re)connect,
 * - expose the incoming messages as filtered RxJS streams.
 */
@Injectable({ providedIn: 'root' })
export class MqttService {
	/**
	 * Set to true once a connection has been acknowledged and back to false only by
	 * `disconnect()`. It therefore stays true across an unexpected connection loss.
	 */
	public isOpen = false;

	private client: MqttClient;
	private clientId: string;

	/** Handle of the keep-alive ping interval started in `onConnected` (null/undefined when not running). */
	private timer: number;

	/** Per-topic request counter and retain-handling option. */
	private mqttSubsMap: Map<string, ISubData> = new Map<string, ISubData>();

	private websocketSubject: Subject<IMqttMsg> = new Subject<IMqttMsg>();
	/** Incoming messages with the JSON payload decoded and the topic attached as `destinationName`. */
	private websocketData: Observable<IotMessage> = this.websocketSubject.pipe(
		map((msg: IMqttMsg): IotMessage => {
			return {
				...this.parseSecure(msg.payload.toString()),
				destinationName: msg.topic
			};
		})
	);

	/** Handle of the periodic (re)connect interval (null/undefined when not running). */
	private intervalId: any;
	private isFirstConnection = true;
	/** True while a connect attempt is in flight and no answer has been received yet. */
	private isReconnecting = false;

	public constructor(private storageService: StorageService) {
		this.clientId = 'typ-angular-frontend_mqttjs_' + new Date().toISOString() + '_' + Math.floor(Math.random() * 999);
	}

	/**
	 * Stops the periodic connect interval if one is running and clears the
	 * "connect in flight" flag. Does nothing when no interval is active.
	 */
	public stopPeriodicConnect() {
		if (this.intervalId) {
			clearInterval(this.intervalId);
			// Forget the handle so a later call does not clear an already-cleared timer
			// (and does not reset `isReconnecting` while a new attempt is in flight).
			this.intervalId = null;
			this.isReconnecting = false;
		}
	}

	/**
	 * Starts trying to connect every 5 seconds. On the first connection an
	 * attempt is made immediately instead of waiting for the first tick.
	 */
	public startPeriodicConnect() {
		// on first connecting, try to connect instantly (after login) without the 5s delay from the interval
		if (this.isFirstConnection) {
			this.tryConnectingToMqtt();
		}

		this.stopPeriodicConnect();

		this.intervalId = setInterval((_) => {
			this.tryConnectingToMqtt();
		}, 5000);
	}

	/**
	 * Closes the connection. No-op when there is no client or the service is not open.
	 * Topics without active subscribers are dropped from the bookkeeping map;
	 * topics that are still in use are kept so they can be re-subscribed on the next connect.
	 */
	public disconnect() {
		if (!this.client || !this.isOpen) return;

		this.isOpen = false;

		// delete topics that are currently not subscribed
		for (const [topic, sub] of this.mqttSubsMap) {
			if (sub.count <= 0) {
				this.mqttSubsMap.delete(topic);
			}
		}

		console.log('Closing MQTT connection...');
		// Stop pinging right away: `this.client` is nulled below, before the 'close' event arrives.
		this.stopKeepAlive();
		this.client.end();
		this.isFirstConnection = true;
		this.client = null;
	}

	/**
	 * Performs one connect attempt unless one is already in flight.
	 * When already connected, or when no token is stored (user logged out),
	 * the periodic connect is stopped instead.
	 */
	public tryConnectingToMqtt() {
		// still no answer from 'client.connect', wait for answer until next retry
		if (this.isReconnecting) return;

		// already connected or user is logged out -> don't try to connect anymore
		const token = this.storageService.getToken();
		if ((this.client && this.client.connected) || !token) {
			this.stopPeriodicConnect();
			return;
		}

		console.log('Reconnecting to MQTT...');
		this.isReconnecting = true;

		try {
			this.client = connect(environment.realtimeServices.url, {
				clientId: this.clientId,
				protocolVersion: 5,
				reconnectPeriod: 0,
				username: 'noNeeded',
				password: token.split('"')[3]
			});

			this.client.on('connect', (ack) => this.onConnected(ack));
			this.client.on('error', (err) => this.onError(err));
			this.client.on('message', (topic, payload) => this.websocketSubject.next({ topic, payload }));
			this.client.on('close', () => this.onConnectionLost());

			this.isFirstConnection = false;
		} catch (err) {
			console.log('Could not connect to MQTT: ', err);
			this.isReconnecting = false;
		}
	}

	/**
	 * Applies `count` (+1 subscribe / -1 unsubscribe) to the counter of every topic and,
	 * when connected, sends the matching SUBSCRIBE / UNSUBSCRIBE to the broker.
	 * While not connected only the counters are updated; active topics are sent on the next connect.
	 *
	 * @param topics full topic names
	 * @param count  amount added to each topic's counter
	 * @param rh     retain handling used for new map entries and for the subscribe call
	 */
	private trySubUnsubMqtt(topics: string[], count: number, rh: SEND_RETAINED) {
		let subAll = true;
		let unSubAll = true;
		for (const topic of topics) {
			let entry = this.mqttSubsMap.get(topic);
			if (entry) {
				entry.count += count;
			} else {
				entry = { count: count, rh: rh };
				this.mqttSubsMap.set(topic, entry);
			}
			subAll = subAll && entry.count > 0;
			unSubAll = unSubAll && entry.count <= 0;
		}

		// nothing can be sent without a connected client
		if (!this.client || !this.client.connected || !this.isOpen) {
			return;
		}

		// skip in case of an unsub event when there are still active subs
		if (subAll && count < 0) {
			return;
		}

		try {
			if (subAll) {
				this.client.subscribe(topics, { qos: 0, rh: rh });
			} else if (unSubAll) {
				this.client.unsubscribe(topics);
				for (const topic of topics) {
					this.mqttSubsMap.delete(topic);
				}
			} else {
				// mixed result: decide per topic
				for (const topic of topics) {
					const current = this.mqttSubsMap.get(topic);

					// skip in case of an unsub event when there are still active subs
					if (!current || (current.count > 0 && count < 0)) continue;

					if (current.count > 0) {
						this.client.subscribe(topic, { qos: 0, rh: rh });
					} else {
						this.client.unsubscribe(topic);
						this.mqttSubsMap.delete(topic);
					}
				}
			}
		} catch (err) {
			console.log('Could not SUB/UNSUB mqtt-topics ' + topics + ': ', err);
		}
	}

	/**
	 * Subscribes again to every topic that still has active subscribers (count > 0),
	 * grouped by retain-handling option so each group needs a single subscribe call.
	 * Entries whose count dropped to zero or below (e.g. unsubscribed while offline) are skipped.
	 */
	private tryResubMqtt() {
		// pre-seed the known options to keep a stable call order
		const topicsByRh = new Map<SEND_RETAINED, string[]>();
		topicsByRh.set(SEND_RETAINED.ALWAYS, []);
		topicsByRh.set(SEND_RETAINED.ONLY_ON_FIRST_SUB, []);
		topicsByRh.set(SEND_RETAINED.NEVER, []);

		for (const [topic, sub] of this.mqttSubsMap) {
			if (sub.count <= 0) continue; // no active subscriber for this topic

			const group = topicsByRh.get(sub.rh);
			if (group) {
				group.push(topic);
			} else {
				topicsByRh.set(sub.rh, [topic]);
			}
		}

		for (const [rh, topics] of topicsByRh) {
			if (topics.length === 0) continue; // nothing to subscribe for this option

			try {
				this.client.subscribe(topics, { qos: 0, rh: rh });
			} catch (err) {
				console.log('Could not SUB/UNSUB mqtt-topic ' + topics + ': ', err);
			}
		}
	}

	/** Stops the keep-alive ping interval if it is running. */
	private stopKeepAlive(): void {
		if (this.timer) {
			clearInterval(this.timer);
			this.timer = null;
		}
	}

	/** Handler for the client's 'connect' event. */
	private onConnected(ack: IConnackPacket): void {
		this.stopPeriodicConnect();
		// the attempt has been answered, independent of whether an interval was running
		this.isReconnecting = false;
		this.isOpen = true;
		console.log('Connected to MQTT');

		// subscribe to active subscriptions again after a reconnect
		this.tryResubMqtt();

		// To resolve reconnection problems, specially on DEV
		// https://github.com/mqttjs/MQTT.js/issues/1257#issuecomment-1671891246
		this.stopKeepAlive(); // never keep two ping timers alive
		this.timer = setInterval(() => {
			// the client is nulled by disconnect(); skip instead of throwing inside the timer
			if (!this.client) return;
			(this.client as any)._sendPacket({ cmd: 'pingreq' });
		}, 15 * 1000);
	}

	/** Handler for the client's 'close' event. */
	private onConnectionLost(): void {
		console.log('Lost connection to MQTT');
		this.isReconnecting = false;

		// if an unexpected connection lost happens then reconnect again
		if (this.client && !this.client.connected && this.isOpen) {
			this.startPeriodicConnect();
		}

		// clear mqtt keep alive timer
		this.stopKeepAlive();
	}

	/** Handler for the client's 'error' event. */
	private onError(err: Error): void {
		console.log('MQTT Error:', err.message);

		// a refused connection is a final answer to the attempt, so allow the next retry
		if (err.message.includes('Connection refused')) {
			this.isReconnecting = false;
		}
	}

	/** Builds the full topic names (`<machine prefix>/<name>`) for the given value names. */
	private toTopics(machine: IMachine, names: string[]): string[] {
		const prefix = Utils.getMqttPrefixFromMachine(machine);
		return names.map((name) => `${prefix}/${name}`);
	}

	/**
	 * Flattens `[dailyName, batchName]` pairs into one list of names.
	 * The batch name is only added when it differs from the daily name of the same pair.
	 */
	private collectDailyAndBatchNames(nameArray: Array<Array<string>>): string[] {
		const neededValues: string[] = [];
		for (const names of nameArray) {
			const dailyName = names[0]; // first item is daily variable name
			const batchName = names[1]; // second item is batch variable name

			neededValues.push(dailyName);
			if (dailyName !== batchName) {
				neededValues.push(batchName);
			}
		}
		return neededValues;
	}

	/**
	 * Subscribes to a single value of a machine.
	 * Names containing '.', '+' or '*' are rejected (logged, nothing is subscribed).
	 */
	public subscribeValue(machine: IMachine, name: string, rh: SEND_RETAINED = SEND_RETAINED.ALWAYS) {
		for (const forbidden of ['.', '+', '*']) {
			if (name.indexOf(forbidden) !== -1) {
				console.log('subscription contains ' + forbidden);
				return;
			}
		}

		const identifier = `${Utils.getMqttPrefixFromMachine(machine)}/${name}`;
		this.trySubUnsubMqtt([identifier], 1, rh);
	}

	/** Releases one subscription of a single value of a machine. */
	public unsubscribeValue(machine: IMachine, name: string, rh: SEND_RETAINED = SEND_RETAINED.ALWAYS) {
		const identifier = `${Utils.getMqttPrefixFromMachine(machine)}/${name}`;
		this.trySubUnsubMqtt([identifier], -1, rh);
	}

	/** Subscribes to several values of a machine at once. Empty or missing lists are ignored. */
	public subscribeValueArray(machine: IMachine, nameArray: Array<string>, rh: SEND_RETAINED = SEND_RETAINED.ALWAYS) {
		if (!nameArray || nameArray.length === 0) {
			return;
		}

		this.trySubUnsubMqtt(this.toTopics(machine, nameArray), 1, rh);
	}

	/** Releases one subscription for each of the given values. Empty or missing lists are ignored. */
	public unsubscribeValueArray(machine: IMachine, nameArray: Array<string>, rh: SEND_RETAINED = SEND_RETAINED.ALWAYS) {
		if (!nameArray || nameArray.length === 0) {
			return;
		}

		this.trySubUnsubMqtt(this.toTopics(machine, nameArray), -1, rh);
	}

	/**
	 * Subscribes to daily/batch value pairs.
	 * @param nameArray list of `[dailyName, batchName]` pairs
	 */
	public subscribeValueArrayForDailyAndBatch(machine: IMachine, nameArray: Array<Array<string>>, rh: SEND_RETAINED = SEND_RETAINED.ALWAYS) {
		if (!nameArray || nameArray.length === 0) {
			return;
		}

		this.subscribeValueArray(machine, this.collectDailyAndBatchNames(nameArray), rh);
	}

	/**
	 * Releases the subscriptions of daily/batch value pairs.
	 * @param nameArray list of `[dailyName, batchName]` pairs
	 */
	public unSubscribeValueArrayForDailyAndBatch(machine: IMachine, nameArray: Array<Array<string>>, rh: SEND_RETAINED = SEND_RETAINED.ALWAYS) {
		if (!nameArray || nameArray.length === 0) {
			return;
		}

		this.unsubscribeValueArray(machine, this.collectDailyAndBatchNames(nameArray), rh);
	}


	// --------- deprecated
	/* private subscribeValueForMachine(machine: IMachine, name: string, rh: SEND_RETAINED = SEND_RETAINED.ALWAYS) {
		// Empty check for avoid the unwanted subscription
		if (name !== '') {
			this.subscribeValue(machine, name, rh);
		} else {
			console.log(`empty subscription in machine:${machine.name} for value: ${name}`);
		}
	}

	private unSubscribeValueForMachine(machineName: string, name: string, rh: SEND_RETAINED = SEND_RETAINED.ALWAYS) {
		// Empty check for avoid the unwanted subscription
		if (name !== '') {
			this.unsubscribeValue(machineName, name, rh);
		} else {
			console.log(`empty subscription in machine:${machineName} for value: ${name}`);
		}
	}
	*/



	/** Subscribes to the `comms/user/<userId>` topic. */
	public subscribeComms(userId: number, rh: SEND_RETAINED = SEND_RETAINED.ALWAYS) {
		this.trySubUnsubMqtt(['comms/user/' + userId], 1, rh);
	}

	/** Subscribes to every topic below the machine's prefix (`<prefix>/#`). */
	public subscribeCompleteMachine(machine: IMachine, rh: SEND_RETAINED = SEND_RETAINED.ALWAYS) {
		this.trySubUnsubMqtt([`${Utils.getMqttPrefixFromMachine(machine)}/#`], 1, rh);
	}

	/** Releases one subscription of the machine-wide wildcard topic (`<prefix>/#`). */
	public unsubscribeCompleteMachine(machine: IMachine, rh: SEND_RETAINED = SEND_RETAINED.ALWAYS) {
		this.trySubUnsubMqtt([`${Utils.getMqttPrefixFromMachine(machine)}/#`], -1, rh);
	}

	/**
	 * Parses a JSON payload. Unparsable data is logged and replaced by a
	 * placeholder message with identifier 'NOMATCH' instead of throwing.
	 */
	private parseSecure(data: string) {
		try {
			return JSON.parse(data);
		} catch (err) {
			console.log('received unparsable data from stream: ' + err, data);
			return {
				value: 0,
				timestamp_created: 0,
				timestamp_measurement: 0,
				identifier: 'NOMATCH',
				type: types.uint16
			};
		}
	}

	/**
	 * Returns a shared stream of the messages whose topic starts with the current
	 * machine name and ends with one of the currently needed values.
	 * Both getters are evaluated for every message, so the filter follows changes over time.
	 * A needed value may also be a `[daily, batch]` pair; either name matches.
	 */
	public filterStream(
		getMachineName: () => string,
		getNeededValues: () => string[]
	): Observable<IotMessage> {
		return this.websocketData.pipe(
			filter((msg) => {
				return msg.destinationName.startsWith(getMachineName())
			}),
			filter((msg) =>
				getNeededValues().some((neededValue) => {
					if (Array.isArray(neededValue)) {
						// for daily/batch variables, needed value is an array
						return (
							msg.destinationName.endsWith(neededValue[0]) ||
							msg.destinationName.endsWith(neededValue[1])
						);
					} else {
						return msg.destinationName.endsWith(neededValue);
					}
				})
			),
			share()
		);
	}

	/**
	 * Like `filterStream` for plain value names, but the stream completes as soon as
	 * `until()` returns false for an incoming message (it is evaluated per message via `takeWhile`).
	 */
	public filterStreamUntilTrue(
		getMachineName: () => string,
		getNeededValues: () => string[],
		until: () => boolean
	): Observable<IotMessage> {
		return this.websocketData.pipe(
			takeWhile((msg) => until()),
			filter((msg) => msg.destinationName.startsWith(getMachineName())),
			filter((msg) =>
				getNeededValues().some((neededValue) => {
					return msg.destinationName.endsWith(neededValue);
				})
			)
		);
	}

	/** Returns all messages whose topic starts with the current machine name. */
	public filterStreamCompleteMachine(getMachineName: () => string): Observable<IotMessage> {
		return this.websocketData.pipe(
			filter((msg) => msg.destinationName.startsWith(getMachineName()))
		);
	}

	/** Returns all messages whose topic starts with 'comms'. */
	public filterStreamForComms(): Observable<IotMessage> {
		return this.websocketData.pipe(
			filter((msg) => msg.destinationName.startsWith('comms'))
		);
	}

	/** Snapshot of the subscription map taken by the previous `printCurrentSubs` call. */
	private oldMqttSubsMap: Map<string, ISubData> = new Map<string, ISubData>();
	/** Topics present now that were missing in the previous snapshot. */
	private newMqttSubsMap: Map<string, ISubData> = new Map<string, ISubData>();

	// method to help checking if all sub/unsubs are working as intended
	/** Logs the current subscription map and the topics added since the previous call. */
	public printCurrentSubs() {
		this.newMqttSubsMap.clear();

		for (const [key, value] of this.mqttSubsMap.entries()) {
			if (!this.oldMqttSubsMap.has(key)) {
				this.newMqttSubsMap.set(key, value);
			}
		}

		this.oldMqttSubsMap = new Map(this.mqttSubsMap);
		console.log('Creating snapshot of current mqtt subs: ', this.oldMqttSubsMap);

		console.log('MqttSub Difference: ', this.newMqttSubsMap);
	}

	/** Manually re-sends the subscriptions for all topics that still have active subscribers. */
	public resubAllActive() {
		console.log('RESUB ALL ACTIVE');
		console.log(this.mqttSubsMap.keys());

		// subscribe to active subscriptions again after a reconnect
		this.tryResubMqtt();
	}
}
