package info.bitrich.xchangestream.binance;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.RateLimiter;
import info.bitrich.xchangestream.binance.dto.BinanceRawTrade;
import info.bitrich.xchangestream.binance.dto.BinanceWebsocketTransaction;
import info.bitrich.xchangestream.binance.dto.BookTickerBinanceWebSocketTransaction;
import info.bitrich.xchangestream.binance.dto.DepthBinanceWebSocketTransaction;
import info.bitrich.xchangestream.binance.dto.FundingRateWebsocketTransaction;
import info.bitrich.xchangestream.binance.dto.KlineBinanceWebSocketTransaction;
import info.bitrich.xchangestream.binance.dto.TickerBinanceWebsocketTransaction;
import info.bitrich.xchangestream.binance.dto.TradeBinanceWebsocketTransaction;
import info.bitrich.xchangestream.binance.exceptions.UpFrontSubscriptionRequiredException;
import info.bitrich.xchangestream.bitstamp.v2.BitstampStreamingService;
import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.netty.handler.traffic.AbstractTrafficShapingHandler;
import io.reactivex.Observable;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.knowm.xchange.binance.BinanceAdapters;
import org.knowm.xchange.binance.BinanceErrorAdapter;
import org.knowm.xchange.binance.dto.BinanceException;
import org.knowm.xchange.binance.dto.marketdata.BinanceBookTicker;
import org.knowm.xchange.binance.dto.marketdata.BinanceKline;
import org.knowm.xchange.binance.dto.marketdata.BinanceOrderbook;
import org.knowm.xchange.binance.dto.marketdata.BinanceTicker24h;
import org.knowm.xchange.binance.dto.marketdata.KlineInterval;
import org.knowm.xchange.binance.service.BinanceMarketDataService;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.derivative.FuturesContract;
import org.knowm.xchange.dto.Order;
import org.knowm.xchange.dto.marketdata.FundingRate;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.OrderBookUpdate;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.exceptions.ExchangeException;
import org.knowm.xchange.exceptions.RateLimitExceededException;
import org.knowm.xchange.instrument.Instrument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService.class */
public class BinanceStreamingMarketDataService implements StreamingMarketDataService {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BinanceStreamingMarketDataService.class);
    private static final JavaType TICKER_TYPE = getTickerType();
    private static final JavaType BOOK_TICKER_TYPE = getBookTickerType();
    private static final JavaType TRADE_TYPE = getTradeType();
    private static final JavaType DEPTH_TYPE = getDepthType();
    private static final JavaType FUNDING_RATE_TYPE = getFundingRateType();
    private static final JavaType KLINE_TYPE = getKlineType();
    private final BinanceStreamingService service;
    private final String orderBookUpdateFrequencyParameter;
    private final boolean realtimeOrderBookTicker;
    private final int oderBookFetchLimitParameter;
    private final BinanceMarketDataService marketDataService;
    private final Runnable onApiCall;
    private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
    private final AtomicBoolean fallenBack = new AtomicBoolean();
    private final AtomicReference<Runnable> fallbackOnApiCall = new AtomicReference<>(() -> {
    });
    private final Map<Instrument, Observable<BinanceTicker24h>> tickerSubscriptions = new ConcurrentHashMap();
    private final Map<Instrument, Observable<BinanceBookTicker>> bookTickerSubscriptions = new ConcurrentHashMap();
    private final Map<Instrument, Observable<OrderBook>> orderbookSubscriptions = new ConcurrentHashMap();
    private final Map<Instrument, Observable<BinanceRawTrade>> tradeSubscriptions = new ConcurrentHashMap();
    private final Map<Instrument, Observable<OrderBookUpdate>> orderBookUpdatesSubscriptions = new ConcurrentHashMap();
    private final Map<Instrument, Observable<DepthBinanceWebSocketTransaction>> orderBookRawUpdatesSubscriptions = new ConcurrentHashMap();
    private final Map<Instrument, Map<KlineInterval, Observable<BinanceKline>>> klineSubscriptions = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:info/bitrich/xchangestream/binance/BinanceStreamingMarketDataService$OrderbookSubscription.class */
    public final class OrderbookSubscription {
        final Observable<DepthBinanceWebSocketTransaction> stream;
        final AtomicLong lastUpdateId;
        final AtomicLong snapshotLastUpdateId;
        OrderBook orderBook;

        private OrderbookSubscription(Observable<DepthBinanceWebSocketTransaction> observable) {
            this.lastUpdateId = new AtomicLong();
            this.snapshotLastUpdateId = new AtomicLong();
            this.stream = observable;
        }

        void invalidateSnapshot() {
            this.snapshotLastUpdateId.set(0L);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void initSnapshotIfInvalid(Instrument instrument) {
            if (this.snapshotLastUpdateId.get() != 0) {
                return;
            }
            try {
                BinanceStreamingMarketDataService.LOG.info("Fetching initial orderbook snapshot for {} ", instrument);
                BinanceStreamingMarketDataService.this.onApiCall.run();
                ((Runnable) BinanceStreamingMarketDataService.this.fallbackOnApiCall.get()).run();
                BinanceOrderbook fetchBinanceOrderBook = fetchBinanceOrderBook(instrument);
                this.snapshotLastUpdateId.set(fetchBinanceOrderBook.lastUpdateId);
                this.lastUpdateId.set(fetchBinanceOrderBook.lastUpdateId);
                this.orderBook = BinanceMarketDataService.convertOrderBook(fetchBinanceOrderBook, instrument);
            } catch (Exception e) {
                BinanceStreamingMarketDataService.LOG.error("Failed to fetch initial order book for " + instrument, (Throwable) e);
                this.snapshotLastUpdateId.set(0L);
                this.lastUpdateId.set(0L);
                this.orderBook = null;
            }
        }

        private BinanceOrderbook fetchBinanceOrderBook(Instrument instrument) throws IOException, InterruptedException {
            try {
                return BinanceStreamingMarketDataService.this.marketDataService.getBinanceOrderbookAllProducts(instrument, Integer.valueOf(BinanceStreamingMarketDataService.this.oderBookFetchLimitParameter));
            } catch (BinanceException e) {
                if ((BinanceErrorAdapter.adapt(e) instanceof RateLimitExceededException) && BinanceStreamingMarketDataService.this.fallenBack.compareAndSet(false, true)) {
                    BinanceStreamingMarketDataService.LOG.error("API Rate limit was hit when fetching Binance order book snapshot. Provide a \nrate limiter. Apache Commons and Google Guava provide the TimedSemaphore\nand RateLimiter classes which are effective for this purpose. Example:\n\n  exchangeSpecification.setExchangeSpecificParametersItem(\n      info.bitrich.xchangestream.util.Events.BEFORE_API_CALL_HANDLER,\n      () -> rateLimiter.acquire())\n\nPausing for 15sec and falling back to one call per three seconds, but you\nwill get more optimal performance by handling your own rate limiting.");
                    RateLimiter create = RateLimiter.create(0.333d);
                    AtomicReference atomicReference = BinanceStreamingMarketDataService.this.fallbackOnApiCall;
                    Objects.requireNonNull(create);
                    atomicReference.set(create::acquire);
                    Thread.sleep(AbstractTrafficShapingHandler.DEFAULT_MAX_TIME);
                }
                throw e;
            }
        }
    }

    public BinanceStreamingMarketDataService(BinanceStreamingService binanceStreamingService, BinanceMarketDataService binanceMarketDataService, Runnable runnable, String str, boolean z, int i) {
        this.service = binanceStreamingService;
        this.orderBookUpdateFrequencyParameter = str;
        this.realtimeOrderBookTicker = z;
        this.oderBookFetchLimitParameter = i;
        this.marketDataService = binanceMarketDataService;
        this.onApiCall = runnable;
    }

    @Override // info.bitrich.xchangestream.core.StreamingMarketDataService
    public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... objArr) {
        if (this.service.isLiveSubscriptionEnabled() || this.service.getProductSubscription().getOrderBook().contains(currencyPair)) {
            return this.orderbookSubscriptions.computeIfAbsent(currencyPair, this::initOrderBookIfAbsent);
        }
        throw new UpFrontSubscriptionRequiredException();
    }

    @Override // info.bitrich.xchangestream.core.StreamingMarketDataService
    public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... objArr) {
        return this.realtimeOrderBookTicker ? getRawBookTicker(currencyPair).map(binanceBookTicker -> {
            return binanceBookTicker.toTicker(false);
        }) : getRawTicker(currencyPair).map(binanceTicker24h -> {
            return binanceTicker24h.toTicker(false);
        });
    }

    @Override // info.bitrich.xchangestream.core.StreamingMarketDataService
    public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... objArr) {
        return getRawTrades(currencyPair).map(binanceRawTrade -> {
            return BinanceStreamingAdapters.adaptRawTrade(binanceRawTrade, currencyPair);
        });
    }

    @Override // info.bitrich.xchangestream.core.StreamingMarketDataService
    public Observable<OrderBook> getOrderBook(Instrument instrument, Object... objArr) {
        if (this.service.isLiveSubscriptionEnabled() || this.service.getProductSubscription().getOrderBook().contains(instrument)) {
            return instrument instanceof FuturesContract ? this.service.subscribeChannel(channelFromCurrency(instrument, BinanceSubscriptionType.DEPTH20.getType()), new Object[0]).map(jsonNode -> {
                return readTransaction(jsonNode, DEPTH_TYPE, "order book");
            }).map((v0) -> {
                return v0.getData();
            }).filter(depthBinanceWebSocketTransaction -> {
                return BinanceAdapters.adaptSymbol(depthBinanceWebSocketTransaction.getSymbol(), true).equals(instrument);
            }).map(BinanceStreamingAdapters::adaptFuturesOrderbook) : this.orderbookSubscriptions.computeIfAbsent(instrument, this::initOrderBookIfAbsent);
        }
        throw new UpFrontSubscriptionRequiredException();
    }

    @Override // info.bitrich.xchangestream.core.StreamingMarketDataService
    public Observable<Ticker> getTicker(Instrument instrument, Object... objArr) {
        return this.realtimeOrderBookTicker ? getRawBookTicker(instrument).map(binanceBookTicker -> {
            return binanceBookTicker.toTicker(instrument instanceof FuturesContract);
        }) : getRawTicker(instrument).map(binanceTicker24h -> {
            return binanceTicker24h.toTicker(instrument instanceof FuturesContract);
        });
    }

    @Override // info.bitrich.xchangestream.core.StreamingMarketDataService
    public Observable<Trade> getTrades(Instrument instrument, Object... objArr) {
        return getRawTrades(instrument).map(binanceRawTrade -> {
            return BinanceStreamingAdapters.adaptRawTrade(binanceRawTrade, instrument);
        });
    }

    @Override // info.bitrich.xchangestream.core.StreamingMarketDataService
    public Observable<FundingRate> getFundingRate(Instrument instrument, Object... objArr) {
        return this.service.subscribeChannel(channelFromCurrency(instrument, BinanceSubscriptionType.FUNDING_RATES.getType()), new Object[0]).map(jsonNode -> {
            return readTransaction(jsonNode, FUNDING_RATE_TYPE, "funding rate");
        }).map((v0) -> {
            return v0.getData();
        }).filter(fundingRateWebsocketTransaction -> {
            return BinanceAdapters.adaptSymbol(fundingRateWebsocketTransaction.getSymbol(), true).equals(instrument);
        }).map((v0) -> {
            return v0.toFundingRate();
        });
    }

    private Observable<OrderBook> initOrderBookIfAbsent(Instrument instrument) {
        this.orderBookRawUpdatesSubscriptions.computeIfAbsent(instrument, instrument2 -> {
            return triggerObservableBody(rawOrderBookUpdates(instrument));
        });
        return createOrderBookObservable(instrument);
    }

    public Observable<BinanceTicker24h> getRawTicker(Instrument instrument) {
        if (this.service.isLiveSubscriptionEnabled() || this.service.getProductSubscription().getTicker().contains(instrument)) {
            return this.tickerSubscriptions.computeIfAbsent(instrument, instrument2 -> {
                return triggerObservableBody(rawTickerStream(instrument)).share();
            });
        }
        throw new UpFrontSubscriptionRequiredException();
    }

    public Observable<BinanceBookTicker> getRawBookTicker(Instrument instrument) {
        if (this.service.isLiveSubscriptionEnabled() || this.service.getProductSubscription().getTicker().contains(instrument)) {
            return this.bookTickerSubscriptions.computeIfAbsent(instrument, instrument2 -> {
                return triggerObservableBody(rawBookTickerStream(instrument)).share();
            });
        }
        throw new UpFrontSubscriptionRequiredException();
    }

    public Observable<BinanceRawTrade> getRawTrades(Instrument instrument) {
        if (this.service.isLiveSubscriptionEnabled() || this.service.getProductSubscription().getTrades().contains(instrument)) {
            return this.tradeSubscriptions.computeIfAbsent(instrument, instrument2 -> {
                return triggerObservableBody(rawTradeStream(instrument)).share();
            });
        }
        throw new UpFrontSubscriptionRequiredException();
    }

    public Observable<BinanceKline> getKlines(Instrument instrument, KlineInterval klineInterval) {
        if (this.service.isLiveSubscriptionEnabled() || this.service.getKlineSubscription().contains(instrument, klineInterval)) {
            return this.klineSubscriptions.compute(instrument, (instrument2, map) -> {
                Map createMapIfNull = createMapIfNull(map);
                createMapIfNull.computeIfAbsent(klineInterval, klineInterval2 -> {
                    return triggerObservableBody(klinesStream(instrument, klineInterval)).share();
                });
                return createMapIfNull;
            }).get(klineInterval);
        }
        throw new UpFrontSubscriptionRequiredException();
    }

    private static <K, V> Map<K, V> createMapIfNull(Map<K, V> map) {
        return map == null ? new ConcurrentHashMap() : map;
    }

    private Observable<BinanceKline> klinesStream(Instrument instrument, KlineInterval klineInterval) {
        return this.service.subscribeChannel(getChannelPrefix(instrument) + "@" + BinanceSubscriptionType.KLINE.getType() + "_" + klineInterval.code(), new Object[0]).map(jsonNode -> {
            return ((KlineBinanceWebSocketTransaction) readTransaction(jsonNode, KLINE_TYPE, "kline").getData()).toBinanceKline(instrument instanceof FuturesContract);
        }).filter(binanceKline -> {
            return binanceKline.getInstrument().equals(instrument) && binanceKline.getInterval().equals(klineInterval);
        });
    }

    public Observable<OrderBookUpdate> getOrderBookUpdates(Instrument instrument) {
        if (this.service.isLiveSubscriptionEnabled() || this.service.getProductSubscription().getOrderBook().contains(instrument)) {
            return this.orderBookUpdatesSubscriptions.computeIfAbsent(instrument, this::initOrderBookUpdateIfAbsent);
        }
        throw new UpFrontSubscriptionRequiredException();
    }

    private Observable<OrderBookUpdate> initOrderBookUpdateIfAbsent(Instrument instrument) {
        this.orderBookRawUpdatesSubscriptions.computeIfAbsent(instrument, instrument2 -> {
            return triggerObservableBody(rawOrderBookUpdates(instrument));
        });
        return createOrderBookUpdatesObservable(instrument);
    }

    private Observable<OrderBookUpdate> createOrderBookUpdatesObservable(Instrument instrument) {
        return this.orderBookRawUpdatesSubscriptions.get(instrument).flatMap(depthBinanceWebSocketTransaction -> {
            return observableFromStream(extractOrderBookUpdates(instrument, depthBinanceWebSocketTransaction));
        }).share();
    }

    private String channelFromCurrency(Instrument instrument, String str) {
        String str2 = getChannelPrefix(instrument) + "@" + str;
        return BinanceSubscriptionType.DEPTH.getType().equals(str) ? str2 + this.orderBookUpdateFrequencyParameter : str2;
    }

    private String getChannelPrefix(Instrument instrument) {
        return instrument instanceof FuturesContract ? ((FuturesContract) instrument).getCurrencyPair().toString().replace("/", "").toLowerCase() : instrument.toString().replace("/", "").toLowerCase();
    }

    public void openSubscriptions(ProductSubscription productSubscription, KlineSubscription klineSubscription) {
        klineSubscription.getKlines().forEach(this::initKlineSubscription);
        productSubscription.getTicker().forEach(this::initTickerSubscription);
        productSubscription.getOrderBook().forEach(this::initRawOrderBookUpdatesSubscription);
        productSubscription.getTrades().forEach(this::initTradeSubscription);
    }

    private void initKlineSubscription(Instrument instrument, Set<KlineInterval> set) {
        this.klineSubscriptions.compute(instrument, (instrument2, map) -> {
            Map createMapIfNull = createMapIfNull(map);
            set.forEach(klineInterval -> {
                createMapIfNull.put(klineInterval, triggerObservableBody(klinesStream(instrument, klineInterval)));
            });
            return createMapIfNull;
        });
    }

    public void unsubscribe(Instrument instrument, BinanceSubscriptionType binanceSubscriptionType) {
        if (binanceSubscriptionType == BinanceSubscriptionType.KLINE) {
            this.klineSubscriptions.computeIfPresent(instrument, (instrument2, map) -> {
                map.keySet().forEach(klineInterval -> {
                    unsubscribeKline(instrument, klineInterval);
                });
                return null;
            });
        } else {
            unsubscribe(instrument, binanceSubscriptionType, null);
        }
    }

    public void unsubscribeKline(Instrument instrument, KlineInterval klineInterval) {
        unsubscribe(instrument, BinanceSubscriptionType.KLINE, klineInterval);
    }

    private void unsubscribe(Instrument instrument, BinanceSubscriptionType binanceSubscriptionType, KlineInterval klineInterval) {
        if (!this.service.isLiveSubscriptionEnabled()) {
            throw new UnsupportedOperationException("Unsubscribe not supported for Binance when live Subscription/Unsubscription is disabled. Call BinanceStreamingExchange.enableLiveSubscription() to active it");
        }
        this.service.unsubscribeChannel(getChannelId(instrument, binanceSubscriptionType, klineInterval));
        switch (binanceSubscriptionType) {
            case DEPTH:
                this.orderbookSubscriptions.remove(instrument);
                this.orderBookUpdatesSubscriptions.remove(instrument);
                this.orderBookRawUpdatesSubscriptions.remove(instrument);
                return;
            case TRADE:
                this.tradeSubscriptions.remove(instrument);
                return;
            case TICKER:
                this.tickerSubscriptions.remove(instrument);
                return;
            case BOOK_TICKER:
                this.bookTickerSubscriptions.remove(instrument);
                return;
            case KLINE:
                this.klineSubscriptions.computeIfPresent(instrument, (instrument2, map) -> {
                    map.remove(klineInterval);
                    return map;
                });
                break;
        }
        throw new IllegalArgumentException("Subscription type not supported to unsubscribe from stream");
    }

    private String getChannelId(Instrument instrument, BinanceSubscriptionType binanceSubscriptionType, KlineInterval klineInterval) {
        return getChannelPrefix(instrument) + "@" + binanceSubscriptionType.getType() + (klineInterval != null ? "_" + klineInterval.code() : "");
    }

    private void initTradeSubscription(Instrument instrument) {
        this.tradeSubscriptions.put(instrument, triggerObservableBody(rawTradeStream(instrument)).share());
    }

    private void initTickerSubscription(Instrument instrument) {
        if (this.realtimeOrderBookTicker) {
            this.bookTickerSubscriptions.put(instrument, triggerObservableBody(rawBookTickerStream(instrument)).share());
        } else {
            this.tickerSubscriptions.put(instrument, triggerObservableBody(rawTickerStream(instrument)).share());
        }
    }

    private void initRawOrderBookUpdatesSubscription(Instrument instrument) {
        this.orderBookRawUpdatesSubscriptions.put(instrument, triggerObservableBody(rawOrderBookUpdates(instrument)));
    }

    private Observable<BinanceTicker24h> rawTickerStream(Instrument instrument) {
        return this.service.subscribeChannel(channelFromCurrency(instrument, BinanceSubscriptionType.TICKER.getType()), new Object[0]).map(jsonNode -> {
            return readTransaction(jsonNode, TICKER_TYPE, "ticker");
        }).filter(binanceWebsocketTransaction -> {
            return BinanceAdapters.adaptSymbol(((TickerBinanceWebsocketTransaction) binanceWebsocketTransaction.getData()).getSymbol(), instrument instanceof FuturesContract).equals(instrument);
        }).map(binanceWebsocketTransaction2 -> {
            return ((TickerBinanceWebsocketTransaction) binanceWebsocketTransaction2.getData()).getTicker();
        });
    }

    private Observable<BinanceBookTicker> rawBookTickerStream(Instrument instrument) {
        return this.service.subscribeChannel(channelFromCurrency(instrument, BinanceSubscriptionType.BOOK_TICKER.getType()), new Object[0]).map(jsonNode -> {
            return readTransaction(jsonNode, BOOK_TICKER_TYPE, "book ticker");
        }).filter(binanceWebsocketTransaction -> {
            return BinanceAdapters.adaptSymbol(((BookTickerBinanceWebSocketTransaction) binanceWebsocketTransaction.getData()).getTicker().getSymbol(), instrument instanceof FuturesContract).equals(instrument);
        }).map(binanceWebsocketTransaction2 -> {
            return ((BookTickerBinanceWebSocketTransaction) binanceWebsocketTransaction2.getData()).getTicker();
        });
    }

    private Observable<DepthBinanceWebSocketTransaction> rawOrderBookUpdates(Instrument instrument) {
        return this.service.subscribeChannel(channelFromCurrency(instrument, BinanceSubscriptionType.DEPTH.getType()), new Object[0]).map(jsonNode -> {
            return readTransaction(jsonNode, DEPTH_TYPE, "order book");
        }).map((v0) -> {
            return v0.getData();
        }).filter(depthBinanceWebSocketTransaction -> {
            return BinanceAdapters.adaptSymbol(depthBinanceWebSocketTransaction.getSymbol(), instrument instanceof FuturesContract).equals(instrument);
        });
    }

    private Observable<OrderBook> createOrderBookObservable(Instrument instrument) {
        OrderbookSubscription orderbookSubscription = new OrderbookSubscription(this.orderBookRawUpdatesSubscriptions.get(instrument));
        return orderbookSubscription.stream.doOnNext(depthBinanceWebSocketTransaction -> {
            orderbookSubscription.initSnapshotIfInvalid(instrument);
        }).filter(depthBinanceWebSocketTransaction2 -> {
            return orderbookSubscription.snapshotLastUpdateId.get() > 0;
        }).filter(depthBinanceWebSocketTransaction3 -> {
            return depthBinanceWebSocketTransaction3.getLastUpdateId() > orderbookSubscription.snapshotLastUpdateId.get();
        }).filter(depthBinanceWebSocketTransaction4 -> {
            boolean z;
            long j = orderbookSubscription.lastUpdateId.get();
            if (j == 0) {
                z = true;
            } else if (instrument instanceof FuturesContract) {
                z = depthBinanceWebSocketTransaction4.getFirstUpdateId() <= j && depthBinanceWebSocketTransaction4.getLastUpdateId() >= j;
            } else {
                z = depthBinanceWebSocketTransaction4.getFirstUpdateId() <= j + 1 && depthBinanceWebSocketTransaction4.getLastUpdateId() >= j + 1;
            }
            if (z) {
                orderbookSubscription.lastUpdateId.set(depthBinanceWebSocketTransaction4.getLastUpdateId());
            } else {
                LOG.info("Orderbook snapshot for {} out of date (last={}, U={}, u={}). This is normal. Re-syncing.", instrument, Long.valueOf(j), Long.valueOf(depthBinanceWebSocketTransaction4.getFirstUpdateId()), Long.valueOf(depthBinanceWebSocketTransaction4.getLastUpdateId()));
                orderbookSubscription.invalidateSnapshot();
            }
            return z;
        }).map(depthBinanceWebSocketTransaction5 -> {
            extractOrderBookUpdates(instrument, depthBinanceWebSocketTransaction5).forEach(orderBookUpdate -> {
                orderbookSubscription.orderBook.update(orderBookUpdate);
            });
            return orderbookSubscription.orderBook;
        }).share();
    }

    private Observable<BinanceRawTrade> rawTradeStream(Instrument instrument) {
        return this.service.subscribeChannel(channelFromCurrency(instrument, BinanceSubscriptionType.TRADE.getType()), new Object[0]).map(jsonNode -> {
            return readTransaction(jsonNode, TRADE_TYPE, BitstampStreamingService.EVENT_TRADE);
        }).filter(binanceWebsocketTransaction -> {
            return BinanceAdapters.adaptSymbol(((TradeBinanceWebsocketTransaction) binanceWebsocketTransaction.getData()).getSymbol(), instrument instanceof FuturesContract).equals(instrument);
        }).map(binanceWebsocketTransaction2 -> {
            return ((TradeBinanceWebsocketTransaction) binanceWebsocketTransaction2.getData()).getRawTrade();
        });
    }

    private <T> Observable<T> triggerObservableBody(Observable<T> observable) {
        observable.subscribe(obj -> {
        });
        return observable;
    }

    private <T> BinanceWebsocketTransaction<T> readTransaction(JsonNode jsonNode, JavaType javaType, String str) {
        try {
            return (BinanceWebsocketTransaction) this.mapper.readValue(this.mapper.treeAsTokens(jsonNode), javaType);
        } catch (IOException e) {
            throw new ExchangeException(String.format("Unable to parse %s transaction", str), e);
        }
    }

    private Stream<OrderBookUpdate> extractOrderBookUpdates(Instrument instrument, DepthBinanceWebSocketTransaction depthBinanceWebSocketTransaction) {
        BinanceOrderbook orderBook = depthBinanceWebSocketTransaction.getOrderBook();
        return Stream.concat(orderBook.bids.entrySet().stream().map(entry -> {
            return new OrderBookUpdate(Order.OrderType.BID, (BigDecimal) entry.getValue(), instrument, (BigDecimal) entry.getKey(), depthBinanceWebSocketTransaction.getEventTime(), (BigDecimal) entry.getValue());
        }), orderBook.asks.entrySet().stream().map(entry2 -> {
            return new OrderBookUpdate(Order.OrderType.ASK, (BigDecimal) entry2.getValue(), instrument, (BigDecimal) entry2.getKey(), depthBinanceWebSocketTransaction.getEventTime(), (BigDecimal) entry2.getValue());
        }));
    }

    private <T> Observable<T> observableFromStream(Stream<T> stream) {
        return Observable.create(observableEmitter -> {
            Objects.requireNonNull(observableEmitter);
            stream.forEach(observableEmitter::onNext);
            observableEmitter.onComplete();
        });
    }

    private static JavaType getTickerType() {
        return StreamingObjectMapperHelper.getObjectMapper().getTypeFactory().constructType(new TypeReference<BinanceWebsocketTransaction<TickerBinanceWebsocketTransaction>>() { // from class: info.bitrich.xchangestream.binance.BinanceStreamingMarketDataService.1
        });
    }

    private static JavaType getBookTickerType() {
        return StreamingObjectMapperHelper.getObjectMapper().getTypeFactory().constructType(new TypeReference<BinanceWebsocketTransaction<BookTickerBinanceWebSocketTransaction>>() { // from class: info.bitrich.xchangestream.binance.BinanceStreamingMarketDataService.2
        });
    }

    private static JavaType getTradeType() {
        return StreamingObjectMapperHelper.getObjectMapper().getTypeFactory().constructType(new TypeReference<BinanceWebsocketTransaction<TradeBinanceWebsocketTransaction>>() { // from class: info.bitrich.xchangestream.binance.BinanceStreamingMarketDataService.3
        });
    }

    private static JavaType getDepthType() {
        return StreamingObjectMapperHelper.getObjectMapper().getTypeFactory().constructType(new TypeReference<BinanceWebsocketTransaction<DepthBinanceWebSocketTransaction>>() { // from class: info.bitrich.xchangestream.binance.BinanceStreamingMarketDataService.4
        });
    }

    private static JavaType getFundingRateType() {
        return StreamingObjectMapperHelper.getObjectMapper().getTypeFactory().constructType(new TypeReference<BinanceWebsocketTransaction<FundingRateWebsocketTransaction>>() { // from class: info.bitrich.xchangestream.binance.BinanceStreamingMarketDataService.5
        });
    }

    private static JavaType getKlineType() {
        return StreamingObjectMapperHelper.getObjectMapper().getTypeFactory().constructType(new TypeReference<BinanceWebsocketTransaction<KlineBinanceWebSocketTransaction>>() { // from class: info.bitrich.xchangestream.binance.BinanceStreamingMarketDataService.6
        });
    }
}
