package com.aizuda.snailjob.client.common.window;

import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.util.JsonUtil;
import com.aizuda.snailjob.common.core.window.Listener;
import com.aizuda.snailjob.common.log.SnailJobLog;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/aizuda/snailjob/client/common/window/SlidingWindow.class */
public class SlidingWindow<T> {
    public final TreeMap<LocalDateTime, ConcurrentLinkedQueue<T>> saveData = new TreeMap<>();
    private final Integer totalThreshold;
    private final Integer windowTotalThreshold;
    private final List<Listener<T>> listeners;
    private final ScheduledExecutorService threadPoolExecutor;
    private final long duration;
    private final ChronoUnit chronoUnit;
    private static final ReentrantLock SAVE_LOCK = new ReentrantLock();
    private static final ReentrantLock NOTICE_LOCK = new ReentrantLock();

    /* loaded from: input_file:com/aizuda/snailjob/client/common/window/SlidingWindow$Builder.class */
    public static class Builder<T> {
        private List<Listener<T>> listeners;
        private ScheduledExecutorService threadPoolExecutor;
        private Integer totalThreshold = 10;
        private Integer windowTotalThreshold = 5;
        private long duration = 10;
        private ChronoUnit chronoUnit = ChronoUnit.SECONDS;

        public static <T> Builder<T> newBuilder() {
            return new Builder<>();
        }

        public Builder<T> withTotalThreshold(int i) {
            Assert.isTrue(i > 0, "Total window period threshold cannot be less than 0");
            this.totalThreshold = Integer.valueOf(i);
            return this;
        }

        public Builder<T> withWindowTotalThreshold(int i) {
            Assert.isTrue(i > 0, "Window quantity threshold cannot be less than 0");
            this.windowTotalThreshold = Integer.valueOf(i);
            return this;
        }

        public Builder<T> withListener(Listener<T> listener) {
            if (CollUtil.isEmpty(this.listeners)) {
                this.listeners = new ArrayList();
            }
            this.listeners.add(listener);
            return this;
        }

        public Builder<T> withDuration(long j, ChronoUnit chronoUnit) {
            Assert.isTrue(j > 0, "Window period cannot be less than 0");
            this.duration = j;
            this.chronoUnit = chronoUnit;
            return this;
        }

        public Builder<T> withScheduledExecutorServiced(ScheduledExecutorService scheduledExecutorService) {
            this.threadPoolExecutor = scheduledExecutorService;
            return this;
        }

