package org.apache.flink.streaming.api.runners.python.beam;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.core.construction.graph.TimerReference;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.Constants;
import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
import org.apache.flink.python.metric.process.FlinkMetricContainer;
import org.apache.flink.python.util.ProtoUtils;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.streaming.api.operators.python.process.timer.TimerRegistration;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/runners/python/beam/BeamDataStreamPythonFunctionRunner.class */
public class BeamDataStreamPythonFunctionRunner extends BeamPythonFunctionRunner {
    private static final String TRANSFORM_ID_PREFIX = "transform-";
    private static final String COLLECTION_PREFIX = "collection-";
    private static final String CODER_PREFIX = "coder-";

    @Nullable
    private final FlinkFnApi.CoderInfoDescriptor timerCoderDescriptor;
    private final String headOperatorFunctionUrn;
    private final List<FlinkFnApi.UserDefinedDataStreamFunction> userDefinedDataStreamFunctions;

    public BeamDataStreamPythonFunctionRunner(String str, ProcessPythonEnvironmentManager processPythonEnvironmentManager, String str2, List<FlinkFnApi.UserDefinedDataStreamFunction> list, @Nullable FlinkMetricContainer flinkMetricContainer, @Nullable KeyedStateBackend<?> keyedStateBackend, @Nullable OperatorStateBackend operatorStateBackend, @Nullable TypeSerializer<?> typeSerializer, @Nullable TypeSerializer<?> typeSerializer2, @Nullable TimerRegistration timerRegistration, MemoryManager memoryManager, double d, FlinkFnApi.CoderInfoDescriptor coderInfoDescriptor, FlinkFnApi.CoderInfoDescriptor coderInfoDescriptor2, @Nullable FlinkFnApi.CoderInfoDescriptor coderInfoDescriptor3, Map<String, FlinkFnApi.CoderInfoDescriptor> map) {
        super(str, processPythonEnvironmentManager, flinkMetricContainer, keyedStateBackend, operatorStateBackend, typeSerializer, typeSerializer2, timerRegistration, memoryManager, d, coderInfoDescriptor, coderInfoDescriptor2, map);
        this.headOperatorFunctionUrn = (String) Preconditions.checkNotNull(str2);
        Preconditions.checkArgument(list != null && list.size() >= 1);
        this.userDefinedDataStreamFunctions = list;
        this.timerCoderDescriptor = coderInfoDescriptor3;
    }

    @Override // org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner
    protected void buildTransforms(RunnerApi.Components.Builder builder) {
        for (int i = 0; i < this.userDefinedDataStreamFunctions.size(); i++) {
            Map<String, String> hashMap = new HashMap<>();
            if (i == this.userDefinedDataStreamFunctions.size() - 1) {
                for (Map.Entry<String, FlinkFnApi.CoderInfoDescriptor> entry : this.sideOutputCoderDescriptors.entrySet()) {
                    String str = "collection-revise-" + entry.getKey();
                    String str2 = "coder-revise-" + entry.getKey();
                    hashMap.put(entry.getKey(), str);
                    addCollectionToComponents(builder, str, str2);
                }
            }
            String str3 = COLLECTION_PREFIX + i;
            String str4 = CODER_PREFIX + i;
            hashMap.put("", str3);
            addCollectionToComponents(builder, str3, str4);
            String str5 = TRANSFORM_ID_PREFIX + i;
            FlinkFnApi.UserDefinedDataStreamFunction userDefinedDataStreamFunction = this.userDefinedDataStreamFunctions.get(i);
            if (i == 0) {
                addTransformToComponents(builder, str5, createUdfPayload(userDefinedDataStreamFunction, this.headOperatorFunctionUrn, true), "input", hashMap);
            } else {
                addTransformToComponents(builder, str5, createUdfPayload(userDefinedDataStreamFunction, Constants.STATELESS_FUNCTION_URN, false), COLLECTION_PREFIX + (i - 1), hashMap);
            }
        }
        for (Map.Entry<String, FlinkFnApi.CoderInfoDescriptor> entry2 : this.sideOutputCoderDescriptors.entrySet()) {
            addTransformToComponents(builder, "transform-revise-" + entry2.getKey(), createRevisePayload(), "collection-revise-" + entry2.getKey(), Collections.singletonMap("", entry2.getKey()));
        }
        addTransformToComponents(builder, "transform-revise", createRevisePayload(), COLLECTION_PREFIX + (this.userDefinedDataStreamFunctions.size() - 1), Collections.singletonMap("", ""));
    }

    private RunnerApi.ParDoPayload createRevisePayload() {
        return RunnerApi.ParDoPayload.newBuilder().setDoFn(RunnerApi.FunctionSpec.newBuilder().setUrn(Constants.STATELESS_FUNCTION_URN).setPayload(ByteString.copyFrom(ProtoUtils.createReviseOutputDataStreamFunctionProto().toByteArray())).build()).build();
    }

    private RunnerApi.ParDoPayload createUdfPayload(FlinkFnApi.UserDefinedDataStreamFunction userDefinedDataStreamFunction, String str, boolean z) {
        RunnerApi.ParDoPayload.Builder doFn = RunnerApi.ParDoPayload.newBuilder().setDoFn(RunnerApi.FunctionSpec.newBuilder().setUrn(str).setPayload(ByteString.copyFrom(userDefinedDataStreamFunction.toByteArray())).build());
        if (z && this.timerCoderDescriptor != null) {
            doFn.putTimerFamilySpecs("timer", RunnerApi.TimerFamilySpec.newBuilder().setTimeDomain(RunnerApi.TimeDomain.Enum.EVENT_TIME).setTimerFamilyCoderId(Constants.WRAPPER_TIMER_CODER_ID).build());
        }
        return doFn.build();
    }

    private void addTransformToComponents(RunnerApi.Components.Builder builder, String str, RunnerApi.ParDoPayload parDoPayload, String str2, Map<String, String> map) {
        RunnerApi.PTransform.Builder spec = RunnerApi.PTransform.newBuilder().setUniqueName(str).setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(BeamUrns.getUrn(RunnerApi.StandardPTransforms.Primitives.PAR_DO)).setPayload(parDoPayload.toByteString()).build());
        spec.putInputs("input", str2);
        spec.putAllOutputs(map);
        builder.putTransforms(str, spec.build());
    }

    private void addCollectionToComponents(RunnerApi.Components.Builder builder, String str, String str2) {
        builder.putPcollections(str, RunnerApi.PCollection.newBuilder().setWindowingStrategyId(Constants.WINDOW_STRATEGY).setCoderId(str2).build()).putCoders(str2, ProtoUtils.createCoderProto(this.inputCoderDescriptor));
    }

    @Override // org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner
    protected List<TimerReference> getTimers(RunnerApi.Components components) {
        return this.timerCoderDescriptor != null ? Collections.singletonList(TimerReference.fromTimerId(RunnerApi.ExecutableStagePayload.TimerId.newBuilder().setTransformId("transform-0").setLocalName("timer").build(), components)) : Collections.emptyList();
    }

    @Override // org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner
    protected Optional<RunnerApi.Coder> getOptionalTimerCoderProto() {
        return this.timerCoderDescriptor != null ? Optional.of(ProtoUtils.createCoderProto(this.timerCoderDescriptor)) : Optional.empty();
    }
}
