package info.bitrich.xchangestream.okex;

import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.bitstamp.v2.BitstampStreamingService;
import info.bitrich.xchangestream.core.StreamingTradeService;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.Observable;
import java.util.List;
import org.knowm.xchange.dto.meta.ExchangeMetaData;
import org.knowm.xchange.dto.trade.UserTrade;
import org.knowm.xchange.instrument.Instrument;
import org.knowm.xchange.okex.OkexAdapters;
import org.knowm.xchange.okex.dto.trade.OkexOrderDetails;

/* loaded from: input_file:info/bitrich/xchangestream/okex/OkexStreamingTradeService.class */
/**
 * {@link StreamingTradeService} implementation that obtains its messages from an
 * {@link OkexStreamingService} and converts them with {@link OkexAdapters}.
 */
public class OkexStreamingTradeService implements StreamingTradeService {
    /** Streaming service used to subscribe to channels. */
    private final OkexStreamingService service;

    /** Exchange metadata handed to {@link OkexAdapters#adaptUserTrades} during conversion. */
    private final ExchangeMetaData exchangeMetaData;

    /** Mapper used to turn JSON tree nodes into {@link OkexOrderDetails} lists. */
    private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();

    /**
     * Creates the trade service.
     *
     * @param okexStreamingService streaming service the channel subscriptions are made on
     * @param exchangeMetaData metadata passed to the adapter when converting order details
     */
    public OkexStreamingTradeService(OkexStreamingService okexStreamingService, ExchangeMetaData exchangeMetaData) {
        this.service = okexStreamingService;
        this.exchangeMetaData = exchangeMetaData;
    }

    /**
     * Streams the user trades for the given instrument.
     *
     * <p>The channel name is {@code OkexStreamingService.USERTRADES} followed by the adapted
     * instrument. Messages lacking the payload field are dropped; for the remaining ones the
     * payload is read as a list of {@link OkexOrderDetails}, adapted, and every resulting
     * {@link UserTrade} is emitted as a separate item.
     *
     * @param instrument instrument whose trades are requested
     * @param objArr additional arguments; not used by this implementation
     * @return observable emitting one item per adapted user trade
     */
    @Override // info.bitrich.xchangestream.core.StreamingTradeService
    public Observable<UserTrade> getUserTrades(Instrument instrument, Object... objArr) {
        final String channelId = OkexStreamingService.USERTRADES + OkexAdapters.adaptInstrument(instrument);

        // NOTE(review): the payload field name comes from the Bitstamp module's constant; this
        // looks like a decompiler substitution for a plain string literal - confirm, and consider
        // an OKEx-local constant instead of the cross-module reference.
        return this.service
                .subscribeChannel(channelId, new Object[0])
                .filter(message -> message.has(BitstampStreamingService.EVENT_ORDERBOOK))
                .flatMap(message -> {
                    List<OkexOrderDetails> orderDetails = this.mapper.treeToValue(
                            message.get(BitstampStreamingService.EVENT_ORDERBOOK),
                            this.mapper.getTypeFactory().constructCollectionType(List.class, OkexOrderDetails.class));
                    List<UserTrade> trades =
                            OkexAdapters.adaptUserTrades(orderDetails, this.exchangeMetaData).getUserTrades();
                    return Observable.fromIterable(trades);
                });
    }
}
