import pDefer from 'p-defer';
import PLazy from 'p-lazy';
import { BehaviorSubject, fromEvent, map, takeUntil, filter, Observable, mergeMap, tap, Subject, merge, of, concatWith, throwError, ignoreElements, switchMap, finalize, dematerialize, firstValueFrom, combineLatest, EMPTY, startWith, from, first, shareReplay, defer, share, ReplaySubject, catchError, materialize, isObservable } from 'rxjs';
import { callOrGet } from 'value-or-factory';
import { Batcher } from './batcher.mjs';
import { generateId, ObservableAndPromise } from './wrap.mjs';

var Channel;
(function (Channel) {
    /**
     * A ready-made channel backed by this context's globalThis.
     */
    Channel.SELF = port(globalThis);
    /**
     * Decorates a channel so that outgoing values are handed to a Batcher and
     * every incoming emission is treated as a batch and flattened into its items.
     * Closing the connection processes the batcher before closing the wrapped connection.
     */
    function batching(channel, options) {
        return () => {
            const inner = channel();
            const batcher = new Batcher(inner.next.bind(inner), options);
            const observable = inner.pipe(mergeMap(batch => batch));
            const next = batcher.add.bind(batcher);
            const close = () => {
                batcher.process();
                inner.close();
            };
            return Connection.from({ observable, next, close });
        };
    }
    Channel.batching = batching;
    /**
     * Decorates a channel so that every received and every sent value is
     * written to the console, tagged with the given name.
     */
    function logging(channel, name = "Untitled") {
        return () => {
            const inner = channel();
            const observable = inner.pipe(tap(emission => console.log("Received emission on channel " + name + ".", emission)));
            const next = value => {
                console.log("Sending emission on channel " + name + ".", value);
                inner.next(value);
            };
            return Connection.from({ observable, next, close: inner.close });
        };
    }
    Channel.logging = logging;
    /**
     * A channel over a BroadcastChannel.
     * @param name The name of the broadcast channel.
     * @returns A channel; the BroadcastChannel is closed when the connection is closed.
     */
    function broadcast(name) {
        const open = () => new BroadcastChannel(name);
        const close = channel => channel.close();
        return port(open, close);
    }
    Channel.broadcast = broadcast;
    /**
     * A channel over anything that emits "message" events and has postMessage.
     *
     * TODO closing is weird
     * if we want to just close the sender, we have to leave the channel open so we can unsubscribe from everything
     *
     * @param open The target, or a factory producing it (resolved with callOrGet).
     * @param close Optional callback invoked with the target when the connection is closed.
     */
    function port(open, close) {
        return () => {
            const target = callOrGet(open);
            const closed = new BehaviorSubject(false);
            const messages = fromEvent(target, "message");
            const untilClosed = closed.pipe(filter(isClosed => isClosed));
            const observable = messages.pipe(map(event => event.data), takeUntil(untilClosed));
            const next = value => {
                if (closed.getValue()) {
                    throw new Error("This channel is closed.");
                }
                target.postMessage(value);
            };
            const shutdown = () => {
                // Flip the flag first so incoming messages stop before the target is closed.
                closed.next(true);
                closed.complete();
                if (close !== undefined) {
                    close(target);
                }
            };
            return Connection.from({ observable, next, close: shutdown });
        };
    }
    Channel.port = port;
})(Channel || (Channel = {}));
/**
 * An observable of incoming values paired with a function for sending values
 * and a function for closing the underlying transport.
 */
class Connection extends Observable {
    observer;
    close;
    /**
     * @param observable Source of incoming values; subscribed once per subscriber of this connection.
     * @param observer Function invoked with every value passed to next().
     * @param close Function that closes the connection; defaults to a no-op.
     */
    constructor(observable, observer, close = () => void 0) {
        super(subscriber => observable.subscribe(subscriber));
        this.observer = observer;
        this.close = close;
    }
    /**
     * Sends a value by calling the observer function; returns whatever it returns.
     */
    next(value) {
        return this.observer(value);
    }
    /**
     * Combine an observable, a next function and an optional close function into a connection.
     */
    static from({ observable, next, close }) {
        return new Connection(observable, next, close);
    }
}
/**
 * Sends remote calls over a single connection opened from config.channel.
 */
