package com.alibaba.nacos.istio.mcp;

import com.alibaba.nacos.istio.api.ApiConstants;
import com.alibaba.nacos.istio.api.ApiGeneratorFactory;
import com.alibaba.nacos.istio.common.AbstractConnection;
import com.alibaba.nacos.istio.common.Event;
import com.alibaba.nacos.istio.common.NacosResourceManager;
import com.alibaba.nacos.istio.common.ResourceSnapshot;
import com.alibaba.nacos.istio.common.WatchedStatus;
import com.alibaba.nacos.istio.misc.Loggers;
import com.alibaba.nacos.istio.util.NonceGenerator;
import io.grpc.stub.StreamObserver;
import istio.mcp.v1alpha1.Mcp;
import istio.mcp.v1alpha1.ResourceSourceGrpc;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:com/alibaba/nacos/istio/mcp/NacosMcpService.class */
public class NacosMcpService extends ResourceSourceGrpc.ResourceSourceImplBase {
    private final Map<String, AbstractConnection<Mcp.Resources>> connections = new ConcurrentHashMap(16);

    @Autowired
    ApiGeneratorFactory apiGeneratorFactory;

    @Autowired
    NacosResourceManager resourceManager;

    public boolean hasClientConnection() {
        return this.connections.size() != 0;
    }

    @Override // istio.mcp.v1alpha1.ResourceSourceGrpc.ResourceSourceImplBase
    public StreamObserver<Mcp.RequestResources> establishResourceStream(final StreamObserver<Mcp.Resources> streamObserver) {
        this.resourceManager.initResourceSnapshot();
        final McpConnection mcpConnection = new McpConnection(streamObserver);
        return new StreamObserver<Mcp.RequestResources>() { // from class: com.alibaba.nacos.istio.mcp.NacosMcpService.1
            private boolean initRequest = true;

            public void onNext(Mcp.RequestResources requestResources) {
                if (this.initRequest) {
                    mcpConnection.setConnectionId(requestResources.getSinkNode().getId());
                    NacosMcpService.this.connections.put(mcpConnection.getConnectionId(), mcpConnection);
                    this.initRequest = false;
                }
                NacosMcpService.this.process(requestResources, mcpConnection);
            }

            public void onError(Throwable th) {
                Loggers.MAIN.error("mcp: {} stream error.", mcpConnection.getConnectionId(), th);
                clear();
            }

            public void onCompleted() {
                streamObserver.onCompleted();
                clear();
            }

            private void clear() {
                NacosMcpService.this.connections.remove(mcpConnection.getConnectionId());
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void process(Mcp.RequestResources requestResources, AbstractConnection<Mcp.Resources> abstractConnection) {
        if (shouldPush(requestResources, abstractConnection)) {
            abstractConnection.push(buildMcpResourcesResponse(requestResources.getCollection(), this.resourceManager.getResourceSnapshot()), abstractConnection.getWatchedStatusByType(requestResources.getCollection()));
        }
    }

    private boolean shouldPush(Mcp.RequestResources requestResources, AbstractConnection<Mcp.Resources> abstractConnection) {
        String collection = requestResources.getCollection();
        String connectionId = abstractConnection.getConnectionId();
        if (requestResources.getErrorDetail().getCode() != 0) {
            Loggers.MAIN.error("mcp: ACK error, connection-id: {}, code: {}, message: {}", new Object[]{connectionId, Integer.valueOf(requestResources.getErrorDetail().getCode()), requestResources.getErrorDetail().getMessage()});
            return false;
        }
        if (requestResources.getResponseNonce().isEmpty()) {
            Loggers.MAIN.info("mcp: init request, type {}, connection-id {}, is incremental {}", new Object[]{collection, connectionId, Boolean.valueOf(requestResources.getIncremental())});
            WatchedStatus watchedStatus = new WatchedStatus();
            watchedStatus.setType(collection);
            abstractConnection.addWatchedResource(collection, watchedStatus);
            return true;
        }
        WatchedStatus watchedStatusByType = abstractConnection.getWatchedStatusByType(collection);
        if (watchedStatusByType == null) {
            Loggers.MAIN.info("mcp: reconnect, type {}, connection-id {}, is incremental {}", new Object[]{collection, connectionId, Boolean.valueOf(requestResources.getIncremental())});
            WatchedStatus watchedStatus2 = new WatchedStatus();
            watchedStatus2.setType(collection);
            abstractConnection.addWatchedResource(collection, watchedStatus2);
            return true;
        }
        if (!watchedStatusByType.getLatestNonce().equals(requestResources.getResponseNonce())) {
            Loggers.MAIN.warn("mcp: request dis match, type {}, connection-id {}", collection, connectionId);
            return false;
        }
        watchedStatusByType.setAckedNonce(requestResources.getResponseNonce());
        Loggers.MAIN.info("mcp: ack, type {}, connection-id {}, nonce {}", new Object[]{collection, connectionId, requestResources.getResponseNonce()});
        return false;
    }

    public void handleEvent(ResourceSnapshot resourceSnapshot, Event event) {
        switch (event.getType()) {
            case Service:
                if (this.connections.size() == 0) {
                    return;
                }
                Loggers.MAIN.info("xds: event {} trigger push.", event.getType());
                Mcp.Resources buildMcpResourcesResponse = buildMcpResourcesResponse(ApiConstants.SERVICE_ENTRY_COLLECTION, resourceSnapshot);
                for (AbstractConnection<Mcp.Resources> abstractConnection : this.connections.values()) {
                    WatchedStatus watchedStatusByType = abstractConnection.getWatchedStatusByType(ApiConstants.SERVICE_ENTRY_COLLECTION);
                    if (watchedStatusByType != null) {
                        abstractConnection.push(buildMcpResourcesResponse, watchedStatusByType);
                    }
                }
                return;
            default:
                Loggers.MAIN.warn("Invalid event {}, ignore it.", event.getType());
                return;
        }
    }

    private Mcp.Resources buildMcpResourcesResponse(String str, ResourceSnapshot resourceSnapshot) {
        List<?> generate = this.apiGeneratorFactory.getApiGenerator(str).generate(resourceSnapshot);
        return Mcp.Resources.newBuilder().setCollection(str).addAllResources(generate).setSystemVersionInfo(resourceSnapshot.getVersion()).setNonce(NonceGenerator.generateNonce()).m347build();
    }
}
