diff --git a/org.jdrupes.vmoperator.common/src/org/jdrupes/vmoperator/common/Constants.java b/org.jdrupes.vmoperator.common/src/org/jdrupes/vmoperator/common/Constants.java index 67939de..b9de69f 100644 --- a/org.jdrupes.vmoperator.common/src/org/jdrupes/vmoperator/common/Constants.java +++ b/org.jdrupes.vmoperator.common/src/org/jdrupes/vmoperator/common/Constants.java @@ -18,7 +18,6 @@ package org.jdrupes.vmoperator.common; -// TODO: Auto-generated Javadoc /** * Some constants. */ diff --git a/org.jdrupes.vmoperator.common/src/org/jdrupes/vmoperator/common/VmExtraData.java b/org.jdrupes.vmoperator.common/src/org/jdrupes/vmoperator/common/VmExtraData.java index 85913c2..79f262f 100644 --- a/org.jdrupes.vmoperator.common/src/org/jdrupes/vmoperator/common/VmExtraData.java +++ b/org.jdrupes.vmoperator.common/src/org/jdrupes/vmoperator/common/VmExtraData.java @@ -75,6 +75,15 @@ public class VmExtraData { return nodeName; } + /** + * Gets the node addresses. + * + * @return the nodeAddresses + */ + public List nodeAddresses() { + return nodeAddresses; + } + /** * Sets the reset count. * diff --git a/org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/PodChanged.java b/org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/PodChanged.java new file mode 100644 index 0000000..8bbcfe8 --- /dev/null +++ b/org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/PodChanged.java @@ -0,0 +1,75 @@ +/* + * VM-Operator + * Copyright (C) 2023 Michael N. Lipp + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.jdrupes.vmoperator.manager.events; + +import io.kubernetes.client.openapi.models.V1Pod; +import org.jdrupes.vmoperator.common.K8sObserver; +import org.jgrapes.core.Channel; +import org.jgrapes.core.Components; +import org.jgrapes.core.Event; + +/** + * Indicates a change in a pod that runs a VM. + */ +public class PodChanged extends Event { + + private final V1Pod pod; + private final K8sObserver.ResponseType type; + + /** + * Instantiates a new VM changed event. + * + * @param pod the pod + * @param type the type + */ + public PodChanged(V1Pod pod, K8sObserver.ResponseType type) { + this.pod = pod; + this.type = type; + } + + /** + * Gets the pod. + * + * @return the pod + */ + public V1Pod pod() { + return pod; + } + + /** + * Returns the type. + * + * @return the type + */ + public K8sObserver.ResponseType type() { + return type; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append(Components.objectName(this)).append(" [") + .append(pod.getMetadata().getName()).append(' ').append(type); + if (channels() != null) { + builder.append(", channels=").append(Channel.toString(channels())); + } + builder.append(']'); + return builder.toString(); + } +} diff --git a/org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/VmChannel.java b/org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/VmChannel.java index 5ea282a..7b03ad3 100644 --- a/org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/VmChannel.java +++ b/org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/VmChannel.java @@ -21,6 +21,7 @@ package org.jdrupes.vmoperator.manager.events; import org.jdrupes.vmoperator.common.K8sClient; import org.jdrupes.vmoperator.common.VmDefinition; import org.jgrapes.core.Channel; +import org.jgrapes.core.Event; import org.jgrapes.core.EventPipeline; import org.jgrapes.core.Subchannel.DefaultSubchannel; @@ -104,6 +105,19 @@ public class VmChannel extends DefaultSubchannel { return pipeline; } + /** + * Fire the given event on this channel, using the associated + * {@link #pipeline()}. + * + * @param the generic type + * @param event the event + * @return the t + */ + public > T fire(T event) { + pipeline.fire(event, this); + return event; + } + /** * Returns the API client. * diff --git a/org.jdrupes.vmoperator.manager/.gitignore b/org.jdrupes.vmoperator.manager/.gitignore new file mode 100644 index 0000000..50a6b62 --- /dev/null +++ b/org.jdrupes.vmoperator.manager/.gitignore @@ -0,0 +1 @@ +/logging.properties diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java index 5d5c592..5531d8d 100644 --- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java @@ -113,6 +113,7 @@ public class Controller extends Component { // attach(new ServiceMonitor(channel()).channelManager(chanMgr)); attach(new Reconciler(channel())); attach(new PoolMonitor(channel())); + attach(new PodMonitor(channel(), chanMgr)); } /** diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/PodMonitor.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/PodMonitor.java new file mode 100644 index 0000000..9145d9d --- /dev/null +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/PodMonitor.java @@ -0,0 +1,138 @@ +/* + * VM-Operator + * Copyright (C) 2025 Michael N. Lipp + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.jdrupes.vmoperator.manager; + +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.models.V1Pod; +import io.kubernetes.client.openapi.models.V1PodList; +import io.kubernetes.client.util.Watch.Response; +import io.kubernetes.client.util.generic.options.ListOptions; +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Level; +import static org.jdrupes.vmoperator.common.Constants.APP_NAME; +import static org.jdrupes.vmoperator.common.Constants.VM_OP_NAME; +import org.jdrupes.vmoperator.common.K8sClient; +import org.jdrupes.vmoperator.common.K8sObserver.ResponseType; +import org.jdrupes.vmoperator.common.K8sV1PodStub; +import org.jdrupes.vmoperator.manager.events.ChannelDictionary; +import org.jdrupes.vmoperator.manager.events.PodChanged; +import org.jdrupes.vmoperator.manager.events.VmChannel; +import org.jdrupes.vmoperator.manager.events.VmDefChanged; +import org.jgrapes.core.Channel; +import org.jgrapes.core.annotation.Handler; + +/** + * Watches for changes of pods that run VMs. + */ +public class PodMonitor extends AbstractMonitor { + + private final ChannelDictionary channelDictionary; + + private final Map pendingChanges + = new ConcurrentHashMap<>(); + + /** + * Instantiates a new pod monitor. + * + * @param componentChannel the component channel + * @param channelDictionary the channel dictionary + */ + public PodMonitor(Channel componentChannel, + ChannelDictionary channelDictionary) { + super(componentChannel, V1Pod.class, V1PodList.class); + this.channelDictionary = channelDictionary; + context(K8sV1PodStub.CONTEXT); + ListOptions options = new ListOptions(); + options.setLabelSelector("app.kubernetes.io/name=" + APP_NAME + "," + + "app.kubernetes.io/component=" + APP_NAME + "," + + "app.kubernetes.io/managed-by=" + VM_OP_NAME); + options(options); + } + + @Override + protected void prepareMonitoring() throws IOException, ApiException { + client(new K8sClient()); + } + + @Override + protected void handleChange(K8sClient client, Response change) { + String vmName = change.object.getMetadata().getLabels() + .get("app.kubernetes.io/instance"); + if (vmName == null) { + return; + } + var channel = channelDictionary.channel(vmName).orElse(null); + var responseType = ResponseType.valueOf(change.type); + if (channel != null && channel.vmDefinition() != null) { + pendingChanges.remove(vmName); + channel.fire(new PodChanged(change.object, responseType)); + return; + } + + // VM definition not available yet, may happen during startup + if (responseType == ResponseType.DELETED) { + return; + } + purgePendingChanges(); + logger.finer(() -> "Add pending pod change for " + vmName); + pendingChanges.put(vmName, new PendingChange(Instant.now(), change)); + } + + private void purgePendingChanges() { + Instant tooOld = Instant.now().minus(Duration.ofMinutes(15)); + for (var itr = pendingChanges.entrySet().iterator(); itr.hasNext();) { + var change = itr.next(); + if (change.getValue().from().isBefore(tooOld)) { + itr.remove(); + logger.finer( + () -> "Cleaned pending pod change for " + change.getKey()); + } + } + } + + /** + * Check for pending changes. + * + * @param event the event + * @param channel the channel + */ + @Handler + public void onVmDefChanged(VmDefChanged event, VmChannel channel) { + Optional.ofNullable(pendingChanges.remove(event.vmDefinition().name())) + .map(PendingChange::change).ifPresent(change -> { + logger.finer(() -> "Firing pending pod change for " + + event.vmDefinition().name()); + channel.fire(new PodChanged(change.object, + ResponseType.valueOf(change.type))); + if (logger.isLoggable(Level.FINER) + && pendingChanges.isEmpty()) { + logger.finer("No pending pod changes left."); + } + }); + } + + private record PendingChange(Instant from, Response change) { + } + +} diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmMonitor.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmMonitor.java index e4ee0a0..f729240 100644 --- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmMonitor.java +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmMonitor.java @@ -25,11 +25,12 @@ import io.kubernetes.client.util.generic.options.ListOptions; import java.io.IOException; import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Level; import java.util.stream.Collectors; import org.jdrupes.vmoperator.common.Constants.Crd; import org.jdrupes.vmoperator.common.K8s; @@ -37,7 +38,6 @@ import org.jdrupes.vmoperator.common.K8sClient; import org.jdrupes.vmoperator.common.K8sDynamicStub; import org.jdrupes.vmoperator.common.K8sObserver.ResponseType; import org.jdrupes.vmoperator.common.K8sV1ConfigMapStub; -import org.jdrupes.vmoperator.common.K8sV1PodStub; import org.jdrupes.vmoperator.common.K8sV1StatefulSetStub; import org.jdrupes.vmoperator.common.VmDefinition; import org.jdrupes.vmoperator.common.VmDefinition.Assignment; @@ -53,6 +53,7 @@ import org.jdrupes.vmoperator.manager.events.GetPools; import org.jdrupes.vmoperator.manager.events.GetVms; import org.jdrupes.vmoperator.manager.events.GetVms.VmData; import org.jdrupes.vmoperator.manager.events.ModifyVm; +import org.jdrupes.vmoperator.manager.events.PodChanged; import org.jdrupes.vmoperator.manager.events.UpdateAssignment; import org.jdrupes.vmoperator.manager.events.VmChannel; import org.jdrupes.vmoperator.manager.events.VmDefChanged; @@ -140,7 +141,7 @@ public class VmMonitor extends } if (vmDef.data() != null) { // New data, augment and save - addExtraData(channel.client(), vmDef, channel.vmDefinition()); + addExtraData(vmDef, channel.vmDefinition()); channel.setVmDefinition(vmDef); } else { // Reuse cached (e.g. if deleted) @@ -166,7 +167,7 @@ public class VmMonitor extends chgEvt = Event.onCompletion(chgEvt, e -> channelManager.remove(e.vmDefinition().name())); } - channel.pipeline().fire(chgEvt, channel); + channel.fire(chgEvt); } private VmDefinition getModel(K8sClient client, VmDefinition vmDef) { @@ -179,46 +180,56 @@ public class VmMonitor extends } @SuppressWarnings("PMD.AvoidDuplicateLiterals") - private void addExtraData(K8sClient client, VmDefinition vmDef, - VmDefinition prevState) { + private void addExtraData(VmDefinition vmDef, VmDefinition prevState) { var extra = new VmExtraData(vmDef); + var prevExtra + = Optional.ofNullable(prevState).flatMap(VmDefinition::extra); // Maintain (or initialize) the resetCount - extra.resetCount( - Optional.ofNullable(prevState).flatMap(VmDefinition::extra) - .map(VmExtraData::resetCount).orElse(0L)); + extra.resetCount(prevExtra.map(VmExtraData::resetCount).orElse(0L)); - // VM definition status changes before the pod terminates. - // This results in pod information being shown for a stopped - // VM which is irritating. So check condition first. - if (!vmDef.conditionStatus("Running").orElse(false)) { + // Maintain node info + prevExtra + .ifPresent(e -> extra.nodeInfo(e.nodeName(), e.nodeAddresses())); + } + + /** + * On pod changed. + * + * @param event the event + * @param channel the channel + */ + @Handler + public void onPodChanged(PodChanged event, VmChannel channel) { + if (channel.vmDefinition().extra().isEmpty()) { return; } - - // Get pod and extract node information. - var podSearch = new ListOptions(); - podSearch.setLabelSelector("app.kubernetes.io/name=" + APP_NAME - + ",app.kubernetes.io/component=" + APP_NAME - + ",app.kubernetes.io/instance=" + vmDef.name()); - try { - var podList - = K8sV1PodStub.list(client, namespace(), podSearch); - for (var podStub : podList) { - var nodeName = podStub.model().get().getSpec().getNodeName(); - logger.finer(() -> "Adding node name " + nodeName - + " to VM info for " + vmDef.name()); - @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") - var addrs = new ArrayList(); - podStub.model().get().getStatus().getPodIPs().stream() - .map(ip -> ip.getIp()).forEach(addrs::add); - logger.finer(() -> "Adding node addresses " + addrs - + " to VM info for " + vmDef.name()); - extra.nodeInfo(nodeName, addrs); + var extra = channel.vmDefinition().extra().get(); + var pod = event.pod(); + if (event.type() == ResponseType.DELETED) { + // The status of a deleted pod is the status before deletion, + // i.e. the node info is still there. + extra.nodeInfo("", Collections.emptyList()); + } else { + var nodeName = Optional + .ofNullable(pod.getSpec().getNodeName()).orElse(""); + logger.finer(() -> "Adding node name " + nodeName + + " to VM info for " + channel.vmDefinition().name()); + @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") + var addrs = new ArrayList(); + Optional.ofNullable(pod.getStatus().getPodIPs()) + .orElse(Collections.emptyList()).stream() + .map(ip -> ip.getIp()).forEach(addrs::add); + logger.finer(() -> "Adding node addresses " + addrs + + " to VM info for " + channel.vmDefinition().name()); + if (Objects.equals(nodeName, extra.nodeName()) + && Objects.equals(addrs, extra.nodeAddresses())) { + return; } - } catch (ApiException e) { - logger.log(Level.WARNING, e, - () -> "Cannot access node information: " + e.getMessage()); + extra.nodeInfo(nodeName, addrs); } + channel.fire(new VmDefChanged(ResponseType.MODIFIED, false, + channel.vmDefinition())); } /** @@ -293,10 +304,8 @@ public class VmMonitor extends // Assign to user var chosenVm = vmQuery.get(); - var vmPipeline = chosenVm.pipeline(); - if (Optional.ofNullable(vmPipeline.fire(new UpdateAssignment( - vmPool, event.toUser()), chosenVm).get()) - .orElse(false)) { + if (Optional.ofNullable(chosenVm.fire(new UpdateAssignment( + vmPool, event.toUser())).get()).orElse(false)) { var vmDef = chosenVm.vmDefinition(); event.setResult(new VmData(vmDef, chosenVm));