class ChannelSender {
    config;
    closed = new Subject();
    channel;
    constructor(config) {
        this.config = config;
        this.channel = config.channel();
    }
    /**
     * Marks this sender as closed and closes the connection.
     */
    close() {
        this.closed.complete();
        this.channel.close();
    }
    /**
     * Issues a remote call.
     * The returned object is both an observable and a lazy promise:
     * - subscribing sends a "subscribe" message and, on teardown, an "unsubscribe" message;
     * - awaiting sends an "execute" message and resolves with the first answer.
     * @param command Name of the remote member to call.
     * @param data Arguments for the call.
     */
    call(command, ...data) {
        const current = of(this.channel);
        // Emits nothing, but errors as soon as `closed` completes.
        const failWhenClosed = this.closed.pipe(concatWith(throwError(() => new Error("This remote is closed.")))).pipe(ignoreElements());
        const channel = merge(current, failWhenClosed);
        const subscribeOn = connection => {
            const id = generateId();
            const answers = connection.pipe(finalize(() => {
                connection.next({
                    kind: "unsubscribe",
                    id
                });
            }), dematerialize(), answer(id));
            connection.next({ kind: "subscribe", id, command, data });
            return answers;
        };
        const executeOn = connection => {
            const id = generateId();
            const answers = connection.pipe(dematerialize(), answer(id));
            connection.next({ kind: "execute", id, command, data });
            return answers;
        };
        const observable = channel.pipe(switchMap(connection => subscribeOn(connection)));
        const promise = PLazy.from(async () => {
            return await firstValueFrom(channel.pipe(switchMap(connection => executeOn(connection))));
        });
        return new ObservableAndPromise(observable, promise);
    }
}
/**
 * Pairs a sender with a proxy that turns property access into remote calls.
 */
class VolatileSenderRemote {
    sender;
    proxy;
    constructor(sender) {
        this.sender = sender;
        this.proxy = oldProxy(sender);
    }
    /**
     * Watches the underlying sender and maps every sender it emits to a call proxy.
     */
    watch(autoReconnect) {
        const toProxy = watched => oldProxy(watched);
        const senders = this.sender.watch(autoReconnect);
        return senders.pipe(map(toProxy));
    }
    /**
     * Closes the underlying sender.
     */
    close() {
        return this.sender.close();
    }
}
/**
 * Wraps a sender in a Proxy where reading any string-keyed property yields a
 * function that forwards to `sender.call(propertyName, ...args)`.
 * Reading a symbol-keyed property throws.
 */
function oldProxy(sender) {
    const handler = {
        get: (target, property) => {
            if (typeof property !== "symbol") {
                return (...params) => target.call(property, ...params);
            }
            throw new Error("No symbol calls on a proxy.");
        },
    };
    return new Proxy(sender, handler);
}
/**
 * Builds an observable that, on each subscription, creates a value with the
 * factory, emits it, and hands that same value to `close` on teardown.
 * The observable does not complete by itself.
 * @deprecated
 */
