/*
 * Decompiled with CFR 0.152.
 */
package discord4j.common.operator;

import discord4j.common.sinks.EmissionStrategy;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.Logger;
import reactor.util.Loggers;

public class RateLimitOperator<T>
implements Function<Publisher<T>, Publisher<T>> {
    private static final Logger log = Loggers.getLogger("discord4j.limiter");
    private static final Supplier<Scheduler> DEFAULT_PUBLISH_SCHEDULER = () -> Schedulers.newSingle("d4j-limiter", true);
    private final AtomicInteger tokens;
    private final Duration refillPeriod;
    private final Scheduler delayScheduler;
    private final Sinks.Many<Integer> tokenSink;
    private final Scheduler tokenPublishScheduler;
    private final EmissionStrategy emissionStrategy;

    public RateLimitOperator(int capacity, Duration refillPeriod, Scheduler delayScheduler) {
        this(capacity, refillPeriod, delayScheduler, DEFAULT_PUBLISH_SCHEDULER.get());
    }

    public RateLimitOperator(int capacity, Duration refillPeriod, Scheduler delayScheduler, Scheduler publishScheduler) {
        this.tokens = new AtomicInteger(capacity);
        this.refillPeriod = refillPeriod;
        this.delayScheduler = delayScheduler;
        this.tokenSink = Sinks.many().replay().latestOrDefault(capacity);
        this.tokenPublishScheduler = publishScheduler;
        this.emissionStrategy = EmissionStrategy.park(Duration.ofNanos(10L));
    }

    private String id() {
        return Integer.toHexString(this.hashCode());
    }

    @Override
    public Publisher<T> apply(Publisher<T> source) {
        return Flux.from(source).flatMap(value -> this.availableTokens().next().doOnSubscribe(s2 -> {
            if (log.isTraceEnabled()) {
                log.trace("[{}] Subscribed to limiter", this.id());
            }
        }).map(token -> {
            this.acquire();
            Mono.delay(this.refillPeriod, this.delayScheduler).subscribe(__ -> this.release());
            return value;
        }));
    }

    private void acquire() {
        int token = this.tokens.decrementAndGet();
        if (log.isTraceEnabled()) {
            log.trace("[{}] Acquired a token, {} tokens remaining", this.id(), token);
        }
        this.emissionStrategy.emitNext(this.tokenSink, token);
    }

    private void release() {
        int token = this.tokens.incrementAndGet();
        if (log.isTraceEnabled()) {
            log.trace("[{}] Released a token, {} tokens remaining", this.id(), token);
        }
        this.emissionStrategy.emitNext(this.tokenSink, token);
    }

    private Flux<Integer> availableTokens() {
        return this.tokenSink.asFlux().publishOn(this.tokenPublishScheduler).filter(__ -> this.tokens.get() > 0);
    }
}

