/*
 * Decompiled with CFR 0.152.
 */
package com.floragunn.signals;

import com.floragunn.codova.config.temporal.DurationFormat;
import com.floragunn.codova.validation.ConfigValidationException;
import com.floragunn.codova.validation.ValidatingDocNode;
import com.floragunn.codova.validation.ValidationErrors;
import com.floragunn.searchguard.internalauthtoken.InternalAuthTokenProvider;
import com.floragunn.searchguard.support.PrivilegedConfigClient;
import com.floragunn.searchguard.test.TestSgConfig;
import com.floragunn.searchguard.test.helper.cluster.LocalCluster;
import com.floragunn.searchguard.user.User;
import com.floragunn.searchsupport.diag.DiagnosticContext;
import com.floragunn.signals.MockWebserviceProvider;
import com.floragunn.signals.Signals;
import com.floragunn.signals.SignalsModule;
import com.floragunn.signals.SignalsTenant;
import com.floragunn.signals.execution.ActionExecutionException;
import com.floragunn.signals.execution.WatchExecutionContext;
import com.floragunn.signals.proxy.service.HttpProxyHostRegistry;
import com.floragunn.signals.settings.SignalsSettings;
import com.floragunn.signals.truststore.service.TrustManagerRegistry;
import com.floragunn.signals.watch.Watch;
import com.floragunn.signals.watch.WatchBuilder;
import com.floragunn.signals.watch.action.handlers.ActionExecutionResult;
import com.floragunn.signals.watch.action.handlers.ActionHandler;
import com.floragunn.signals.watch.common.Ack;
import com.floragunn.signals.watch.common.ValidationLevel;
import com.floragunn.signals.watch.init.WatchInitializationService;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import net.jcip.annotations.NotThreadSafe;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.awaitility.Awaitility;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.node.PluginAwareNode;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.XContentType;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

@NotThreadSafe
public class SignalsTenantTest {
    private static final Logger log = LogManager.getLogger(SignalsTenantTest.class);
    private static TestSgConfig.User USER_CERTIFICATE = new TestSgConfig.User("certificate-user").roles(new TestSgConfig.Role[]{TestSgConfig.Role.ALL_ACCESS});
    public static final String UPLOADED_TRUSTSTORE_ID = "uploaded-truststore-id";
    @ClassRule
    public static LocalCluster.Embedded cluster = new LocalCluster.Builder().singleNode().sslEnabled().resources("sg_config/no-tenants").nodeSettings(new Object[]{"signals.enabled", true, "signals.index_names.log", "signals_main_log", "searchguard.enterprise_modules_enabled", false}).enableModule(SignalsModule.class).waitForComponents(new String[]{"signals"}).user(USER_CERTIFICATE).embedded().build();
    private static ClusterService clusterService;
    private static NodeEnvironment nodeEnvironment;
    private static NamedXContentRegistry xContentRegistry;
    private static ScriptService scriptService;
    private static InternalAuthTokenProvider internalAuthTokenProvider;
    private static DiagnosticContext diagnosticContext;
    private static TrustManagerRegistry trustManagerRegistry;
    private static HttpProxyHostRegistry httpProxyHostRegistry;
    private static FeatureService featureService;
    private static final User UHURA;

