package discord4j.core.event;

import discord4j.common.LogUtil;
import discord4j.core.event.EventDispatcher;
import discord4j.core.event.SinksEventDispatcher;
import discord4j.core.event.domain.Event;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.reactivestreams.Subscription;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.Queues;

/* loaded from: input_file:discord4j/core/event/DefaultEventDispatcher.class */
public class DefaultEventDispatcher implements EventDispatcher {
    private static final Logger log = Loggers.getLogger((Class<?>) DefaultEventDispatcher.class);
    private final FluxProcessor<Event, Event> eventProcessor;
    private final FluxSink<Event> sink;
    private final Scheduler eventScheduler;

    /* loaded from: input_file:discord4j/core/event/DefaultEventDispatcher$Builder.class */
    public static class Builder implements EventDispatcher.Builder {
        protected FluxProcessor<Event, Event> eventProcessor;
        protected FluxSink.OverflowStrategy overflowStrategy = FluxSink.OverflowStrategy.BUFFER;
        protected Scheduler eventScheduler;

        @Override // discord4j.core.event.EventDispatcher.Builder
        public SinksEventDispatcher.Builder eventSink(Function<Sinks.ManySpec, Sinks.Many<Event>> function) {
            SinksEventDispatcher.Builder eventSink = new SinksEventDispatcher.Builder().eventSink((Function) Objects.requireNonNull(function));
            if (this.eventScheduler != null) {
                eventSink.eventScheduler(this.eventScheduler);
            }
            return eventSink;
        }

        @Override // discord4j.core.event.EventDispatcher.Builder
        public Builder eventProcessor(FluxProcessor<Event, Event> fluxProcessor) {
            this.eventProcessor = (FluxProcessor) Objects.requireNonNull(fluxProcessor);
            return this;
        }

        @Override // discord4j.core.event.EventDispatcher.Builder
        public Builder overflowStrategy(FluxSink.OverflowStrategy overflowStrategy) {
            this.overflowStrategy = (FluxSink.OverflowStrategy) Objects.requireNonNull(overflowStrategy);
            return this;
        }

        @Override // discord4j.core.event.EventDispatcher.Builder
        public Builder eventScheduler(Scheduler scheduler) {
            this.eventScheduler = (Scheduler) Objects.requireNonNull(scheduler);
            return this;
        }

        @Override // discord4j.core.event.EventDispatcher.Builder
        public EventDispatcher build() {
            if (this.eventProcessor == null) {
                this.eventProcessor = EmitterProcessor.create(Queues.SMALL_BUFFER_SIZE, false);
            }
            if (this.eventScheduler == null) {
                this.eventScheduler = EventDispatcher.DEFAULT_EVENT_SCHEDULER.get();
            }
            return new DefaultEventDispatcher(this.eventProcessor, this.overflowStrategy, this.eventScheduler);
        }
    }

    public DefaultEventDispatcher(FluxProcessor<Event, Event> fluxProcessor, FluxSink.OverflowStrategy overflowStrategy, Scheduler scheduler) {
        this.eventProcessor = fluxProcessor;
        this.sink = fluxProcessor.sink(overflowStrategy);
        this.eventScheduler = scheduler;
    }

    @Override // discord4j.core.event.EventDispatcher
    public <E extends Event> Flux<E> on(Class<E> cls) {
        AtomicReference atomicReference = new AtomicReference();
        return this.eventProcessor.publishOn(this.eventScheduler).ofType(cls).handle((event, synchronousSink) -> {
            if (log.isTraceEnabled()) {
                log.trace(LogUtil.format(synchronousSink.currentContext().put(LogUtil.KEY_SHARD_ID, Integer.valueOf(event.getShardInfo().getIndex())), "{}"), event.toString());
            }
            synchronousSink.next(event);
        }).doOnSubscribe(subscription -> {
            atomicReference.set(subscription);
            if (log.isDebugEnabled()) {
                log.debug("Subscription {} to {} created", Integer.toHexString(subscription.hashCode()), cls.getSimpleName());
            }
        }).doFinally(signalType -> {
            if (log.isDebugEnabled()) {
                log.debug("Subscription {} to {} disposed due to {}", Integer.toHexString(((Subscription) atomicReference.get()).hashCode()), cls.getSimpleName(), signalType);
            }
        });
    }

    @Override // discord4j.core.event.EventDispatcher
    public void publish(Event event) {
        this.sink.next(event);
    }

    @Override // discord4j.core.event.EventDispatcher
    public void shutdown() {
        this.sink.complete();
    }
}
