import isFunction from 'lodash/isFunction';
import { Client } from './Client';
import { StreamEvents } from './constants';
import {
	IClientResult,
	IRpcResponseStream,
	IRpcServiceResponse,
	IStreamClientListeners,
	IStreamController,
	IStreamMethodOptions,
	RpcServiceStreamMethod,
	RpcStreamStatus,
} from './types/client';

/**
 * Generic streaming RPC client. It doesn't do a whole lot, but it reduces some complexity and allows us to hook in global
 * metadata and interceptors if we choose to.
 */
class StreamClient<RpcClientType> extends Client<RpcClientType> {
	// The client is tied to a specific RPC client (eg. SpokeClient)
	constructor(rpcClient: RpcClientType) {
		super(rpcClient);
	}

	/**
	 * Make the specified streaming call to the RPC client and return a controller object that allows the stream to be
	 * cancelled.
	 *
	 * Listener invocation order for a healthy stream is: `onTryStart` (once the stream object has been created and the
	 * event handlers attached), `onStart` (immediately before the first `onData`), then `onData`/`onStatus` as events
	 * arrive, and finally `onEnd`. `onCancel` is only fired by the controller's `cancel` method.
	 *
	 * Failures while creating or wiring up the stream are NOT thrown; they are reported through `listeners.onError`.
	 *
	 * @param   rpcMethod  The RPC method to call on the RPC client.
	 * @param   request    The request object to pass to the RPC method.
	 * @param   listeners  Optional group of listeners to attach to the stream behaviors.
	 * @param   opts       Optional call options; `opts.meta` is forwarded to the RPC method as its second argument.
	 * @returns A stream controller whose `cancel` method terminates the stream from the client side.
	 * @throws  Error when the named RPC method cannot be resolved on the RPC client.
	 */
	public stream = <RequestType, ResponseType extends IRpcServiceResponse>(
		rpcMethod: typeof this.rpcClientProps,
		request: RequestType,
		listeners?: IStreamClientListeners<ResponseType>,
		opts?: IStreamMethodOptions
	): IStreamController => {
		const rpcMethodFn = this.getRpcMethodFn<RequestType, ResponseType>(rpcMethod as string);

		if (rpcMethodFn == null) {
			throw new Error('RPC method unavailable for client');
		}

		// The live stream; reset to NULL once cancelled so the controller cannot act on it twice
		let stream: Nullable<IRpcResponseStream<ResponseType>> = null;

		// When TRUE we have successfully received data from the server
		let hasReceivedData = false;

		// Called when an attempt to start the stream is made (before data/status)
		const onTryStart = () => {
			listeners?.onTryStart?.();
		};

		// Called when a stream successfully starts (ie. after it first received valid data)
		const onStart = () => {
			listeners?.onStart?.();
		};

		// Called when the `cancel` method is executed.
		const onCancel = (reason?: any) => {
			listeners?.onCancel?.(reason);
		};

		// Called when the stream sends back any status information
		const onStatus = (status: RpcStreamStatus) => {
			listeners?.onStatus?.(status);
		};

		// Called when an error has occurred
		const onError = (errResult: IClientResult<ResponseType>) => {
			listeners?.onError?.(errResult);
		};

		// Called when the stream is voluntarily terminated (by either client or server). Not called if the stream is cancelled via `cancel`.
		const onEnd = (status?: RpcStreamStatus) => {
			// Defaults first, then whatever the stream reported; `isError` always starts out FALSE
			const endStatus: RpcStreamStatus = { code: 0, details: '', ...(status ?? {}), isError: false };

			// A code 2 "Response closed..." status before any data arrived is treated as a failure to start.
			// NOTE(review): code 2 is presumably the gRPC UNKNOWN status - confirm against the transport in use.
			// The `typeof` guard protects against a status object carrying a non-string `details`.
			const closedBeforeAnyData =
				endStatus.code === 2 &&
				typeof endStatus.details === 'string' &&
				endStatus.details.startsWith('Response closed') &&
				!hasReceivedData;

			if (closedBeforeAnyData) {
				endStatus.isError = true;
				endStatus.details = 'Unable to start stream - possible server url/transport issue, check RPC client';
			}

			// `onEnd` is always notified first; an error end is then additionally surfaced through `onError`
			listeners?.onEnd?.(endStatus);
			if (endStatus.isError) {
				onError(this.makeErrorResult<ResponseType>({ message: endStatus.details }));
			}
		};

		// Called when the server successfully sends valid data to the client
		const onData = (dataResult: IClientResult<ResponseType>) => {
			// The first valid payload marks the stream as started
			if (!hasReceivedData) {
				onStart();
			}

			hasReceivedData = true;
			listeners?.onData?.(dataResult);
		};

		// Routes a raw stream message to `onError` (when it carries a reply error) or to `onData`
		const handleData = (response: ResponseType) => {
			if (isFunction(response.getError)) {
				const errorData = this.extractReplyError(response.getError());

				if (errorData != null) {
					onError(this.makeErrorResult<ResponseType>({ error: errorData }));
					return;
				}
			}

			onData({ success: true, data: response });
		};

		// Reports anything thrown while creating/wiring the stream. The thrown value is not guaranteed to be an
		// `Error`, so use its `message` when it has a string one and fall back to its string form otherwise.
		const handleGeneralError = (err: unknown) => {
			const thrownMessage = (err as { message?: unknown } | null | undefined)?.message;
			const message = typeof thrownMessage === 'string' ? thrownMessage : String(err);

			onError(this.makeErrorResult<ResponseType>({ message }));
		};

		const controller: IStreamController = {
			cancel: (reason?: any) => {
				const activeStream = stream;

				if (!activeStream) {
					return;
				}

				// Release the reference first so repeated (or re-entrant) `cancel` calls are no-ops rather than
				// cancelling the same stream again and emitting duplicate `onCancel` events.
				stream = null;

				// This IMMEDIATELY cuts the stream connection on the client, will not execute `onEnd` callback.
				activeStream.cancel();

				// We shall issue our own `onCancel` event
				onCancel(reason ?? undefined);
			},
		};

		try {
			stream = rpcMethodFn(request, opts?.meta);
			stream.on(StreamEvents.DATA, handleData);
			stream.on(StreamEvents.STATUS, onStatus);
			stream.on(StreamEvents.END, onEnd);

			onTryStart();
		} catch (e) {
			handleGeneralError(e);
		}

		return controller;
	};

	/**
	 * Looks up the named RPC method through the base client's `getRpcMethodFn` and re-types the result as a streaming
	 * method.
	 *
	 * @param   rpcMethodName  Name of the method on the RPC client.
	 * @returns The method as returned by the base lookup, or NULL if the base lookup returned null/undefined.
	 */
	protected getRpcMethodFn = <RequestType, ResponseType>(
		rpcMethodName: string
	): Nullable<RpcServiceStreamMethod<RequestType, ResponseType>> => {
		const rpcMethod = super.getRpcMethodFn(rpcMethodName);

		// Normalise a missing method (null or undefined) to NULL
		if (rpcMethod == null) {
			return null;
		}

		return rpcMethod as RpcServiceStreamMethod<RequestType, ResponseType>;
	};
}

export { StreamClient as default };
export { StreamClient, StreamEvents };