    @BeforeClass
    public static void setupTestData() throws Throwable {
        cluster.before();
        PluginAwareNode node = cluster.node();
        clusterService = (ClusterService)node.injector().getInstance(ClusterService.class);
        xContentRegistry = (NamedXContentRegistry)node.injector().getInstance(NamedXContentRegistry.class);
        scriptService = (ScriptService)node.injector().getInstance(ScriptService.class);
        internalAuthTokenProvider = (InternalAuthTokenProvider)node.injector().getInstance(InternalAuthTokenProvider.class);
        nodeEnvironment = (NodeEnvironment)node.injector().getInstance(NodeEnvironment.class);
        diagnosticContext = (DiagnosticContext)node.injector().getInstance(DiagnosticContext.class);
        trustManagerRegistry = ((Signals)cluster.getInjectable(Signals.class)).getTruststoreRegistry();
        httpProxyHostRegistry = ((Signals)cluster.getInjectable(Signals.class)).getHttpProxyHostRegistry();
        featureService = ((Signals)cluster.getInjectable(Signals.class)).getFeatureService();
        PrivilegedConfigClient privilegedConfigClient = PrivilegedConfigClient.adapt((Client)cluster.getInternalNodeClient());
        Client client = cluster.getInternalNodeClient();
        Watch watch = new WatchBuilder("test").cronTrigger("*/2 * * * * ?").search("testsource").query("{\"match_all\" : {} }").as("testsearch").put("{\"bla\": {\"blub\": 42}}").as("teststatic").then().index("testsink").name("testsink").throttledFor("5s").build();
        watch.setTenant("test");
        XContentBuilder xContentBuilder = XContentFactory.jsonBuilder();
        watch.toXContent(xContentBuilder, ToXContent.EMPTY_PARAMS);
        privilegedConfigClient.index(((IndexRequest)new IndexRequest(".signals_watches").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).source(xContentBuilder).id("test/test_watch")).actionGet();
        client.index(new IndexRequest("testsource").source(XContentType.JSON, new Object[]{"key1", "val1", "key2", "val2"})).actionGet();
        client.index(((IndexRequest)new IndexRequest("testsource").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).source(XContentType.JSON, new Object[]{"a", "x", "b", "y"})).actionGet();
        client.index(((IndexRequest)new IndexRequest("testsource").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).source(XContentType.JSON, new Object[]{"a", "xx", "b", "yy"})).actionGet();
    }

    @Ignore(value="TODO why is this ignored?")
    @Test
    public void initializationTest() throws Exception {
        Client client = cluster.getInternalNodeClient();
        Settings settings = Settings.builder().build();
        try (SignalsTenant tenant = new SignalsTenant("test", client, clusterService, nodeEnvironment, scriptService, xContentRegistry, internalAuthTokenProvider, new SignalsSettings(settings), null, diagnosticContext, (ThreadPool)Mockito.mock(ThreadPool.class), trustManagerRegistry, httpProxyHostRegistry, featureService);){
            tenant.init();
            Assert.assertEquals((long)1L, (long)tenant.getLocalWatchCount());
            Assert.assertTrue((boolean)tenant.runsWatchLocally("test_watch"));
        }
    }

    @Ignore(value="TODO somethings wrong with mockito here")
    @Test
    public void nodeFilterTest() throws Exception {
        Client client = cluster.getInternalNodeClient();
        SignalsSettings settings = (SignalsSettings)Mockito.mock(SignalsSettings.class, (Answer)Mockito.RETURNS_DEEP_STUBS);
        Mockito.when((Object)settings.getTenant("test").getNodeFilter()).thenReturn((Object)"unknown_attr:true");
        try (SignalsTenant tenant = new SignalsTenant("test", client, clusterService, nodeEnvironment, scriptService, xContentRegistry, internalAuthTokenProvider, settings, null, diagnosticContext, (ThreadPool)Mockito.mock(ThreadPool.class), trustManagerRegistry, httpProxyHostRegistry, featureService);){
            tenant.init();
            Assert.assertEquals((long)0L, (long)tenant.getLocalWatchCount());
            Assert.assertFalse((boolean)tenant.runsWatchLocally("test_watch"));
        }
    }