function closing(factory, close) {
    const onSubscribe = subscriber => {
        const resource = factory();
        subscriber.next(resource);
        return () => {
            close(resource);
        };
    };
    return new Observable(onSubscribe);
}
var OldChannel;
(function (OldChannel) {
    //TODO rm
    /**
     * Creates a channel from an observable of port-like objects.
     * For every object `open` emits, the channel emits a Connection whose
     * incoming values are the `data` of the object's "message" events and whose
     * `next` posts to the object. Unsubscribing from the channel unsubscribes from `open`.
     */
    function oldPort(open) {
        return new Observable(subscriber => {
            const subscription = open.subscribe(object => {
                subscriber.next(Connection.from({
                    observable: fromEvent(object, "message").pipe(map(_ => _.data)),
                    next: value => {
                        try {
                            object.postMessage(value);
                        }
                        catch (e) {
                            // A failed post is logged and surfaced as an error of the channel.
                            console.error(e);
                            subscriber.error(e);
                        }
                    },
                    close: () => void 0,
                }));
            });
            return () => {
                subscription.unsubscribe();
            };
        });
    }
    OldChannel.oldPort = oldPort;
    /**
     * Creates a channel from a broadcast channel.
     * The BroadcastChannel is opened on subscription and closed on unsubscribe.
     */
    function oldBroadcast(name) {
        // The cleanup callback has to close the channel itself: closing() ignores
        // the callback's return value, so returning a thunk would leave it open.
        return oldPort(closing(() => new BroadcastChannel(name), channel => channel.close()));
    }
    OldChannel.oldBroadcast = oldBroadcast;
    /**
     * Creates a channel from two broadcast channels, one for input and one for output.
     * Incoming values are read from `input`; `next` and `close` go to `output`.
     */
    function oldDualBroadcast(input, output) {
        return combineLatest([oldBroadcast(input), oldBroadcast(output)]).pipe(map(([a, b]) => Connection.from({ observable: a, next: b.next.bind(b), close: b.close })));
    }
    OldChannel.oldDualBroadcast = oldDualBroadcast;
    /**
     * Creates a channel from a worker.
     * The worker is started on subscription and terminated on unsubscribe.
     */
    function oldWorker(url, options) {
        return oldPort(closing(() => new Worker(url, options), worker => worker.terminate()));
    }
    OldChannel.oldWorker = oldWorker;
    /**
     * Adapts a channel that speaks in batches: incoming batches are flattened
     * into single values and every outgoing value is sent as a one-element batch.
     */
    function unbatching(channel) {
        return channel.pipe(map(connection => {
            return Connection.from({ observable: connection.pipe(mergeMap(items => items)), next: value => connection.next([value]), close: () => void 0 });
        }));
    }
    OldChannel.unbatching = unbatching;
    /**
     * Wraps another channel and batches any messages sent to it. Also treats incoming messages as batches.
     */
    function batchingOld(channel, options) {
        return channel.pipe(map(connection => {
            const batcher = new Batcher(connection.next.bind(connection), options);
            return Connection.from({
                observable: connection.pipe(mergeMap(items => items)),
                next: v => {
                    console.log("Sending object.", v);
                    batcher.add(v);
                },
                close: () => void 0,
            });
        }));
    }
    OldChannel.batchingOld = batchingOld;
})(OldChannel || (OldChannel = {}));
// Context name the broadcast finder/advertiser helpers fall back to when options.context is not provided.
const DEFAULT_CONTEXT = "default";
/**
 * Merges observables that are added and removed at runtime.
 *
 * The source emits actions:
 * - `{ action: "add", key, observable }` subscribes to the observable; its values are emitted as `[value, key]`.
 * - `{ action: "clear" }` unsubscribes from everything currently registered.
 * - any other action (e.g. "delete") unsubscribes from the observable registered under `action.key`, if any.
 *
 * When the result is torn down, every remaining subscription is released.
 */
function registry(observable) {
    const active = new Map();
    const track = action => new Observable(subscriber => {
        const subscription = action.observable.pipe(map(value => [value, action.key])).subscribe(subscriber);
        active.set(action.key, subscription);
        return () => {
            subscription.unsubscribe();
            active.delete(action.key);
        };
    });
    const handle = action => {
        switch (action.action) {
            case "add":
                return track(action);
            case "clear":
                for (const subscription of active.values()) {
                    subscription.unsubscribe();
                }
                return EMPTY;
            default: {
                const existing = active.get(action.key);
                if (existing !== undefined) {
                    existing.unsubscribe();
                }
                return EMPTY;
            }
        }
    };
    const releaseAll = () => {
        active.forEach(subscription => subscription.unsubscribe());
        active.clear();
    };
    return observable.pipe(mergeMap(action => handle(action)), finalize(releaseAll));
}
/**
 * Finder over plain (unbatched) dual broadcast channels.
 * @param options Optional settings; `options.context` defaults to DEFAULT_CONTEXT.
 */
function broadcastFinder(options = {}) {
    const context = options.context ?? DEFAULT_CONTEXT;
    const createChannel = (id1, id2) => OldChannel.oldDualBroadcast(id1, id2);
    return buildBroadcastFinder(context, createChannel);
}
/**
 * Advertiser over plain (unbatched) dual broadcast channels.
 * @param options Optional settings; `options.context` defaults to DEFAULT_CONTEXT.
 */
function broadcastAdvertiser(options = {}) {
    const context = options.context ?? DEFAULT_CONTEXT;
    const createChannel = (id1, id2) => OldChannel.oldDualBroadcast(id1, id2);
    return buildBroadcastAdvertiser(context, createChannel);
}
/**
 * Finder over dual broadcast channels wrapped with batching.
 * @param options Optional settings; `options.context` defaults to DEFAULT_CONTEXT.
 */
