/*
 * Decompiled with CFR 0.152.
 */
package com.floragunn.signals;

import com.floragunn.searchguard.test.GenericRestClient;
import com.floragunn.searchguard.test.helper.cluster.ClusterConfiguration;
import com.floragunn.searchguard.test.helper.cluster.LocalCluster;
import com.floragunn.searchsupport.jobs.core.IndexJobStateStore;
import com.floragunn.searchsupport.junit.AsyncAssert;
import com.floragunn.searchsupport.junit.LoggingTestWatcher;
import com.floragunn.signals.SignalsModule;
import com.floragunn.signals.watch.Watch;
import com.floragunn.signals.watch.WatchBuilder;
import java.time.Duration;
import net.jcip.annotations.NotThreadSafe;
import org.apache.http.Header;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.xcontent.XContentType;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;

@NotThreadSafe
public class SignalsStressTests {
    private static final Logger log = LogManager.getLogger(SignalsStressTests.class);
    @Rule
    public LoggingTestWatcher loggingTestWatcher = new LoggingTestWatcher();
    @ClassRule
    public static LocalCluster.Embedded cluster;

    @BeforeClass
    public static void setupTestData() {
        Client client = cluster.getInternalNodeClient();
        client.index(new IndexRequest("testsource").source(XContentType.JSON, new Object[]{"key1", "val1", "key2", "val2"})).actionGet();
        client.index(((IndexRequest)new IndexRequest("testsource").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).source(XContentType.JSON, new Object[]{"a", "x", "b", "y"})).actionGet();
        client.index(((IndexRequest)new IndexRequest("testsource").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).source(XContentType.JSON, new Object[]{"a", "xx", "b", "yy"})).actionGet();
    }

    @Test
    public void failoverOnClusterChangeTest() throws Exception {
        String ackedAt;
        String watchRunsOnNode;
        String tenant = "_main";
        String watchId = "put_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (GenericRestClient restClient = cluster.getRestClient("uhura", "uhura", new Header[0]);){
            Watch watch = new WatchBuilder(watchId).atMsInterval(100L).search("testsource").query("{\"match_all\" : {} }").as("testsearch").put("{\"bla\": {\"blub\": 42}}").as("teststatic").then().index("testsink_put_watch").name("testsink").build();
            restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            AsyncAssert.awaitAssert((String)"Watch did not get assigned a node", () -> {
                try {
                    String node = restClient.get(watchPath + "/_state", new Header[0]).getBodyAsDocNode().getAsString("node");
                    return node != null && node.length() > 0 && !node.equals("null");
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, (Duration)Duration.ofSeconds(10L));
            watchRunsOnNode = restClient.get(watchPath + "/_state", new Header[0]).getBodyAsDocNode().getAsString("node");
            log.info("Watch runs on node " + watchRunsOnNode);
            Thread.sleep(500L);
            Assert.assertEquals((long)200L, (long)restClient.put(watchPath + "/_ack").getStatusCode());
            AsyncAssert.awaitAssert((String)"Watch state contains acked date", () -> {
                try {
                    String acked = restClient.get(watchPath + "/_state", new Header[0]).getBodyAsDocNode().get("actions", new String[]{"testsink", "acked", "on"}).toString();
                    return acked != null && acked.length() > 0 && !acked.equals("null");
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, (Duration)Duration.ofSeconds(10L));
            ackedAt = restClient.get(watchPath + "/_state", new Header[0]).getBodyAsDocNode().getAsNode("actions", new String[]{"testsearch", "acked"}).getOrDefault((Object)"on", (Object)"").toString();
        }
        Thread.sleep(2000L);
        log.warn("Stopping node " + watchRunsOnNode + "; watch must now find a new home.");
        cluster.getNodeByName(watchRunsOnNode).stop();
        Thread.sleep(500L);
        restClient = cluster.getRestClient("uhura", "uhura", new Header[0]);
        try {
            AsyncAssert.awaitAssert((String)"Watch got assigned a different node", () -> {
                try {
                    String node = restClient.get(watchPath + "/_state", new Header[0]).getBodyAsDocNode().getAsString("node");
                    return node != null && node.length() > 0 && !node.equals("null") && !node.equals(watchRunsOnNode);
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, (Duration)Duration.ofSeconds(80L));
            GenericRestClient.HttpResponse response = restClient.get(watchPath + "/_state", new Header[0]);
            String watchRunsOnNodeNow = response.getBodyAsDocNode().getAsString("node");
            String newAckedAt = response.getBodyAsDocNode().getAsNode("actions", new String[]{"testsearch", "acked"}).getOrDefault((Object)"on", (Object)"").toString();
            log.info("Watch moved from " + watchRunsOnNode + " to " + watchRunsOnNodeNow);
            Assert.assertEquals((String)response.getBody(), (Object)ackedAt, (Object)newAckedAt);
        }
        finally {
            if (restClient != null) {
                restClient.close();
            }
        }
    }

    static {
        IndexJobStateStore.includeNodeIdInSchedulerToJobStoreMapKeys = true;
        cluster = new LocalCluster.Builder().sslEnabled().resources("sg_config/no-tenants").nodeSettings(new Object[]{"signals.enabled", true, "signals.index_names.log", "signals_main_log", "searchguard.enterprise_modules_enabled", false, "searchguard.diagnosis.action_stack.enabled", true}).clusterConfiguration(ClusterConfiguration.THREE_MASTERS).enableModule(SignalsModule.class).waitForComponents(new String[]{"signals"}).embedded().build();
    }
}

