import delay from 'lodash/delay';
import isFunction from 'lodash/isFunction';
import { authenticateRequest, getDefaultAuthSession, IAuthSessionProvider } from './helpers/authenticate';
import { StreamClient } from './StreamClient';
import {
	IClientResult,
	IRpcServiceResponse,
	IStreamClientListeners,
	IStreamController,
	IStreamMethodOptions,
	RpcClientConstructor,
	RpcOptions,
	RpcStreamStatus,
} from './types/client';
import {
	IAutoRestartInputProps,
	IStream,
	IStreamError,
	IStreamOptions,
	IStreamState,
	IStreamSubscriber,
	IValidateStartResult,
	StreamStatus,
	StreamSubscribers,
} from './types/stream';

type AutoRestartProps = IAutoRestartInputProps & {
	// When TRUE the auto-restart feature has been enabled.
	enabled: boolean;
	// Base delay between restart attempts.
	delayMs: number;
	// Maximum number of restart attempts (per cycle) before we give up
	maxAttempts: number;
	// When TRUE the stream is allowed to restart when an error occurs.
	restartOnError: boolean;
	// When TRUE will use a linear exponential backoff when calculating the delay between attempts
	applyExponentialBackoff: boolean;
	// When TRUE we are currently attempting an auto-restart
	isRestarting: boolean;
	// ID of the restart delay timer (when active)
	restartTimerId: number;
	// If this is FALSE then we should not be allowed to attempt to auto-restart - transient state version of `enabled`
	// allowAttempts: boolean;
	// Total number of times the stream has been auto-restarted (across the lifetime of the stream)
	totalTimesRestarted: number;
	// Number of attempts that have been made in the current auto-restart cycle
	attemptsThisCycle: number;
	// Applied delay between restart attempts.
	applyDelayMs: number;
};

/**
 * @returns Defaults used for auto-restart props.
 */
const autoRestartDefaults = (): AutoRestartProps => ({
	// Input options
	enabled: true,
	delayMs: 1500,
	maxAttempts: 10,
	restartOnError: false,
	applyExponentialBackoff: true,

	// Current state props
	isRestarting: false,
	restartTimerId: 0,
	// allowAttempts: true, // <-- TODO: Review & fix this
	totalTimesRestarted: 0,
	attemptsThisCycle: 0,
	applyDelayMs: 1000,
});

abstract class Stream<RpcClientType, StreamDataType, StreamRequestProps> implements IStream {
	// The stream's service URL.
	protected url: string = '';

	// The current data stream controller object.
	protected dataStream: Nullable<IStreamController> = null;

	// Stream client instance used to connect to the server via RPC.
	/* @ts-ignore : Because this gets instantiated in the streams that inherit the class and strict mode complains. */
	protected client: StreamClient<RpcClientType>;

	// Stream subscribers. These are things that are listening to activity on this stream.
	protected subscribers: StreamSubscribers<StreamDataType> = [];

	protected _currentState: IStreamState = {
		// TRUE when this stream is enabled. Disabled streams cannot start/restart or auto-restart.
		isEnabled: true,
		// TRUE when this stream is currently live and connected.
		isActive: false,
		// Stream status
		status: StreamStatus.NOT_STARTED,
		// Message associated with the status
		message: 'Stream not started',
	};

	// Auto restart properties
	protected autoRestart: AutoRestartProps = autoRestartDefaults();

	// Listener for the stream client callbacks. This allows this stream to listen to RPC stream activity.
	/* @ts-ignore : Because this gets instantiated in the streams that inherit the class and strict mode complains. */
	protected streamClientListener: IStreamClientListeners<IRpcServiceResponse>;

	// Authentication session provider
	protected readonly _session: IAuthSessionProvider;

	// Holds the last request properties that were used when starting/restarting the stream.
	protected lastRequestProps: Nullable<StreamRequestProps> = null;

