feat(kafka) : support SSL connection

This commit is contained in:
Sébastien Velay 2020-10-01 14:58:15 +02:00
commit 4dd394aa68
12 changed files with 225 additions and 4 deletions

View file

@ -10,4 +10,12 @@ spring.data.mongodb.host=${PYMMA_MONGO_HOST:mongodb:27017}
spring.data.mongodb.password=${PYMMA_MONGO_PASSWORD}
spring.data.mongodb.username=${PYMMA_MONGO_USERNAME}
kafka.index.groupid=${PYMMA_KIE_SERVEUR_GROUPID:index1}
kafka.bootstrapAddress=${PYMMA_KAFKA_BOOTSTRAP:kafka1:19092,kafka2:19093,kafka3:19094}
kafka.bootstrapAddress=${PYMMA_KAFKA_BOOTSTRAP:kafka1:19092,kafka2:19093,kafka3:19094}
pymma.kafka.activateSsl=${PYMMA_KAFKA_ACTIVATE_SSL:false}
pymma.kafka.sslTruststoreLocation=${PYMMA_KAFKA_SSL_TRUSTSTORE_LOCATION}
pymma.kafka.sslTruststorePassword=${PYMMA_KAFKA_SSL_TRUSTSTORE_PASSWORD}
pymma.kafka.sslKeyPassword=${PYMMA_KAFKA_KEY_PASSWORD}
pymma.kafka.sslKeystorePassword=${PYMMA_KAFKA_SSL_KEYSTORE_PASSWORD}
pymma.kafka.sslKeystoreLocation=${PYMMA_KAFKA_SSL_KEYSTORE_LOCATION}
pymma.kafka.sslKeystoreType=${PYMMA_KAFKA_SSL_KEYSTORE_TYPE}

View file

@ -16,7 +16,10 @@
*/
package org.chtijbug.drools.indexer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.chtijbug.drools.ChtijbugObjectRequest;
import org.springframework.beans.factory.annotation.Value;
@ -44,6 +47,26 @@ public class DroolsBusinessIndexerServer {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Value("${pymma.kafka.activateSsl:false}")
private boolean activateSsl;
@Value("${pymma.kafka.sslTruststoreLocation:}")
private String sslTruststoreLocation;
@Value("${pymma.kafka.sslTruststorePassword:}")
private String sslTruststorePassword;
@Value("${pymma.kafka.sslKeyPassword:}")
private String sslKeyPassword;
@Value("${pymma.kafka.sslKeystorePassword:}")
private String sslKeystorePassword;
@Value("${pymma.kafka.sslKeystoreLocation:}")
private String sslKeystoreLocation;
@Value("${pymma.kafka.sslKeystoreType:}")
private String sslKeystoreType;
@Value(value = "${kafka.index.groupid})")
private String groupID;
@ -54,6 +77,15 @@ public class DroolsBusinessIndexerServer {
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
if (activateSsl) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name);
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, this.sslTruststoreLocation);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.sslTruststorePassword);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.sslKeyPassword);
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.sslKeystorePassword);
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, this.sslKeystoreLocation);
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, this.sslKeystoreType);
}
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(ChtijbugObjectRequest.class));
}
@Bean

View file

@ -8,3 +8,10 @@ spring.data.mongodb.host=${PYMMA_MONGO_HOST:localhost:28017}
server.port=${port:5547}
pymma.kafka.activateSsl=${PYMMA_KAFKA_ACTIVATE_SSL:false}
pymma.kafka.sslTruststoreLocation=${PYMMA_KAFKA_SSL_TRUSTSTORE_LOCATION}
pymma.kafka.sslTruststorePassword=${PYMMA_KAFKA_SSL_TRUSTSTORE_PASSWORD}
pymma.kafka.sslKeyPassword=${PYMMA_KAFKA_KEY_PASSWORD}
pymma.kafka.sslKeystorePassword=${PYMMA_KAFKA_SSL_KEYSTORE_PASSWORD}
pymma.kafka.sslKeystoreLocation=${PYMMA_KAFKA_SSL_KEYSTORE_LOCATION}
pymma.kafka.sslKeystoreType=${PYMMA_KAFKA_SSL_KEYSTORE_TYPE}

View file

