package org.springframework.data.redis.connection.lettuce;

import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.data.redis.connection.SubscriptionListener;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import reactor.core.Disposable;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/springframework/data/redis/connection/lettuce/LettuceReactiveSubscription.class */
/**
 * {@link ReactiveSubscription} implementation on top of a Lettuce
 * {@link StatefulRedisPubSubConnection}.
 *
 * <p>Channel and pattern subscriptions are tracked separately, each in its own {@link State}.
 * A listener wrapping the supplied {@link SubscriptionListener} is registered on the connection
 * at construction time and removed again by {@link #cancel()}.
 */
class LettuceReactiveSubscription implements ReactiveSubscription {
    private final LettuceByteBufferPubSubListenerWrapper listener;
    private final StatefulRedisPubSubConnection<ByteBuffer, ByteBuffer> connection;
    private final RedisPubSubReactiveCommands<ByteBuffer, ByteBuffer> commands;
    private final State patternState;
    private final State channelState;

    /**
     * Bookkeeping for one kind of subscription target (channels or patterns): the set of
     * currently subscribed targets, the shared message flux and its subscriber count.
     */
    public static class State {
        private final Set<ByteBuffer> targets = new ConcurrentSkipListSet<>();
        private final AtomicLong subscribers = new AtomicLong();
        private final AtomicReference<Flux<?>> flux = new AtomicReference<>();
        private final Function<Throwable, Throwable> exceptionTranslator;

        /** Handle of the connected upstream; {@code null} until the first subscriber arrives. */
        @Nullable
        private volatile Disposable disposable;

        /**
         * @param function translator applied to every error emitted by the subscribe,
         *                 unsubscribe and receive pipelines of this state
         */
        State(Function<Throwable, Throwable> function) {
            this.exceptionTranslator = function;
        }

        /**
         * Invokes the given subscribe function and, once it completes successfully, records the
         * targets as subscribed.
         *
         * @param byteBufferArr targets to subscribe to
         * @param function      command that performs the actual subscription
         * @return completion signal of the subscribe command, with errors translated
         */
        Mono<Void> subscribe(ByteBuffer[] byteBufferArr, Function<ByteBuffer[], Mono<Void>> function) {
            return function.apply(byteBufferArr)
                    .doOnSuccess(ignored -> this.targets.addAll(Arrays.asList(byteBufferArr)))
                    .onErrorMap(this.exceptionTranslator);
        }

        /**
         * Invokes the given unsubscribe function lazily (on subscription to the returned
         * {@link Mono}) and, once it completes successfully, forgets the targets.
         *
         * @param byteBufferArr targets to unsubscribe from
         * @param function      command that performs the actual unsubscription
         * @return completion signal of the unsubscribe command, with errors translated
         */
        Mono<Void> unsubscribe(ByteBuffer[] byteBufferArr, Function<ByteBuffer[], Mono<Void>> function) {
            return Mono.defer(() -> {
                List<ByteBuffer> removed = Arrays.asList(byteBufferArr);
                return function.apply(byteBufferArr)
                        .doOnSuccess(ignored -> this.targets.removeAll(removed))
                        .onErrorMap(this.exceptionTranslator);
            });
        }

        /**
         * @return read-only live view of the currently subscribed targets
         */
        Set<ByteBuffer> getTargets() {
            return Collections.unmodifiableSet(this.targets);
        }

