Use Kafka

This commit is contained in:
Nicolas Héron 2020-06-24 23:21:21 +02:00
commit a79802646c
37 changed files with 814 additions and 658 deletions

View file

@ -1,6 +1,12 @@
package org.chtijbug.drools.console;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.chtijbug.drools.ReverseProxyUpdate;
import org.chtijbug.drools.common.KafkaTopicConstants;
import org.chtijbug.drools.console.middle.DababaseContentInit;
import org.chtijbug.drools.console.service.model.kie.KieConfigurationData;
import org.chtijbug.drools.console.service.util.ApplicationContextProvider;
@ -11,20 +17,34 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.context.annotation.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.context.event.EventListener;
import org.springframework.data.mongodb.repository.config.EnableMongoRepositories;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@SpringBootApplication
@EnableMongoRepositories("org.chtijbug.drools.proxy.persistence.repository")
@EnableKafka
@PropertySource("classpath:application.properties")
public class DroolsSpringBootConsoleApplication extends SpringBootServletInitializer {
@Value("${kie-wb.baseurl}")
private String kiewbUrl;
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Autowired
private DababaseContentInit dababaseContentInit;
@ -45,6 +65,39 @@ public class DroolsSpringBootConsoleApplication extends SpringBootServletInitial
return application.sources(DroolsSpringBootConsoleApplication.class);
}
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic loggingTopic() {
return new NewTopic(KafkaTopicConstants.REVERSE_PROXY, 1, (short) 1);
}
@Bean
public ProducerFactory<String, ReverseProxyUpdate> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, ReverseProxyUpdate> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
public static void main(String[] args) {
SpringApplication.run(DroolsSpringBootConsoleApplication.class, args);
}

View file