@ -26,3 +26,11 @@ spring.data.mongodb.username=${PYMMA_MONGO_USERNAME}
kie-wb.m2repo=${PYMMA_M2_REPO:http://kie-wb:8080/kie-wb/maven2/}
kafka.bootstrapAddress=${PYMMA_KAFKA_BOOTSTRAP:kafka1:19092,kafka2:19093,kafka3:19094}
pymma.kafka.activateSsl=${PYMMA_KAFKA_ACTIVATE_SSL:false}
pymma.kafka.sslTruststoreLocation=${PYMMA_KAFKA_SSL_TRUSTSTORE_LOCATION}
pymma.kafka.sslTruststorePassword=${PYMMA_KAFKA_SSL_TRUSTSTORE_PASSWORD}
pymma.kafka.sslKeyPassword=${PYMMA_KAFKA_KEY_PASSWORD}
pymma.kafka.sslKeystorePassword=${PYMMA_KAFKA_SSL_KEYSTORE_PASSWORD}
pymma.kafka.sslKeystoreLocation=${PYMMA_KAFKA_SSL_KEYSTORE_LOCATION}
pymma.kafka.sslKeystoreType=${PYMMA_KAFKA_SSL_KEYSTORE_TYPE}

View file

@ -19,9 +19,12 @@ package org.chtijbug.drools.proxy;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringSerializer;
import org.chtijbug.drools.ChtijbugObjectRequest;
import org.chtijbug.drools.KieContainerResponse;
@ -57,10 +60,40 @@ public class DroolsBusinessProxyServer {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Value("${pymma.kafka.activateSsl:false}")
private boolean activateSsl;
@Value("${pymma.kafka.sslTruststoreLocation:}")
private String sslTruststoreLocation;
@Value("${pymma.kafka.sslTruststorePassword:}")
private String sslTruststorePassword;
@Value("${pymma.kafka.sslKeyPassword:}")
private String sslKeyPassword;
@Value("${pymma.kafka.sslKeystorePassword:}")
private String sslKeystorePassword;
@Value("${pymma.kafka.sslKeystoreLocation:}")
private String sslKeystoreLocation;
@Value("${pymma.kafka.sslKeystoreType:}")
private String sslKeystoreType;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
if (activateSsl) {
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name);
configs.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, this.sslTruststoreLocation);
configs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.sslTruststorePassword);
configs.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.sslKeyPassword);
configs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.sslKeystorePassword);
configs.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, this.sslKeystoreLocation);
configs.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, this.sslKeystoreType);
}
return new KafkaAdmin(configs);
}
@ -83,6 +116,15 @@ public class DroolsBusinessProxyServer {
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
if (activateSsl) {
configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name);
configProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, this.sslTruststoreLocation);
configProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.sslTruststorePassword);
configProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.sslKeyPassword);
configProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.sslKeystorePassword);
configProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, this.sslKeystoreLocation);
configProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, this.sslKeystoreType);
}
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
@ -101,6 +143,15 @@ public class DroolsBusinessProxyServer {
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
if (activateSsl) {
configProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name);
configProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, this.sslTruststoreLocation);
configProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.sslTruststorePassword);
configProps.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.sslKeyPassword);
configProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.sslKeystorePassword);
configProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, this.sslKeystoreLocation);
configProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, this.sslKeystoreType);
}
DefaultKafkaProducerFactory<String, KieContainerResponse> producer = new DefaultKafkaProducerFactory<>(configProps);
producer.transactionCapable();
producer.setTransactionIdPrefix("trans");

View file

@ -25,4 +25,12 @@ spring.data.mongodb.host=${PYMMA_MONGO_HOST:localhost:28017}
eureka.client.service-url.defaultZone=http://127.0.0.1:8761/eureka/
spring.application.name=proxy-app-${org.kie.server.id}
kafka.bootstrapAddress=${PYMMA_KAFKA_BOOTSTRAP:localhost:9092,localhost:9093,localhost:9094}
kafka.bootstrapAddress=${PYMMA_KAFKA_BOOTSTRAP:localhost:9092,localhost:9093,localhost:9094}
pymma.kafka.activateSsl=${PYMMA_KAFKA_ACTIVATE_SSL:false}
pymma.kafka.sslTruststoreLocation=${PYMMA_KAFKA_SSL_TRUSTSTORE_LOCATION}
pymma.kafka.sslTruststorePassword=${PYMMA_KAFKA_SSL_TRUSTSTORE_PASSWORD}
pymma.kafka.sslKeyPassword=${PYMMA_KAFKA_KEY_PASSWORD}
pymma.kafka.sslKeystorePassword=${PYMMA_KAFKA_SSL_KEYSTORE_PASSWORD}
pymma.kafka.sslKeystoreLocation=${PYMMA_KAFKA_SSL_KEYSTORE_LOCATION}
pymma.kafka.sslKeystoreType=${PYMMA_KAFKA_SSL_KEYSTORE_TYPE}