	/**
	 * CONSTRUCTOR.
	 *
	 * @param url   Stream service url to connect to.
	 * @param opts  Options to use when creating this stream.
	 */
	constructor(url: string, opts?: IStreamOptions) {
		if (url === '') {
			throw new Error('Stream service url must be specified');
		}

		this.url = url;
		this.isEnabled = opts?.isEnabled ?? this.isEnabled;

		const ar = this.autoRestart;
		ar.enabled = opts?.autoRestart?.enabled ?? ar.enabled;
		ar.delayMs = opts?.autoRestart?.delayMs ?? ar.delayMs;
		ar.maxAttempts = opts?.autoRestart?.maxAttempts ?? ar.maxAttempts;
		ar.restartOnError = opts?.autoRestart?.restartOnError ?? ar.restartOnError;
		ar.applyExponentialBackoff = opts?.autoRestart?.applyExponentialBackoff ?? ar.applyExponentialBackoff;

		this._session = opts?.session ?? getDefaultAuthSession();
	}

	// ---- Abstract Methods --------------------------------------------------------------------------------------------

	/**
	 * Must override. Will attempt to start the stream.
	 *
	 * @returns TRUE if the attempt to start the stream succeeded. Note that this does NOT mean the stream actually
	 *          started and received data - you must subscribe to the stream to know that.
	 */
	public abstract start(requestProps?: Maybe<StreamRequestProps>): boolean;

	/**
	 * Must override. Implements the minimum logic needed to create and start a new stream.
	 *
	 * @returns TRUE if successfully able to create and start the stream.
	 */
	protected abstract runStream(requestProps?: Maybe<StreamRequestProps>): boolean;

	/**
	 * Must override. Implements logic used to determine if we are allowed to start/restart the stream.
	 *
	 * @returns The result of the validation. This also includes the processed request props to apply.
	 */
	protected abstract validateStart(requestProps?: Maybe<StreamRequestProps>): IValidateStartResult<StreamRequestProps>;

	// ---- Getters & Setters -------------------------------------------------------------------------------------------

	/**
	 * Authentication session.
	 */
	public get session() {
		return this._session;
	}

	/**
	 * Authentication token.
	 */
	public get token(): string {
		return this.session.token;
	}
	protected set token(value: string) {
		this.session.token = value;
	}

	public get subscriberCount(): number {
		return this.subscribers.length;
	}

	public get hasSubscribers(): boolean {
		return this.subscriberCount > 0;
	}

	/**
	 * Current state of the stream
	 */
	public get currentState(): IStreamState {
		return this._currentState;
	}
	protected set currentState(state: IStreamState) {
		this._currentState = { ...this._currentState, ...state };
	}

	/**
	 * TRUE when the stream currently active and running.
	 */
	public get isActive() {
		return this.currentState.isActive ?? false;
	}

	/**
	 * Gets the method keys supported by the RPC client.
	 */
	public get rpcClientMethods() {
		return this.client.rpcClientMethods;
	}

	/**
	 * TRUE when this stream is enabled. Disabled streams cannot start/restart or auto-restart.
	 */
	public get isEnabled(): boolean {
		return this.currentState.isEnabled ?? false;
	}
	public set isEnabled(val: boolean) {
		this.currentState.isEnabled = val;
	}

	// ---- Primary Methods ----------------------------------------------------------------------

	/**
	 * Attempts to manually restart this stream using the specified props.
	 *
	 * @returns TRUE if the attempt to restart the stream succeeded. Note that this does NOT mean the stream
	 *          actually re-connected and received data - you must subscribe to the stream to know that.
	 */
	public restart(props?: Maybe<StreamRequestProps>): boolean {
		// Disable restarts if the stream is disabled
		if (!this.isEnabled) {
			return false;
		}

		const { isValid, requestProps } = this.validateStart(props);
		if (!isValid) {
			return false;
		}

		// Manual restarts will clear any auto-restart cycle that might be active
		this.clearAutoRestarts();

		if (!this.cancelStream('Stream restarting', StreamStatus.RESTARTING)) {
			return false;
		}

		if (!this.runStream(requestProps)) {
			return false;
		}

		this.afterStart(requestProps);

		return true;
	}

	/**
	 * Attempts to manually stop this stream.
	 *
	 * @returns TRUE if the attempt to stop succeeded.
	 */
	public stop(reason?: string): boolean {
		reason = reason ?? 'Stream stopped';

		// Manual stops will clear any auto-restart cycle that might be active
		this.clearAutoRestarts();

		const didStop = this.cancelStream(reason);
		if (didStop) {
			this.setState({ isActive: false, status: StreamStatus.STOPPED, message: reason });
		}

		return didStop;
	}

