import { fromHack } from "adapters/rxjs"
import { CompiledQuery, DatabaseConnection, Kysely, QueryResult, SqliteAdapter, SqliteIntrospector, SqliteQueryCompiler } from "kysely"
import { Ramda } from "namespaces/Ramda"
import { RxJS } from "namespaces/RxJS"
import { Observable, Subject } from "rxjs"
import { ObservableDialect, ObservableDriver } from "services/db/kysely/ObservableKysely"
import { SqliteConnection } from "services/db/sqlite/connection"
import { sqliteTest, zipSqliteResults } from "services/db/sqlite/util"

/**
 * Kysely `DatabaseConnection` backed by a single pooled `SqliteConnection`.
 *
 * Instances are handed out by the driver; `release()` returns the wrapped
 * connection to the pool it was acquired from.
 */
export class SAHWorkerDatabaseConnection implements DatabaseConnection {

    constructor(readonly connection: SqliteConnection, readonly pool: ConnectionPool) {
    }

    /**
     * Runs a compiled query on the wrapped connection.
     *
     * @param compiledQuery the SQL text and bound parameters to execute
     * @returns the result rows, produced by `zipSqliteResults`
     * @throws Error if any bound parameter fails the `sqliteTest` check
     */
    async executeQuery<R>(compiledQuery: CompiledQuery): Promise<QueryResult<R>> {
        const { sql, parameters } = compiledQuery
        // Validate every parameter up front; nothing is sent if one is rejected.
        if (!parameters.every(sqliteTest)) {
            throw new Error("Invalid parameter for this driver.")
        }
        const rawResults = await this.connection.query(sql, parameters)
        const rows = zipSqliteResults(rawResults) as R[]
        return { rows }
    }

    /**
     * Streaming is not implemented by this driver.
     *
     * @throws Error always
     */
    streamQuery<R>(): AsyncIterableIterator<QueryResult<R>> {
        throw new Error("This driver does not support streaming.")
    }

    /** Hands the wrapped connection back to its pool so it can be reused. */
    release() {
        const { pool, connection } = this
        pool.release(connection)
    }

}

/**
 * Dialect that pairs Kysely's stock SQLite compiler, adapter and introspector
 * with the observable-capable `SAHWorkerDriver`.
 */
export class SAHWorkerDialect implements ObservableDialect {

    /**
     * @param connection factory yielding an observable of connection openers;
     *        forwarded unchanged to the driver
     * @param pipe operator applied to every observed query stream by the
     *        driver; defaults to `Ramda.identity`
     */
    constructor(
        private readonly connection: (autoReconnect: boolean) => Observable<() => Promise<SqliteConnection>>,
        private readonly pipe: <T>(observable: Observable<T>) => Observable<T> = Ramda.identity
    ) {
    }

    /** Builds a new driver from the configured connection factory and pipe. */
    createDriver(): SAHWorkerDriver {
        return new SAHWorkerDriver(this.connection, this.pipe)
    }

    /** Builds Kysely's standard SQLite query compiler. */
    createQueryCompiler(): SqliteQueryCompiler {
        return new SqliteQueryCompiler()
    }

    /** Builds Kysely's standard SQLite adapter. */
    createAdapter(): SqliteAdapter {
        return new SqliteAdapter()
    }

    /** Builds Kysely's standard SQLite introspector for the given instance. */
    createIntrospector(db: Kysely<any>): SqliteIntrospector {
        return new SqliteIntrospector(db)
    }

}

/**
 * Minimal unbounded pool of `SqliteConnection`s.
 *
 * Idle connections are kept on a stack and reused most-recently-released
 * first; when none is idle a new one is opened through `opener`. There is no
 * upper limit on the number of connections opened.
 */
class ConnectionPool {

    /** Connections that have been released and are ready for reuse. */
    private readonly availableConnections = new Array<SqliteConnection>()
    /** Every connection this pool has opened and not yet closed. */
    private readonly connections = new Array<SqliteConnection>()

    /**
     * @param opener opens a brand-new connection when no idle one is available
     */
    constructor(readonly opener: () => Promise<SqliteConnection>) {
    }

    /**
     * Returns an idle connection if there is one, otherwise opens a new one.
     *
     * @returns a connection the caller must later pass to `release`
     */
    async acquire() {
        const connection = this.availableConnections.pop()
        if (connection !== undefined) {
            return connection
        }
        const newConnection = await this.opener()
        this.connections.push(newConnection)
        return newConnection
    }

    /**
     * Marks a previously acquired connection as idle again.
     *
     * @param connection the connection to return to the pool
     */
    release(connection: SqliteConnection) {
        this.availableConnections.push(connection)
    }

