import { ServiceId } from '@mirage/discovery/id';
import * as services from '@mirage/discovery/services';
import { getCurrentAccount } from '@mirage/service-auth';
import { loadAssistKVDataAPI } from '@mirage/service-compose/service/apiDataLoader';
import { callApiV2, grpc } from '@mirage/service-dbx-api';
import { tagged } from '@mirage/service-logging';
import { AccessType } from '@mirage/shared/compose/assist-api';
import Sentry from '@mirage/shared/sentry';
import { isErrorWithMessage } from '@mirage/shared/util/error';
import WithDefaults from '@mirage/storage/with-defaults';
import { EMPTY, from, Observable, Subject } from 'rxjs';
import { catchError, concatMap, finalize, tap } from 'rxjs/operators';
import { v4 as uuid } from 'uuid';
import {
  convertToConversationMessage,
  convertToConversationsChatMessage,
} from '../utils/convertConversationMessages';

import type {
  ConversationAssistantMessage,
  ConversationMessage,
  ConversationUserMessage,
  NewUserMessage,
  WorkflowAgent,
  WorkflowValues,
} from '../types';
import type { context_engine } from '@dropbox/api-v2-client';
import type { ItemDataParams } from '@mirage/service-compose/service/apiDataLoader';
import type { ConversationsResult } from '@mirage/service-dbx-api/service/grpc/context_engine_apiv2/types';
import type { KVStorage } from '@mirage/storage';
import type { Subscription } from 'rxjs';

/**
 * Public type of the conversation service: whatever the
 * `conversationService` factory in this file returns.
 */
export type Service = ReturnType<typeof conversationService>;

// Module-level logger created via `tagged` with the 'conversation-service' tag.
const logger = tagged('conversation-service');

/**
 * String keys used by this service.
 *
 * `WorkflowAgents` is sent as the `user_data_type` when an agent is saved
 * through the API and is passed to `loadAssistKVDataAPI` when agents are
 * loaded. `Conversation` is not referenced in the visible part of this file.
 */
export enum StorageKey {
  Conversation = 'conversation',
  WorkflowAgents = 'workflow-agents',
}

/**
 * Shape of the data this service keeps in its KV storage.
 */
export type StoredConversation = {
  // Full message history per conversation, keyed by conversation id.
  conversations: { [key: string]: ConversationMessage[] };
  // Workflow agents, keyed by agent id.
  agents: { [key: string]: WorkflowAgent };
};

