package info.bitrich.xchangestream.kraken;

import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import info.bitrich.xchangestream.kraken.dto.enums.KrakenSubscriptionName;
import io.reactivex.Observable;
import java.util.Date;
import java.util.List;
import java.util.TreeSet;
import org.apache.commons.lang3.ObjectUtils;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.dto.trade.LimitOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:info/bitrich/xchangestream/kraken/KrakenStreamingMarketDataService.class */
public class KrakenStreamingMarketDataService implements StreamingMarketDataService {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KrakenStreamingMarketDataService.class);
    private static final int MIN_DATA_ARRAY_SIZE = 4;
    public static final String KRAKEN_CHANNEL_DELIMITER = "-";
    private final KrakenStreamingService service;
    private final boolean spreadForTicker;

    public KrakenStreamingMarketDataService(KrakenStreamingService krakenStreamingService, boolean z) {
        this.service = krakenStreamingService;
        this.spreadForTicker = z;
    }

    @Override // info.bitrich.xchangestream.core.StreamingMarketDataService
    public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... objArr) {
        String channelName = getChannelName(KrakenSubscriptionName.book, currencyPair);
        TreeSet newTreeSet = Sets.newTreeSet();
        TreeSet newTreeSet2 = Sets.newTreeSet();
        int intValue = ((Integer) ObjectUtils.defaultIfNull(KrakenStreamingService.parseOrderBookSize(objArr), 10)).intValue();
        return subscribe(channelName, 4, objArr).map(arrayNode -> {
            try {
                return KrakenStreamingAdapters.adaptOrderbookMessage(intValue, newTreeSet, newTreeSet2, currencyPair, arrayNode);
            } catch (IllegalStateException e) {
                LOG.warn("Resubscribing {} channel after adapter error {}", currencyPair, e.getMessage());
                newTreeSet.clear();
                newTreeSet2.clear();
                this.service.sendMessage(this.service.getUnsubscribeMessage(channelName, objArr));
                this.service.sendMessage(this.service.getSubscribeMessage(channelName, objArr));
                return new OrderBook((Date) null, (List<LimitOrder>) Lists.newArrayList(), (List<LimitOrder>) Lists.newArrayList(), false);
            }
        }).filter(orderBook -> {
            return orderBook.getBids().size() > 0 && orderBook.getAsks().size() > 0;
        });
    }

    @Override // info.bitrich.xchangestream.core.StreamingMarketDataService
    public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... objArr) {
        return this.spreadForTicker ? subscribe(getChannelName(KrakenSubscriptionName.spread, currencyPair), 4, null).map(arrayNode -> {
            return KrakenStreamingAdapters.adaptSpreadMessage(currencyPair, arrayNode);
        }) : subscribe(getChannelName(KrakenSubscriptionName.ticker, currencyPair), 4, null).map(arrayNode2 -> {
            return KrakenStreamingAdapters.adaptTickerMessage(currencyPair, arrayNode2);
        });
    }

    @Override // info.bitrich.xchangestream.core.StreamingMarketDataService
    public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... objArr) {
        return subscribe(getChannelName(KrakenSubscriptionName.trade, currencyPair), 4, null).flatMap(arrayNode -> {
            return Observable.fromIterable(KrakenStreamingAdapters.adaptTrades(currencyPair, arrayNode));
        });
    }

    public Observable<ArrayNode> subscribe(String str, int i, Object... objArr) {
        return this.service.subscribeChannel(str, objArr).filter(jsonNode -> {
            return jsonNode instanceof ArrayNode;
        }).map(jsonNode2 -> {
            return (ArrayNode) jsonNode2;
        }).filter(arrayNode -> {
            if (arrayNode.size() >= i) {
                return true;
            }
            LOG.warn("Invalid message in channel {}. It contains {} array items but expected at least {}", str, Integer.valueOf(arrayNode.size()), Integer.valueOf(i));
            return false;
        });
    }

    public String getChannelName(KrakenSubscriptionName krakenSubscriptionName, CurrencyPair currencyPair) {
        return krakenSubscriptionName + KRAKEN_CHANNEL_DELIMITER + (currencyPair.base.toString() + "/" + currencyPair.counter.toString());
    }
}
