Implement basic reconciliation "loop".

This commit is contained in:
Michael Lipp 2023-07-22 14:36:42 +02:00
parent de17d323c3
commit 50bff5d38f
10 changed files with 461 additions and 68 deletions

View file

@ -18,6 +18,8 @@ dependencies {
implementation 'commons-cli:commons-cli:1.5.0'
implementation 'io.kubernetes:client-java:18.0.0'
runtimeOnly 'org.slf4j:slf4j-jdk14:[2.0.7,3)'
}
application {

View file

@ -18,17 +18,6 @@
package org.jdrupes.vmoperator.manager;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.apis.CustomObjectsApi;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.Yaml;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
@ -38,48 +27,38 @@ import org.jgrapes.core.Channel;
import org.jgrapes.core.Component;
import org.jgrapes.core.Components;
import org.jgrapes.core.annotation.Handler;
import org.jgrapes.core.events.Start;
import org.jgrapes.core.events.Stop;
import org.jgrapes.io.NioDispatcher;
/**
* The application class.
*/
public class Manager extends Component {
/** The Constant APP_NAME. */
public static final String APP_NAME = "vmoperator";
private static Manager app;
/**
* Instantiates a new manager.
*
* @throws IOException Signals that an I/O exception has occurred.
*/
public Manager() throws IOException {
// Attach a general nio dispatcher
// Prepare component tree
attach(new NioDispatcher());
attach(new VmDefinitionWatcher(channel()));
attach(new Reconciliator(channel()));
}
/**
* Handle the start event.
* On stop.
*
* @param event the event
* @throws IOException
* @throws ApiException
*/
@Handler
public void onStart(Start event) throws IOException, ApiException {
ApiClient client = Config.defaultClient();
Configuration.setDefaultApiClient(client);
CoreV1Api api = new CoreV1Api();
V1PodList list = api.listPodForAllNamespaces(null, null, null, null,
null, null, null, null, null, null);
for (V1Pod item : list.getItems()) {
System.out.println(item.getMetadata().getName());
}
// CustomObjectsApi cApi = new CustomObjectsApi();
// var obj = cApi.getNamespacedCustomObject("vmoperator.jdrupes.org", "v1",
// "default", "vms", "test");
// obj = null;
}
@Handler
@Handler(priority = -1000)
public void onStop(Stop event) {
System.out.println("(Done.)");
logger.fine(() -> "Applictaion stopped.");
}
static {

View file

@ -0,0 +1,18 @@
package org.jdrupes.vmoperator.manager;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Component;
import org.jgrapes.core.annotation.Handler;
public class Reconciliator extends Component {
public Reconciliator(Channel componentChannel) {
super(componentChannel);
}
@Handler
public void onVmChanged(VmChangedEvent event, WatchChannel channel) {
event = null;
}
}

View file

@ -0,0 +1,83 @@
/*
* JGrapes Event Driven Framework
* Copyright (C) 2018 Michael N. Lipp
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, see <http://www.gnu.org/licenses/>.
*/
package org.jdrupes.vmoperator.manager;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Components;
import org.jgrapes.core.Event;
/**
* Indicates a change in a VM definition.
*/
public class VmChangedEvent extends Event<Void> {
/**
* The type of change.
*/
public enum Type {
ADDED, MODIFIED, DELETED
}
private final Type type;
private final V1ObjectMeta metadata;
/**
* Instantiates a new VM changed event.
*
* @param type the type
* @param metadata the metadata
*/
public VmChangedEvent(Type type, V1ObjectMeta metadata) {
this.type = type;
this.metadata = metadata;
}
/**
* Returns the type.
*
* @return the type
*/
public Type type() {
return type;
}
/**
* Returns the metadata.
*
* @return the metadata
*/
public V1ObjectMeta metadata() {
return metadata;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(Components.objectName(this)).append(" [").append(type)
.append(' ').append(metadata.getName());
if (channels() != null) {
builder.append(", channels=");
builder.append(Channel.toString(channels()));
}
builder.append(']');
return builder.toString();
}
}

View file

@ -0,0 +1,131 @@
/*
* JGrapes Event Driven Framework
* Copyright (C) 2018 Michael N. Lipp
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, see <http://www.gnu.org/licenses/>.
*/
package org.jdrupes.vmoperator.manager;
import com.google.gson.reflect.TypeToken;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.apis.CustomObjectsApi;
import io.kubernetes.client.openapi.models.V1APIResource;
import io.kubernetes.client.openapi.models.V1Namespace;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.Watch;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import okhttp3.Call;
import org.jdrupes.vmoperator.manager.VmChangedEvent.Type;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Component;
import org.jgrapes.core.annotation.Handler;
import org.jgrapes.core.events.Start;
import org.jgrapes.core.events.Stop;
/**
* Watches for changes of VM definitions.
*/
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
public class VmDefinitionWatcher extends Component {
private static final String CR_GROUP = "vmoperator.jdrupes.org";
private static final String CR_VERSION = "v1";
private static final String CR_KIND = "VirtualMachine";
private CoreV1Api api;
private CustomObjectsApi coa;
private V1APIResource vmsCrd;
private String managedNamespace = "default";
private final Map<String, WatchChannel> channels
= new ConcurrentHashMap<>();
/**
* Instantiates a new VM definition watcher.
*
* @param componentChannel the component channel
*/
public VmDefinitionWatcher(Channel componentChannel) {
super(componentChannel);
}
/**
* Handle the start event.
*
* @param event the event
* @throws IOException
* @throws ApiException
*/
@Handler
public void onStart(Start event) throws IOException, ApiException {
ApiClient client = Config.defaultClient();
Configuration.setDefaultApiClient(client);
// Get access to APIs
api = new CoreV1Api();
coa = new CustomObjectsApi();
// Derive all information from the CRD
var resources = coa.getAPIResources(CR_GROUP, CR_VERSION);
vmsCrd = resources.getResources().stream()
.filter(r -> CR_KIND.equals(r.getKind())).findFirst().get();
// Watch the resources (vm definitions)
Call call = coa.listNamespacedCustomObjectCall(
CR_GROUP, CR_VERSION, managedNamespace, vmsCrd.getName(), null,
false, null, null, null, null, null, null, null, true, null);
new Thread(() -> {
try (Watch<V1Namespace> watch = Watch.createWatch(client,
call, new TypeToken<Watch.Response<V1Namespace>>() {
}.getType())) {
for (Watch.Response<V1Namespace> item : watch) {
handleCrEvent(item);
}
} catch (IOException | ApiException e) {
logger.log(Level.FINE, e, () -> "Probem while watching: "
+ e.getMessage());
}
fire(new Stop());
}).start();
}
private void handleCrEvent(Watch.Response<V1Namespace> item) {
V1ObjectMeta metadata = item.object.getMetadata();
WatchChannel channel = channels.computeIfAbsent(metadata.getName(),
k -> new WatchChannel(channel(), newEventPipeline(), api, coa));
channel.pipeline().fire(new VmChangedEvent(
VmChangedEvent.Type.valueOf(item.type), metadata), channel);
}
/**
* Remove VM channel when VM is deleted.
*
* @param event the event
* @param channel the channel
*/
@Handler(priority = -10_000)
public void onVmChanged(VmChangedEvent event, WatchChannel channel) {
if (event.type() == Type.DELETED) {
channels.remove(event.metadata().getName());
}
}
}

View file

@ -0,0 +1,78 @@
/*
* JGrapes Event Driven Framework
* Copyright (C) 2018 Michael N. Lipp
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, see <http://www.gnu.org/licenses/>.
*/
package org.jdrupes.vmoperator.manager;
import io.kubernetes.client.openapi.apis.CoreV1Api;
import io.kubernetes.client.openapi.apis.CustomObjectsApi;
import org.jgrapes.core.Channel;
import org.jgrapes.core.EventPipeline;
import org.jgrapes.core.Subchannel.DefaultSubchannel;
/**
* A subchannel used to send the events related to a specific
* VM.
*/
public class WatchChannel extends DefaultSubchannel {
private final EventPipeline pipeline;
private final CoreV1Api api;
private final CustomObjectsApi coa;
/**
* Instantiates a new watch channel.
*
* @param mainChannel the main channel
* @param pipeline the pipeline
*/
public WatchChannel(Channel mainChannel, EventPipeline pipeline,
CoreV1Api api, CustomObjectsApi coa) {
super(mainChannel);
this.pipeline = pipeline;
this.api = api;
this.coa = coa;
}
/**
* Returns the pipeline.
*
* @return the event pipeline
*/
public EventPipeline pipeline() {
return pipeline;
}
/**
* Returns the API object for invoking kubernetes functions.
*
* @return the API object
*/
public CoreV1Api api() {
return api;
}
/**
* Returns the API object for invoking kubernetes custom object
* functions.
*
* @return the API object
*/
public CustomObjectsApi coa() {
return coa;
}
}