	/**
	 * Subscribes to this stream. By default will throw an error if the specified subscriber already exists.
	 *
	 * @param  subscriber  Set of callbacks to execute when the relevant stream client actions occur (eg. onData, onError, etc)
	 */
	public subscribe(subscriber: IStreamSubscriber<StreamDataType>, opts?: { throwError?: boolean }) {
		const { throwError = true } = opts || {};

		const index = this.findSubscriptionIndex(subscriber);
		if (index > -1) {
			const message = this.debugMsg('Specified listener already subscribed', 'subscribe');
			if (throwError) {
				throw new Error(message);
			}
			console.error(message);

			return;
		}

		this.subscribers.push(subscriber);
	}

	/**
	 * Unsubscribes (removes) the specified subscriber from this stream.
	 *
	 * @param  subscriber  The subscriber we want to remove.
	 */
	public unsubscribe(subscriber: IStreamSubscriber<StreamDataType>): boolean {
		const index = this.findSubscriptionIndex(subscriber);
		if (index === -1) {
			return false;
		}

		this.subscribers.splice(index, 1);

		return true;
	}

	/**
	 * Unsubscribes (removes) all the subscribers from this stream.
	 */
	public unsubscribeAll(): void {
		this.subscribers = [];
	}

	// ---- Stream event handlers ---------------------------------------------------------------------------------------

	/**
	 * Called when a stream successfully starts (ie. after it first connects and receives valid data).
	 */
	protected onStreamStart = () => {
		this.setState({
			isActive: true,
			status: StreamStatus.RUNNING,
			message: 'Stream started',
		});

		this.subscribers.forEach((subscriber) => {
			subscriber.onStart && subscriber.onStart();
		});

		// Clear any active auto-restart cycle
		const ar = this.autoRestart;
		if (ar.isRestarting) {
			this.clearAutoRestarts();
		}
	};

	/**
	 * Called when the stream is voluntarily terminated (by either client or server).
	 */
	protected onStreamStop = (reason?: string, status?: StreamStatus) => {
		this.dataStream = null;

		this.setState({
			isActive: false,
			status: status ?? StreamStatus.STOPPED_ENDED,
			message: reason ?? '',
		});

		this.subscribers.forEach((subscriber) => {
			subscriber.onStop && subscriber.onStop(reason);
		});

		const isError = status === StreamStatus.STOPPED_ERROR;

		// Clear any active auto-restart cycle if we are not supposed to restart on errors
		const ar = this.autoRestart;
		if (isError && ar.isRestarting && !ar.restartOnError) {
			this.clearAutoRestarts();
		}

		// This will do nothing if we stopped due to an error and auto-restart for errors is not enabled
		this.tryAutoRestart(isError);
	};

	/**
	 * Called when the stream successfully received valid data from the server.
	 */
	protected onStreamData = (data: Maybe<StreamDataType>) => {
		if (!data) {
			return;
		}

		this.subscribers.forEach((subscriber) => {
			subscriber.onData && subscriber.onData(data);
		});
	};

	/**
	 * Called when an error has occured with the stream.
	 */
	protected onStreamError = (err: IStreamError) => {
		if (this.isActive) {
			this.forceCancelStream(
				`Stream cancelled due to error: ${err.message || 'Unknown error'}`,
				StreamStatus.STOPPED_ERROR
			);
		}

		this.subscribers.forEach((subscriber) => {
			subscriber.onError && subscriber.onError(err);
		});

		// Clear any active auto-restart cycle if we are not supposed to restart on errors
		const ar = this.autoRestart;
		if (ar.isRestarting && !ar.restartOnError) {
			this.clearAutoRestarts();
		}

		// This will do nothing if auto-restart for errors is not enabled
		this.tryAutoRestart(true);
	};

	// ---- Utility Methods ----------------------------------------------------------------------

	/**
	 * Intended to be called after each start/restart.
	 *
	 * @param requestProps
	 */
	protected afterStart(requestProps?: Maybe<StreamRequestProps>) {
		if (requestProps) {
			this.lastRequestProps = requestProps ?? null;
		}
	}

	/**
	 * Graceful cancel of the stream. This will wait for the cancel callback
	 *
	 * @param reason
	 * @returns
	 */
	protected cancelStream = (reason?: string, status?: StreamStatus): boolean => {
		if (!this.isActive) {
			return false;
		}

		this.dataStream?.cancel(reason);

		this.setState({
			isActive: false,
			status: status ?? StreamStatus.STOPPED_CANCELLED,
			message: reason,
		});

		return true;
	};