    @Test
    public void failoverTest() throws Exception {
        Ack ackedTime1;
        Client client = cluster.getInternalNodeClient();
        Settings settings = Settings.builder().build();
        try (SignalsTenant tenant = new SignalsTenant("failover_test", client, clusterService, nodeEnvironment, scriptService, xContentRegistry, internalAuthTokenProvider, new SignalsSettings(settings), null, diagnosticContext, (ThreadPool)Mockito.mock(ThreadPool.class), trustManagerRegistry, httpProxyHostRegistry, featureService);){
            tenant.init();
            Watch watch = new WatchBuilder("test_watch").atInterval("100ms").search("testsource").query("{\"match_all\" : {} }").as("testsearch").put("{\"bla\": {\"blub\": 42}}").as("teststatic").then().index("testsink").name("testsink").build();
            tenant.addWatch(watch, UHURA, ValidationLevel.STRICT);
            for (int i = 0; i < 20; ++i) {
                Thread.sleep(100L);
                if (tenant.getLocalWatchCount() != 0) break;
            }
            Assert.assertEquals((long)1L, (long)tenant.getLocalWatchCount());
            Assert.assertTrue((boolean)tenant.runsWatchLocally("test_watch"));
            Thread.sleep(500L);
            ArrayList ackedActions = new ArrayList(tenant.ack("test_watch", new User("horst")).keySet());
            Assert.assertEquals(Arrays.asList("testsink"), ackedActions);
            ackedTime1 = tenant.getWatchStateManager().getWatchState("test_watch").getActionState("testsink").getAcked();
            Assert.assertNotNull((Object)ackedTime1);
            Thread.sleep(500L);
        }
        Thread.sleep(1000L);
        tenant = new SignalsTenant("failover_test", client, clusterService, nodeEnvironment, scriptService, xContentRegistry, internalAuthTokenProvider, new SignalsSettings(settings), null, diagnosticContext, (ThreadPool)Mockito.mock(ThreadPool.class), trustManagerRegistry, httpProxyHostRegistry, featureService);
        try {
            tenant.init();
            for (int i = 0; i < 20; ++i) {
                Thread.sleep(100L);
                if (tenant.getLocalWatchCount() != 0) break;
            }
            Assert.assertEquals((long)1L, (long)tenant.getLocalWatchCount());
            Assert.assertTrue((boolean)tenant.runsWatchLocally("test_watch"));
            Ack ackedTime2 = tenant.getWatchStateManager().getWatchState("test_watch").getActionState("testsink").getAcked();
            Assert.assertEquals((Object)ackedTime1, (Object)ackedTime2);
        }
        finally {
            tenant.close();
        }
    }

    @Test
    public void failoverWhileRunningTest() throws Exception {
        Client client = cluster.getInternalNodeClient();
        Settings settings = Settings.builder().build();
        try (SignalsTenant tenant = new SignalsTenant("failover_while_running_test", client, clusterService, nodeEnvironment, scriptService, xContentRegistry, internalAuthTokenProvider, new SignalsSettings(settings), null, diagnosticContext, (ThreadPool)Mockito.mock(ThreadPool.class), trustManagerRegistry, httpProxyHostRegistry, featureService);){
            tenant.init();
            Watch watch = new WatchBuilder("test_watch").atInterval("100ms").search("testsource").query("{\"match_all\" : {} }").as("testsearch").put("{\"bla\": {\"blub\": 42}}").as("teststatic").then().act(new SleepAction(Duration.ofSeconds(4L))).name("sleep").and().index("failover_while_running_testsink").name("testsink").build();
            tenant.addWatch(watch, UHURA, ValidationLevel.STRICT);
            for (int i = 0; i < 20; ++i) {
                Thread.sleep(100L);
                if (tenant.getLocalWatchCount() != 0) break;
            }
            Assert.assertEquals((long)1L, (long)tenant.getLocalWatchCount());
            Assert.assertTrue((boolean)tenant.runsWatchLocally("test_watch"));
            Thread.sleep(500L);
            tenant.shutdownHard();
        }
        Thread.sleep(1000L);
        tenant = new SignalsTenant("failover_while_running_test", client, clusterService, nodeEnvironment, scriptService, xContentRegistry, internalAuthTokenProvider, new SignalsSettings(settings), null, diagnosticContext, (ThreadPool)Mockito.mock(ThreadPool.class), trustManagerRegistry, httpProxyHostRegistry, featureService);
        try {
            tenant.init();
            for (int i = 0; i < 20; ++i) {
                Thread.sleep(100L);
                if (tenant.getLocalWatchCount() != 0) break;
            }
            Assert.assertEquals((long)1L, (long)tenant.getLocalWatchCount());
            Assert.assertTrue((boolean)tenant.runsWatchLocally("test_watch"));
        }
        finally {
            tenant.close();
        }
    }

