package info.bitrich.xchangestream.binance;

import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.knowm.xchange.binance.BinanceAuthenticated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:info/bitrich/xchangestream/binance/BinanceUserDataChannel.class */
class BinanceUserDataChannel implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BinanceUserDataChannel.class);
    private final BinanceAuthenticated binance;
    private final String apiKey;
    private final Runnable onApiCall;
    private final Disposable keepAlive;
    private String listenKey;
    private Consumer<String> onChangeListenKey;

    /* loaded from: input_file:info/bitrich/xchangestream/binance/BinanceUserDataChannel$NoActiveChannelException.class */
    static final class NoActiveChannelException extends Exception {
        private static final long serialVersionUID = -8161003286845820286L;

        NoActiveChannelException() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BinanceUserDataChannel(BinanceAuthenticated binanceAuthenticated, String str, Runnable runnable) {
        this.binance = binanceAuthenticated;
        this.apiKey = str;
        this.onApiCall = runnable;
        openChannel();
        this.keepAlive = Observable.interval(30L, TimeUnit.MINUTES).subscribe(l -> {
            keepAlive();
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onChangeListenKey(Consumer<String> consumer) {
        this.onChangeListenKey = consumer;
    }

    private void keepAlive() {
        if (this.listenKey == null) {
            return;
        }
        try {
            LOG.debug("Keeping user data channel alive");
            this.onApiCall.run();
            this.binance.keepAliveUserDataStream(this.apiKey, this.listenKey);
            LOG.debug("User data channel keepalive sent successfully");
        } catch (Exception e) {
            LOG.error("User data channel keepalive failed.", (Throwable) e);
            this.listenKey = null;
            reconnect();
        }
    }

    private void reconnect() {
        try {
            openChannel();
            if (this.onChangeListenKey != null) {
                this.onChangeListenKey.accept(this.listenKey);
            }
        } catch (Exception e) {
            LOG.error("Failed to reconnect. Will retry in 15 seconds.", (Throwable) e);
            Observable.timer(15L, TimeUnit.SECONDS).subscribe(l -> {
                reconnect();
            });
        }
    }

    private void openChannel() {
        try {
            LOG.debug("Opening new user data channel");
            this.onApiCall.run();
            this.listenKey = this.binance.startUserDataStream(this.apiKey).getListenKey();
            LOG.debug("Opened new user data channel");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String getListenKey() throws NoActiveChannelException {
        if (this.listenKey == null) {
            throw new NoActiveChannelException();
        }
        return this.listenKey;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.keepAlive.dispose();
    }
}