	/**
	 * Forced cancel of the stream.
	 *
	 * @param reason
	 * @param status
	 */
	protected forceCancelStream = (reason?: string, status?: StreamStatus) => {
		this.dataStream?.cancel(reason);
		this.dataStream = null;

		this.setState({
			isActive: false,
			status: status ?? StreamStatus.STOPPED_CANCELLED,
			message: reason,
		});
	};

	/**
	 * @returns The set of listeners/handlers to use for stream client activity.
	 */
	protected createStreamClientListener = <
		ResponseType extends IRpcServiceResponse
	>(): IStreamClientListeners<ResponseType> => {
		return {
			// Called when a stream successfully starts (ie. after it first received valid data).
			onStart: () => {
				this.onStreamStart();
			},

			// Called when the stream `cancel` method is executed. This will immediately terminate the connection without calling `onEnd`.
			onCancel: (reason?: Maybe<unknown>) => {
				reason = reason ?? '';

				let stopReason: string = '';
				if (typeof reason !== 'string') {
					stopReason = JSON.stringify(reason);
				} else {
					stopReason = reason;
				}

				this.onStreamStop(stopReason || 'Stream was cancelled', StreamStatus.STOPPED_CANCELLED);
			},

			// Called when the stream is voluntarily terminated (by either client or server).
			onEnd: (status?: RpcStreamStatus) => {
				const reason = `Stream ended: ${status?.details || 'Unknown reason'}`;
				const endStatus = status?.isError ? StreamStatus.STOPPED_ERROR : StreamStatus.STOPPED_ENDED;

				this.onStreamStop(reason, endStatus);
			},

			// Called when the server successfully sends valid data to the client.
			onData: (reply: IClientResult<ResponseType>) => {
				let data: Nullable<StreamDataType> = null;
				if (isFunction(reply.data?.toObject)) {
					data = reply.data?.toObject() as StreamDataType;
				}

				this.onStreamData(data);
			},

			// Called when an error has occured with the stream.
			onError: (err: IClientResult<ResponseType>) => {
				err.error && this.onStreamError(err.error);
			},
		};
	};

	/**
	 * Authenticate the request by adding the token data to the request if we have it.
	 *
	 * @param request
	 * @returns
	 */
	protected authenticateRequest<RequestType>(request: RequestType): RequestType {
		return authenticateRequest<RequestType>(request, this.token);
	}

	/**
	 * The actual type of the class extending this.
	 */
	protected get className(): string {
		return this.constructor.name ?? 'Stream';
	}

	/**
	 * Make an stream call via the client attached to this service. Returns the raw RPC data in the `data` prop.
	 *
	 * @returns See `StreamClient.stream` method
	 */
	protected stream = <RequestType, ResponseType extends IRpcServiceResponse>(
		rpcMethod: typeof this.rpcClientMethods,
		request: RequestType,
		opts?: IStreamMethodOptions
	): IStreamController => {
		const authReq = this.authenticateRequest<RequestType>(request);

		return this.client.stream<RequestType, ResponseType>(rpcMethod, authReq, this.streamClientListener, opts);
	};

	/**
	 * @param  subscriber  Set of callbacks to execute when the relevant stream client actions occur (eg. onData, onError, etc)
	 * @returns TRUE if this subscription exists.
	 */
	protected hasSubscription(subscriber: IStreamSubscriber<StreamDataType>): boolean {
		return this.findSubscriptionIndex(subscriber) > -1;
	}

	/**
	 * @param subscriber
	 * @returns The index of the specified subscriber in the the current subscriptions. Returns -1 if not found.
	 */
	protected findSubscriptionIndex(subscriber: IStreamSubscriber<StreamDataType>): number {
		if (this.subscribers.length === 0) {
			return -1;
		}

		return this.subscribers.findIndex((s: IStreamSubscriber<StreamDataType>) => Object.is(s, subscriber));
	}

	/**
	 * Stops and fully clears any active auto-restart cycle.
	 */
	protected clearAutoRestarts = () => {
		const ar = this.autoRestart;
		if (!ar.enabled) {
			return;
		}

		this.stopAutoRestarts();
		ar.applyDelayMs = ar.delayMs;
		ar.attemptsThisCycle = 0;
	};