    @Test
    public void shouldInitWatchWhichUsesTruststoreDuringStartupTime() throws Throwable {
        String tenantName = "test_tenant";
        String watchId = "tls_execution_test";
        try (MockWebserviceProvider webhookProvider = new MockWebserviceProvider("/tls_endpoint", true, false);){
            Client client = cluster.getInternalNodeClient();
            webhookProvider.uploadMockServerCertificateAsTruststore((LocalCluster)cluster, USER_CERTIFICATE, UPLOADED_TRUSTSTORE_ID);
            Watch watch = new WatchBuilder(watchId).atMsInterval(100L).search("testsource").query("{\"match_all\" : {} }").as("testsearch").put("{\"bla\": {\"blub\": 42}}").as("teststatic").then().postWebhook(webhookProvider.getUri()).truststoreId(UPLOADED_TRUSTSTORE_ID).throttledFor("0").name("send-http-request").build();
            Settings settings = Settings.builder().build();
            try (SignalsTenant tenant = new SignalsTenant(tenantName, client, clusterService, nodeEnvironment, scriptService, xContentRegistry, internalAuthTokenProvider, new SignalsSettings(settings), null, diagnosticContext, (ThreadPool)Mockito.mock(ThreadPool.class), trustManagerRegistry, httpProxyHostRegistry, featureService);){
                tenant.init();
                tenant.addWatch(watch, UHURA, ValidationLevel.STRICT);
                Awaitility.await().until(() -> tenant.getLocalWatchCount() != 0);
                Assert.assertEquals((long)1L, (long)tenant.getLocalWatchCount());
                Assert.assertTrue((boolean)tenant.runsWatchLocally(watchId));
                Awaitility.await().until(() -> webhookProvider.getRequestCount() > 0);
            }
            Thread.sleep(1000L);
            log.debug("Current number of webhook requests " + webhookProvider.getRequestCount());
            int requestCountBeforeSecondClusterStart = webhookProvider.getRequestCount();
            try (SignalsTenant tenant = new SignalsTenant(tenantName, client, clusterService, nodeEnvironment, scriptService, xContentRegistry, internalAuthTokenProvider, new SignalsSettings(settings), null, diagnosticContext, (ThreadPool)Mockito.mock(ThreadPool.class), trustManagerRegistry, httpProxyHostRegistry, featureService);){
                tenant.init();
                Awaitility.await().until(() -> tenant.getLocalWatchCount() != 0);
                Assert.assertEquals((long)1L, (long)tenant.getLocalWatchCount());
                Assert.assertTrue((boolean)tenant.runsWatchLocally(watchId));
                Awaitility.await().until(() -> webhookProvider.getRequestCount() > requestCountBeforeSecondClusterStart);
                log.debug("Current number of webhook requests " + webhookProvider.getRequestCount());
            }
        }
    }

    static {
        UHURA = User.forUser((String)"uhura").backendRoles(new String[]{"signals_admin", "all_access"}).build();
        ActionHandler.factoryRegistry.add(new ActionHandler.Factory[]{new SleepAction.Factory()});
    }

    static class SleepAction
    extends ActionHandler {
        private Duration duration;

        SleepAction(Duration duration) {
            this.duration = duration;
        }

        public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
            builder.field("duration", DurationFormat.INSTANCE.format(this.duration));
            return builder;
        }

        public ActionExecutionResult execute(WatchExecutionContext ctx) throws ActionExecutionException {
            try {
                Thread.sleep(this.duration.toMillis());
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            return new ActionExecutionResult("zzz");
        }

        public String getType() {
            return "sleep";
        }

        public static class Factory
        extends ActionHandler.Factory<SleepAction> {
            public Factory() {
                super("sleep");
            }

            protected SleepAction create(WatchInitializationService watchInitService, ValidatingDocNode vJsonNode, ValidationErrors validationErrors) throws ConfigValidationException {
                return new SleepAction(vJsonNode.get("duration").asDuration());
            }
        }
    }
}

