package org.apache.beam.runners.core.metrics;

import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.api.python.shaded.org.joda.time.DateTimeUtils;

/* loaded from: input_file:org/apache/beam/runners/core/metrics/ExecutionStateSampler.class */
/**
 * Periodically reports elapsed time to a set of registered {@link ExecutionStateTracker}s.
 *
 * <p>Once started, a background task wakes up roughly every sampling period (200 ms unless
 * changed through {@link #setSamplingPeriod(long)}) and calls
 * {@link ExecutionStateTracker#takeSample(long)} on every registered tracker with the number of
 * milliseconds that elapsed since the previous sample.
 *
 * <p>{@link #start()}, {@link #start(ExecutorService)} and {@link #stop()} are serialized on this
 * instance's monitor. The tracker set is a concurrent set, so trackers may be added and removed
 * while sampling is in progress.
 */
public class ExecutionStateSampler {
    // NOTE: SYSTEM_MILLIS_PROVIDER must be declared before INSTANCE, which uses it during
    // static initialization.
    private static final DateTimeUtils.MillisProvider SYSTEM_MILLIS_PROVIDER = System::currentTimeMillis;
    private static final ExecutionStateSampler INSTANCE = new ExecutionStateSampler(SYSTEM_MILLIS_PROVIDER);

    /**
     * Sampling period in milliseconds. Volatile because it is written by whichever thread calls
     * {@link #setSamplingPeriod(long)} and read by the sampling thread without a common lock.
     */
    private static volatile long periodMs = 200;

    /** Source of the current time; replaceable in tests. */
    private final DateTimeUtils.MillisProvider clock;

    /** Clock reading (milliseconds) at which the most recent sample was taken. */
    @VisibleForTesting
    volatile long lastSampleTimeMillis;

    /** Trackers that receive a sample on every sampling round. */
    private final Set<ExecutionStateTracker> activeTrackers = ConcurrentHashMap.newKeySet();

    /** Handle of the running sampling task, or {@code null} when sampling is not active. */
    private Future<Void> executionSamplerFuture = null;

    private ExecutionStateSampler(DateTimeUtils.MillisProvider millisProvider) {
        this.clock = millisProvider;
    }

    /** Returns the process-wide sampler, which reads time from the system clock. */
    public static ExecutionStateSampler instance() {
        return INSTANCE;
    }

    /** Creates an independent sampler backed by the system clock. */
    @VisibleForTesting
    public static ExecutionStateSampler newForTest() {
        return new ExecutionStateSampler(SYSTEM_MILLIS_PROVIDER);
    }

    /**
     * Creates an independent sampler that reads time from the given clock.
     *
     * @param millisProvider clock to use; must not be {@code null}
     * @throws NullPointerException if {@code millisProvider} is {@code null}
     */
    @VisibleForTesting
    public static ExecutionStateSampler newForTest(DateTimeUtils.MillisProvider millisProvider) {
        return new ExecutionStateSampler(Preconditions.checkNotNull(millisProvider));
    }

    /**
     * Sets the sampling period shared by all samplers.
     *
     * <p>NOTE(review): the value is not validated. A non-positive period makes the sampling loop
     * sample continuously without sleeping.
     *
     * @param j the new sampling period in milliseconds
     */
    public static void setSamplingPeriod(long j) {
        periodMs = j;
    }

    /** Resets the recorded time of the last sample to zero. */
    public void reset() {
        this.lastSampleTimeMillis = 0L;
    }

    /**
     * Starts sampling on a dedicated daemon thread named {@code state-sampler-N}.
     *
     * <p>Does nothing if sampling is already running. The executor created here is shut down as
     * soon as the sampling task has been submitted: the already-submitted task keeps running, and
     * the worker thread terminates once that task ends instead of lingering idle after
     * {@link #stop()}.
     */
    public synchronized void start() {
        if (this.executionSamplerFuture != null) {
            // Already running; avoid creating an executor that would never be used.
            return;
        }
        ExecutorService executor = Executors.newSingleThreadExecutor(
                new ThreadFactoryBuilder().setDaemon(true).setNameFormat("state-sampler-%d").build());
        try {
            start(executor);
        } finally {
            // Orderly shutdown: previously submitted tasks still execute, no new ones are accepted.
            executor.shutdown();
        }
    }

