import { ServiceId } from '@mirage/discovery/id';
import * as services from '@mirage/discovery/services';
import { tagged } from '@mirage/service-logging';
import { addDisplayStat, namespace } from '@mirage/service-operational-metrics';
import { logPageLoadMilestoneOnce } from '@mirage/service-operational-metrics/page-load';
import { getCombineAsyncRequestsFunc } from '@mirage/shared/util/combine-async-requests';
import {
  nonEmpty,
  nonNil,
  nowMicros,
  sleepMs,
} from '@mirage/shared/util/tiny-utils';
import { LargeKvStore } from '@mirage/storage/large-kv-store/interface';
import i18n from '@mirage/translations';
import cloneDeep from 'lodash/cloneDeep';
import { type Observable, Subject } from 'rxjs';
import { v4 as uuidv4 } from 'uuid';
import { StackItemCreationFields } from '../types';
import { StacksBolt } from './bolt';
import { StacksCache } from './cache';
import { LOAD_STACK_DELAY_MS, StacksSync } from './sync';
import { getTitleOfUrl } from './url-title';
import {
  asShortcut,
  generateStacksMaxSortKey,
  getStackItemChangedFieldsFromOldItems,
  reduceToStackItem,
  sortStacksByPinStatusAndHlc,
  stackHasShareId,
  toValidFileName,
} from './utils';

import type { stacks, users, version_tree } from '@dropbox/api-v2-client';
import type { APIv2Callable } from '@mirage/service-dbx-api/service';
import type { LogoutServiceConsumerContract } from '@mirage/service-logout';

const logger = tagged('service-stacks');

// Number of stacks to load in one batch during the initial load.
// A bigger number gives a faster loading speed, but increases the possibility
// of load failures when the users have huge stacks.
const LOAD_BATCH_SIZE = 15;

export interface StackUpsertId {
  type: 'nsId' | 'requestId';
  id: string;
}

// Used to return data for all stacks loaded
export type QueryStacksObservableResult = {
  numStacks: number;
  pinnedStackNsIds: string[];
};

export type StacksCompleteEvent = {
  initialLoad: boolean;
  stacks: stacks.Stack[];
};

interface OptimisticStackUpdateEvent {
  requestId: string;
  eventType: 'optimisticEdit';
  stack: stacks.Stack;
  stackItemsForStack: stacks.StackItem[];
}

interface StackCreationEvent {
  requestId: string;
  eventType: 'stackCreated';
  namespaceId: string;
}

export type StackMutationEvent =
  | StackCreationEvent
  | OptimisticStackUpdateEvent;

export type Service = ReturnType<typeof provideStacksService>;

type DbxApiServiceContract = {
  callApiV2: APIv2Callable;
};

type AuthServiceContract = {
  getCurrentAccount(refresh?: boolean): Promise<users.FullAccount | undefined>;
};

// Tracks created stacks via requestId to prevent multiple stack creations
type StackMutationRequestIdToCreatedNsId = Map<string, string>;

type StackNsIdToLastKnownHLC = Map<string, version_tree.Hlc | undefined>;

const getDefaultStackName = () => i18n.t('untitled_stack');
const DEFAULT_STACK_DESCRIPTION = '';

