/*
 * Decompiled with CFR 0.152.
 */
package com.floragunn.signals;

import com.floragunn.codova.config.temporal.DurationExpression;
import com.floragunn.codova.documents.DocNode;
import com.floragunn.codova.documents.Document;
import com.floragunn.codova.documents.Format;
import com.floragunn.fluent.collections.ImmutableList;
import com.floragunn.fluent.collections.ImmutableSet;
import com.floragunn.searchguard.test.GenericRestClient;
import com.floragunn.searchguard.test.TestSgConfig;
import com.floragunn.searchguard.test.helper.cluster.LocalCluster;
import com.floragunn.searchguard.test.helper.network.SocketUtils;
import com.floragunn.searchsupport.junit.LoggingTestWatcher;
import com.floragunn.searchsupport.junit.matcher.DocNodeMatchers;
import com.floragunn.searchsupport.proxy.wiremock.WireMockRequestHeaderAddingFilter;
import com.floragunn.signals.MockWebserviceProvider;
import com.floragunn.signals.Signals;
import com.floragunn.signals.SignalsModule;
import com.floragunn.signals.proxy.service.HttpProxyHostRegistry;
import com.floragunn.signals.truststore.service.TrustManagerRegistry;
import com.floragunn.signals.util.WatchLogSearch;
import com.floragunn.signals.watch.Watch;
import com.floragunn.signals.watch.WatchBuilder;
import com.floragunn.signals.watch.action.handlers.email.EmailAccount;
import com.floragunn.signals.watch.action.handlers.email.EmailAction;
import com.floragunn.signals.watch.action.handlers.slack.SlackAccount;
import com.floragunn.signals.watch.action.handlers.slack.SlackActionConf;
import com.floragunn.signals.watch.common.HttpRequestConfig;
import com.floragunn.signals.watch.common.ValidationLevel;
import com.floragunn.signals.watch.common.throttle.ThrottlePeriodParser;
import com.floragunn.signals.watch.common.throttle.ValidatingThrottlePeriodParser;
import com.floragunn.signals.watch.init.WatchInitializationService;
import com.floragunn.signals.watch.result.ActionLog;
import com.floragunn.signals.watch.result.Status;
import com.floragunn.signals.watch.result.WatchLog;
import com.floragunn.signals.watch.severity.SeverityLevel;
import com.github.tomakehurst.wiremock.core.Options;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.extension.Extension;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import com.icegreen.greenmail.util.GreenMail;
import com.icegreen.greenmail.util.GreenMailUtil;
import com.icegreen.greenmail.util.ServerSetup;
import jakarta.mail.Part;
import java.net.InetAddress;
import java.net.URI;
import java.time.DayOfWeek;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import net.jcip.annotations.NotThreadSafe;
import org.apache.http.Header;
import org.apache.http.entity.ContentType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.awaitility.Awaitility;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentType;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
import org.quartz.TimeOfDay;

@NotThreadSafe
public class RestApiTest {
    // Logger for this test class.
    private static final Logger log = LogManager.getLogger(RestApiTest.class);
    // Passed as both the first and the second argument of cluster.getRestClient(...) by most tests below.
    public static final String USERNAME_UHURA = "uhura";
    public static final String UPLOADED_TRUSTSTORE_ID = "uploaded-truststore-id";
    public static final String SIGNALS_LOGS_INDEX_NAME = ".signals__main_log";
    // Index created by setupTestData() with a raised "mapping.total_fields.limit".
    public static final String HUGE_DOCUMENT_INDEX = "huge_document_index";
    // setupTestData() fills a 1940-element key/value array for the huge document, i.e. twice this number.
    public static final int HUGE_DOCUMENT_FIELD_COUNT = 970;
    // Both statics are populated by setupDependencies() from the running cluster.
    private static ScriptService scriptService;
    private static ThrottlePeriodParser throttlePeriodParser;
    // Instance field: it reads the two statics above when the test instance is constructed.
    // Used by the tests to parse watch documents returned by the REST API.
    private final WatchInitializationService watchInitializationService = new WatchInitializationService(null, scriptService, (TrustManagerRegistry)Mockito.mock(TrustManagerRegistry.class), (HttpProxyHostRegistry)Mockito.mock(HttpProxyHostRegistry.class), throttlePeriodParser, ValidationLevel.STRICT);
    // Declared without an initializer; the assignment is not part of this section of the file.
    private static final WireMockRequestHeaderAddingFilter REQUEST_HEADER_ADDING_FILTER;
    // WireMock rule bound to 127.0.0.8 on a dynamic port, with browser proxying and proxy pass-through
    // enabled and REQUEST_HEADER_ADDING_FILTER registered as an extension.
    @Rule
    public WireMockRule wireMockProxy = new WireMockRule((Options)WireMockConfiguration.options().bindAddress("127.0.0.8").enableBrowserProxying(true).proxyPassThrough(true).dynamicPort().extensions(new Extension[]{REQUEST_HEADER_ADDING_FILTER}));
    @Rule
    public LoggingTestWatcher loggingTestWatcher = new LoggingTestWatcher();
    private static TestSgConfig.User USER_CERTIFICATE;
    // Embedded cluster shared by all tests of this class; its initialization is not visible in this section.
    @ClassRule
    public static LocalCluster.Embedded cluster;

    /**
     * Seeds the shared cluster with the data the tests read: three small documents in
     * {@code testsource} and one document with {@link #HUGE_DOCUMENT_FIELD_COUNT} key/value
     * pairs in {@link #HUGE_DOCUMENT_INDEX}.
     */
    @BeforeClass
    public static void setupTestData() {
        Client nodeClient = cluster.getInternalNodeClient();

        // The first document is indexed without an explicit refresh policy.
        IndexRequest plainDoc = new IndexRequest("testsource");
        plainDoc.source(XContentType.JSON, "key1", "val1", "key2", "val2");
        nodeClient.index(plainDoc).actionGet();

        // The next two documents share the keys "a" and "b" and are refreshed immediately.
        String[][] abValues = {{"x", "y"}, {"xx", "yy"}};
        for (String[] values : abValues) {
            IndexRequest refreshedDoc = new IndexRequest("testsource");
            refreshedDoc.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            refreshedDoc.source(XContentType.JSON, "a", values[0], "b", values[1]);
            nodeClient.index(refreshedDoc).actionGet();
        }

        // The huge document needs an index with a raised field limit.
        Settings wideIndexSettings = Settings.builder().put("mapping.total_fields.limit", 3000).build();
        CreateIndexRequest wideIndexRequest = new CreateIndexRequest(HUGE_DOCUMENT_INDEX).settings(wideIndexSettings);
        nodeClient.admin().indices().create(wideIndexRequest).actionGet();

        // Alternating key/value entries; both are numbered by the array position of the key.
        Object[] wideDocument = new String[2 * HUGE_DOCUMENT_FIELD_COUNT];
        for (int pair = 0; pair < HUGE_DOCUMENT_FIELD_COUNT; pair++) {
            int keyPosition = 2 * pair;
            wideDocument[keyPosition] = "key_" + keyPosition;
            wideDocument[keyPosition + 1] = "value_" + keyPosition;
        }

        IndexRequest wideDocRequest = new IndexRequest(HUGE_DOCUMENT_INDEX);
        wideDocRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        wideDocRequest.source(XContentType.JSON, wideDocument);
        nodeClient.index(wideDocRequest).actionGet();
    }

    /**
     * Fetches the {@link ScriptService} from the cluster and builds the throttle period parser
     * from the Signals settings, so that test instances can create their
     * {@code watchInitializationService}.
     */
    @BeforeClass
    public static void setupDependencies() throws Exception {
        scriptService = (ScriptService) cluster.getInjectable(ScriptService.class);

        Signals signals = (Signals) cluster.getInjectable(Signals.class);
        throttlePeriodParser = new ValidatingThrottlePeriodParser(signals.getSignalsSettings());
    }

    /**
     * Requests a watch as the {@code noshirt} user and expects the REST API to answer with HTTP 403.
     */
    @Test
    public void testGetWatchUnauthorized() throws Exception {
        String tenant = "_main";
        String watchId = "get_watch_unauth";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;

        try (GenericRestClient restClient = cluster.getRestClient("noshirt", "redshirt")) {
            GenericRestClient.HttpResponse response = restClient.get(watchPath);

            // The response body is the failure message, so a wrong status code shows the server's reply.
            Assert.assertEquals(response.getBody(), 403L, response.getStatusCode());
        }
    }

