package discord4j.voice;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.iwebpp.crypto.TweetNaclFast;
import discord4j.common.JacksonResources;
import discord4j.common.LogUtil;
import discord4j.common.ResettableInterval;
import discord4j.common.close.CloseException;
import discord4j.common.close.CloseStatus;
import discord4j.common.close.DisconnectBehavior;
import discord4j.common.retry.ReconnectContext;
import discord4j.common.retry.ReconnectOptions;
import discord4j.common.sinks.EmissionStrategy;
import discord4j.common.util.Snowflake;
import discord4j.voice.VoiceConnection;
import discord4j.voice.json.Heartbeat;
import discord4j.voice.json.Hello;
import discord4j.voice.json.Identify;
import discord4j.voice.json.Ready;
import discord4j.voice.json.Resume;
import discord4j.voice.json.Resumed;
import discord4j.voice.json.SelectProtocol;
import discord4j.voice.json.SentSpeaking;
import discord4j.voice.json.SessionDescription;
import discord4j.voice.json.VoiceGatewayPayload;
import discord4j.voice.retry.VoiceGatewayException;
import discord4j.voice.retry.VoiceGatewayReconnectException;
import discord4j.voice.retry.VoiceGatewayRetrySpec;
import discord4j.voice.retry.VoiceServerUpdateReconnectException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpHeaderNames;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.function.TupleUtils;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.WebsocketClientSpec;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;
import reactor.util.context.ContextView;
import reactor.util.retry.Retry;
import reactor.util.retry.RetrySpec;