function broadcastFinderBatching(options = {}) {
    const context = options.context ?? DEFAULT_CONTEXT;
    const createChannel = (id1, id2) => OldChannel.batchingOld(OldChannel.oldDualBroadcast(id1, id2));
    return buildBroadcastFinder(context, createChannel);
}
/**
 * Advertiser over dual broadcast channels wrapped with batching.
 * @param options Optional settings; `options.context` defaults to DEFAULT_CONTEXT.
 */
function broadcastAdvertiserBatching(options = {}) {
    const context = options.context ?? DEFAULT_CONTEXT;
    const createChannel = (id1, id2) => OldChannel.batchingOld(OldChannel.oldDualBroadcast(id1, id2));
    return buildBroadcastAdvertiser(context, createChannel);
}
/**
 * Same as broadcastAdvertiserBatching; kept as a separate export.
 * @param options Optional settings; `options.context` defaults to DEFAULT_CONTEXT.
 */
function broadcastAdvertiserBatching2(options = {}) {
    return broadcastAdvertiserBatching(options);
}
/**
 * Advertises this context as the worker for `context` and accepts client registrations.
 *
 * Once the web lock named `context` is acquired, it:
 * - announces itself on the `context` broadcast channel ("newWorkerStarted") and
 *   answers every "askIfWorkerIsAvailable" with "workerIsAvailable";
 * - listens on a freshly generated registration channel for "registerClient"
 *   messages and acknowledges each with "clientRegistered".
 *
 * @param context Name used both for the web lock and for the lookup broadcast channel.
 * @param createBroadcastChannel Called with (callChannelId, answerChannelId) to create the per-client channel.
 * @returns An observable of registry actions: an "add" per registered client
 *          (keyed by clientId) and a "delete" once the client's lock can be obtained.
 */
function buildBroadcastAdvertiser(context, createBroadcastChannel) {
    return observeWebLock(context).pipe(switchMap(() => {
        const registrationChannelId = generateId();
        const registration = OldChannel.oldBroadcast(registrationChannelId);
        // Announcements are side effects only; nothing from this branch is emitted downstream.
        const announcements = combineLatest([
            OldChannel.oldBroadcast(context),
            observeRandomWebLock()
        ]).pipe(switchMap(([lookupConnection, lockId]) => {
            return lookupConnection.pipe(filter(message => message.type === "askIfWorkerIsAvailable"), map(() => {
                return {
                    type: "workerIsAvailable",
                    registrationChannelId,
                    lockId
                };
            }), startWith({
                type: "newWorkerStarted",
                registrationChannelId,
                lockId
            }),
            // Send through an explicit callback. Passing the connection itself as
            // the tap observer would expose its `subscribe` method to tap, which
            // treats it as a lifecycle hook and opens a stray subscription.
            tap(message => lookupConnection.next(message)));
        }), ignoreElements());
        const registrations = registration.pipe(switchMap(connection => {
            return connection.pipe(mergeMap(message => message.type === "registerClient" ? of(message) : EMPTY), mergeMap(message => {
                connection.next({
                    type: "clientRegistered",
                    clientId: message.clientId
                });
                // Resolves once a shared lock on the client's lock name can be obtained.
                const lock = waitForLock(message.lockId);
                return merge(of({
                    action: "add",
                    key: message.clientId,
                    observable: createBroadcastChannel(message.callChannelId, message.answerChannelId)
                }), from(lock).pipe(map(() => {
                    return {
                        action: "delete",
                        key: message.clientId
                    };
                })));
            }));
        }));
        return merge(announcements, registrations);
    }));
}
/**
 * Looks up an advertised worker on the `context` broadcast channel and registers with it.
 *
 * @param context Name of the lookup broadcast channel.
 * @param createBroadcastChannel Called with (answerChannelId, callChannelId) to create the channel used for calls.
 * @returns An observable that emits, per successful registration, the channel
 *          observable produced by createBroadcastChannel (ended once a shared
 *          lock on the server's lockId can be obtained, and shared with shareReplay(1)).
 */