export default function conversationService(
  rawStorage: KVStorage<StoredConversation>,
) {
  // Storage wrapper constructed with empty `conversations` and `agents`
  // maps as its defaults.
  const adapter = new WithDefaults(rawStorage, {
    conversations: {},
    agents: {},
  });
  // Queue of user messages pushed by postUserMessage and consumed one at a
  // time by the pipeline below.
  const userMessages$ = new Subject<NewUserMessage>();
  // One subject per conversation id, used to broadcast the latest message
  // history to observers of that conversation.
  const conversationUpdates = new Map<string, Subject<ConversationMessage[]>>();
  // The in-flight "list workflow agents" request (user agents, team agents),
  // if any, so that concurrent callers can await the same request.
  let workflowAgentsListApiInProgress:
    | Promise<[WorkflowAgent[], WorkflowAgent[]]>
    | undefined;

  // Drain the user message queue sequentially. concatMap does not start
  // processing the next message until the current one has finished.
  // A failed message is logged and reported to Sentry, then swallowed
  // (EMPTY) so that the queue keeps running for later messages.
  userMessages$
    .pipe(
      concatMap((queuedMessage) =>
        from(processUserMessage(queuedMessage)).pipe(
          catchError((error) => {
            logger.error('Failed to process user message:', error);
            Sentry.withScope((scope) => {
              const errorMessage = isErrorWithMessage(error)
                ? error.message
                : 'unknown';
              scope.setTag('errorMessage', errorMessage);
              Sentry.captureException(error, {}, scope);
              Sentry.captureMessage(
                '[service-conversation] error processing user message',
                'error',
                {},
                scope,
              );
            });
            return EMPTY;
          }),
        ),
      ),
    )
    .subscribe();

  /**
   * Process one queued user message. Called for each message in the queue.
   *
   * Steps:
   *  1. Load the stored history and append a pending user message.
   *  2. Emit and persist that history so the UI can render it immediately.
   *  3. Stream the response from the GRPC endpoint, emitting every update
   *     to the conversation's subject.
   *  4. Persist the last streamed history once the stream ends.
   *
   * @param newMessage The user message to send (conversation id, text,
   *   document ids and model).
   * @returns A promise that resolves when the stream has completed and the
   *   final state has been persisted.
   * @throws Rejects when the stream reports or raises an error, or when a
   *   storage write fails, so that the queue's error handler can log and
   *   report the failure.
   */
  async function processUserMessage(newMessage: NewUserMessage): Promise<void> {
    const { conversationId, documentIds, message, model } = newMessage;
    // Load the current conversation history from the local KV store
    const history = await getConversation(conversationId);
    // Convert the conversation history to the format required by the GRPC endpoint
    const historyConversationsChatMessage = history.map((historyMessage) =>
      convertToConversationsChatMessage(historyMessage),
    );

    // Create a pending message to represent the user's message
    const newUserMessage: ConversationMessage = {
      id: `pending-${Date.now()}`,
      timestamp: new Date(),
      messageType: {
        case: 'userMessage',
        value: {
          text: message,
          context: [],
          rephrased: '',
        },
      },
      status: 'pending',
    };

    // Get the subject for this conversation to emit updates
    const subject = getOrCreateConversationSubject(conversationId);

    // Emit the pending message to the conversation subject right away
    // so that it renders in the UI immediately
    const historyWithNewMessage = [...history, newUserMessage];
    subject.next(historyWithNewMessage);

    // Persist the pending message before streaming starts. Awaiting keeps
    // this write ordered before the final write below and surfaces storage
    // failures instead of leaving an unhandled rejection.
    await setConversation(conversationId, historyWithNewMessage);

    // Track the latest messages as they are streamed in from the GRPC endpoint
    // so the final state can be persisted once the stream ends
    let latestMessages: ConversationMessage[] = [];

    try {
      // Use the GRPC endpoint to get streaming responses
      await new Promise<void>((resolve, reject) => {
        grpc
          .getConversationResponse(
            message,
            conversationId,
            historyConversationsChatMessage,
            documentIds,
            model,
          )
          .pipe(
            tap((result: ConversationsResult) => {
              if (result.error) {
                // Throwing inside tap sends the error down the stream's
                // error channel, where the subscriber below rejects.
                throw new Error(result.error);
              }

              const messages = result.messages.map(
                (streamedMessage, index, array) =>
                  // Convert the GRPC message to a ConversationMessage
                  convertToConversationMessage(
                    streamedMessage,
                    // If the message is the last one, we assume it's the currently streaming message
                    // otherwise, we mark it as success (since the API sends down the full history every request)
                    index === array.length - 1 ? 'streaming' : 'success',
                  ),
              );

              // Store the latest messages every stream emission
              latestMessages = messages;

              // Emit full current messages history to subscribers
              subject.next(messages);
            }),
            // finalize runs as teardown, after a terminal notification has
            // been delivered to the subscriber. On error the promise has
            // therefore already been rejected and this resolve() is a no-op;
            // on completion it resolves the promise.
            finalize(() => resolve()),
          )
          .subscribe({ error: reject });
      });
    } finally {
      // Whether the stream completed or failed, keep whatever was streamed.
      // NOTE(review): the last message is stored with the 'streaming' status
      // it was emitted with — confirm consumers expect that.
      if (latestMessages.length > 0) {
        await setConversation(conversationId, latestMessages);
      }
    }
  }

  /**
   * Append a message to the locally stored conversation without submitting
   * it to the API. The message is stored with status 'success'.
   * @param newMessage Supplies the conversation id and the message text.
   * @param messageTypeKey Pass 'assistantMessage' to store the text as an
   *   assistant message; otherwise (including when omitted) it is stored as
   *   a user message.
   */
  async function appendManualMessage(
    newMessage: NewUserMessage,
    messageTypeKey?: 'userMessage' | 'assistantMessage',
  ) {
    const { conversationId, message: text } = newMessage;
    const existingMessages = await getConversation(conversationId);

    const messageType: ConversationUserMessage | ConversationAssistantMessage =
      messageTypeKey === 'assistantMessage'
        ? {
            case: 'assistantMessage',
            value: { text, cited: [], isCached: false },
          }
        : {
            case: 'userMessage',
            value: { text, rephrased: '', context: [] },
          };

    const manualMessage: ConversationMessage = {
      id: uuid(),
      messageType,
      timestamp: new Date(),
      status: 'success',
    };

    await setConversation(conversationId, [
      ...existingMessages,
      manualMessage,
    ]);
  }

  // Returns the update subject registered for a conversation id,
  // registering a fresh one first when none exists yet.
  function getOrCreateConversationSubject(
    conversationId: string,
  ): Subject<ConversationMessage[]> {
    const existing = conversationUpdates.get(conversationId);
    if (existing) {
      return existing;
    }

    const created = new Subject<ConversationMessage[]>();
    conversationUpdates.set(conversationId, created);
    return created;
  }

  /**
   * Observe a conversation. Each subscriber first receives the history
   * currently in storage, then every update emitted for the conversation.
   * A failure while reading storage is delivered as an error notification.
   * @param conversationId The ID of the conversation to observe.
   * @returns An observable that emits the full conversation history.
   */
  function observeConversation(
    conversationId: string,
  ): Observable<ConversationMessage[]> {
    const updates$ = getOrCreateConversationSubject(conversationId);

    return new Observable((subscriber) => {
      let liveUpdates: Subscription | undefined;

      // Emit the stored state first; only attach to live updates once that
      // initial emission has happened.
      const start = async (): Promise<void> => {
        try {
          const storedMessages = await getConversation(conversationId);
          subscriber.next(storedMessages);
          liveUpdates = updates$.subscribe(subscriber);
        } catch (error) {
          subscriber.error(error);
        }
      };
      void start();

      // Teardown: detach from live updates if we got as far as attaching.
      return () => {
        liveUpdates?.unsubscribe();
      };
    });
  }

  // Reads one conversation's history from the local KV store, falling back
  // to an empty history when nothing is stored under that id.
  // Even once server-side persistence is implemented, we'll want to keep this
  // so that conversations load quickly; it will then need a way to stay in
  // sync with the server-side state.
  async function getConversation(
    conversationId: string,
  ): Promise<ConversationMessage[]> {
    const storedConversations = await adapter.get('conversations');
    const storedMessages = storedConversations[conversationId];
    return storedMessages ?? [];
  }

  // Writes one conversation's history to the local KV store. The stored
  // conversations map is read, updated in place under the given id, and
  // written back as a whole.
  async function setConversation(
    conversationId: string,
    messages: ConversationMessage[],
  ): Promise<void> {
    const storedConversations = await adapter.get('conversations');
    storedConversations[conversationId] = messages;
    await adapter.set('conversations', storedConversations);
  }

  /**
   * Post a new user message to the conversation.
   * The message is pushed onto the internal queue and this function returns
   * right away; queued messages are processed one at a time. Processing
   * failures are logged and reported by the queue rather than thrown here.
   * @param newMessage The new user message to post.
   */
  function postUserMessage(newMessage: NewUserMessage): void {
    userMessages$.next(newMessage);
  }

  /**
   * Look up a workflow agent in local storage.
   * @param agentId The ID of the agent to retrieve.
   * @returns The stored agent, or an empty object when no agent is stored
   *   under that ID.
   */
  async function getWorkflowAgent(agentId: string): Promise<WorkflowAgent> {
    const storedAgents = await adapter.get('agents');
    const storedAgent = storedAgents[agentId];
    return storedAgent ?? {};
  }

  /**
   * Create or update a workflow agent in local storage from workflow values.
   * Icon, theme, name, description and access type are read from entry `5`
   * of the workflow values. `dataId` and `createdAtMs` carry over from an
   * existing agent with the same ID; `updatedAtMs` is always refreshed.
   * NOTE(review): `createdBy` is rewritten with the current account on every
   * update, unlike `createdAtMs` — confirm this is intended.
   * @param agentId The ID of the agent to set workflow values for.
   * @param values The workflow values to set for the agent.
   * @returns The agent as stored.
   */
  async function setWorkflowAgent(
    agentId: string,
    values: WorkflowValues,
  ): Promise<WorkflowAgent> {
    const agents = await adapter.get('agents');
    const account = await getCurrentAccount();
    const previousAgent = agents[agentId];
    const details = values[5];

    const createdBy = {
      display_name: account?.name.display_name ?? 'Unknown',
      profile_photo_url: account?.profile_photo_url,
      email: account?.email ?? 'unknown',
    };

    agents[agentId] = {
      dataId: previousAgent?.dataId,
      id: agentId,
      icon: details.icon as string,
      theme: details.theme as string,
      name: details.name as string,
      description: details.description as string,
      workflow: values,
      createdBy,
      createdAtMs: previousAgent?.createdAtMs ?? Date.now(),
      updatedAtMs: Date.now(),
      accessType: details.publish as 'private' | 'company',
    };

    await adapter.set('agents', agents);
    logger.debug(`Updated workflow values for agent: ${agentId}`);
    return agents[agentId];
  }

  /**
   * Record the API data id on a locally stored workflow agent.
   * Does nothing (beyond a debug log) when no agent is stored under
   * `agentId`, instead of throwing while trying to write to `undefined`.
   * @param agentId The ID of the stored agent to update.
   * @param dataId The data id to record on that agent.
   */
  async function setWorkflowAgentDataId(agentId: string, dataId: string) {
    const agents = await adapter.get('agents');
    const storedAgent = agents[agentId];
    if (!storedAgent) {
      logger.debug(
        `Skipped setting dataId; no stored workflow agent with id: ${agentId}`,
      );
      return;
    }
    storedAgent.dataId = dataId;
    await adapter.set('agents', agents);
  }

  /**
   * Save a workflow agent through the API and mirror the result locally.
   * The agent is serialized as JSON under the `WorkflowAgents` data type,
   * with 'individual' access for private agents and 'team' access otherwise.
   * When the response carries a data id, it is written onto the passed-in
   * agent (the argument is mutated) and the agent is stored locally.
   * @param agent The agent to save.
   * @returns The same agent object that was passed in.
   */
  async function saveWorkflowAgent(agent: WorkflowAgent) {
    const saveArgs: context_engine.AssistSaveUserDataArg = {
      data_item: {
        user_data_type: StorageKey.WorkflowAgents,
        user_data_key: agent.id,
        user_data: JSON.stringify(agent),
      },
      data_id: agent.dataId,
      access_type: {
        '.tag': agent.accessType === 'private' ? 'individual' : 'team',
      },
    };

    logger.log('Saving workflow agent to API...', agent.id);
    const { data_id: savedDataId } = await callApiV2(
      'contextEngineAssistApiSaveUserData',
      saveArgs,
    );

    if (savedDataId) {
      agent.dataId = savedDataId;
      await setWorkflowAgentDataId(agent.id, agent.dataId);
      // Update the full agent data in storage
      const storedAgents = await adapter.get('agents');
      storedAgents[agent.id] = agent;
      await adapter.set('agents', storedAgents);
    }

    logger.log('saved workflow agent', agent.id);
    return agent;
  }

  /**
   * Delete a workflow agent remotely (when it has a data id) and locally.
   *
   * The data id is taken from the passed agent, falling back to the locally
   * stored agent with the same ID. When neither has one, the remote delete
   * is skipped rather than sending a request with an undefined `data_id`;
   * the agent is presumed never to have been saved through the API — verify
   * against callers. The local entry is removed in either case.
   * @param agent The agent to delete.
   */
  async function deleteWorkflowAgent(agent: WorkflowAgent) {
    const storedBeforeDelete = await adapter.get('agents');
    const dataId = agent.dataId ?? storedBeforeDelete[agent.id]?.dataId;

    if (dataId) {
      const args: context_engine.AssistDeleteUserDataArg = {
        data_id: dataId,
      };
      await callApiV2('contextEngineAssistApiDeleteUserData', args);
    }

    // Re-read after the API call so the local delete is applied to the
    // state as it is once the remote delete has finished.
    const agents = await adapter.get('agents');
    delete agents[agent.id];
    await adapter.set('agents', agents);

    logger.log('deleted workflow agent', agent.id);
  }

  /**
   * List all workflow agents.
   * Returns the locally stored agents when there are any. Otherwise the
   * agents are loaded from the API once and that result is returned, with
   * one entry per agent id (a later entry replaces an earlier one with the
   * same id).
   * @returns The known workflow agents; empty when neither local storage
   *   nor the API has any.
   */
  async function getWorkflowAgents(): Promise<WorkflowAgent[]> {
    const storedAgents = Object.values(await adapter.get('agents'));
    if (storedAgents.length > 0) {
      return storedAgents;
    }

    // Nothing stored locally: load from the API exactly once. This function
    // must not call itself afterwards — when the API has no agents either,
    // the store stays empty and the recursion would never end.
    const loadedAgents = await loadApiWorkflowAgents();
    const agentsById = new Map<string, WorkflowAgent>();
    for (const agent of loadedAgents) {
      // Entries may be undefined when an item could not be parsed
      if (agent) {
        agentsById.set(agent.id, agent);
      }
    }
    return Array.from(agentsById.values());
  }

  /**
   * Load the user's and the team's workflow agents from the API and replace
   * the locally stored agents with the result.
   *
   * Concurrent calls share one in-flight request. The in-flight marker is
   * cleared when the request settles — also on failure, so that a failed
   * load does not make every later call await the same rejected promise.
   * @returns User agents followed by team agents, as returned by the API.
   * @throws Rejects when either API request or the storage write fails.
   */
  async function loadApiWorkflowAgents(): Promise<WorkflowAgent[]> {
    // If an API call is already in progress, wait for it and return its
    // flattened result instead of issuing another request.
    if (workflowAgentsListApiInProgress) {
      const [userAgents, teamAgents] = await workflowAgentsListApiInProgress;
      return [...userAgents, ...teamAgents];
    }

    workflowAgentsListApiInProgress = Promise.all([
      loadAssistKVDataAPI<WorkflowAgent>(
        StorageKey.WorkflowAgents,
        parseWorkflowAgentData,
      ),
      loadAssistKVDataAPI<WorkflowAgent>(
        StorageKey.WorkflowAgents,
        parseWorkflowAgentData,
        AccessType.TEAM,
      ),
    ]);

    try {
      const [userAgents, teamAgents] = await workflowAgentsListApiInProgress;
      const agents = [...userAgents, ...teamAgents];

      // Convert array to object with agent.id as keys
      const agentsMap = agents.reduce(
        (acc, agent) => {
          if (agent) {
            acc[agent.id] = agent;
          }
          return acc;
        },
        {} as Record<string, WorkflowAgent>,
      );
      await adapter.set('agents', agentsMap);
      return agents;
    } finally {
      // Always clear the marker so the next call starts a fresh request.
      workflowAgentsListApiInProgress = undefined;
    }
  }

  /**
   * Parse one stored data item into a workflow agent.
   * The item's `data` must be JSON for a plain object with a string `id`;
   * the item's `key` is recorded on the result as `dataId`.
   * @param itemData The raw item (JSON string in `data`, plus `key`).
   * @returns The parsed agent, or undefined when the data is not valid JSON
   *   or is not an object with a string `id` (the failure is logged).
   */
  function parseWorkflowAgentData(
    itemData: ItemDataParams,
  ): WorkflowAgent | undefined {
    try {
      const parsed: unknown = JSON.parse(itemData.data);
      // Spreading a non-object would silently produce a bogus agent (for
      // example, a string is spread into its characters), so reject
      // anything that is not an object carrying a string id.
      if (
        typeof parsed !== 'object' ||
        parsed === null ||
        Array.isArray(parsed) ||
        typeof (parsed as { id?: unknown }).id !== 'string'
      ) {
        throw new Error('Workflow agent data is not an object with a string id');
      }
      const agent: WorkflowAgent = {
        ...(parsed as WorkflowAgent),
        dataId: itemData.key,
      };
      return agent;
    } catch (error) {
      logger.error('Failed to parse workflow agent data:', error);
      return undefined;
    }
  }

  // Register the service's public API under ServiceId.CONVERSATION.
  // ServiceId.DBX_API is passed as the third argument — presumably the list
  // of services this one depends on; confirm against services.provide.
  // Helpers not listed here (e.g. getConversation, setConversation,
  // setWorkflowAgentDataId) stay private to this closure.
  return services.provide(
    ServiceId.CONVERSATION,
    {
      postUserMessage,
      observeConversation,
      appendManualMessage,
      getWorkflowAgent,
      setWorkflowAgent,
      getWorkflowAgents,
      saveWorkflowAgent,
      deleteWorkflowAgent,
      loadApiWorkflowAgents,
    },
    [ServiceId.DBX_API],
  );
}