        /**
         * Returns the message flux shared by all subscribers of this state, creating it from the
         * supplier if none is cached. The upstream is connected when the subscriber count goes
         * from zero to one and torn down via {@link #terminate()} when it drops back to zero.
         *
         * @param supplier source of the upstream message flux; only consulted when no flux is cached
         * @param <T>      element type; callers must use the same type for every call on one state
         * @return the shared flux
         */
        // The cache is typed Flux<?>; the casts are safe as long as each State is used with a
        // single element type, which is how the enclosing class uses it.
        @SuppressWarnings("unchecked")
        <T> Flux<T> receive(Supplier<Flux<T>> supplier) {
            Flux<?> cached = this.flux.get();
            if (cached != null) {
                return (Flux<T>) cached;
            }
            ConnectableFlux<T> publish = supplier.get().onErrorMap(this.exceptionTranslator).publish();
            Flux<T> shared = publish.doOnSubscribe(subscription -> {
                // Connect the upstream only for the first subscriber.
                if (this.subscribers.incrementAndGet() == 1) {
                    this.disposable = publish.connect();
                }
            }).doFinally(signalType -> {
                // Last subscriber gone: terminate() clears the cached flux and disposes the upstream.
                if (this.subscribers.decrementAndGet() == 0) {
                    terminate();
                }
            });
            // Lost the race against a concurrent caller: hand out the winner's flux instead.
            // NOTE(review): the re-read can yield null if the winner's flux is terminated in
            // between — confirm whether callers can hit this.
            return this.flux.compareAndSet(null, shared) ? shared : (Flux<T>) this.flux.get();
        }

        /**
         * Clears the cached flux and disposes the upstream connection if one is active.
         */
        void terminate() {
            this.flux.set(null);
            Disposable current = this.disposable;
            if (current != null && !current.isDisposed()) {
                current.dispose();
            }
        }
    }

    /**
     * Creates the subscription and registers its listener on the given connection.
     *
     * @param subscriptionListener          listener handed to the {@code LettuceMessageListener}
     *                                      together with a no-op message callback
     * @param statefulRedisPubSubConnection Pub/Sub connection used for all commands
     * @param function                      exception translator shared by both states
     */
    public LettuceReactiveSubscription(SubscriptionListener subscriptionListener, StatefulRedisPubSubConnection<ByteBuffer, ByteBuffer> statefulRedisPubSubConnection, Function<Throwable, Throwable> function) {
        this.listener = new LettuceByteBufferPubSubListenerWrapper(new LettuceMessageListener((message, bArr) -> {
        }, subscriptionListener));
        this.connection = statefulRedisPubSubConnection;
        this.commands = statefulRedisPubSubConnection.reactive();
        statefulRedisPubSubConnection.addListener(this.listener);
        this.patternState = new State(function);
        this.channelState = new State(function);
    }

    /**
     * Subscribes to the given channels.
     *
     * @param byteBufferArr channels; must not be {@code null} or contain {@code null} elements
     */
    @Override // org.springframework.data.redis.connection.ReactiveSubscription
    public Mono<Void> subscribe(ByteBuffer... byteBufferArr) {
        Assert.notNull(byteBufferArr, "Channels must not be null!");
        Assert.noNullElements(byteBufferArr, "Channels must not contain null elements!");
        return this.channelState.subscribe(byteBufferArr, this.commands::subscribe);
    }

    /**
     * Subscribes to the given patterns.
     *
     * @param byteBufferArr patterns; must not be {@code null} or contain {@code null} elements
     */
    @Override // org.springframework.data.redis.connection.ReactiveSubscription
    public Mono<Void> pSubscribe(ByteBuffer... byteBufferArr) {
        Assert.notNull(byteBufferArr, "Patterns must not be null!");
        Assert.noNullElements(byteBufferArr, "Patterns must not contain null elements!");
        return this.patternState.subscribe(byteBufferArr, this.commands::psubscribe);
    }

    /**
     * Unsubscribes from every channel currently recorded as subscribed.
     */
    @Override // org.springframework.data.redis.connection.ReactiveSubscription
    public Mono<Void> unsubscribe() {
        // A zero-length seed array lets toArray size the result itself. Pre-sizing from a separate
        // size() call could leave a null slot if the concurrent set shrinks in between, which
        // would trip the null-element assertion below.
        return unsubscribe(this.channelState.getTargets().toArray(new ByteBuffer[0]));
    }