function buildBroadcastFinder(context, createBroadcastChannel) {
    return OldChannel.oldBroadcast(context).pipe(switchMap(lookup => {
        // Ask any running worker to announce itself.
        lookup.next({
            type: "askIfWorkerIsAvailable"
        });
        // Take the first announcement of either kind, then keep following later
        // "newWorkerStarted" announcements; each one restarts the registration below.
        return lookup.pipe(mergeMap(message => message.type === "newWorkerStarted" || message.type === "workerIsAvailable" ? of(message) : EMPTY), first(), switchMap(message => {
            return merge(of(message), lookup.pipe(mergeMap(message => message.type === "newWorkerStarted" ? of(message) : EMPTY)));
        }), switchMap(server => {
            // Hold a randomly named web lock while registered; its name is sent to the server as lockId.
            return observeRandomWebLock().pipe(switchMap(lockId => {
                const registration = OldChannel.oldBroadcast(server.registrationChannelId);
                return registration.pipe(switchMap(registration => {
                    // Fresh ids for this client and for its call/answer channels.
                    const clientId = generateId();
                    const callChannelId = generateId();
                    const answerChannelId = generateId();
                    registration.next({
                        type: "registerClient",
                        clientId,
                        lockId,
                        callChannelId,
                        answerChannelId,
                    });
                    // Wait for the acknowledgement addressed to this client before handing out the channel.
                    return registration.pipe(filter(message => message.type === "clientRegistered" && message.clientId === clientId), first(), map(() => {
                        // NOTE(review): the server's lock is presumably released when the advertising context goes away — confirm.
                        return createBroadcastChannel(answerChannelId, callChannelId).pipe(takeUntil(waitForLock(server.lockId)), shareReplay(1)
                        //TODO adding this fixed a bug where responses would get duplicated 50+ times
                        //makes some kind of sense, but look into this a bit more to make sure best practices are being followed, here and in similar areas
                        //looks like it still might be doing this to some extent, just not as bad, could be dropped due to refCount?
                        );
                    }));
                }));
            }));
        }));
    }));
}
/**
 * A hack function to acquire a web lock and hold onto it.
 * The lock callback returns a promise that stays pending, which keeps the lock held.
 * @param name Name of the lock.
 * @param options Options passed to navigator.locks.request; defaults to `{}`.
 * @returns A promise for a release function; calling it settles the pending
 *          promise and thereby releases the lock. Rejects if the request rejects.
 */
async function acquireWebLock(name, options) {
    return new Promise((resolve, reject) => {
        const holdUntilReleased = () => {
            const deferred = pDefer();
            resolve(deferred.resolve);
            return deferred.promise;
        };
        const request = navigator.locks.request(name, options ?? {}, holdUntilReleased);
        request.catch(error => reject(error));
    });
}
/**
 * Resolves once a shared lock with the given name has been granted (the lock
 * is released again immediately). An aborted request resolves as well; any
 * other failure is rethrown.
 */
async function waitForLock(name) {
    //TODO code is deprecated, whats the alternative
    const isAbort = error => error instanceof DOMException && error.code === error.ABORT_ERR;
    try {
        return await navigator.locks.request(name, { mode: "shared" }, async () => void 0);
    }
    catch (error) {
        if (!isAbort(error)) {
            throw error;
        }
        return;
    }
}
/**
 * Acquire a web lock as an observable. Releases when unsubscribed.
 *
 * Emits once (with no value) when the lock has been acquired and never
 * completes. Unsubscribing aborts a still-pending request and releases the
 * lock if it was already acquired.
 *
 * @param name Name of the lock.
 * @param options Options forwarded to acquireWebLock; `signal` is always replaced by an internal one.
 */
function observeWebLock(name, options) {
    return new Observable(subscriber => {
        const controller = new AbortController();
        const lock = acquireWebLock(name, { ...options, signal: controller.signal });
        lock.then(() => subscriber.next()).catch(error => {
            // An abort is caused by our own teardown and is not an error.
            if (error instanceof DOMException && error.code === error.ABORT_ERR) {
                return;
            }
            subscriber.error(error);
        });
        return () => {
            controller.abort();
            // If acquisition failed (including the abort above) there is nothing
            // to release. The failure is already handled by the chain above, so
            // it is swallowed here to avoid an unhandled rejection.
            lock.then(release => release(), () => void 0);
        };
    });
}
/**
 * A hack function to acquire a randomly named web lock as an observable. Releases when unsubscribed.
 * A new lock name (`tag` followed by a generated id) is created per subscription
 * and emitted once the lock has been acquired.
 */
function observeRandomWebLock(tag = "") {
    const acquire = () => {
        const lockId = tag + generateId();
        const acquired = observeWebLock(lockId);
        return acquired.pipe(map(() => lockId));
    };
    return defer(acquire);
}
/**
 * Operator factory for observables of observables.
 * With a truthy `retryOnInterrupt` the source is returned untouched; otherwise
 * every inner observable is extended so that it errors with a
 * "worker-disappeared" RemoteError right after it completes.
 */