        public SlidingWindow<T> build() {
            if (Objects.isNull(this.threadPoolExecutor)) {
                this.threadPoolExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
                    return new Thread(runnable, "sliding-window-thread");
                });
            }
            if (CollUtil.isEmpty(this.listeners)) {
                this.listeners = Collections.EMPTY_LIST;
            }
            return new SlidingWindow<>(this.totalThreshold.intValue(), this.windowTotalThreshold.intValue(), this.listeners, this.threadPoolExecutor, this.duration, this.chronoUnit);
        }
    }

    public SlidingWindow(int i, int i2, List<Listener<T>> list, ScheduledExecutorService scheduledExecutorService, long j, ChronoUnit chronoUnit) {
        this.totalThreshold = Integer.valueOf(i);
        this.listeners = list;
        this.windowTotalThreshold = Integer.valueOf(i2);
        this.threadPoolExecutor = scheduledExecutorService;
        this.duration = j;
        this.chronoUnit = chronoUnit;
    }

    public void add(T t) {
        LocalDateTime now = LocalDateTime.now();
        if (!isOpenNewWindow(now)) {
            oldWindowAdd(t);
            return;
        }
        SAVE_LOCK.lock();
        LocalDateTime plus = now.plus(this.duration, (TemporalUnit) this.chronoUnit);
        try {
            if (isOpenNewWindow(now)) {
                ConcurrentLinkedQueue<T> concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
                concurrentLinkedQueue.add(t);
                SnailJobLog.LOCAL.debug("Adding new data [{}] [{}] size:[{}]", new Object[]{plus, Thread.currentThread().getName(), Integer.valueOf(concurrentLinkedQueue.size())});
                this.saveData.put(plus, concurrentLinkedQueue);
                removeInvalidWindow();
            } else {
                oldWindowAdd(t);
            }
            SAVE_LOCK.unlock();
        } catch (Throwable th) {
            SAVE_LOCK.unlock();
            throw th;
        }
    }

    private void alarmWindowTotal() {
        if (this.saveData.size() > this.windowTotalThreshold.intValue()) {
            SnailJobLog.LOCAL.warn(" The number of currently active windows is too high Total:[{}] > Threshold:[{}]", new Object[]{Integer.valueOf(this.saveData.size()), this.windowTotalThreshold});
        }
    }

    private void removeInvalidWindow() {
        for (int i = 0; i < this.saveData.size() - 2; i++) {
            Map.Entry<LocalDateTime, ConcurrentLinkedQueue<T>> firstEntry = this.saveData.firstEntry();
            if (CollUtil.isEmpty(firstEntry.getValue())) {
                this.saveData.remove(firstEntry.getKey());
            }
        }
    }

    private void oldWindowAdd(T t) {
        LocalDateTime newWindowPeriod = getNewWindowPeriod();
        if (Objects.isNull(newWindowPeriod)) {
            return;
        }
        int i = 10;
        ConcurrentLinkedQueue<T> concurrentLinkedQueue = this.saveData.get(newWindowPeriod);
        while (Objects.isNull(concurrentLinkedQueue) && i > 0) {
            i--;
            newWindowPeriod = getNewWindowPeriod();
            if (!Objects.isNull(newWindowPeriod)) {
                concurrentLinkedQueue = this.saveData.get(newWindowPeriod);
            }
        }
        if (Objects.nonNull(concurrentLinkedQueue)) {
            concurrentLinkedQueue.add(t);
        } else {
            SnailJobLog.LOCAL.error("Data loss. [{}]", new Object[]{JsonUtil.toJsonString(t)});
        }
        if (concurrentLinkedQueue.size() >= this.totalThreshold.intValue()) {
            doHandlerListener(newWindowPeriod);
        }
    }

    private void doHandlerListener(LocalDateTime localDateTime) {
        ConcurrentLinkedQueue<T> concurrentLinkedQueue;
        NOTICE_LOCK.lock();
        ConcurrentLinkedQueue<T> concurrentLinkedQueue2 = null;
        try {
            try {
                concurrentLinkedQueue = this.saveData.get(localDateTime);
            } catch (Exception e) {
                SnailJobLog.LOCAL.error("deep copy task queue is error", new Object[]{e});
                NOTICE_LOCK.unlock();
            }
            if (CollUtil.isEmpty(concurrentLinkedQueue)) {
                NOTICE_LOCK.unlock();
                return;
            }
            concurrentLinkedQueue2 = new ConcurrentLinkedQueue<>(concurrentLinkedQueue);
            clear(localDateTime, concurrentLinkedQueue2);
            if (CollUtil.isEmpty(concurrentLinkedQueue2)) {
                NOTICE_LOCK.unlock();
                return;
            }
            NOTICE_LOCK.unlock();
            if (CollectionUtils.isEmpty(concurrentLinkedQueue2)) {
                return;
            }
            try {
                Iterator<Listener<T>> it = this.listeners.iterator();
                while (it.hasNext()) {
                    it.next().handler(new ArrayList(concurrentLinkedQueue2));
                }
            } catch (Exception e2) {
                SnailJobLog.LOCAL.error("notice is error", new Object[]{e2});
            }
        } catch (Throwable th) {
            NOTICE_LOCK.unlock();
            throw th;
        }
    }

    private LocalDateTime getOldWindowPeriod() {
        try {
            return this.saveData.firstKey();
        } catch (NoSuchElementException e) {
            SnailJobLog.LOCAL.error("First window exception. saveData:[{}]", new Object[]{JsonUtil.toJsonString(this.saveData)});
            return null;
        }
    }

    private LocalDateTime getNewWindowPeriod() {
        try {
            return this.saveData.lastKey();
        } catch (NoSuchElementException e) {
            SnailJobLog.LOCAL.error("The last window is abnormal. SaveData:[{}]", new Object[]{JsonUtil.toJsonString(this.saveData)});
            return null;
        }
    }

    private void removeInvalidWindow(LocalDateTime localDateTime) {
        if (localDateTime.isBefore(LocalDateTime.now().minus(this.duration * 2, (TemporalUnit) this.chronoUnit)) && CollUtil.isEmpty(this.saveData.get(localDateTime))) {
            this.saveData.remove(localDateTime);
        }
    }

    private boolean isOpenNewWindow(LocalDateTime localDateTime) {
        if (this.saveData.isEmpty()) {
            return true;
        }
        LocalDateTime newWindowPeriod = getNewWindowPeriod();
        if (Objects.isNull(newWindowPeriod)) {
            return true;
        }
        return newWindowPeriod.isBefore(localDateTime);
    }

    private void extract(LocalDateTime localDateTime) {
        if (this.saveData.isEmpty()) {
            return;
        }
        LocalDateTime oldWindowPeriod = getOldWindowPeriod();
        if (Objects.isNull(oldWindowPeriod)) {
            return;
        }
        removeInvalidWindow(oldWindowPeriod);
        alarmWindowTotal();
        if (oldWindowPeriod.isBefore(localDateTime)) {
            SnailJobLog.LOCAL.debug("Time window reached [{}] [{}]", new Object[]{oldWindowPeriod, JsonUtil.toJsonString(this.saveData)});
            doHandlerListener(oldWindowPeriod);
        }
    }

    private void clear(LocalDateTime localDateTime, ConcurrentLinkedQueue<T> concurrentLinkedQueue) {
        this.saveData.get(localDateTime).removeAll(concurrentLinkedQueue);
    }

    public void start() {
        this.threadPoolExecutor.scheduleAtFixedRate(() -> {
            try {
                extract(LocalDateTime.now());
            } catch (Exception e) {
                SnailJobLog.LOCAL.error("Sliding window exception", new Object[]{e});
            }
        }, 1L, 1L, TimeUnit.SECONDS);
    }

    public void end() {
        Iterator<LocalDateTime> it = this.saveData.keySet().iterator();
        while (it.hasNext()) {
            doHandlerListener(it.next());
        }
    }
}
