import * as Yup from 'yup'

import {
  ContentStreamMessageSchema,
  ContextStreamMessageSchema,
  DelimiterStreamMessageSchema,
  ErrorStreamMessageSchema,
} from 'utils/streamingValidationSchemas'

import {
  APIError,
  ChunkFetchTimeoutError,
  ConnectionTimeoutError,
  ConversationNotExistsError,
  GPTError,
  NetworkTimeoutError,
  TokenRateLimitingError,
} from './appError'
import { sleep } from './sleep'
import { validateMessagesWithSchemaMap } from './validationUtils'

/**
 * Mutable state that `extractData` carries from one call to the next.
 * The same object must be passed on every call, because its fields are updated in place.
 */
export type StreamBuffer = {
  /** Text received after the last `===AA!@A===` delimiter; it is prepended to the next chunk. */
  lastChunk: string
  /** Incremented once per `extractData` call. */
  chunkProcessed: number
  /** `id` of the first `message`-type stream message seen; written by `extractMessages` while still unset. */
  lastmId?: string
}

/** Separator between the JSON messages contained in the streamed text. */
const STREAM_MESSAGE_DELIMITER = '===AA!@A==='

// One streaming decoder per buffer: a multi-byte UTF-8 character that is split across two
// chunks is then reassembled instead of being decoded as U+FFFD replacement characters.
// A WeakMap keeps the decoder tied to the buffer's lifetime without changing `StreamBuffer`.
const streamDecoders = new WeakMap<StreamBuffer, TextDecoder>()

/**
 * Decodes one binary chunk, joins it with the text left over from the previous call,
 * and processes every message that is terminated by the delimiter.
 *
 * Text after the last delimiter (or the whole text when no delimiter is present yet)
 * is stored in `buffer.lastChunk` for the next call.
 *
 * @param data - Raw bytes of the chunk that was just read.
 * @param buffer - Carry-over state; `lastChunk` and `chunkProcessed` are updated in place.
 * @returns The concatenated answer, delimiter and context text found in the complete messages.
 * @throws Whatever `validateMessages` or `extractMessages` throw for invalid or error messages.
 */
export const extractData = async (
  data: Uint8Array,
  buffer: StreamBuffer
): Promise<{ answer: string; delimiter: string; context: string }> => {
  let decoder = streamDecoders.get(buffer)
  if (!decoder) {
    decoder = new TextDecoder()
    streamDecoders.set(buffer, decoder)
  }

  buffer.chunkProcessed++
  // `stream: true` makes the decoder hold back an incomplete trailing byte sequence
  // until the rest of the character arrives with the next chunk.
  const pendingText = buffer.lastChunk + decoder.decode(data, { stream: true })

  const lastDelimiterIndex = pendingText.lastIndexOf(STREAM_MESSAGE_DELIMITER)
  let completeText = ''
  if (lastDelimiterIndex === -1) {
    // No complete message yet: keep everything for the next call.
    buffer.lastChunk = pendingText
  } else {
    buffer.lastChunk = pendingText.substring(lastDelimiterIndex + STREAM_MESSAGE_DELIMITER.length)
    completeText = pendingText.substring(0, lastDelimiterIndex)
  }

  const messages = completeText.split(STREAM_MESSAGE_DELIMITER)

  // Validate first, so that extraction only runs on messages that passed validation.
  await validateMessages(messages)
  return extractMessages(messages, buffer)
}

/**
 * Runs `validateMessagesWithSchemaMap` over the raw messages with the four stream schemas.
 *
 * The map keys are the same strings that `extractMessages` matches against a message's
 * `type` field; how the helper uses them is not visible here (presumably it selects the
 * schema by message type - verify against `validationUtils`).
 *
 * @param messages - Raw message strings as split from the stream text.
 */
const validateMessages = async (messages: string[]): Promise<void> => {
  const streamSchemas: Record<string, Yup.AnyObjectSchema> = {
    message: ContentStreamMessageSchema,
    context: ContextStreamMessageSchema,
    delimiter: DelimiterStreamMessageSchema,
    error: ErrorStreamMessageSchema,
  }

  await validateMessagesWithSchemaMap(messages, streamSchemas)
}