function withVolatility(retryOnInterrupt) {
    const failWhenInnerEnds = inner => {
        return inner.pipe(concatWith(throwError(() => new RemoteError("worker-disappeared", "The worker disappeared. Please try again."))));
    };
    return (observable) => {
        //TODO timeout somehow
        return retryOnInterrupt ? observable : observable.pipe(map(inner => failWhenInnerEnds(inner)));
    };
}
/**
 * Sends remote calls over a channel whose connection may be replaced over time
 * (config.channel is an observable of connections).
 */
class VolatileChannelSender {
    config;
    closed = new BehaviorSubject(false);
    channel;
    /**
     * @param config Configuration; `config.channel` is the observable providing connections.
     */
    constructor(config) {
        this.config = config;
        // While open, mirror config.channel; once closed, fail every subscriber.
        // The latest emission is replayed to late subscribers, and the shared
        // connection is reset on error so later subscribers are evaluated afresh.
        this.channel = this.closed.pipe(switchMap(closed => {
            if (closed) {
                return throwError(() => new Error("This remote is closed."));
            }
            return config.channel;
        }), share({
            connector: () => new ReplaySubject(1),
            resetOnRefCountZero: false,
            resetOnComplete: false,
            resetOnError: true,
        }));
    }
    /**
     * Emits a new sender, bound to the emitted value, for each emission of the channel.
     * @param autoReconnect Passed to withVolatility; defaults to true.
     */
    watch(autoReconnect = true) {
        return this.channel.pipe(withVolatility(autoReconnect), map(channel => {
            return new VolatileChannelSender({ ...this.config, channel: of(channel) });
        }));
    }
    /**
     * Closes this sender: current and future calls fail with "This remote is closed.".
     */
    close() {
        // Deliberately not completing `closed`: a completed BehaviorSubject no
        // longer replays its value, so calls made after close() would complete
        // empty instead of failing with the closed error.
        this.closed.next(true);
    }
    /**
     * Creates a sender with overridden options that shares this sender's channel.
     */
    withOptions(options) {
        return new VolatileChannelSender({ ...this.config, ...options, channel: this.channel });
    }
    /**
     * Issues a remote call.
     * The returned object is both an observable and a lazy promise:
     * - subscribing sends a "subscribe" message and, on teardown, an "unsubscribe" message;
     * - awaiting sends an "execute" message and resolves with the first answer.
     * @param command Name of the remote member to call.
     * @param data Arguments for the call.
     */
    call(command, ...data) {
        const observable = this.channel.pipe(withVolatility(true), switchMap(_ => _), switchMap(connection => {
            const id = generateId();
            const send = {
                kind: "subscribe",
                id,
                command,
                data,
            };
            connection.next(send);
            return connection.pipe(finalize(() => {
                //TODO how do we make it so this only issues if the inner observable is unsubscribed from directly?
                //if the channel closes, this fails - its not necessary in that case
                connection.next({
                    kind: "unsubscribe",
                    id
                });
            }), dematerialize(), answer(id));
        }));
        const promise = PLazy.from(async () => {
            return await firstValueFrom(this.channel.pipe(withVolatility(false), switchMap(_ => _), switchMap(connection => {
                const id = generateId();
                const send = {
                    kind: "execute",
                    id,
                    command,
                    data
                };
                const observable = connection.pipe(dematerialize(), answer(id));
                connection.next(send);
                return observable;
            })));
        });
        return new ObservableAndPromise(observable, promise);
    }
}
/**
 * Operator that unwraps incoming notifications, keeps only the answers whose
 * `id` matches, and unwraps those answers into their values/errors/completion.
 * @param id Id of the request whose answers should pass.
 * @param command Currently unused (kept for a planned acknowledgement timeout).
 * @param ackTimeout Currently unused (kept for a planned acknowledgement timeout).
 */
function answer(id, command, ackTimeout) {
    const onlyMatching = candidate => candidate.id === id ? of(candidate) : EMPTY;
    return (observable) => observable.pipe(dematerialize(), mergeMap(candidate => onlyMatching(candidate)), dematerialize());
}
/**
 * Exposes a target on every channel the advertiser adds.
 * Each "add" action's observable is replaced by one that calls expose() on
 * subscription and stops it on teardown; other actions pass through unchanged.
 * @param config `advertiser` is the observable of registry actions; `target` is
 *               the target or a factory for it (resolved with callOrGet and the action key).
 * @returns The subscription that keeps the registry running.
 */
