/*
 * Decompiled with CFR 0.152.
 */
package com.floragunn.searchguard.enterprise.femt.datamigration880.service;

import com.floragunn.codova.documents.DocNode;
import com.floragunn.codova.documents.DocumentParseException;
import com.floragunn.codova.documents.Format;
import com.floragunn.fluent.collections.ImmutableList;
import com.floragunn.searchguard.enterprise.femt.datamigration880.service.DataMigrationContext;
import com.floragunn.searchguard.enterprise.femt.datamigration880.service.DataMigrationService;
import com.floragunn.searchguard.enterprise.femt.datamigration880.service.ExecutionStatus;
import com.floragunn.searchguard.enterprise.femt.datamigration880.service.IndexAlreadyExistsException;
import com.floragunn.searchguard.enterprise.femt.datamigration880.service.MigrationConfig;
import com.floragunn.searchguard.enterprise.femt.datamigration880.service.MigrationExecutionSummary;
import com.floragunn.searchguard.enterprise.femt.datamigration880.service.MigrationStateRepository;
import com.floragunn.searchguard.enterprise.femt.datamigration880.service.MigrationStep;
import com.floragunn.searchguard.enterprise.femt.datamigration880.service.OptimisticLock;
import com.floragunn.searchguard.enterprise.femt.datamigration880.service.OptimisticLockException;
import com.floragunn.searchguard.enterprise.femt.datamigration880.service.StepExecutionStatus;
import com.floragunn.searchguard.enterprise.femt.datamigration880.service.StepExecutionSummary;
import com.floragunn.searchguard.enterprise.femt.datamigration880.service.StepResult;
import com.floragunn.searchguard.enterprise.femt.datamigration880.service.persistence.IndexMigrationStateRepository;
import com.floragunn.searchguard.enterprise.femt.datamigration880.service.steps.StepsFactory;
import com.floragunn.searchguard.support.PrivilegedConfigClient;
import com.floragunn.searchguard.test.helper.cluster.LocalCluster;
import com.floragunn.searchsupport.action.StandardResponse;
import com.floragunn.searchsupport.junit.matcher.DocNodeMatchers;
import java.time.Clock;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.awaitility.Awaitility;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.client.internal.Client;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(value=MockitoJUnitRunner.class)
public class DataMigrationServiceLockingTest {
    // Logger shared by the test methods and by the helper steps declared at the bottom of this class.
    private static final Logger log = LogManager.getLogger(DataMigrationServiceLockingTest.class);
    // Fixed reference point in time (2010-01-01 12:01 UTC) used to build fixed clocks and summaries.
    private static final ZonedDateTime NOW = ZonedDateTime.of(LocalDateTime.of(2010, 1, 1, 12, 1), ZoneOffset.UTC);
    // Configuration handed to every migrateData(...) call in this class.
    // NOTE(review): the meaning of the `false` constructor argument is not visible here - confirm in MigrationConfig.
    public static final MigrationConfig STRICT_CONFIG = new MigrationConfig(false);
    // Single-thread pool created in before() and stopped in after(); used to run a migration in the background.
    private ExecutorService executor;
    // Embedded cluster shared by all tests of this class (SSL on, "multitenancy" resources, enterprise modules).
    @ClassRule
    public static LocalCluster.Embedded cluster = new LocalCluster.Builder().sslEnabled().resources("multitenancy").enterpriseModulesEnabled().embedded().build();
    // Mocked step factory given to the first (or only) migration service of a test.
    @Mock
    private StepsFactory stepFactory1;
    // Mocked step factory given to the second, competing migration service.
    @Mock
    private StepsFactory stepFactory2;
    // Repository backed by the embedded cluster; re-created in before() for every test.
    private MigrationStateRepository repository;

    /**
     * Creates the repository and the worker pool for a test and removes the migration
     * state index if the repository reports that it already exists.
     */
    @Before
    public void before() {
        PrivilegedConfigClient privilegedClient = PrivilegedConfigClient.adapt(cluster.getInternalNodeClient());
        this.repository = new IndexMigrationStateRepository(privilegedClient);
        this.executor = Executors.newFixedThreadPool(1);
        if (!this.repository.isIndexCreated()) {
            // Nothing to clean up.
            return;
        }
        DeleteIndexRequest deleteStateIndex = new DeleteIndexRequest(".sg_data_migration_state");
        privilegedClient.admin().indices().delete(deleteStateIndex).actionGet();
    }