    /**
     * Closes every connection opened by this pool and waits for all of the
     * close calls to settle. A failure from any `close()` rejects the returned
     * promise.
     */
    async close() {
        // Detach the connections first so closed ones can never be handed out
        // by a later acquire() and a second close() becomes a no-op.
        const closing = this.connections.splice(0)
        this.availableConnections.length = 0
        // The callback must return the close() result: a block-bodied arrow
        // without `return` hands Promise.all an array of undefined, so it
        // would resolve immediately without waiting for anything.
        await Promise.all(closing.map(connection => connection.close()))
    }

}

/**
 * Kysely driver that executes queries on pooled `SqliteConnection`s and can
 * additionally observe select queries through a dedicated primary connection.
 */
export class SAHWorkerDriver implements ObservableDriver {

    /** Emits once from `destroy()` to tear down both connection streams. */
    private readonly closed = new Subject<void>()
    /** Every pool this driver has created, so `destroy()` can close them all. */
    private readonly pools = new Array<ConnectionPool>()
    private readonly primaryConnection
    private readonly pool

    /**
     * @param connection factory yielding an observable of connection openers;
     *        called with `false` for the pooled connections and with `true`
     *        for the primary (observing) connection
     * @param pipe operator applied to every stream returned by `observeQuery`
     */
    constructor(connection: (autoReconnect: boolean) => Observable<() => Promise<SqliteConnection>>, private readonly pipe: <T>(observable: Observable<T>) => Observable<T>) {
        this.pool = connection(false).pipe(
            RxJS.map(opener => {
                // A fresh pool is built for each opener emitted; remember it so
                // its connections can be closed when the driver is destroyed.
                const pool = new ConnectionPool(opener)
                this.pools.push(pool)
                return pool
            }),
            RxJS.takeUntil(this.closed),
            RxJS.shareReplay(1)
        )
        // NOTE(review): connections opened here are never closed by this class,
        // neither when a newer opener supersedes them nor on destroy() — confirm
        // who owns their lifetime.
        this.primaryConnection = connection(true).pipe(RxJS.switchMap(async proxy => await proxy()), RxJS.takeUntil(this.closed), RxJS.shareReplay(1))
    }

    /**
     * Waits for the first available pool and acquires a connection from it.
     *
     * @returns a connection wrapper bound to the pool it came from
     */
    async acquireConnection() {
        const pool = await RxJS.firstValueFrom(this.pool)
        return new SAHWorkerDatabaseConnection(await pool.acquire(), pool)
    }

    /** Returns the connection to the pool it was acquired from. */
    async releaseConnection(connection: SAHWorkerDatabaseConnection) {
        connection.release()
    }

    /** Starts a transaction with `begin immediate`. */
    async beginTransaction(connection: SAHWorkerDatabaseConnection) {
        await connection.executeQuery(CompiledQuery.raw("begin immediate"))
    }

    /** Commits the transaction open on the given connection. */
    async commitTransaction(connection: SAHWorkerDatabaseConnection) {
        await connection.executeQuery(CompiledQuery.raw("commit"))
    }

    /** Rolls back the transaction open on the given connection. */
    async rollbackTransaction(connection: SAHWorkerDatabaseConnection) {
        await connection.executeQuery(CompiledQuery.raw("rollback"))
    }

    /**
     * Observes a select query on the primary connection.
     *
     * @param compiledQuery the compiled select query to observe
     * @returns a stream of result sets, with consecutive results that are
     *          equal per `Ramda.equals` suppressed, passed through the
     *          configured `pipe`
     * @throws Error synchronously if the query is not a select query or a
     *         bound parameter fails the `sqliteTest` check
     */
    observeQuery<R>(compiledQuery: CompiledQuery): Observable<QueryResult<R>> {
        if (compiledQuery.query.kind !== "SelectQueryNode") {
            throw new Error("You can only observe select queries.")
        }
        const parameters = compiledQuery.parameters
        if (!parameters.every(sqliteTest)) {
            throw new Error("Invalid parameter for this driver.")
        }
        return this.primaryConnection.pipe(
            // Re-subscribe on whichever primary connection is current.
            RxJS.switchMap(connection => {
                return fromHack(connection.observe(compiledQuery.sql, parameters))
            })
        ).pipe(
            RxJS.map(results => {
                return {
                    rows: zipSqliteResults(results) as R[]
                }
            }),
            RxJS.distinctUntilChanged((a, b) => Ramda.equals(a, b)),
            this.pipe
        )
    }

    /** No start-up work is required. */
    async init() {
    }

    /**
     * Stops both connection streams and closes every pooled connection.
     *
     * Previously only the `closed` signal was emitted, so connections opened
     * by the pools were never closed.
     */
    async destroy() {
        this.closed.next()
        // Take ownership of the list so a repeated destroy() closes nothing twice.
        const pools = this.pools.splice(0)
        await Promise.all(pools.map(pool => pool.close()))
    }

}
