From 2f101f2acc8d2aaf05350614c8cee0eceb23dbe0 Mon Sep 17 00:00:00 2001 From: "Michael N. Lipp" Date: Thu, 3 Aug 2023 15:58:59 +0200 Subject: [PATCH] Support more than one CRD version. --- .../vmoperator/manager/CmReconciler.java | 2 +- .../jdrupes/vmoperator/manager/Constants.java | 3 - .../vmoperator/manager/Controller.java | 56 +++---------- .../vmoperator/manager/DisksReconciler.java | 4 +- .../org/jdrupes/vmoperator/manager/K8s.java | 10 +++ .../vmoperator/manager/PodReconciler.java | 2 +- .../vmoperator/manager/Reconciler.java | 6 +- .../vmoperator/manager/VmDefChanged.java | 18 ++-- .../jdrupes/vmoperator/manager/VmWatcher.java | 83 ++++++++++++------- 9 files changed, 91 insertions(+), 93 deletions(-) diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/CmReconciler.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/CmReconciler.java index ef71f48..0bcfb2f 100644 --- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/CmReconciler.java +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/CmReconciler.java @@ -63,7 +63,7 @@ import org.jdrupes.vmoperator.manager.VmDefChanged.Type; // Get API and check if exists DynamicKubernetesApi cmApi = new DynamicKubernetesApi("", "v1", "configmaps", channel.client()); - var existing = K8s.get(cmApi, event.metadata()); + var existing = K8s.get(cmApi, event.object().getMetadata()); // If deleted, delete if (event.type() == Type.DELETED) { diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Constants.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Constants.java index 85af1eb..5c5cf3f 100644 --- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Constants.java +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Constants.java @@ -29,9 +29,6 @@ public class Constants { /** The Constant VM_OP_GROUP. */ public static final String VM_OP_GROUP = "vmoperator.jdrupes.org"; - /** The Constant VM_OP_VERSION. */ - public static final String VM_OP_VERSION = "v1"; - /** The Constant VM_OP_KIND_VM. */ public static final String VM_OP_KIND_VM = "VirtualMachine"; diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java index 40598c2..80cd005 100644 --- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java @@ -18,21 +18,20 @@ package org.jdrupes.vmoperator.manager; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.Configuration; +import io.kubernetes.client.util.Config; import java.io.IOException; -import java.util.Map; import org.jgrapes.core.Channel; import org.jgrapes.core.Component; import org.jgrapes.core.annotation.Handler; -import org.jgrapes.util.events.ConfigurationUpdate; -import org.jgrapes.util.events.InitialConfiguration; +import org.jgrapes.core.events.Start; /** * The application class. */ public class Controller extends Component { -// private final ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); - /** * Instantiates a new manager. * @@ -46,47 +45,16 @@ public class Controller extends Component { } /** - * On configuration update. + * Handle the start event. Has higher priority because it configures + * the default Kubernetes client. * * @param event the event + * @throws IOException + * @throws ApiException */ - @Handler - public void onConfigurationUpdate(ConfigurationUpdate event) { - event.structured(componentPath()).ifPresent(c -> { - if (event instanceof InitialConfiguration) { - processInitialConfiguration(c); - } - }); - } - - private void processInitialConfiguration( - Map runnerConfiguration) { -// try { -// config = mapper.convertValue(runnerConfiguration, -// Configuration.class); -// if (!config.check()) { -// // Invalid configuration, not used, problems already logged. -// config = null; -// } -// -// // Prepare firmware files and add to config -// setFirmwarePaths(); -// -// // Obtain more context data from template -// var tplData = dataFromTemplate(); -// swtpmDefinition = Optional.ofNullable(tplData.get("swtpm")) -// .map(d -> new CommandDefinition("swtpm", d)).orElse(null); -// qemuDefinition = Optional.ofNullable(tplData.get("qemu")) -// .map(d -> new CommandDefinition("qemu", d)).orElse(null); -// -// // Forward some values to child components -// qemuMonitor.configure(config.monitorSocket, -// config.vm.powerdownTimeout); -// } catch (IllegalArgumentException | IOException | TemplateException e) { -// logger.log(Level.SEVERE, e, () -> "Invalid configuration: " -// + e.getMessage()); -// // Don't use default configuration -// config = null; -// } + @Handler(priority = 100) + public void onStart(Start event) throws IOException, ApiException { + var client = Config.defaultClient(); + Configuration.setDefaultApiClient(client); } } diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/DisksReconciler.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/DisksReconciler.java index 32ee7d6..a65bfcd 100644 --- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/DisksReconciler.java +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/DisksReconciler.java @@ -111,12 +111,12 @@ import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME; throws ApiException { // Get API and check and list related var pvcApi = K8s.pvcApi(channel.client()); - var pvcs = pvcApi.list(event.metadata().getNamespace(), + var pvcs = pvcApi.list(event.object().getMetadata().getNamespace(), new ListOptions().labelSelector( "app.kubernetes.io/managed-by=" + VM_OP_NAME + ",app.kubernetes.io/name=" + APP_NAME + ",app.kubernetes.io/instance=" - + event.metadata().getName())); + + event.object().getMetadata().getName())); for (var pvc : pvcs.getObject().getItems()) { K8s.delete(pvcApi, pvc); } diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/K8s.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/K8s.java index f2f84a5..03b3545 100644 --- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/K8s.java +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/K8s.java @@ -40,6 +40,16 @@ import java.util.Optional; @SuppressWarnings({ "PMD.ShortClassName", "PMD.UseUtilityClass" }) public class K8s { + /** + * Given a groupVersion, returns only the version. + * + * @param groupVersion the group version + * @return the string + */ + public static String version(String groupVersion) { + return groupVersion.substring(groupVersion.lastIndexOf('/') + 1); + } + /** * Get PVC API. * diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/PodReconciler.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/PodReconciler.java index 8ccc3fe..9967c8b 100644 --- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/PodReconciler.java +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/PodReconciler.java @@ -64,7 +64,7 @@ import org.jdrupes.vmoperator.manager.VmDefChanged.Type; // Check if exists DynamicKubernetesApi podApi = new DynamicKubernetesApi("", "v1", "pods", channel.client()); - var existing = K8s.get(podApi, event.metadata()); + var existing = K8s.get(podApi, event.object().getMetadata()); // Get state var state = GsonPtr.to((JsonObject) model.get("cr")).to("spec", "vm") diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Reconciler.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Reconciler.java index 9181ef2..58f0717 100644 --- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Reconciler.java +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Reconciler.java @@ -33,7 +33,6 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; import static org.jdrupes.vmoperator.manager.Constants.VM_OP_GROUP; -import static org.jdrupes.vmoperator.manager.Constants.VM_OP_VERSION; import org.jdrupes.vmoperator.manager.VmDefChanged.Type; import org.jdrupes.vmoperator.util.ExtendedObjectWrapper; import org.jgrapes.core.Channel; @@ -96,9 +95,10 @@ public class Reconciler extends Component { public void onVmDefChanged(VmDefChanged event, WatchChannel channel) throws ApiException, TemplateException, IOException { // Get complete VM (CR) definition + var apiVersion = K8s.version(event.object().getApiVersion()); DynamicKubernetesApi vmCrApi = new DynamicKubernetesApi(VM_OP_GROUP, - VM_OP_VERSION, event.crd().getName(), channel.client()); - var defMeta = event.metadata(); + apiVersion, event.crd().getName(), channel.client()); + var defMeta = event.object().getMetadata(); // Get common data for all reconciles DynamicKubernetesObject vmDef = null; diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmDefChanged.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmDefChanged.java index d3e6f34..a4fd905 100644 --- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmDefChanged.java +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmDefChanged.java @@ -19,7 +19,7 @@ package org.jdrupes.vmoperator.manager; import io.kubernetes.client.openapi.models.V1APIResource; -import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.openapi.models.V1Namespace; import org.jgrapes.core.Channel; import org.jgrapes.core.Components; import org.jgrapes.core.Event; @@ -38,7 +38,7 @@ public class VmDefChanged extends Event { private final Type type; private final V1APIResource crd; - private final V1ObjectMeta metadata; + private final V1Namespace object; /** * Instantiates a new VM changed event. @@ -46,10 +46,10 @@ public class VmDefChanged extends Event { * @param type the type * @param metadata the metadata */ - public VmDefChanged(Type type, V1APIResource crd, V1ObjectMeta metadata) { + public VmDefChanged(Type type, V1APIResource crd, V1Namespace object) { this.type = type; this.crd = crd; - this.metadata = metadata; + this.object = object; } /** @@ -71,19 +71,19 @@ public class VmDefChanged extends Event { } /** - * Returns the metadata. + * Returns the object. * - * @return the metadata + * @return the object. */ - public V1ObjectMeta metadata() { - return metadata; + public V1Namespace object() { + return object; } @Override public String toString() { StringBuilder builder = new StringBuilder(); builder.append(Components.objectName(this)).append(" [").append(type) - .append(' ').append(metadata.getName()); + .append(' ').append(object.getMetadata().getName()); if (channels() != null) { builder.append(", channels="); builder.append(Channel.toString(channels())); diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmWatcher.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmWatcher.java index e3562cd..8603f95 100644 --- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmWatcher.java +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmWatcher.java @@ -22,11 +22,13 @@ import com.google.gson.reflect.TypeToken; import io.kubernetes.client.openapi.ApiClient; import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.Configuration; +import io.kubernetes.client.openapi.apis.ApisApi; import io.kubernetes.client.openapi.apis.CustomObjectsApi; +import io.kubernetes.client.openapi.models.V1APIGroup; import io.kubernetes.client.openapi.models.V1APIResource; +import io.kubernetes.client.openapi.models.V1GroupVersionForDiscovery; import io.kubernetes.client.openapi.models.V1Namespace; import io.kubernetes.client.openapi.models.V1ObjectMeta; -import io.kubernetes.client.util.Config; import io.kubernetes.client.util.Watch; import java.io.IOException; import java.util.Map; @@ -34,7 +36,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import okhttp3.Call; import static org.jdrupes.vmoperator.manager.Constants.VM_OP_GROUP; -import static org.jdrupes.vmoperator.manager.Constants.VM_OP_VERSION; import org.jdrupes.vmoperator.manager.VmDefChanged.Type; import org.jgrapes.core.Channel; import org.jgrapes.core.Component; @@ -49,7 +50,6 @@ import org.jgrapes.core.events.Stop; public class VmWatcher extends Component { private ApiClient client; - private V1APIResource vmsCrd; private String managedNamespace = "qemu-vms"; private final Map channels = new ConcurrentHashMap<>(); @@ -72,44 +72,67 @@ public class VmWatcher extends Component { */ @Handler public void onStart(Start event) throws IOException, ApiException { - client = Config.defaultClient(); - Configuration.setDefaultApiClient(client); - - // Get access to API + client = Configuration.getDefaultApiClient(); + // Get all our API versions + var apis = new ApisApi(client).getAPIVersions(); + var vmOpApiVersions = apis.getGroups().stream() + .filter(g -> g.getName().equals(VM_OP_GROUP)).findFirst() + .map(V1APIGroup::getVersions).stream().flatMap(l -> l.stream()) + .map(V1GroupVersionForDiscovery::getVersion).toList(); var coa = new CustomObjectsApi(client); + for (var version : vmOpApiVersions) { + // Start a watcher for each existing CRD version. + coa.getAPIResources(VM_OP_GROUP, version) + .getResources().stream() + .filter(r -> Constants.VM_OP_KIND_VM.equals(r.getKind())) + .findFirst() + .ifPresent(crd -> serveCrVersion(coa, crd, version)); + } + } - // Derive all information from the CRD - var resources - = coa.getAPIResources(VM_OP_GROUP, VM_OP_VERSION); - vmsCrd = resources.getResources().stream() - .filter(r -> Constants.VM_OP_KIND_VM.equals(r.getKind())) - .findFirst().get(); - - // Watch the resources (vm definitions) - Call call = coa.listNamespacedCustomObjectCall( - VM_OP_GROUP, VM_OP_VERSION, managedNamespace, vmsCrd.getName(), - null, false, null, null, null, null, null, null, null, true, null); - new Thread(() -> { - try (Watch watch = Watch.createWatch(client, call, - new TypeToken>() { - }.getType())) { - for (Watch.Response item : watch) { - handleVmDefinitionEvent(item); + private void serveCrVersion(CustomObjectsApi coa, V1APIResource crd, + String version) { + Call call; + try { + call = coa.listNamespacedCustomObjectCall(VM_OP_GROUP, version, + managedNamespace, crd.getName(), null, false, null, null, null, + null, null, null, null, true, null); + } catch (ApiException e) { + logger.log(Level.FINE, e, + () -> "Probem watching: " + e.getMessage()); + return; + } + @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") + var watcher = new Thread(() -> { + try { + // Watch sometimes terminates without apparent reason. + while (true) { + try (Watch watch + = Watch.createWatch(client, call, + new TypeToken>() { + }.getType())) { + for (Watch.Response item : watch) { + handleVmDefinitionEvent(crd, item); + } + } } } catch (IOException | ApiException e) { - logger.log(Level.FINE, e, () -> "Probem while watching: " + logger.log(Level.FINE, e, () -> "Probem watching: " + e.getMessage()); } fire(new Stop()); - }).start(); + }); + watcher.setDaemon(true); + watcher.start(); } - private void handleVmDefinitionEvent(Watch.Response item) { + private void handleVmDefinitionEvent(V1APIResource vmsCrd, + Watch.Response item) { V1ObjectMeta metadata = item.object.getMetadata(); WatchChannel channel = channels.computeIfAbsent(metadata.getName(), k -> new WatchChannel(channel(), newEventPipeline(), client)); - channel.pipeline().fire(new VmDefChanged( - VmDefChanged.Type.valueOf(item.type), vmsCrd, metadata), channel); + channel.pipeline().fire(new VmDefChanged(VmDefChanged.Type + .valueOf(item.type), vmsCrd, item.object), channel); } /** @@ -121,7 +144,7 @@ public class VmWatcher extends Component { @Handler(priority = -10_000) public void onVmDefChanged(VmDefChanged event, WatchChannel channel) { if (event.type() == Type.DELETED) { - channels.remove(event.metadata().getName()); + channels.remove(event.object().getMetadata().getName()); } }