/* loaded from: input_file:discord4j/voice/DefaultVoiceGatewayClient.class */
public class DefaultVoiceGatewayClient {
    /** General-purpose logger for connection lifecycle messages of this client. */
    private static final Logger log = Loggers.getLogger((Class<?>) DefaultVoiceGatewayClient.class);
    /** Logger handed to {@code logPayload} for payloads written to the websocket (trace level only). */
    private static final Logger senderLog = Loggers.getLogger("discord4j.voice.protocol.sender");
    /** Logger handed to {@code logPayload} for payloads read from the websocket (trace level only). */
    private static final Logger receiverLog = Loggers.getLogger("discord4j.voice.protocol.receiver");
    /** Guild this client is bound to; sent in Identify/Resume and used as a logging context value. */
    private final Snowflake guildId;
    /** Id sent as part of the Identify payload. */
    private final Snowflake selfId;
    /** Serializes an outgoing payload to a buffer using the Jackson mapper taken from the options. */
    private final Function<VoiceGatewayPayload<?>, Mono<ByteBuf>> payloadWriter;
    /** Deserializes an incoming buffer to a payload using the same Jackson mapper. */
    private final Function<ByteBuf, Mono<? super VoiceGatewayPayload<?>>> payloadReader;
    /** Source of the HTTP client, UDP client and the schedulers used by this client. */
    private final VoiceReactorResources reactorResources;
    /** Reconnect settings; feed both {@link #reconnectContext} and the retry spec built in {@code retryFactory}. */
    private final ReconnectOptions reconnectOptions;
    /** Retry bookkeeping; reset whenever a SessionDescription or Resumed payload is received. */
    private final ReconnectContext reconnectContext;
    /** Passed to the send task created once a session description arrives. */
    private final AudioProvider audioProvider;
    /** Passed to the receive task created once a session description arrives. */
    private final AudioReceiver audioReceiver;
    private final VoiceSendTaskFactory sendTaskFactory;
    private final VoiceReceiveTaskFactory receiveTaskFactory;
    /** Invoked from {@code handleClose} when the connection is stopped. */
    private final VoiceDisconnectTask disconnectTask;
    /** Supplies voice server updates that {@code start} listens to for endpoint changes. */
    private final VoiceServerUpdateTask serverUpdateTask;
    /** Backs {@code VoiceConnection#getChannelId()} on the connection returned by {@code acquireConnection}. */
    private final VoiceChannelRetrieveTask channelRetrieveTask;
    /** Timeout applied to the combined UDP setup + IP discovery step. */
    private final Duration ipDiscoveryTimeout;
    /** Retry policy applied to the combined UDP setup + IP discovery step. */
    private final RetrySpec ipDiscoveryRetrySpec;
    /** HTTP client from the reactor resources, with a fixed User-Agent header added. */
    private final HttpClient httpClient;
    /** UDP socket wrapper built on the reactor resources' UDP client. */
    private final VoiceSocket voiceSocket;
    /** Heartbeat ticker; started when Hello is received and stopped on close/termination. */
    private final ResettableInterval heartbeat;
    /** Holds the per-attempt composite registered on Ready; disposed when the close action is STOP. */
    private final Disposable.Swap cleanup;
    /** Strategy used for every {@code emitNext} on the sinks below (created via {@code timeoutDrop} with 5 seconds). */
    private final EmissionStrategy emissionStrategy;
    /** Raw inbound buffers; given to the websocket handler and consumed by the payload pipeline in {@code connect}. */
    private final Sinks.Many<ByteBuf> receiver;
    /** Payloads queued for sending; serialized with {@link #payloadWriter} before reaching the websocket handler. */
    private final Sinks.Many<VoiceGatewayPayload<?>> outbound;
    /** Every decoded inbound payload is re-emitted here and exposed through {@code VoiceConnection#events()}. */
    private final Sinks.Many<VoiceGatewayEvent> events;
    /** Replays the latest connection state (CONNECTING until one is emitted); exposed through {@code stateEvents()}. */
    private final Sinks.Many<VoiceConnection.State> state;
    /** Current server options; initialized once in {@code connect} and replaced in {@code start} on endpoint change. */
    private final AtomicReference<VoiceServerOptions> serverOptions = new AtomicReference<>();
    /** Session string; initialized once in {@code connect} and sent in Identify/Resume. */
    private final AtomicReference<String> session = new AtomicReference<>();
    /** SSRC taken from the Ready payload; used for the packet transformer and speaking payloads. */
    private volatile int ssrc;
    /** Created in {@code connect}; receives the close status or error from {@code handleClose} when stopping. */
    private volatile Sinks.One<CloseStatus> disconnectNotifier;
    /** Context captured by the most recent {@code connect} attempt; used for log formatting. */
    private volatile ContextView currentContext;
    /** Websocket handler of the most recent {@code connect} attempt; {@code null} until the first attempt. */
    private volatile VoiceWebsocketHandler sessionHandler;