    /**
     * Stops the worker pool created in {@code before()}.
     *
     * <p>The pool is stopped with {@code shutdownNow()} so that a worker which is still blocked
     * (for example inside {@code WaitStep} after a failed assertion) is interrupted instead of
     * lingering while the following tests run.
     */
    @After
    public void after() {
        // JUnit 4 invokes @After methods even if @Before threw, so the pool may not exist yet.
        if (this.executor != null) {
            this.executor.shutdownNow();
        }
    }

    /**
     * Runs one migration consisting of a single, already released {@code WaitStep}
     * and expects an HTTP 200 response.
     */
    @Test(timeout=10000L)
    public void shouldExecuteSingleMigration() {
        // The latch is opened up front, so the step does not block when executed.
        WaitStep releasedStep = new WaitStep().enoughWaiting();
        Mockito.when(this.stepFactory1.createSteps()).thenReturn(ImmutableList.of(releasedStep));
        DataMigrationService service = new DataMigrationService(this.repository, this.stepFactory1, Clock.systemUTC());

        StandardResponse migrationResponse = service.migrateData(STRICT_CONFIG);

        MatcherAssert.assertThat(migrationResponse.getStatus(), Matchers.equalTo(200));
    }

    /**
     * Starts a first migration in the background, waits until it is executing its steps and then
     * starts a second migration. The second one must be rejected with HTTP 400 and the
     * {@code migration_already_in_progress_error} stage status, while the first one must
     * eventually finish with {@link ExecutionStatus#SUCCESS}.
     *
     * @throws DocumentParseException if the response body of the second migration is not valid JSON
     */
    @Test(timeout=10000L)
    public void shouldNotAttemptToRerunMigrationTooEarly() throws DocumentParseException {
        Clock nowClock = Clock.fixed(NOW.toInstant(), ZoneOffset.UTC);
        // The first migration is started one second "before" the second one.
        Clock pastClock = Clock.offset(nowClock, Duration.ofSeconds(-1L));
        DataMigrationService migrationService1 = new DataMigrationService(this.repository, this.stepFactory1, pastClock);
        DataMigrationService migrationService2 = new DataMigrationService(this.repository, this.stepFactory2, nowClock);
        SyncStep syncStep = new SyncStep();
        WaitStep waitStep = new WaitStep();
        Mockito.when(this.stepFactory1.createSteps()).thenReturn(ImmutableList.of(syncStep, waitStep));
        this.executor.submit(() -> migrationService1.migrateData(STRICT_CONFIG));
        try {
            // Block until the background migration has reached its first step.
            syncStep.waitUntilStepExecution();
            StandardResponse response = migrationService2.migrateData(STRICT_CONFIG);
            String jsonString = response.toJsonString();
            log.debug("Status and body from the second data migration run in parallel: '{}', '{}'", response.getStatus(), jsonString);
            MatcherAssert.assertThat(response.getStatus(), Matchers.equalTo(400));
            DocNode responseBody = DocNode.parse(Format.JSON).from(jsonString);
            MatcherAssert.assertThat(responseBody, DocNodeMatchers.containsValue("$.data.status", "failure"));
            MatcherAssert.assertThat(responseBody, DocNodeMatchers.containsValue("$.data.stages[0].status", "migration_already_in_progress_error"));
        } finally {
            // Always release the background migration; otherwise a failed assertion above would
            // leave the worker blocked in WaitStep while subsequent tests are running.
            waitStep.enoughWaiting();
        }
        Awaitility.await().until(() -> this.repository.findById("migration_8_8_0").orElseThrow().status(), Matchers.equalTo(ExecutionStatus.SUCCESS));
    }

    /**
     * Simulates another process creating the state index in parallel: the wrapped repository
     * creates the index twice in a row, so the second call is expected to find it already present.
     * The migration must answer with HTTP 409 and the {@code status_index_already_exists_error}
     * stage status.
     *
     * @throws DocumentParseException if the response body is not valid JSON
     */
    @Test
    public void shouldBreakMigrationIfAnotherMigrationProcessCreatedIndexInParrrarel() throws DocumentParseException {
        RepositoryWrapper racingRepository = new RepositoryWrapper(this.repository){

            @Override
            public void createIndex() throws IndexAlreadyExistsException {
                // First call plays the competing process, second call is the one made by the migration.
                this.wrapped.createIndex();
                this.wrapped.createIndex();
            }
        };
        Clock fixedClock = Clock.fixed(NOW.toInstant(), ZoneOffset.UTC);
        DataMigrationService service = new DataMigrationService(racingRepository, this.stepFactory1, fixedClock);

        StandardResponse migrationResponse = service.migrateData(STRICT_CONFIG);

        log.debug("Parallel index creation caused response '{}'.", migrationResponse.toJsonString());
        MatcherAssert.assertThat(migrationResponse.getStatus(), Matchers.equalTo(409));
        DocNode body = DocNode.parse(Format.JSON).from(migrationResponse.toJsonString());
        MatcherAssert.assertThat(body, DocNodeMatchers.containsValue("$.data.status", "failure"));
        MatcherAssert.assertThat(body, DocNodeMatchers.containsValue("$.data.stages[0].status", "status_index_already_exists_error"));
    }