export default function provideStacksService(
  largeKvStore: LargeKvStore,
  dbxApiService: DbxApiServiceContract,
  authService: AuthServiceContract,
  logoutService: LogoutServiceConsumerContract,
  isDev: boolean,
  isTest: boolean,
  boltOrigin?: string,
) {
  const stacksCompleteSubject = new Subject<StacksCompleteEvent>();
  const cache = new StacksCache(largeKvStore);
  const sync = new StacksSync(cache, dbxApiService);
  const stackMutationSubject = new Subject<StackMutationEvent>();
  const metrics = namespace('stacks');

  const stackMutationRequestIdToCreatedNsId: StackMutationRequestIdToCreatedNsId =
    new Map<string, string>();
  const stackNsIdToLastKnownHLC: StackNsIdToLastKnownHLC = new Map<
    string,
    version_tree.Hlc
  >();

  const bolt = boltOrigin
    ? new StacksBolt(boltOrigin, isDev, isTest, (hlcMicros) =>
        sync.syncToLatest(hlcMicros),
      )
    : undefined;

  function getStacks() {
    return cache.getStacks();
  }

  // Avoid circular dependency with bolt.
  sync.resetAllBoltData = bolt
    ? (data) => bolt.resetAllBoltData(data)
    : undefined;

  /**
   * Returns true when the stacks data is not undefined. This does not mean
   * that the stacks data is fully loaded.
   */
  async function areStacksReady(): Promise<boolean> {
    const stacks = await cache.getStacks();
    return stacks !== undefined;
  }

  /** Returns true when the stacks data is fully loaded. */
  function areStacksComplete(): boolean {
    return sync.started;
  }

  function stacksCompleteObservable(): Observable<StacksCompleteEvent> {
    return stacksCompleteSubject.asObservable();
  }

  async function getFullStacksOrEmptyArray() {
    return (await cache.getStacks()) ?? [];
  }

  function emitStackMutationEvent(event: StackMutationEvent): void {
    // When the stack is created, update the created nsId for this requestId
    if (event.eventType === 'stackCreated') {
      stackMutationRequestIdToCreatedNsId.set(
        event.requestId,
        event.namespaceId,
      );
    }
    stackMutationSubject.next(event);
  }

  async function loadAllStacksOneByOne(partialStacks: stacks.Stack[]) {
    logPageLoadMilestoneOnce('loadAllStacksOneByOne start');

    let isFirst = true;
    const initialLoad = !(await areStacksReady());

    partialStacks.sort((a, b) => {
      const aIsPinned = a.user_data?.is_pinned ?? false;
      const bIsPinned = b.user_data?.is_pinned ?? false;

      if (aIsPinned === bIsPinned) {
        // Load most recent first.
        return b.max_hlc_micros ?? 0 - (a.max_hlc_micros ?? 0);
      }

      // Load pinned stacks first.
      return aIsPinned ? -1 : 1;
    });

    const nsIdsToLoad = partialStacks.map((partialStack) =>
      nonEmpty(partialStack.namespace_id, 'partialStack.namespace_id'),
    );

    let nsIdsToRetry: string[] = [];

    let currentBatchSize = LOAD_BATCH_SIZE;
    do {
      while (nsIdsToLoad.length) {
        const nsIds = nsIdsToLoad.splice(0, currentBatchSize);

        // Avoid overloading the server and getting the user throttled.
        if (isFirst) {
          isFirst = false;
        } else {
          await sleepMs(LOAD_STACK_DELAY_MS);
        }
        try {
          await queryFullStacksById({ namespaceIds: nsIds });
        } catch (e) {
          // Resume load on failure, otherwise huge stacks will cause all stacks
          // to fail loading.
          logger.warn(
            `Failed to load ${nsIds.length} stacks ${JSON.stringify(nsIds)}`,
            e,
          );
          nsIdsToRetry.push(...nsIds);
        }
      }

      // If the batch size is already 1, then stop retrying, because if we get
      // an unloadable stack, then this loop will be stuck forever.
      if (currentBatchSize === 1) {
        break;
      }

      // If there is anything to retry, halve the batch size and redo loads.
      if (nsIdsToRetry.length) {
        nsIdsToLoad.push(...nsIdsToRetry);
        nsIdsToRetry = [];

        currentBatchSize = Math.max(Math.floor(currentBatchSize / 2), 1);

        logger.info(
          `Retrying load for ${nsIdsToRetry.length} stacks with in batches of ${currentBatchSize}`,
        );
      }
    } while (nsIdsToLoad.length);

    const stacks = await getFullStacksOrEmptyArray();
    bolt?.initOnce(await cache.getUserBoltData(), stacks, 'skipFirstSync');

    sync.startSyncJob();
    stacksCompleteSubject.next({ initialLoad, stacks });
  }

  // No need to run more than one load concurrently. This is needed despite of
  // using `getCombineAsyncRequestsFunc` because we leave a background job
  // runnning after returning from `listStacksIncrementallyCombined`.
  let isLoadingOneByOne = false;
  let shouldLogFirstLoadStackMetrics = false;
  let numStacksToFetchBeforeLogging = 0;

  // Combine all concurrent listStacks calls.
  const listStacksIncrementallyCombined = getCombineAsyncRequestsFunc(
    async () => {
      if (!isLoadingOneByOne) {
        logPageLoadMilestoneOnce('listStacksIncrementallyCombined start');
        shouldLogFirstLoadStackMetrics = (await getStacks()) === undefined;
        const loadStacksStartTime = performance.now();
        isLoadingOneByOne = true;

        try {
          const response = await dbxApiService.callApiV2('stacksQueryStacks', {
            // Need max_hlc to sort stacks.
            fields: { max_hlc: true },
          });

          logPageLoadMilestoneOnce('stacksQueryStacks end');

          cache.setUserBoltData(response.bolt_data);

          if (!response.stacks?.length) {
            // Important: Set stacks to empty when user has no stacks at all.
            cache.setStacks([]);
          } else {
            const partialStacks = response.stacks ?? [];
            numStacksToFetchBeforeLogging = partialStacks.length;

            // Load in the background, don't await for results.
            void loadAllStacksOneByOne(partialStacks).finally(() => {
              logPageLoadMilestoneOnce('loadAllStacksOneByOne end');
              isLoadingOneByOne = false;

              if (
                shouldLogFirstLoadStackMetrics &&
                numStacksToFetchBeforeLogging > 0
              ) {
                const totalLoadTime = performance.now() - loadStacksStartTime;
                metrics.stats('freshLoginStacksLoadTime', totalLoadTime);
              }
            });
          }
        } catch (e) {
          // Clear flag on error, but not in finally.
          isLoadingOneByOne = false;
          throw e;
        }
      }

      // This always return an empty array in the first run. Callers can then
      // listen to stack updates to update the UI automatically.
      return await getFullStacksOrEmptyArray();
    },
  );

  /** Entry point for bolt init. */
  async function listStacksIncrementally(
    refresh = false,
  ): Promise<stacks.Stack[]> {
    if (!refresh) {
      const startMs = performance.now();
      logPageLoadMilestoneOnce('cache.getStacks() start');
      const stacks = await cache.getStacks();
      addDisplayStat(`cache.getStacks() latency`, performance.now() - startMs);

      const boltData = await cache.getUserBoltData();
      if (stacks) {
        if (!isLoadingOneByOne) {
          sync.startSyncJob();
          stacksCompleteSubject.next({
            initialLoad: false, // Cannot be initial load if we have cached stacks.
            stacks: stacks,
          });
          bolt?.initOnce(boltData, stacks, 'doFirstSync');
        }

        logPageLoadMilestoneOnce('Return cached stacks');
        return stacks;
      }
    }

    return await listStacksIncrementallyCombined();
  }

  let firstListStackItemsFetchTime = -1;
  let numStacksFetched = 0;

  // Combine all concurrent listStackItems calls.
  const listStackItemsCombined = getCombineAsyncRequestsFunc(
    async (namespaceId: string) => {
      if (firstListStackItemsFetchTime === -1) {
        firstListStackItemsFetchTime = performance.now();
      }
      const response = await dbxApiService.callApiV2('stacksListStackItems', {
        namespace_id: namespaceId,
      });
      const items = response?.items ?? [];
      await cache.setStackItems(namespaceId, items);

      numStacksFetched += 1;

      if (
        numStacksFetched === numStacksToFetchBeforeLogging &&
        shouldLogFirstLoadStackMetrics
      ) {
        const totalListStacksTime =
          performance.now() - firstListStackItemsFetchTime;
        metrics.stats('freshLoginStackItemsLoadTime', totalListStacksTime);
      }

      return items;
    },
  );

  async function listStackItems(
    namespaceId: string,
    refresh = false,
  ): Promise<{ items: stacks.StackItem[]; cached: boolean }> {
    if (!refresh) {
      const cached = await cache.getStackItems(namespaceId);
      if (cached) return { items: cached, cached: true };
    }

    return { items: await listStackItemsCombined(namespaceId), cached: false };
  }

  async function listStackItemsFromCache(
    namespaceId: string,
  ): Promise<stacks.StackItem[] | undefined> {
    return await cache.getStackItems(namespaceId);
  }

  async function listAllStackItemsFromCache(): Promise<
    { [nsid: string]: stacks.StackItem[] } | undefined
  > {
    logPageLoadMilestoneOnce('listAllStackItemsFromCache start');
    const stacks = await cache.getStacks();
    if (!stacks?.length) return undefined;

    const itemsByNsId = await cache.multiGetStackItems(
      stacks.map((stack) => stack.namespace_id ?? ''),
    );

    logPageLoadMilestoneOnce('listAllStackItemsFromCache end');
    return itemsByNsId;
  }

  async function listAllStackItemsOneByOne(): Promise<void> {
    const stacks = await cache.getStacks();
    if (!stacks?.length) return;

    // Important: Load the pinned stacks first.
    const sortedStacks = sortStacksByPinStatusAndHlc(stacks);
    const pinnedStacks = sortedStacks.filter(
      (stack) => stack.user_data?.is_pinned,
    );
    const unpinnedStacks = sortedStacks.filter(
      (stack) => !stack.user_data?.is_pinned,
    );

    // Load pinned stacks quickly.
    if (pinnedStacks.length) {
      await Promise.all(
        pinnedStacks.map(async (stack) => {
          const nsId = stack.namespace_id ?? '';

          // Results will be saved to cache directly.
          await listStackItems(nsId);
        }),
      );
    }

    // Load unpinned stacks slowly.
    if (unpinnedStacks.length) {
      for (const stack of unpinnedStacks) {
        const nsId = stack.namespace_id ?? '';
        await sleepMs(50);
        await listStackItems(nsId);
      }
    }
  }

  /** Fetch full stacks by namespace ids only. */
  async function queryFullStacksById({
    namespaceIds,
    hlc,
  }: {
    namespaceIds: string[];
    hlc?: version_tree.Hlc;
  }): Promise<stacks.QueryStacksResponse> {
    const response = await dbxApiService.callApiV2('stacksQueryStacks', {
      // If the caller provides an empty list, the api will list all stacks,
      // which is equivalent to call listStacks, which is exactly what we are
      // trying to avoid doing with this function.
      namespace_ids: nonEmpty(namespaceIds, 'args.namespaceIds'),
      hlc: hlc,
      // Always fetch all fields so that we can update the cache in full.
      fields: { all: true },
    });

    // Always update the cache automatically.
    const stacks = [
      ...(await getFullStacksOrEmptyArray()),
      ...(response.stacks ?? []),
    ];
    await cache.setStacks(stacks);

    return response;
  }

  async function callPreviewPublicStack(
    shareId: string,
    hlcMicros?: number,
  ): Promise<{ stack: stacks.Stack; items: stacks.StackItem[] }> {
    const response = await dbxApiService.callApiV2(
      'stacksPreviewPublicStack',
      {
        sharing_id: shareId,
        hlc: { unix_micros: hlcMicros },
      },
      'app',
    );

    const stack = nonNil(response.stack, 'response.stack');
    const items = nonNil(response.items, 'response.items');

    return { stack, items };
  }

  async function previewPublicStack(
    shareId: string,
    options?: {
      onError?: (e: Error) => void;
      hlcMicros?: number;
    },
  ): Promise<{ stack: stacks.Stack; items: stacks.StackItem[] }> {
    const { onError, hlcMicros } = options || {};
    try {
      const response = await callPreviewPublicStack(shareId, hlcMicros);
      return response;
    } catch (e) {
      if (onError) {
        onError(e as Error);
      }
      throw e;
    }
  }

  const previewStackCombined = getCombineAsyncRequestsFunc(
    async (
      shareId: string,
      hlcMicros: number | undefined,
    ): Promise<stacks.PreviewStackResponse> => {
      return await dbxApiService.callApiV2('stacksPreviewStack', {
        sharing_id: shareId,
        hlc: {
          unix_micros: hlcMicros,
        },
      });
    },
  );

  async function callPreviewStack(
    shareId: string,
    hlcMicros?: number,
  ): Promise<{ stack: stacks.Stack; items: stacks.StackItem[] }> {
    const response = await previewStackCombined(shareId, hlcMicros);

    const stack = nonNil(response.stack, 'response.stack');
    const items = nonNil(response.items, 'response.items');

    cache.setStack(stack);
    cache.setStackItems(
      nonNil(stack.namespace_id, 'namespace_id'),
      response.items,
    );

    if (stack.publish_data) {
      cache.setPublishStack(stack);
    }

    return { stack, items };
  }

  /**
   * The error has to be handled async because we may potentially return the
   * cached data first, and then return the error later. But if not already
   * cached, then we will still throw the error.
   *
   * Entry point for bolt init.
   */
  async function previewStack(
    shareId: string,
    options?: {
      // Set to true to not use cached data.
      refresh?: boolean;
      // Set to true to always fetch even when data is cached.
      // Only used when refresh=false.
      fetchInBackground?: boolean;
      onError?: (e: Error) => void;
      hlcMicros?: number;
    },
  ): Promise<{ stack: stacks.Stack; items: stacks.StackItem[] }> {
    const { refresh, fetchInBackground, onError, hlcMicros } = options || {};
    if (!refresh) {
      // Init bolt.
      const stacks = await listStacksIncrementally();
      const stack = stacks?.find((s) => stackHasShareId(s, shareId));
      const items = stack?.namespace_id
        ? await cache.getStackItems(stack.namespace_id)
        : undefined;

      if (stack && items) {
        // Refresh in background, but don't await.
        if (fetchInBackground) {
          void callPreviewStack(shareId, hlcMicros).catch(onError);
        }

        return { stack, items };
      }
    }

    try {
      const response = await callPreviewStack(shareId, hlcMicros);

      // Init bolt, but no need to wait for completion.
      void listStacksIncrementally();

      return response;
    } catch (e) {
      if (onError) {
        onError(e as Error);
      }
      throw e;
    }
  }

  /**
   * Creates a stack. Calling flow should ensure to acquire the user write lock
   */
  async function createStackWithLock(
    name: string,
    description: string,
    emoji?: string,
    colorIndex?: number,
    isPinned: boolean = false,
    creationTypeTag: stacks.StackCreationType['.tag'] = 'manual',
  ): Promise<{ stack: stacks.Stack; hlc: version_tree.Hlc | undefined }> {
    name = nonEmpty(toValidFileName(name), 'name');

    let namespaceId: string | undefined = undefined;

    try {
      const response = await dbxApiService.callApiV2('stacksCreateStack', {
        stack_data: {
          name,
          description,
          emoji,
          color_index: colorIndex,
          creation_type: { '.tag': creationTypeTag },
        },
      });

      const stack = nonNil(response.stack, 'response.stack');
      namespaceId = nonEmpty(stack.namespace_id, 'stack.namespace_id');

      cache.setStack(stack);
      cache.setStackItems(namespaceId, []);
      return { stack, hlc: response.hlc };
    } finally {
      if (namespaceId && isPinned) {
        toggleStackIsPinned(namespaceId);
      }
    }
  }

  async function createStack(
    name: string,
    description: string,
    emoji?: string,
    colorIndex?: number,
    isPinned: boolean = false,
    creationTypeTag: stacks.StackCreationType['.tag'] = 'manual',
  ): Promise<stacks.Stack> {
    let stack: stacks.Stack;
    const releaseLock = await sync.acquireUserWriteLock();
    try {
      stack = (
        await createStackWithLock(
          name,
          description,
          emoji,
          colorIndex,
          isPinned,
          creationTypeTag,
        )
      ).stack;
    } finally {
      releaseLock();
    }
    return stack;
  }

  async function createStackSection(
    namespaceId: string,
    section: string,
  ): Promise<stacks.UpdateStackResponse> {
    section = section.trim();

    const oldStack = await cache.requireStack(namespaceId);
    const newStack = cloneDeep(oldStack);

    if (!newStack.stack_data) {
      newStack.stack_data = {};
    }

    const sections = [
      ...(<[]>newStack?.stack_data.section_data?.sections),
      {
        id: uuidv4(),
        title: section.trim(),
      },
    ];

    return updateStack(namespaceId, [
      {
        field: {
          '.tag': 'section_data_update',
          sections,
        },
      },
    ]);
  }

  /**
   * Updates a stack. Calling flow should ensure to acquire the user write lock
   * @param providedOldStack The old stack to use for the update. If not provided,
   * the stack will be fetched from the cache.
   */
  async function updateStackWithLock(
    namespaceId: string,
    updates: stacks.StackDataUpdate[],
    hlc?: version_tree.Hlc,
    providedOldStack?: stacks.Stack,
  ): Promise<stacks.UpdateStackResponse> {
    let undo: (() => void) | undefined;

    try {
      const oldStack =
        providedOldStack || (await cache.requireStack(namespaceId));
      const newStack = cloneDeep(oldStack);
      if (!newStack.stack_data) {
        newStack.stack_data = {};
      }

      // Apply new updates.
      for (const update of updates) {
        const field = update.field;

        switch (field?.['.tag']) {
          case 'name_update':
            newStack.stack_data.name = nonEmpty(
              toValidFileName(field.name_update),
              'name_update',
            );
            break;
          case 'description_update':
            newStack.stack_data.description = field.description_update;
            break;
          case 'emoji_update':
            newStack.stack_data.emoji = field.emoji_update;
            break;
          case 'color_index_update':
            newStack.stack_data.color_index = field.color_index_update;
            break;
          case 'section_data_update':
            newStack.stack_data.section_data = { sections: field.sections };
            break;
          case 'version_update':
            newStack.stack_data.version = field.version_update;
            break;
          case 'archive_update':
            newStack.stack_data.archive_data = {
              ...newStack.stack_data.archive_data,
              is_archived: field.is_archived,
            };
            break;
          case 'other':
          case undefined:
            break;
          default:
            field satisfies never;
        }
      }

      newStack.max_hlc_micros = nowMicros();

      // Eager write to store before calling api.
      cache.setStack(newStack);

      undo = () => cache.revertStack(oldStack, newStack);

      const response = await dbxApiService.callApiV2('stacksUpdateStack', {
        namespace_id: namespaceId,
        stack_data_updates: updates,
        hlc,
      });

      // Warning: response.stack does not contain the sharing data,
      // so we have to fill it in.
      const stack = nonNil(response.stack, 'response.stack');
      stack.sharing_data = newStack.sharing_data;

      cache.setStack(stack);

      if (stack.publish_data) {
        cache.setPublishStack(stack);
      }

      return response;
    } catch (e) {
      undo?.();
      throw e;
    }
  }

  async function updateStack(
    namespaceId: string,
    updates: stacks.StackDataUpdate[],
  ): Promise<stacks.UpdateStackResponse> {
    const releaseLock = await sync.acquireUserWriteLock();
    let response: stacks.UpdateStackResponse;

    try {
      response = await updateStackWithLock(namespaceId, updates);
    } finally {
      releaseLock();
    }
    return response;
  }

  /**
   * Updates a stack with the provided namespaceId. Alteratively will create a
   * stack and perform the update if upsertData type is `requestId`.
   * The optimistic stack update and the eventually
   * created stack will be emitted to the upsertStacksObservable.
   * @param upsertData The namespaceId or requestId for the stack.
   * namespaceId is used to update an existing stack, requestId is used to create
   * a new stack. requestId will be emitted in the stack edit progress event.
   * @param updates The updates to apply to the stack.
   */
  async function upsertStack(
    upsertData: StackUpsertId,
    updates: stacks.StackDataUpdate[],
  ): Promise<stacks.UpdateStackResponse> {
    let response: stacks.UpdateStackResponse;
    const releaseLock = await sync.acquireUserWriteLock();
    const alreadyCreatedNsIdForRequest =
      upsertData.type === 'requestId'
        ? stackMutationRequestIdToCreatedNsId.get(upsertData.id)
        : undefined;
    try {
      if (upsertData.type === 'requestId' && !alreadyCreatedNsIdForRequest) {
        const tempStackData: stacks.StackData = {
          name: getDefaultStackName(),
          description: DEFAULT_STACK_DESCRIPTION,
        };

        const creationUpdates: stacks.StackDataUpdate[] = [];

        for (const update of updates) {
          if (update.field) {
            switch (update.field['.tag']) {
              case 'name_update':
                tempStackData.name = update.field.name_update;
                creationUpdates.push(update);
                break;
              case 'description_update':
                tempStackData.description = update.field.description_update;
                creationUpdates.push(update);
                break;
              case 'emoji_update':
                tempStackData.emoji = update.field.emoji_update;
                creationUpdates.push(update);
                break;
              case 'color_index_update':
                tempStackData.color_index = update.field.color_index_update;
                creationUpdates.push(update);
                break;
              case 'section_data_update':
                tempStackData.section_data = {
                  sections: update.field.sections,
                };
                break;
              case 'version_update':
                tempStackData.version = update.field.version_update;
                break;
              case 'other':
              case 'archive_update': // Archive update is not allowed for creation
              case undefined:
                break;
              default:
                update.field satisfies never;
            }
          }
        }
        emitStackMutationEvent({
          requestId: upsertData.id,
          eventType: 'optimisticEdit',
          stack: { stack_data: tempStackData } as stacks.Stack,
          stackItemsForStack: [],
        });

        const { stack: newStack, hlc: createdHlc } = await createStackWithLock(
          tempStackData.name || getDefaultStackName(),
          tempStackData.description || DEFAULT_STACK_DESCRIPTION,
          tempStackData.emoji,
          tempStackData.color_index,
        );
        const namespaceId = nonEmpty(
          newStack.namespace_id,
          'newStack.namespace_id',
        );

        // Apply any additional updates not covered in creation
        const additionalUpdates = updates.filter(
          (update) => !creationUpdates.includes(update),
        );
        if (additionalUpdates.length > 0) {
          response = await updateStackWithLock(
            namespaceId,
            additionalUpdates,
            createdHlc,
            newStack,
          );
          stackNsIdToLastKnownHLC.set(namespaceId, response.hlc);
        } else {
          stackNsIdToLastKnownHLC.set(namespaceId, createdHlc);
          response = { stack: newStack };
        }
        emitStackMutationEvent({
          requestId: upsertData.id,
          eventType: 'stackCreated',
          namespaceId,
        });
      } else {
        // If we have already created a stack for this requestId, update that
        // created nsId instead
        if (alreadyCreatedNsIdForRequest) {
          // Sleep to ensure the stack is created before updating
          await sleepMs(LOAD_STACK_DELAY_MS);

          const lastKnownHLC = stackNsIdToLastKnownHLC.get(
            alreadyCreatedNsIdForRequest,
          );
          response = await updateStackWithLock(
            alreadyCreatedNsIdForRequest,
            updates,
            lastKnownHLC,
          );
          stackNsIdToLastKnownHLC.set(
            alreadyCreatedNsIdForRequest,
            response.hlc,
          );
        } else {
          const lastKnownHLC = stackNsIdToLastKnownHLC.get(upsertData.id);
          response = await updateStackWithLock(
            upsertData.id,
            updates,
            lastKnownHLC,
          );
          stackNsIdToLastKnownHLC.set(upsertData.id, response.hlc);
        }
      }
    } finally {
      releaseLock();
    }
    return response;
  }

  async function togglePinWithLock({
    namespaceId,
    hlc,
    providedStack,
    sortKey: providedSortKey,
    isPinned: providedIsPinned,
  }: {
    namespaceId: string;
    hlc?: version_tree.Hlc;
    providedStack?: stacks.Stack;
    sortKey?: string;
    isPinned?: boolean;
  }): Promise<stacks.PinStackResponse> {
    const oldStack = providedStack || (await cache.requireStack(namespaceId));
    const newStack = cloneDeep(oldStack);

    const isPinned = providedIsPinned ?? !oldStack.user_data?.is_pinned;
    let sortKey = providedSortKey;
    if (!sortKey) {
      const pinnedStacks = (await getFullStacksOrEmptyArray()).filter(
        (s) => s.user_data?.is_pinned,
      );
      sortKey = generateStacksMaxSortKey(pinnedStacks);
    }

    newStack.user_data = {
      ...newStack.user_data,
      is_pinned: isPinned,
      pinned_sort_key: sortKey,
      pinned_ms: Date.now(),
    };

    cache.setStack(newStack);
    const undo = () => cache.revertStack(oldStack, newStack);

    try {
      const response = await dbxApiService.callApiV2('stacksPinStack', {
        namespace_id: namespaceId,
        is_pinned: isPinned,
        sort_key: sortKey,
        hlc,
      });

      newStack.max_hlc_micros = nonNil(
        response.hlc,
        'response.hlc',
      ).unix_micros;
      cache.setStack(newStack);

      return response;
    } catch (e) {
      undo();
      throw e;
    }
  }

  async function toggleStackIsPinned(
    namespaceId: string,
  ): Promise<stacks.PinStackResponse> {
    const releaseLock = await sync.acquireUserWriteLock();
    try {
      return await togglePinWithLock({ namespaceId });
    } finally {
      releaseLock();
    }
  }

  async function sortPinnedStack(
    namespaceId: string,
    sortKey: string,
  ): Promise<stacks.PinStackResponse> {
    const releaseLock = await sync.acquireUserWriteLock();
    try {
      return await togglePinWithLock({ namespaceId, sortKey, isPinned: true });
    } finally {
      releaseLock();
    }
  }

  /**
   * Toggles the pinned state of a stack. If the stack does not exist, it will
   * be created first and pinned.
   * @param upsertData The namespaceId or requestId for the stack.
   * namespaceId is used to toggle an existing stack, requestId is used to create
   * a new stack. requestId will be emitted in the stack edit progress event.
   */
  async function upsertToggleStackIsPinned(
    upsertData: StackUpsertId,
  ): Promise<stacks.PinStackResponse & { nsId?: string }> {
    const releaseLock = await sync.acquireUserWriteLock();
    const alreadyCreatedNsIdForRequest =
      upsertData.type === 'requestId'
        ? stackMutationRequestIdToCreatedNsId.get(upsertData.id)
        : undefined;
    try {
      if (upsertData.type === 'requestId' && !alreadyCreatedNsIdForRequest) {
        emitStackMutationEvent({
          requestId: upsertData.id,
          eventType: 'optimisticEdit',
          stack: {
            stack_data: {
              name: getDefaultStackName(),
              description: DEFAULT_STACK_DESCRIPTION,
            },
            user_data: {
              // Assume newly created stacks are not pinned
              is_pinned: true,
            },
          },
          stackItemsForStack: [],
        });

        // Create a new stack if it does not exist
        const { stack, hlc } = await createStackWithLock(
          getDefaultStackName(),
          DEFAULT_STACK_DESCRIPTION,
          undefined,
          undefined,
        );
        const namespaceId = nonEmpty(stack.namespace_id, 'stack.namespace_id');

        const response = await togglePinWithLock({
          namespaceId,
          hlc,
          providedStack: stack,
        });

        emitStackMutationEvent({
          requestId: upsertData.id,
          eventType: 'stackCreated',
          namespaceId,
        });

        return { ...response, nsId: namespaceId };
      } else {
        // If we have already created a stack for this requestId, update that
        // created nsId instead
        if (alreadyCreatedNsIdForRequest) {
          await sleepMs(LOAD_STACK_DELAY_MS);
          return await togglePinWithLock({
            namespaceId: alreadyCreatedNsIdForRequest,
          });
        } else {
          return await togglePinWithLock({ namespaceId: upsertData.id });
        }
      }
    } finally {
      releaseLock();
    }
  }

  async function deleteStack(
    namespaceId: string,
  ): Promise<stacks.DeleteStacksResponse> {
    let undo: (() => void) | undefined;

    const releaseLock = await sync.acquireUserWriteLock();
    try {
      const oldStack = await cache.requireStack(namespaceId);
      const oldItems = await cache.getStackItems(namespaceId);

      const oldPublishedStack = await cache.getPublishedStack(namespaceId);

      // Eager write to store before calling api.
      cache.removeStack(namespaceId);
      cache.setStackItems(namespaceId, undefined);

      cache.unpublishStack(namespaceId);

      undo = () => {
        cache.setStack(oldStack);
        cache.setStackItems(namespaceId, oldItems);

        if (oldPublishedStack) {
          cache.setPublishStack(oldPublishedStack);
        }
      };

      return await dbxApiService.callApiV2('stacksDeleteStacks', {
        namespace_ids: [namespaceId],
      });
    } catch (e) {
      undo?.();
      throw e;
    } finally {
      releaseLock();
    }
  }

  async function leaveStack(
    namespaceId: string,
    email: string,
  ): Promise<stacks.RemoveStackSharingMembersResponse> {
    let undo: (() => void) | undefined;

    const releaseLock = await sync.acquireUserWriteLock();
    try {
      const oldStack = await cache.requireStack(namespaceId);
      const oldItems = await cache.getStackItems(namespaceId);

      // Eager write to store before calling api.
      cache.removeStack(namespaceId);
      cache.setStackItems(namespaceId, undefined);

      undo = () => {
        cache.setStack(oldStack);
        cache.setStackItems(namespaceId, oldItems);
      };

      return await dbxApiService.callApiV2('stacksRemoveStackSharingMembers', {
        namespace_id: namespaceId,
        members: [{ email: email, '.tag': 'email' }],
      });
    } catch (e) {
      undo?.();
      throw e;
    } finally {
      releaseLock();
    }
  }

  async function createStackItemWithLock(
    namespaceId: string,
    url: string,
    metadataTitle?: string,
    sectionId?: string,
    sortKey?: string,
    id3p?: string,
    brandedType?: string,
  ): Promise<stacks.StackItem> {
    const account = await authService.getCurrentAccount();
    const createResponse = await dbxApiService.callApiV2(
      'stacksCreateNativeStackItem',
      {
        parent_ns_id: namespaceId,
        filename: metadataTitle,
        data: {
          url,
          parent_section_id: sectionId,
          creator_email: account?.email,
          // Note: In scout mirage ipc, the sortKey gets passed as `null` even
          // though the UI sent `undefined`. We are unable to root cause this
          // so far, but putting a one-off fix to unblock this scenario for now.
          sort_key: sortKey ?? undefined,
          metadata_title: metadataTitle,
          id_3p: id3p,
          branded_type: brandedType,
        },
      },
    );
    const listResponse = await dbxApiService.callApiV2('stacksListStackItems', {
      namespace_id: namespaceId,
      hlc: createResponse.hlc,
    });
    const newItems = nonNil(listResponse.items, 'listResponse.items');
    const newItem = nonNil(
      newItems.find(
        (item) =>
          item['.tag'] === 'shortcut' &&
          item.api_file_id === createResponse.api_file_id,
      ),
      'newItem',
    );
    await cache.setStackItems(namespaceId, newItems);
    // We could technically update the stack hlc here, but since it will only
    // be slightly different from the hlc we generated, in practice, it would
    // not make any difference at all.
    return newItem;
  }

  async function createStackItem(
    namespaceId: string,
    url: string,
    metadataTitle?: string,
    sectionId?: string,
    sortKey?: string,
    brandedType?: string,
    id3p?: string,
  ): Promise<stacks.StackItem> {
    const releaseLock = await sync.acquireUserWriteLock();
    try {
      const effectiveMetadataTitle =
        metadataTitle || getTitleOfUrl(url, metadataTitle);
      const validMetadataTitle = nonEmpty(
        toValidFileName(effectiveMetadataTitle),
        'metadataTitle',
      );

      return await createStackItemWithLock(
        namespaceId,
        url,
        validMetadataTitle,
        sectionId,
        sortKey,
        id3p,
        brandedType,
      );
    } finally {
      releaseLock();
    }
  }

  async function ensureStackAndCreateItem(
    upsertData: StackUpsertId,
    url: string,
    metadataTitle?: string,
    sectionId?: string,
    sortKey?: string,
    id3p?: string,
    brandedType?: string,
  ): Promise<stacks.StackItem & { nsId?: string }> {
    const releaseLock = await sync.acquireUserWriteLock();
    const alreadyCreatedNsIdForRequest =
      upsertData.type === 'requestId'
        ? stackMutationRequestIdToCreatedNsId.get(upsertData.id)
        : undefined;

    try {
      let namespaceId = upsertData.id;
      let newItem;
      const effectiveMetadataTitle =
        metadataTitle || getTitleOfUrl(url, metadataTitle);
      const validMetadataTitle = nonEmpty(
        toValidFileName(effectiveMetadataTitle),
        'metadataTitle',
      );

      if (upsertData.type === 'requestId' && !alreadyCreatedNsIdForRequest) {
        const optimisticItem: stacks.StackItem = {
          '.tag': 'shortcut',
          url,
          metadata_title: validMetadataTitle,
          parent_section_id: sectionId,
          sort_key: sortKey,
          api_file_id: uuidv4(), // Temporary id
          id_3p: id3p,
          branded_type: brandedType,
        };

        // Emit the optimistic edit before creating the stack
        emitStackMutationEvent({
          requestId: upsertData.id,
          eventType: 'optimisticEdit',
          stack: {
            stack_data: {
              name: getDefaultStackName(),
              description: DEFAULT_STACK_DESCRIPTION,
            },
          },
          stackItemsForStack: [optimisticItem],
        });

        // Create a new stack
        const { stack } = await createStackWithLock(
          getDefaultStackName(),
          DEFAULT_STACK_DESCRIPTION,
        );
        namespaceId = nonEmpty(stack.namespace_id, 'stack.namespace_id');

        // Create a new item associated with the new stack
        newItem = await createStackItemWithLock(
          namespaceId,
          url,
          validMetadataTitle,
          sectionId,
          sortKey,
          id3p,
          brandedType,
        );

        // Emit stack creation event
        emitStackMutationEvent({
          requestId: upsertData.id,
          eventType: 'stackCreated',
          namespaceId,
        });
      } else {
        // If we have already created a stack for this requestId, update that
        // created nsId instead
        if (alreadyCreatedNsIdForRequest) {
          await sleepMs(LOAD_STACK_DELAY_MS);
          newItem = await createStackItemWithLock(
            alreadyCreatedNsIdForRequest,
            url,
            validMetadataTitle,
            sectionId,
            sortKey,
            id3p,
            brandedType,
          );
        } else {
          // For existing stacks, create the item directly
          newItem = await createStackItemWithLock(
            namespaceId,
            url,
            validMetadataTitle,
            sectionId,
            sortKey,
            id3p,
            brandedType,
          );
        }
      }

      return { ...newItem, nsId: namespaceId };
    } finally {
      releaseLock();
    }
  }

  // TODO: add creator email, parent_section_id
  async function createStackItemsBatch(
    namespaceId: string,
    items: StackItemCreationFields[],
  ): Promise<void> {
    const releaseLock = await sync.acquireUserWriteLock();
    try {
      const createResponse = await dbxApiService.callApiV2(
        'stacksCreateNativeStackItemBatch',
        {
          parent_ns_id: namespaceId,
          item_list: items.map((item): stacks.StackItemWithFilename => {
            return {
              filename: item.metadata_title,
              data: {
                metadata_title: item.metadata_title,
                url: item.url,
                sort_key: item.sort_key,
              },
            };
          }),
        },
      );
      const listResponse = await dbxApiService.callApiV2(
        'stacksListStackItems',
        {
          namespace_id: namespaceId,
          hlc: createResponse.hlc,
        },
      );
      const newItems = nonNil(listResponse.items, 'listResponse.items');
      await cache.setStackItems(namespaceId, newItems);
      return;
    } finally {
      releaseLock();
    }
  }

  async function callUpdateStackItem(
    stackItem: Partial<stacks.StackItemShortcut>,
    update_last_modified_time?: boolean,
  ) {
    const fileId = nonNil(stackItem.api_file_id, 'api_file_id');
    const updates: stacks.StackItemDataUpdate[] = [];

    if (stackItem.url) {
      updates.push({
        item_field: {
          '.tag': 'url_update',
          url_update: stackItem.url,
        },
      });
    }

    if (stackItem.sort_key) {
      updates.push({
        item_field: {
          '.tag': 'sort_key_update' as const,
          sort_key_update: stackItem.sort_key,
        },
      });
    }

    if (stackItem.parent_section_id) {
      updates.push({
        item_field: {
          '.tag': 'parent_section_id_update' as const,
          parent_section_id_update: stackItem.parent_section_id,
        },
      });
    }

    if (stackItem.description) {
      // Only include this if current user is also the description owner
      // otherwise update will fail due to an invalid user trying to update
      // the description
      const account = await authService.getCurrentAccount();
      if (account?.email === stackItem.description.creator_email) {
        updates.push({
          item_field: {
            '.tag': 'description_update' as const,
            type: stackItem.description.type,
            last_modified_time_utc_sec:
              stackItem.description.last_modified_time_utc_sec,
            content: stackItem.description.content,
            creator_email: stackItem.description.creator_email,
          },
        });
      }
    }

    if (stackItem.custom_name) {
      updates.push({
        item_field: {
          '.tag': 'custom_name_update' as const,
          custom_name_update: stackItem.custom_name,
        },
      });
    }

    if (stackItem.metadata_title) {
      updates.push({
        item_field: {
          '.tag': 'metadata_title_update' as const,
          metadata_title_update: stackItem.metadata_title,
        },
      });
    }

    return dbxApiService.callApiV2('stacksUpdateStackItem', {
      file_id: fileId,
      stack_data_updates: updates,
      update_last_modified_time: update_last_modified_time,
    });
  }

  async function updateStackItem<T extends stacks.StackItem>(
    namespaceId: string,
    stackItemSuperset: T,
    updateLastModifiedTime?: boolean,
  ): Promise<stacks.UpdateStackItemResponse | undefined> {
    let undo: (() => void) | undefined;
    // Sometimes a stackItem comes populated with unwanted metadata title fields, which must be stripped out
    const stackItem = reduceToStackItem(stackItemSuperset);

    const fileId = nonEmpty(asShortcut(stackItem).api_file_id, 'fileId');

    const releaseLock = await sync.acquireUserWriteLock();
    try {
      const oldStack = await cache.requireStack(namespaceId);
      const oldItems = await cache.requireStackItems(namespaceId);

      const newItem = cloneDeep(stackItem);
      const newItemChangedFields = getStackItemChangedFieldsFromOldItems(
        newItem,
        oldItems,
      );
      const newItems = oldItems.map((item) =>
        asShortcut(item).api_file_id === fileId ? newItem : item,
      );

      const hlcMicros = nowMicros();

      asShortcut(newItem).hlc_micros = hlcMicros;
      const newStack = cloneDeep(oldStack);
      newStack.max_hlc_micros = hlcMicros;

      // Eager write to store before calling api.
      cache.setStackItems(namespaceId, newItems);
      cache.setStack(newStack);

      undo = () => {
        cache.revertStack(oldStack, newStack);
        cache.revertStackItems(namespaceId, oldItems, newItems);
      };

      return await callUpdateStackItem(
        newItemChangedFields,
        updateLastModifiedTime,
      );
    } catch (e) {
      undo?.();
      throw e;
    } finally {
      releaseLock();
    }
  }

  async function deleteStackItem(
    namespaceId: string,
    fileId: string,
  ): Promise<stacks.DeleteStackItemResponse> {
    let undo: (() => void) | undefined;

    const releaseLock = await sync.acquireUserWriteLock();
    try {
      const oldStack = await cache.requireStack(namespaceId);
      const oldItems = await cache.requireStackItems(namespaceId);

      const hlcMicros = nowMicros();

      const newStack = cloneDeep(oldStack);
      newStack.max_hlc_micros = hlcMicros;

      const newItems = oldItems.filter(
        (item) => asShortcut(item).api_file_id !== fileId,
      );

      // Eager write to store before calling api.
      cache.setStackItems(namespaceId, newItems);
      cache.setStack(newStack);

      undo = () => {
        cache.revertStack(oldStack, newStack);
        cache.revertStackItems(namespaceId, oldItems, newItems);
      };

      return await dbxApiService.callApiV2('stacksDeleteStackItem', {
        file_id: fileId,
      });
    } catch (e) {
      undo?.();
      throw e;
    } finally {
      releaseLock();
    }
  }

  async function publishStack(
    sharingId: string,
  ): Promise<stacks.PublishStackResponse> {
    const releaseLock = await sync.acquireUserWriteLock();
    try {
      const publishResponse = await dbxApiService.callApiV2(
        'stacksPublishStack',
        {
          sharing_id: sharingId,
        },
      );
      if (publishResponse.stack) {
        await cache.setPublishStack(publishResponse.stack);
      }
      return publishResponse;
    } catch (e) {
      logger.error(`Failed to publish stack for ${sharingId}`, e);
      throw e;
    } finally {
      releaseLock();
    }
  }

  async function unpublishStack(
    sharingId: string,
  ): Promise<stacks.UnpublishStackResponse> {
    const releaseLock = await sync.acquireUserWriteLock();
    try {
      const unpublishResponse = await dbxApiService.callApiV2(
        'stacksUnpublishStack',
        {
          sharing_id: sharingId,
        },
      );
      if (unpublishResponse.stack && unpublishResponse.stack.namespace_id) {
        await cache.unpublishStack(unpublishResponse.stack.namespace_id);
      }
      return unpublishResponse;
    } catch (e) {
      logger.error(`Failed to unpublish stack for ${sharingId}`, e);
      throw e;
    } finally {
      releaseLock();
    }
  }

  async function listPublishedContent(
    hlc?: version_tree.Hlc,
    refresh = false,
  ): Promise<stacks.PublishedContent[] | undefined> {
    try {
      if (refresh) {
        const listResponse = await dbxApiService.callApiV2(
          'stacksListPublishedContent',
          {
            hlc: hlc,
          },
        );
        if (listResponse) {
          await cache.setPublishedContents(listResponse.published || []);
          return listResponse.published;
        }
        return [];
      } else {
        return await cache.getPublishedContents();
      }
    } catch (e) {
      logger.error(`Failed to load list published contents`, e);
      throw e;
    }
  }

  function stacksObservable(): Observable<stacks.Stack[]> {
    return cache.stacksSubject.asObservable();
  }

  function stackItemsObservable(): Observable<{
    namespaceId: string;
    items: stacks.StackItem[];
  }> {
    return cache.stackItemsSubject.asObservable();
  }

  function stackPublishedContentsObservable(): Observable<
    stacks.PublishedContent[]
  > {
    return cache.publishedContentSubject.asObservable();
  }

  // NOTE: We don't actually need `tearDown` to be async in the stacks service,
  // but it's async in other services so that it can be awaited, so keep that
  // pattern up here for consistency
  async function tearDown() {
    sync.stopSyncJob();
    bolt?.stop();
  }

  async function clear() {
    await largeKvStore.clear();
  }

  logoutService.registerLogoutCallback(ServiceId.STACKS, async () => {
    await clear();
    await tearDown();
  });

  return services.provide(
    ServiceId.STACKS,
    {
      getStacks,
      areStacksReady,
      areStacksComplete,
      stacksCompleteObservable,
      listStacksIncrementally,
      queryFullStacksById,
      listStackItems,
      listStackItemsFromCache,
      listAllStackItemsFromCache,
      listAllStackItemsOneByOne,
      previewPublicStack,
      previewStack,
      createStack,
      updateStack,
      upsertStack,
      toggleStackIsPinned,
      upsertToggleStackIsPinned,
      sortPinnedStack,
      deleteStack,
      leaveStack,
      createStackItem,
      ensureStackAndCreateItem,
      createStackItemsBatch,
      updateStackItem,
      deleteStackItem,
      createStackSection,
      publishStack,
      unpublishStack,
      listPublishedContent,
      stacksObservable,
      stackMutationObservable: () => stackMutationSubject.asObservable(),
      stackItemsObservable,
      stackPublishedContentsObservable,
      tearDown,
      clear,
    },
    [ServiceId.DBX_API],
  );
}
