package io.questdb.cutlass.text;

import io.questdb.cairo.CairoConfiguration;
import io.questdb.cairo.CairoEngine;
import io.questdb.cairo.PartitionBy;
import io.questdb.cairo.TableReader;
import io.questdb.cairo.TableToken;
import io.questdb.cairo.TableWriter;
import io.questdb.cairo.security.AllowAllCairoSecurityContext;
import io.questdb.cutlass.text.ParallelCsvFileImporter;
import io.questdb.griffin.FunctionFactoryCache;
import io.questdb.griffin.SqlCompiler;
import io.questdb.griffin.SqlException;
import io.questdb.griffin.SqlExecutionContextImpl;
import io.questdb.log.Log;
import io.questdb.log.LogFactory;
import io.questdb.mp.RingQueue;
import io.questdb.mp.Sequence;
import io.questdb.mp.SynchronizedJob;
import io.questdb.std.LongList;
import io.questdb.std.Misc;
import io.questdb.std.Numbers;
import io.questdb.std.Sinkable;
import io.questdb.std.datetime.microtime.MicrosecondClock;
import io.questdb.std.str.Path;
import io.questdb.std.str.StringSink;
import java.io.Closeable;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/questdb/cutlass/text/TextImportRequestJob.class */
public class TextImportRequestJob extends SynchronizedJob implements Closeable {
    private static final Log LOG = LogFactory.getLog((Class<?>) TextImportRequestJob.class);
    private final MicrosecondClock clock;
    private final CairoEngine engine;
    private final int logRetentionDays;
    private final RingQueue<TextImportRequestTask> requestQueue;
    private final Sequence requestSubSeq;
    private final TableToken statusTableToken;
    private final TextImportExecutionContext textImportExecutionContext;
    private ParallelCsvFileImporter parallelImporter;
    private Path path;
    private SerialCsvFileImporter serialImporter;
    private SqlCompiler sqlCompiler;
    private SqlExecutionContextImpl sqlExecutionContext;
    private TextImportRequestTask task;
    private TableWriter writer;
    private final LongList partitionsToRemove = new LongList();
    private final StringSink stringSink = new StringSink();
    private final ParallelCsvFileImporter.PhaseStatusReporter updateStatusRef = this::updateStatus;

    public TextImportRequestJob(CairoEngine cairoEngine, int i, @Nullable FunctionFactoryCache functionFactoryCache) throws SqlException {
        this.requestQueue = cairoEngine.getMessageBus().getTextImportRequestQueue();
        this.requestSubSeq = cairoEngine.getMessageBus().getTextImportRequestSubSeq();
        this.parallelImporter = new ParallelCsvFileImporter(cairoEngine, i);
        this.serialImporter = new SerialCsvFileImporter(cairoEngine);
        CairoConfiguration configuration = cairoEngine.getConfiguration();
        this.clock = configuration.getMicrosecondClock();
        this.sqlCompiler = new SqlCompiler(cairoEngine, functionFactoryCache, null);
        this.sqlExecutionContext = new SqlExecutionContextImpl(cairoEngine, 1);
        this.sqlExecutionContext.with(AllowAllCairoSecurityContext.INSTANCE, null, null);
        String str = ((Object) configuration.getSystemTableNamePrefix()) + "text_import_log";
        this.sqlCompiler.compile("CREATE TABLE IF NOT EXISTS \"" + str + "\" (ts timestamp, id string, table symbol, file symbol, phase symbol, status symbol, message string,rows_handled long,rows_imported long,errors long) timestamp(ts) partition by DAY", this.sqlExecutionContext);
        this.statusTableToken = cairoEngine.getTableToken(str);
        this.writer = cairoEngine.getWriter(AllowAllCairoSecurityContext.INSTANCE, this.statusTableToken, "QuestDB system");
        this.logRetentionDays = configuration.getSqlCopyLogRetentionDays();
        this.textImportExecutionContext = cairoEngine.getTextImportExecutionContext();
        this.path = new Path();
        this.engine = cairoEngine;
        enforceLogRetention();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.parallelImporter = (ParallelCsvFileImporter) Misc.free(this.parallelImporter);
        this.serialImporter = (SerialCsvFileImporter) Misc.free(this.serialImporter);
        this.writer = (TableWriter) Misc.free(this.writer);
        this.sqlCompiler = (SqlCompiler) Misc.free(this.sqlCompiler);
        this.sqlExecutionContext = (SqlExecutionContextImpl) Misc.free(this.sqlExecutionContext);
        this.path = (Path) Misc.free(this.path);
    }