    /**
     * Starts sampling by submitting the sampling loop to the given executor.
     *
     * <p>Does nothing if sampling is already running. The executor is not shut down by this
     * method; its lifecycle remains with the caller.
     *
     * @param executorService executor on which the sampling loop is run
     */
    @VisibleForTesting
    synchronized void start(ExecutorService executorService) {
        if (this.executionSamplerFuture != null) {
            return;
        }
        this.executionSamplerFuture = executorService.submit(() -> {
            runSamplingLoop();
            return null;
        });
    }

    /**
     * Body of the sampling task: until the running thread is interrupted, sleeps up to the next
     * target time and then samples every active tracker with the elapsed time.
     */
    private void runSamplingLoop() {
        this.lastSampleTimeMillis = this.clock.getMillis();
        long targetTimeMillis = this.lastSampleTimeMillis + periodMs;
        while (!Thread.interrupted()) {
            long currentTimeMillis = this.clock.getMillis();
            long remainingMillis = targetTimeMillis - currentTimeMillis;
            if (remainingMillis > 0) {
                try {
                    Thread.sleep(remainingMillis);
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition observes it and exits.
                    Thread.currentThread().interrupt();
                }
            } else {
                // At least one full period has passed since the last sample.
                doSampling(currentTimeMillis - this.lastSampleTimeMillis);
                this.lastSampleTimeMillis = currentTimeMillis;
                targetTimeMillis = this.lastSampleTimeMillis + periodMs;
            }
        }
    }

    /**
     * Stops sampling. Does nothing if sampling is not running.
     *
     * <p>The sampling task is cancelled with interruption and then waited on for up to five
     * sampling periods. Whatever the outcome, the sampler is left in the stopped state so that it
     * can be started again.
     *
     * @throws RuntimeException if the wait is interrupted or times out, or if the sampling task
     *     had terminated with an exception
     */
    public synchronized void stop() {
        if (this.executionSamplerFuture == null) {
            return;
        }
        this.executionSamplerFuture.cancel(true);
        try {
            this.executionSamplerFuture.get(5 * periodMs, TimeUnit.MILLISECONDS);
        } catch (CancellationException expected) {
            // Expected: the task was cancelled just above.
        } catch (InterruptedException e) {
            // Preserve the caller's interrupt status before translating the exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Failed to stop state sampling after waiting 5 sampling periods.", e);
        } catch (TimeoutException e) {
            throw new RuntimeException("Failed to stop state sampling after waiting 5 sampling periods.", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Exception in state sampler", e);
        } finally {
            this.executionSamplerFuture = null;
        }
    }

    /**
     * Registers a tracker so that it receives a sample on every sampling round.
     *
     * @param executionStateTracker the tracker to register
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void addTracker(ExecutionStateTracker executionStateTracker) {
        this.activeTrackers.add(executionStateTracker);
    }

    /**
     * Unregisters a tracker and, if the clock has advanced since the last sample, gives it one
     * final sample covering that elapsed time.
     *
     * @param executionStateTracker the tracker to unregister
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeTracker(ExecutionStateTracker executionStateTracker) {
        this.activeTrackers.remove(executionStateTracker);
        long millisSinceLastSample = this.clock.getMillis() - this.lastSampleTimeMillis;
        if (millisSinceLastSample > 0) {
            executionStateTracker.takeSample(millisSinceLastSample);
        }
    }

    /**
     * Gives every registered tracker a sample of the given duration.
     *
     * @param j milliseconds elapsed since the previous sample
     */
    @VisibleForTesting
    public void doSampling(long j) {
        for (ExecutionStateTracker tracker : this.activeTrackers) {
            tracker.takeSample(j);
        }
    }
}
