import { SSE } from 'sse.js';
import { JsonObject } from '../helpers';
import Environment from '../environment';
import { AuthProvider } from './auth.service';
import {
  ThreadContext,
  ThreadMessage,
  getLockFromConversation,
  makeContextFromConversation,
  makeThreadMessage,
} from './thread.service';
import { Citation, UserEventMessage, UserEventRole } from '../models/user-event.model';
import { getTimestamp } from '../date-helpers';

/**
 * Shape that `streamUserMessage` parses the JSON `data` of each SSE `message`
 * event into. The cast is unchecked, so fields may be absent at runtime.
 */
export type StreamPayload = {
  // Conversation identifier; forwarded to consumers as `StreamMessage.threadId`.
  conversationId: string;
  // When true, the handler builds a full `ThreadMessage` from the first entry of `messages`.
  isComplete: boolean;
  // For 'history' the whole array is turned into a thread context; for
  // 'messages' only the first entry is read and it must have role 'assistant'.
  messages: { role: string; content: string; citations?: Citation[] }[];
  // NOTE(review): not read by the stream handler in this file — confirm whether the gateway populates it.
  error?: string;
  // Discriminates the payload; any other value is logged and ignored by the handler.
  responseType: 'history' | 'messages';
  // Copied onto the completed message; the handler substitutes all-false flags when omitted.
  violations?: {
    intent: boolean;
    access: boolean;
    sensitiveData: boolean;
  };
};

/**
 * Value handed to the `onMessage` callback of `streamUserMessage`, both for
 * streamed assistant content and for stream errors.
 */
export type StreamMessage = {
  // The payload's `conversationId` for streamed content; the request's `threadId` for stream errors.
  threadId: string;
  // Mirrors the payload's `isComplete`; always true for stream errors.
  isComplete: boolean;
  // Content of the first assistant message in the payload; '' for stream errors.
  content: string;
  // '' for streamed content; 'stream error' when the SSE client emits an `error` event.
  error?: string;
  // Only set when the payload is complete (built with `makeThreadMessage`).
  threadMessage?: ThreadMessage;
  // Result of `getLockFromConversation` for a completed message, false for
  // partial content, and omitted for stream errors.
  locked?: boolean;
};

/**
 * Out-of-band signals reported through the `onControl` callback of
 * `streamUserMessage`. 'data-source-review' is raised when the assistant
 * content starts with '*reviewing data sources' (case-insensitive).
 * NOTE(review): 'test' is never emitted by the code in this file.
 */
export type StreamControlSignal = 'data-source-review' | 'test';

// Most recently created SSE client: assigned by `startStreamClient`, read by
// `stopStreamClient`. Stays undefined until the first stream is started.
let sseClient: SSE | undefined = undefined;

/**
 * Opens an SSE connection by POSTing `data` as JSON to `path` on the edge API
 * and remembers the client in the module-level `sseClient`.
 *
 * The client is created with `start: true`, so this function never calls
 * `stream()` itself.
 *
 * NOTE(review): a previously stored client is overwritten without being
 * closed — confirm callers stop an in-flight stream before starting another.
 *
 * @param path Path appended to `Environment.SP_EDGE_API_URL`.
 * @param data Request body; serialised with `JSON.stringify`.
 * @returns The newly created SSE client.
 */
const startStreamClient = (path: string, data: JsonObject): SSE => {
  const endpoint = `${Environment.SP_EDGE_API_URL}${path}`;
  const headers = {
    'Content-Type': 'application/json',
    'Surepath-Authorization': `Bearer ${AuthProvider.token}`,
  };

  const client = new SSE(endpoint, {
    headers,
    payload: JSON.stringify(data),
    method: 'POST',
    start: true,
  });

  sseClient = client;
  return client;
};

/**
 * Closes the stream most recently opened through this module, if there is one
 * and it has not already been closed.
 *
 * @returns `true` when `close()` was invoked; `false` when no client exists or
 *          its `readyState` already equals `EventSource.CLOSED`.
 */
export const stopStreamClient = (): boolean => {
  const activeClient = sseClient;

  // Nothing to do when no stream was ever started or it is already closed.
  if (!activeClient || activeClient.readyState === EventSource.CLOSED) {
    return false;
  }

  activeClient.close();
  return true;
};