/**
 * Parses each raw message as JSON and folds the results into three strings.
 *
 * - `message` entries append their `delta` to `answer`, and the first one seen records its
 *   `id` in `buffer.lastmId`.
 * - `delimiter` entries append their `delta` to `delimiter`.
 * - `context` entries append `context.context` to `context`.
 * - `error` entries abort processing by throwing the error class matching `errType`.
 * - Any other `type` (including non-object JSON such as `null`) is logged and skipped.
 *
 * Missing text fields are treated as empty strings, so the result never contains
 * `undefined` or the literal text "undefined".
 *
 * @param messages - Raw message strings; empty or whitespace-only entries are ignored.
 * @param buffer - Carry-over state; only `lastmId` is written here.
 * @returns The concatenated `answer`, `delimiter` and `context` text.
 * @throws APIError when an entry is not valid JSON.
 * @throws GPTError | ChunkFetchTimeoutError | ConnectionTimeoutError | TokenRateLimitingError |
 *   ConversationNotExistsError for the corresponding `errType`, and a plain
 *   `Error('generic-api-error')` for any other `errType`.
 */
const extractMessages = (
  messages: string[],
  buffer: StreamBuffer
): { answer: string; delimiter: string; context: string } => {
  let answer = ''
  let delimiter = ''
  let context = ''

  for (const rawMessage of messages) {
    // Splitting on the delimiter leaves empty fragments behind; they carry no message.
    if (rawMessage.trim().length === 0) {
      continue
    }

    let message
    try {
      message = JSON.parse(rawMessage)
    } catch (err) {
      throw new APIError(1.1, '', 'Failed to parse chunk', err as Error)
    }

    // `?.` because JSON.parse may return null, which must reach the "unsupported" branch
    // like any other non-message value instead of crashing on the property access.
    switch (message?.type) {
      case 'message':
        if (!buffer.lastmId) {
          buffer.lastmId = message.id
        }
        // The API also sends an "empty" message without `delta`; it contributes nothing.
        answer += message.delta || ''
        break

      case 'delimiter':
        delimiter += message.delta ?? ''
        break

      case 'context':
        context += message.context?.context ?? ''
        break

      case 'error':
        switch (message.errType) {
          case 'gpt':
            throw new GPTError(message.code, undefined, message.delta)
          case 'chunk_timeout':
            throw new ChunkFetchTimeoutError(message.code, undefined, message.delta)
          case 'connection_timeout':
            throw new ConnectionTimeoutError(message.code, undefined, message.delta)
          case 'USAGE':
            throw new TokenRateLimitingError(message.code, undefined, message.delta)
          case 'CONV':
            throw new ConversationNotExistsError(message.code, undefined)
          default:
            throw new Error('generic-api-error')
        }

      default:
        console.error('Error: Unsupported currentMessageObject.type')
    }
  }

  return { answer, delimiter, context }
}

/**
 * Reads the next chunk from `reader`, failing if no chunk arrives within `timeoutMs`.
 *
 * The read is raced against `sleep(timeoutMs)`; when the race resolves with the
 * `'sleepMethod'` sentinel, the read is considered timed out.
 *
 * NOTE(review): on timeout the pending `reader.read()` is not cancelled here; the caller
 * is presumably responsible for cancelling or releasing the reader - confirm.
 *
 * @param reader - Reader to pull the next chunk from.
 * @param timeoutMs - Maximum wait in milliseconds. Defaults to 60000, the previously
 *   hard-coded value, so existing callers are unaffected.
 * @returns The value `reader.read()` resolved with.
 * @throws NetworkTimeoutError when the timeout wins the race.
 */
export const readFromStream = async (reader: ReadableStreamDefaultReader, timeoutMs = 60000) => {
  const firstSettled = await Promise.race([reader.read(), sleep(timeoutMs)])
  if (firstSettled === 'sleepMethod') {
    throw new NetworkTimeoutError('Read from reader failed', undefined, timeoutMs)
  }

  return firstSettled
}

// export const throwNetworkError = (m: string, e: unknown, startTime: number, timeOutTime: number) => {
//   const endTimeFetch = new Date().getTime()
//   if (endTimeFetch - startTime > timeOutTime - 1) {
//     //Giving a slack rather precise 60 seconds timeout. anything above 59 seconds is timeout.
//     throw new NetworkTimeoutError(m, e as Error, endTimeFetch - startTime)
//   } else {
//     throw new NetworkError(m, e as Error, endTimeFetch - startTime)
//   }
// }