    /**
     * Simulates another process creating the migration status document in parallel: the wrapped
     * repository first stores a summary "created by another process" and only then attempts to
     * create the summary requested by the migration. The migration must answer with HTTP 412 and
     * the {@code cannot_create_status_document_error} stage status.
     *
     * @throws DocumentParseException if the response body is not valid JSON
     */
    @Test
    public void shouldBreakInCaseOfParallelMigrationStatusDocumentCreation() throws DocumentParseException {
        RepositoryWrapper migrationStateRepository = new RepositoryWrapper(this.repository){

            @Override
            public void create(String migrationId, MigrationExecutionSummary summary) throws OptimisticLockException {
                // The competing document is written first, so the requested creation comes second.
                MigrationExecutionSummary migrationSummary = this.createMigrationSummary();
                this.wrapped.create(migrationId, migrationSummary);
                this.wrapped.create(migrationId, summary);
            }

            // Builds the in-progress summary that plays the role of the competing process.
            private MigrationExecutionSummary createMigrationSummary() {
                LocalDateTime localDateTime = NOW.toLocalDateTime();
                StepExecutionSummary stepSummary = new StepExecutionSummary(0L, localDateTime, "created by another process", StepExecutionStatus.OK, "I am race condition winner!");
                ImmutableList<StepExecutionSummary> stages = ImmutableList.of(stepSummary);
                return new MigrationExecutionSummary(localDateTime, ExecutionStatus.IN_PROGRESS, null, null, stages, null);
            }
        };
        Clock fixed = Clock.fixed(NOW.toInstant(), ZoneOffset.UTC);
        DataMigrationService migrationService = new DataMigrationService(migrationStateRepository, this.stepFactory1, fixed);
        StandardResponse response = migrationService.migrateData(STRICT_CONFIG);
        // The message names the scenario of this test (status document creation, not index creation).
        log.debug("Parallel status document creation caused response '{}'.", response.toJsonString());
        MatcherAssert.assertThat(response.getStatus(), Matchers.equalTo(412));
        DocNode responseBody = DocNode.parse(Format.JSON).from(response.toJsonString());
        MatcherAssert.assertThat(responseBody, DocNodeMatchers.containsValue("$.data.status", "failure"));
        MatcherAssert.assertThat(responseBody, DocNodeMatchers.containsValue("$.data.stages[0].status", "cannot_create_status_document_error"));
    }