	/**
	 * Stop any existing auto-restart cycle timer.
	 */
	protected stopAutoRestarts = () => {
		const ar = this.autoRestart;
		if (!ar.enabled) {
			return;
		}

		if (ar.restartTimerId > 0) {
			globalThis.clearTimeout(ar.restartTimerId);
			ar.restartTimerId = 0;
		}

		ar.isRestarting = false;
	};

	/**
	 * Attempt an auto-restart cycle.
	 *
	 * @param    isError  If this is TRUE we are attempting due to an error.
	 * @returns
	 */
	// TODO: Review this and fix the `ar.isRestarting` issue that cropped up
	protected tryAutoRestart(isError: boolean = false) {
		// Disable auto-restarts if the stream is disabled
		if (!this.isEnabled) {
			return;
		}

		const ar = this.autoRestart;

		// if (!ar.enabled || !ar.allowAttempts || ar.isRestarting) {
		if (!ar.enabled || ar.isRestarting) {
			return;
		}

		// console.log('tryAutoRestart.2', { ar: { ...ar } });

		// Blocks the attempt if we are starting the cycle due to an error and we are not allowed to auto-restart on errors
		if (isError && !ar.restartOnError) {
			return;
		}

		const stopAttempts = () => {
			this.clearAutoRestarts();
		};

		const attemptRestart = () => {
			const ar = this.autoRestart;

			// console.log('attemptRestart.1', { ar: { ...ar } });

			// if (!ar.enabled || !ar.allowAttempts || ar.attemptsThisCycle >= ar.maxAttempts || this.isActive) {
			if (!ar.enabled || ar.attemptsThisCycle >= ar.maxAttempts || this.isActive) {
				stopAttempts();
				return;
			}
			// Attempt the restart
			ar.attemptsThisCycle++;
			const isValid = this.doAutoRestart();

			// console.log('attemptRestart.2', { ar: { ...ar }, isValid, isActive: this.isActive });

			if (!isValid || this.isActive) {
				stopAttempts();
				return;
			}

			// Check active status again after a short delay
			delay(() => {
				this.isActive && stopAttempts();
			}, 200);

			if (ar.applyExponentialBackoff) {
				ar.applyDelayMs = (1 + ar.attemptsThisCycle) * ar.delayMs;
			}

			delay(attemptRestart, ar.applyDelayMs);
		};

		// ar.allowAttempts = true;
		ar.isRestarting = true;
		ar.restartTimerId = delay(attemptRestart, ar.applyDelayMs);

		// console.log('tryAutoRestart.3', { ar: { ...ar } });
	}

	/**
	 * Runs an auto-restart. A auto-restart is distinct from a manual restart.
	 *
	 * @returns TRUE if the auto-restart attempt was allowed.
	 */
	protected doAutoRestart(): boolean {
		// Disable auto-restarts if the stream is disabled
		if (!this.isEnabled) {
			return false;
		}

		const ar = this.autoRestart;
		// if (!ar.enabled || !ar.allowAttempts) {
		if (!ar.enabled) {
			return false;
		}

		const { isValid, requestProps } = this.validateStart();

		// console.log('doAutoRestart:', { ar: { ...ar }, isValid, requestProps, lastRequestProps: this.lastRequestProps });

		if (!isValid) {
			return false;
		}

		this.forceCancelStream('Stream auto-restarting', StreamStatus.RESTARTING);
		this.runStream(requestProps);

		ar.totalTimesRestarted++;

		return true;
	}

	protected createRpcClient(RpcClient: RpcClientConstructor<RpcClientType>, rpcOptions?: RpcOptions): RpcClientType {
		return new RpcClient(this.url, rpcOptions);
	}

	protected createStreamClient(
		RpcClient: RpcClientConstructor<RpcClientType>,
		rpcOptions?: RpcOptions
	): StreamClient<RpcClientType> {
		const rpcClient = this.createRpcClient(RpcClient, rpcOptions);

		return new StreamClient<RpcClientType>(rpcClient);
	}

	protected debugMsg(msg: string, method?: string) {
		if (msg === '') {
			return '';
		}

		const prefix = `[${this.className}]: ${method ? `${method} - ` : ''}`;

		return `${prefix}${msg}`;
	}

	protected setState(props: IStreamState) {
		this.currentState = { ...this.currentState, ...props };
	}
}

// ---- Exports ----------------------------------------------

export { Stream as default };
export { Stream };
