package org.apache.flink.kubernetes.kubeclient;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.LoadBalancerStatus;
import io.fabric8.kubernetes.api.model.NodeList;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServicePort;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentList;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.PodResource;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import io.fabric8.kubernetes.client.dsl.ServiceResource;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesPodsWatcher;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesWatch;
import org.apache.flink.kubernetes.kubeclient.services.ServiceType;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.class */
public class Fabric8FlinkKubeClient implements FlinkKubeClient {
    private static final Logger LOG = LoggerFactory.getLogger(Fabric8FlinkKubeClient.class);
    private final String clusterId;
    private final String namespace;
    private final int maxRetryAttempts;
    private final KubernetesConfigOptions.NodePortAddressType nodePortAddressType;
    private final NamespacedKubernetesClient internalClient;
    private final ExecutorService kubeClientExecutorService;

    public Fabric8FlinkKubeClient(Configuration configuration, NamespacedKubernetesClient namespacedKubernetesClient, ExecutorService executorService) {
        this.clusterId = (String) configuration.getOptional(KubernetesConfigOptions.CLUSTER_ID).orElseThrow(() -> {
            return new IllegalArgumentException(String.format("Configuration option '%s' is not set.", KubernetesConfigOptions.CLUSTER_ID.key()));
        });
        this.namespace = configuration.getString(KubernetesConfigOptions.NAMESPACE);
        this.maxRetryAttempts = configuration.getInteger(KubernetesConfigOptions.KUBERNETES_TRANSACTIONAL_OPERATION_MAX_RETRIES);
        this.nodePortAddressType = (KubernetesConfigOptions.NodePortAddressType) configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_NODE_PORT_ADDRESS_TYPE);
        this.internalClient = (NamespacedKubernetesClient) Preconditions.checkNotNull(namespacedKubernetesClient);
        this.kubeClientExecutorService = (ExecutorService) Preconditions.checkNotNull(executorService);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
    public void createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJobManagerSpecification) {
        Deployment deployment = kubernetesJobManagerSpecification.getDeployment();
        List<HasMetadata> accompanyingResources = kubernetesJobManagerSpecification.getAccompanyingResources();
        LOG.debug("Start to create deployment with spec {}{}", System.lineSeparator(), KubernetesUtils.tryToGetPrettyPrintYaml(deployment));
        setOwnerReference((Deployment) this.internalClient.apps().deployments().create((MixedOperation<Deployment, DeploymentList, RollableScalableResource<Deployment>>) deployment), accompanyingResources);
        this.internalClient.resourceList(accompanyingResources).createOrReplace();
    }

    @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
    public CompletableFuture<Void> createTaskManagerPod(KubernetesPod kubernetesPod) {
        return CompletableFuture.runAsync(() -> {
            Deployment deployment = (Deployment) ((RollableScalableResource) this.internalClient.apps().deployments().withName(KubernetesUtils.getDeploymentName(this.clusterId))).get();
            if (deployment == null) {
                throw new RuntimeException("Failed to find Deployment named " + this.clusterId + " in namespace " + this.namespace);
            }
            setOwnerReference(deployment, Collections.singletonList(kubernetesPod.getInternalResource()));
            LOG.debug("Start to create pod with spec {}{}", System.lineSeparator(), KubernetesUtils.tryToGetPrettyPrintYaml(kubernetesPod.getInternalResource()));
            this.internalClient.pods().create((MixedOperation<Pod, PodList, PodResource<Pod>>) kubernetesPod.getInternalResource());
        }, this.kubeClientExecutorService);
    }

    @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
    public CompletableFuture<Void> stopPod(String str) {
        return CompletableFuture.runAsync(() -> {
            ((PodResource) this.internalClient.pods().withName(str)).delete();
        }, this.kubeClientExecutorService);
    }

    @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
    public Optional<Endpoint> getRestEndpoint(String str) {
        Optional<KubernetesService> restService = getRestService(str);
        if (!restService.isPresent()) {
            return Optional.empty();
        }
        Service internalResource = restService.get().getInternalResource();
        int restPortFromExternalService = getRestPortFromExternalService(internalResource);
        return ServiceType.classify(internalResource).isClusterIP() ? Optional.of(new Endpoint(ExternalServiceDecorator.getNamespacedExternalServiceName(str, this.namespace), restPortFromExternalService)) : getRestEndPointFromService(internalResource, restPortFromExternalService);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
    public List<KubernetesPod> getPodsWithLabels(Map<String, String> map) {
        List<Pod> items = ((PodList) ((FilterWatchListDeletable) this.internalClient.pods().withLabels(map)).list()).getItems();
        return (items == null || items.isEmpty()) ? new ArrayList() : (List) items.stream().map(KubernetesPod::new).collect(Collectors.toList());
    }

    @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
    public void stopAndCleanupCluster(String str) {
        ((RollableScalableResource) this.internalClient.apps().deployments().withName(KubernetesUtils.getDeploymentName(str))).cascading(true).delete();
    }

    @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
    public Optional<KubernetesService> getRestService(String str) {
        String externalServiceName = ExternalServiceDecorator.getExternalServiceName(str);
        Service service = (Service) ((ServiceResource) this.internalClient.services().withName(externalServiceName)).fromServer().get();
        if (service != null) {
            return Optional.of(new KubernetesService(service));
        }
        LOG.debug("Service {} does not exist", externalServiceName);
        return Optional.empty();
    }

    @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
    public KubernetesWatch watchPodsAndDoCallback(Map<String, String> map, FlinkKubeClient.WatchCallbackHandler<KubernetesPod> watchCallbackHandler) throws Exception {
        return (KubernetesWatch) FutureUtils.retry(() -> {
            return CompletableFuture.supplyAsync(() -> {
                return new KubernetesWatch(((FilterWatchListDeletable) this.internalClient.pods().withLabels(map)).watch(new KubernetesPodsWatcher(watchCallbackHandler)));
            }, this.kubeClientExecutorService);
        }, this.maxRetryAttempts, th -> {
            return ExceptionUtils.findThrowable(th, KubernetesClientException.class).isPresent();
        }, this.kubeClientExecutorService).get();
    }

    @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
    public KubernetesLeaderElector createLeaderElector(KubernetesLeaderElectionConfiguration kubernetesLeaderElectionConfiguration, KubernetesLeaderElector.LeaderCallbackHandler leaderCallbackHandler) {
        return new KubernetesLeaderElector(this.internalClient, kubernetesLeaderElectionConfiguration, leaderCallbackHandler);
    }

    @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
    public CompletableFuture<Void> createConfigMap(KubernetesConfigMap kubernetesConfigMap) {
        String name = kubernetesConfigMap.getName();
        return CompletableFuture.runAsync(() -> {
        }, this.kubeClientExecutorService).exceptionally(th -> {
            throw new CompletionException((Throwable) new KubernetesException("Failed to create ConfigMap " + name, th));
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
    public Optional<KubernetesConfigMap> getConfigMap(String str) {
        ConfigMap configMap = (ConfigMap) ((Resource) this.internalClient.configMaps().withName(str)).get();
        return configMap == null ? Optional.empty() : Optional.of(new KubernetesConfigMap(configMap));
    }

    @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
    public CompletableFuture<Boolean> checkAndUpdateConfigMap(String str, Function<KubernetesConfigMap, Optional<KubernetesConfigMap>> function) {
        return FutureUtils.retry(() -> {
            return attemptCheckAndUpdateConfigMap(str, function);
        }, this.maxRetryAttempts, th -> {
            return ExceptionUtils.findThrowable(th, KubernetesClientException.class).isPresent();
        }, this.kubeClientExecutorService);
    }

    private CompletableFuture<Boolean> attemptCheckAndUpdateConfigMap(String str, Function<KubernetesConfigMap, Optional<KubernetesConfigMap>> function) {
        return CompletableFuture.supplyAsync(() -> {
            KubernetesConfigMap orElseThrow = getConfigMap(str).orElseThrow(() -> {
                return new CompletionException((Throwable) new KubernetesException("Cannot retry checkAndUpdateConfigMap with configMap " + str + " because it does not exist."));
            });
            Optional optional = (Optional) function.apply(orElseThrow);
            if (!optional.isPresent()) {
                return false;
            }
            try {
                ((Resource) this.internalClient.configMaps().withName(str)).lockResourceVersion(((KubernetesConfigMap) optional.get()).getResourceVersion()).replace(((KubernetesConfigMap) optional.get()).getInternalResource());
                return true;
            } catch (Throwable th) {
                LOG.debug("Failed to update ConfigMap {} with data {}. Trying again.", orElseThrow.getName(), orElseThrow.getData());
                throw new CompletionException((Throwable) new PossibleInconsistentStateException(th));
            }
        }, this.kubeClientExecutorService);
    }

    @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
    public CompletableFuture<Void> deleteConfigMapsByLabels(Map<String, String> map) {
        return CompletableFuture.runAsync(() -> {
            ((FilterWatchListDeletable) this.internalClient.configMaps().withLabels(map)).delete();
        }, this.kubeClientExecutorService);
    }

    @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
    public CompletableFuture<Void> deleteConfigMap(String str) {
        return CompletableFuture.runAsync(() -> {
            ((Resource) this.internalClient.configMaps().withName(str)).delete();
        }, this.kubeClientExecutorService);
    }

    @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
    public KubernetesConfigMapSharedWatcher createConfigMapSharedWatcher(Map<String, String> map) {
        return new KubernetesConfigMapSharedInformer(this.internalClient, map);
    }

    @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient, java.lang.AutoCloseable
    public void close() {
        this.internalClient.close();
        ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, new ExecutorService[]{this.kubeClientExecutorService});
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.kubernetes.kubeclient.FlinkKubeClient
    public KubernetesPod loadPodFromTemplateFile(File file) {
        if (file.exists()) {
            return new KubernetesPod((Pod) ((PodResource) this.internalClient.pods().load(file)).get());
        }
        throw new FlinkRuntimeException(String.format("Pod template file %s does not exist.", file));
    }

    private void setOwnerReference(Deployment deployment, List<HasMetadata> list) {
        OwnerReference build = new OwnerReferenceBuilder().withName(deployment.getMetadata().getName()).withApiVersion(deployment.getApiVersion()).withUid(deployment.getMetadata().getUid()).withKind(deployment.getKind()).withController(true).withBlockOwnerDeletion(true).build();
        list.forEach(hasMetadata -> {
            hasMetadata.getMetadata().setOwnerReferences(Collections.singletonList(build));
        });
    }

    private int getRestPortFromExternalService(Service service) {
        List list = (List) service.getSpec().getPorts().stream().filter(servicePort -> {
            return servicePort.getName().equals(Constants.REST_PORT_NAME);
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            throw new RuntimeException("Failed to find port \"rest\" in Service \"" + ExternalServiceDecorator.getExternalServiceName(this.clusterId) + "\"");
        }
        ServicePort servicePort2 = (ServicePort) list.get(0);
        KubernetesConfigOptions.ServiceExposedType valueOf = KubernetesConfigOptions.ServiceExposedType.valueOf(service.getSpec().getType());
        switch (valueOf) {
            case ClusterIP:
            case LoadBalancer:
                return servicePort2.getPort().intValue();
            case NodePort:
                return servicePort2.getNodePort().intValue();
            default:
                throw new RuntimeException("Unrecognized Service type: " + valueOf);
        }
    }

    private Optional<Endpoint> getRestEndPointFromService(Service service, int i) {
        String str;
        if (service.getStatus() == null) {
            return Optional.empty();
        }
        LoadBalancerStatus loadBalancer = service.getStatus().getLoadBalancer();
        return loadBalancer != null ? getLoadBalancerRestEndpoint(loadBalancer, i) : (!(service.getSpec() != null && service.getSpec().getExternalIPs() != null && !service.getSpec().getExternalIPs().isEmpty()) || (str = service.getSpec().getExternalIPs().get(0)) == null || str.isEmpty()) ? Optional.empty() : Optional.of(new Endpoint(str, i));
    }

    private Optional<Endpoint> getLoadBalancerRestEndpoint(LoadBalancerStatus loadBalancerStatus, int i) {
        String str;
        if ((loadBalancerStatus.getIngress() == null || loadBalancerStatus.getIngress().isEmpty()) ? false : true) {
            str = loadBalancerStatus.getIngress().get(0).getIp();
            if (str == null || str.isEmpty()) {
                str = loadBalancerStatus.getIngress().get(0).getHostname();
            }
        } else {
            str = (String) ((NodeList) this.internalClient.nodes().list()).getItems().stream().flatMap(node -> {
                return node.getStatus().getAddresses().stream();
            }).filter(nodeAddress -> {
                return this.nodePortAddressType.name().equals(nodeAddress.getType());
            }).map((v0) -> {
                return v0.getAddress();
            }).filter(str2 -> {
                return !str2.isEmpty();
            }).findAny().orElse(null);
            if (str == null) {
                LOG.warn("Unable to find any node ip with type [{}]. Please see [{}] config option for more details.", this.nodePortAddressType, KubernetesConfigOptions.REST_SERVICE_EXPOSED_NODE_PORT_ADDRESS_TYPE.key());
            }
        }
        return str == null || str.isEmpty() ? Optional.empty() : Optional.of(new Endpoint(str, i));
    }
}
