Use Kafka
This commit is contained in:
parent
a79802646c
commit
033628c336
14 changed files with 340 additions and 149 deletions
|
|
@ -45,11 +45,13 @@ public class DroolsBusinessIndexerServer {
|
|||
private String bootstrapAddress;
|
||||
|
||||
|
||||
@Value(value = "${kafka.index.groupid})")
|
||||
private String groupID;
|
||||
|
||||
public ConsumerFactory<String, ChtijbugObjectRequest> greetingConsumerFactory() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, "greeting");
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
|
||||
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(ChtijbugObjectRequest.class));
|
||||
}
|
||||
@Bean
|
||||
|
|
|
|||
|
|
@ -24,8 +24,9 @@ public class StoreLoggingService {
|
|||
@Autowired
|
||||
private BusinessTransactionActionRepository actionRepository;
|
||||
|
||||
|
||||
@KafkaListener(
|
||||
topics = KafkaTopicConstants.LOGING_TOPIC,
|
||||
topics = KafkaTopicConstants.LOGING_TOPIC,groupId = "${kafka.index.groupid}",
|
||||
containerFactory = "ruleKafkaListenerContainerFactory")
|
||||
public void store(ChtijbugObjectRequest result) {
|
||||
if (result != null) {
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ kafka.bootstrapAddress=localhost:9092,localhost:9093,localhost:9094
|
|||
|
||||
kieserver.login=kieserver
|
||||
kieserver.password=kieserver1
|
||||
|
||||
kafka.index.groupid=index1
|
||||
spring.data.mongodb.database=businessProxyDB
|
||||
spring.data.mongodb.host=localhost:28017
|
||||
server.port=5547
|
||||
|
|
@ -18,9 +18,13 @@ package org.chtijbug.drools.proxy;
|
|||
|
||||
import org.apache.kafka.clients.admin.AdminClientConfig;
|
||||
import org.apache.kafka.clients.admin.NewTopic;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.chtijbug.drools.ChtijbugObjectRequest;
|
||||
import org.chtijbug.drools.KieContainerResponse;
|
||||
import org.chtijbug.drools.KieContainerUpdate;
|
||||
import org.chtijbug.drools.common.KafkaTopicConstants;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
|
|
@ -28,10 +32,9 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
|
|||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.annotation.EnableKafka;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaAdmin;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||
import org.springframework.kafka.core.*;
|
||||
import org.springframework.kafka.support.serializer.JsonDeserializer;
|
||||
import org.springframework.kafka.support.serializer.JsonSerializer;
|
||||
|
||||
import java.net.InetAddress;
|
||||
|
|
@ -45,8 +48,8 @@ import java.util.Map;
|
|||
@EnableKafka
|
||||
public class DroolsBusinessProxyServer {
|
||||
|
||||
|
||||
|
||||
@Value(value = "${org.kie.server.id}")
|
||||
private String groupID;
|
||||
|
||||
@Value(value = "${kafka.bootstrapAddress}")
|
||||
private String bootstrapAddress;
|
||||
|
|
@ -62,10 +65,12 @@ public class DroolsBusinessProxyServer {
|
|||
public NewTopic loggingTopic() {
|
||||
return new NewTopic(KafkaTopicConstants.LOGING_TOPIC, 1, (short) 1);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public NewTopic actionResponseTopic() {
|
||||
return new NewTopic(KafkaTopicConstants.RESPONSE_TOPIC, 1, (short) 1);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ProducerFactory<String, ChtijbugObjectRequest> producerFactory() {
|
||||
Map<String, Object> configProps = new HashMap<>();
|
||||
|
|
@ -80,6 +85,45 @@ public class DroolsBusinessProxyServer {
|
|||
JsonSerializer.class);
|
||||
return new DefaultKafkaProducerFactory<>(configProps);
|
||||
}
|
||||
@Bean
|
||||
public ProducerFactory<String, KieContainerResponse> producerKieContainerResponseactory() {
|
||||
Map<String, Object> configProps = new HashMap<>();
|
||||
configProps.put(
|
||||
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
|
||||
bootstrapAddress);
|
||||
configProps.put(
|
||||
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
|
||||
StringSerializer.class);
|
||||
configProps.put(
|
||||
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
|
||||
JsonSerializer.class);
|
||||
return new DefaultKafkaProducerFactory<>(configProps);
|
||||
}
|
||||
@Bean
|
||||
public KafkaTemplate<String, KieContainerResponse> kafkaKieContainerUpdateResponsableTemplate() {
|
||||
return new KafkaTemplate<>(producerKieContainerResponseactory());
|
||||
}
|
||||
@Bean
|
||||
public NewTopic actionDeployResponseTopic() {
|
||||
return new NewTopic(KafkaTopicConstants.RESPONSE_DEPLOY_TOPIC, 1, (short) 1);
|
||||
}
|
||||
|
||||
|
||||
public ConsumerFactory<String, KieContainerUpdate> greetingConsumerFactory() {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
|
||||
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
|
||||
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(KieContainerUpdate.class));
|
||||
}
|
||||
@Bean
|
||||
public ConcurrentKafkaListenerContainerFactory<String, KieContainerUpdate>
|
||||
ruleKafkaListenerKieContainerUpdateFactory() {
|
||||
|
||||
ConcurrentKafkaListenerContainerFactory<String, KieContainerUpdate> factory =
|
||||
new ConcurrentKafkaListenerContainerFactory<>();
|
||||
factory.setConsumerFactory(greetingConsumerFactory());
|
||||
return factory;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public KafkaTemplate<String, ChtijbugObjectRequest> kafkaTemplate() {
|
||||
|
|
|
|||
|
|
@ -1,12 +0,0 @@
|
|||
package org.chtijbug.drools.proxy.camel;
|
||||
|
||||
import org.apache.camel.builder.RouteBuilder;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
public class AutodeployRouter extends RouteBuilder {
|
||||
@Override
|
||||
public void configure() throws Exception {
|
||||
from("quartz2://myGroup/myTimerName?cron=0/5+*+*+?+*+*").to("bean:kieService?method=updateConfig()");
|
||||
}
|
||||
}
|
||||
|
|
@ -18,6 +18,9 @@ package org.chtijbug.drools.proxy.service;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.camel.CamelContext;
|
||||
import org.apache.camel.Route;
|
||||
import org.chtijbug.drools.KieContainerResponse;
|
||||
import org.chtijbug.drools.KieContainerUpdate;
|
||||
import org.chtijbug.drools.common.KafkaTopicConstants;
|
||||
import org.chtijbug.drools.proxy.PlatfomKieServerStateRepository;
|
||||
import org.chtijbug.drools.proxy.camel.DroolsRouter;
|
||||
import org.chtijbug.drools.proxy.persistence.model.ContainerPojoPersist;
|
||||
|
|
@ -39,6 +42,8 @@ import org.slf4j.LoggerFactory;
|
|||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.kafka.annotation.KafkaListener;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
|
|
@ -75,6 +80,9 @@ public class KieServiceCommon {
|
|||
private String hostName = "localhost";
|
||||
private Map<String, DroolsRouter> routes = new HashMap<>();
|
||||
|
||||
@Autowired
|
||||
KafkaTemplate<String, KieContainerResponse> kafkaKieContainerUpdateResponseTemplate;
|
||||
|
||||
public KieServiceCommon() {
|
||||
// for now, if no server impl is passed as parameter, create one
|
||||
// this.server = KieServerLocator.getInstance();
|
||||
|
|
@ -210,7 +218,7 @@ public class KieServiceCommon {
|
|||
Set<Class<?>> classes = kieContainerInstance.getExtraClasses();
|
||||
String className = container.getClassName();
|
||||
Class foundClass = this.getClassFromName(classes, className);
|
||||
if (foundClass!=null) {
|
||||
if (foundClass != null) {
|
||||
ClassLoader classLoader = foundClass.getClassLoader();
|
||||
Class<?> theClass = classLoader.loadClass(className);
|
||||
camelContext.setApplicationContextClassLoader(classLoader);
|
||||
|
|
@ -264,41 +272,55 @@ public class KieServiceCommon {
|
|||
return result.getResult();
|
||||
}
|
||||
|
||||
public void updateConfig() throws Exception {
|
||||
String serverName = KieServiceCommon.getKieServerID();
|
||||
String isSwarm = System.getProperty("org.kie.server.swarm");
|
||||
List<ContainerRuntimePojoPersist> containers = null;
|
||||
|
||||
containers = containerRuntimeRepository.findByServerNameAndStatusAndHostname(serverName, ContainerRuntimePojoPersist.STATUS.TODEPLOY.toString(), hostName);
|
||||
for (ContainerRuntimePojoPersist element : containers) {
|
||||
ContainerPojoPersist containerIds = containerRepository.findByServerNameAndContainerId(serverName, element.getContainerId());
|
||||
if (containerIds != null) {
|
||||
|
||||
this.disposeContainer(element.getContainerId());
|
||||
@KafkaListener(
|
||||
topics = "${org.kie.server.id}", groupId = "${org.kie.server.id}",
|
||||
containerFactory = "ruleKafkaListenerKieContainerUpdateFactory")
|
||||
public void updateConfig(KieContainerUpdate update) {
|
||||
try {
|
||||
String serverName = KieServiceCommon.getKieServerID();
|
||||
if (update.getAction().equals(KieContainerUpdate.STATUS.TODEPLOY)) {
|
||||
this.disposeContainer(update.getContainerID());
|
||||
KieContainerResource newContainer = new KieContainerResource();
|
||||
newContainer.setContainerId(element.getContainerId());
|
||||
newContainer.setContainerId(update.getContainerID());
|
||||
newContainer.setReleaseId(new ReleaseId());
|
||||
newContainer.getReleaseId().setArtifactId(containerIds.getArtifactId());
|
||||
newContainer.getReleaseId().setGroupId(containerIds.getGroupId());
|
||||
newContainer.getReleaseId().setVersion(containerIds.getVersion());
|
||||
this.createContainer(element.getContainerId(), newContainer);
|
||||
newContainer.getReleaseId().setArtifactId(update.getArtifactID());
|
||||
newContainer.getReleaseId().setGroupId(update.getGroupID());
|
||||
newContainer.getReleaseId().setVersion(update.getProjectVersion());
|
||||
this.createContainer(update.getContainerID(), newContainer);
|
||||
ContainerPojoPersist containerIds = containerRepository.findByServerNameAndContainerId(serverName, update.getContainerID());
|
||||
this.initCamelBusinessRoute(containerIds);
|
||||
element.setStatus(ContainerRuntimePojoPersist.STATUS.UP.toString());
|
||||
containerRuntimeRepository.save(element);
|
||||
}
|
||||
|
||||
}
|
||||
containers = containerRuntimeRepository.findByServerNameAndStatusAndHostname(serverName, ContainerRuntimePojoPersist.STATUS.TODELETE.toString(), hostName);
|
||||
|
||||
for (ContainerRuntimePojoPersist element : containers) {
|
||||
ContainerPojoPersist containerIds = containerRepository.findByServerNameAndContainerId(serverName, element.getContainerId());
|
||||
if (containerIds != null) {
|
||||
this.disposeContainer(element.getContainerId());
|
||||
this.deleteCamelBusinessRoute(element.getContainerId());
|
||||
List<ContainerRuntimePojoPersist> containers = containerRuntimeRepository.findByServerNameAndContainerId(serverName, update.getContainerID());
|
||||
for (ContainerRuntimePojoPersist element : containers) {
|
||||
element.setStatus(ContainerRuntimePojoPersist.STATUS.UP.toString());
|
||||
containerRuntimeRepository.save(element);
|
||||
}
|
||||
} else {
|
||||
this.disposeContainer(update.getContainerID());
|
||||
this.deleteCamelBusinessRoute(update.getContainerID());
|
||||
ContainerPojoPersist containerIds = containerRepository.findByServerNameAndContainerId(serverName, update.getContainerID());
|
||||
List<ContainerRuntimePojoPersist> containers = containerRuntimeRepository.findByServerNameAndContainerId(serverName, update.getContainerID());
|
||||
for (ContainerRuntimePojoPersist element : containers) {
|
||||
element.setStatus(ContainerRuntimePojoPersist.STATUS.UP.toString());
|
||||
containerRuntimeRepository.save(element);
|
||||
}
|
||||
containerRepository.delete(containerIds);
|
||||
}
|
||||
}
|
||||
KieContainerResponse kieContainerResponse = new KieContainerResponse();
|
||||
kieContainerResponse.setStatus(KieContainerResponse.STATUS.SUCCESS);
|
||||
|
||||
kafkaKieContainerUpdateResponseTemplate.send(KafkaTopicConstants.RESPONSE_DEPLOY_TOPIC,kieContainerResponse);
|
||||
|
||||
}catch (Exception e){
|
||||
KieContainerResponse kieContainerResponse = new KieContainerResponse();
|
||||
kieContainerResponse.setStatus(KieContainerResponse.STATUS.ERROR);
|
||||
kieContainerResponse.setKieContainerUpdate(update);
|
||||
kieContainerResponse.setMessageError(e.getMessage());
|
||||
for (StackTraceElement stackTraceElement : e.getStackTrace()){
|
||||
kieContainerResponse.getErrorMessages().add(stackTraceElement.toString());
|
||||
}
|
||||
kafkaKieContainerUpdateResponseTemplate.send(KafkaTopicConstants.RESPONSE_DEPLOY_TOPIC,kieContainerResponse);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Add table
editor.link_modal.header
Reference in a new issue