package info.bitrich.xchangestream.binance;

import info.bitrich.xchangestream.binance.BinanceUserDataChannel;
import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.core.StreamingExchange;
import info.bitrich.xchangestream.service.netty.ConnectionStateModel;
import info.bitrich.xchangestream.util.Events;
import io.reactivex.Completable;
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.knowm.xchange.binance.BinanceAuthenticated;
import org.knowm.xchange.binance.BinanceExchange;
import org.knowm.xchange.binance.service.BinanceMarketDataService;
import org.knowm.xchange.client.ExchangeRestProxyBuilder;
import org.knowm.xchange.derivative.FuturesContract;
import org.knowm.xchange.instrument.Instrument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:info/bitrich/xchangestream/binance/BinanceStreamingExchange.class */
public class BinanceStreamingExchange extends BinanceExchange implements StreamingExchange {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BinanceStreamingExchange.class);
    // Web socket base URI returned by getStreamingBaseUri() unless the sandbox flag is set.
    private static final String WS_API_BASE_URI = "wss://stream.binance.com:9443/";
    // Web socket base URI returned by getStreamingBaseUri() when "Use_Sandbox" is Boolean.TRUE.
    private static final String WS_SANDBOX_API_BASE_URI = "wss://testnet.binance.vision/";
    // Exchange-specific parameter key: when Boolean.TRUE, "@100ms" is appended to depth stream names.
    public static final String USE_HIGHER_UPDATE_FREQUENCY = "Binance_Orderbook_Use_Higher_Frequency";
    // Exchange-specific parameter key: when Boolean.TRUE, ticker subscriptions use the BOOK_TICKER
    // stream type instead of TICKER.
    public static final String USE_REALTIME_BOOK_TICKER = "Binance_Ticker_Use_Realtime";
    // Exchange-specific parameter key: an Integer value overrides the order book fetch limit.
    public static final String FETCH_ORDER_BOOK_LIMIT = "Binance_Fetch_Order_Book_Limit";
    // Market data web socket; created by internalConnect() and reset to null by disconnect().
    private BinanceStreamingService streamingService;
    // User data web socket; only created when the subscription has authenticated streams.
    private BinanceUserDataStreamingService userDataStreamingService;
    private BinanceStreamingMarketDataService streamingMarketDataService;
    private BinanceStreamingAccountService streamingAccountService;
    private BinanceStreamingTradeService streamingTradeService;
    // Supplies the listen key for the user data stream and notifies when it changes.
    private BinanceUserDataChannel userDataChannel;
    // Callback obtained from Events.onApiCall(...) in initServices() and handed to the REST-backed helpers.
    private Runnable onApiCall;
    // Suffix appended to depth stream names; stays empty unless USE_HIGHER_UPDATE_FREQUENCY is enabled.
    private String orderBookUpdateFrequencyParameter = "";
    // Order book fetch limit passed to the market data service ("oder" is a typo for "order" kept
    // because the name is referenced elsewhere in the class); defaults to 1000.
    private int oderBookFetchLimitParameter = 1000;
    // Mirrors USE_REALTIME_BOOK_TICKER; read when building ticker stream names.
    private boolean realtimeOrderBookTicker;

    /**
     * Initialises the parent exchange services and then reads the streaming-related
     * exchange-specific parameters from the exchange specification.
     *
     * <p>Reads {@link #USE_HIGHER_UPDATE_FREQUENCY}, {@link #USE_REALTIME_BOOK_TICKER} and
     * {@link #FETCH_ORDER_BOOK_LIMIT}. The boolean flags only count as enabled when the stored
     * value equals {@code Boolean.TRUE}; the fetch limit is only applied when the stored value
     * is an {@code Integer}.
     *
     * <p>NOTE(review): the decompiler reported that the access modifier was changed from
     * {@code protected}; confirm the intended visibility against the parent class.
     */
    @Override // org.knowm.xchange.binance.BinanceExchange, org.knowm.xchange.BaseExchange
    public void initServices() {
        super.initServices();
        onApiCall = Events.onApiCall(exchangeSpecification);

        Object higherFrequencyFlag = exchangeSpecification.getExchangeSpecificParametersItem(USE_HIGHER_UPDATE_FREQUENCY);
        Object realtimeTickerFlag = exchangeSpecification.getExchangeSpecificParametersItem(USE_REALTIME_BOOK_TICKER);
        realtimeOrderBookTicker = Boolean.TRUE.equals(realtimeTickerFlag);
        if (Boolean.TRUE.equals(higherFrequencyFlag)) {
            orderBookUpdateFrequencyParameter = "@100ms";
        }

        Object fetchLimit = exchangeSpecification.getExchangeSpecificParametersItem(FETCH_ORDER_BOOK_LIMIT);
        if (fetchLimit instanceof Integer) {
            oderBookFetchLimitParameter = (Integer) fetchLimit;
        }
    }

    /**
     * Connects with kline subscriptions in addition to (optional) product subscriptions.
     *
     * <p>Without klines (null or empty) this delegates to {@link #connect(ProductSubscription...)}.
     * With klines but no product subscriptions an empty {@link ProductSubscription} is used.
     *
     * @param klineSubscription kline streams to open; may be null or empty
     * @param productSubscriptionArr product subscriptions; may be null or empty when klines are given
     * @return a {@link Completable} for the connection
     */
    public Completable connect(KlineSubscription klineSubscription, ProductSubscription... productSubscriptionArr) {
        if (klineSubscription == null || klineSubscription.isEmpty()) {
            return connect(productSubscriptionArr);
        }
        boolean noProductSubscriptions = productSubscriptionArr == null || productSubscriptionArr.length == 0;
        if (noProductSubscriptions) {
            return internalConnect(klineSubscription, ProductSubscription.create().build());
        }
        return internalConnect(klineSubscription, productSubscriptionArr);
    }

    /**
     * Connects using product subscriptions only (no kline streams).
     *
     * @param productSubscriptionArr the subscriptions to open; must contain at least one element
     * @return a {@link Completable} for the connection
     * @throws IllegalArgumentException if no subscription is supplied
     */
    @Override // info.bitrich.xchangestream.core.StreamingExchange
    public Completable connect(ProductSubscription... productSubscriptionArr) {
        boolean hasSubscriptions = productSubscriptionArr != null && productSubscriptionArr.length > 0;
        if (!hasSubscriptions) {
            throw new IllegalArgumentException("Subscriptions must be made at connection time");
        }
        KlineSubscription noKlines = new KlineSubscription(Collections.emptyMap());
        return internalConnect(noKlines, productSubscriptionArr);
    }

    /**
     * Creates the streaming services for the first supplied product subscription and the kline
     * subscription, and returns a {@link Completable} that connects the required web sockets and
     * then opens the subscriptions.
     *
     * <p>Only {@code productSubscriptionArr[0]} is used. The market data socket is connected when
     * either subscription has unauthenticated streams; the user data socket is connected when the
     * product subscription has authenticated streams.
     *
     * <p>If validation or the listen key lookup fails, the connection state set up so far is rolled
     * back so that a later {@code connect} call is not rejected as a second connection.
     *
     * @param klineSubscription kline streams to open; must not be null
     * @param productSubscriptionArr product subscriptions; must contain at least one element
     * @return a {@link Completable} that completes once the sockets are connected and the
     *     subscriptions have been opened
     * @throws UnsupportedOperationException if a connection has already been set up
     * @throws IllegalArgumentException if authenticated streams are requested without an API key
     * @throws IllegalStateException if the user data channel has no active listen key
     */
    private Completable internalConnect(KlineSubscription klineSubscription, ProductSubscription... productSubscriptionArr) {
        if (this.streamingService != null) {
            throw new UnsupportedOperationException("Exchange only handles a single connection - disconnect the current connection.");
        }
        ProductSubscription productSubscription = productSubscriptionArr[0];
        boolean authenticated = productSubscription.hasAuthenticated();
        // Validate before touching any field: a failure here must not leave streamingService set.
        if (authenticated && this.exchangeSpecification.getApiKey() == null) {
            throw new IllegalArgumentException("API key required for authenticated streams");
        }
        this.streamingService = createStreamingService(productSubscription, klineSubscription);
        List<Completable> connections = new ArrayList<>();
        if (productSubscription.hasUnauthenticated() || klineSubscription.hasUnauthenticated()) {
            connections.add(this.streamingService.connect());
        }
        if (authenticated) {
            LOG.info("Connecting to authenticated web socket");
            this.userDataChannel = new BinanceUserDataChannel((BinanceAuthenticated) ExchangeRestProxyBuilder.forInterface(BinanceAuthenticated.class, getExchangeSpecification()).build(), this.exchangeSpecification.getApiKey(), this.onApiCall);
            try {
                connections.add(createAndConnectUserDataService(this.userDataChannel.getListenKey()));
            } catch (BinanceUserDataChannel.NoActiveChannelException e) {
                // Roll back: the returned Completable is never handed out, so nothing has been
                // subscribed yet; release the channel and clear the fields so connect() can be retried.
                this.userDataChannel.close();
                this.userDataChannel = null;
                this.streamingService = null;
                throw new IllegalStateException("Failed to establish user data channel", e);
            }
        }
        this.streamingMarketDataService = new BinanceStreamingMarketDataService(this.streamingService, (BinanceMarketDataService) this.marketDataService, this.onApiCall, this.orderBookUpdateFrequencyParameter, this.realtimeOrderBookTicker, this.oderBookFetchLimitParameter);
        // userDataStreamingService is null here when no authenticated streams were requested.
        this.streamingAccountService = new BinanceStreamingAccountService(this.userDataStreamingService);
        this.streamingTradeService = new BinanceStreamingTradeService(this.userDataStreamingService);
        return Completable.concat(connections).doOnComplete(() -> {
            this.streamingMarketDataService.openSubscriptions(productSubscription, klineSubscription);
        }).doOnComplete(() -> {
            this.streamingAccountService.openSubscriptions();
        }).doOnComplete(() -> {
            this.streamingTradeService.openSubscriptions();
        });
    }

    /**
     * Creates the user data streaming service for the given listen key, stores it in
     * {@code userDataStreamingService}, and returns the {@link Completable} that connects it.
     *
     * <p>Once connected, a listener is registered on the user data channel so that a changed
     * listen key triggers {@link #reconnectUserDataService(String)}.
     *
     * @param str the listen key identifying the user data stream
     * @return a {@link Completable} that completes when the user data socket is connected
     */
    private Completable createAndConnectUserDataService(String str) {
        this.userDataStreamingService = BinanceUserDataStreamingService.create(getStreamingBaseUri(), str);
        applyStreamingSpecification(getExchangeSpecification(), this.userDataStreamingService);
        return this.userDataStreamingService.connect().doOnComplete(() -> {
            LOG.info("Connected to authenticated web socket");
            this.userDataChannel.onChangeListenKey(this::reconnectUserDataService);
        });
    }

    /**
     * Replaces the user data socket after the listen key changed: disconnects the current service,
     * connects a new one for {@code newListenKey}, and hands the new service to the account and
     * trade streaming services.
     *
     * <p>A {@link Completable} performs no work until it is subscribed, so the chain is subscribed
     * here; merely building it (as the previous implementation did) never ran the reconnect.
     *
     * @param newListenKey the listen key that replaces the previous one
     */
    private void reconnectUserDataService(String newListenKey) {
        BinanceUserDataStreamingService previous = this.userDataStreamingService;
        if (previous == null) {
            // disconnect() clears the field; a late notification must not reopen a closed exchange.
            return;
        }
        previous.disconnect()
                // defer: create the replacement service only after the old socket has been closed.
                .andThen(Completable.defer(() -> createAndConnectUserDataService(newListenKey)))
                .subscribe(() -> {
                    this.streamingAccountService.setUserDataStreamingService(this.userDataStreamingService);
                    this.streamingTradeService.setUserDataStreamingService(this.userDataStreamingService);
                }, error -> LOG.error("Failed to reconnect user data stream after listen key change", error));
    }

    /**
     * Disconnects whichever web sockets exist, closes the user data channel and clears the
     * corresponding fields together with the market data streaming service.
     *
     * @return a {@link Completable} that runs the pending socket disconnects one after another
     */
    @Override // info.bitrich.xchangestream.core.StreamingExchange
    public Completable disconnect() {
        List<Completable> pendingDisconnects = new ArrayList<>();

        BinanceStreamingService marketSocket = streamingService;
        if (marketSocket != null) {
            pendingDisconnects.add(marketSocket.disconnect());
            streamingService = null;
        }

        BinanceUserDataStreamingService userSocket = userDataStreamingService;
        if (userSocket != null) {
            pendingDisconnects.add(userSocket.disconnect());
            userDataStreamingService = null;
        }

        BinanceUserDataChannel channel = userDataChannel;
        if (channel != null) {
            channel.close();
            userDataChannel = null;
        }

        streamingMarketDataService = null;
        return Completable.concat(pendingDisconnects);
    }

    /**
     * Reports whether the market data socket exists and is open.
     *
     * @return {@code false} when no streaming service has been created, otherwise the result of
     *     its {@code isSocketOpen()}
     */
    @Override // info.bitrich.xchangestream.core.StreamingExchange
    public boolean isAlive() {
        BinanceStreamingService service = streamingService;
        if (service == null) {
            return false;
        }
        return service.isSocketOpen();
    }

    /**
     * Exposes the reconnect-failure stream of the market data socket.
     *
     * <p>The streaming service only exists between a successful connect and the next disconnect;
     * outside that window this method fails with a descriptive exception instead of a bare
     * {@link NullPointerException}.
     *
     * @return the observable provided by the streaming service's {@code subscribeReconnectFailure()}
     * @throws UnsupportedOperationException if no connection has been set up
     */
    @Override // info.bitrich.xchangestream.core.StreamingExchange
    public Observable<Throwable> reconnectFailure() {
        BinanceStreamingService service = this.streamingService;
        if (service == null) {
            throw new UnsupportedOperationException("You must connect to streams before subscribing to reconnect failures.");
        }
        return service.subscribeReconnectFailure();
    }

    /**
     * Exposes the connection-success stream of the market data socket.
     *
     * <p>The streaming service only exists between a successful connect and the next disconnect;
     * outside that window this method fails with a descriptive exception instead of a bare
     * {@link NullPointerException}.
     *
     * @return the observable provided by the streaming service's {@code subscribeConnectionSuccess()}
     * @throws UnsupportedOperationException if no connection has been set up
     */
    @Override // info.bitrich.xchangestream.core.StreamingExchange
    public Observable<Object> connectionSuccess() {
        BinanceStreamingService service = this.streamingService;
        if (service == null) {
            throw new UnsupportedOperationException("You must connect to streams before subscribing to connection success events.");
        }
        return service.subscribeConnectionSuccess();
    }

    /**
     * Exposes the connection-state stream of the market data socket.
     *
     * <p>The streaming service only exists between a successful connect and the next disconnect;
     * outside that window this method fails with a descriptive exception instead of a bare
     * {@link NullPointerException}.
     *
     * @return the observable provided by the streaming service's {@code subscribeConnectionState()}
     * @throws UnsupportedOperationException if no connection has been set up
     */
    @Override // info.bitrich.xchangestream.core.StreamingExchange
    public Observable<ConnectionStateModel.State> connectionStateObservable() {
        BinanceStreamingService service = this.streamingService;
        if (service == null) {
            throw new UnsupportedOperationException("You must connect to streams before subscribing to the connection state.");
        }
        return service.subscribeConnectionState();
    }

    /**
     * Returns the market data streaming service.
     *
     * @return the service built while connecting, or {@code null} before a connection has been
     *     set up and after {@link #disconnect()}
     */
    @Override // info.bitrich.xchangestream.core.StreamingExchange
    public BinanceStreamingMarketDataService getStreamingMarketDataService() {
        return streamingMarketDataService;
    }

    /**
     * Returns the account streaming service.
     *
     * @return the service built while connecting, or {@code null} if no connection has been set
     *     up yet
     */
    @Override // info.bitrich.xchangestream.core.StreamingExchange
    public BinanceStreamingAccountService getStreamingAccountService() {
        return streamingAccountService;
    }

    /**
     * Returns the trade streaming service.
     *
     * @return the service built while connecting, or {@code null} if no connection has been set
     *     up yet
     */
    @Override // info.bitrich.xchangestream.core.StreamingExchange
    public BinanceStreamingTradeService getStreamingTradeService() {
        return streamingTradeService;
    }

    /**
     * Builds the market data streaming service for the combined-stream endpoint and applies the
     * exchange specification's streaming settings to it.
     *
     * @param productSubscription product streams to include in the URL
     * @param klineSubscription kline streams to include in the URL
     * @return the configured, not yet connected streaming service
     */
    protected BinanceStreamingService createStreamingService(ProductSubscription productSubscription, KlineSubscription klineSubscription) {
        String baseUri = getStreamingBaseUri();
        String streamNames = buildSubscriptionStreams(productSubscription, klineSubscription);
        String endpoint = baseUri + "stream?streams=" + streamNames;

        BinanceStreamingService service = new BinanceStreamingService(endpoint, productSubscription, klineSubscription);
        applyStreamingSpecification(getExchangeSpecification(), service);
        return service;
    }

    /**
     * Joins the product stream names and the kline stream names into one "/"-separated list,
     * dropping empty entries.
     *
     * @param productSubscription source of the ticker, order book, trade and funding rate streams
     * @param klineSubscription source of the kline streams
     * @return the combined stream names; empty when neither subscription yields a stream
     */
    private String buildSubscriptionStreams(ProductSubscription productSubscription, KlineSubscription klineSubscription) {
        String[] productStreamNames = buildSubscriptionStreams(productSubscription).split("/");
        Stream<String> productStreams = Arrays.stream(productStreamNames);
        Stream<String> klineStreams = buildSubscriptionStreams(klineSubscription);
        return Stream.concat(productStreams, klineStreams)
                .filter(StringUtils::isNotEmpty)
                .collect(Collectors.joining("/"));
    }

    /**
     * Produces one stream name per instrument/interval pair of the kline subscription, in the
     * form {@code <prefix>@kline_<interval code>}.
     *
     * <p>The element types come from the generic signature of {@code getKlines()}; the former raw
     * {@code Set} cast erased the interval type, which left the {@code code()} call untyped.
     *
     * @param klineSubscription the kline subscription to translate
     * @return a lazily evaluated stream of kline stream names
     */
    private Stream<String> buildSubscriptionStreams(KlineSubscription klineSubscription) {
        return klineSubscription.getKlines().entrySet().stream().flatMap(entry -> {
            // The prefix depends only on the instrument, so compute it once per entry.
            String prefix = getPrefix(entry.getKey());
            return entry.getValue().stream().map(interval -> prefix + "@kline_" + interval.code());
        });
    }

    /**
     * Selects the web socket base URI.
     *
     * @return the sandbox URI when the exchange-specific parameter {@code "Use_Sandbox"} equals
     *     {@code Boolean.TRUE}, otherwise the production URI
     */
    protected String getStreamingBaseUri() {
        Object sandboxFlag = exchangeSpecification.getExchangeSpecificParametersItem("Use_Sandbox");
        if (Boolean.TRUE.equals(sandboxFlag)) {
            return WS_SANDBOX_API_BASE_URI;
        }
        return WS_API_BASE_URI;
    }

    /**
     * Builds the "/"-separated stream names for a product subscription.
     *
     * <p>Tickers use the BOOK_TICKER type when the realtime ticker option is enabled and TICKER
     * otherwise. Order book instruments are subscribed to both the DEPTH and the DEPTH20 stream.
     * Trades and funding rates use their respective types. Groups without instruments are skipped.
     *
     * <p>The pieces are kept in a {@code Stream<String>}; the earlier {@code Object[]} cast
     * produced a {@code Stream<Object>} on which {@code isEmpty()} is not available.
     *
     * @param productSubscription the subscription to translate
     * @return the joined stream names; empty when the subscription contains no instruments
     */
    public String buildSubscriptionStreams(ProductSubscription productSubscription) {
        String tickerType = this.realtimeOrderBookTicker
                ? BinanceSubscriptionType.BOOK_TICKER.getType()
                : BinanceSubscriptionType.TICKER.getType();
        return Stream.of(
                        buildSubscriptionStrings(productSubscription.getTicker(), tickerType),
                        buildSubscriptionStrings(productSubscription.getOrderBook(), BinanceSubscriptionType.DEPTH.getType()),
                        buildSubscriptionStrings(productSubscription.getOrderBook(), BinanceSubscriptionType.DEPTH20.getType()),
                        buildSubscriptionStrings(productSubscription.getTrades(), BinanceSubscriptionType.TRADE.getType()),
                        buildSubscriptionStrings(productSubscription.getFundingRates(), BinanceSubscriptionType.FUNDING_RATES.getType()))
                .filter(streams -> !streams.isEmpty())
                .collect(Collectors.joining("/"));
    }

    /**
     * Builds the "/"-separated stream names of one subscription type for the given instruments.
     *
     * <p>Each name is {@code <prefix>@<type>}; for the DEPTH and DEPTH20 types the configured
     * order book update frequency suffix is appended as well.
     *
     * @param list the instruments to subscribe
     * @param str the subscription type string
     * @return the joined stream names; empty when {@code list} is empty
     */
    private String buildSubscriptionStrings(List<Instrument> list, String str) {
        boolean depthStream = BinanceSubscriptionType.DEPTH.getType().equals(str)
                || BinanceSubscriptionType.DEPTH20.getType().equals(str);
        String suffix;
        if (depthStream) {
            suffix = "@" + str + orderBookUpdateFrequencyParameter;
        } else {
            suffix = "@" + str;
        }
        return subscriptionStrings(list)
                .map(prefix -> prefix + suffix)
                .collect(Collectors.joining("/"));
    }

    /**
     * Maps each instrument to its stream-name prefix.
     *
     * @param list the instruments to convert
     * @return a stream of prefixes in list order
     */
    private static Stream<String> subscriptionStrings(List<Instrument> list) {
        Stream<Instrument> instruments = list.stream();
        return instruments.map(instrument -> getPrefix(instrument));
    }

    /**
     * Derives the stream-name prefix for an instrument: its textual form with every "/" removed,
     * in lower case. For a {@link FuturesContract} the underlying currency pair is used instead of
     * the contract itself.
     *
     * <p>Lower-casing uses {@link Locale#ROOT} so the result does not depend on the JVM default
     * locale (a Turkish default locale, for instance, lower-cases "I" to a dotless "ı").
     *
     * @param instrument the instrument to convert; must not be null
     * @return the lower-case symbol without separators
     */
    private static String getPrefix(Instrument instrument) {
        Object symbolSource = instrument instanceof FuturesContract
                ? ((FuturesContract) instrument).getCurrencyPair()
                : instrument;
        return symbolSource.toString().replace("/", "").toLowerCase(Locale.ROOT);
    }

    /**
     * Forwards the compression setting to the market data streaming service.
     *
     * <p>The streaming service only exists between a successful connect and the next disconnect;
     * outside that window this method fails with a descriptive exception instead of a bare
     * {@link NullPointerException}.
     *
     * @param z the value passed on to the streaming service's {@code useCompressedMessages}
     * @throws UnsupportedOperationException if no connection has been set up
     */
    @Override // info.bitrich.xchangestream.core.StreamingExchange
    public void useCompressedMessages(boolean z) {
        BinanceStreamingService service = this.streamingService;
        if (service == null) {
            throw new UnsupportedOperationException("You must connect to streams before configuring compressed messages.");
        }
        service.useCompressedMessages(z);
    }

    /**
     * Enables live subscription on the market data streaming service.
     *
     * @throws UnsupportedOperationException if no streaming service has been created yet
     */
    public void enableLiveSubscription() {
        BinanceStreamingService service = streamingService;
        if (service != null) {
            service.enableLiveSubscription();
            return;
        }
        throw new UnsupportedOperationException("You must connect to streams before enabling live subscription.");
    }

    /**
     * Disables live subscription on the market data streaming service; does nothing when no
     * streaming service exists.
     */
    public void disableLiveSubscription() {
        BinanceStreamingService service = streamingService;
        if (service == null) {
            return;
        }
        service.disableLiveSubscription();
    }
}
