package info.bitrich.xchangestream.binance;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction;
import info.bitrich.xchangestream.binance.dto.ExecutionReportBinanceUserTransaction;
import info.bitrich.xchangestream.core.StreamingTradeService;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
import java.io.IOException;
import java.util.Objects;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.derivative.FuturesContract;
import org.knowm.xchange.dto.Order;
import org.knowm.xchange.dto.trade.UserTrade;
import org.knowm.xchange.exceptions.ExchangeException;
import org.knowm.xchange.exceptions.ExchangeSecurityException;
import org.knowm.xchange.instrument.Instrument;

/* loaded from: input_file:info/bitrich/xchangestream/binance/BinanceStreamingTradeService.class */
/**
 * Streaming trade service that republishes Binance execution reports received on the user data
 * stream as raw reports, {@link Order} changes and {@link UserTrade}s.
 *
 * <p>Incoming reports are forwarded into a serialized {@link PublishSubject}; the public
 * observables are all views derived from that subject.
 */
public class BinanceStreamingTradeService implements StreamingTradeService {
    /** Subscription that forwards execution reports from the current user data stream. */
    private volatile Disposable executionReports;

    /** Current user data stream; replaced through {@link #setUserDataStreamingService}. */
    private volatile BinanceUserDataStreamingService binanceUserDataStreamingService;

    // The explicit type witness is required: without it create() is inferred as
    // PublishSubject<Object> and the serialized subject is not assignable to this field.
    private final Subject<ExecutionReportBinanceUserTransaction> executionReportsPublisher =
            PublishSubject.<ExecutionReportBinanceUserTransaction>create().toSerialized();

    private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();

    /**
     * Creates the service on top of the given user data stream.
     *
     * @param binanceUserDataStreamingService the user data stream to read execution reports from;
     *     may be {@code null}, in which case {@link #openSubscriptions()} does nothing
     */
    public BinanceStreamingTradeService(BinanceUserDataStreamingService binanceUserDataStreamingService) {
        this.binanceUserDataStreamingService = binanceUserDataStreamingService;
    }

    /**
     * Returns the unfiltered stream of execution reports.
     *
     * @return the observable of raw execution reports
     * @throws ExchangeSecurityException if no user data stream is set or its socket is not open
     */
    public Observable<ExecutionReportBinanceUserTransaction> getRawExecutionReports() {
        // Read the volatile field once so the null check and the socket check see the same instance.
        final BinanceUserDataStreamingService service = this.binanceUserDataStreamingService;
        if (service == null || !service.isSocketOpen()) {
            throw new ExchangeSecurityException("Not authenticated");
        }
        return this.executionReportsPublisher;
    }

    /**
     * Returns order changes derived from every execution report whose execution type is not
     * {@code REJECTED}.
     *
     * @param z flag passed through to {@code toOrder}; the instrument-based overload passes
     *     {@code true} when the instrument is a {@link FuturesContract}
     * @return the observable of order changes
     * @throws ExchangeSecurityException if no user data stream is set or its socket is not open
     */
    public Observable<Order> getOrderChanges(boolean z) {
        return getRawExecutionReports()
                .filter(report -> !report.getExecutionType().equals(ExecutionReportBinanceUserTransaction.ExecutionType.REJECTED))
                .map(report -> report.toOrder(z));
    }

    /**
     * Returns order changes whose instrument equals the given currency pair.
     *
     * @param currencyPair the pair to filter on
     * @param objArr unused
     * @return the observable of matching order changes
     */
    @Override
    public Observable<Order> getOrderChanges(CurrencyPair currencyPair, Object... objArr) {
        return getOrderChanges(false).filter(order -> currencyPair.equals(order.getInstrument()));
    }

    /**
     * Returns user trades whose instrument equals the given currency pair.
     *
     * @param currencyPair the pair to filter on
     * @param objArr unused
     * @return the observable of matching user trades
     */
    @Override
    public Observable<UserTrade> getUserTrades(CurrencyPair currencyPair, Object... objArr) {
        return getUserTrades(false).filter(userTrade -> userTrade.getInstrument().equals(currencyPair));
    }

    /**
     * Returns order changes whose instrument equals the given instrument.
     *
     * @param instrument the instrument to filter on
     * @param objArr unused
     * @return the observable of matching order changes
     */
    @Override
    public Observable<Order> getOrderChanges(Instrument instrument, Object... objArr) {
        return getOrderChanges(instrument instanceof FuturesContract)
                .filter(order -> instrument.equals(order.getInstrument()));
    }

    /**
     * Returns user trades whose instrument equals the given instrument.
     *
     * @param instrument the instrument to filter on
     * @param objArr unused
     * @return the observable of matching user trades
     */
    @Override
    public Observable<UserTrade> getUserTrades(Instrument instrument, Object... objArr) {
        return getUserTrades(instrument instanceof FuturesContract)
                .filter(userTrade -> userTrade.getInstrument().equals(instrument));
    }

    /**
     * Returns user trades derived from every execution report whose execution type is
     * {@code TRADE}.
     *
     * @param z flag passed through to {@code toUserTrade}; the instrument-based overload passes
     *     {@code true} when the instrument is a {@link FuturesContract}
     * @return the observable of user trades
     * @throws ExchangeSecurityException if no user data stream is set or its socket is not open
     */
    public Observable<UserTrade> getUserTrades(boolean z) {
        return getRawExecutionReports()
                .filter(report -> report.getExecutionType().equals(ExecutionReportBinanceUserTransaction.ExecutionType.TRADE))
                .map(report -> report.toUserTrade(z));
    }

    /**
     * Subscribes to the execution report channel of the current user data stream and forwards
     * every parsed report into the internal publisher. Does nothing when no stream is set.
     */
    public void openSubscriptions() {
        // Snapshot the volatile field so it cannot become null between the check and the use.
        final BinanceUserDataStreamingService service = this.binanceUserDataStreamingService;
        if (service != null) {
            this.executionReports = service
                    .subscribeChannel(BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.EXECUTION_REPORT)
                    .map(this::executionReport)
                    .subscribe(this.executionReportsPublisher::onNext);
        }
    }

    /**
     * Replaces the user data stream: disposes the existing forwarding subscription (if any and
     * not already disposed), stores the new stream and re-opens the subscription on it.
     *
     * @param binanceUserDataStreamingService the new user data stream, or {@code null} to detach
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void setUserDataStreamingService(BinanceUserDataStreamingService binanceUserDataStreamingService) {
        final Disposable current = this.executionReports;
        if (current != null && !current.isDisposed()) {
            current.dispose();
        }
        this.binanceUserDataStreamingService = binanceUserDataStreamingService;
        openSubscriptions();
    }

    /**
     * Converts a JSON tree into an execution report.
     *
     * @param jsonNode the JSON payload of an execution report message
     * @return the parsed execution report
     * @throws ExchangeException if the payload cannot be mapped
     */
    private ExecutionReportBinanceUserTransaction executionReport(JsonNode jsonNode) {
        try {
            return this.mapper.treeToValue(jsonNode, ExecutionReportBinanceUserTransaction.class);
        } catch (IOException e) {
            throw new ExchangeException("Unable to parse execution report", e);
        }
    }
}