    /**
     * Stores a watch via PUT, reads it back via GET, parses the returned document as a
     * {@link Watch}, and waits until the watch's index action has written at least one
     * document to the sink index.
     */
    @Test
    public void testPutWatch() throws Exception {
        String tenant = "_main";
        String watchId = "put_test";
        String sinkIndex = "testsink_put_watch";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA).trackResources()) {
            Client client = cluster.getInternalNodeClient();
            client.admin().indices().create(new CreateIndexRequest(sinkIndex)).actionGet();

            Watch watch = new WatchBuilder(watchId)
                    .cronTrigger("* * * * * ?")
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then().index(sinkIndex).name("testsink")
                    .build();

            GenericRestClient.HttpResponse putResponse = restClient.putJson(watchPath, (ToXContentObject) watch);
            Assert.assertEquals(putResponse.getBody(), 201L, putResponse.getStatusCode());

            GenericRestClient.HttpResponse getResponse = restClient.get(watchPath);
            Assert.assertEquals(getResponse.getBody(), 200L, getResponse.getStatusCode());

            // The parsed watch is not used afterwards; presumably the call is here so that an
            // unparsable response fails the test.
            Watch.parseFromElasticDocument(this.watchInitializationService, "test", "put_test", getResponse.getBody(), -1L);

            this.awaitMinCountOfDocuments(client, sinkIndex, 1L);
        }
    }

    /**
     * Submits a watch whose daily schedule has an empty {@code at} value and expects HTTP 400
     * with a response body containing "Invalid value" and "Time of day".
     */
    @Test
    public void testPutWatchWithInvalidTrigger() throws Exception {
        String tenant = "_main";
        String watchId = "put_test_invalid_trigger";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String watchJson = "{ \"trigger\": {\"schedule\": {\"daily\": {\"at\": \"\"} } }, "
                + "\"checks\":[], \"actions\":[], "
                + "\"active\":false, \"log_runtime_data\":false }";

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA).trackResources()) {
            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watchJson);

            Assert.assertEquals(response.getBody(), 400L, response.getStatusCode());
            for (String expectedFragment : new String[] {"Invalid value", "Time of day"}) {
                Assert.assertTrue(response.getBody(), response.getBody().contains(expectedFragment));
            }
        }
    }

    /**
     * Stores a watch without a trigger and checks that its state becomes available, both through
     * the watch's {@code _state} endpoint and through the tenant-wide state search.
     */
    @Test
    public void testWatchStateAfterPutWatch() throws Exception {
        String tenant = "_main";
        String watchId = "put_state_after_put_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String stateSearchPath = "/_signals/watch/" + tenant + "/_search/_state";
        String stateSearchBody = "{ \"query\": {\"match\": {\"_id\": \"_main/put_state_after_put_test\"}}}";

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA).trackResources()) {
            Watch watch = new WatchBuilder(watchId)
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then().index("testsink_put_watch").name("testsink")
                    .build();

            GenericRestClient.HttpResponse putResponse = restClient.putJson(watchPath, watch.toJson());
            Assert.assertEquals(putResponse.getBody(), 201L, putResponse.getStatusCode());

            // awaitRestGet is a helper of this class; the response it yields is checked for HTTP 200.
            GenericRestClient.HttpResponse stateResponse = this.awaitRestGet(watchPath + "/_state", restClient);
            Assert.assertEquals(stateResponse.getBody(), 200L, stateResponse.getStatusCode());

            GenericRestClient.HttpResponse searchResponse = restClient.postJson(stateSearchPath, stateSearchBody);
            Assert.assertEquals(searchResponse.getBody(), 200L, searchResponse.getStatusCode());

            // Exactly one state document must match the watch's id.
            String searchBody = searchResponse.getBody();
            Assert.assertTrue(searchBody, searchBody.contains("\"hits\":{\"total\":{\"value\":1,\"relation\":\"eq\"}"));
        }
    }

    /**
     * Exercises a watch with a single severity level: while the source index holds a document the
     * ERROR action writes to the sink; after that document is deleted the resolve action is
     * expected to write exactly one document to the resolve sink.
     */
    @Test
    public void testPutWatchWithSeverity() throws Exception {
        String tenant = "_main";
        String watchId = "put_test_severity";
        String testSink = "testsink_" + watchId;
        String testSinkResolve = "testsink_resolve_" + watchId;
        String testSource = "testsource_" + watchId;
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA).trackResources()) {
            Client client = cluster.getInternalNodeClient();

            IndexRequest sourceDoc = new IndexRequest(testSource);
            sourceDoc.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            sourceDoc.id("1").source(XContentType.JSON, "a", "x", "b", "y");
            client.index(sourceDoc).actionGet();

            client.admin().indices().create(new CreateIndexRequest(testSink)).actionGet();
            client.admin().indices().create(new CreateIndexRequest(testSinkResolve)).actionGet();

            Watch watch = new WatchBuilder(watchId)
                    .cronTrigger("* * * * * ?")
                    .search(testSource).query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .consider("data.testsearch.hits.total.value").greaterOrEqual(1.0).as(SeverityLevel.ERROR)
                    .when(SeverityLevel.ERROR).index(testSink).name("a1")
                    .and()
                    .whenResolved(SeverityLevel.ERROR).index(testSinkResolve).name("r1")
                    .build();

            GenericRestClient.HttpResponse putResponse = restClient.putJson(watchPath, watch.toJson());
            Assert.assertEquals(putResponse.getBody(), 201L, putResponse.getStatusCode());

            GenericRestClient.HttpResponse getResponse = restClient.get(watchPath);
            Assert.assertEquals(getResponse.getBody(), 200L, getResponse.getStatusCode());

            // The parsed watch is discarded. NOTE(review): the id literal "put_test" differs from watchId.
            Watch.parseFromElasticDocument(this.watchInitializationService, "test", "put_test", getResponse.getBody(), -1L);

            // Source document present: the action sink fills, the resolve sink stays empty.
            this.awaitMinCountOfDocuments(client, testSink, 1L);
            Assert.assertEquals(this.getDocs(client, testSinkResolve), 0L, this.getCountOfDocuments(client, testSinkResolve));

            DeleteRequest removeSourceDoc = new DeleteRequest(testSource);
            removeSourceDoc.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            removeSourceDoc.id("1");
            client.delete(removeSourceDoc).actionGet();

            this.awaitMinCountOfDocuments(client, testSinkResolve, 1L);

            // Wait a little longer so that additional, unwanted resolve documents would show up in the count.
            Thread.sleep(2000L);
            Assert.assertEquals(this.getDocs(client, testSinkResolve), 1L, this.getCountOfDocuments(client, testSinkResolve));
        }
    }

    /**
     * Submits a watch whose action refers to severity INFO although the severity mapping only
     * defines ERROR, and expects HTTP 400 with a message naming the undefined severity.
     */
    @Test
    public void testPutWatchWithSeverityValidation() throws Exception {
        String tenant = "_main";
        String watchId = "put_test_severity_validation";
        String testSink = "testsink_" + watchId;
        String testSinkResolve = "testsink_resolve_" + watchId;
        String testSource = "testsource_" + watchId;
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA).trackResources()) {
            Client client = cluster.getInternalNodeClient();

            IndexRequest sourceDoc = new IndexRequest(testSource);
            sourceDoc.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            sourceDoc.id("1").source(XContentType.JSON, "a", "x", "b", "y");
            client.index(sourceDoc).actionGet();

            client.admin().indices().create(new CreateIndexRequest(testSink)).actionGet();
            client.admin().indices().create(new CreateIndexRequest(testSinkResolve)).actionGet();

            // The mapping produces ERROR only, yet the action is bound to INFO.
            Watch watch = new WatchBuilder(watchId)
                    .cronTrigger("* * * * * ?")
                    .search(testSource).query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .consider("data.testsearch.hits.total.value").greaterOrEqual(1.0).as(SeverityLevel.ERROR)
                    .when(SeverityLevel.INFO).index(testSink).name("a1")
                    .and()
                    .whenResolved(SeverityLevel.ERROR).index(testSinkResolve).name("r1")
                    .build();

            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson());
            Assert.assertEquals(response.getBody(), 400L, response.getStatusCode());

            String body = response.getBody();
            Assert.assertTrue(body, body.contains("Uses a severity which is not defined by severity mapping: [info]"));
        }
    }

    /**
     * Exercises a watch with two severity levels (ERROR from one source document, CRITICAL from
     * two) and separate resolve actions for each level. The test walks through four phases — one
     * document, two documents, back to one, then none — and checks the document counts of the
     * action sink and both resolve sinks after each phase.
     */
    @Test
    public void testPutWatchWithSeverity2() throws Exception {
        String tenant = "_main";
        String watchId = "put_test_severity2";
        String testSink = "testsink_" + watchId;
        String testSinkResolve1 = "testsink_resolve1_" + watchId;
        String testSinkResolve2 = "testsink_resolve2_" + watchId;
        String testSource = "testsource_" + watchId;
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA).trackResources()) {
            Client client = cluster.getInternalNodeClient();

            IndexRequest firstSourceDoc = new IndexRequest(testSource);
            firstSourceDoc.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            firstSourceDoc.id("1").source(XContentType.JSON, "a", "x", "b", "y");
            client.index(firstSourceDoc).actionGet();
            this.awaitMinCountOfDocuments(client, testSource, 1L);

            for (String sinkIndex : new String[] {testSink, testSinkResolve1, testSinkResolve2}) {
                client.admin().indices().create(new CreateIndexRequest(sinkIndex)).actionGet();
            }

            Watch watch = new WatchBuilder(watchId)
                    .atMsInterval(400L)
                    .search(testSource).query("{\"match_all\" : {} }").as("testsearch")
                    .consider("data.testsearch.hits.total.value")
                    .greaterOrEqual(1.0).as(SeverityLevel.ERROR)
                    .greaterOrEqual(2.0).as(SeverityLevel.CRITICAL)
                    .when(SeverityLevel.ERROR, SeverityLevel.CRITICAL).index(testSink).name("a1").throttledFor("24h")
                    .and()
                    .whenResolved(SeverityLevel.ERROR).index(testSinkResolve1).name("r1")
                    .and()
                    .whenResolved(SeverityLevel.CRITICAL).index(testSinkResolve2).name("r2")
                    .build();

            GenericRestClient.HttpResponse putResponse = restClient.putJson(watchPath, watch.toJson());
            Assert.assertEquals(putResponse.getBody(), 201L, putResponse.getStatusCode());

            GenericRestClient.HttpResponse getResponse = restClient.get(watchPath);
            Assert.assertEquals(getResponse.getBody(), 200L, getResponse.getStatusCode());

            // The parsed watch is discarded. NOTE(review): the id literal "put_test" differs from watchId.
            Watch.parseFromElasticDocument(this.watchInitializationService, "test", "put_test", getResponse.getBody(), -1L);

            // Phase 1: one source document.
            log.info("Created watch; as it should find one doc in " + testSource + ", it should go to severity ERROR and write exactly one doc to " + testSink);
            this.awaitMinCountOfDocuments(client, testSink, 1L);
            Thread.sleep(500L);
            Assert.assertEquals(0L, this.getCountOfDocuments(client, testSinkResolve1));
            Assert.assertEquals(0L, this.getCountOfDocuments(client, testSinkResolve2));
            Assert.assertEquals(1L, this.getCountOfDocuments(client, testSink));

            // Phase 2: a second source document.
            log.info("Adding one doc to " + testSource + "; this should raise severity from ERROR to CRITICAL and write exactly one doc to " + testSink);
            IndexRequest secondSourceDoc = new IndexRequest(testSource);
            secondSourceDoc.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            secondSourceDoc.id("2").source(XContentType.JSON, "a", "x", "b", "y");
            client.index(secondSourceDoc).actionGet();
            this.awaitMinCountOfDocuments(client, testSink, 2L);
            Thread.sleep(500L);
            Assert.assertEquals(0L, this.getCountOfDocuments(client, testSinkResolve1));
            Assert.assertEquals(0L, this.getCountOfDocuments(client, testSinkResolve2));
            Assert.assertEquals(this.getDocs(client, testSink), 2L, this.getCountOfDocuments(client, testSink));

            // Phase 3: back to one source document; only the CRITICAL resolve sink may receive a document.
            DeleteRequest removeFirstDoc = new DeleteRequest(testSource);
            removeFirstDoc.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            removeFirstDoc.id("1");
            client.delete(removeFirstDoc).actionGet();
            this.awaitMinCountOfDocuments(client, testSinkResolve2, 1L);
            Thread.sleep(200L);
            Assert.assertEquals(this.getDocs(client, testSinkResolve1), 0L, this.getCountOfDocuments(client, testSinkResolve1));
            Assert.assertEquals(this.getDocs(client, testSinkResolve2), 1L, this.getCountOfDocuments(client, testSinkResolve2));
            Assert.assertEquals(this.getDocs(client, testSink), 2L, this.getCountOfDocuments(client, testSink));

            // Phase 4: no source documents left; now the ERROR resolve sink receives its document.
            DeleteRequest removeSecondDoc = new DeleteRequest(testSource);
            removeSecondDoc.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            removeSecondDoc.id("2");
            client.delete(removeSecondDoc).actionGet();
            this.awaitMinCountOfDocuments(client, testSinkResolve1, 1L);
            Thread.sleep(200L);
            Assert.assertEquals(this.getDocs(client, testSinkResolve1), 1L, this.getCountOfDocuments(client, testSinkResolve1));
            Assert.assertEquals(this.getDocs(client, testSinkResolve2), 1L, this.getCountOfDocuments(client, testSinkResolve2));
            Assert.assertEquals(this.getDocs(client, testSink), 2L, this.getCountOfDocuments(client, testSink));
        }
    }

    /**
     * Runs the put/get/execute cycle for a tenant name and a watch id that both contain a dash,
     * then deletes the watch and expects a subsequent GET to answer with HTTP 404.
     */
    @Test
    public void testPutWatchWithDash() throws Exception {
        String tenant = "dash-tenant";
        String watchId = "dash-watch";
        String sinkIndex = "testsink_put_watch_with_dash";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA).trackResources()) {
            Client client = cluster.getInternalNodeClient();
            client.admin().indices().create(new CreateIndexRequest(sinkIndex)).actionGet();

            Watch watch = new WatchBuilder(watchId)
                    .cronTrigger("* * * * * ?")
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then().index(sinkIndex).name("testsink")
                    .build();

            GenericRestClient.HttpResponse putResponse = restClient.putJson(watchPath, watch.toJson());
            Assert.assertEquals(putResponse.getBody(), 201L, putResponse.getStatusCode());

            GenericRestClient.HttpResponse getResponse = restClient.get(watchPath);
            Assert.assertEquals(getResponse.getBody(), 200L, getResponse.getStatusCode());

            // The parsed watch is discarded. NOTE(review): the id literal "put_test" differs from watchId.
            Watch.parseFromElasticDocument(this.watchInitializationService, "test", "put_test", getResponse.getBody(), -1L);

            this.awaitMinCountOfDocuments(client, sinkIndex, 1L);

            // The delete response itself is not inspected; the follow-up GET is what gets checked.
            restClient.delete(watchPath);
            Thread.sleep(500L);

            GenericRestClient.HttpResponse afterDelete = restClient.get(watchPath);
            Assert.assertEquals(afterDelete.getBody(), 404L, afterDelete.getStatusCode());
        }
    }

    /**
     * Stores a watch that was built without any trigger and verifies that the watch read back
     * from the REST API has an empty trigger list.
     */
    @Test
    public void testPutWatchWithoutSchedule() throws Exception {
        String tenant = "_main";
        String watchId = "without_schedule";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA).trackResources()) {
            Watch submitted = new WatchBuilder(watchId)
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then().index("testsink_put_watch_with_dash").name("testsink")
                    .build();

            GenericRestClient.HttpResponse putResponse = restClient.putJson(watchPath, submitted.toJson());
            Assert.assertEquals(putResponse.getBody(), 201L, putResponse.getStatusCode());

            GenericRestClient.HttpResponse getResponse = restClient.get(watchPath);
            Assert.assertEquals(getResponse.getBody(), 200L, getResponse.getStatusCode());

            // NOTE(review): the id literal "put_test" differs from watchId.
            Watch stored = Watch.parseFromElasticDocument(this.watchInitializationService, "test", "put_test", getResponse.getBody(), -1L);
            Assert.assertTrue(getResponse.getBody(), stored.getSchedule().getTriggers().isEmpty());
        }
    }

    /**
     * Verifies that a watch read back through the REST API exposes no auth token: the raw body
     * must not contain {@code auth_token} and the parsed watch must have a null auth token.
     */
    @Test
    public void testAuthTokenFilter() throws Exception {
        String tenant = "_main";
        String watchId = "filter";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA).trackResources()) {
            Watch submitted = new WatchBuilder(watchId)
                    .cronTrigger("* * * * * ?")
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then().index("testsink_put_watch").name("testsink")
                    .build();

            GenericRestClient.HttpResponse putResponse = restClient.putJson(watchPath, submitted.toJson());
            Assert.assertEquals(putResponse.getBody(), 201L, putResponse.getStatusCode());

            GenericRestClient.HttpResponse getResponse = restClient.get(watchPath);
            String returnedDocument = getResponse.getBody();
            Assert.assertFalse(returnedDocument, returnedDocument.contains("auth_token"));

            Watch stored = Watch.parseFromElasticDocument(this.watchInitializationService, "test", watchId, getResponse.getBody(), -1L);
            Assert.assertNull(getResponse.getBody(), stored.getAuthToken());
        }
    }

    /**
     * Submits a watch document containing several independent mistakes (unknown check type, broken
     * scripts, invalid cron expression, unsupported attributes, missing action name) and checks
     * that the HTTP 400 response reports each of them under the corresponding key of
     * {@code detail}.
     */
    @Test
    public void testPutInvalidWatch() throws Exception {
        String tenant = "_main";
        String watchId = "put_invalid_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA)) {
            String watchJson = "{\"trigger\":{\"schedule\":{\"timezone\":\"Europe/Berlino\",\"cron\":[\"* * argh * * ?\"],\"x\": 2}},\"checks\":[{\"type\":\"searchx\",\"name\":\"testsearch\",\"target\":\"testsearch\",\"request\":{\"indices\":[\"testsource\"],\"body\":{\"query\":{\"match_all\":{}}}}},{\"type\":\"static\",\"name\":\"teststatic\",\"target\":\"teststatic\",\"value\":{\"bla\":{\"blub\":42}}},{\"type\":\"transform\",\"target\":\"testtransform\",\"source\":\"1 + x\"},{\"type\":\"calc\",\"name\":\"testcalc\",\"source\":\"1 +\"}],\"actions\":[{\"type\":\"index\",\"index\":\"testsink_put_watch\"}],\"horst\": true}";

            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watchJson);
            String body = response.getBody();
            Assert.assertEquals(body, 400L, response.getStatusCode());

            DocNode parsedResponse = response.getBodyAsDocNode();
            Assert.assertEquals(body, 400, parsedResponse.get("status"));

            // Each key of "detail" holds a list of error nodes; only the first entry is inspected.
            DocNode detail = parsedResponse.getAsNode("detail");

            DocNode searchTypeError = (DocNode) detail.getAsListOfNodes("checks[testsearch].type").get(0);
            Assert.assertEquals(body, "Invalid value", searchTypeError.get("error"));

            DocNode searchTypeValue = (DocNode) detail.getAsListOfNodes("checks[testsearch].type").get(0);
            Assert.assertEquals(body, "searchx", searchTypeValue.get("value"));

            DocNode transformSourceError = (DocNode) detail.getAsListOfNodes("checks[].source").get(0);
            Assert.assertEquals(body, "cannot resolve symbol [x]", transformSourceError.get("error"));

            DocNode cronError = (DocNode) detail.getAsListOfNodes("trigger.schedule.cron.0").get(0);
            Assert.assertTrue(body, cronError.get("error").toString().contains("Invalid cron expression"));

            DocNode scheduleAttributeError = (DocNode) detail.getAsListOfNodes("trigger.schedule.x").get(0);
            Assert.assertTrue(body, scheduleAttributeError.get("error").toString().contains("Unsupported attribute"));

            DocNode actionNameError = (DocNode) detail.getAsListOfNodes("actions[].name").get(0);
            Assert.assertEquals(body, "Required attribute is missing", actionNameError.get("error"));

            DocNode calcSourceError = (DocNode) detail.getAsListOfNodes("checks[testcalc].source").get(0);
            Assert.assertEquals(body, "unexpected end of script.", calcSourceError.get("error"));

            DocNode rootAttributeError = (DocNode) detail.getAsListOfNodes("horst").get(0);
            Assert.assertEquals(body, "Unsupported attribute", rootAttributeError.get("error"));
        }
    }

    /**
     * Submits a truncated JSON document and expects HTTP 400 with an "Invalid JSON document"
     * error reported under the {@code _} key of {@code detail}.
     */
    @Test
    public void testPutInvalidWatchJsonSyntaxError() throws Exception {
        String tenant = "_main";
        String watchId = "put_invalid_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA)) {
            String truncatedJson = "{\"trigger\":{";

            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, truncatedJson);
            String body = response.getBody();
            Assert.assertEquals(body, 400L, response.getStatusCode());

            DocNode parsedResponse = response.getBodyAsDocNode();
            Assert.assertEquals(body, 400, parsedResponse.get("status"));

            DocNode documentError = (DocNode) parsedResponse.getAsNode("detail").getAsListOfNodes("_").get(0);
            String errorMessage = documentError.get("error").toString();
            Assert.assertTrue(body, errorMessage.contains("Invalid JSON document"));
        }
    }

    /**
     * Submits a watch whose webhook action sets both {@code body} and {@code json_body_from} and
     * expects HTTP 400 with exactly two validation errors, one per conflicting attribute, each
     * stating that the two attributes are mutually exclusive.
     */
    @Test
    public void testPutInvalidWatch_invalidHttpRequestBodyConfig_bothBodyAndJsonBodyFromAreSet() throws Exception {
        String tenant = "_main";
        String watchId = "put_invalid_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String expectedError = "Both body and json_body_from are set. These are mutually exclusive.";

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA)) {
            DocNode request = DocNode.of(
                    "method", "POST",
                    "url", "https://my.test.web.hook/endpoint",
                    "body", "first_body",
                    "json_body_from", "second.body");
            DocNode action = DocNode.of("type", "webhook", "name", "webhook_with_two_request_bodies", "request", request);
            DocNode watch = DocNode.of("actions", Collections.singletonList(action));

            // Goes through the class logger instead of stdout so the output follows the test log configuration.
            String watchJson = watch.toJsonString();
            log.info("Submitting watch with conflicting request bodies: {}", watchJson);

            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watchJson);
            Assert.assertEquals(response.getBody(), 400L, response.getStatusCode());

            // The response is parsed according to the content type it declares.
            DocNode parsedResponse = DocNode.parse(Format.getByContentType(response.getContentType())).from(response.getBody());
            Assert.assertEquals(response.getBody(), 2L, parsedResponse.getAsNode("detail").size());
            Assert.assertEquals(response.getBody(), expectedError,
                    parsedResponse.findSingleNodeByJsonPath("detail['actions[webhook_with_two_request_bodies].request.body'][0]").getAsString("error"));
            Assert.assertEquals(response.getBody(), expectedError,
                    parsedResponse.findSingleNodeByJsonPath("detail['actions[webhook_with_two_request_bodies].request.json_body_from'][0]").getAsString("error"));
        }
    }

    /**
     * Verifies that a webhook request combining {@code json_body_from} with an
     * {@code application/xml} Content-Type header is rejected with HTTP 400, that the
     * {@code detail} node has exactly one entry, and that the error is attached to the
     * Content-Type header attribute.
     */
    @Test
    public void testPutInvalidWatch_httpRequestContentTypeAppXml_whenJsonBodyFromIsSet() throws Exception {
        String tenant = "_main";
        String watchId = "put_invalid_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
            DocNode headers = DocNode.of("Content-Type", "application/xml");
            DocNode request = DocNode.of("method", "POST", "url", "https://my.test.web.hook/endpoint", "json_body_from", "data.test", "headers", headers);
            DocNode action = DocNode.of("type", "webhook", "name", "json_body_from_and_wrong_content_type", "request", request);
            DocNode watch = DocNode.of("actions", Collections.singletonList(action));

            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJsonString(), new Header[0]);
            // The response body doubles as the failure message so a failing run shows the server's answer.
            Assert.assertEquals(response.getBody(), 400, response.getStatusCode());

            DocNode parsedResponse = DocNode.parse(Format.getByContentType(response.getContentType())).from(response.getBody());
            Assert.assertEquals(response.getBody(), 1, parsedResponse.getAsNode("detail").size());
            Assert.assertEquals(response.getBody(), "Content type header should be set to application/json when json_body_from is used.",
                    parsedResponse.findSingleNodeByJsonPath("detail['actions[json_body_from_and_wrong_content_type].request.headers.Content-Type'][0]").getAsString("error"));
        }
    }

    /**
     * Stores a watch whose webhook uses {@code json_body_from} without an explicit
     * Content-Type header and checks that the mock webhook receives a request carrying the
     * {@code application/json} mime type and the body {@code "test"}. The watch is deleted
     * afterwards regardless of the outcome.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     */
    @Test
    public void testPutWatch_bodyFromRuntimeDataPath_contentTypeShouldDefaultToAppJson() throws Exception {
        String tenant = "_main";
        String watchId = "put_watch_with_body_from_runtime_data_default_content_type_header";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (MockWebserviceProvider webhookProvider = new MockWebserviceProvider("/hook");
             GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            try {
                Watch watch = new WatchBuilder("put_test")
                        .cronTrigger("* * * * * ?")
                        .put("{\"test\": \"test\"}").as("teststatic")
                        .then()
                        .postWebhook(webhookProvider.getUri()).jsonBodyFrom("data.teststatic.test").name("webhook_with_default_content_type")
                        .build();
                GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
                Assert.assertEquals(response.getBody(), 201, response.getStatusCode());

                // Fixed wait for the trigger to fire at least once before inspecting the mock.
                Thread.sleep(3000L);
                Assert.assertTrue(webhookProvider.getRequestCount() > 0);

                Header contentTypeHeader = webhookProvider.getLastRequestHeader("Content-Type");
                Assert.assertNotNull("content type header should be present", contentTypeHeader);
                String jsonMimeType = ContentType.APPLICATION_JSON.getMimeType();
                Assert.assertEquals("content type header should contain " + jsonMimeType, jsonMimeType, contentTypeHeader.getValue());
                Assert.assertEquals("webhook request body should match", "\"test\"", webhookProvider.getLastRequestBody());
            } finally {
                restClient.delete(watchPath, new Header[0]);
            }
        }
    }

    /**
     * Expects HTTP 403 when user {@code redshirt3} tries to store a watch in the
     * {@code _main} tenant.
     */
    @Test
    public void testPutWatchUnauthorized() throws Exception {
        String tenant = "_main";
        String watchId = "put_watch_unauth";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (GenericRestClient restClient = cluster.getRestClient("redshirt3", "redshirt", new Header[0]).trackResources()) {
            Watch watch = new WatchBuilder("put_test")
                    .cronTrigger("* * * * * ?")
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then()
                    .index("testsink_put_watch").name("testsink")
                    .build();
            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 403, response.getStatusCode());
        }
    }

    /**
     * Stores a watch as user {@code redshirt2} (expects HTTP 201) and then expects the
     * resulting watch log to report {@code EXECUTION_FAILED}, naming the search input
     * {@code testsearch} and containing "Insufficient permissions".
     */
    @Test
    public void testPutWatchWithUnauthorizedCheck() throws Exception {
        String tenant = "_main";
        String watchId = "put_watch_with_unauth_check";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (GenericRestClient restClient = cluster.getRestClient("redshirt2", "redshirt", new Header[0]).trackResources()) {
            Client client = cluster.getInternalNodeClient();
            // NOTE(review): the index created here differs from the one the action below writes to
            // ("..._unauth_check" vs "..._unauth_action") - possibly a copy/paste leftover; confirm.
            client.admin().indices().create(new CreateIndexRequest("testsink_put_watch_with_unauth_check")).actionGet();

            Watch watch = new WatchBuilder("put_test")
                    .cronTrigger("* * * * * ?")
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then()
                    .index("testsink_put_watch_with_unauth_action").name("testsink")
                    .build();
            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201, response.getStatusCode());

            WatchLog watchLog = awaitWatchLog(client, tenant, watchId);
            String logText = watchLog.toString();
            Assert.assertEquals(logText, Status.Code.EXECUTION_FAILED, watchLog.getStatus().getCode());
            String statusDetail = watchLog.getStatus().getDetail();
            Assert.assertTrue(logText, statusDetail.contains("Error while executing SearchInput testsearch"));
            Assert.assertTrue(logText, watchLog.getStatus().getDetail().contains("Insufficient permissions"));
        }
    }

    /**
     * Checks the {@code http.allowed_endpoints} setting: the mock webhook must be called
     * before the setting is changed, and its request count must stop growing after the
     * setting is replaced by the two patterns {@code https://unkown*} and
     * {@code https://whatever*}.
     *
     * <p>Cleanup resets the setting to {@code ["*"]} and deletes the watch. The two steps are
     * nested in try/finally so the watch is deleted even if resetting the setting throws.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     */
    @Test
    public void testHttpWhitelist() throws Exception {
        String tenant = "_main";
        String watchId = "http_whitelist";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String allowedEndpointsPath = "/_signals/settings/http.allowed_endpoints";
        try (MockWebserviceProvider webhookProvider = new MockWebserviceProvider("/hook");
             GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
            Client client = cluster.getInternalNodeClient();
            try {
                client.admin().indices().create(new CreateIndexRequest("testsink_put_watch_with_credentials")).actionGet();
                Watch watch = new WatchBuilder("put_test")
                        .atMsInterval(100L)
                        .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                        .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                        .then()
                        .postWebhook(webhookProvider.getUri()).throttledFor("0").name("testhook")
                        .build();
                GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
                Assert.assertEquals(response.getBody(), 201, response.getStatusCode());

                Thread.sleep(600L);
                Assert.assertTrue(webhookProvider.getRequestCount() > 0);

                response = restClient.putJson(allowedEndpointsPath, "[\"https://unkown*\",\"https://whatever*\"]", new Header[0]);
                Assert.assertEquals(response.getBody(), 200, response.getStatusCode());
                // The original test reads the setting back without checking the result; the call is kept as is.
                restClient.get(allowedEndpointsPath, new Header[0]);

                // Let the new setting settle, then verify that no further requests arrive.
                Thread.sleep(300L);
                long requestCount = webhookProvider.getRequestCount();
                Thread.sleep(600L);
                Assert.assertEquals(requestCount, webhookProvider.getRequestCount());
            } finally {
                try {
                    restClient.putJson(allowedEndpointsPath, "[\"*\"]", new Header[0]);
                } finally {
                    restClient.delete(watchPath, new Header[0]);
                }
            }
        }
    }

    /**
     * Uploads the mock server certificate as a truststore, stores a watch whose webhook
     * references that truststore id, and expects the mock webhook to receive at least one
     * request.
     */
    @Test
    public void testWebhookTruststore() throws Exception {
        String tenant = "_main";
        String watchId = "webhook-with-truststore";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (MockWebserviceProvider webhookProvider = new MockWebserviceProvider("/hook", true, false);
             GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            webhookProvider.uploadMockServerCertificateAsTruststore(cluster, USER_CERTIFICATE, UPLOADED_TRUSTSTORE_ID);

            Watch watch = new WatchBuilder("tls-webhook-test")
                    .atMsInterval(100L)
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then()
                    .postWebhook(webhookProvider.getUri()).truststoreId(UPLOADED_TRUSTSTORE_ID).throttledFor("0").name("testhook")
                    .build();
            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201, response.getStatusCode());

            Thread.sleep(600L);
            Assert.assertTrue(webhookProvider.getRequestCount() > 0);
        }
    }

    /**
     * Posts an inline watch to the {@code _execute} endpoint whose webhook references the
     * uploaded truststore; expects HTTP 200 and at least one request at the mock webhook.
     */
    @Test
    public void testExecuteAnonymousWatchWhichUsesStoredTruststore() throws Exception {
        String tenant = "_main";
        String executePath = "/_signals/watch/" + tenant + "/_execute";
        try (MockWebserviceProvider webhookProvider = new MockWebserviceProvider("/hook", true, false);
             GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            webhookProvider.uploadMockServerCertificateAsTruststore(cluster, USER_CERTIFICATE, UPLOADED_TRUSTSTORE_ID);

            Watch watch = new WatchBuilder("tls-webhook-execute-test")
                    .atMsInterval(100L)
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then()
                    .postWebhook(webhookProvider.getUri()).truststoreId(UPLOADED_TRUSTSTORE_ID).throttledFor("0").name("testhook")
                    .build();
            String executeRequest = "{\"watch\":" + watch.toJson() + ",\"skip_actions\": false, \"simulate\": false}";
            GenericRestClient.HttpResponse response = restClient.postJson(executePath, executeRequest, new Header[0]);
            Assert.assertEquals(response.getBody(), 200, response.getStatusCode());

            Thread.sleep(600L);
            Assert.assertTrue(webhookProvider.getRequestCount() > 0);
        }
    }

    /**
     * Stores a watch whose webhook does not reference any truststore id and expects that the
     * mock webhook has received no request after the wait.
     */
    @Test
    public void testWebhookTruststoreFailureWithoutCorrectTruststore() throws Exception {
        String tenant = "_main";
        String watchId = "webhook-missing-truststore-configuration";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (MockWebserviceProvider webhookProvider = new MockWebserviceProvider("/hook", true, false);
             GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client client = cluster.getInternalNodeClient();
            client.admin().indices().create(new CreateIndexRequest("testsink-" + watchId)).actionGet();

            // Same watch as in testWebhookTruststore, but without truststoreId(...).
            Watch watch = new WatchBuilder("tls-webhook-test")
                    .atMsInterval(100L)
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then()
                    .postWebhook(webhookProvider.getUri()).throttledFor("0").name("testhook")
                    .build();
            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201, response.getStatusCode());

            Thread.sleep(600L);
            Assert.assertTrue(webhookProvider.getRequestCount() == 0);
        }
    }

    /**
     * Expects HTTP 400 when a watch references the truststore id
     * {@code not-existing-truststore-id}.
     */
    @Test
    public void shouldNotCreateWatchWhenWatchContainsIncorrectTruststoreId() throws Exception {
        String tenant = "_main";
        String watchId = "webhook-incorrect-truststore-id";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (MockWebserviceProvider webhookProvider = new MockWebserviceProvider("/hook", true, false);
             GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client client = cluster.getInternalNodeClient();
            client.admin().indices().create(new CreateIndexRequest("testsink-" + watchId)).actionGet();

            Watch watch = new WatchBuilder("tls-webhook-test")
                    .atMsInterval(100L)
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then()
                    .postWebhook(webhookProvider.getUri()).truststoreId("not-existing-truststore-id").throttledFor("0").name("testhook")
                    .build();
            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 400, response.getStatusCode());
        }
    }

    /**
     * Checks the {@code http.proxy} setting. The mock webhook is configured to accept only
     * requests carrying the header from {@code REQUEST_HEADER_ADDING_FILTER}; the test expects
     * zero accepted requests before the proxy setting is stored and at least one afterwards.
     * (Presumably the WireMock proxy adds that header - confirm against the fixture setup.)
     *
     * <p>Cleanup deletes the watch and the proxy setting. The two steps are nested in
     * try/finally so the proxy setting is removed even if deleting the watch throws.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     */
    @Test
    public void testHttpDefaultProxy() throws Exception {
        String tenant = "_main";
        String watchId = "http_default_proxy";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String proxySettingPath = "/_signals/settings/http.proxy";
        try (MockWebserviceProvider webhookProvider = new MockWebserviceProvider("/hook");
             GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
            try {
                webhookProvider.acceptOnlyRequestsWithHeader(REQUEST_HEADER_ADDING_FILTER.getHeader());
                Watch watch = new WatchBuilder("put_test")
                        .atMsInterval(100L)
                        .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                        .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                        .then()
                        .postWebhook(webhookProvider.getUri()).throttledFor("0").name("testhook")
                        .build();
                GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
                Assert.assertEquals(response.getBody(), 201, response.getStatusCode());

                // Without a proxy no request is expected to be accepted by the mock.
                Thread.sleep(600L);
                Assert.assertEquals(0L, webhookProvider.getRequestCount());

                // The setting value is a JSON string literal, hence the embedded quotes.
                String proxySetting = "\"http://127.0.0.8:" + this.wireMockProxy.port() + "\"";
                response = restClient.putJson(proxySettingPath, proxySetting, new Header[0]);
                Assert.assertEquals(response.getBody(), 200, response.getStatusCode());
                response = restClient.get(proxySettingPath, new Header[0]);
                Assert.assertEquals(response.getBody(), 200, response.getStatusCode());
                // JUnit's contract is assertEquals(expected, actual).
                Assert.assertEquals(proxySetting, response.getBody());

                Thread.sleep(600L);
                Assert.assertTrue(webhookProvider.getRequestCount() > 0);

                response = restClient.delete(proxySettingPath, new Header[0]);
                Assert.assertEquals(response.getBody(), 200, response.getStatusCode());
            } finally {
                try {
                    restClient.delete(watchPath, new Header[0]);
                } finally {
                    restClient.delete(proxySettingPath, new Header[0]);
                }
            }
        }
    }

    /**
     * Checks a proxy configured directly on the webhook action. The mock webhook accepts only
     * requests carrying the header from {@code REQUEST_HEADER_ADDING_FILTER}; the test expects
     * zero accepted requests for the watch without a proxy and at least one after the watch is
     * updated with an explicit proxy URI. The watch is deleted afterwards.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     */
    @Test
    public void testHttpExplicitProxy() throws Exception {
        String tenant = "_main";
        String watchId = "http_explicit_proxy";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (MockWebserviceProvider webhookProvider = new MockWebserviceProvider("/hook");
             GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
            try {
                webhookProvider.acceptOnlyRequestsWithHeader(REQUEST_HEADER_ADDING_FILTER.getHeader());

                Watch watchWithoutProxy = new WatchBuilder("put_test")
                        .atMsInterval(100L)
                        .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                        .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                        .then()
                        .postWebhook(webhookProvider.getUri()).throttledFor("0").name("testhook")
                        .build();
                GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watchWithoutProxy.toJson(), new Header[0]);
                Assert.assertEquals(response.getBody(), 201, response.getStatusCode());
                Thread.sleep(600L);
                Assert.assertEquals(0L, webhookProvider.getRequestCount());

                Watch watchWithProxy = new WatchBuilder("put_test")
                        .atMsInterval(100L)
                        .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                        .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                        .then()
                        .postWebhook(webhookProvider.getUri()).proxy("http://127.0.0.8:" + this.wireMockProxy.port()).throttledFor("0").name("testhook")
                        .build();
                // Updating the existing watch is expected to answer 200 rather than 201.
                response = restClient.putJson(watchPath, watchWithProxy.toJson(), new Header[0]);
                Assert.assertEquals(response.getBody(), 200, response.getStatusCode());
                Thread.sleep(600L);
                Assert.assertTrue(webhookProvider.getRequestCount() > 0);
            } finally {
                restClient.delete(watchPath, new Header[0]);
            }
        }
    }

    /**
     * Checks that a webhook configured with {@code proxy("none")} still reaches the mock
     * webhook while the {@code http.proxy} setting is stored, and that the last request's
     * client address is {@code 127.0.0.1}.
     *
     * <p>Cleanup deletes the watch and the proxy setting. The two steps are nested in
     * try/finally so the proxy setting is removed even if deleting the watch throws.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     */
    @Test
    public void testHttpExplicitNoProxy() throws Exception {
        String tenant = "_main";
        String watchId = "http_explicit_no_proxy";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String proxySettingPath = "/_signals/settings/http.proxy";
        try (MockWebserviceProvider webhookProvider = new MockWebserviceProvider("/hook");
             GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
            try {
                // The setting value is a JSON string literal, hence the embedded quotes.
                String proxySetting = "\"http://127.0.0.8:" + this.wireMockProxy.port() + "\"";
                GenericRestClient.HttpResponse response = restClient.putJson(proxySettingPath, proxySetting, new Header[0]);
                Assert.assertEquals(response.getBody(), 200, response.getStatusCode());
                Thread.sleep(200L);

                Watch watch = new WatchBuilder("put_test")
                        .atMsInterval(100L)
                        .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                        .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                        .then()
                        .postWebhook(webhookProvider.getUri()).proxy("none").throttledFor("0").name("testhook")
                        .build();
                response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
                Assert.assertEquals(response.getBody(), 201, response.getStatusCode());

                Thread.sleep(600L);
                Assert.assertTrue(webhookProvider.getRequestCount() > 0);
                // JUnit's contract is assertEquals(expected, actual).
                Assert.assertEquals(InetAddress.getByName("127.0.0.1"), webhookProvider.getLastRequestClientAddress());
            } finally {
                try {
                    restClient.delete(watchPath, new Header[0]);
                } finally {
                    restClient.delete(proxySettingPath, new Header[0]);
                }
            }
        }
    }

    /**
     * Stores a proxy definition under {@code /_signals/proxies/proxy-1} and checks that a
     * webhook referencing that proxy id reaches the mock webhook, which accepts only requests
     * carrying the header from {@code REQUEST_HEADER_ADDING_FILTER}. The same watch without a
     * proxy is expected to produce zero accepted requests first.
     */
    @Test
    public void testWatchWithProxyLoadedFromConfiguration() throws Exception {
        String tenant = "_main";
        String watchId = "http_proxy_loaded_from_config";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String proxyId = "proxy-1";
        String proxyPath = "/_signals/proxies/" + proxyId;
        try (MockWebserviceProvider webhookProvider = new MockWebserviceProvider("/hook");
             GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            webhookProvider.acceptOnlyRequestsWithHeader(REQUEST_HEADER_ADDING_FILTER.getHeader());

            DocNode proxyDefinition = DocNode.of("name", "proxy", "uri", "http://127.0.0.8:" + this.wireMockProxy.port());
            GenericRestClient.HttpResponse response = restClient.putJson(proxyPath, (Document)proxyDefinition);
            Assert.assertEquals(response.getBody(), 200, response.getStatusCode());

            Watch watchWithoutProxy = new WatchBuilder("put_test")
                    .atMsInterval(100L)
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then()
                    .postWebhook(webhookProvider.getUri()).throttledFor("0").name("testhook")
                    .build();
            response = restClient.putJson(watchPath, watchWithoutProxy.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201, response.getStatusCode());
            Thread.sleep(600L);
            Assert.assertEquals(0L, webhookProvider.getRequestCount());

            Watch watchWithStoredProxy = new WatchBuilder("put_test")
                    .atMsInterval(100L)
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then()
                    .postWebhook(webhookProvider.getUri()).proxy(proxyId).throttledFor("0").name("testhook")
                    .build();
            // Updating the existing watch is expected to answer 200 rather than 201.
            response = restClient.putJson(watchPath, watchWithStoredProxy.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 200, response.getStatusCode());
            Thread.sleep(600L);
            Assert.assertTrue(webhookProvider.getRequestCount() > 0);
        }
    }

    /**
     * Expects HTTP 400 and the error "Http proxy 'missing-proxy-id' not found." under
     * {@code detail -> actions[testhook].proxy} when a watch references an unknown proxy id.
     */
    @Test
    public void testWatchWithProxyLoadedFromConfiguration_givenProxyDoesNotExist() throws Exception {
        String tenant = "_main";
        String watchId = "http_proxy_loaded_from_config";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (MockWebserviceProvider webhookProvider = new MockWebserviceProvider("/hook");
             GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Watch watch = new WatchBuilder("put_test")
                    .atMsInterval(100L)
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then()
                    .postWebhook(webhookProvider.getUri()).proxy("missing-proxy-id").throttledFor("0").name("testhook")
                    .build();
            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 400, response.getStatusCode());

            DocNode firstProxyError = (DocNode)response.getBodyAsDocNode().getAsNode("detail").getAsListOfNodes("actions[testhook].proxy").get(0);
            Assert.assertEquals("Http proxy 'missing-proxy-id' not found.", firstProxyError.get("error"));
        }
    }

    /**
     * Currently ignored. Stores a watch whose webhook uses basic auth and checks that reading
     * the watch back does not expose the password "secret" but contains
     * "password__protected", and that the mock webhook was called exactly once after the wait.
     * The watch is deleted afterwards.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     */
    @Ignore(value="TODO why is this ignored?")
    @Test
    public void testPutWatchWithCredentials() throws Exception {
        String tenant = "_main";
        String watchId = "put_watch_with_credentials";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (MockWebserviceProvider webhookProvider = new MockWebserviceProvider("/hook");
             GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client client = cluster.getInternalNodeClient();
            try {
                client.admin().indices().create(new CreateIndexRequest("testsink_put_watch_with_credentials")).actionGet();
                Watch watch = new WatchBuilder("put_test")
                        .cronTrigger("* * * * * ?")
                        .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                        .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                        .then()
                        .postWebhook(webhookProvider.getUri()).basicAuth("admin", "secret").name("testhook")
                        .build();
                GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
                Assert.assertEquals(response.getBody(), 201, response.getStatusCode());

                response = restClient.get(watchPath + "?pretty", new Header[0]);
                String storedWatch = response.getBody();
                Assert.assertFalse(storedWatch, storedWatch.contains("secret"));
                Assert.assertTrue(storedWatch, storedWatch.contains("password__protected"));

                Thread.sleep(3000L);
                Assert.assertEquals(1L, webhookProvider.getRequestCount());
            } finally {
                restClient.delete(watchPath, new Header[0]);
            }
        }
    }

    /**
     * Stores a watch as user {@code redshirt1} (expects HTTP 201) and then expects both the
     * watch log and its first action log to report {@code ACTION_FAILED}, with the action
     * detail containing "Insufficient permissions".
     */
    @Test
    public void testPutWatchWithUnauthorizedAction() throws Exception {
        String tenant = "_main";
        String watchId = "put_watch_with_unauth_action";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (GenericRestClient restClient = cluster.getRestClient("redshirt1", "redshirt", new Header[0]).trackResources()) {
            Client client = cluster.getInternalNodeClient();
            client.admin().indices().create(new CreateIndexRequest("testsink_put_watch_with_unauth_action")).actionGet();

            Watch watch = new WatchBuilder("put_test")
                    .cronTrigger("* * * * * ?")
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then()
                    .index("testsink_put_watch_with_unauth_action").name("testsink")
                    .build();
            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201, response.getStatusCode());

            WatchLog watchLog = awaitWatchLog(client, tenant, watchId);
            Assert.assertEquals(watchLog.toString(), Status.Code.ACTION_FAILED, watchLog.getStatus().getCode());

            ActionLog firstAction = (ActionLog)watchLog.getActions().get(0);
            Assert.assertEquals(firstAction.toString(), Status.Code.ACTION_FAILED, firstAction.getStatus().getCode());
            Assert.assertTrue(firstAction.toString(), firstAction.getStatus().getDetail().contains("Insufficient permissions"));
        }
    }

    /**
     * Stores a watch in tenant {@code test1}, reads it back (HTTP 200, body must parse as a
     * watch) and expects HTTP 404 when the same watch id is requested under {@code _main}.
     */
    @Test
    public void testPutWatchWithTenant() throws Exception {
        String tenant = "test1";
        String watchId = "put_watch_with_tenant";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String watchPathWithWrongTenant = "/_signals/watch/_main/" + watchId;
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client client = cluster.getInternalNodeClient();
            client.admin().indices().create(new CreateIndexRequest("testsink_put_watch_with_tenant")).actionGet();

            Watch watch = new WatchBuilder("put_test")
                    .cronTrigger("* * * * * ?")
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then()
                    .index("testsink_put_watch_with_tenant").name("testsink")
                    .build();
            GenericRestClient.HttpResponse putResponse = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(putResponse.getBody(), 201, putResponse.getStatusCode());

            GenericRestClient.HttpResponse getResponse = restClient.get(watchPath, new Header[0]);
            Assert.assertEquals(getResponse.getBody(), 200, getResponse.getStatusCode());
            // The parsed watch is not inspected; the call only has to complete without throwing.
            Watch.parseFromElasticDocument(this.watchInitializationService, "test", "put_test", getResponse.getBody(), -1L);

            GenericRestClient.HttpResponse wrongTenantResponse = restClient.get(watchPathWithWrongTenant, new Header[0]);
            Assert.assertEquals(wrongTenantResponse.getBody(), 404, wrongTenantResponse.getStatusCode());
        }
    }

    /**
     * Same scenario as the {@code test1} tenant test, but as user {@code redshirt3} in tenant
     * {@code redshirt_club}: store (201), read back (200, body must parse as a watch), and
     * expect 404 for the same watch id under {@code _main}.
     */
    @Test
    public void testPutWatchWithTenant2() throws Exception {
        String tenant = "redshirt_club";
        String watchId = "put_watch_with_tenant2";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String watchPathWithWrongTenant = "/_signals/watch/_main/" + watchId;
        try (GenericRestClient restClient = cluster.getRestClient("redshirt3", "redshirt", new Header[0]).trackResources()) {
            Client client = cluster.getInternalNodeClient();
            client.admin().indices().create(new CreateIndexRequest("testsink_put_watch_with_tenant2")).actionGet();

            Watch watch = new WatchBuilder("put_test")
                    .cronTrigger("* * * * * ?")
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then()
                    .index("testsink_put_watch_with_tenant2").name("testsink")
                    .build();
            GenericRestClient.HttpResponse putResponse = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(putResponse.getBody(), 201, putResponse.getStatusCode());

            GenericRestClient.HttpResponse getResponse = restClient.get(watchPath, new Header[0]);
            Assert.assertEquals(getResponse.getBody(), 200, getResponse.getStatusCode());
            // The parsed watch is not inspected; the call only has to complete without throwing.
            Watch.parseFromElasticDocument(this.watchInitializationService, "test", "put_test", getResponse.getBody(), -1L);

            GenericRestClient.HttpResponse wrongTenantResponse = restClient.get(watchPathWithWrongTenant, new Header[0]);
            Assert.assertEquals(wrongTenantResponse.getBody(), 404, wrongTenantResponse.getStatusCode());
        }
    }

    /**
     * Expects HTTP 403 when user {@code redshirt1} tries to store a watch in tenant
     * {@code test1}.
     */
    @Test
    public void testPutWatchWithUnauthorizedTenant() throws Exception {
        String tenant = "test1";
        String watchId = "put_watch_with_unauthorized_tenant";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (GenericRestClient restClient = cluster.getRestClient("redshirt1", "redshirt", new Header[0]).trackResources()) {
            Watch watch = new WatchBuilder("put_test")
                    .cronTrigger("* * * * * ?")
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then()
                    .index("testsink_put_watch_with_tenant").name("testsink")
                    .build();
            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 403, response.getStatusCode());
        }
    }

    /**
     * Stores a fast-running watch, waits until it has written at least one document, deletes
     * it, expects HTTP 404 when reading it afterwards, and checks that the document count of
     * the sink index no longer changes between two samples taken one second apart.
     */
    @Test
    public void testDeleteWatch() throws Exception {
        String tenant = "_main";
        String watchId = "delete_watch";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String sinkIndex = "testsink_delete_watch";
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
            Client client = cluster.getInternalNodeClient();
            client.admin().indices().create(new CreateIndexRequest(sinkIndex)).actionGet();

            Watch watch = new WatchBuilder("put_test")
                    .atMsInterval(10L)
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then()
                    .index(sinkIndex).name("testsink")
                    .build();
            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201, response.getStatusCode());
            response = restClient.get(watchPath, new Header[0]);
            Assert.assertEquals(response.getBody(), 200, response.getStatusCode());
            awaitMinCountOfDocuments(client, sinkIndex, 1L);

            restClient.delete(watchPath, new Header[0]);
            response = restClient.get(watchPath, new Header[0]);
            Assert.assertEquals(response.getBody(), 404, response.getStatusCode());

            // Fixed settle time before sampling the document count twice.
            Thread.sleep(1500L);
            long countAfterDelete = getCountOfDocuments(client, sinkIndex);
            Thread.sleep(1000L);
            long countOneSecondLater = getCountOfDocuments(client, sinkIndex);
            Assert.assertEquals(countAfterDelete, countOneSecondLater);
        }
    }

    /**
     * Posts an inline watch definition to {@code /_signals/watch/_main/_execute}
     * and expects HTTP 200.
     */
    @Test
    public void testExecuteAnonymousWatch() throws Exception {
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
            Watch watch = new WatchBuilder("execution_test_anon")
                    .cronTrigger("*/2 * * * * ?")
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then().index("testsink").name("testsink")
                    .build();

            String executeRequest = "{\"watch\": " + watch.toJson() + "}";
            GenericRestClient.HttpResponse executeResponse =
                    restClient.postJson("/_signals/watch/_main/_execute", executeRequest, new Header[0]);

            Assert.assertEquals(executeResponse.getBody(), 200L, executeResponse.getStatusCode());
        }
    }

    /**
     * Stores a watch under {@code execution_test} (expects 201) and then triggers it
     * through its {@code _execute} endpoint with an empty JSON body (expects 200).
     */
    @Test
    public void testExecuteWatchById() throws Exception {
        String tenant = "_main";
        String watchId = "execution_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Watch watch = new WatchBuilder(watchId)
                    .cronTrigger("0 0 */1 * * ?")
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then().index("testsink").name("testsink")
                    .build();

            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());

            response = restClient.postJson(watchPath + "/_execute", "{}", new Header[0]);
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());
        }
    }

    /**
     * Uploads the mock web service's certificate as truststore {@code UPLOADED_TRUSTSTORE_ID},
     * stores a watch whose webhook action references that truststore, executes the watch by id
     * and checks that the response reports {@code ACTION_EXECUTED} and that the mock web service
     * received at least one request.
     */
    @Test
    public void testExecuteWatchByIdWhichUsesUploadedTruststore() throws Exception {
        String tenant = "_main";
        String watchId = "tls_execution_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources();
             MockWebserviceProvider webhookProvider = new MockWebserviceProvider("/tls_endpoint", true, false)) {

            webhookProvider.uploadMockServerCertificateAsTruststore((LocalCluster)cluster, USER_CERTIFICATE, UPLOADED_TRUSTSTORE_ID);

            Watch watch = new WatchBuilder(watchId)
                    .cronTrigger("0 0 */1 * * ?")
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then().postWebhook(webhookProvider.getUri()).truststoreId(UPLOADED_TRUSTSTORE_ID).throttledFor("0").name("send-http-request")
                    .build();

            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());

            response = restClient.postJson(watchPath + "/_execute", "{}", new Header[0]);
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());

            DocNode executionResult = response.getBodyAsDocNode();
            MatcherAssert.assertThat((Object)executionResult, (org.hamcrest.Matcher)DocNodeMatchers.containsValue((String)"status.code", (Object)"ACTION_EXECUTED"));
            MatcherAssert.assertThat((Object)webhookProvider.getRequestCount(), (org.hamcrest.Matcher)Matchers.greaterThan((Comparable)Integer.valueOf(0)));
        }
    }

    /**
     * Executes an inline watch with {@code "goto": "teststatic"} and inspects the document
     * written with id {@code 1} to the sink index: it must not contain {@code testsource}
     * but must contain {@code teststatic}.
     */
    @Test
    public void testExecuteAnonymousWatchWithGoto() throws Exception {
        String testSink = "testsink_anon_watch_with_goto";

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
            Client client = cluster.getInternalNodeClient();

            Watch watch = new WatchBuilder("execution_test_anon")
                    .cronTrigger("*/2 * * * * ?")
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}, \"x\": \"1\"}").as("teststatic")
                    .then().index(testSink).docId("1").refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).name("testsink")
                    .build();

            String executeRequest = "{\"watch\": " + watch.toJson() + ", \"goto\": \"teststatic\"}";
            GenericRestClient.HttpResponse executeResponse =
                    restClient.postJson("/_signals/watch/_main/_execute", executeRequest, new Header[0]);
            Assert.assertEquals(executeResponse.getBody(), 200L, executeResponse.getStatusCode());

            GetResponse getResponse = (GetResponse)client.get(new GetRequest(testSink, "1")).actionGet();

            Assert.assertTrue(getResponse.toString(), getResponse.getSource().get("testsource") == null);
            Assert.assertTrue(getResponse.toString(), getResponse.getSource().get("teststatic") != null);
        }
    }

    /**
     * Registers a proxy named {@code stored-proxy} that points at the WireMock proxy port,
     * stores a watch whose webhook action references that proxy by id, executes the watch
     * and checks that the response reports {@code ACTION_EXECUTED} and that the mock web
     * service, which only accepts requests carrying the filter's header, received a request.
     */
    @Test
    public void testExecuteWatchByIdWhichUsesStoredProxyConfig() throws Exception {
        String tenant = "_main";
        String watchId = "stored_proxy_exec_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String proxyId = "stored-proxy";
        String proxyPath = "/_signals/proxies/" + proxyId;

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources();
             MockWebserviceProvider webhookProvider = new MockWebserviceProvider("/tls_endpoint")) {

            webhookProvider.acceptOnlyRequestsWithHeader(REQUEST_HEADER_ADDING_FILTER.getHeader());

            GenericRestClient.HttpResponse response = restClient.putJson(
                    proxyPath,
                    (Document)DocNode.of((String)"name", (Object)"stored-proxy", (String)"uri", (Object)("http://127.0.0.8:" + this.wireMockProxy.port())));
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());

            Watch watch = new WatchBuilder("test_with_stored_proxy")
                    .cronTrigger("0 0 */1 * * ?")
                    .then().postWebhook(webhookProvider.getUri()).proxy(proxyId).name("webhook")
                    .build();

            response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());

            response = restClient.postJson(watchPath + "/_execute", "{}", new Header[0]);
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());

            DocNode executionResult = response.getBodyAsDocNode();
            MatcherAssert.assertThat((Object)executionResult, (org.hamcrest.Matcher)DocNodeMatchers.containsValue((String)"status.code", (Object)"ACTION_EXECUTED"));
            MatcherAssert.assertThat((Object)webhookProvider.getRequestCount(), (org.hamcrest.Matcher)Matchers.greaterThan((Comparable)Integer.valueOf(0)));
        }
    }

    /**
     * Executes an inline watch with {@code "goto": "_actions"} and an explicit {@code input}
     * object, then inspects the document written with id {@code 1} to the sink index:
     * {@code testsource} and {@code teststatic} must be absent, {@code ext_input} must be present.
     */
    @Test
    public void testExecuteAnonymousWatchWithInput() throws Exception {
        String testSink = "testsink_anon_watch_with_input";

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
            Client client = cluster.getInternalNodeClient();

            Watch watch = new WatchBuilder("execution_test_anon")
                    .cronTrigger("*/2 * * * * ?")
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}, \"x\": \"1\"}").as("teststatic")
                    .then().index(testSink).docId("1").refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).name("testsink")
                    .build();

            String executeRequest = "{\"watch\": " + watch.toJson() + ", \"goto\": \"_actions\", \"input\": { \"ext_input\": \"a\"}}";
            GenericRestClient.HttpResponse executeResponse =
                    restClient.postJson("/_signals/watch/_main/_execute", executeRequest, new Header[0]);
            Assert.assertEquals(executeResponse.getBody(), 200L, executeResponse.getStatusCode());

            GetResponse getResponse = (GetResponse)client.get(new GetRequest(testSink, "1")).actionGet();

            Assert.assertTrue(getResponse.toString(), getResponse.getSource().get("testsource") == null);
            Assert.assertTrue(getResponse.toString(), getResponse.getSource().get("teststatic") == null);
            Assert.assertTrue(getResponse.toString(), getResponse.getSource().get("ext_input") != null);
        }
    }

    /**
     * Executes an inline watch with {@code "show_all_runtime_attributes": true} and checks the
     * {@code runtime_attributes} section of the response: severity level is {@code "error"},
     * {@code trigger} is present, {@code trigger.triggered_time} is absent, and
     * {@code data.teststatic.bla.blub} renders as {@code "42"}.
     */
    @Test
    public void testExecuteAnonymousWatchWithShowAllRuntimeAttributes() throws Exception {
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
            Watch watch = new WatchBuilder("execution_test_anon")
                    .cronTrigger("*/2 * * * * ?")
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .consider("data.testsearch.hits.total.value").greaterOrEqual(1.0).as(SeverityLevel.ERROR)
                    .when(SeverityLevel.ERROR, new SeverityLevel[0]).index("testsink").name("testsink")
                    .build();

            String executeRequest = "{\"watch\": " + watch.toJson() + ", \"show_all_runtime_attributes\": true}";
            GenericRestClient.HttpResponse response =
                    restClient.postJson("/_signals/watch/_main/_execute", executeRequest, new Header[0]);
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());

            DocNode responseJson = response.getBodyAsDocNode();

            Assert.assertEquals(response.getBody(), "error", responseJson.get("runtime_attributes", new String[]{"severity", "level"}));
            Assert.assertFalse(response.getBody(), responseJson.get("runtime_attributes", new String[]{"trigger"}) == null);
            Assert.assertTrue(response.getBody(), responseJson.get("runtime_attributes", new String[]{"trigger", "triggered_time"}) == null);
            Assert.assertEquals(response.getBody(), "42", responseJson.get("runtime_attributes", new String[]{"data", "teststatic", "bla", "blub"}).toString());
        }
    }

    /**
     * Stores an inactive watch and toggles its {@code _active} endpoint in the sequence
     * PUT, DELETE, DELETE, PUT. Every call must return 200 and the watch read back
     * afterwards must report active, inactive, inactive, active respectively.
     */
    @Test
    public void testActivateWatchAuth() throws Exception {
        String tenant = "_main";
        String watchId = "activate_auth_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String activePath = watchPath + "/_active";

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Watch watch = new WatchBuilder("deactivate_test")
                    .inactive()
                    .atMsInterval(100L)
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .build();

            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());

            // Activate.
            response = restClient.putJson(activePath, "", new Header[0]);
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());
            watch = getWatchByRest(tenant, watchId, restClient);
            Assert.assertEquals(true, watch.isActive());

            // Deactivate.
            response = restClient.delete(activePath, new Header[0]);
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());
            watch = getWatchByRest(tenant, watchId, restClient);
            Assert.assertFalse(watch.isActive());

            // Deactivating an already inactive watch must succeed as well.
            response = restClient.delete(activePath, new Header[0]);
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());
            watch = getWatchByRest(tenant, watchId, restClient);
            Assert.assertFalse(watch.isActive());

            // Activate again.
            response = restClient.putJson(activePath, "", new Header[0]);
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());
            watch = getWatchByRest(tenant, watchId, restClient);
            Assert.assertTrue(watch.isActive());
        }
    }

    /**
     * Stores a watch that indexes into {@code testsink_deactivate_watch} every 100 ms, waits for
     * a first document, deactivates the watch via {@code DELETE .../_active} and checks that the
     * sink's document count stays constant over a one second window. The watch is then
     * re-activated and the test waits for at least one more document.
     */
    @Test
    public void testDeactivateWatch() throws Exception {
        String tenant = "_main";
        String watchId = "deactivate_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String sinkIndex = "testsink_deactivate_watch";

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client client = cluster.getInternalNodeClient();
            client.admin().indices().create(new CreateIndexRequest(sinkIndex)).actionGet();

            Watch watch = new WatchBuilder(watchId)
                    .atMsInterval(100L)
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then().index(sinkIndex).throttledFor("0").name("testsink")
                    .build();

            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());

            awaitMinCountOfDocuments(client, sinkIndex, 1L);

            response = restClient.delete(watchPath + "/_active", new Header[0]);
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());

            Watch updatedWatch = getWatchByRest(tenant, watchId, restClient);
            Assert.assertFalse(updatedWatch.isActive());

            // Pause before taking the baseline count, then verify nothing is added afterwards.
            Thread.sleep(1500L);
            long countWhenDeactivated = getCountOfDocuments(client, sinkIndex);
            Thread.sleep(1000L);
            long countAfterWaiting = getCountOfDocuments(client, sinkIndex);
            Assert.assertEquals(countWhenDeactivated, countAfterWaiting);

            response = restClient.putJson(watchPath + "/_active", "", new Header[0]);
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());

            awaitMinCountOfDocuments(client, sinkIndex, countAfterWaiting + 1L);
        }
    }

    /**
     * Stores a watch that indexes into its own sink every 100 ms, waits for a first document,
     * deactivates the whole tenant via {@code DELETE /_signals/tenant/_main/_active} and checks
     * that the sink's document count stays constant between two samples taken 310 ms apart.
     * The tenant is then re-activated and the test waits for at least one more document.
     *
     * <p>Regardless of the outcome, the {@code finally} block re-activates the tenant and deletes
     * the watch using a fresh REST client.
     *
     * <p>Both clients are managed with try-with-resources, so an exception thrown by
     * {@code close()} is attached as a suppressed exception instead of replacing the
     * original test failure.
     */
    @Test
    public void testDeactivateTenant() throws Exception {
        String tenant = "_main";
        String watchId = "deactivate_tenant_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String testSink = "testsink_" + watchId;
        String tenantActivePath = "/_signals/tenant/" + tenant + "/_active";

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
            Client client = cluster.getInternalNodeClient();
            client.admin().indices().create(new CreateIndexRequest(testSink)).actionGet();

            Watch watch = new WatchBuilder(watchId)
                    .atMsInterval(100L)
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then().index(testSink).throttledFor("0").name("testsink")
                    .build();

            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());

            awaitMinCountOfDocuments(client, testSink, 1L);

            response = restClient.delete(tenantActivePath, new Header[0]);
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());

            // Pause before taking the baseline count, then verify nothing is added afterwards.
            Thread.sleep(210L);
            long executionCountWhenDeactivated = getCountOfDocuments(client, testSink);
            Thread.sleep(310L);
            long lastExecutionCount = getCountOfDocuments(client, testSink);
            Assert.assertEquals(executionCountWhenDeactivated, lastExecutionCount);

            response = restClient.put(tenantActivePath);
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());

            awaitMinCountOfDocuments(client, testSink, lastExecutionCount + 1L);
        } finally {
            // Cleanup with a fresh client: re-activate the tenant and remove the watch
            // even when the test body failed half-way through.
            try (GenericRestClient cleanupClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
                cleanupClient.put(tenantActivePath);
                cleanupClient.delete(watchPath, new Header[0]);
            }
        }
    }

    /**
     * Stores a watch that indexes into its own sink every 100 ms, waits for a first document,
     * deactivates Signals globally via {@code DELETE /_signals/admin/_active} and checks that
     * the sink's document count stays constant between two samples taken 310 ms apart.
     * Signals is then re-activated and the test waits for at least one more document.
     *
     * <p>Regardless of the outcome, the {@code finally} block re-activates Signals and deletes
     * the watch using a fresh REST client.
     *
     * <p>Both clients are managed with try-with-resources, so an exception thrown by
     * {@code close()} is attached as a suppressed exception instead of replacing the
     * original test failure.
     */
    @Test
    public void testDeactivateGlobally() throws Exception {
        String tenant = "_main";
        String watchId = "deactivate_globally_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String testSink = "testsink_" + watchId;
        String globalActivePath = "/_signals/admin/_active";

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
            Client client = cluster.getInternalNodeClient();
            client.admin().indices().create(new CreateIndexRequest(testSink)).actionGet();

            Watch watch = new WatchBuilder(watchId)
                    .atMsInterval(100L)
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then().index(testSink).throttledFor("0").name("testsink")
                    .build();

            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());

            awaitMinCountOfDocuments(client, testSink, 1L);

            response = restClient.delete(globalActivePath, new Header[0]);
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());

            // Pause before taking the baseline count, then verify nothing is added afterwards.
            Thread.sleep(210L);
            long executionCountWhenDeactivated = getCountOfDocuments(client, testSink);
            Thread.sleep(310L);
            long lastExecutionCount = getCountOfDocuments(client, testSink);
            Assert.assertEquals(executionCountWhenDeactivated, lastExecutionCount);

            response = restClient.put(globalActivePath);
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());

            awaitMinCountOfDocuments(client, testSink, lastExecutionCount + 1L);
        } finally {
            // Cleanup with a fresh client: re-activate Signals and remove the watch
            // even when the test body failed half-way through.
            try (GenericRestClient cleanupClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
                cleanupClient.put(globalActivePath);
                cleanupClient.delete(watchPath, new Header[0]);
            }
        }
    }

    /**
     * Walks a watch action through an acknowledge cycle:
     * <ol>
     *   <li>Acknowledging before the source index contains a document yields 412.</li>
     *   <li>After a source document exists and the sink received a document, acknowledging
     *       yields 200 and the watch state names {@code USERNAME_UHURA} under
     *       {@code actions.testaction.acked.by}.</li>
     *   <li>While acknowledged, the sink's document count does not change.</li>
     *   <li>After the source document is deleted, the count still does not change and the
     *       state no longer carries a non-null {@code acked} entry.</li>
     *   <li>After a new source document is indexed, the sink's document count grows again.</li>
     * </ol>
     * Finally the watch is deactivated via {@code DELETE .../_active}.
     */
    @Test
    public void testAckWatch() throws Exception {
        String tenant = "_main";
        String watchId = "ack_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String sourceIndex = "testsource_ack_watch";
        String sinkIndex = "testsink_ack_watch";

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client client = cluster.getInternalNodeClient();
            client.admin().indices().create(new CreateIndexRequest(sourceIndex)).actionGet();
            client.admin().indices().create(new CreateIndexRequest(sinkIndex)).actionGet();

            Watch watch = new WatchBuilder(watchId)
                    .atMsInterval(100L)
                    .search(sourceIndex).query("{\"match_all\" : {} }").as("testsearch")
                    .checkCondition("data.testsearch.hits.hits.length > 0")
                    .then().index(sinkIndex).refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).throttledFor("0").name("testaction")
                    .build();

            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());

            // Step 1: the source index is still empty, so the ack is expected to be rejected.
            Thread.sleep(220L);
            response = restClient.put(watchPath + "/_ack/testaction");
            Assert.assertEquals(response.getBody(), 412L, response.getStatusCode());

            // Step 2: add a source document, wait for the first sink document, then ack.
            client.index(((IndexRequest)new IndexRequest(sourceIndex).id("1").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE))
                    .source(XContentType.JSON, new Object[]{"key1", "val1", "key2", "val2"})).actionGet();
            awaitMinCountOfDocuments(client, sinkIndex, 1L);

            response = restClient.put(watchPath + "/_ack/testaction");
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());

            Thread.sleep(500L);
            response = restClient.get(watchPath + "/_state", new Header[0]);
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());

            DocNode statusDoc = response.getBodyAsDocNode();
            Assert.assertEquals(response.getBody(), USERNAME_UHURA, statusDoc.get("actions", new String[]{"testaction", "acked", "by"}).toString());

            // Step 3: while acknowledged, no further documents may reach the sink.
            Thread.sleep(200L);
            long executionCountAfterAck = getCountOfDocuments(client, sinkIndex);
            Thread.sleep(310L);
            long currentExecutionCount = getCountOfDocuments(client, sinkIndex);
            Assert.assertEquals(executionCountAfterAck, currentExecutionCount);

            // Step 4: remove the source document; the count stays put and the ack disappears.
            client.delete((DeleteRequest)new DeleteRequest(sourceIndex, "1").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).actionGet();
            Thread.sleep(310L);
            currentExecutionCount = getCountOfDocuments(client, sinkIndex);
            Assert.assertEquals(executionCountAfterAck, currentExecutionCount);

            response = restClient.get(watchPath + "/_state", new Header[0]);
            statusDoc = response.getBodyAsDocNode();
            Assert.assertFalse(response.getBody(), statusDoc.getAsNode("actions").getAsNode("testaction").hasNonNull("acked"));

            // Step 5: a new source document makes the action write to the sink again.
            client.index(((IndexRequest)new IndexRequest(sourceIndex).id("2").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE))
                    .source(XContentType.JSON, new Object[]{"key1", "val1", "key2", "val2"})).actionGet();
            awaitMinCountOfDocuments(client, sinkIndex, executionCountAfterAck + 1L);
            currentExecutionCount = getCountOfDocuments(client, sinkIndex);
            Assert.assertNotEquals(executionCountAfterAck, currentExecutionCount);

            response = restClient.delete(watchPath + "/_active", new Header[0]);
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());
        }
    }

    /**
     * Stores a watch with a single action, waits until the action has written to the sink index
     * and calls {@code _ack_and_get/testaction}. The call must return 200 and the first entry of
     * the {@code acked} list must name {@code testaction} and {@code USERNAME_UHURA}.
     */
    @Test
    public void testAckAndGetWatchWithSingleAction() throws Exception {
        String tenant = "_main";
        String watchId = "ack_and_get_test_watch_with_single_action";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String watchedIndex = "source_index_for_watch_" + watchId;
        String sinkIndex = "sink_index_" + watchId;

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client client = cluster.getInternalNodeClient();
            client.admin().indices().create(new CreateIndexRequest(watchedIndex)).actionGet();
            client.admin().indices().create(new CreateIndexRequest(sinkIndex)).actionGet();

            Watch watch = new WatchBuilder(watchId)
                    .atMsInterval(100L)
                    .search(watchedIndex).query("{\"match_all\" : {} }").as("testsearch")
                    .checkCondition("data.testsearch.hits.hits.length > 0")
                    .then().index(sinkIndex).refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).throttledFor("0").name("testaction")
                    .build();

            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());

            Thread.sleep(220L);

            client.index(((IndexRequest)new IndexRequest(watchedIndex).id("1").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE))
                    .source(XContentType.JSON, new Object[]{"key1", "val1", "key2", "val2"})).actionGet();
            awaitMinCountOfDocuments(client, sinkIndex, 1L);

            response = restClient.put(watchPath + "/_ack_and_get/testaction");
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());

            DocNode ackDoc = response.getBodyAsDocNode();
            Assert.assertEquals("testaction", ((DocNode)ackDoc.getAsListOfNodes("acked").get(0)).get("action_id"));
            Assert.assertEquals(USERNAME_UHURA, ((DocNode)ackDoc.getAsListOfNodes("acked").get(0)).get("by_user"));
        }
    }

    /**
     * Stores a watch with the single action {@code testaction}, waits until it has written to
     * the sink index and then calls {@code _ack_and_get} with an action id the watch does not
     * define. The call must return 404.
     */
    @Test
    public void testAckAndGetShouldReturnErrorResponseWhenActionDoesNotExists() throws Exception {
        String tenant = "_main";
        String watchId = "ack_and_get_test_watch_with_non_existing_action";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String watchedIndex = "source_index_for_watch_" + watchId;
        String sinkIndex = "sink_index_" + watchId;

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client client = cluster.getInternalNodeClient();
            client.admin().indices().create(new CreateIndexRequest(watchedIndex)).actionGet();
            client.admin().indices().create(new CreateIndexRequest(sinkIndex)).actionGet();

            Watch watch = new WatchBuilder(watchId)
                    .atMsInterval(100L)
                    .search(watchedIndex).query("{\"match_all\" : {} }").as("testsearch")
                    .checkCondition("data.testsearch.hits.hits.length > 0")
                    .then().index(sinkIndex).refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).throttledFor("0").name("testaction")
                    .build();

            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());

            Thread.sleep(220L);

            client.index(((IndexRequest)new IndexRequest(watchedIndex).id("1").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE))
                    .source(XContentType.JSON, new Object[]{"key1", "val1", "key2", "val2"})).actionGet();
            awaitMinCountOfDocuments(client, sinkIndex, 1L);

            response = restClient.put(watchPath + "/_ack_and_get/non_existing_action_id");
            Assert.assertEquals(response.getBody(), 404L, response.getStatusCode());
        }
    }

    /**
     * Stores a watch with two index actions ({@code testactionone}, {@code testactiontwo}).
     * A plain {@code _ack} on the first action before any source document exists must yield 412.
     * Once both sinks received a document, {@code _ack_and_get/testactionone} must return 200 and
     * report {@code testactionone} acknowledged by {@code USERNAME_UHURA}, while
     * {@code testactiontwo} must remain unacknowledged.
     */
    @Test
    public void testAckAndGetForOneActionForWatchWithDoubleAction() throws Exception {
        String tenant = "_main";
        String watchId = "ack_and_get_for_one_action_watch_with_double_action_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String watchedIndex = "source_index_for_watch_" + watchId;
        String sinkIndex = "sink_index_" + watchId;
        String additionalSinkIndex = "additional_sink_index_" + watchId;

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client client = cluster.getInternalNodeClient();
            client.admin().indices().create(new CreateIndexRequest(watchedIndex)).actionGet();
            client.admin().indices().create(new CreateIndexRequest(sinkIndex)).actionGet();
            client.admin().indices().create(new CreateIndexRequest(additionalSinkIndex)).actionGet();

            Watch watch = new WatchBuilder(watchId)
                    .atMsInterval(100L)
                    .search(watchedIndex).query("{\"match_all\" : {} }").as("testsearch")
                    .checkCondition("data.testsearch.hits.hits.length > 0")
                    .then().index(sinkIndex).refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).throttledFor("0").name("testactionone")
                    .and().index(additionalSinkIndex).refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).throttledFor("0").name("testactiontwo")
                    .build();

            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());

            Thread.sleep(220L);

            response = restClient.put(watchPath + "/_ack/testactionone");
            Assert.assertEquals(response.getBody(), 412L, response.getStatusCode());

            client.index(((IndexRequest)new IndexRequest(watchedIndex).id("1").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE))
                    .source(XContentType.JSON, new Object[]{"key1", "val1", "key2", "val2"})).actionGet();
            awaitMinCountOfDocuments(client, sinkIndex, 1L);
            awaitMinCountOfDocuments(client, additionalSinkIndex, 1L);

            response = restClient.put(watchPath + "/_ack_and_get/testactionone");
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());

            DocNode ackDoc = response.getBodyAsDocNode();
            Assert.assertEquals("testactionone", ((DocNode)ackDoc.getAsListOfNodes("acked").get(0)).get("action_id"));
            Assert.assertEquals(USERNAME_UHURA, ((DocNode)ackDoc.getAsListOfNodes("acked").get(0)).get("by_user"));

            RestApiTest.assertThatActionIsNotAcked(restClient, watchPath, "testactiontwo");
        }
    }

    /**
     * Stores a watch with two index actions but never adds a document to the watched index,
     * then calls {@code _ack_and_get/testactionone}. The call must return 412.
     */
    @Test
    public void testAckAndGetForInactiveWatchShouldReturnErrorResponse() throws Exception {
        String tenant = "_main";
        String watchId = "ack_and_get_for_inactive_watch_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String watchedIndex = "source_index_for_watch_" + watchId;
        String sinkIndex = "sink_index_" + watchId;
        String additionalSinkIndex = "additional_sink_index_" + watchId;

        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client client = cluster.getInternalNodeClient();
            client.admin().indices().create(new CreateIndexRequest(watchedIndex)).actionGet();
            client.admin().indices().create(new CreateIndexRequest(sinkIndex)).actionGet();
            client.admin().indices().create(new CreateIndexRequest(additionalSinkIndex)).actionGet();

            Watch watch = new WatchBuilder(watchId)
                    .atMsInterval(100L)
                    .search(watchedIndex).query("{\"match_all\" : {} }").as("testsearch")
                    .checkCondition("data.testsearch.hits.hits.length > 0")
                    .then().index(sinkIndex).refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).throttledFor("0").name("testactionone")
                    .and().index(additionalSinkIndex).refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).throttledFor("0").name("testactiontwo")
                    .build();

            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());

            Thread.sleep(220L);

            response = restClient.put(watchPath + "/_ack_and_get/testactionone");
            Assert.assertEquals(response.getBody(), 412L, response.getStatusCode());
        }
    }

    /**
     * Waits 500 ms, reads {@code <watchPath>/_state} and asserts that the named action reports
     * {@code ACTION_EXECUTED} as its last status code while its {@code acked} node is null.
     *
     * @param restClient client used for the state request
     * @param watchPath  REST path of the watch, without the {@code /_state} suffix
     * @param actionName name of the action to inspect
     */
    private static void assertThatActionIsNotAcked(GenericRestClient restClient, String watchPath, String actionName) throws Exception {
        Thread.sleep(500L);
        GenericRestClient.HttpResponse stateResponse = restClient.get(watchPath + "/_state", new Header[0]);
        Assert.assertEquals(stateResponse.getBody(), 200L, stateResponse.getStatusCode());
        DocNode state = stateResponse.getBodyAsDocNode();
        Object lastStatusCode = state.get("actions", new String[]{actionName, "last_status", "code"});
        Assert.assertEquals((Object)"ACTION_EXECUTED", lastStatusCode);
        Assert.assertTrue(state.getAsNode("actions", new String[]{actionName, "acked"}).isNull());
    }

    /**
     * Lets a two-action watch execute both actions, then calls {@code PUT <watch>/_ack_and_get}
     * without an action id and expects both actions to be listed under {@code acked}.
     */
    @Test
    public void testAckAndGetBothActionForWatchWithTwoAction() throws Exception {
        String tenant = "_main";
        String watchId = "ack_and_get_both_action_for_watch_with_two_actions_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String sourceIndex = "source_index_for_watch_" + watchId;
        String firstSinkIndex = "sink_index_" + watchId;
        String secondSinkIndex = "additional_sink_index_" + watchId;
        try (GenericRestClient rest = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client nodeClient = cluster.getInternalNodeClient();
            for (String indexName : new String[]{sourceIndex, firstSinkIndex, secondSinkIndex}) {
                nodeClient.admin().indices().create(new CreateIndexRequest(indexName)).actionGet();
            }
            Watch watch = new WatchBuilder(watchId)
                    .atMsInterval(100L)
                    .search(sourceIndex).query("{\"match_all\" : {} }").as("testsearch")
                    .checkCondition("data.testsearch.hits.hits.length > 0")
                    .then()
                    .index(firstSinkIndex).refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).throttledFor("0").name("testactionone")
                    .and()
                    .index(secondSinkIndex).refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).throttledFor("0").name("testactiontwo")
                    .build();
            GenericRestClient.HttpResponse createResponse = rest.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(createResponse.getBody(), 201L, createResponse.getStatusCode());
            Thread.sleep(220L);
            // A single source document makes the watch condition match.
            IndexRequest sourceDocument = (IndexRequest)new IndexRequest(sourceIndex).id("1").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            nodeClient.index(sourceDocument.source(XContentType.JSON, new Object[]{"key1", "val1", "key2", "val2"})).actionGet();
            awaitMinCountOfDocuments(nodeClient, firstSinkIndex, 1L);
            awaitMinCountOfDocuments(nodeClient, secondSinkIndex, 1L);
            GenericRestClient.HttpResponse ackResponse = rest.put(watchPath + "/_ack_and_get");
            Assert.assertEquals(ackResponse.getBody(), 200L, ackResponse.getStatusCode());
            DocNode ackDoc = ackResponse.getBodyAsDocNode();
            Assert.assertEquals(2L, (long)ackDoc.getAsListOfNodes("acked").size());
            Object expectedActionIds = ImmutableSet.of((Object)"testactionone", (Object)"testactiontwo");
            Object actualActionIds = ImmutableSet.of((Collection)ackDoc.findByJsonPath("acked[*].action_id"));
            Assert.assertEquals(expectedActionIds, actualActionIds);
        }
    }

    /**
     * Acknowledges both actions of a two-action watch, then removes the acknowledgement of
     * {@code testactionone} via {@code DELETE <watch>/_ack_and_get/testactionone} and checks
     * both the response document and the resulting watch state.
     */
    @Test
    public void testDeAckAndGetFirstActionForWatchWithTwoAction() throws Exception {
        String tenant = "_main";
        String watchId = "deack_and_get_first_action_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String sourceIndex = "source_index_for_watch_" + watchId;
        String firstSinkIndex = "sink_index_" + watchId;
        String secondSinkIndex = "additional_sink_index_" + watchId;
        try (GenericRestClient rest = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client nodeClient = cluster.getInternalNodeClient();
            for (String indexName : new String[]{sourceIndex, firstSinkIndex, secondSinkIndex}) {
                nodeClient.admin().indices().create(new CreateIndexRequest(indexName)).actionGet();
            }
            Watch watch = new WatchBuilder(watchId)
                    .atMsInterval(100L)
                    .search(sourceIndex).query("{\"match_all\" : {} }").as("testsearch")
                    .checkCondition("data.testsearch.hits.hits.length > 0")
                    .then()
                    .index(firstSinkIndex).refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).throttledFor("0").name("testactionone")
                    .and()
                    .index(secondSinkIndex).refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).throttledFor("0").name("testactiontwo")
                    .build();
            GenericRestClient.HttpResponse createResponse = rest.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(createResponse.getBody(), 201L, createResponse.getStatusCode());
            Thread.sleep(220L);
            IndexRequest sourceDocument = (IndexRequest)new IndexRequest(sourceIndex).id("1").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            nodeClient.index(sourceDocument.source(XContentType.JSON, new Object[]{"key1", "val1", "key2", "val2"})).actionGet();
            awaitMinCountOfDocuments(nodeClient, firstSinkIndex, 1L);
            awaitMinCountOfDocuments(nodeClient, secondSinkIndex, 1L);
            // Acknowledge everything first so that there is something to un-acknowledge.
            GenericRestClient.HttpResponse ackResponse = rest.put(watchPath + "/_ack_and_get");
            Assert.assertEquals(ackResponse.getBody(), 200L, ackResponse.getStatusCode());
            Thread.sleep(220L);
            GenericRestClient.HttpResponse unackResponse = rest.delete(watchPath + "/_ack_and_get/testactionone", new Header[0]);
            Assert.assertEquals(unackResponse.getBody(), 200L, unackResponse.getStatusCode());
            DocNode unackDoc = unackResponse.getBodyAsDocNode();
            Assert.assertEquals(Arrays.asList("testactionone"), (Object)unackDoc.findByJsonPath("unacked_action_ids[0]"));
            RestApiTest.assertThatActionIsNotAcked(rest, watchPath, "testactionone");
        }
    }

    /**
     * Acknowledges both actions of a two-action watch and then expects HTTP 404 when
     * {@code DELETE <watch>/_ack_and_get/<action>} names an action the watch does not define.
     */
    @Test
    public void testDeAckAndGetForNotExistingActionShouldReturnErrorResponse() throws Exception {
        String tenant = "_main";
        String watchId = "deack_and_get_non_existing_action_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String sourceIndex = "source_index_for_watch_" + watchId;
        String firstSinkIndex = "sink_index_" + watchId;
        String secondSinkIndex = "additional_sink_index_" + watchId;
        try (GenericRestClient rest = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client nodeClient = cluster.getInternalNodeClient();
            for (String indexName : new String[]{sourceIndex, firstSinkIndex, secondSinkIndex}) {
                nodeClient.admin().indices().create(new CreateIndexRequest(indexName)).actionGet();
            }
            Watch watch = new WatchBuilder(watchId)
                    .atMsInterval(100L)
                    .search(sourceIndex).query("{\"match_all\" : {} }").as("testsearch")
                    .checkCondition("data.testsearch.hits.hits.length > 0")
                    .then()
                    .index(firstSinkIndex).refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).throttledFor("0").name("testactionone")
                    .and()
                    .index(secondSinkIndex).refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).throttledFor("0").name("testactiontwo")
                    .build();
            GenericRestClient.HttpResponse createResponse = rest.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(createResponse.getBody(), 201L, createResponse.getStatusCode());
            Thread.sleep(220L);
            IndexRequest sourceDocument = (IndexRequest)new IndexRequest(sourceIndex).id("1").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            nodeClient.index(sourceDocument.source(XContentType.JSON, new Object[]{"key1", "val1", "key2", "val2"})).actionGet();
            awaitMinCountOfDocuments(nodeClient, firstSinkIndex, 1L);
            awaitMinCountOfDocuments(nodeClient, secondSinkIndex, 1L);
            GenericRestClient.HttpResponse ackResponse = rest.put(watchPath + "/_ack_and_get");
            Assert.assertEquals(ackResponse.getBody(), 200L, ackResponse.getStatusCode());
            Thread.sleep(220L);
            // The action id below is not part of the watch definition.
            GenericRestClient.HttpResponse unackResponse = rest.delete(watchPath + "/_ack_and_get/this_action_does_not_exist", new Header[0]);
            Assert.assertEquals(unackResponse.getBody(), 404L, unackResponse.getStatusCode());
        }
    }

    /**
     * Creates a watch whose source index stays empty and expects {@code DELETE <watch>/_ack}
     * to fail with HTTP 412 and the error text "No actions are in an un-acknowlegable state".
     */
    @Test
    public void testUnAckOfFreshWatch() throws Exception {
        String tenant = "_main";
        String watchId = "unack_of_fresh_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (GenericRestClient rest = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client nodeClient = cluster.getInternalNodeClient();
            nodeClient.admin().indices().create(new CreateIndexRequest("testsource_unack_watch")).actionGet();
            nodeClient.admin().indices().create(new CreateIndexRequest("testsink_unack_watch")).actionGet();
            Watch watch = new WatchBuilder(watchId)
                    .atMsInterval(100L)
                    .search("testsource_unack_watch").query("{\"match_all\" : {} }").as("testsearch")
                    .checkCondition("data.testsearch.hits.hits.length > 0")
                    .then()
                    .index("testsink_unack_watch").refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).throttledFor("0").name("testaction")
                    .build();
            GenericRestClient.HttpResponse createResponse = rest.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(createResponse.getBody(), 201L, createResponse.getStatusCode());
            Thread.sleep(1000L);
            GenericRestClient.HttpResponse unackResponse = rest.delete(watchPath + "/_ack", new Header[0]);
            Assert.assertEquals(unackResponse.getBody(), 412L, unackResponse.getStatusCode());
            Object errorMessage = unackResponse.getBodyAsDocNode().get("error");
            Assert.assertEquals(unackResponse.getBody(), (Object)"No actions are in an un-acknowlegable state", errorMessage);
        }
    }

    /**
     * Runs a watch with one acknowledgeable action and one action built with
     * {@code ackEnabled(false)}. A watch-level {@code PUT <watch>/_ack} is refused with 412
     * before any execution and accepted with 200 afterwards. The state must then show
     * {@code testaction_ack} as acked by the calling user and no {@code acked} entry for
     * {@code testaction_unack}. Finally, the document count of the acked sink must stay
     * constant while the count of the other sink must change.
     */
    @Test
    public void testAckWithUnacknowledgableActions() throws Exception {
        String tenant = "_main";
        String watchId = "ack_with_unack_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String sourceIndex = "testsource_" + watchId;
        String ackSinkIndex = "testsink_ack_" + watchId;
        String unackSinkIndex = "testsink_unack_" + watchId;
        try (GenericRestClient rest = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client nodeClient = cluster.getInternalNodeClient();
            for (String indexName : new String[]{sourceIndex, ackSinkIndex, unackSinkIndex}) {
                nodeClient.admin().indices().create(new CreateIndexRequest(indexName)).actionGet();
            }
            Watch watch = new WatchBuilder(watchId)
                    .atMsInterval(100L)
                    .search(sourceIndex).query("{\"match_all\" : {} }").as("testsearch")
                    .checkCondition("data.testsearch.hits.hits.length > 0")
                    .then()
                    .index(ackSinkIndex).refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).throttledFor("0").name("testaction_ack")
                    .and()
                    .index(unackSinkIndex).refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).ackEnabled(false).throttledFor("0").name("testaction_unack")
                    .build();
            GenericRestClient.HttpResponse response = rest.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());
            Thread.sleep(220L);

            // The source index is still empty at this point, so the ack is expected to be refused.
            response = rest.put(watchPath + "/_ack");
            Assert.assertEquals(response.getBody(), 412L, response.getStatusCode());

            IndexRequest sourceDocument = (IndexRequest)new IndexRequest(sourceIndex).id("1").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            nodeClient.index(sourceDocument.source(XContentType.JSON, new Object[]{"key1", "val1", "key2", "val2"})).actionGet();
            awaitMinCountOfDocuments(nodeClient, ackSinkIndex, 1L);
            awaitMinCountOfDocuments(nodeClient, unackSinkIndex, 1L);

            response = rest.put(watchPath + "/_ack");
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());
            Thread.sleep(500L);

            response = rest.get(watchPath + "/_state", new Header[0]);
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());
            DocNode state = response.getBodyAsDocNode();
            Assert.assertEquals(response.getBody(), (Object)USERNAME_UHURA, (Object)state.get("actions", new String[]{"testaction_ack", "acked", "by"}).toString());
            Assert.assertTrue(response.getBody(), state.get("actions", new String[]{"testaction_unack", "acked"}) == null);

            // Take a snapshot of both sink sizes and compare again after a few more intervals.
            Thread.sleep(200L);
            long ackSinkCountBefore = getCountOfDocuments(nodeClient, ackSinkIndex);
            long unackSinkCountBefore = getCountOfDocuments(nodeClient, unackSinkIndex);
            Thread.sleep(310L);
            Assert.assertEquals(ackSinkCountBefore, (long)getCountOfDocuments(nodeClient, ackSinkIndex));
            Assert.assertNotEquals(unackSinkCountBefore, (long)getCountOfDocuments(nodeClient, unackSinkIndex));
        }
    }

    /**
     * Verifies that acknowledging a single action which was built with {@code ackEnabled(false)}
     * is rejected: {@code PUT <watch>/_ack/testaction_unack} must return HTTP 400 together with
     * the error text "The action 'testaction_unack' is not acknowledgeable".
     *
     * <p>The source document is indexed before the watch is created, and the test waits until
     * both sink indices contain at least one document before sending the ack request.
     */
    @Test
    public void testActionSpecificAckWithUnacknowledgableActions() throws Exception {
        String tenant = "_main";
        String watchId = "action_specific_ack_with_unack_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String testSource = "testsource_" + watchId;
        String testSinkAck = "testsink_ack_" + watchId;
        String testSinkUnack = "testsink_unack_" + watchId;
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client client = cluster.getInternalNodeClient();
            client.admin().indices().create(new CreateIndexRequest(testSource)).actionGet();
            client.admin().indices().create(new CreateIndexRequest(testSinkAck)).actionGet();
            client.admin().indices().create(new CreateIndexRequest(testSinkUnack)).actionGet();
            client.index(((IndexRequest)new IndexRequest(testSource).id("1").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).source(XContentType.JSON, new Object[]{"key1", "val1", "key2", "val2"})).actionGet();
            Watch watch = new WatchBuilder(watchId)
                    .atMsInterval(100L)
                    .search(testSource).query("{\"match_all\" : {} }").as("testsearch")
                    .checkCondition("data.testsearch.hits.hits.length > 0")
                    .then()
                    .index(testSinkAck).refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).throttledFor("0").name("testaction_ack")
                    .and()
                    .index(testSinkUnack).refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).ackEnabled(false).throttledFor("0").name("testaction_unack")
                    .build();
            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());
            this.awaitMinCountOfDocuments(client, testSinkAck, 1L);
            this.awaitMinCountOfDocuments(client, testSinkUnack, 1L);
            response = restClient.put(watchPath + "/_ack/testaction_unack");
            // The response body is the assertion message below, so no separate console dump is needed.
            Assert.assertEquals(response.getBody(), 400L, response.getStatusCode());
            Assert.assertEquals(response.getBody(), (Object)"The action 'testaction_unack' is not acknowledgeable", (Object)response.getBodyAsDocNode().get("error"));
        }
    }

    /**
     * Checks the {@code ack_watch_link} and {@code ack_action_link} template values. After
     * setting {@code frontend_base_url}, a simulated execution of a watch with an email action
     * must render both links against that base URL.
     */
    @Test
    public void testAckWatchLink() throws Exception {
        String tenant = "_main";
        String watchId = "test_ack_watch_link";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String frontendBaseUrl = "http://my.frontend";
        try (GenericRestClient rest = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            EmailAccount emailAccount = new EmailAccount();
            emailAccount.setHost("localhost");
            emailAccount.setPort(9999);
            emailAccount.setDefaultFrom("test@test");
            // The result of the account registration is not inspected by this test.
            rest.putJson("/_signals/account/email/test_ack_watch_link", emailAccount.toJson(), new Header[0]);

            GenericRestClient.HttpResponse response = rest.putJson("/_signals/settings/frontend_base_url", DocNode.wrap((Object)frontendBaseUrl).toJsonString(), new Header[0]);
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());

            Watch watch = new WatchBuilder(watchId)
                    .atMsInterval(100000L)
                    .put("{\"a\": 42}").as("testdata")
                    .checkCondition("data.testdata.a > 0")
                    .then()
                    .email("test").account("test_ack_watch_link")
                    .body("Watch Link: {{ack_watch_link}}\nAction Link: {{ack_action_link}}")
                    .to("test@test").name("testaction")
                    .build();
            response = rest.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());
            Thread.sleep(100L);

            response = rest.postJson(watchPath + "/_execute", DocNode.of((String)"simulate", (Object)true).toJsonString(), new Header[0]);
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());
            String mail = response.getBodyAsDocNode().findSingleNodeByJsonPath("actions[0].request").toString();
            Pattern linkPattern = Pattern.compile("Watch Link: (\\S+)\nAction Link: (\\S+)", Pattern.MULTILINE);
            Matcher linkMatcher = linkPattern.matcher(mail);
            if (!linkMatcher.find()) {
                Assert.fail(response.getBody());
            }
            String watchLink = linkMatcher.group(1);
            String actionLink = linkMatcher.group(2);
            Assert.assertEquals(response.getBody(), (Object)"http://my.frontend/app/searchguard-signals?sg_tenant=SGS_GLOBAL_TENANT#/watch/test_ack_watch_link/ack/", (Object)watchLink);
            Assert.assertEquals(response.getBody(), (Object)"http://my.frontend/app/searchguard-signals?sg_tenant=SGS_GLOBAL_TENANT#/watch/test_ack_watch_link/ack/testaction/", (Object)actionLink);
        }
    }

    /**
     * Stores three watches ({@code search_watch1..3}) in the {@code _main} tenant and queries
     * the watch search endpoint twice: by check name {@code findme} (two hits expected) and by
     * {@code _name} {@code search_watch3} (one hit expected).
     */
    @Test
    public void testSearchWatch() throws Exception {
        String tenant = "_main";
        String watchId = "search_watch";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String searchPath = "/_signals/watch/" + tenant + "/_search";
        try (GenericRestClient rest = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client client = cluster.getInternalNodeClient();
            // Each entry: { watch id suffix, name of the search check }.
            String[][] watchSpecs = {{"1", "testsearch"}, {"2", "findme"}, {"3", "findme"}};
            for (String[] spec : watchSpecs) {
                Watch watch = new WatchBuilder("put_test")
                        .cronTrigger("0 0 1 * * ?")
                        .search("testsource").query("{\"match_all\" : {} }").as(spec[1])
                        .then()
                        .index("testsink_search_watch").name("testsink")
                        .build();
                GenericRestClient.HttpResponse putResponse = rest.putJson(watchPath + spec[0], watch.toJson(), new Header[0]);
                Assert.assertEquals(putResponse.getBody(), 201L, putResponse.getStatusCode());
            }

            GenericRestClient.HttpResponse byCheckName = rest.postJson(searchPath, "{ \"query\": {\"match\": {\"checks.name\": \"findme\"}}}", new Header[0]);
            Assert.assertEquals(byCheckName.getBody(), 200L, byCheckName.getStatusCode());
            Assert.assertTrue(byCheckName.getBody(), byCheckName.getBody().contains("\"hits\":{\"total\":{\"value\":2,\"relation\":\"eq\"}"));

            GenericRestClient.HttpResponse byWatchName = rest.postJson(searchPath, "{ \"query\": {\"match\": {\"_name\": \"search_watch3\"}}}", new Header[0]);
            Assert.assertEquals(byWatchName.getBody(), 200L, byWatchName.getStatusCode());
            Assert.assertTrue(byWatchName.getBody(), byWatchName.getBody().contains("\"hits\":{\"total\":{\"value\":1,\"relation\":\"eq\"}"));
        }
    }

    /**
     * Stores three watches in the tenant {@code unit_test_search_watch_without_body} and expects
     * a body-less {@code GET <tenant>/_search} to report exactly three hits.
     */
    @Test
    public void testSearchWatchWithoutBody() throws Exception {
        String tenant = "unit_test_search_watch_without_body";
        String watchId = "search_watch_without_body";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (GenericRestClient rest = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            // Each entry: { watch id suffix, name of the search check }.
            String[][] watchSpecs = {{"1", "testsearch"}, {"2", "findme"}, {"3", "findme"}};
            for (String[] spec : watchSpecs) {
                Watch watch = new WatchBuilder("put_test")
                        .cronTrigger("0 0 1 * * ?")
                        .search("testsource").query("{\"match_all\" : {} }").as(spec[1])
                        .then()
                        .index("testsink_search_watch").name("testsink")
                        .build();
                GenericRestClient.HttpResponse putResponse = rest.putJson(watchPath + spec[0], watch.toJson(), new Header[0]);
                Assert.assertEquals(putResponse.getBody(), 201L, putResponse.getStatusCode());
            }

            GenericRestClient.HttpResponse searchResponse = rest.get("/_signals/watch/" + tenant + "/_search", new Header[0]);
            Assert.assertEquals(searchResponse.getBody(), 200L, searchResponse.getStatusCode());
            Assert.assertTrue(searchResponse.getBody(), searchResponse.getBody().contains("\"hits\":{\"total\":{\"value\":3,\"relation\":\"eq\"}"));
        }
    }

    /**
     * Stores three watches and pages through the two that use the {@code findme} check with a
     * scroll of size 1, sorted ascending by {@code _meta.last_edit.date}. The first page must
     * contain {@code _main/search_watch_scroll2}; the follow-up {@code /_search/scroll} request
     * must contain {@code _main/search_watch_scroll3}.
     */
    @Test
    public void testSearchWatchScroll() throws Exception {
        String tenant = "_main";
        String watchId = "search_watch_scroll";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (GenericRestClient rest = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            // Each entry: { watch id suffix, name of the search check }.
            String[][] watchSpecs = {{"1", "testsearch"}, {"2", "findme"}, {"3", "findme"}};
            for (String[] spec : watchSpecs) {
                Watch watch = new WatchBuilder("put_test")
                        .cronTrigger("0 0 1 * * ?")
                        .search("testsource").query("{\"match_all\" : {} }").as(spec[1])
                        .then()
                        .index("testsink_search_watch").name("testsink")
                        .build();
                GenericRestClient.HttpResponse putResponse = rest.putJson(watchPath + spec[0], watch.toJson(), new Header[0]);
                Assert.assertEquals(putResponse.getBody(), 201L, putResponse.getStatusCode());
            }

            GenericRestClient.HttpResponse firstPage = rest.postJson("/_signals/watch/" + tenant + "/_search?scroll=60s&size=1", "{ \"sort\": [{\"_meta.last_edit.date\": {\"order\": \"asc\"}}], \"query\": {\"match\": {\"checks.name\": \"findme\"}}}", new Header[0]);
            Assert.assertEquals(firstPage.getBody(), 200L, firstPage.getStatusCode());
            Assert.assertTrue(firstPage.getBody(), firstPage.getBody().contains("\"_id\":\"_main/search_watch_scroll2\""));
            String scrollId = firstPage.getBodyAsDocNode().getAsString("_scroll_id");
            Assert.assertNotNull((Object)scrollId);

            GenericRestClient.HttpResponse secondPage = rest.postJson("/_search/scroll", "{ \"scroll\": \"60s\", \"scroll_id\": \"" + scrollId + "\"}", new Header[0]);
            Assert.assertEquals(secondPage.getBody(), 200L, secondPage.getStatusCode());
            Assert.assertTrue(secondPage.getBody(), secondPage.getBody().contains("\"_id\":\"_main/search_watch_scroll3\""));
        }
    }

    /**
     * Exercises the email account REST endpoints and an email action against a GreenMail SMTP
     * server started on a free local port.
     *
     * <p>Steps: create the {@code default} email account (201), overwrite it (200), check that
     * deleting or fetching unknown accounts yields 404, fetch the existing account (200), store
     * a watch with an email action and wait up to 20 s for one mail whose content contains
     * "Test Mail Subject".
     *
     * <p>Cleanup: the watch and the account are deleted in the inner {@code finally}. The
     * GreenMail server is stopped in the outer {@code finally}, so it is shut down even when
     * creating the REST client or one of the cleanup requests throws.
     */
    @Test
    public void testEmailDestination() throws Exception {
        String tenant = "_main";
        String watchId = "smtp_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        int smtpPort = SocketUtils.findAvailableTcpPort();
        GreenMail greenMail = new GreenMail(new ServerSetup(smtpPort, "127.0.0.1", "smtp"));
        greenMail.start();
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
            try {
                EmailAccount destination = new EmailAccount();
                destination.setHost("localhost");
                destination.setPort(smtpPort);
                Assert.assertTrue(destination.toJson().contains("\"type\":\"email\""));
                Assert.assertFalse(destination.toJson().contains("session_timeout"));
                EmailAction.Attachment attachment = new EmailAction.Attachment();
                attachment.setType(EmailAction.Attachment.AttachmentType.RUNTIME);

                // First PUT creates the account, second PUT updates it.
                GenericRestClient.HttpResponse response = restClient.putJson("/_signals/account/email/default", destination.toJson(), new Header[0]);
                Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());
                response = restClient.putJson("/_signals/account/email/default", destination.toJson(), new Header[0]);
                Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());

                response = restClient.delete("/_signals/account/email/aaa", new Header[0]);
                Assert.assertEquals(response.getBody(), 404L, response.getStatusCode());
                response = restClient.get("/_signals/account/email/aaabbb", new Header[0]);
                Assert.assertEquals(response.getBody(), 404L, response.getStatusCode());
                response = restClient.get("/_signals/account/email/default", new Header[0]);
                Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());

                Watch watch = new WatchBuilder("smtp_test")
                        .cronTrigger("* * * * * ?")
                        .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                        .then()
                        .email("Test Mail Subject").to("mustache@cc.xx").from("mustache@df.xx").account("default")
                        .body("We searched {{data.testsearch._shards.total}} shards")
                        .attach("attachment.txt", attachment)
                        .name("testsmtpsink")
                        .build();
                response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
                Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());

                if (!greenMail.waitForIncomingEmail(20000L, 1)) {
                    Assert.fail("Timeout waiting for mails");
                }
                String message = GreenMailUtil.getWholeMessage((Part)greenMail.getReceivedMessages()[0]);
                Assert.assertTrue(message, message.contains("Test Mail Subject"));
            } finally {
                restClient.delete(watchPath, new Header[0]);
                restClient.delete("/_signals/account/email/default", new Header[0]);
            }
        } finally {
            // Runs after the REST client is closed and regardless of any failure above.
            greenMail.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Delivers a watch-triggered mail with two attachments to a local GreenMail SMTP server.
     *
     * <p>Steps: create the {@code default} email account (201), overwrite it (200), check that
     * deleting or reading unknown accounts yields 404 and that reading the existing one yields 200,
     * then store a watch whose email action attaches the runtime data and the result of an HTTP
     * request sent to the mock web service. The received message must contain both attachment
     * headers, the text {@code Mockery} and the mail subject.
     *
     * @throws Exception if the cluster, the mock web service or the mail server cannot be used
     */
    @Test
    public void testEmailDestinationWithRuntimeDataAndBasicText() throws Exception {
        String tenant = "_main";
        String watchId = "smtp_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String accountPath = "/_signals/account/email/default";
        int smtpPort = SocketUtils.findAvailableTcpPort();
        GreenMail greenMail = new GreenMail(new ServerSetup(smtpPort, "127.0.0.1", "smtp"));
        greenMail.start();
        // Everything after start() sits inside this try so that the SMTP server is stopped even
        // when obtaining the REST client or one of the cleanup requests fails.
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]);
                MockWebserviceProvider webhookProvider = new MockWebserviceProvider("/hook")) {
            try {
                HttpRequestConfig httpRequestConfig = new HttpRequestConfig(HttpRequestConfig.Method.POST, new URI(webhookProvider.getUri()),
                        "/{{data.teststatic.path}}", null, "{{data.teststatic.body}}", null, null, null, null);
                httpRequestConfig.compileScripts(this.watchInitializationService);

                EmailAccount destination = new EmailAccount();
                destination.setHost("localhost");
                destination.setPort(smtpPort);
                Assert.assertTrue(destination.toJson().contains("\"type\":\"email\""));
                Assert.assertFalse(destination.toJson().contains("session_timeout"));

                EmailAction.Attachment attachment = new EmailAction.Attachment();
                attachment.setType(EmailAction.Attachment.AttachmentType.RUNTIME);
                EmailAction.Attachment attachment2 = new EmailAction.Attachment();
                attachment2.setType(EmailAction.Attachment.AttachmentType.REQUEST);
                attachment2.setRequestConfig(httpRequestConfig);

                // First PUT creates the account, second PUT overwrites it.
                GenericRestClient.HttpResponse response = restClient.putJson(accountPath, destination.toJson(), new Header[0]);
                Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());
                response = restClient.putJson(accountPath, destination.toJson(), new Header[0]);
                Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());
                response = restClient.delete("/_signals/account/email/aaa", new Header[0]);
                Assert.assertEquals(response.getBody(), 404L, response.getStatusCode());
                response = restClient.get("/_signals/account/email/aaabbb", new Header[0]);
                Assert.assertEquals(response.getBody(), 404L, response.getStatusCode());
                response = restClient.get(accountPath, new Header[0]);
                Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());

                Watch watch = new WatchBuilder(watchId)
                        .put("{\n   \"path\":\"hook\",\n   \"body\":\"stuff\",\n   \"x\":\"y\"\n}").as("teststatic")
                        .cronTrigger("* * * * * ?")
                        .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                        .then().email("Test Mail Subject").to("mustache@cc.xx").from("mustache@df.xx").account("default")
                        .body("We searched {{data.testsearch._shards.total}} shards")
                        .attach("runtime.txt", attachment).attach("some_response.txt", attachment2)
                        .name("testsmtpsink").build();
                response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
                Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());

                Assert.assertTrue("Timeout waiting for mails", greenMail.waitForIncomingEmail(20000L, 1));

                String message = GreenMailUtil.getWholeMessage(greenMail.getReceivedMessages()[0]);
                Assert.assertTrue(message, message.contains("Content-Type: application/json; filename=runtime.txt; name=runtime"));
                Assert.assertTrue(message, message.contains("Content-Type: text/plain; filename=some_response.txt; name=some_response"));
                Assert.assertTrue(message, message.contains("Mockery"));
                Assert.assertTrue(message, message.contains("Test Mail Subject"));
            } finally {
                // Best-effort cleanup; the account is removed even if deleting the watch throws.
                try {
                    restClient.delete(watchPath, new Header[0]);
                } finally {
                    restClient.delete(accountPath, new Header[0]);
                }
            }
        } finally {
            greenMail.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Delivers a watch-triggered mail that has both a plain-text and an HTML body to a local
     * GreenMail SMTP server.
     *
     * <p>The {@code default} email account is created (201) and overwritten (200); unknown accounts
     * must yield 404. The HTML body template references {@code data.x}, which the watch does not
     * define, so the test expects it to render as an empty string in the received message.
     *
     * @throws Exception if the cluster or the mail server cannot be used
     */
    @Test
    public void testEmailDestinationWithHtmlBody() throws Exception {
        String tenant = "_main";
        String watchId = "smtp_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String accountPath = "/_signals/account/email/default";
        int smtpPort = SocketUtils.findAvailableTcpPort();
        GreenMail greenMail = new GreenMail(new ServerSetup(smtpPort, "127.0.0.1", "smtp"));
        greenMail.start();
        // Everything after start() sits inside this try so that the SMTP server is stopped even
        // when obtaining the REST client or one of the cleanup requests fails.
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
            try {
                EmailAccount destination = new EmailAccount();
                destination.setHost("localhost");
                destination.setPort(smtpPort);
                Assert.assertTrue(destination.toJson().contains("\"type\":\"email\""));
                Assert.assertFalse(destination.toJson().contains("session_timeout"));

                EmailAction.Attachment attachment = new EmailAction.Attachment();
                attachment.setType(EmailAction.Attachment.AttachmentType.RUNTIME);

                // First PUT creates the account, second PUT overwrites it.
                GenericRestClient.HttpResponse response = restClient.putJson(accountPath, destination.toJson(), new Header[0]);
                Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());
                response = restClient.putJson(accountPath, destination.toJson(), new Header[0]);
                Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());
                response = restClient.delete("/_signals/account/email/aaa", new Header[0]);
                Assert.assertEquals(response.getBody(), 404L, response.getStatusCode());
                response = restClient.get("/_signals/account/email/aaabbb", new Header[0]);
                Assert.assertEquals(response.getBody(), 404L, response.getStatusCode());
                response = restClient.get(accountPath, new Header[0]);
                Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());

                Watch watch = new WatchBuilder(watchId)
                        .cronTrigger("* * * * * ?")
                        .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                        .then().email("Test Mail Subject").to("mustache@cc.xx").from("mustache@df.xx").account("default")
                        .body("a body").htmlBody("<p>We searched {{data.x}} shards<p/>")
                        .attach("attachment.txt", attachment)
                        .name("testsmtpsink").build();
                response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
                Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());

                Assert.assertTrue("Timeout waiting for mails", greenMail.waitForIncomingEmail(20000L, 1));

                String message = GreenMailUtil.getWholeMessage(greenMail.getReceivedMessages()[0]);
                Assert.assertTrue(message, message.contains("<p>We searched  shards<p/>"));
                Assert.assertTrue(message, message.contains("a body"));
                Assert.assertTrue(message, message.contains("Test Mail Subject"));
            } finally {
                // Best-effort cleanup; the account is removed even if deleting the watch throws.
                try {
                    restClient.delete(watchPath, new Header[0]);
                } finally {
                    restClient.delete(accountPath, new Header[0]);
                }
            }
        } finally {
            greenMail.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that storing a watch with an email action is rejected with HTTP 400 and the message
     * {@code Account does not exist: email/default} when the default email account is absent.
     *
     * @throws Exception if the REST client cannot be obtained or a request fails
     */
    @Test
    public void testNonExistingEmailAccount() throws Exception {
        final String defaultEmailAccountPath = "/_signals/account/email/default";
        final String tenant = "_main";
        final String watchId = "smtp_test_non_existing_account";
        final String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
            // Remove a possibly left-over default account before the watch is stored.
            restClient.delete(defaultEmailAccountPath, new Header[0]);
            try {
                Watch watchWithoutAccount = new WatchBuilder("smtp_test")
                        .cronTrigger("* * * * * ?")
                        .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                        .then().email("Test Mail Subject").to("mustache@cc.xx").from("mustache@df.xx")
                        .body("We searched {{data.testsearch._shards.total}} shards")
                        .name("testsmtpsink").build();
                GenericRestClient.HttpResponse putResult = restClient.putJson(watchPath, watchWithoutAccount.toJson(), new Header[0]);
                Assert.assertEquals(putResult.getBody(), 400L, putResult.getStatusCode());
                Assert.assertTrue(putResult.getBody(), putResult.getBody().contains("Account does not exist: email/default"));
            } finally {
                restClient.delete(watchPath, new Header[0]);
                restClient.delete(defaultEmailAccountPath, new Header[0]);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stores a Slack account and a watch with a text-only Slack action; both PUT requests must
     * answer with HTTP 201. Watch and account are deleted again in the {@code finally} block.
     *
     * @throws Exception if the REST client cannot be obtained or a request fails
     */
    @Test
    public void testSlackDestination() throws Exception {
        final String slackAccountPath = "/_signals/account/slack/default";
        final String tenant = "_main";
        final String watchId = "slack_test";
        final String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
            try {
                SlackAccount slackAccount = new SlackAccount();
                slackAccount.setUrl(new URI("https://hooks.slack.com/services/SECRET"));
                Assert.assertTrue(slackAccount.toJson().contains("\"type\":\"slack\""));

                SlackActionConf actionConf = new SlackActionConf();
                actionConf.setText("Test from slack action");
                actionConf.setChannel("some channel");
                actionConf.setFrom("xyz");
                actionConf.setIconEmoji(":got:");
                actionConf.setAccount("default");

                GenericRestClient.HttpResponse result = restClient.putJson(slackAccountPath, slackAccount.toJson(), new Header[0]);
                Assert.assertEquals(result.getBody(), 201L, result.getStatusCode());

                Watch slackWatch = new WatchBuilder("slack_test")
                        .cronTrigger("* * * * * ?")
                        .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                        .then().slack(actionConf).name("testslacksink")
                        .build();
                result = restClient.putJson(watchPath, slackWatch.toJson(), new Header[0]);
                Assert.assertEquals(result.getBody(), 201L, result.getStatusCode());
            } finally {
                restClient.delete(watchPath, new Header[0]);
                restClient.delete(slackAccountPath, new Header[0]);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stores a Slack account and a watch whose Slack action defines both a text and a list of
     * blocks parsed from JSON; both PUT requests must answer with HTTP 201. Watch and account
     * are deleted again in the {@code finally} block.
     *
     * @throws Exception if the REST client cannot be obtained, parsing fails or a request fails
     */
    @Test
    public void testSlackDestinationWithBlocksAndText() throws Exception {
        final String slackAccountPath = "/_signals/account/slack/default";
        final String tenant = "_main";
        final String watchId = "slack_test";
        final String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
            try {
                SlackAccount slackAccount = new SlackAccount();
                slackAccount.setUrl(new URI("https://hooks.slack.com/services/SECRET"));
                Assert.assertTrue(slackAccount.toJson().contains("\"type\":\"slack\""));

                // Adjacent literals are folded by the compiler into the same single string.
                String blocksRawJson = "[\n"
                        + "\t\t{\n"
                        + "\t\t\t\"type\": \"section\",\n"
                        + "\t\t\t\"text\": {\n"
                        + "\t\t\t\t\"type\": \"mrkdwn\",\n"
                        + "\t\t\t\t\"text\": \"A message *with some bold text*}.\"\n"
                        + "\t\t\t}\n"
                        + "\t\t}\n"
                        + "\t]";
                ImmutableList parsedBlocks = DocNode.parse((Format)Format.JSON).from(blocksRawJson).toList();

                SlackActionConf actionConf = new SlackActionConf();
                actionConf.setText("Test from slack action");
                actionConf.setBlocks((List)parsedBlocks);
                actionConf.setChannel("some channel");
                actionConf.setFrom("xyz");
                actionConf.setIconEmoji(":got:");
                actionConf.setAccount("default");

                GenericRestClient.HttpResponse result = restClient.putJson(slackAccountPath, slackAccount.toJson(), new Header[0]);
                Assert.assertEquals(result.getBody(), 201L, result.getStatusCode());

                Watch slackWatch = new WatchBuilder("slack_test")
                        .cronTrigger("* * * * * ?")
                        .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                        .then().slack(actionConf).name("testslacksink")
                        .build();
                result = restClient.putJson(watchPath, slackWatch.toJson(), new Header[0]);
                Assert.assertEquals(result.getBody(), 201L, result.getStatusCode());
            } finally {
                restClient.delete(watchPath, new Header[0]);
                restClient.delete(slackAccountPath, new Header[0]);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Stores a Slack account and a watch whose Slack action defines both a text and a list of
     * attachments parsed from JSON; both PUT requests must answer with HTTP 201. Watch and
     * account are deleted again in the {@code finally} block.
     *
     * @throws Exception if the REST client cannot be obtained, parsing fails or a request fails
     */
    @Test
    public void testSlackDestinationWithAttachmentAndText() throws Exception {
        final String slackAccountPath = "/_signals/account/slack/default";
        final String tenant = "_main";
        final String watchId = "slack_test";
        final String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
            try {
                SlackAccount slackAccount = new SlackAccount();
                slackAccount.setUrl(new URI("https://hooks.slack.com/services/SECRET"));
                Assert.assertTrue(slackAccount.toJson().contains("\"type\":\"slack\""));

                String attachmentRawJson = "[\n      {\n          \"fallback\": \"Plain-text summary of the attachment.\",\n          \"color\": \"#2eb886\",\n          \"pretext\": \"Optional text that appears above the attachment block\",\n          \"author_name\": \"Bobby Tables\",\n          \"author_link\": \"http://flickr.com/bobby/\",\n          \"author_icon\": \"http://flickr.com/icons/bobby.jpg\",\n          \"title\": \"Slack API Documentation\",\n          \"title_link\": \"https://api.slack.com/\",\n          \"text\": \"Optional text that appears within the attachment\",\n          \"fields\": [\n              {\n                  \"title\": \"Priority\",\n                  \"value\": \"High\",\n                  \"short\": false\n              }\n          ],\n          \"image_url\": \"http://my-website.com/path/to/image.jpg\",\n          \"thumb_url\": \"http://example.com/path/to/thumb.png\",\n          \"footer\": \"Slack API\",\n          \"footer_icon\": \"https://platform.slack-edge.com/img/default_application_icon.png\",\n          \"ts\": 123456789\n      }\n  ]";
                ImmutableList parsedAttachments = DocNode.parse((Format)Format.JSON).from(attachmentRawJson).toList();

                SlackActionConf actionConf = new SlackActionConf();
                actionConf.setText("Test from slack action");
                actionConf.setAttachments((List)parsedAttachments);
                actionConf.setChannel("some channel");
                actionConf.setFrom("xyz");
                actionConf.setIconEmoji(":got:");
                actionConf.setAccount("default");

                GenericRestClient.HttpResponse result = restClient.putJson(slackAccountPath, slackAccount.toJson(), new Header[0]);
                Assert.assertEquals(result.getBody(), 201L, result.getStatusCode());

                Watch slackWatch = new WatchBuilder("slack_test")
                        .cronTrigger("* * * * * ?")
                        .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                        .then().slack(actionConf).name("testslacksink")
                        .build();
                result = restClient.putJson(watchPath, slackWatch.toJson(), new Header[0]);
                Assert.assertEquals(result.getBody(), 201L, result.getStatusCode());
            } finally {
                restClient.delete(watchPath, new Header[0]);
                restClient.delete(slackAccountPath, new Header[0]);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that a watch whose Slack action has neither text nor blocks is rejected.
     *
     * <p>The Slack account itself is stored successfully (201); storing the watch must answer
     * with HTTP 400 and a validation error naming {@code actions[testslacksink].text} as a
     * missing required attribute. Watch and account are deleted in the {@code finally} block.
     *
     * @throws Exception if the REST client cannot be obtained or a request fails
     */
    @Test
    public void testSlackDestinationWithMissingTextAndBlocks() throws Exception {
        String tenant = "_main";
        String watchId = "slack_test";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String accountPath = "/_signals/account/slack/default";
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
            try {
                SlackAccount destination = new SlackAccount();
                destination.setUrl(new URI("https://hooks.slack.com/services/SECRET"));
                Assert.assertTrue(destination.toJson().contains("\"type\":\"slack\""));

                // Deliberately no setText()/setBlocks(): this is the invalid configuration under test.
                SlackActionConf slackActionConf = new SlackActionConf();
                slackActionConf.setChannel("some channel");
                slackActionConf.setFrom("xyz");
                slackActionConf.setIconEmoji(":got:");
                slackActionConf.setAccount("default");

                GenericRestClient.HttpResponse response = restClient.putJson(accountPath, destination.toJson(), new Header[0]);
                Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());

                Watch watch = new WatchBuilder(watchId)
                        .cronTrigger("* * * * * ?")
                        .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                        .then().slack(slackActionConf).name("testslacksink")
                        .build();
                response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
                Assert.assertEquals(response.getBody(), 400L, response.getStatusCode());
                // Pass the body as failure message so a mismatch shows what the server actually returned.
                Assert.assertTrue(response.getBody(),
                        response.getBody().contains("Watch is invalid: 'actions[testslacksink].text': Required attribute is missing\","));
            } finally {
                restClient.delete(watchPath, new Header[0]);
                restClient.delete(accountPath, new Header[0]);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that a Slack account referenced by a watch cannot be deleted.
     *
     * <p>Deleting the account while the watch exists must answer with HTTP 409. Once the watch
     * has been deleted (200), deleting the account must succeed (200). The {@code finally}
     * block repeats both deletions as cleanup in case an assertion failed earlier.
     *
     * @throws Exception if the REST client cannot be obtained or a request fails
     */
    @Test
    public void testDeleteAccountInUse() throws Exception {
        final String slackAccountPath = "/_signals/account/slack/test";
        final String tenant = "_main";
        final String watchId = "slack_test";
        final String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
            try {
                SlackAccount slackAccount = new SlackAccount();
                slackAccount.setUrl(new URI("https://hooks.slack.com/services/SECRET"));

                SlackActionConf actionConf = new SlackActionConf();
                actionConf.setText("Test from slack action");
                actionConf.setAccount("test");
                actionConf.setFrom("some user");
                actionConf.setChannel("channel 1");

                GenericRestClient.HttpResponse result = restClient.putJson(slackAccountPath, slackAccount.toJson(), new Header[0]);
                Assert.assertEquals(result.getBody(), 201L, result.getStatusCode());

                Watch slackWatch = new WatchBuilder(watchId)
                        .cronTrigger("* * * * * ?")
                        .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                        .then().slack(actionConf).name("testslacksink")
                        .build();
                result = restClient.putJson(watchPath, slackWatch.toJson(), new Header[0]);
                Assert.assertEquals(result.getBody(), 201L, result.getStatusCode());

                // Account is still referenced by the watch.
                result = restClient.delete(slackAccountPath, new Header[0]);
                Assert.assertEquals(result.getBody(), 409L, result.getStatusCode());

                result = restClient.delete(watchPath, new Header[0]);
                Assert.assertEquals(result.getBody(), 200L, result.getStatusCode());

                // No watch refers to the account any more.
                result = restClient.delete(slackAccountPath, new Header[0]);
                Assert.assertEquals(result.getBody(), 200L, result.getStatusCode());
            } finally {
                restClient.delete(watchPath, new Header[0]);
                restClient.delete(slackAccountPath, new Header[0]);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that a Slack account cannot be deleted while a watch stored in the tenant
     * {@code redshirt_club} refers to it.
     *
     * <p>The watch is stored through a second REST client ({@code redshirt3}). Deleting the
     * account must answer with HTTP 409 until the watch is deleted (200); afterwards the account
     * deletion must succeed (200). The {@code finally} block repeats both deletions as cleanup.
     *
     * @throws Exception if a REST client cannot be obtained or a request fails
     */
    @Test
    public void testDeleteAccountInUseFromNonDefaultTenant() throws Exception {
        final String slackAccountPath = "/_signals/account/slack/test";
        final String tenant = "redshirt_club";
        final String watchId = "slack_test";
        final String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]);
                GenericRestClient redshirtRestClient = cluster.getRestClient("redshirt3", "redshirt", new Header[0])) {
            try {
                SlackAccount slackAccount = new SlackAccount();
                slackAccount.setUrl(new URI("https://hooks.slack.com/services/SECRET"));

                SlackActionConf actionConf = new SlackActionConf();
                actionConf.setText("Test from slack action");
                actionConf.setAccount("test");
                actionConf.setFrom("some user");
                actionConf.setChannel("channel 1");

                GenericRestClient.HttpResponse result = restClient.putJson(slackAccountPath, slackAccount.toJson(), new Header[0]);
                Assert.assertEquals(result.getBody(), 201L, result.getStatusCode());

                Watch slackWatch = new WatchBuilder(watchId)
                        .cronTrigger("* * * * * ?")
                        .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                        .then().slack(actionConf).name("testslacksink")
                        .build();
                // The watch is created by the tenant user, everything else by the first client.
                result = redshirtRestClient.putJson(watchPath, slackWatch.toJson(), new Header[0]);
                Assert.assertEquals(result.getBody(), 201L, result.getStatusCode());

                result = restClient.delete(slackAccountPath, new Header[0]);
                Assert.assertEquals(result.getBody(), 409L, result.getStatusCode());

                result = restClient.delete(watchPath, new Header[0]);
                Assert.assertEquals(result.getBody(), 200L, result.getStatusCode());

                result = restClient.delete(slackAccountPath, new Header[0]);
                Assert.assertEquals(result.getBody(), 200L, result.getStatusCode());
            } finally {
                restClient.delete(watchPath, new Header[0]);
                restClient.delete(slackAccountPath, new Header[0]);
            }
        }
    }

    /**
     * Stores a watch with a weekly schedule (Monday and Wednesday, using {@code TimeOfDay(12, 0)}
     * and {@code TimeOfDay(18, 0)}) and reads it back.
     *
     * <p>The PUT must answer with HTTP 201 and the subsequent GET with HTTP 200. The REST client
     * is created with {@code trackResources()}.
     *
     * @throws Exception if the REST client cannot be obtained or a request fails
     */
    @Test
    public void testPutWeeklySchedule() throws Exception {
        String tenant = "_main";
        String watchId = "test_weekly_schedule";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Watch watch = new WatchBuilder("test")
                    .weekly(DayOfWeek.MONDAY, DayOfWeek.WEDNESDAY, new TimeOfDay(12, 0), new TimeOfDay(18, 0))
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then().index("testsink").name("testsink")
                    .build();
            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());

            // The stored watch must be retrievable; previously this response was fetched but never checked.
            response = restClient.get(watchPath, new Header[0]);
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());
        }
    }

    /**
     * Stores a watch whose index action uses the throttle period expression
     * {@code 1s**1.5|20s} and reads it back.
     *
     * <p>The PUT must answer with HTTP 201 and the subsequent GET with HTTP 200. The REST client
     * is created with {@code trackResources()}.
     *
     * @throws Exception if the REST client cannot be obtained or a request fails
     */
    @Test
    public void testPutExponentialThrottling() throws Exception {
        String tenant = "_main";
        String watchId = "test_exponential_throttling";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Watch watch = new WatchBuilder("test")
                    .atMsInterval(1000L)
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then().index("testsink").throttledFor("1s**1.5|20s").name("testsink")
                    .build();
            GenericRestClient.HttpResponse response = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(response.getBody(), 201L, response.getStatusCode());

            // The stored watch must be retrievable; previously this response was fetched but never checked.
            response = restClient.get(watchPath, new Header[0]);
            Assert.assertEquals(response.getBody(), 200L, response.getStatusCode());
        }
    }

    /**
     * Creates three Slack accounts ({@code search_account1} to {@code search_account3}) and
     * searches for the first one by {@code _name}; the search must answer with HTTP 200 and the
     * body must contain the URL of the first account.
     *
     * @throws Exception if the REST client cannot be obtained or a request fails
     */
    @Test
    public void testSearchAccount() throws Exception {
        final String accountId = "search_account";
        final String accountPath = "/_signals/account/slack/" + accountId;
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            // Accounts 1 and 2 share one SlackAccount instance whose URL is changed in between.
            SlackAccount account = new SlackAccount();
            account.setUrl(new URI("https://xyz.test.com"));
            GenericRestClient.HttpResponse result = restClient.putJson(accountPath + "1", account.toJson(), new Header[0]);
            Assert.assertEquals(result.getBody(), 201L, result.getStatusCode());

            account.setUrl(new URI("https://abc.test.com"));
            result = restClient.putJson(accountPath + "2", account.toJson(), new Header[0]);
            Assert.assertEquals(result.getBody(), 201L, result.getStatusCode());

            account = new SlackAccount();
            account.setUrl(new URI("https://abcdef.test.com"));
            result = restClient.putJson(accountPath + "3", account.toJson(), new Header[0]);
            Assert.assertEquals(result.getBody(), 201L, result.getStatusCode());

            String searchBody = "{ \"sort\": [{\"type.keyword\": {\"order\": \"asc\"}}], \"query\": {\"match\": {\"_name\": \"" + accountId + "1\"}}}";
            result = restClient.postJson("/_signals/account/_search", searchBody, new Header[0]);
            Assert.assertEquals(result.getBody(), 200L, result.getStatusCode());
            Assert.assertTrue(result.getBody(), result.getBody().contains("https://xyz.test.com"));
        }
    }

    /**
     * Checks that a re-created watch shows the same sequence of action states as the first
     * incarnation did.
     *
     * <p>The watch runs every 100 ms with its action throttled for 1000 hours. The first three
     * log entries of the first version must be executed, throttled, throttled. The watch is then
     * deleted and, after a one second pause, stored again. The new version must differ from the
     * old one and its first three log entries must show the same sequence, i.e. the first run
     * is executed rather than throttled.
     *
     * @throws Exception if the REST client cannot be obtained, a request fails or waiting for
     *         the log entries fails
     */
    @Test
    public void testStateIsDeletedWhenWatchIsDeleted() throws Exception {
        final String tenant = "_main";
        final String watchId = "watch_delete_is_state_delete";
        final String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        final String testSink = "testsink_" + watchId;
        final List<Status.Code> expectedCodes = Arrays.asList(Status.Code.ACTION_EXECUTED, Status.Code.ACTION_THROTTLED, Status.Code.ACTION_THROTTLED);
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client nodeClient = cluster.getInternalNodeClient();
            nodeClient.admin().indices().create(new CreateIndexRequest(testSink)).actionGet();

            Watch throttledWatch = new WatchBuilder(watchId)
                    .atMsInterval(100L)
                    .search("testsource").query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then().index(testSink).throttledFor("1000h").name("testsink")
                    .build();

            // First incarnation of the watch.
            GenericRestClient.HttpResponse result = restClient.putJson(watchPath, (ToXContentObject)throttledWatch);
            Assert.assertEquals(result.getBody(), 201L, result.getStatusCode());
            long firstVersion = Long.parseLong(result.getBodyAsDocNode().getAsString("_version"));
            List<WatchLog> logEntries = new WatchLogSearch(nodeClient).index(SIGNALS_LOGS_INDEX_NAME).watchId(watchId)
                    .watchVersion(firstVersion).fromTheStart().count(3).await();
            log.info("First pass watchLogs: " + logEntries);
            Assert.assertEquals(logEntries.toString(), expectedCodes,
                    logEntries.stream().map(entry -> entry.getStatus().getCode()).collect(Collectors.toList()));

            result = restClient.delete(watchPath, new Header[0]);
            Assert.assertEquals(result.getBody(), 200L, result.getStatusCode());
            Thread.sleep(1000L);

            // Second incarnation under the same id.
            result = restClient.putJson(watchPath, (ToXContentObject)throttledWatch);
            long secondVersion = Long.parseLong(result.getBodyAsDocNode().getAsString("_version"));
            Assert.assertNotEquals(result.getBody(), firstVersion, secondVersion);
            logEntries = new WatchLogSearch(nodeClient).index(SIGNALS_LOGS_INDEX_NAME).watchId(watchId)
                    .watchVersion(secondVersion).fromTheStart().count(3).await();
            log.info("Second pass watchLogs: " + logEntries);
            Assert.assertEquals(logEntries.toString(), expectedCodes,
                    logEntries.stream().map(entry -> entry.getStatus().getCode()).collect(Collectors.toList()));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that a watch log entry containing a search hit with a very large number of fields
     * is stored and searchable.
     *
     * <p>The watch log index is deleted first (if present). A watch with runtime data logging
     * enabled then searches {@code HUGE_DOCUMENT_INDEX}. The first log entry must contain a hit
     * whose {@code _source} has 970 entries, and a term query on {@code key_418} against the log
     * index must return exactly one hit.
     *
     * @throws Exception if the REST client cannot be obtained, a request fails or waiting for
     *         the log entry fails
     */
    @Test
    public void testWatchLogContainsDocumentWithHugeFieldCountWithCustomMappingsTotalFieldLimit() throws Exception {
        final String tenant = "_main";
        final String watchId = "watch_which_creates_huge_logs";
        final String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        final String testSink = "testsink_" + watchId;
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client nodeClient = cluster.getInternalNodeClient();
            try {
                nodeClient.admin().indices().delete(new DeleteIndexRequest(SIGNALS_LOGS_INDEX_NAME)).actionGet();
            } catch (IndexNotFoundException ignored) {
                // Nothing to delete when the log index does not exist.
            }
            Settings sinkSettings = Settings.builder().put("mapping.total_fields.limit", 3000).build();
            nodeClient.admin().indices().create(new CreateIndexRequest(testSink).settings(sinkSettings)).actionGet();

            Watch hugeLogWatch = new WatchBuilder(watchId)
                    .logRuntimeData(true)
                    .atMsInterval(100L)
                    .search(HUGE_DOCUMENT_INDEX).query("{\"match_all\" : {} }").as("testsearch")
                    .put("{\"bla\": {\"blub\": 42}}").as("teststatic")
                    .then().index(testSink).throttledFor("1000h").name("testsink")
                    .build();
            GenericRestClient.HttpResponse result = restClient.putJson(watchPath, (ToXContentObject)hugeLogWatch);
            Assert.assertEquals(result.getBody(), 201L, result.getStatusCode());
            long watchVersion = Long.parseLong(result.getBodyAsDocNode().getAsString("_version"));

            List<WatchLog> logEntries = new WatchLogSearch(nodeClient).index(SIGNALS_LOGS_INDEX_NAME).watchId(watchId)
                    .watchVersion(watchVersion).fromTheStart().count(1).await();
            log.info("Huge watch logs: {}", logEntries);

            // Walk data.testsearch.hits.hits[0]._source of the first log entry.
            DocNode runtimeData = DocNode.wrap((Object)logEntries.get(0).getData());
            DocNode firstHit = (DocNode)runtimeData.getAsNode("testsearch").getAsNode("hits").getAsListOfNodes("hits").get(0);
            int sourceFieldCount = firstHit.getAsNode("_source").size();
            MatcherAssert.assertThat(sourceFieldCount, Matchers.equalTo(970));

            TermQueryBuilder key418Query = QueryBuilders.termQuery("data.testsearch.hits.hits._source.key_418.keyword", "value_418");
            SearchRequest logSearch = new SearchRequest(new String[]{SIGNALS_LOGS_INDEX_NAME})
                    .source(new SearchSourceBuilder().query((QueryBuilder)key418Query).size(1));
            SearchResponse searchResponse = (SearchResponse)nodeClient.search(logSearch).actionGet();
            try {
                MatcherAssert.assertThat(searchResponse.getHits().getHits(), Matchers.arrayWithSize(1));
            } finally {
                searchResponse.decRef();
            }
        }
    }

    /**
     * Creates three Slack accounts and runs a scrolling account search with page size 1.
     *
     * <p>The initial search must answer with HTTP 200, contain {@code slack} and provide a
     * {@code _scroll_id}. Requesting the next page via {@code /_search/scroll} with that id must
     * also answer with HTTP 200 and contain {@code slack}.
     *
     * @throws Exception if the REST client cannot be obtained or a request fails
     */
    @Test
    public void testSearchAccountScroll() throws Exception {
        final String accountId = "search_destination_scroll";
        final String accountPath = "/_signals/account/slack/" + accountId;
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            // Accounts 1 and 2 share one SlackAccount instance whose URL is changed in between.
            SlackAccount account = new SlackAccount();
            account.setUrl(new URI("https://xyz.test.com"));
            GenericRestClient.HttpResponse result = restClient.putJson(accountPath + "1", account.toJson(), new Header[0]);
            Assert.assertEquals(result.getBody(), 201L, result.getStatusCode());

            account.setUrl(new URI("https://abc.test.com"));
            result = restClient.putJson(accountPath + "2", account.toJson(), new Header[0]);
            Assert.assertEquals(result.getBody(), 201L, result.getStatusCode());

            account = new SlackAccount();
            account.setUrl(new URI("https://abcdef.test.com"));
            result = restClient.putJson(accountPath + "3", account.toJson(), new Header[0]);
            Assert.assertEquals(result.getBody(), 201L, result.getStatusCode());

            // First page of the scroll.
            String searchBody = "{ \"sort\": [{\"type.keyword\": {\"order\": \"asc\"}}], \"query\": {\"match\": {\"type\": \"SLACK\"}}}";
            result = restClient.postJson("/_signals/account/_search?scroll=60s&size=1", searchBody, new Header[0]);
            Assert.assertEquals(result.getBody(), 200L, result.getStatusCode());
            Assert.assertTrue(result.getBody(), result.getBody().contains("slack"));

            DocNode firstPage = result.getBodyAsDocNode();
            String scrollId = firstPage.getAsString("_scroll_id");
            Assert.assertNotNull(scrollId);

            // Next page, requested with the scroll id of the first one.
            String scrollBody = "{ \"scroll\": \"60s\", \"scroll_id\": \"" + scrollId + "\"}";
            result = restClient.postJson("/_search/scroll", scrollBody, new Header[0]);
            Assert.assertEquals(result.getBody(), 200L, result.getStatusCode());
            Assert.assertTrue(result.getBody(), result.getBody().contains("slack"));
        }
    }

    /**
     * Posts a watch definition (daily trigger, simple input, one email action) to the
     * conversion endpoint and expects HTTP 200.
     */
    @Test
    public void testConvEs() throws Exception {
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
            // Assemble the request document from the innermost node outwards.
            DocNode emailSettings = DocNode.of((String)"to", (Object)"horst@horst", (String)"body", (Object)"Hallo {{ctx.payload.x}}", (String)"attachments", (Object)"foo");
            DocNode actions = DocNode.of((String)"email_action.email", (Object)emailSettings);
            DocNode watchDefinition = DocNode.of((String)"trigger.schedule.daily.at", (Object)"noon", (String)"input.simple.x", (Object)"y", (String)"actions", (Object)actions);
            String requestBody = watchDefinition.toJsonString();

            GenericRestClient.HttpResponse conversion = restClient.postJson("/_signals/convert/es", requestBody, new Header[0]);
            Assert.assertEquals(conversion.getBody(), 200L, (long) conversion.getStatusCode());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Writes a list of allowed endpoints to the {@code http.allowed_endpoints} setting, waits a
     * second and verifies that reading the setting returns exactly the JSON that was written.
     * The setting is always put back to {@code "*"} afterwards, even if an assertion fails.
     */
    @Test
    public void testPutAllowedEndpointsSetting() throws Exception {
        final String settingPath = "/_signals/settings/http.allowed_endpoints";
        final String endpointJson = "[\"x\",\"y\"]";
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0])) {
            try {
                GenericRestClient.HttpResponse putResponse = restClient.putJson(settingPath, endpointJson, new Header[0]);
                Assert.assertEquals(putResponse.getBody(), 200L, (long) putResponse.getStatusCode());

                // Fixed pause between the write and the read-back.
                Thread.sleep(1000L);

                GenericRestClient.HttpResponse getResponse = restClient.get(settingPath, new Header[0]);
                Assert.assertEquals(getResponse.getBody(), 200L, (long) getResponse.getStatusCode());
                Assert.assertEquals((Object) endpointJson, (Object) getResponse.getBody());
            } finally {
                // Restore the wildcard value so the changed setting does not stay in place.
                restClient.putJson(settingPath, "\"*\"", new Header[0]);
            }
        }
    }

    /**
     * Stores watches whose static input uses different value types for the same key
     * ({@code bla} as object, as string, and as number) and checks that every PUT succeeds
     * and that the numeric value of the second watch can be read back unchanged.
     */
    @Test
    public void staticInputMapping() throws Exception {
        String tenant = "_main";
        String firstWatchId = "static_input_mapping1";
        String secondWatchId = "static_input_mapping2";
        String firstWatchPath = "/_signals/watch/" + tenant + "/" + firstWatchId;
        String secondWatchPath = "/_signals/watch/" + tenant + "/" + secondWatchId;
        String sinkIndex = "testsink_" + firstWatchId;
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client client = cluster.getInternalNodeClient();
            client.admin().indices().create(new CreateIndexRequest(sinkIndex)).actionGet();

            // First version of watch 1: "bla" is an object.
            Watch objectTyped = this.buildStaticInputMappingWatch(firstWatchId, "{\"bla\": {\"blub\": 42}}", sinkIndex);
            GenericRestClient.HttpResponse created = restClient.putJson(firstWatchPath, (ToXContentObject)objectTyped);
            Assert.assertEquals(created.getBody(), 201L, (long) created.getStatusCode());

            // Updated version of watch 1: "bla" is now a string.
            Watch stringTyped = this.buildStaticInputMappingWatch(firstWatchId, "{\"bla\": \"now_a_different_type\"}", sinkIndex);
            GenericRestClient.HttpResponse updated = restClient.putJson(firstWatchPath, (ToXContentObject)stringTyped);
            Assert.assertEquals(updated.getBody(), 200L, (long) updated.getStatusCode());

            // Watch 2: "bla" is a number.
            Watch numberTyped = this.buildStaticInputMappingWatch(secondWatchId, "{\"bla\": 1234}", sinkIndex);
            GenericRestClient.HttpResponse createdSecond = restClient.putJson(secondWatchPath, (ToXContentObject)numberTyped);
            Assert.assertEquals(createdSecond.getBody(), 201L, (long) createdSecond.getStatusCode());

            GenericRestClient.HttpResponse fetched = restClient.get(secondWatchPath, new Header[0]);
            Assert.assertEquals((String)fetched.getBody(), (Object)1234, (Object)((DocNode)fetched.getBodyAsDocNode().getAsNode("_source").getAsListOfNodes("checks").get(0)).getAsNode("value").get("bla"));
        }
    }

    /**
     * Builds the watch used by {@link #staticInputMapping()}: 100 ms interval, the given JSON
     * passed to {@code put(..)} under the name "teststatic", and one index action named
     * "testsink" that writes to {@code sinkIndex} with a throttle of "1000h".
     */
    private Watch buildStaticInputMappingWatch(String watchId, String staticInputJson, String sinkIndex) throws Exception {
        return new WatchBuilder(watchId).atMsInterval(100L).put(staticInputJson).as("teststatic").then().index(sinkIndex).throttledFor("1000h").name("testsink").build();
    }

    /**
     * Verifies that a watch whose throttle period is below the configured
     * {@code execution.throttle_period_lower_bound} is rejected with HTTP 400, both for a plain
     * duration ("2m" vs. "6m") and for an exponential expression ("1m**3" vs. "2m**2").
     */
    @Test
    public void testPutWatch_throttlePeriodCannotBeShorterThanLowerBound() throws Exception {
        String tenant = "_main";
        String watchId = "put_test_throttle_period_shorter_than_lower_bound";
        String actionName = "indexAction";
        String testSink = "throttle_period_shorter_than_lower_bound";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        DurationExpression plainThrottle = DurationExpression.parse((String)"2m");
        DurationExpression plainLowerBound = DurationExpression.parse((String)"6m");
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client client = cluster.getInternalNodeClient();
            client.admin().indices().create(new CreateIndexRequest(testSink)).actionGet();

            this.assertThrottleBelowLowerBoundIsRejected(restClient, watchPath, watchId, testSink, actionName, plainThrottle, plainLowerBound);

            DurationExpression exponentialThrottle = DurationExpression.parse((String)"1m**3");
            DurationExpression exponentialLowerBound = DurationExpression.parse((String)"2m**2");
            this.assertThrottleBelowLowerBoundIsRejected(restClient, watchPath, watchId, testSink, actionName, exponentialThrottle, exponentialLowerBound);
        }
    }

    /**
     * Builds a watch using {@code watchThrottle} on watch and action level, sets the lower
     * bound setting to {@code lowerBound}, PUTs the watch and asserts a 400 response whose
     * validation details carry the expected message for both throttle periods.
     */
    private void assertThrottleBelowLowerBoundIsRejected(GenericRestClient restClient, String watchPath, String watchId, String testSink,
            String actionName, DurationExpression watchThrottle, DurationExpression lowerBound) throws Exception {
        Watch watch = new WatchBuilder(watchId).cronTrigger("* * * * * ?").throttledFor(watchThrottle).then().index(testSink).name(actionName).throttledFor(watchThrottle).build();
        this.putDynamicSetting("execution.throttle_period_lower_bound", lowerBound.toString(), restClient);

        GenericRestClient.HttpResponse rejected = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
        Assert.assertEquals(rejected.getBody(), 400L, (long) rejected.getStatusCode());

        String expectedMessage = String.format("Throttle period: %s longer than configured lower bound: %s", watchThrottle, lowerBound);
        // Watch-level throttle period.
        Assert.assertEquals((String)rejected.getBody(), (Object)expectedMessage, (Object)((DocNode)rejected.getBodyAsDocNode().getAsNode("detail").getAsListOfNodes("throttle_period").get(0)).getAsString("expected"));
        // Action-level throttle period.
        Assert.assertEquals((String)rejected.getBody(), (Object)expectedMessage, (Object)((DocNode)rejected.getBodyAsDocNode().getAsNode("detail").getAsListOfNodes("actions[indexAction].throttle_period").get(0)).getAsString("expected"));
    }

    /**
     * Verifies that a watch whose throttle period exceeds the configured
     * {@code execution.throttle_period_lower_bound} is accepted and stored with the requested
     * throttle period, both for a plain duration ("6m" vs. "2m", creation -> 201) and for an
     * exponential expression ("3m**2" vs. "1m**2", update -> 200).
     */
    @Test
    public void testPutWatch_throttlePeriodLongerThanLowerBound() throws Exception {
        String tenant = "_main";
        String watchId = "put_test_throttle_period_longer_than_lower_bound";
        String actionName = "indexAction";
        String testSink = "throttle_period_longer_than_lower_bound";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        DurationExpression plainThrottle = DurationExpression.parse((String)"6m");
        DurationExpression plainLowerBound = DurationExpression.parse((String)"2m");
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client client = cluster.getInternalNodeClient();
            client.admin().indices().create(new CreateIndexRequest(testSink)).actionGet();

            this.assertThrottleAboveLowerBoundIsStored(restClient, tenant, watchPath, watchId, testSink, actionName, plainThrottle, plainLowerBound, 201L);

            DurationExpression exponentialThrottle = DurationExpression.parse((String)"3m**2");
            DurationExpression exponentialLowerBound = DurationExpression.parse((String)"1m**2");
            this.assertThrottleAboveLowerBoundIsStored(restClient, tenant, watchPath, watchId, testSink, actionName, exponentialThrottle, exponentialLowerBound, 200L);
        }
    }

    /**
     * Builds a watch using {@code watchThrottle} on watch and action level, sets the lower
     * bound setting to {@code lowerBound}, PUTs the watch expecting {@code expectedStatus},
     * then reads the watch back and compares both stored throttle periods with the requested one.
     */
    private void assertThrottleAboveLowerBoundIsStored(GenericRestClient restClient, String tenant, String watchPath, String watchId,
            String testSink, String actionName, DurationExpression watchThrottle, DurationExpression lowerBound, long expectedStatus)
            throws Exception {
        Watch submitted = new WatchBuilder(watchId).cronTrigger("* * * * * ?").throttledFor(watchThrottle).then().index(testSink).name(actionName).throttledFor(watchThrottle).build();
        this.putDynamicSetting("execution.throttle_period_lower_bound", lowerBound.toString(), restClient);

        GenericRestClient.HttpResponse accepted = restClient.putJson(watchPath, submitted.toJson(), new Header[0]);
        Assert.assertEquals(accepted.getBody(), expectedStatus, (long) accepted.getStatusCode());

        // The stored watch is used for the comparison and for the failure message.
        Watch stored = this.getWatchByRest(tenant, watchId, restClient);
        Assert.assertEquals((String)stored.toJson(), (Object)watchThrottle.toString(), (Object)stored.getThrottlePeriod().toString());
        Assert.assertEquals((String)stored.toJson(), (Object)watchThrottle.toString(), (Object)stored.getActionByName(actionName).getThrottlePeriod().toString());
    }

    /**
     * Verifies the throttle period of a watch that does not define one: while
     * {@code execution.throttle_period_lower_bound} is set, the stored watch and its action
     * report the lower bound; after the setting is deleted and the watch is PUT again, both
     * throttle periods are {@code null}.
     */
    @Test
    public void testPutWatch_unsetThrottlePeriodDefaultsToLowerBound() throws Exception {
        final String lowerBoundSetting = "execution.throttle_period_lower_bound";
        String tenant = "_main";
        String watchId = "put_test_throttle_period_defaults_to_lower_bound";
        String actionName = "indexAction";
        String testSink = "throttle_period_defaults_to_than_lower_bound";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        DurationExpression lowerBound = DurationExpression.parse((String)"10m");
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client client = cluster.getInternalNodeClient();
            client.admin().indices().create(new CreateIndexRequest(testSink)).actionGet();

            // The watch deliberately carries no throttle period at all.
            Watch unthrottled = new WatchBuilder(watchId).cronTrigger("* * * * * ?").then().index(testSink).name(actionName).build();

            // Phase 1: lower bound configured.
            this.putDynamicSetting(lowerBoundSetting, lowerBound.toString(), restClient);
            GenericRestClient.HttpResponse created = restClient.putJson(watchPath, unthrottled.toJson(), new Header[0]);
            Assert.assertEquals(created.getBody(), 201L, (long) created.getStatusCode());
            Watch storedWithBound = this.getWatchByRest(tenant, watchId, restClient);
            Assert.assertEquals((String)unthrottled.toJson(), (Object)lowerBound.toString(), (Object)storedWithBound.getThrottlePeriod().toString());
            Assert.assertEquals((String)unthrottled.toJson(), (Object)lowerBound.toString(), (Object)storedWithBound.getActionByName(actionName).getThrottlePeriod().toString());

            // Phase 2: lower bound removed, same watch stored again.
            this.deleteDynamicSetting(lowerBoundSetting, restClient);
            GenericRestClient.HttpResponse updated = restClient.putJson(watchPath, unthrottled.toJson(), new Header[0]);
            Assert.assertEquals(updated.getBody(), 200L, (long) updated.getStatusCode());
            Watch storedWithoutBound = this.getWatchByRest(tenant, watchId, restClient);
            Assert.assertNull((String)unthrottled.toJson(), (Object)storedWithoutBound.getThrottlePeriod());
            Assert.assertNull((String)unthrottled.toJson(), (Object)storedWithoutBound.getActionByName(actionName).getThrottlePeriod());
        }
    }

    /**
     * Calls every tenant-aware endpoint twice: once with the private tenant {@code __user__},
     * which must be answered with HTTP 400 and the "private tenants" error message, and once
     * with {@code _main}, whose response must not contain that error message.
     */
    @Test
    public void endpointsSupportingTenantParameterShouldNotAcceptPrivateTenant() throws Exception {
        try (GenericRestClient restClient = cluster.getAdminCertRestClient()) {
            // GET endpoints
            assertPrivateTenantIsRejected(restClient.get("/_signals/watch/__user__/_search", new Header[0]));
            assertNoPrivateTenantError(restClient.get("/_signals/watch/_main/_search", new Header[0]));
            assertPrivateTenantIsRejected(restClient.get("/_signals/watch/__user__/1", new Header[0]));
            assertNoPrivateTenantError(restClient.get("/_signals/watch/_main/1", new Header[0]));

            // POST endpoints
            assertPrivateTenantIsRejected(restClient.post("/_signals/watch/__user__/_search"));
            assertNoPrivateTenantError(restClient.post("/_signals/watch/_main/_search"));
            assertPrivateTenantIsRejected(restClient.post("/_signals/watch/__user__/1/_execute"));
            assertNoPrivateTenantError(restClient.post("/_signals/watch/_main/1/_execute"));

            // PUT endpoints
            assertPrivateTenantIsRejected(restClient.put("/_signals/tenant/__user__/_active"));
            assertNoPrivateTenantError(restClient.put("/_signals/tenant/_main/_active"));
            assertPrivateTenantIsRejected(restClient.put("/_signals/watch/__user__/1/_ack_and_get"));
            assertNoPrivateTenantError(restClient.put("/_signals/watch/_main/1/_ack_and_get"));

            // DELETE endpoints
            assertPrivateTenantIsRejected(restClient.delete("/_signals/watch/__user__/1", new Header[0]));
            assertNoPrivateTenantError(restClient.delete("/_signals/watch/_main/1", new Header[0]));
            assertPrivateTenantIsRejected(restClient.delete("/_signals/watch/__user__/1/_ack/1", new Header[0]));
            assertNoPrivateTenantError(restClient.delete("/_signals/watch/_main/1/_ack/1", new Header[0]));
        }
    }

    /** Asserts that the response is an HTTP 400 carrying the "private tenants" error message. */
    private static void assertPrivateTenantIsRejected(GenericRestClient.HttpResponse response) throws Exception {
        MatcherAssert.assertThat((String)response.getBody(), (Object)response.getStatusCode(), (org.hamcrest.Matcher)Matchers.equalTo((Object)400));
        MatcherAssert.assertThat((String)response.getBody(), (Object)response.getBodyAsDocNode(), (org.hamcrest.Matcher)DocNodeMatchers.containsValue((String)"error.message", (Object)"Signals does not support private tenants"));
    }

    /** Asserts that the response body does not carry the "private tenants" error message; the status code is not checked. */
    private static void assertNoPrivateTenantError(GenericRestClient.HttpResponse response) throws Exception {
        MatcherAssert.assertThat((String)response.getBody(), (Object)response.getBodyAsDocNode(), (org.hamcrest.Matcher)Matchers.not((org.hamcrest.Matcher)DocNodeMatchers.containsValue((String)"error.message", (Object)"Signals does not support private tenants")));
    }

    /**
     * Checks that acknowledging a whole watch leaves already acknowledged actions untouched.
     * <ol>
     * <li>Only the second action is acknowledged; its ack timestamp is remembered.</li>
     * <li>The whole watch is acknowledged: the first action gets acked with a different
     * timestamp while the second keeps its original one.</li>
     * <li>The whole watch is acknowledged again: both timestamps stay as they were.</li>
     * </ol>
     */
    @Test
    public void testAckWatchShouldNotAckAlreadyAckedActionsAgain() throws Exception {
        String tenant = "_main";
        String watchId = "watch_with_one_acked_action";
        String watchPath = "/_signals/watch/" + tenant + "/" + watchId;
        String statePath = watchPath + "/_state";
        String firstAction = "firstAction";
        String secondAction = "secondAction";
        String firstAckedOnPath = "$.actions." + firstAction + ".acked.on";
        String secondAckedOnPath = "$.actions." + secondAction + ".acked.on";
        String sourceIndex = "testsource_ack_watch_with_acked_action";
        String sinkIndex = "testsink_ack_watch_with_acked_action";
        try (GenericRestClient restClient = cluster.getRestClient(USERNAME_UHURA, USERNAME_UHURA, new Header[0]).trackResources()) {
            Client client = cluster.getInternalNodeClient();
            client.admin().indices().create(new CreateIndexRequest(sourceIndex)).actionGet();
            client.admin().indices().create(new CreateIndexRequest(sinkIndex)).actionGet();
            client.index(((IndexRequest)new IndexRequest(sourceIndex).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).id("1").source(XContentType.JSON, new Object[]{"a", "x", "b", "y"})).actionGet();

            Watch watch = new WatchBuilder(watchId).atMsInterval(100L).search(sourceIndex).query("{\"match_all\" : {} }").as("testsearch").checkCondition("data.testsearch.hits.hits.length > 0").then().index(sinkIndex).refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).throttledFor("0").name(firstAction).and().index(sinkIndex).refreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).throttledFor("0").name(secondAction).build();
            GenericRestClient.HttpResponse putWatch = restClient.putJson(watchPath, watch.toJson(), new Header[0]);
            Assert.assertEquals(putWatch.getBody(), 201L, (long) putWatch.getStatusCode());
            this.awaitMinCountOfDocuments(client, sinkIndex, 2L);

            AtomicReference<String> firstActionOriginAckTime = new AtomicReference<>();
            AtomicReference<String> secondActionOriginAckTime = new AtomicReference<>();

            // Step 1: acknowledge only the second action.
            GenericRestClient.HttpResponse ackSecond = restClient.put(watchPath + "/_ack/" + secondAction);
            Assert.assertEquals(ackSecond.getBody(), 200L, (long) ackSecond.getStatusCode());
            Awaitility.await((String)"One action should be acked").atMost(Duration.ofSeconds(2L)).pollInterval(Duration.ofMillis(200L)).untilAsserted(() -> {
                GenericRestClient.HttpResponse stateResponse = restClient.get(statePath, new Header[0]);
                Assert.assertEquals(stateResponse.getBody(), 200L, (long) stateResponse.getStatusCode());
                DocNode state = stateResponse.getBodyAsDocNode();
                Assert.assertTrue(state.getAsNode("actions").getAsNode(secondAction).hasNonNull("acked"));
                Assert.assertFalse(state.getAsNode("actions").getAsNode(firstAction).hasNonNull("acked"));
                secondActionOriginAckTime.set((String)state.findSingleValueByJsonPath(secondAckedOnPath, String.class));
            });

            // Step 2: acknowledge the whole watch; only the first action may receive a new ack.
            GenericRestClient.HttpResponse ackAll = restClient.put(watchPath + "/_ack/");
            Assert.assertEquals(ackAll.getBody(), 200L, (long) ackAll.getStatusCode());
            Awaitility.await((String)"Two actions should be acked at different time").atMost(Duration.ofSeconds(2L)).pollInterval(Duration.ofMillis(200L)).untilAsserted(() -> {
                GenericRestClient.HttpResponse stateResponse = restClient.get(statePath, new Header[0]);
                Assert.assertEquals(stateResponse.getBody(), 200L, (long) stateResponse.getStatusCode());
                DocNode state = stateResponse.getBodyAsDocNode();
                Assert.assertTrue(state.getAsNode("actions").getAsNode(secondAction).hasNonNull("acked"));
                Assert.assertTrue(state.getAsNode("actions").getAsNode(firstAction).hasNonNull("acked"));
                Assert.assertEquals((Object)secondActionOriginAckTime.get(), (Object)state.findSingleValueByJsonPath(secondAckedOnPath, String.class));
                Assert.assertNotEquals((Object)secondActionOriginAckTime.get(), (Object)state.findSingleValueByJsonPath(firstAckedOnPath, String.class));
                firstActionOriginAckTime.set((String)state.findSingleValueByJsonPath(firstAckedOnPath, String.class));
            });

            // Step 3: acknowledge the whole watch once more; check after a 2 s delay that nothing moved.
            GenericRestClient.HttpResponse ackAllAgain = restClient.put(watchPath + "/_ack/");
            Assert.assertEquals(ackAllAgain.getBody(), 200L, (long) ackAllAgain.getStatusCode());
            Awaitility.await((String)"Actions should not be re-acked").pollDelay(Duration.ofSeconds(2L)).untilAsserted(() -> {
                GenericRestClient.HttpResponse stateResponse = restClient.get(statePath, new Header[0]);
                Assert.assertEquals(stateResponse.getBody(), 200L, (long) stateResponse.getStatusCode());
                DocNode state = stateResponse.getBodyAsDocNode();
                Assert.assertTrue(state.getAsNode("actions").getAsNode(secondAction).hasNonNull("acked"));
                Assert.assertTrue(state.getAsNode("actions").getAsNode(firstAction).hasNonNull("acked"));
                Assert.assertEquals((Object)secondActionOriginAckTime.get(), (Object)state.findSingleValueByJsonPath(secondAckedOnPath, String.class));
                Assert.assertEquals((Object)firstActionOriginAckTime.get(), (Object)state.findSingleValueByJsonPath(firstAckedOnPath, String.class));
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs a match-all search against {@code index} and returns the reported total hit count.
     * The search response is released via {@code decRef()} before returning.
     */
    private long getCountOfDocuments(Client client, String index) throws InterruptedException, ExecutionException {
        SearchSourceBuilder matchAll = new SearchSourceBuilder();
        matchAll.query((QueryBuilder)QueryBuilders.matchAllQuery());
        SearchRequest countRequest = new SearchRequest(new String[]{index});
        countRequest.source(matchAll);

        SearchResponse searchResult = (SearchResponse)client.search(countRequest).get();
        try {
            return searchResult.getHits().getTotalHits().value;
        }
        finally {
            searchResult.decRef();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs a match-all search against {@code index} and returns the hits section of the
     * response rendered through {@code Strings.toString}. The search response is released via
     * {@code decRef()} before returning.
     */
    public String getDocs(Client client, String index) throws InterruptedException, ExecutionException {
        SearchSourceBuilder matchAll = new SearchSourceBuilder();
        matchAll.query((QueryBuilder)QueryBuilders.matchAllQuery());
        SearchRequest docsRequest = new SearchRequest(new String[]{index});
        docsRequest.source(matchAll);

        SearchResponse searchResult = (SearchResponse)client.search(docsRequest).get();
        try {
            return Strings.toString((ChunkedToXContent)searchResult.getHits());
        }
        finally {
            searchResult.decRef();
        }
    }

    /**
     * Polls {@code index} until it holds at least {@code minCount} documents.
     * Up to 2000 attempts are made, each preceded by a 10 ms sleep.
     *
     * @return the document count observed by the first successful attempt
     * @throws Exception if sleeping or counting fails; if the count is never reached, the test
     *         is failed through {@code Assert.fail}
     */
    private long awaitMinCountOfDocuments(Client client, String index, long minCount) throws Exception {
        final long startedAt = System.currentTimeMillis();
        int attempt = 0;
        while (attempt < 2000) {
            Thread.sleep(10L);
            long found = this.getCountOfDocuments(client, index);
            if (found >= minCount) {
                log.info("Found " + found + " documents in " + index + " after " + (System.currentTimeMillis() - startedAt) + " ms");
                return found;
            }
            attempt++;
        }
        Assert.fail((String)("Did not find " + minCount + " documents in " + index + " after " + (System.currentTimeMillis() - startedAt) + " ms"));
        return 0L;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Reads the newest watch log entries of a watch from the index
     * {@code ".signals_" + tenantName + "_log"}.
     * <p>
     * The search is limited to {@code count} hits, sorted by {@code execution_end} descending,
     * and filtered by {@code watch_id} (plus {@code watch_version} when a version is given).
     * The parsed entries are reversed before returning, so the list is ordered from the oldest
     * to the newest of the fetched entries.
     *
     * @return the parsed log entries, or an empty list if the search produced no hits
     * @throws RuntimeException wrapping any failure, except {@code SearchPhaseExecutionException}
     *         and {@code IndexNotFoundException}, which are rethrown unchanged
     */
    private List<WatchLog> getMostRecentWatchLogs(Client client, String tenantName, String watchName, Long watchVersion, int count) {
        try {
            QueryBuilder filter;
            if (watchVersion != null) {
                filter = QueryBuilders.boolQuery().must((QueryBuilder)new TermQueryBuilder("watch_id", watchName)).must((QueryBuilder)new TermQueryBuilder("watch_version", (Object)watchVersion));
            } else {
                filter = new TermQueryBuilder("watch_id", watchName);
            }
            SearchSourceBuilder searchSource = new SearchSourceBuilder().size(count).sort("execution_end", SortOrder.DESC).query(filter);
            SearchRequest logRequest = new SearchRequest(new String[]{".signals_" + tenantName + "_log"}).source(searchSource);

            SearchResponse searchResponse = (SearchResponse)client.search(logRequest).actionGet();
            try {
                SearchHit[] hits = searchResponse.getHits().getHits();
                if (hits.length == 0) {
                    return Collections.emptyList();
                }
                List<WatchLog> logs = new ArrayList<WatchLog>(count);
                for (SearchHit hit : hits) {
                    logs.add(WatchLog.parse((String)hit.getId(), (String)hit.getSourceAsString()));
                }
                // Hits arrive newest first; callers get them oldest first.
                Collections.reverse(logs);
                return logs;
            }
            finally {
                searchResponse.decRef();
            }
        }
        catch (SearchPhaseExecutionException | IndexNotFoundException e) {
            // Passed through unwrapped so that callers can catch these two types specifically.
            throw e;
        }
        catch (Exception e) {
            throw new RuntimeException("Error in getMostRecenWatchLog(" + tenantName + ", " + watchName + ")", e);
        }
    }

    /**
     * Waits for a single watch log entry of the given watch, without restricting the watch
     * version, and returns it.
     */
    private WatchLog awaitWatchLog(Client client, String tenantName, String watchName) throws Exception {
        List<WatchLog> singleEntry = this.awaitWatchLogs(client, tenantName, watchName, null, 1);
        return singleEntry.get(0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Polls the watch log of a tenant until exactly {@code count} log entries for the given
     * watch are available.
     * <p>
     * Up to 1000 attempts are made, each preceded by a 10 ms sleep. A
     * {@code SearchPhaseExecutionException} or {@code IndexNotFoundException} raised by an
     * attempt is remembered and the polling continues; a later attempt that completes without
     * such an exception clears it again.
     * <p>
     * If the entries never show up, the test is failed through {@code Assert.fail}. When the
     * last attempt did not hit a missing/unsearchable index, the content of the polled log
     * index is written to the test log first to ease diagnosis.
     *
     * @param watchVersion restricts the lookup to one watch version; {@code null} for any version
     * @return the log entries as returned by {@code getMostRecentWatchLogs}
     * @throws RuntimeException wrapping any {@code Exception} raised while waiting
     */
    private List<WatchLog> awaitWatchLogs(Client client, String tenantName, String watchName, Long watchVersion, int count) throws Exception {
        try {
            long start = System.currentTimeMillis();
            Exception indexNotFoundException = null;
            for (int i = 0; i < 1000; ++i) {
                Thread.sleep(10L);
                try {
                    List<WatchLog> watchLogs = this.getMostRecentWatchLogs(client, tenantName, watchName, watchVersion, count);
                    if (watchLogs.size() == count) {
                        log.info("Found " + watchLogs + " for " + watchName + " after " + (System.currentTimeMillis() - start) + " ms");
                        return watchLogs;
                    }
                    if (i != 0 && i % 200 == 0) {
                        log.debug("Still waiting for watch logs; found so far: " + watchLogs);
                    }
                    // The index was searchable this time, so an earlier index failure is obsolete.
                    indexNotFoundException = null;
                }
                catch (SearchPhaseExecutionException | IndexNotFoundException e) {
                    indexNotFoundException = e;
                }
            }
            if (indexNotFoundException != null) {
                Assert.fail((String)("Did not find watch log index for " + watchName + " after " + (System.currentTimeMillis() - start) + " ms: " + indexNotFoundException));
            } else {
                // Dump the same index expression that getMostRecentWatchLogs polled. The previous
                // name lacked the leading dot, so the dump targeted a different index than the
                // one that was actually searched, and a failure of that search would have been
                // wrapped by the catch below, hiding the assertion failure that follows.
                String polledLogIndex = ".signals_" + tenantName + "_log";
                SearchResponse searchResponse = (SearchResponse)client.search(new SearchRequest(new String[]{polledLogIndex}).source(new SearchSourceBuilder().sort("execution_end", SortOrder.DESC).query((QueryBuilder)new MatchAllQueryBuilder()))).actionGet();
                try {
                    log.info("Did not find watch log for " + watchName + " after " + (System.currentTimeMillis() - start) + " ms\n\n" + searchResponse.getHits());
                }
                finally {
                    searchResponse.decRef();
                }
                Assert.fail((String)("Did not find watch log for " + watchName + " after " + (System.currentTimeMillis() - start) + " ms"));
            }
            // Only reached if Assert.fail returns normally.
            return null;
        }
        catch (Exception e) {
            log.error("Exception in awaitWatchLog for " + watchName + ")", (Throwable)e);
            throw new RuntimeException("Exception in awaitWatchLog for " + watchName + ")", e);
        }
    }

    /**
     * Fetches a watch through the REST endpoint {@code /_signals/watch/<tenant>/<id>} and parses the
     * response body into a {@code Watch}.
     * <p>
     * Asserts that the HTTP status is 200; the response body is used as the assertion message.
     *
     * @param tenant     tenant path segment of the request
     * @param id         watch id; used both in the request path and as the id handed to the parser
     * @param restClient REST client performing the GET request
     * @return the watch parsed from the response body
     * @throws Exception if the request or the parsing fails
     */
    private Watch getWatchByRest(String tenant, String id, GenericRestClient restClient) throws Exception {
        String watchPath = "/_signals/watch/" + tenant + "/" + id;
        GenericRestClient.HttpResponse watchResponse = restClient.get(watchPath, new Header[0]);
        String responseBody = watchResponse.getBody();
        long actualStatus = watchResponse.getStatusCode();
        Assert.assertEquals(responseBody, 200L, actualStatus);
        // The parser is given the literal "test" and a version of -1.
        return Watch.parseFromElasticDocument(this.watchInitializationService, "test", id, responseBody, -1L);
    }

    /**
     * Repeats a GET request until it returns a status other than 404, making at most 100 attempts
     * with a 10 ms sleep after each 404.
     *
     * @param request    request path passed to {@code restClient.get}
     * @param restClient REST client performing the GET requests
     * @return the first response whose status is not 404, or the last 404 response if all
     *         attempts were used up
     * @throws Exception if a request fails or the sleep is interrupted
     */
    private GenericRestClient.HttpResponse awaitRestGet(String request, GenericRestClient restClient) throws Exception {
        long startedAt = System.currentTimeMillis();
        GenericRestClient.HttpResponse lastResponse = null;
        for (int attempt = 0; attempt < 100; attempt++) {
            lastResponse = restClient.get(request, new Header[0]);
            if (lastResponse.getStatusCode() == 404) {
                // Not available yet: pause briefly, then try again.
                Thread.sleep(10L);
                continue;
            }
            log.info(request + " returned " + lastResponse.getStatusCode() + " after " + (System.currentTimeMillis() - startedAt) + "ms (" + attempt + " retries)");
            return lastResponse;
        }
        // Every attempt returned 404; hand the last response back to the caller.
        return lastResponse;
    }

    /**
     * Stores a value for a setting by sending a JSON PUT to {@code /_signals/settings/<settingName>}
     * and asserts that the response status is 200. The response body is used as the assertion message.
     *
     * @param settingName  name of the setting; appended to the settings path
     * @param settingValue value to send, wrapped via {@code DocNode.wrap}
     * @param restClient   REST client performing the PUT request
     * @throws Exception if the request fails
     */
    private void putDynamicSetting(String settingName, String settingValue, GenericRestClient restClient) throws Exception {
        String settingPath = "/_signals/settings/" + settingName;
        GenericRestClient.HttpResponse putResponse = restClient.putJson(settingPath, (Document)DocNode.wrap((Object)settingValue));
        String responseBody = putResponse.getBody();
        long actualStatus = putResponse.getStatusCode();
        Assert.assertEquals(responseBody, 200L, actualStatus);
    }

    /**
     * Removes a setting by sending a DELETE to {@code /_signals/settings/<settingName>} and asserts
     * that the response status is 200. The response body is used as the assertion message.
     *
     * @param settingName name of the setting; appended to the settings path
     * @param restClient  REST client performing the DELETE request
     * @throws Exception if the request fails
     */
    private void deleteDynamicSetting(String settingName, GenericRestClient restClient) throws Exception {
        String settingPath = "/_signals/settings/" + settingName;
        GenericRestClient.HttpResponse deleteResponse = restClient.delete(settingPath, new Header[0]);
        String responseBody = deleteResponse.getBody();
        long actualStatus = deleteResponse.getStatusCode();
        Assert.assertEquals(responseBody, 200L, actualStatus);
    }

    // Initializes the shared static test fixtures. The order matters: the cluster builder below
    // reads USER_CERTIFICATE, so that field must be assigned first.
    static {
        REQUEST_HEADER_ADDING_FILTER = new WireMockRequestHeaderAddingFilter("Proxy", "wire-mock");

        // Test user "certificate-user" with the ALL_ACCESS role.
        USER_CERTIFICATE = new TestSgConfig.User("certificate-user")
                .roles(new TestSgConfig.Role[]{TestSgConfig.Role.ALL_ACCESS});

        // Single-node, SSL-enabled cluster with the Signals module enabled.
        // Node settings are given as alternating key/value entries.
        cluster = new LocalCluster.Builder()
                .singleNode()
                .sslEnabled()
                .resources("sg_config/signals")
                .nodeSettings(new Object[]{
                        "signals.enabled", true,
                        "signals.index_names.log", SIGNALS_LOGS_INDEX_NAME,
                        "signals.enterprise.enabled", false,
                        "searchguard.diagnosis.action_stack.enabled", true,
                        "signals.watch_log.refresh_policy", "immediate",
                        "signals.watch_log.sync_indexing", true,
                        "searchguard.unsupported.single_index_mt_enabled", true,
                        "signals.watch_log.mapping_total_fields_limit", 3000})
                .user(USER_CERTIFICATE)
                .enableModule(SignalsModule.class)
                .waitForComponents(new String[]{"signals"})
                .enterpriseModulesEnabled()
                .embedded()
                .build();
    }
}

