/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.raft.internals;

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.message.VotersRecord;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.raft.errors.BufferAllocationException;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.raft.internals.BatchBuilder;
import org.apache.kafka.server.common.serialization.RecordSerde;

public class BatchAccumulator<T>
implements Closeable {
    /** Leader epoch this accumulator serves; every append is validated against it. */
    private final int epoch;
    /** Clock used to timestamp new batches and to compute the linger deadline. */
    private final Time time;
    /** Milliseconds added to the current time when the linger timer is armed. */
    private final int lingerMs;
    /** Size of each buffer requested from the pool and the upper bound on bytes needed by one append. */
    private final int maxBatchSize;
    /** Number of completed, undrained batches at which {@code append} starts rejecting records. */
    private final int maxNumberOfBatches;
    /** Compression handed to batch builders and to control record creators. */
    private final Compression compression;
    /** Pool that batch buffers are allocated from and released to. */
    private final MemoryPool memoryPool;
    /** Serializer handed to each batch builder. */
    private final RecordSerde<T> serde;
    /** Linger deadline; not running (deadline {@code Long.MAX_VALUE}) until an append arms it. */
    private final SimpleTimer lingerTimer = new SimpleTimer();
    /**
     * Exclusive upper bound on offsets that may be drained. {@code Long.MAX_VALUE} means no
     * restriction; a delayed-drain append lowers it and {@code allowDrain} restores it.
     */
    private final AtomicLong drainOffset = new AtomicLong(Long.MAX_VALUE);
    /** Batches that are built and waiting to be drained, in append order. */
    private final ConcurrentLinkedQueue<CompletedBatch<T>> completed = new ConcurrentLinkedQueue<>();
    /** Hand-off state between the draining caller and appenders; see {@code DrainStatus}. */
    private volatile DrainStatus drainStatus = DrainStatus.NONE;
    /** Serializes appends and batch completion. */
    private final ReentrantLock appendLock = new ReentrantLock();
    /** Offset the next appended record will receive; accessed in this class only while holding {@code appendLock}. */
    private long nextOffset;
    /** Batch currently being filled, or {@code null} if none is open; accessed in this class only while holding {@code appendLock}. */
    private BatchBuilder<T> currentBatch;

    /**
     * Creates an accumulator bound to a single leader epoch.
     *
     * @param epoch the leader epoch that every append must match
     * @param baseOffset the offset assigned to the first record appended
     * @param lingerMs milliseconds after the first pending append before a drain becomes due
     * @param maxBatchSize size in bytes of each buffer requested from {@code memoryPool}
     * @param maxNumberOfBatches number of undrained completed batches at which appends are rejected
     * @param memoryPool pool that batch buffers are allocated from and released to
     * @param time clock used for batch timestamps and the linger deadline
     * @param compression compression passed to batch builders and control record creators
     * @param serde serializer passed to batch builders
     */
    public BatchAccumulator(int epoch, long baseOffset, int lingerMs, int maxBatchSize, int maxNumberOfBatches, MemoryPool memoryPool, Time time, Compression compression, RecordSerde<T> serde) {
        // Identity and position of the log being appended to.
        this.epoch = epoch;
        this.nextOffset = baseOffset;

        // Batching limits.
        this.lingerMs = lingerMs;
        this.maxBatchSize = maxBatchSize;
        this.maxNumberOfBatches = maxNumberOfBatches;

        // Collaborators.
        this.time = time;
        this.memoryPool = memoryPool;
        this.compression = compression;
        this.serde = serde;
    }

    /*
     * WARNING (decompiler) - a try block that caught itself was removed here; behaviour may differ from the original source.
     */
    /**
     * Appends records to the batch currently being filled, opening a new batch when needed.
     *
     * @param epoch the epoch the caller believes it is leader for; must equal this accumulator's epoch
     * @param records the records to append; they are assigned consecutive offsets starting at the next offset
     * @param delayDrain if {@code true} and no drain limit is set yet, batches whose last offset is at or
     *        beyond the current next offset are held back from draining until {@link #allowDrain()} is called
     * @return the offset of the last record appended
     * @throws NotLeaderException if {@code epoch} is smaller than this accumulator's epoch
     * @throws IllegalArgumentException if {@code epoch} is larger than this accumulator's epoch
     * @throws IllegalStateException if the number of undrained completed batches has reached the limit
     * @throws BufferAllocationException if no buffer could be allocated for the batch
     * @throws RecordBatchTooLargeException if the records need more bytes than the maximum batch size
     */
    public long append(int epoch, List<T> records, boolean delayDrain) {
        // Sampled before taking the lock, so the limit check below is based on a snapshot.
        int pendingBatches = completed.size();
        if (epoch < this.epoch) {
            throw new NotLeaderException("Append failed because the given epoch " + epoch + " is stale. Current leader epoch = " + this.epoch());
        } else if (epoch > this.epoch) {
            throw new IllegalArgumentException("Attempt to append from epoch " + epoch + " which is larger than the current epoch " + this.epoch);
        } else if (pendingBatches >= maxNumberOfBatches) {
            throw new IllegalStateException(String.format("Attempting to append records when the number of batches %s reached %s", pendingBatches, maxNumberOfBatches));
        }

        ObjectSerializationCache serializationCache = new ObjectSerializationCache();
        appendLock.lock();
        try {
            long lastOffset = nextOffset + (long) records.size() - 1L;

            // If a drain was requested while we were outside the lock, finish it before appending.
            maybeCompleteDrain();

            BatchBuilder<T> batch = maybeAllocateBatch(records, serializationCache);
            if (batch == null) {
                throw new BufferAllocationException("Append failed because we failed to allocate memory to write the batch");
            }

            if (delayDrain) {
                // Only the first delayed append sets the limit; later ones leave it where it is.
                drainOffset.compareAndSet(Long.MAX_VALUE, nextOffset);
            }

            for (T record : records) {
                batch.appendRecord(record, serializationCache);
            }

            maybeResetLinger();
            nextOffset = lastOffset + 1L;
            return lastOffset;
        } finally {
            appendLock.unlock();
        }
    }

    /**
     * Arms the linger timer at {@code now + lingerMs} unless it is already running, so the
     * deadline is measured from the first pending append.
     */
    private void maybeResetLinger() {
        if (lingerTimer.isRunning()) {
            return;
        }
        long deadlineMs = time.milliseconds() + (long) lingerMs;
        lingerTimer.reset(deadlineMs);
    }

    /**
     * Returns a batch the given records can be appended to, opening a new one if necessary.
     *
     * @param records the records about to be appended
     * @param serializationCache cache shared with the subsequent {@code appendRecord} calls
     * @return the batch to append to, or {@code null} if a buffer could not be allocated
     * @throws RecordBatchTooLargeException if the builder reports more bytes needed than {@code maxBatchSize}
     */
    private BatchBuilder<T> maybeAllocateBatch(Collection<T> records, ObjectSerializationCache serializationCache) {
        if (currentBatch == null) {
            startNewBatch();
        }
        if (currentBatch == null) {
            // The pool had no buffer available.
            return null;
        }

        // NOTE(review): presumably an empty result means the records fit in the current batch - confirm in BatchBuilder.
        OptionalInt bytesNeeded = currentBatch.bytesNeeded(records, serializationCache);
        if (!bytesNeeded.isPresent()) {
            return currentBatch;
        }

        if (bytesNeeded.getAsInt() > maxBatchSize) {
            throw new RecordBatchTooLargeException(String.format("The total record(s) size of %d exceeds the maximum allowed batch size of %d", bytesNeeded.getAsInt(), maxBatchSize));
        }

        // Seal the current batch and try to open a fresh one; currentBatch stays null if allocation fails.
        completeCurrentBatch();
        startNewBatch();
        return currentBatch;
    }

    /**
     * Builds the current batch, queues the result on {@code completed} and clears
     * {@code currentBatch}. Callers must ensure {@code currentBatch} is not {@code null}.
     */
    private void completeCurrentBatch() {
        BatchBuilder<T> finished = currentBatch;
        MemoryRecords data = finished.build();
        CompletedBatch<T> sealed = new CompletedBatch<>(
            finished.baseOffset(),
            finished.records(),
            data,
            memoryPool,
            finished.initialBuffer()
        );
        completed.add(sealed);
        currentBatch = null;
    }

    /**
     * Removes any drain limit set by a delayed-drain append by resetting the drain offset
     * to {@code Long.MAX_VALUE}.
     */
    public void allowDrain() {
        drainOffset.set(Long.MAX_VALUE);
    }

    /*
     * WARNING (decompiler) - a try block that caught itself was removed here; behaviour may differ from the original source.
     */
    /**
     * Appends a single control batch produced by {@code valueCreator}.
     *
     * <p>A buffer of {@code maxBatchSize} bytes is allocated, any non-empty in-progress batch is
     * completed first via {@link #forceDrain()}, and the created batch is validated before it is
     * queued. If anything fails after the buffer was allocated, the buffer is returned to the pool.
     *
     * @param valueCreator creates the control batch given the base offset, epoch, compression and buffer
     * @return the offset of the last record in the appended control batch
     * @throws IllegalStateException if no buffer could be allocated
     * @throws IllegalArgumentException if the created records fail validation
     */
    public long appendControlMessages(MemoryRecordsCreator valueCreator) {
        appendLock.lock();
        try {
            ByteBuffer buffer = memoryPool.tryAllocate(maxBatchSize);
            if (buffer == null) {
                throw new IllegalStateException("Could not allocate buffer for the control record");
            }

            try {
                // The lock is reentrant, so forceDrain may acquire it again from this thread.
                forceDrain();
                MemoryRecords memoryRecords = valueCreator.create(nextOffset, epoch, compression, buffer);
                int numberOfRecords = validateMemoryRecordsAndReturnCount(memoryRecords);
                completed.add(new CompletedBatch<>(nextOffset, numberOfRecords, memoryRecords, memoryPool, buffer));
                nextOffset += numberOfRecords;
            } catch (Exception e) {
                // The buffer never made it into a queued batch, so nobody else will release it.
                memoryPool.release(buffer);
                throw e;
            }

            return nextOffset - 1L;
        } finally {
            appendLock.unlock();
        }
    }

    /**
     * Checks that {@code memoryRecords} holds exactly one control batch that starts at
     * {@code nextOffset}, carries this accumulator's epoch and reports a record count of at least one.
     *
     * @param memoryRecords the records produced by a control record creator
     * @return the number of records in the single batch
     * @throws IllegalArgumentException if any of the checks fails
     */
    private int validateMemoryRecordsAndReturnCount(MemoryRecords memoryRecords) {
        Iterator<MutableRecordBatch> batches = memoryRecords.batches().iterator();
        if (!batches.hasNext()) {
            throw new IllegalArgumentException("valueCreator didn't create a batch");
        }

        MutableRecordBatch batch = batches.next();
        Integer numberOfRecords = batch.countOrNull();
        if (!batch.isControlBatch()) {
            throw new IllegalArgumentException("valueCreator didn't create a control batch");
        }
        if (batch.baseOffset() != nextOffset) {
            throw new IllegalArgumentException(String.format("Expected a base offset of %d but got %d", nextOffset, batch.baseOffset()));
        }
        if (batch.partitionLeaderEpoch() != epoch) {
            throw new IllegalArgumentException(String.format("Expected a partition leader epoch of %d but got %d", epoch, batch.partitionLeaderEpoch()));
        }
        if (numberOfRecords == null) {
            throw new IllegalArgumentException("valueCreator didn't create a batch with the count");
        }
        if (numberOfRecords < 1) {
            throw new IllegalArgumentException("valueCreator didn't create at least one control record");
        }
        if (batches.hasNext()) {
            throw new IllegalArgumentException("valueCreator created more than one batch");
        }
        return numberOfRecords;
    }

    /**
     * Appends a control batch containing the given voters record.
     *
     * @param voters the voters record to write
     * @param currentTimestamp the timestamp passed to the record factory
     * @return the offset of the last record in the appended control batch
     */
    public long appendVotersRecord(VotersRecord voters, long currentTimestamp) {
        MemoryRecordsCreator creator = (baseOffset, epoch, compression, buffer) ->
            MemoryRecords.withVotersRecord(baseOffset, currentTimestamp, epoch, buffer, voters);
        return appendControlMessages(creator);
    }

    /**
     * Appends a control batch containing the given leader change message.
     *
     * @param leaderChangeMessage the message to write
     * @param currentTimestamp the timestamp passed to the record factory
     */
    public void appendLeaderChangeMessage(LeaderChangeMessage leaderChangeMessage, long currentTimestamp) {
        MemoryRecordsCreator creator = (baseOffset, epoch, compression, buffer) ->
            MemoryRecords.withLeaderChangeMessage(baseOffset, currentTimestamp, epoch, buffer, leaderChangeMessage);
        appendControlMessages(creator);
    }

    /**
     * Appends a control batch containing the given snapshot header record.
     *
     * @param snapshotHeaderRecord the record to write
     * @param currentTimestamp the timestamp passed to the record factory
     */
    public void appendSnapshotHeaderRecord(SnapshotHeaderRecord snapshotHeaderRecord, long currentTimestamp) {
        MemoryRecordsCreator creator = (baseOffset, epoch, compression, buffer) ->
            MemoryRecords.withSnapshotHeaderRecord(baseOffset, currentTimestamp, epoch, buffer, snapshotHeaderRecord);
        appendControlMessages(creator);
    }

    /**
     * Appends a control batch containing the given snapshot footer record.
     *
     * @param snapshotFooterRecord the record to write
     * @param currentTimestamp the timestamp passed to the record factory
     */
    public void appendSnapshotFooterRecord(SnapshotFooterRecord snapshotFooterRecord, long currentTimestamp) {
        MemoryRecordsCreator creator = (baseOffset, epoch, compression, buffer) ->
            MemoryRecords.withSnapshotFooterRecord(baseOffset, currentTimestamp, epoch, buffer, snapshotFooterRecord);
        appendControlMessages(creator);
    }

    /**
     * Marks a drain as started and completes it immediately under the append lock, so that any
     * non-empty in-progress batch is moved to the completed queue.
     */
    public void forceDrain() {
        final ReentrantLock lock = appendLock;
        lock.lock();
        try {
            drainStatus = DrainStatus.STARTED;
            maybeCompleteDrain();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Finishes a drain that has been started: completes the current batch if it holds records,
     * stops the linger timer and marks the drain as finished. Does nothing in any other state.
     * Every caller in this class holds {@code appendLock}.
     */
    private void maybeCompleteDrain() {
        if (drainStatus != DrainStatus.STARTED) {
            return;
        }

        boolean hasPendingRecords = currentBatch != null && currentBatch.nonEmpty();
        if (hasPendingRecords) {
            completeCurrentBatch();
        }

        // Long.MAX_VALUE is the timer's "not running" sentinel.
        lingerTimer.reset(Long.MAX_VALUE);
        drainStatus = DrainStatus.FINISHED;
    }

    /**
     * Tries to allocate a {@code maxBatchSize} buffer and open a new batch at {@code nextOffset}.
     * If the pool returns no buffer, {@code currentBatch} is left unchanged.
     */
    private void startNewBatch() {
        ByteBuffer buffer = memoryPool.tryAllocate(maxBatchSize);
        if (buffer == null) {
            return;
        }
        currentBatch = new BatchBuilder<>(
            buffer,
            serde,
            compression,
            nextOffset,
            time.milliseconds(),
            epoch,
            maxBatchSize
        );
    }

    /**
     * Reports whether a drain is due now.
     *
     * @param currentTimeMs the current time in milliseconds
     * @return {@code true} if {@link #timeUntilDrain(long)} is zero or less
     */
    public boolean needsDrain(long currentTimeMs) {
        long remainingMs = timeUntilDrain(currentTimeMs);
        return remainingMs <= 0L;
    }

    /**
     * Computes how long until a drain is due.
     *
     * @param currentTimeMs the current time in milliseconds
     * @return {@code 0} if the oldest completed batch is drainable under the current drain
     *         offset; otherwise the time remaining on the linger timer
     */
    public long timeUntilDrain(long currentTimeMs) {
        CompletedBatch<T> oldest = completed.peek();
        if (oldest != null && oldest.drainable(drainOffset.get())) {
            return 0L;
        }
        return lingerTimer.remainingMs(currentTimeMs);
    }

    /**
     * @return the leader epoch this accumulator was created with
     */
    public int epoch() {
        return epoch;
    }

    /**
     * Drains completed batches that lie below the current drain offset.
     *
     * @return the drained batches in append order; empty if the drain could not be finished yet
     */
    public List<CompletedBatch<T>> drain() {
        long limit = drainOffset.get();
        return drain(limit);
    }

    /**
     * Requests a drain and, if it has finished, removes the drainable batches from the queue.
     *
     * @param drainOffset exclusive upper bound on the offsets that may be drained
     * @return the drained batches, or an empty list if the drain has not finished
     */
    private List<CompletedBatch<T>> drain(long drainOffset) {
        // Start a drain unless one is already started or finished.
        if (drainStatus == DrainStatus.NONE) {
            drainStatus = DrainStatus.STARTED;
        }

        // Finish the drain on this thread if the lock is free (or already held by this thread).
        // Otherwise the next append or forceDrain call finishes it under the lock.
        boolean acquired = appendLock.tryLock();
        if (acquired) {
            try {
                maybeCompleteDrain();
            } finally {
                appendLock.unlock();
            }
        }

        if (drainStatus != DrainStatus.FINISHED) {
            return List.of();
        }
        drainStatus = DrainStatus.NONE;
        return drainCompleted(drainOffset);
    }

    /**
     * Removes batches from the head of the completed queue for as long as the head is drainable.
     *
     * @param drainOffset exclusive upper bound on the offsets that may be drained
     * @return the removed batches in queue order
     */
    private List<CompletedBatch<T>> drainCompleted(long drainOffset) {
        List<CompletedBatch<T>> drained = new ArrayList<>();
        for (CompletedBatch<T> head = completed.peek();
             head != null && head.drainable(drainOffset);
             head = completed.peek()) {
            completed.poll();
            drained.add(head);
        }
        return drained;
    }

    /**
     * Reports whether the linger timer is idle. The timer is armed by an append and stopped when
     * a drain completes; this method does not take the append lock or inspect the completed queue.
     *
     * @return {@code true} if the linger timer is not running
     */
    public boolean isEmpty() {
        boolean lingering = lingerTimer.isRunning();
        return !lingering;
    }

    /**
     * Drains every completed batch regardless of the drain offset and releases their buffers
     * back to the pool.
     *
     * <p>The append lock is held around the drain so that the {@code tryLock} inside
     * {@code drain(long)} succeeds (the lock is reentrant) and the drain is completed by this call.
     */
    @Override
    public void close() {
        List<CompletedBatch<T>> unwritten;
        appendLock.lock();
        try {
            unwritten = drain(Long.MAX_VALUE);
        } finally {
            appendLock.unlock();
        }
        // Release outside the lock; the batches are no longer reachable from the queue.
        unwritten.forEach(CompletedBatch::release);
    }

    /**
     * Minimal deadline holder. A deadline of {@code Long.MAX_VALUE} means the timer is not running.
     */
    private static class SimpleTimer {
        private final AtomicLong deadlineMs = new AtomicLong(Long.MAX_VALUE);

        private SimpleTimer() {
        }

        /** @return {@code true} if a deadline other than {@code Long.MAX_VALUE} is set */
        boolean isRunning() {
            long deadline = deadlineMs.get();
            return deadline != Long.MAX_VALUE;
        }

        /** Sets the deadline; pass {@code Long.MAX_VALUE} to stop the timer. */
        void reset(long deadlineMs) {
            this.deadlineMs.set(deadlineMs);
        }

        /** @return milliseconds from {@code currentTimeMs} until the deadline, never negative */
        long remainingMs(long currentTimeMs) {
            long remaining = deadlineMs.get() - currentTimeMs;
            return remaining > 0L ? remaining : 0L;
        }
    }

    private static enum DrainStatus {
        STARTED,
        FINISHED,
        NONE;

    }

    /**
     * A built batch waiting to be drained, together with the pooled buffer that must be released
     * once the batch is no longer needed.
     */
    public static class CompletedBatch<T> {
        /** Offset of the first record in the batch. */
        public final long baseOffset;
        /** Number of records in the batch; always at least one. */
        public final int numRecords;
        /** The original records, or empty when the batch was created from a record count only. */
        public final Optional<List<T>> records;
        /** The serialized batch. */
        public final MemoryRecords data;
        private final MemoryPool pool;
        private final ByteBuffer initialBuffer;

        /** Creates a batch that keeps the original records alongside the serialized data. */
        private CompletedBatch(long baseOffset, List<T> records, MemoryRecords data, MemoryPool pool, ByteBuffer initialBuffer) {
            this(baseOffset, Optional.of(records), records.size(), data, pool, initialBuffer);
        }

        /** Creates a batch for which only the record count is known. */
        private CompletedBatch(long baseOffset, int numRecords, MemoryRecords data, MemoryPool pool, ByteBuffer initialBuffer) {
            this(baseOffset, Optional.empty(), numRecords, data, pool, initialBuffer);
        }

        /** Shared initialization for both creation paths; validates the result. */
        private CompletedBatch(long baseOffset, Optional<List<T>> records, int numRecords, MemoryRecords data, MemoryPool pool, ByteBuffer initialBuffer) {
            this.baseOffset = baseOffset;
            this.records = records;
            this.numRecords = numRecords;
            this.data = data;
            this.pool = pool;
            this.initialBuffer = initialBuffer;
            validateConstruction();
        }

        /** Rejects batches whose data has no first batch or whose record count is not positive. */
        private void validateConstruction() {
            Objects.requireNonNull(data.firstBatch(), "Expected memory records to contain one batch");
            if (numRecords <= 0) {
                throw new IllegalArgumentException(String.format("Completed batch must contain at least one record: %s", numRecords));
            }
        }

        /** @return the size of the serialized data in bytes */
        public int sizeInBytes() {
            return data.sizeInBytes();
        }

        /** Returns the batch's initial buffer to the memory pool. */
        public void release() {
            pool.release(initialBuffer);
        }

        /** @return the max timestamp of the first batch in the serialized data */
        public long appendTimestamp() {
            return data.firstBatch().maxTimestamp();
        }

        /**
         * @param drainOffset exclusive upper bound on the offsets that may be drained
         * @return {@code true} if the last offset of this batch is below {@code drainOffset}
         */
        public boolean drainable(long drainOffset) {
            long lastOffset = baseOffset + (long) numRecords - 1L;
            return lastOffset < drainOffset;
        }
    }

    /**
     * Factory for the control batch appended by {@code appendControlMessages}.
     */
    @FunctionalInterface
    public interface MemoryRecordsCreator {
        /**
         * Creates the control batch to append.
         *
         * @param baseOffset the offset the batch must start at (the accumulator's next offset)
         * @param epoch the accumulator's leader epoch
         * @param compression the accumulator's compression setting
         * @param buffer the pooled buffer allocated for the batch
         * @return the created records; the caller validates that they hold exactly one control batch
         */
        MemoryRecords create(long baseOffset, int epoch, Compression compression, ByteBuffer buffer);
    }
}