@ -27,7 +27,7 @@ public class ElasticSearchExpose {
BusinessTransactionPersistence tmp=null;
List<BusinessTransactionPersistence> businessTransactionPersistences = indexerService.getBusinessTransactionPersistenceRepository().findAllByTransactionId(transactionId,new PageRequest(0,5000));
List<BusinessTransactionPersistence> businessTransactionPersistences = indexerService.getBusinessTransactionPersistenceRepository().findAllByTransactionId(transactionId,PageRequest.of(0,5000));
if(businessTransactionPersistences!=null) {
for (BusinessTransactionPersistence b : businessTransactionPersistences) {
@ -46,7 +46,7 @@ public class ElasticSearchExpose {
@RequestMapping(value = "/findActionByBusinessId", method = RequestMethod.GET)
public List<BusinessTransactionAction> findActionById(@RequestParam String businessId, HttpServletRequest request, HttpServletResponse response) {
List<BusinessTransactionAction> businessTransactionPersistences = indexerService.getBusinessTransactionActionRepository().findAllByBusinessTransactionId(businessId,new Sort(new Sort.Order(Sort.Direction.ASC,"eventNumber")),new PageRequest(0,5000));
List<BusinessTransactionAction> businessTransactionPersistences = indexerService.getBusinessTransactionActionRepository().findAllByBusinessTransactionId(businessId,Sort.by(new Sort.Order(Sort.Direction.ASC,"eventNumber")),PageRequest.of(0,5000));
return businessTransactionPersistences;
}

View file

@ -2,6 +2,8 @@ package org.chtijbug.drools.console.service;
import com.vaadin.flow.component.UI;
import com.vaadin.flow.server.VaadinSession;
import org.chtijbug.drools.ReverseProxyUpdate;
import org.chtijbug.drools.common.KafkaTopicConstants;
import org.chtijbug.drools.console.AddLog;
import org.chtijbug.drools.console.service.model.UserConnected;
import org.chtijbug.drools.console.service.model.kie.JobStatus;
@ -21,6 +23,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
@ -34,7 +37,7 @@ public class ProjectPersistService {
private static final Logger logger = LoggerFactory.getLogger(ProjectPersistService.class);
private static String projectVariable = "4";
private static String projectVariable = "4";
@Autowired
private ProjectRepository projectRepository;
@ -56,6 +59,8 @@ public class ProjectPersistService {
@Autowired
private RuntimeRepository runtimeRepository;
@Autowired
private KafkaTemplate<String, ReverseProxyUpdate> kafkaTemplateProxyUpdate;
public ProjectPersistService() {
this.config = AppContext.getApplicationContext().getBean(KieConfigurationData.class);
@ -63,18 +68,16 @@ public class ProjectPersistService {
}
public void saveIfnotExist(List<PlatformProjectResponse> platformProjectResponses) {
for (PlatformProjectResponse platformProjectResponse : platformProjectResponses) {
ProjectPersist projectPersist = projectRepository.findByProjectNameAndBranch(new KieProject(platformProjectResponse.getSpaceName(), platformProjectResponse.getName()),platformProjectResponse.getBranch());
ProjectPersist projectPersist = projectRepository.findByProjectNameAndBranch(new KieProject(platformProjectResponse.getSpaceName(), platformProjectResponse.getName()), platformProjectResponse.getBranch());
if (projectPersist == null) {
projectPersist = platformProjectResponseToProjectPersist(platformProjectResponse);
projectPersist=projectRepository.save(projectPersist);
projectPersist = projectRepository.save(projectPersist);
addProjectToSession(projectPersist, true);
} else {
@ -101,12 +104,12 @@ public class ProjectPersistService {
}
if (isModifiable) {
projectPersists.put(projectPersist.getProjectName().toString()+"-"+projectPersist.getBranch(), projectPersist);
projectPersists.put(projectPersist.getProjectName().toString() + "-" + projectPersist.getBranch(), projectPersist);
} else {
ProjectPersist tmp = projectPersists.get(projectPersist.getProjectName().toString()+"-"+projectPersist.getBranch());
ProjectPersist tmp = projectPersists.get(projectPersist.getProjectName().toString() + "-" + projectPersist.getBranch());
if (tmp == null) {
projectPersists.put(projectPersist.getProjectName().toString()+"-"+projectPersist.getBranch(), projectPersist);
projectPersists.put(projectPersist.getProjectName().toString() + "-" + projectPersist.getBranch(), projectPersist);
}
}
@ -117,6 +120,8 @@ public class ProjectPersistService {
projectPersist.setStatus(ProjectPersist.Deployable);
projectPersist.setContainerID(projectPersist.getDeploymentName() + "-" + projectPersist.getProjectName());
projectPersist.getServerNames().clear();
ReverseProxyUpdate reverseProxyUpdate = new ReverseProxyUpdate();
reverseProxyUpdate.setPath("/" + projectPersist.getContainerID());
for (RuntimePersist runtimePersist : runtimePersists) {
List<String> names = new ArrayList<>();
names.add(runtimePersist.getServerName());
@ -132,14 +137,14 @@ public class ProjectPersistService {
newContainer.setArtifactId(projectPersist.getArtifactID());
newContainer.setVersion(projectPersist.getProjectVersion());
containerRepository.save(newContainer);
}
String hostName = runtimePersist.getServerUrl() + "/api/" + projectPersist.getContainerID();
reverseProxyUpdate.getServerNames().add(hostName);
}
projectRepository.save(projectPersist);
addProjectToSession(projectPersist, true);
kafkaTemplateProxyUpdate.send(KafkaTopicConstants.REVERSE_PROXY, reverseProxyUpdate);
return true;
}
@ -173,16 +178,16 @@ public class ProjectPersistService {
public void run() {
JobStatus result = kieRepositoryService.buildProject(config.getKiewbUrl(), userConnected.getUserName(),
userConnected.getUserPassword(), projectPersist.getProjectName().getSpaceName(), projectPersist.getProjectName().getName(),projectPersist.getBranch(), "compile", workOnGoingView, ui);
userConnected.getUserPassword(), projectPersist.getProjectName().getSpaceName(), projectPersist.getProjectName().getName(), projectPersist.getBranch(), "compile", workOnGoingView, ui);
executeWrite(url, username, password, workOnGoingView, result.getJobId(), ui);
result = kieRepositoryService.buildProject(config.getKiewbUrl(), userConnected.getUserName(),
userConnected.getUserPassword(), projectPersist.getProjectName().getSpaceName(), projectPersist.getProjectName().getName(),projectPersist.getBranch(), "install", workOnGoingView, ui);
userConnected.getUserPassword(), projectPersist.getProjectName().getSpaceName(), projectPersist.getProjectName().getName(), projectPersist.getBranch(), "install", workOnGoingView, ui);
executeWrite(url, username, password, workOnGoingView, result.getJobId(), ui);
for (String serverName : projectPersist.getServerNames()) {
for (String serverName : projectPersist.getServerNames()) {
List<ContainerRuntimePojoPersist> existingContainers = containerRuntimeRepository.findByServerNameAndContainerId(serverName, projectPersist.getContainerID());
if (!existingContainers.isEmpty()) {
@ -230,7 +235,7 @@ public class ProjectPersistService {
this.wait(1000);
}
} catch (InterruptedException e) {
logger.error("executeWrite",e);
logger.error("executeWrite", e);
Thread.currentThread().interrupt();
}
}

View file

@ -173,7 +173,7 @@ public class GridActionLogging extends Grid<BusinessTransactionAction> {
public void setDataProvider(String id){
List<BusinessTransactionAction> businessTransactionPersistences = indexerService.getBusinessTransactionActionRepository().findAllByBusinessTransactionId(id,new Sort(new Sort.Order(Sort.Direction.ASC,"eventNumber")),new PageRequest(0,5000));
List<BusinessTransactionAction> businessTransactionPersistences = indexerService.getBusinessTransactionActionRepository().findAllByBusinessTransactionId(id,Sort.by(new Sort.Order(Sort.Direction.ASC,"eventNumber")),PageRequest.of(0,5000));
if(businessTransactionPersistences!=null) {
dataProvider = new ListDataProvider<>(businessTransactionPersistences);

View file

@ -103,7 +103,7 @@ public class GridLogging extends Grid<BusinessTransactionPersistence> {
});
serverNameC.setHeader(serverName);
setDataProvider(indexerService.getBusinessTransactionPersistenceRepository().findAll(new PageRequest(0,100)).getContent());
setDataProvider(indexerService.getBusinessTransactionPersistenceRepository().findAll(PageRequest.of(0,100)).getContent());
}
private void refreshtGrid(String value,String type){

View file

@ -38,9 +38,9 @@ public class ActionLogging extends VerticalLayout {
if (textFieldStringComponentValueChangeEvent.getValue().isEmpty()) {
loggingView.getTitle().setText("Logging : ");
loggingView.getGridLogging().setDataProvider(indexerService.getBusinessTransactionPersistenceRepository().findAll(new PageRequest(0, 100)).getContent());
loggingView.getGridLogging().setDataProvider(indexerService.getBusinessTransactionPersistenceRepository().findAll(PageRequest.of(0, 100)).getContent());
} else {
List<BusinessTransactionPersistence> b = indexerService.getBusinessTransactionPersistenceRepository().findAllByTransactionId(textFieldStringComponentValueChangeEvent.getValue(), new PageRequest(0, 100));
List<BusinessTransactionPersistence> b = indexerService.getBusinessTransactionPersistenceRepository().findAllByTransactionId(textFieldStringComponentValueChangeEvent.getValue(), PageRequest.of(0, 100));
if (b != null) {
loggingView.getTitle().setText("Logging : " + textFieldStringComponentValueChangeEvent.getValue());

View file

@ -23,7 +23,7 @@ import org.chtijbug.drools.proxy.persistence.model.ProjectPersist;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@StyleSheet("css/accueil.css")
public class DeploymentView extends VerticalLayout implements AddLog {
@ -215,7 +215,7 @@ public class DeploymentView extends VerticalLayout implements AddLog {
public void setDataProvider() {
HashMap<String, ProjectPersist> projectPersists = projectPersistService.getProjectsSession();
Map<String, ProjectPersist> projectPersists = projectPersistService.getProjectsSession();
if (projectPersists != null) {
dataProvider = new ListDataProvider<>(projectPersists.values());

View file

@ -13,3 +13,4 @@ spring.data.mongodb.database=businessProxyDB
spring.data.mongodb.host=localhost:28017
spring.servlet.multipart.enabled=false
kafka.bootstrapAddress=localhost:9092,localhost:9093,localhost:9094