package com.floragunn.searchguard.auditlog.sink;

import com.floragunn.searchguard.auditlog.AbstractAuditlogiUnitTest;
import com.floragunn.searchguard.auditlog.helper.MockAuditMessageFactory;
import com.floragunn.searchguard.auditlog.impl.AuditMessage;
import com.floragunn.searchguard.test.helper.file.FileHelper;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.yaml.YamlXContent;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import scala.util.Random;

/* loaded from: input_file:com/floragunn/searchguard/auditlog/sink/KafkaSinkTest.class */
public class KafkaSinkTest extends AbstractAuditlogiUnitTest {

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 1, new String[]{"compliance"});

    @Test
    public void testKafka() throws Exception {
        Settings.Builder loadFromSource = Settings.builder().loadFromSource(FileHelper.loadFile("auditlog/endpoints/sink/configuration_kafka.yml").replace("_RPLC_BOOTSTRAP_SERVERS_", embeddedKafka.getEmbeddedKafka().getBrokersAsString()), YamlXContent.yamlXContent.type());
        KafkaConsumer<Long, String> createConsumer = createConsumer();
        try {
            createConsumer.subscribe(Arrays.asList("compliance"));
            AuditLogSink defaultSink = new SinkProvider(loadFromSource.put("path.home", ".").build(), (Client) null, (ThreadPool) null, (Path) null).getDefaultSink();
            try {
                Assert.assertEquals(KafkaSink.class, defaultSink.getClass());
                Assert.assertTrue(defaultSink.doStore(MockAuditMessageFactory.validAuditMessage(AuditMessage.Category.MISSING_PRIVILEGES)));
                Assert.assertEquals(1L, createConsumer.poll(Duration.ofSeconds(10L)).count());
                defaultSink.close();
                if (createConsumer != null) {
                    createConsumer.close();
                }
            } catch (Throwable th) {
                defaultSink.close();
                throw th;
            }
        } catch (Throwable th2) {
            if (createConsumer != null) {
                try {
                    createConsumer.close();
                } catch (Throwable th3) {
                    th2.addSuppressed(th3);
                }
            }
            throw th2;
        }
    }

    private KafkaConsumer<Long, String> createConsumer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", embeddedKafka.getEmbeddedKafka().getBrokersAsString());
        properties.put("auto.offset.reset", "earliest");
        properties.put("group.id", "mygroup" + System.currentTimeMillis() + "_" + new Random().nextDouble());
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(properties);
    }
}