    /**
     * Creates a voice gateway client configured from the given options.
     *
     * <p>All option values other than the guild and self ids are required to be non-null. The constructor
     * only wires collaborators and sinks together; no connection is attempted here.
     *
     * @param voiceGatewayOptions the options holding ids, Jackson/Reactor resources, reconnect settings,
     *                            audio provider/receiver, task factories and IP discovery settings
     * @throws NullPointerException if any of the required option values is {@code null}
     */
    public DefaultVoiceGatewayClient(VoiceGatewayOptions voiceGatewayOptions) {
        this.guildId = voiceGatewayOptions.getGuildId();
        this.selfId = voiceGatewayOptions.getSelfId();

        // JSON codec shared by the writer and reader functions.
        JacksonResources jackson = Objects.requireNonNull(voiceGatewayOptions.getJacksonResources());
        ObjectMapper mapper = jackson.getObjectMapper();
        this.payloadWriter = payload ->
                Mono.fromCallable(() -> Unpooled.wrappedBuffer(mapper.writeValueAsBytes(payload)));
        this.payloadReader = buffer -> Mono.fromCallable(() -> {
            VoiceGatewayPayload<?> decoded = mapper.readValue(new ByteBufInputStream(buffer),
                    new TypeReference<VoiceGatewayPayload<?>>() {
                    });
            return decoded;
        });

        // Required collaborators.
        this.reactorResources = Objects.requireNonNull(voiceGatewayOptions.getReactorResources());
        this.reconnectOptions = Objects.requireNonNull(voiceGatewayOptions.getReconnectOptions());
        this.reconnectContext = new ReconnectContext(
                this.reconnectOptions.getFirstBackoff(),
                this.reconnectOptions.getMaxBackoffInterval());
        this.audioProvider = Objects.requireNonNull(voiceGatewayOptions.getAudioProvider());
        this.audioReceiver = Objects.requireNonNull(voiceGatewayOptions.getAudioReceiver());
        this.sendTaskFactory = Objects.requireNonNull(voiceGatewayOptions.getSendTaskFactory());
        this.receiveTaskFactory = Objects.requireNonNull(voiceGatewayOptions.getReceiveTaskFactory());
        this.disconnectTask = Objects.requireNonNull(voiceGatewayOptions.getDisconnectTask());
        this.serverUpdateTask = Objects.requireNonNull(voiceGatewayOptions.getServerUpdateTask());
        this.channelRetrieveTask = Objects.requireNonNull(voiceGatewayOptions.getChannelRetrieveTask());
        this.ipDiscoveryTimeout = Objects.requireNonNull(voiceGatewayOptions.getIpDiscoveryTimeout());
        this.ipDiscoveryRetrySpec = Objects.requireNonNull(voiceGatewayOptions.getIpDiscoveryRetrySpec());

        // Network and timing resources derived from the reactor resources.
        this.httpClient = this.reactorResources.getHttpClient()
                .headers(headers -> headers.add(HttpHeaderNames.USER_AGENT, "DiscordBot(https://discord4j.com, 3)"));
        this.voiceSocket = new VoiceSocket(this.reactorResources.getUdpClient());
        this.heartbeat = new ResettableInterval(this.reactorResources.getTimerTaskScheduler());
        this.cleanup = Disposables.swap();

        // Sinks used to move payloads, events and state between the pipelines.
        this.emissionStrategy = EmissionStrategy.timeoutDrop(Duration.ofSeconds(5));
        this.receiver = newEmitterSink();
        this.outbound = newEmitterSink();
        this.events = newEmitterSink();
        this.state = Sinks.many().replay().latestOrDefault(VoiceConnection.State.CONNECTING);
    }