    /**
     * Seeds the repository with a failed earlier migration and then simulates a concurrent
     * modification: right before each locked update the wrapped repository overwrites the
     * status document. The migration must answer with HTTP 409 and the
     * {@code cannot_update_status_document_lock_error} stage status.
     *
     * @throws DocumentParseException if the response body is not valid JSON
     */
    @Test
    public void shouldBreakMigrationIfOptimisticLockFailsDuringMigrationStatusUpdate() throws DocumentParseException {
        // Seed: state index plus the summary of a previous, failed run.
        this.repository.createIndex();
        LocalDateTime seedTime = NOW.toLocalDateTime();
        StepExecutionSummary failedStep = new StepExecutionSummary(0L, seedTime, "precondition check", StepExecutionStatus.UNEXPECTED_ERROR, "Sth. went wrong.");
        ImmutableList<StepExecutionSummary> failedStages = ImmutableList.of(failedStep);
        this.repository.upsert("migration_8_8_0", new MigrationExecutionSummary(seedTime, ExecutionStatus.FAILURE, null, null, failedStages));

        RepositoryWrapper lockBreakingRepository = new RepositoryWrapper(this.repository){

            @Override
            public void updateWithLock(String id, MigrationExecutionSummary dataMigrationSummary, OptimisticLock lock) throws OptimisticLockException {
                // Overwrite the document first, then attempt the locked update with the caller's lock.
                MigrationExecutionSummary concurrentWrite = this.summaryFromOtherProcess();
                this.wrapped.upsert(id, concurrentWrite);
                this.wrapped.updateWithLock(id, dataMigrationSummary, lock);
            }

            // Builds the in-progress summary written "by another process".
            private MigrationExecutionSummary summaryFromOtherProcess() {
                LocalDateTime timestamp = NOW.toLocalDateTime();
                StepExecutionSummary foreignStep = new StepExecutionSummary(0L, timestamp, "created by another process", StepExecutionStatus.OK, "Optimistic lock broken");
                ImmutableList<StepExecutionSummary> foreignStages = ImmutableList.of(foreignStep);
                return new MigrationExecutionSummary(timestamp, ExecutionStatus.IN_PROGRESS, null, null, foreignStages, null);
            }
        };
        Clock fixedClock = Clock.fixed(NOW.toInstant(), ZoneOffset.UTC);
        DataMigrationService service = new DataMigrationService(lockBreakingRepository, this.stepFactory1, fixedClock);

        StandardResponse migrationResponse = service.migrateData(STRICT_CONFIG);

        log.debug("Optimistic lock failure response '{}'.", migrationResponse.toJsonString());
        MatcherAssert.assertThat(migrationResponse.getStatus(), Matchers.equalTo(409));
        DocNode body = DocNode.parse(Format.JSON).from(migrationResponse.toJsonString());
        MatcherAssert.assertThat(body, DocNodeMatchers.containsValue("$.data.status", "failure"));
        MatcherAssert.assertThat(body, DocNodeMatchers.containsValue("$.data.stages[0].status", "cannot_update_status_document_lock_error"));
    }

    private static class WaitStep
    implements MigrationStep {
        private final CountDownLatch countDownLatch = new CountDownLatch(1);

        public StepResult execute(DataMigrationContext dataMigrationContext) {
            log.debug("Start waiting in migration '{}'", (Object)dataMigrationContext.getMigrationId());
            try {
                if (!this.countDownLatch.await(1L, TimeUnit.MINUTES)) {
                    String message = "Cannot wait so long, probably invocation of method enoughWaiting is missing";
                    log.error(message);
                    throw new RuntimeException(message);
                }
                log.debug("Finish waiting in migration '{}'", (Object)dataMigrationContext.getMigrationId());
                return new StepResult(StepExecutionStatus.OK, "Waited so long...");
            }
            catch (InterruptedException e) {
                String message = "Unexpected interruption of waiting";
                log.error(message);
                throw new RuntimeException(message, e);
            }
        }

        public WaitStep enoughWaiting() {
            log.debug("Step execution will be resumed.");
            this.countDownLatch.countDown();
            return this;
        }

        public String name() {
            return "just wait";
        }
    }

    private static class SyncStep
    implements MigrationStep {
        private final CountDownLatch countDownLatch = new CountDownLatch(1);

        public StepResult execute(DataMigrationContext dataMigrationContext) {
            this.countDownLatch.countDown();
            return new StepResult(StepExecutionStatus.OK, "Synchronized on countDownLatch");
        }

        public String name() {
            return "external sync step";
        }

        public void waitUntilStepExecution() {
            try {
                this.countDownLatch.await(1L, TimeUnit.MINUTES);
            }
            catch (InterruptedException e) {
                String message = "Cannot wait so long for SyncStep execution";
                log.error(message);
                throw new RuntimeException(message, e);
            }
        }
    }

    private static class RepositoryWrapper
    implements MigrationStateRepository {
        protected final MigrationStateRepository wrapped;

        public RepositoryWrapper(MigrationStateRepository wrapped) {
            this.wrapped = Objects.requireNonNull(wrapped, "Repository is required");
        }

        public void upsert(String id, MigrationExecutionSummary migrationExecutionSummary) {
            this.wrapped.upsert(id, migrationExecutionSummary);
        }

        public void updateWithLock(String id, MigrationExecutionSummary migrationExecutionSummary, OptimisticLock lock) throws OptimisticLockException {
            this.wrapped.updateWithLock(id, migrationExecutionSummary, lock);
        }

        public boolean isIndexCreated() {
            return this.wrapped.isIndexCreated();
        }

        public void createIndex() throws IndexAlreadyExistsException {
            this.wrapped.createIndex();
        }

        public Optional<MigrationExecutionSummary> findById(String id) {
            return this.wrapped.findById(id);
        }

        public void create(String migrationId, MigrationExecutionSummary summary) throws OptimisticLockException {
            this.wrapped.create(migrationId, summary);
        }
    }
}

