/*
 * Decompiled with CFR 0.152.
 */
package com.floragunn.searchsupport.jobs;

import com.floragunn.searchsupport.jobs.ConstantHashJobConfig;
import com.floragunn.searchsupport.jobs.LocalCluster;
import com.floragunn.searchsupport.jobs.SchedulerBuilder;
import com.floragunn.searchsupport.jobs.actions.SchedulerConfigUpdateAction;
import com.floragunn.searchsupport.jobs.cluster.NodeComparator;
import com.floragunn.searchsupport.jobs.cluster.NodeNameComparator;
import com.floragunn.searchsupport.jobs.config.JobConfigFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.jcip.annotations.NotThreadSafe;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.node.PluginAwareNode;
import org.elasticsearch.xcontent.XContentType;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;

@NotThreadSafe
public class JobExecutionEngineTest {
    private static final Logger log = LogManager.getLogger(JobExecutionEngineTest.class);
    @ClassRule
    public static LocalCluster cluster = new LocalCluster.Builder().singleNode().sslEnabled().build();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void emptyNodeFilterTest() throws Exception {
        String test = "empty_node_filter";
        String jobConfigIndex = "test_job_config_" + test;
        Scheduler scheduler = null;
        try (Client tc = cluster.getInternalClient();){
            String jobConfig = this.createIntervalJobConfig(1, "emptyNodeFilterTest", "100ms");
            tc.index(((IndexRequest)new IndexRequest(jobConfigIndex).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).source(jobConfig, XContentType.JSON)).actionGet();
            PluginAwareNode node = cluster.node();
            ClusterService clusterService = (ClusterService)node.injector().getInstance(ClusterService.class);
            NodeEnvironment nodeEnvironment = (NodeEnvironment)node.injector().getInstance(NodeEnvironment.class);
            scheduler = new SchedulerBuilder().client(tc).name("test_" + test).nodeFilter("node_group_1:xxx").configIndex(jobConfigIndex).jobConfigFactory((JobConfigFactory)new ConstantHashJobConfig.Factory(TestJob.class)).distributed(clusterService, nodeEnvironment).nodeComparator((NodeComparator)new NodeNameComparator(clusterService)).build();
            scheduler.start();
            Thread.sleep(3000L);
            int count = TestJob.getCounter("emptyNodeFilterTest");
            Assert.assertEquals((long)0L, (long)count);
        }
        finally {
            if (scheduler != null) {
                scheduler.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Ignore
    @Test
    public void configUpdateTest() throws Exception {
        String test = "config_update";
        String jobConfigIndex = "test_job_config_" + test;
        Scheduler scheduler = null;
        try (Client tc = cluster.getInternalClient();){
            String jobConfig = this.createIntervalJobConfig(1, "basic", "100ms");
            tc.index(((IndexRequest)new IndexRequest(jobConfigIndex).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).source(jobConfig, XContentType.JSON)).actionGet();
            PluginAwareNode node = cluster.node();
            ClusterService clusterService = (ClusterService)node.injector().getInstance(ClusterService.class);
            NodeEnvironment nodeEnvironment = (NodeEnvironment)node.injector().getInstance(NodeEnvironment.class);
            scheduler = new SchedulerBuilder().client(tc).name("test_" + test).configIndex(jobConfigIndex).jobConfigFactory((JobConfigFactory)new ConstantHashJobConfig.Factory(TestJob.class)).distributed(clusterService, nodeEnvironment).nodeComparator((NodeComparator)new NodeNameComparator(clusterService)).build();
            scheduler.start();
            Thread.sleep(500L);
            jobConfig = this.createIntervalJobConfig(1, "late", "100ms");
            tc.index(((IndexRequest)new IndexRequest(jobConfigIndex).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).source(jobConfig, XContentType.JSON)).actionGet();
            SchedulerConfigUpdateAction.send((Client)tc, (String)scheduler.getSchedulerName());
            Thread.sleep(3000L);
            int count = TestJob.getCounter("late");
            Assert.assertTrue((String)("count is " + count), (count >= 1 ? 1 : 0) != 0);
        }
        finally {
            if (scheduler != null) {
                scheduler.shutdown();
            }
        }
    }

    @Ignore
    @Test
    public void triggerUpdateTest() throws Exception {
        try (Client tc = cluster.getInternalClient();){
            String jobConfig = this.createCronJobConfig(1, "basic", null, "*/1 * * * * ?");
            tc.index(((IndexRequest)new IndexRequest("testjobconfig").id("trigger_update_test_job").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).source(jobConfig, XContentType.JSON)).actionGet();
            PluginAwareNode node = cluster.node();
            ClusterService clusterService = (ClusterService)node.injector().getInstance(ClusterService.class);
            NodeEnvironment nodeEnvironment = (NodeEnvironment)node.injector().getInstance(NodeEnvironment.class);
            Scheduler scheduler = new SchedulerBuilder().client(tc).name("test").configIndex("testjobconfig").nodeFilter("node_index:1").jobConfigFactory((JobConfigFactory)new ConstantHashJobConfig.Factory(TestJob.class)).distributed(clusterService, nodeEnvironment).nodeComparator((NodeComparator)new NodeNameComparator(clusterService)).build();
            scheduler.start();
            Thread.sleep(3000L);
            int count1 = TestJob.getCounter("basic");
            jobConfig = this.createCronJobConfig(1, "basic", null, "* * * * * ?");
            tc.index(((IndexRequest)new IndexRequest("testjobconfig").id("trigger_update_test_job").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).source(jobConfig, XContentType.JSON)).actionGet();
            SchedulerConfigUpdateAction.send((Client)tc, (String)"test");
            Thread.sleep(3000L);
            int count2 = TestJob.getCounter("basic");
            System.out.println("count1: " + count1 + "; count2: " + count2);
            Assert.assertTrue((String)("count is " + count2), (count2 > count1 ? 1 : 0) != 0);
        }
    }

    private String createIntervalJobConfig(int hash, String name, String interval) {
        StringBuilder result = new StringBuilder("{");
        result.append("\"hash\": ").append(hash).append(",");
        if (name != null) {
            result.append("\"name\": \"").append(name).append("\",");
        }
        result.append("\"trigger\": {\"schedule\": {\"interval\": ");
        result.append("\"").append(interval).append("\"");
        result.append("}}}");
        return result.toString();
    }

    private String createCronJobConfig(int hash, String name, Integer delay, String ... cronSchedule) {
        StringBuilder result = new StringBuilder("{");
        result.append("\"hash\": ").append(hash).append(",");
        if (name != null) {
            result.append("\"name\": \"").append(name).append("\",");
        }
        if (delay != null) {
            result.append("\"delay\": ").append(delay).append(",");
        }
        result.append("\"trigger\": {\"schedule\": {\"cron\": ");
        if (cronSchedule.length == 1) {
            result.append("\"").append(cronSchedule[0]).append("\"");
        } else {
            result.append("[");
            boolean first = true;
            for (String cron : cronSchedule) {
                if (first) {
                    first = false;
                } else {
                    result.append(",");
                }
                result.append("\"").append(cron).append("\"");
            }
            result.append("]");
        }
        result.append("}}}");
        return result.toString();
    }

    @DisallowConcurrentExecution
    public static class NonConcurrentTestJob
    extends TestJob {
        int prevMaxConcurrency = 0;

        @Override
        public void execute(JobExecutionContext context) throws JobExecutionException {
            int maxConcurrency;
            String name = context.getMergedJobDataMap().getString("name");
            super.execute(context);
            if (name != null && (maxConcurrency = NonConcurrentTestJob.getCounter(name + "_max_concurrency")) > this.prevMaxConcurrency && maxConcurrency > 1) {
                log.error("DisallowConcurrentExecution constraint violated during last job run of " + name + " " + maxConcurrency + " (" + this.prevMaxConcurrency + ")");
                this.prevMaxConcurrency = maxConcurrency;
            }
            log.info("JOB " + name + " finished");
        }
    }

    public static class TestJob
    implements Job {
        static Map<String, Integer> counters = new ConcurrentHashMap<String, Integer>();

        public void execute(JobExecutionContext context) throws JobExecutionException {
            Number delay;
            String name = context.getMergedJobDataMap().getString("name");
            if (name != null) {
                TestJob.incrementCounter(name);
                int maxConcurrency = TestJob.incrementCounter(name + "_active_concurrent");
                if (maxConcurrency > TestJob.getCounter(name + "_max_concurrency")) {
                    TestJob.setCounter(name + "_max_concurrency", maxConcurrency);
                }
                log.info("JOB " + name + " #" + TestJob.getCounter(name));
            }
            if ((delay = (Number)context.getMergedJobDataMap().get((Object)"delay")) != null) {
                try {
                    Thread.sleep(delay.longValue());
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            }
            TestJob.decrementCounter(name + "_active_concurrent");
        }

        static int incrementCounter(String counterName) {
            int value = TestJob.getCounter(counterName) + 1;
            counters.put(counterName, value);
            return value;
        }

        static int decrementCounter(String counterName) {
            int value = TestJob.getCounter(counterName) - 1;
            counters.put(counterName, value);
            return value;
        }

        static void setCounter(String counterName, int number) {
            counters.put(counterName, number);
        }

        static int getCounter(String counterName) {
            Integer value = counters.get(counterName);
            if (value == null) {
                return 0;
            }
            return value;
        }
    }

    public static class LoggingTestJob
    implements Job {
        public void execute(JobExecutionContext context) throws JobExecutionException {
            try {
                System.out.println("job: " + context + " " + new HashMap(context.getMergedJobDataMap()) + " on " + context.getScheduler().getSchedulerName());
                System.out.println(context.getJobDetail());
            }
            catch (SchedulerException e) {
                e.printStackTrace();
            }
        }
    }
}

