package info.bitrich.xchangestream.okex;

import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.bitstamp.v2.BitstampStreamingService;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.questdb.PropServerConfiguration;
import io.reactivex.Observable;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.knowm.xchange.dto.Order;
import org.knowm.xchange.dto.marketdata.FundingRate;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.instrument.Instrument;
import org.knowm.xchange.okex.OkexAdapters;
import org.knowm.xchange.okex.dto.marketdata.OkexFundingRate;
import org.knowm.xchange.okex.dto.marketdata.OkexOrderbook;
import org.knowm.xchange.okex.dto.marketdata.OkexPublicOrder;
import org.knowm.xchange.okex.dto.marketdata.OkexTicker;
import org.knowm.xchange.okex.dto.marketdata.OkexTrade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:info/bitrich/xchangestream/okex/OkexStreamingMarketDataService.class */
public class OkexStreamingMarketDataService implements StreamingMarketDataService {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) OkexStreamingMarketDataService.class);
    private final OkexStreamingService service;
    private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
    private final Map<String, OrderBook> orderBookMap = new HashMap();

    public OkexStreamingMarketDataService(OkexStreamingService okexStreamingService) {
        this.service = okexStreamingService;
    }

    @Override // info.bitrich.xchangestream.core.StreamingMarketDataService
    public Observable<Ticker> getTicker(Instrument instrument, Object... objArr) {
        return this.service.subscribeChannel(OkexStreamingService.TICKERS + OkexAdapters.adaptInstrument(instrument), new Object[0]).filter(jsonNode -> {
            return jsonNode.has(BitstampStreamingService.EVENT_ORDERBOOK);
        }).flatMap(jsonNode2 -> {
            return Observable.fromIterable((List) this.mapper.treeToValue(jsonNode2.get(BitstampStreamingService.EVENT_ORDERBOOK), this.mapper.getTypeFactory().constructCollectionType(List.class, OkexTicker.class))).map(OkexAdapters::adaptTicker);
        });
    }

    @Override // info.bitrich.xchangestream.core.StreamingMarketDataService
    public Observable<Trade> getTrades(Instrument instrument, Object... objArr) {
        return this.service.subscribeChannel(OkexStreamingService.TRADES + OkexAdapters.adaptInstrument(instrument), new Object[0]).filter(jsonNode -> {
            return jsonNode.has(BitstampStreamingService.EVENT_ORDERBOOK);
        }).flatMap(jsonNode2 -> {
            return Observable.fromIterable(OkexAdapters.adaptTrades((List) this.mapper.treeToValue(jsonNode2.get(BitstampStreamingService.EVENT_ORDERBOOK), this.mapper.getTypeFactory().constructCollectionType(List.class, OkexTrade.class)), instrument).getTrades());
        });
    }

    @Override // info.bitrich.xchangestream.core.StreamingMarketDataService
    public Observable<FundingRate> getFundingRate(Instrument instrument, Object... objArr) {
        return this.service.subscribeChannel(OkexStreamingService.FUNDING_RATE + OkexAdapters.adaptInstrument(instrument), new Object[0]).filter(jsonNode -> {
            return jsonNode.has(BitstampStreamingService.EVENT_ORDERBOOK);
        }).map(jsonNode2 -> {
            return OkexAdapters.adaptFundingRate((List) this.mapper.treeToValue(jsonNode2.get(BitstampStreamingService.EVENT_ORDERBOOK), this.mapper.getTypeFactory().constructCollectionType(List.class, OkexFundingRate.class)));
        });
    }

    @Override // info.bitrich.xchangestream.core.StreamingMarketDataService
    public Observable<OrderBook> getOrderBook(Instrument instrument, Object... objArr) {
        String adaptInstrument = OkexAdapters.adaptInstrument(instrument);
        String obj = objArr.length >= 1 ? objArr[0].toString() : OkexStreamingService.ORDERBOOK;
        return this.service.subscribeChannel(OkexStreamingService.ORDERBOOK + adaptInstrument, new Object[0]).filter(jsonNode -> {
            return jsonNode.has("action");
        }).flatMap(jsonNode2 -> {
            String asText = obj.equals(OkexStreamingService.ORDERBOOK5) ? PropServerConfiguration.SNAPSHOT_DIRECTORY : jsonNode2.get("action").asText();
            if (PropServerConfiguration.SNAPSHOT_DIRECTORY.equalsIgnoreCase(asText)) {
                OrderBook adaptOrderBook = OkexAdapters.adaptOrderBook((List<OkexOrderbook>) this.mapper.treeToValue(jsonNode2.get(BitstampStreamingService.EVENT_ORDERBOOK), this.mapper.getTypeFactory().constructCollectionType(List.class, OkexOrderbook.class)), instrument);
                this.orderBookMap.put(adaptInstrument, adaptOrderBook);
                return Observable.just(adaptOrderBook);
            }
            if (!"update".equalsIgnoreCase(asText)) {
                LOG.error(String.format("Unexpected books action=%s, message=%s", asText, jsonNode2));
                return Observable.fromIterable(new LinkedList());
            }
            OrderBook orDefault = this.orderBookMap.getOrDefault(adaptInstrument, null);
            if (orDefault == null) {
                LOG.error(String.format("Failed to get orderBook, instId=%s.", adaptInstrument));
                return Observable.fromIterable(new LinkedList());
            }
            ((List) this.mapper.treeToValue(jsonNode2.get(BitstampStreamingService.EVENT_ORDERBOOK).get(0).get("asks"), this.mapper.getTypeFactory().constructCollectionType(List.class, OkexPublicOrder.class))).forEach(okexPublicOrder -> {
                orDefault.update(OkexAdapters.adaptLimitOrder(okexPublicOrder, instrument, Order.OrderType.ASK));
            });
            ((List) this.mapper.treeToValue(jsonNode2.get(BitstampStreamingService.EVENT_ORDERBOOK).get(0).get("bids"), this.mapper.getTypeFactory().constructCollectionType(List.class, OkexPublicOrder.class))).forEach(okexPublicOrder2 -> {
                orDefault.update(OkexAdapters.adaptLimitOrder(okexPublicOrder2, instrument, Order.OrderType.BID));
            });
            return Observable.just(orDefault);
        });
    }
}