    private void updateStatus(byte b, byte b2, CharSequence charSequence, long j, long j2, long j3) {
        if (this.writer != null) {
            this.stringSink.clear();
            Numbers.appendHex(this.stringSink, this.task.getImportId(), true);
            try {
                TableWriter.Row newRow = this.writer.newRow(this.clock.getTicks());
                newRow.putStr(1, this.stringSink);
                newRow.putSym(2, this.task.getTableName());
                newRow.putSym(3, this.task.getFileName());
                newRow.putSym(4, TextImportTask.getPhaseName(b));
                newRow.putSym(5, TextImportTask.getStatusName(b2));
                newRow.putStr(6, charSequence);
                newRow.putLong(7, j);
                newRow.putLong(8, j2);
                newRow.putLong(9, j3);
                newRow.append();
                this.writer.commit();
            } catch (Throwable th) {
                LOG.error().$((CharSequence) "could not update status table [importId=").$hexPadded(this.task.getImportId()).$((CharSequence) ", statusTableName=").$((Sinkable) this.statusTableToken).$((CharSequence) ", tableName=").$((CharSequence) this.task.getTableName()).$((CharSequence) ", fileName=").$((CharSequence) this.task.getFileName()).$((CharSequence) ", phase=").$((CharSequence) TextImportTask.getPhaseName(b)).$((CharSequence) ", status=").$((CharSequence) TextImportTask.getStatusName(b)).$((CharSequence) ", msg=").$(charSequence).$((CharSequence) ", rowsHandled=").$(j).$((CharSequence) ", rowsImported=").$(j2).$((CharSequence) ", errors=").$(j3).$((CharSequence) ", error=`").$(th).$('`').I$();
                this.writer = (TableWriter) Misc.free(this.writer);
            }
            if (this.writer == null) {
                try {
                    this.writer = this.engine.getWriter(AllowAllCairoSecurityContext.INSTANCE, this.statusTableToken, "QuestDB system");
                } catch (Throwable th2) {
                    LOG.error().$((CharSequence) "could not re-open writer [table=").$((Sinkable) this.statusTableToken).$((CharSequence) ", error=`").$(th2).$('`').I$();
                }
            }
        }
    }

    private boolean useParallelImport() {
        TableToken tableTokenIfExists = this.engine.getTableTokenIfExists(this.task.getTableName());
        if (this.engine.getStatus(this.sqlExecutionContext.getCairoSecurityContext(), this.path, tableTokenIfExists) != 0) {
            return this.task.getPartitionBy() >= 0 && this.task.getPartitionBy() != 3;
        }
        TableReader reader = this.engine.getReader(this.sqlExecutionContext.getCairoSecurityContext(), tableTokenIfExists);
        Throwable th = null;
        try {
            try {
                boolean isPartitioned = PartitionBy.isPartitioned(reader.getPartitionedBy());
                if (reader != null) {
                    if (0 != 0) {
                        try {
                            reader.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        reader.close();
                    }
                }
                return isPartitioned;
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (reader != null) {
                if (th != null) {
                    try {
                        reader.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    reader.close();
                }
            }
            throw th4;
        }
    }

    void enforceLogRetention() {
        if (this.writer != null) {
            if (this.logRetentionDays < 1) {
                this.writer.truncate();
                return;
            }
            if (this.writer.getPartitionCount() > 0) {
                this.partitionsToRemove.clear();
                for (int partitionCount = (this.writer.getPartitionCount() - this.logRetentionDays) - 1; partitionCount > -1; partitionCount--) {
                    this.partitionsToRemove.add(this.writer.getPartitionTimestamp(partitionCount));
                }
                int size = this.partitionsToRemove.size();
                for (int i = 0; i < size; i++) {
                    this.writer.removePartition(this.partitionsToRemove.getQuick(i));
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.questdb.mp.SynchronizedJob
    public boolean runSerially() {
        long next = this.requestSubSeq.next();
        if (next <= -1) {
            return false;
        }
        this.task = this.requestQueue.get(next);
        try {
            try {
                if (useParallelImport()) {
                    this.parallelImporter.of(this.task.getTableName(), this.task.getFileName(), this.task.getImportId(), this.task.getPartitionBy(), this.task.getDelimiter(), this.task.getTimestampColumnName(), this.task.getTimestampFormat(), this.task.isHeaderFlag(), this.textImportExecutionContext.getCircuitBreaker(), this.task.getAtomicity());
                    this.parallelImporter.setStatusReporter(this.updateStatusRef);
                    this.parallelImporter.process();
                } else {
                    this.serialImporter.of(this.task.getTableName(), this.task.getFileName(), this.task.getImportId(), this.task.getDelimiter(), this.task.getTimestampColumnName(), this.task.getTimestampFormat(), this.task.isHeaderFlag(), this.textImportExecutionContext.getCircuitBreaker(), this.task.getAtomicity());
                    this.serialImporter.setStatusReporter(this.updateStatusRef);
                    this.serialImporter.process();
                }
                this.requestSubSeq.done(next);
                this.textImportExecutionContext.resetActiveImportId();
            } catch (TextImportException e) {
                updateStatus((byte) -1, e.isCancelled() ? (byte) 3 : (byte) 2, e.getMessage(), 0L, 0L, 0L);
                this.requestSubSeq.done(next);
                this.textImportExecutionContext.resetActiveImportId();
            }
            enforceLogRetention();
            return true;
        } catch (Throwable th) {
            this.requestSubSeq.done(next);
            this.textImportExecutionContext.resetActiveImportId();
            throw th;
        }
    }
}