    /**
     * Creates the multicast sink type used for the receiver, outbound and events channels.
     *
     * @param <T> the element type carried by the sink
     * @return a new multicast sink buffering up to {@code Queues.SMALL_BUFFER_SIZE} elements, created with
     * Reactor's {@code autoCancel} flag turned off
     */
    private static <T> Sinks.Many<T> newEmitterSink() {
        final boolean autoCancel = false;
        Sinks.MulticastSpec multicast = Sinks.many().multicast();
        return multicast.onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, autoCancel);
    }

    /**
     * Starts this client and returns a {@link Mono} that is signalled through the sink handed to
     * {@code connect} (a {@link VoiceConnection} on success, an error if the UDP setup fails).
     *
     * <p>On the first request two subscriptions are created and tied to cancellation of the returned Mono:
     * <ul>
     *   <li>a listener on voice server updates for this guild which, when the endpoint differs from the one
     *   currently stored, stores the new options and closes the active session handler with a
     *   "retry abruptly" behavior so that the connection is re-established;</li>
     *   <li>the gateway connection itself, whose terminal signals are only logged at debug level.</li>
     * </ul>
     *
     * @param voiceServerOptions the initial voice server options (endpoint and token)
     * @param str                the voice session identifier
     * @return a Mono emitting the established voice connection
     */
    public Mono<VoiceConnection> start(VoiceServerOptions voiceServerOptions, String str) {
        return Mono.create(monoSink -> {
            monoSink.onRequest(requested -> {
                Disposable.Composite composite = Disposables.composite();
                composite.add(this.serverUpdateTask.onVoiceServerUpdates(this.guildId).subscribe(updatedOptions -> {
                    // Compare the stored options against the update; distinct names are required here, as a
                    // lambda parameter may not share its name with a local declared in the lambda body.
                    VoiceServerOptions currentOptions = this.serverOptions.get();
                    if (currentOptions.getEndpoint().equals(updatedOptions.getEndpoint())) {
                        return;
                    }
                    log.debug(LogUtil.format(monoSink.currentContext(), "Voice server endpoint change: {} -> {}"),
                            currentOptions.getEndpoint(), updatedOptions.getEndpoint());
                    this.serverOptions.set(updatedOptions);
                    // Read the volatile handler once so the null check and the call see the same instance.
                    VoiceWebsocketHandler activeHandler = this.sessionHandler;
                    if (activeHandler != null) {
                        activeHandler.close(DisconnectBehavior.retryAbruptly(
                                new VoiceServerUpdateReconnectException(monoSink.currentContext())));
                    }
                }));
                composite.add(connect(voiceServerOptions, str, monoSink).contextWrite(monoSink.currentContext()).subscribe(null, th -> {
                    log.debug(LogUtil.format(monoSink.currentContext(), "Voice gateway error: {}"), th.toString());
                }, () -> {
                    log.debug(LogUtil.format(monoSink.currentContext(), "Voice gateway completed"));
                }));
                // Cancelling the returned Mono disposes both subscriptions.
                monoSink.onCancel(composite);
            });
        });
    }

    /**
     * Builds the gateway connection pipeline: opens the websocket, performs the Identify/Resume handshake,
     * reacts to inbound payloads, sends heartbeats, and retries according to {@link #retryFactory()}.
     *
     * <p>The deferred body runs once per (re)connection attempt. After the retried pipeline completes, the
     * returned Mono waits for the disconnect notifier, which is signalled from {@code handleClose}.
     *
     * @param voiceServerOptions initial server options; only stored if none are stored yet
     * @param str                session identifier; only stored if none is stored yet
     * @param monoSink           sink that receives the {@link VoiceConnection} once a session description
     *                           arrives, or an error if the UDP socket setup fails
     * @return a Mono completing when the client has disconnected
     * @throws IllegalStateException on subscription, if a disconnect notifier already exists (i.e. this
     *                               client was already connected once)
     */
    private Mono<Void> connect(VoiceServerOptions voiceServerOptions, String str, MonoSink<VoiceConnection> monoSink) {
        return Mono.deferContextual(contextView -> {
            // Keep whatever is already stored (e.g. updated server options) across retry attempts.
            this.serverOptions.compareAndSet(null, voiceServerOptions);
            this.session.compareAndSet(null, str);
            // Per-attempt state: fresh notifier, current context and websocket handler.
            this.disconnectNotifier = Sinks.one();
            this.currentContext = contextView;
            this.sessionHandler = new VoiceWebsocketHandler(this.receiver, this.outbound.asFlux().flatMap(this.payloadWriter).doOnNext(byteBuf -> {
                logPayload(senderLog, contextView, byteBuf);
            }), contextView);
            // Handshake: look at the latest state and either resume the session or identify from scratch.
            Mono<VoiceConnection.State> doOnNext = this.state.asFlux().next().doOnNext(state -> {
                if (state == VoiceConnection.State.RESUMING) {
                    log.info(LogUtil.format(contextView, "Attempting to resume"));
                    this.emissionStrategy.emitNext(this.outbound, new Resume(this.guildId.asString(), this.session.get(), this.serverOptions.get().getToken()));
                } else {
                    nextState(VoiceConnection.State.CONNECTING);
                    log.info(LogUtil.format(contextView, "Identifying"));
                    this.emissionStrategy.emitNext(this.outbound, new Identify(this.guildId.asString(), this.selfId.asString(), this.session.get(), this.serverOptions.get().getToken()));
                }
            });
            // Collects the UDP connection and voice tasks created during this attempt.
            Disposable.Composite composite = Disposables.composite();
            // Three pipelines are zipped together:
            //  1. the websocket session, whose close result is routed through handleClose;
            //  2. the inbound payload processing below;
            //  3. the heartbeat emitter.
            // The "?v=4" suffix is presumably the voice gateway protocol version - TODO confirm.
            Mono doOnError = Mono.zip(((HttpClient.WebsocketSender) this.httpClient.websocket(WebsocketClientSpec.builder().maxFramePayloadLength(Integer.MAX_VALUE).build()).uri(this.serverOptions.get().getEndpoint() + "?v=4")).handle((websocketInbound, websocketOutbound) -> {
                return doOnNext.then(this.sessionHandler.handle(websocketInbound, websocketOutbound));
            }).contextWrite(LogUtil.clearContext()).flatMap(tuple2 -> {
                return handleClose((DisconnectBehavior) tuple2.getT1(), (CloseStatus) tuple2.getT2());
            }).then(), this.receiver.asFlux().doOnNext(byteBuf2 -> {
                logPayload(receiverLog, contextView, byteBuf2);
            }).flatMap(this.payloadReader).doOnNext(obj -> {
                if (obj instanceof Hello) {
                    // Hello carries the heartbeat interval; start ticking with that delay and period.
                    Duration ofMillis = Duration.ofMillis(((Hello) obj).getData().getHeartbeatInterval());
                    this.heartbeat.start(ofMillis, ofMillis);
                } else if (obj instanceof Ready) {
                    // Ready carries the SSRC and UDP address: set up the socket, run IP discovery and then
                    // announce the discovered address through SelectProtocol.
                    log.info(LogUtil.format(contextView, "Waiting for session description"));
                    Ready ready = (Ready) obj;
                    this.ssrc = ready.getData().getSsrc();
                    this.cleanup.update(composite);
                    composite.add(Mono.defer(() -> {
                        return this.voiceSocket.setup(ready.getData().getIp(), ready.getData().getPort());
                    }).zipWith(this.voiceSocket.performIpDiscovery(ready.getData().getSsrc())).timeout(this.ipDiscoveryTimeout).retryWhen(this.ipDiscoveryRetrySpec).contextWrite(contextView).onErrorMap(th -> {
                        return new VoiceGatewayException(contextView, "UDP socket setup error", th);
                    }).subscribe(TupleUtils.consumer((connection, inetSocketAddress) -> {
                        composite.add(connection);
                        this.emissionStrategy.emitNext(this.outbound, new SelectProtocol("udp", inetSocketAddress.getHostName(), inetSocketAddress.getPort(), "xsalsa20_poly1305"));
                    }), th2 -> {
                        // UDP setup failed for good: fail the caller and stop the session.
                        monoSink.error(th2);
                        this.sessionHandler.close(DisconnectBehavior.stop(th2));
                    }, () -> {
                        log.debug(LogUtil.format(contextView, "Voice socket setup complete"));
                    }));
                } else if (obj instanceof SessionDescription) {
                    // SessionDescription carries the secret key: the connection is now usable, so start the
                    // send/receive tasks and hand the connection to the caller.
                    log.info(LogUtil.format(contextView, "Receiving events"));
                    nextState(VoiceConnection.State.CONNECTED);
                    this.reconnectContext.reset();
                    PacketTransformer packetTransformer = new PacketTransformer(this.ssrc, new TweetNaclFast.SecretBox(((SessionDescription) obj).getData().getSecretKey()));
                    // Callback given to the send task to publish speaking on/off payloads.
                    Consumer<Boolean> consumer = bool -> {
                        this.emissionStrategy.emitNext(this.outbound, new SentSpeaking(bool.booleanValue(), 0, this.ssrc));
                    };
                    composite.add(() -> {
                        log.debug(LogUtil.format(contextView, "Disposing voice tasks"));
                    });
                    VoiceSendTaskFactory voiceSendTaskFactory = this.sendTaskFactory;
                    Scheduler sendTaskScheduler = this.reactorResources.getSendTaskScheduler();
                    VoiceSocket voiceSocket = this.voiceSocket;
                    Objects.requireNonNull(voiceSocket);
                    composite.add(voiceSendTaskFactory.create(sendTaskScheduler, consumer, voiceSocket::send, this.audioProvider, packetTransformer));
                    composite.add(this.receiveTaskFactory.create(this.reactorResources.getReceiveTaskScheduler(), this.voiceSocket.getInbound(), packetTransformer, this.audioReceiver));
                    monoSink.success(acquireConnection());
                } else if (obj instanceof Resumed) {
                    log.info(LogUtil.format(contextView, "Resumed"));
                    nextState(VoiceConnection.State.CONNECTED);
                    this.reconnectContext.reset();
                }
                // Every decoded payload, handled above or not, is also published as a gateway event.
                this.emissionStrategy.emitNext(this.events, (VoiceGatewayEvent) obj);
            }).then(), this.heartbeat.ticks().map((v1) -> {
                return new Heartbeat(v1);
            }).doOnNext(heartbeat -> {
                this.emissionStrategy.emitNext(this.outbound, heartbeat);
            }).then()).doOnError(th -> {
                log.error(LogUtil.format(contextView, "{}"), th.toString());
            });
            ResettableInterval resettableInterval = this.heartbeat;
            Objects.requireNonNull(resettableInterval);
            // Stop heartbeats when the attempt terminates; close the session if the attempt is cancelled.
            return doOnError.doOnTerminate(resettableInterval::stop).doOnCancel(() -> {
                this.sessionHandler.close();
            }).then();
        }).contextWrite(context -> {
            return context.put(LogUtil.KEY_GUILD_ID, this.guildId.asString());
        }).retryWhen(retryFactory()).then(Mono.defer(() -> {
            // After the retried pipeline completes, wait for the notifier signalled by handleClose.
            return this.disconnectNotifier.asMono().then();
        })).doOnSubscribe(subscription -> {
            // The notifier is only assigned inside the deferred body, so a non-null value here means this
            // client has already been connected before.
            if (this.disconnectNotifier != null) {
                throw new IllegalStateException("connect can only be subscribed once");
            }
        });
    }

    /**
     * Logs the given state at debug level and publishes it to the state sink.
     *
     * @param state the connection state to announce
     */
    private void nextState(VoiceConnection.State state) {
        String message = LogUtil.format(this.currentContext, "New state: {}");
        log.debug(message, state);
        this.emissionStrategy.emitNext(this.state, state);
    }

    /**
     * Creates the {@link VoiceConnection} view handed to callers once the session is established.
     *
     * <p>The returned object is a thin facade over this client: it exposes the event and state sinks and
     * delegates disconnect/reconnect requests to the enclosing client. Operations that require an active
     * connection first wait on {@code onConnectOrDisconnect()} and only act when the resulting state is
     * {@code CONNECTED}.
     *
     * @return a new connection facade bound to this client
     */
    private VoiceConnection acquireConnection() {
        return new VoiceConnection() { // from class: discord4j.voice.DefaultVoiceGatewayClient.2
            @Override // discord4j.voice.VoiceConnection
            public Flux<VoiceGatewayEvent> events() {
                return DefaultVoiceGatewayClient.this.events.asFlux();
            }

            @Override // discord4j.voice.VoiceConnection
            public Flux<VoiceConnection.State> stateEvents() {
                return DefaultVoiceGatewayClient.this.state.asFlux();
            }

            /** Stops the client if it is connected; completes empty otherwise. */
            @Override // discord4j.voice.VoiceConnection
            public Mono<Void> disconnect() {
                return onConnectOrDisconnect()
                        .flatMap(current -> current == VoiceConnection.State.CONNECTED
                                ? DefaultVoiceGatewayClient.this.stop()
                                : Mono.<Void>empty())
                        .then();
            }

            @Override // discord4j.voice.VoiceConnection
            public Snowflake getGuildId() {
                return DefaultVoiceGatewayClient.this.guildId;
            }

            /** Asks the channel retrieve task for the channel id if connected; completes empty otherwise. */
            @Override // discord4j.voice.VoiceConnection
            public Mono<Snowflake> getChannelId() {
                return onConnectOrDisconnect()
                        .flatMap(current -> current == VoiceConnection.State.CONNECTED
                                ? DefaultVoiceGatewayClient.this.channelRetrieveTask.onRequest()
                                : Mono.<Snowflake>empty());
            }

            @Override // discord4j.voice.VoiceConnection
            public Mono<Void> reconnect() {
                return reconnect(VoiceGatewayReconnectException::new);
            }

            /**
             * Closes the current session with a "retry abruptly" behavior whose cause is produced by
             * {@code function}, then waits until the state stream reports {@code CONNECTED}.
             * Fails with {@link IllegalStateException} when the connection is not connected.
             */
            @Override // discord4j.voice.VoiceConnection
            public Mono<Void> reconnect(Function<ContextView, Throwable> function) {
                return onConnectOrDisconnect().flatMap(current -> {
                    if (current != VoiceConnection.State.CONNECTED) {
                        return Mono.error(new IllegalStateException("Voice connection has already disconnected"));
                    }
                    // The inner lambda parameter must not reuse the outer parameter's name: Java forbids a
                    // nested lambda parameter from redeclaring one of the enclosing lambda.
                    return Mono.fromRunnable(() -> {
                        DefaultVoiceGatewayClient.this.sessionHandler.close(DisconnectBehavior.retryAbruptly(
                                function.apply(DefaultVoiceGatewayClient.this.currentContext)));
                    }).then(stateEvents()
                            .filter(observed -> observed == VoiceConnection.State.CONNECTED)
                            .next());
                }).then();
            }
        };
    }

    /**
     * Requests a graceful shutdown of the gateway session.
     *
     * <p>Evaluated lazily on subscription: the session handler is closed with a "stop" behavior carrying no
     * cause, and the returned Mono then follows the disconnect notifier.
     *
     * @return a Mono that terminates when the disconnect notifier does, or fails with
     * {@link IllegalStateException} if the client has no session handler or notifier yet
     */
    public Mono<Void> stop() {
        return Mono.defer(() -> {
            boolean active = this.sessionHandler != null && this.disconnectNotifier != null;
            if (!active) {
                return Mono.error(new IllegalStateException("Gateway client is not active!"));
            }
            DisconnectBehavior gracefulStop = DisconnectBehavior.stop(null);
            this.sessionHandler.close(gracefulStop);
            return this.disconnectNotifier.asMono().then();
        });
    }

    /**
     * Writes a payload to the given logger at trace level, with any {@code "token"} value masked.
     *
     * <p>Nothing is decoded or formatted unless trace logging is enabled for {@code logger}.
     *
     * @param logger      the logger to write to
     * @param contextView the context used to format the log line
     * @param byteBuf     the raw payload, decoded as UTF-8 text
     */
    private void logPayload(Logger logger, ContextView contextView, ByteBuf byteBuf) {
        if (!logger.isTraceEnabled()) {
            return;
        }
        String text = byteBuf.toString(StandardCharsets.UTF_8);
        // Replace the token value so credentials never reach the logs.
        String redacted = text.replaceAll("(\"token\": ?\")([A-Za-z0-9._-]*)(\")", "$1hunter2$3");
        logger.trace(LogUtil.format(contextView, redacted));
    }

    /**
     * Builds the retry policy applied to the connection pipeline.
     *
     * <p>Before each retry the next connection state announced by the retry signal is published, and the
     * upcoming state, backoff and attempt count are logged at debug level.
     *
     * @return the retry policy derived from the reconnect options and reconnect context
     */
    private Retry retryFactory() {
        return VoiceGatewayRetrySpec.create(this.reconnectOptions, this.reconnectContext)
                .doBeforeRetry(signal -> {
                    nextState(signal.nextState());
                    long attempt = signal.iteration();
                    String template = LogUtil.format(getContextFromException(signal.failure()),
                            "{} in {} (attempts: {})");
                    log.debug(template, signal.nextState(), signal.nextBackoff(), attempt);
                });
    }

    /**
     * Extracts the context carried by a gateway failure.
     *
     * @param th the failure to inspect
     * @return the context of a {@link CloseException} or {@link VoiceGatewayException}, or an empty context
     * for any other throwable
     */
    private ContextView getContextFromException(Throwable th) {
        if (th instanceof CloseException) {
            return ((CloseException) th).getContext();
        }
        if (th instanceof VoiceGatewayException) {
            return ((VoiceGatewayException) th).getContext();
        }
        return Context.empty();
    }

    /**
     * Decides what happens after the websocket session has closed.
     *
     * <p>A close code listed in {@code VoiceGatewayRetrySpec.NON_RETRYABLE_STATUS_CODES} always turns the
     * requested behavior into a stop (keeping its cause). The heartbeat is stopped in every case.
     * <ul>
     *   <li>STOP / STOP_ABRUPTLY: the state becomes DISCONNECTED, the disconnect notifier is signalled and
     *   the disconnect task is run. With a cause, the notifier receives a {@link CloseException} and the
     *   result is that error unless the close code is 4014; without a cause the close status is emitted.
     *   Voice resources held in {@code cleanup} are disposed for STOP only.</li>
     *   <li>any other action: a {@link CloseException} is raised, which the retry policy of the
     *   connection pipeline gets to handle.</li>
     * </ul>
     *
     * @param disconnectBehavior the behavior requested when the session was closed
     * @param closeStatus        the close status reported for the websocket
     * @return a Mono emitting the close status on a stop, or failing with a {@link CloseException}
     */
    private Mono<CloseStatus> handleClose(DisconnectBehavior disconnectBehavior, CloseStatus closeStatus) {
        return Mono.deferContextual(ctx -> {
            DisconnectBehavior effective;
            if (VoiceGatewayRetrySpec.NON_RETRYABLE_STATUS_CODES.contains(Integer.valueOf(closeStatus.getCode()))) {
                // Non-retryable codes can only end in a stop, whatever was requested.
                effective = DisconnectBehavior.stop(disconnectBehavior.getCause());
            } else {
                effective = disconnectBehavior;
            }
            log.debug(LogUtil.format(ctx, "Closing and {} with status {}"), effective, closeStatus);
            this.heartbeat.stop();

            boolean gracefulStop = effective.getAction() == DisconnectBehavior.Action.STOP;
            if (gracefulStop) {
                this.cleanup.dispose();
            }

            switch (effective.getAction()) {
                case STOP_ABRUPTLY:
                case STOP:
                    if (effective.getCause() == null) {
                        return Mono.just(closeStatus).flatMap(ignored -> {
                            nextState(VoiceConnection.State.DISCONNECTED);
                            this.disconnectNotifier.emitValue(closeStatus, Sinks.EmitFailureHandler.FAIL_FAST);
                            return this.disconnectTask.onDisconnect(this.guildId).thenReturn(closeStatus);
                        });
                    }
                    CloseException stopFailure = new CloseException(closeStatus, ctx, effective.getCause());
                    return Mono.just(stopFailure).flatMap(failure -> {
                        nextState(VoiceConnection.State.DISCONNECTED);
                        this.disconnectNotifier.emitError(failure, Sinks.EmitFailureHandler.FAIL_FAST);
                        Mono<?> disconnected = this.disconnectTask.onDisconnect(this.guildId);
                        Mono<CloseStatus> outcome = closeStatus.getCode() == 4014
                                ? Mono.just(closeStatus)
                                : Mono.error(failure);
                        return disconnected.then(outcome);
                    });
                case RETRY_ABRUPTLY:
                case RETRY:
                default:
                    return Mono.error(new CloseException(closeStatus, ctx, effective.getCause()));
            }
        });
    }
}