function exposeMigrating(config) {
    const exposeOnSubscribe = action => new Observable(() => {
        return expose({
            channel: action.observable,
            target: callOrGet(config.target, action.key)
        });
    });
    const toExposingAction = action => {
        if (action.action !== "add") {
            return action;
        }
        return { ...action, observable: exposeOnSubscribe(action) };
    };
    const actions = config.advertiser.pipe(map(action => toExposingAction(action)));
    return registry(actions).subscribe();
}
/**
 * Wraps the channel found by `config.finder` in a volatile remote.
 */
function wrapMigrating(config) {
    const options = { channel: config.finder };
    return wrapVolatile(options);
}
/**
 * Creates a VolatileChannelSender from the options and wraps it in a VolatileSenderRemote.
 */
function wrapVolatile(options) {
    const sender = new VolatileChannelSender(options);
    return new VolatileSenderRemote(sender);
}
/**
 * Serves remote calls against `config.target` for every connection emitted by `config.channel`.
 *
 * Requests of kind "subscribe" and "execute" start a call that is registered
 * under the request id; "unsubscribe" removes it again. Every notification of
 * a running call is sent back over the connection, tagged with the request id.
 *
 * @returns A function that stops serving.
 */
function expose(config) {
    // Lazily performs the call for a request and checks that its result fits the request kind.
    const invoke = req => defer(() => {
        const input = call(config.target, req.command, req.data);
        if (req.kind === "subscribe") {
            if (typeof input === "function") {
                throw new RemoteError("invalid-message", "Trying to treat a promise as an observable.");
            }
            return input;
        }
        if (typeof input === "object") {
            throw new RemoteError("invalid-message", "Trying to treat an observable as a promise.");
        }
        return defer(input);
    });
    // Translates a request into a registry action.
    const toAction = req => {
        if (req.kind === "unsubscribe") {
            return of({
                action: "delete",
                key: req.id
            });
        }
        // Errors are currently passed on unchanged (a wrapping "call-failed" RemoteError was considered).
        const answers = invoke(req).pipe(catchError(error => throwError(() => error)), materialize());
        return of({
            action: "add",
            key: req.id,
            observable: answers
        });
    };
    const serve = connection => connection.pipe(mergeMap(req => toAction(req)), registry, map(([answer, id]) => {
        return {
            kind: "N",
            value: {
                id,
                ...answer,
            }
        };
    }), materialize(), map(value => {
        connection.next(value);
    }));
    const subscription = config.channel.pipe(switchMap(connection => serve(connection))).subscribe(() => void 0);
    return () => {
        subscription.unsubscribe();
    };
}
/**
 * Resolves a command against a target.
 * A function member is invoked with `data` (bound to the target); any other
 * member is used as is.
 * @returns The result itself when it is an observable, otherwise an async
 *          function that resolves to the result.
 * @throws Error when the target has no member named `command`.
 */
function call(target, command, data) {
    const exists = command in target;
    if (!exists) {
        throw new Error("Command " + command.toString() + " does not exist.");
    }
    const property = target[command];
    const returned = typeof property === "function" ? property.call(target, ...data) : property;
    return isObservable(returned) ? returned : async () => await returned;
}
//TODO evaluate these, which do we need?
/**
 * An error carrying a machine-readable `code`, e.g. "worker-disappeared" when a
 * migrating worker dies or "invalid-message" for a mismatched request.
 */
class RemoteError extends Error {
    code;
    /**
     * @param code Machine-readable error code.
     * @param message Human-readable description.
     * @param options Standard Error options (e.g. `cause`).
     */
    constructor(code, message, options) {
        super(message, options);
        // Without this the error reports itself as a plain "Error".
        this.name = "RemoteError";
        this.code = code;
    }
}

export { ChannelSender, DEFAULT_CONTEXT, VolatileChannelSender, VolatileSenderRemote, acquireWebLock, broadcastAdvertiser, broadcastAdvertiserBatching, broadcastAdvertiserBatching2, broadcastFinder, broadcastFinderBatching, closing, exposeMigrating, observeRandomWebLock, observeWebLock, oldProxy, registry, waitForLock, withVolatility, wrapMigrating, wrapVolatile };