/**
 * Sends a user message to `/message/stream` and relays the streamed reply to
 * the supplied callbacks.
 *
 * The stream is opened through `startStreamClient`. Every SSE `message` event
 * is parsed as a `StreamPayload` and handled by `responseType`:
 * - `history`: the messages are converted with `makeContextFromConversation`
 *   and passed to `onContext`.
 * - `messages`: the first entry must be an assistant message. Its content is
 *   passed to `onMessage`, unless it is the "reviewing data sources"
 *   placeholder, which is reported through `onControl` instead.
 * Anything else is logged with `console.error` and dropped.
 *
 * NOTE(review): `StreamPayload.error` is not inspected here — confirm whether
 * the gateway ever populates it.
 *
 * @param message   User message to send; `threadId`, `content`, `shortcut` and
 *                  `datasourceIds` are read from it.
 * @param onMessage Invoked for each accepted assistant payload, and with
 *                  `error: 'stream error'` for each SSE `error` event.
 * @param onContext Invoked with the thread context built from `history` payloads.
 * @param onControl Invoked with `null` when the stream starts and afterwards
 *                  whenever the control signal changes.
 * @returns Always `true`; failures are reported asynchronously via `onMessage`.
 */
export const streamUserMessage = (
  message: ThreadMessage,
  onMessage: (data: StreamMessage) => void,
  onContext?: (threadContext: ThreadContext) => void,
  onControl?: (controlSignal: StreamControlSignal | null) => void
): boolean => {
  const { threadId, content, shortcut, datasourceIds } = message;

  const postData = {
    messages: [{ role: 'user', content, shortcut, datasourceIds }],
    conversationId: threadId,
  };

  const streamClient = startStreamClient('/message/stream', postData);

  // Last signal reported to the consumer; used to suppress duplicate notifications.
  let controlSignal: StreamControlSignal | null = null;
  const handleOnControl = (updatedSignal: StreamControlSignal | null, force?: boolean) => {
    if (!force && updatedSignal === controlSignal) {
      return;
    }
    controlSignal = updatedSignal;
    onControl?.(updatedSignal);
  };

  // reset on start (forced so the consumer is notified even though the local state is already null)
  handleOnControl(null, true);

  // message events
  streamClient.addEventListener('message', (event: EventTarget & { data: string }) => {
    try {
      const data = JSON.parse(event.data) as StreamPayload;
      const { isComplete, messages, conversationId, responseType, violations } = data;

      if (responseType === 'history') {
        const threadContext = makeContextFromConversation(messages as UserEventMessage[]);
        onContext?.(threadContext);
        return;
      }

      if (responseType !== 'messages') {
        console.error('unknown sse response type', data);
        return;
      }

      // response validation, @todo handle multiple messages
      // Read the first entry once so a missing or null entry is reported as an
      // invalid response instead of throwing on property access.
      const firstMessage = messages?.[0];
      if (!firstMessage || firstMessage.role !== 'assistant') {
        console.error('invalid sse response', data);
        return;
      }

      const assistantContent = firstMessage.content;
      let threadMessage: ThreadMessage | undefined;
      let threadLocked = false;

      // @todo switch to a more discrete detection method once gateway supports it
      if (String(assistantContent).toLowerCase().startsWith('*reviewing data sources')) {
        // Placeholder content is surfaced as a control signal, never as a message.
        handleOnControl('data-source-review');
        return;
      }

      // Real content has arrived, so any active control signal is over.
      if (controlSignal) {
        handleOnControl(null);
      }

      if (isComplete) {
        const systemMessage: UserEventMessage = {
          requestId: '',
          role: firstMessage.role as UserEventRole,
          content: assistantContent,
          timestamp: getTimestamp(new Date()) || 0,
          violations: violations || {
            sensitiveData: false,
            intent: false,
            access: false,
          },
          sensitiveDataDetections: [],
          citations: firstMessage.citations || [],
        };

        threadMessage = makeThreadMessage(threadId, systemMessage);
        threadLocked = getLockFromConversation([systemMessage]);
      }

      onMessage({
        threadId: conversationId,
        isComplete,
        content: assistantContent,
        error: '',
        threadMessage,
        locked: threadLocked,
      });
    } catch (err) {
      // Covers malformed JSON as well as exceptions thrown by the callbacks above.
      console.error(err);
    }
  });

  // error events
  streamClient.addEventListener('error', (event: EventTarget) => {
    console.error(event);

    // Clear any active control signal so consumers are not left showing it
    // after the stream has failed; this is a no-op when none is active.
    handleOnControl(null);

    onMessage({
      threadId,
      isComplete: true,
      content: '',
      error: 'stream error',
    });
  });

  return true;
};