View file

@ -4,4 +4,12 @@ spring.data.mongodb.database=${PYMMA_MONGO_DATABASE:businessProxyDB}
spring.data.mongodb.host=${PYMMA_MONGO_HOST:mongodb:27017}
spring.data.mongodb.password=${PYMMA_MONGO_PASSWORD}
spring.data.mongodb.username=${PYMMA_MONGO_USERNAME}
kafka.bootstrapAddress=${PYMMA_KAFKA_BOOTSTRAP:kafka1:19092,kafka2:19093,kafka3:19094}
kafka.bootstrapAddress=${PYMMA_KAFKA_BOOTSTRAP:kafka1:19092,kafka2:19093,kafka3:19094}
pymma.kafka.activateSsl=${PYMMA_KAFKA_ACTIVATE_SSL:false}
pymma.kafka.sslTruststoreLocation=${PYMMA_KAFKA_SSL_TRUSTSTORE_LOCATION}
pymma.kafka.sslTruststorePassword=${PYMMA_KAFKA_SSL_TRUSTSTORE_PASSWORD}
pymma.kafka.sslKeyPassword=${PYMMA_KAFKA_KEY_PASSWORD}
pymma.kafka.sslKeystorePassword=${PYMMA_KAFKA_SSL_KEYSTORE_PASSWORD}
pymma.kafka.sslKeystoreLocation=${PYMMA_KAFKA_SSL_KEYSTORE_LOCATION}
pymma.kafka.sslKeystoreType=${PYMMA_KAFKA_SSL_KEYSTORE_TYPE}

View file

@ -1,7 +1,10 @@
package org.chtijbug.drools.reverseproxy;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.chtijbug.drools.ReverseProxyUpdate;
import org.springframework.beans.factory.annotation.Value;
@ -24,10 +27,40 @@ public class DroolsBusinessReverseProxyServer {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Value("${pymma.kafka.activateSsl:false}")
private boolean activateSsl;
@Value("${pymma.kafka.sslTruststoreLocation:}")
private String sslTruststoreLocation;
@Value("${pymma.kafka.sslTruststorePassword:}")
private String sslTruststorePassword;
@Value("${pymma.kafka.sslKeyPassword:}")
private String sslKeyPassword;
@Value("${pymma.kafka.sslKeystorePassword:}")
private String sslKeystorePassword;
@Value("${pymma.kafka.sslKeystoreLocation:}")
private String sslKeystoreLocation;
@Value("${pymma.kafka.sslKeystoreType:}")
private String sslKeystoreType;
public ConsumerFactory<String, ReverseProxyUpdate> mappingConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "greeting");
if (activateSsl) {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name);
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, this.sslTruststoreLocation);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.sslTruststorePassword);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.sslKeyPassword);
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.sslKeystorePassword);
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, this.sslKeystoreLocation);
props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, this.sslKeystoreType);
}
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(ReverseProxyUpdate.class));
}
@Bean

View file

@ -3,4 +3,12 @@ spring.data.mongodb.database=${PYMMA_MONGO_DATABASE:businessProxyDB}
spring.data.mongodb.host=${PYMMA_MONGO_HOST:localhost:28017}
charon.tracing.enabled=true
kafka.bootstrapAddress=${PYMMA_KAFKA_BOOTSTRAP:localhost:9092,localhost:9093,localhost:9094}
kafka.bootstrapAddress=${PYMMA_KAFKA_BOOTSTRAP:localhost:9092,localhost:9093,localhost:9094}
pymma.kafka.activateSsl=${PYMMA_KAFKA_ACTIVATE_SSL:false}
pymma.kafka.sslTruststoreLocation=${PYMMA_KAFKA_SSL_TRUSTSTORE_LOCATION}
pymma.kafka.sslTruststorePassword=${PYMMA_KAFKA_SSL_TRUSTSTORE_PASSWORD}
pymma.kafka.sslKeyPassword=${PYMMA_KAFKA_KEY_PASSWORD}
pymma.kafka.sslKeystorePassword=${PYMMA_KAFKA_SSL_KEYSTORE_PASSWORD}
pymma.kafka.sslKeystoreLocation=${PYMMA_KAFKA_SSL_KEYSTORE_LOCATION}
pymma.kafka.sslKeystoreType=${PYMMA_KAFKA_SSL_KEYSTORE_TYPE}