package info.bitrich.xchangestream.binance;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import info.bitrich.xchangestream.binance.dto.BaseBinanceWebSocketTransaction;
import info.bitrich.xchangestream.binance.dto.OutboundAccountPositionBinanceWebsocketTransaction;
import info.bitrich.xchangestream.core.StreamingAccountService;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.Subject;
import java.util.Objects;
import org.knowm.xchange.currency.Currency;
import org.knowm.xchange.dto.account.Balance;
import org.knowm.xchange.exceptions.ExchangeException;
import org.knowm.xchange.exceptions.ExchangeSecurityException;

/* loaded from: input_file:info/bitrich/xchangestream/binance/BinanceStreamingAccountService.class */
/**
 * Streaming account service for Binance that republishes account-position events received on the
 * user-data web socket.
 *
 * <p>Events arriving on the {@code OUTBOUND_ACCOUNT_POSITION} channel are parsed into
 * {@link OutboundAccountPositionBinanceWebsocketTransaction} objects and pushed into a
 * {@link BehaviorSubject}, so the most recently accepted event is retained by the subject. An
 * event is only forwarded when its event time is strictly after the event time of the last
 * retained event.
 *
 * <p>The underlying {@link BinanceUserDataStreamingService} can be swapped at runtime through
 * {@link #setUserDataStreamingService(BinanceUserDataStreamingService)}; the subject handed out to
 * callers stays the same across such swaps.
 */
public class BinanceStreamingAccountService implements StreamingAccountService {
    /** Subscription to the current user-data channel; {@code null} until subscriptions are opened. */
    private volatile Disposable accountInfo;

    /** Current user-data service; may be {@code null} and may be replaced at runtime. */
    private volatile BinanceUserDataStreamingService binanceUserDataStreamingService;

    /** Holds the last account-position event that passed the event-time filter. */
    private final BehaviorSubject<OutboundAccountPositionBinanceWebsocketTransaction> accountInfoLast = BehaviorSubject.create();

    /** Serialized view of {@link #accountInfoLast}; used both for publishing and for handing out to callers. */
    private final Subject<OutboundAccountPositionBinanceWebsocketTransaction> accountInfoPublisher = this.accountInfoLast.toSerialized();

    private final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();

    /**
     * Creates the service on top of the given user-data stream.
     *
     * @param binanceUserDataStreamingService the user-data service to read from; may be
     *     {@code null}, in which case the observable getters throw until a service is set
     */
    public BinanceStreamingAccountService(BinanceUserDataStreamingService binanceUserDataStreamingService) {
        this.binanceUserDataStreamingService = binanceUserDataStreamingService;
    }

    /**
     * Returns the stream of raw account-position events.
     *
     * @return the shared, serialized subject that account-position events are published to
     * @throws ExchangeSecurityException if no user-data service is set or its socket is not open
     */
    public Observable<OutboundAccountPositionBinanceWebsocketTransaction> getRawAccountInfo() {
        checkConnected();
        return this.accountInfoPublisher;
    }

    /**
     * Returns the stream of balances, one emission per entry of each account-position event's
     * balance list.
     *
     * @return an observable of individual balances for all currencies
     * @throws ExchangeSecurityException if no user-data service is set or its socket is not open
     */
    public Observable<Balance> getBalanceChanges() {
        checkConnected();
        return getRawAccountInfo()
                .map(OutboundAccountPositionBinanceWebsocketTransaction::toBalanceList)
                .flatMap(Observable::fromIterable);
    }

    /**
     * Verifies that a user-data service is present and that its socket is open.
     *
     * @throws ExchangeSecurityException if the service is {@code null} or its socket is not open
     */
    private void checkConnected() {
        // Read the volatile field once so the null check and the socket check see the same instance.
        BinanceUserDataStreamingService service = this.binanceUserDataStreamingService;
        if (service == null || !service.isSocketOpen()) {
            throw new ExchangeSecurityException("Not authenticated");
        }
    }

    /**
     * Returns the stream of balances restricted to a single currency.
     *
     * @param currency the currency whose balances should be emitted
     * @param objArr unused by this implementation
     * @return an observable of balances whose currency equals {@code currency}
     * @throws ExchangeSecurityException if no user-data service is set or its socket is not open
     */
    @Override
    public Observable<Balance> getBalanceChanges(Currency currency, Object... objArr) {
        return getBalanceChanges().filter(balance -> balance.getCurrency().equals(currency));
    }

    /**
     * Subscribes to the account-position channel of the current user-data service and forwards
     * accepted events to the shared subject. Does nothing when no user-data service is set.
     *
     * <p>The resulting subscription is stored so that
     * {@link #setUserDataStreamingService(BinanceUserDataStreamingService)} can dispose it.
     */
    public void openSubscriptions() {
        // Read the volatile field once; it may be replaced concurrently by setUserDataStreamingService.
        BinanceUserDataStreamingService service = this.binanceUserDataStreamingService;
        if (service == null) {
            return;
        }
        Observable<OutboundAccountPositionBinanceWebsocketTransaction> updates = service
                .subscribeChannel(BaseBinanceWebSocketTransaction.BinanceWebSocketTypes.OUTBOUND_ACCOUNT_POSITION)
                .map(this::accountInfo)
                .filter(update -> {
                    // Take a single snapshot of the last value so the null check and the
                    // comparison cannot observe two different values.
                    OutboundAccountPositionBinanceWebsocketTransaction previous = this.accountInfoLast.getValue();
                    return previous == null || previous.getEventTime().before(update.getEventTime());
                });
        Subject<OutboundAccountPositionBinanceWebsocketTransaction> publisher = this.accountInfoPublisher;
        this.accountInfo = updates.subscribe(publisher::onNext);
    }

    /**
     * Replaces the user-data service, disposing the subscription to the previous one (if any and
     * not already disposed) and opening subscriptions on the new one.
     *
     * @param binanceUserDataStreamingService the new user-data service; when {@code null} the old
     *     subscription is still disposed and no new subscription is opened
     */
    public void setUserDataStreamingService(BinanceUserDataStreamingService binanceUserDataStreamingService) {
        Disposable current = this.accountInfo;
        if (current != null && !current.isDisposed()) {
            current.dispose();
        }
        this.binanceUserDataStreamingService = binanceUserDataStreamingService;
        openSubscriptions();
    }

    /**
     * Converts a JSON tree into an account-position transaction.
     *
     * @param jsonNode the JSON message received on the account-position channel
     * @return the parsed transaction
     * @throws ExchangeException if the JSON cannot be mapped; the original failure is kept as cause
     */
    private OutboundAccountPositionBinanceWebsocketTransaction accountInfo(JsonNode jsonNode) {
        try {
            return this.mapper.treeToValue(jsonNode, OutboundAccountPositionBinanceWebsocketTransaction.class);
        } catch (Exception e) {
            throw new ExchangeException("Unable to parse account info", e);
        }
    }
}