    /**
     * Unsubscribes from the given channels; completes immediately when none are given.
     *
     * @param byteBufferArr channels; must not be {@code null} or contain {@code null} elements
     */
    @Override // org.springframework.data.redis.connection.ReactiveSubscription
    public Mono<Void> unsubscribe(ByteBuffer... byteBufferArr) {
        Assert.notNull(byteBufferArr, "Channels must not be null!");
        Assert.noNullElements(byteBufferArr, "Channels must not contain null elements!");
        if (ObjectUtils.isEmpty(byteBufferArr)) {
            return Mono.empty();
        }
        return this.channelState.unsubscribe(byteBufferArr, this.commands::unsubscribe);
    }

    /**
     * Unsubscribes from every pattern currently recorded as subscribed.
     */
    @Override // org.springframework.data.redis.connection.ReactiveSubscription
    public Mono<Void> pUnsubscribe() {
        // Zero-length seed array for the same reason as in unsubscribe().
        return pUnsubscribe(this.patternState.getTargets().toArray(new ByteBuffer[0]));
    }

    /**
     * Unsubscribes from the given patterns; completes immediately when none are given.
     *
     * @param byteBufferArr patterns; must not be {@code null} or contain {@code null} elements
     */
    @Override // org.springframework.data.redis.connection.ReactiveSubscription
    public Mono<Void> pUnsubscribe(ByteBuffer... byteBufferArr) {
        Assert.notNull(byteBufferArr, "Patterns must not be null!");
        Assert.noNullElements(byteBufferArr, "Patterns must not contain null elements!");
        if (ObjectUtils.isEmpty(byteBufferArr)) {
            return Mono.empty();
        }
        return this.patternState.unsubscribe(byteBufferArr, this.commands::punsubscribe);
    }

    /**
     * @return read-only view of the channels currently recorded as subscribed
     */
    @Override // org.springframework.data.redis.connection.ReactiveSubscription
    public Set<ByteBuffer> getChannels() {
        return this.channelState.getTargets();
    }

    /**
     * @return read-only view of the patterns currently recorded as subscribed
     */
    @Override // org.springframework.data.redis.connection.ReactiveSubscription
    public Set<ByteBuffer> getPatterns() {
        return this.patternState.getTargets();
    }

    /**
     * Returns the merged stream of channel and pattern messages. Messages whose channel or
     * pattern is not in the respective set of recorded targets are filtered out.
     */
    @Override // org.springframework.data.redis.connection.ReactiveSubscription
    public Flux<ReactiveSubscription.Message<ByteBuffer, ByteBuffer>> receive() {
        Flux<ReactiveSubscription.Message<ByteBuffer, ByteBuffer>> channelMessages =
                this.channelState.receive(() -> this.commands.observeChannels()
                        .filter(message -> this.channelState.getTargets().contains(message.getChannel()))
                        .map(message -> new ReactiveSubscription.ChannelMessage<>(
                                message.getChannel(), message.getMessage())));

        Flux<ReactiveSubscription.Message<ByteBuffer, ByteBuffer>> patternMessages =
                this.patternState.receive(() -> this.commands.observePatterns()
                        .filter(message -> this.patternState.getTargets().contains(message.getPattern()))
                        .map(message -> new ReactiveSubscription.PatternMessage<>(
                                message.getPattern(), message.getChannel(), message.getMessage())));

        return channelMessages.mergeWith(patternMessages);
    }

    /**
     * Unsubscribes from all recorded channels and patterns, terminates both message streams and
     * finally removes the listener from the connection.
     */
    @Override // org.springframework.data.redis.connection.ReactiveSubscription
    public Mono<Void> cancel() {
        return unsubscribe().then(pUnsubscribe()).then(Mono.defer(() -> {
            this.channelState.terminate();
            this.patternState.terminate();
            // The listener is removed only after the PING completes.
            // NOTE(review): presumably the PING is a barrier so that pending unsubscribe
            // confirmations are processed first — confirm.
            return this.commands.ping()
                    .then(Mono.fromRunnable(() -> this.connection.removeListener(this.listener)));
        }));
    }
}
