Actively add pod info, don't run queries.

This commit is contained in:
Michael Lipp 2025-03-16 15:13:16 +01:00
parent ce4d0bfb72
commit 227c097c01
8 changed files with 287 additions and 41 deletions

View file

@ -18,7 +18,6 @@
package org.jdrupes.vmoperator.common;
// TODO: Auto-generated Javadoc
/**
* Some constants.
*/

View file

@ -75,6 +75,15 @@ public class VmExtraData {
return nodeName;
}
/**
* Gets the node addresses.
*
* @return the nodeAddresses
*/
public List<String> nodeAddresses() {
return nodeAddresses;
}
/**
* Sets the reset count.
*

View file

@ -0,0 +1,75 @@
/*
* VM-Operator
* Copyright (C) 2023 Michael N. Lipp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.jdrupes.vmoperator.manager.events;
import io.kubernetes.client.openapi.models.V1Pod;
import org.jdrupes.vmoperator.common.K8sObserver;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Components;
import org.jgrapes.core.Event;
/**
* Indicates a change in a pod that runs a VM.
*/
public class PodChanged extends Event<Void> {
private final V1Pod pod;
private final K8sObserver.ResponseType type;
/**
* Instantiates a new VM changed event.
*
* @param pod the pod
* @param type the type
*/
public PodChanged(V1Pod pod, K8sObserver.ResponseType type) {
this.pod = pod;
this.type = type;
}
/**
* Gets the pod.
*
* @return the pod
*/
public V1Pod pod() {
return pod;
}
/**
* Returns the type.
*
* @return the type
*/
public K8sObserver.ResponseType type() {
return type;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(Components.objectName(this)).append(" [")
.append(pod.getMetadata().getName()).append(' ').append(type);
if (channels() != null) {
builder.append(", channels=").append(Channel.toString(channels()));
}
builder.append(']');
return builder.toString();
}
}

View file

@ -21,6 +21,7 @@ package org.jdrupes.vmoperator.manager.events;
import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.VmDefinition;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Event;
import org.jgrapes.core.EventPipeline;
import org.jgrapes.core.Subchannel.DefaultSubchannel;
@ -104,6 +105,19 @@ public class VmChannel extends DefaultSubchannel {
return pipeline;
}
/**
* Fire the given event on this channel, using the associated
* {@link #pipeline()}.
*
* @param <T> the generic type
* @param event the event
* @return the t
*/
public <T extends Event<?>> T fire(T event) {
pipeline.fire(event, this);
return event;
}
/**
* Returns the API client.
*

View file

@ -0,0 +1 @@
/logging.properties

View file

@ -113,6 +113,7 @@ public class Controller extends Component {
// attach(new ServiceMonitor(channel()).channelManager(chanMgr));
attach(new Reconciler(channel()));
attach(new PoolMonitor(channel()));
attach(new PodMonitor(channel(), chanMgr));
}
/**

View file

@ -0,0 +1,138 @@
/*
* VM-Operator
* Copyright (C) 2025 Michael N. Lipp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.jdrupes.vmoperator.manager;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.util.Watch.Response;
import io.kubernetes.client.util.generic.options.ListOptions;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import static org.jdrupes.vmoperator.common.Constants.APP_NAME;
import static org.jdrupes.vmoperator.common.Constants.VM_OP_NAME;
import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
import org.jdrupes.vmoperator.common.K8sV1PodStub;
import org.jdrupes.vmoperator.manager.events.ChannelDictionary;
import org.jdrupes.vmoperator.manager.events.PodChanged;
import org.jdrupes.vmoperator.manager.events.VmChannel;
import org.jdrupes.vmoperator.manager.events.VmDefChanged;
import org.jgrapes.core.Channel;
import org.jgrapes.core.annotation.Handler;
/**
* Watches for changes of pods that run VMs.
*/
public class PodMonitor extends AbstractMonitor<V1Pod, V1PodList, VmChannel> {
private final ChannelDictionary<String, VmChannel, ?> channelDictionary;
private final Map<String, PendingChange> pendingChanges
= new ConcurrentHashMap<>();
/**
* Instantiates a new pod monitor.
*
* @param componentChannel the component channel
* @param channelDictionary the channel dictionary
*/
public PodMonitor(Channel componentChannel,
ChannelDictionary<String, VmChannel, ?> channelDictionary) {
super(componentChannel, V1Pod.class, V1PodList.class);
this.channelDictionary = channelDictionary;
context(K8sV1PodStub.CONTEXT);
ListOptions options = new ListOptions();
options.setLabelSelector("app.kubernetes.io/name=" + APP_NAME + ","
+ "app.kubernetes.io/component=" + APP_NAME + ","
+ "app.kubernetes.io/managed-by=" + VM_OP_NAME);
options(options);
}
@Override
protected void prepareMonitoring() throws IOException, ApiException {
client(new K8sClient());
}
@Override
protected void handleChange(K8sClient client, Response<V1Pod> change) {
String vmName = change.object.getMetadata().getLabels()
.get("app.kubernetes.io/instance");
if (vmName == null) {
return;
}
var channel = channelDictionary.channel(vmName).orElse(null);
var responseType = ResponseType.valueOf(change.type);
if (channel != null && channel.vmDefinition() != null) {
pendingChanges.remove(vmName);
channel.fire(new PodChanged(change.object, responseType));
return;
}
// VM definition not available yet, may happen during startup
if (responseType == ResponseType.DELETED) {
return;
}
purgePendingChanges();
logger.finer(() -> "Add pending pod change for " + vmName);
pendingChanges.put(vmName, new PendingChange(Instant.now(), change));
}
private void purgePendingChanges() {
Instant tooOld = Instant.now().minus(Duration.ofMinutes(15));
for (var itr = pendingChanges.entrySet().iterator(); itr.hasNext();) {
var change = itr.next();
if (change.getValue().from().isBefore(tooOld)) {
itr.remove();
logger.finer(
() -> "Cleaned pending pod change for " + change.getKey());
}
}
}
/**
* Check for pending changes.
*
* @param event the event
* @param channel the channel
*/
@Handler
public void onVmDefChanged(VmDefChanged event, VmChannel channel) {
Optional.ofNullable(pendingChanges.remove(event.vmDefinition().name()))
.map(PendingChange::change).ifPresent(change -> {
logger.finer(() -> "Firing pending pod change for "
+ event.vmDefinition().name());
channel.fire(new PodChanged(change.object,
ResponseType.valueOf(change.type)));
if (logger.isLoggable(Level.FINER)
&& pendingChanges.isEmpty()) {
logger.finer("No pending pod changes left.");
}
});
}
private record PendingChange(Instant from, Response<V1Pod> change) {
}
}

View file

@ -25,11 +25,12 @@ import io.kubernetes.client.util.generic.options.ListOptions;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.stream.Collectors;
import org.jdrupes.vmoperator.common.Constants.Crd;
import org.jdrupes.vmoperator.common.K8s;
@ -37,7 +38,6 @@ import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.K8sDynamicStub;
import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
import org.jdrupes.vmoperator.common.K8sV1ConfigMapStub;
import org.jdrupes.vmoperator.common.K8sV1PodStub;
import org.jdrupes.vmoperator.common.K8sV1StatefulSetStub;
import org.jdrupes.vmoperator.common.VmDefinition;
import org.jdrupes.vmoperator.common.VmDefinition.Assignment;
@ -53,6 +53,7 @@ import org.jdrupes.vmoperator.manager.events.GetPools;
import org.jdrupes.vmoperator.manager.events.GetVms;
import org.jdrupes.vmoperator.manager.events.GetVms.VmData;
import org.jdrupes.vmoperator.manager.events.ModifyVm;
import org.jdrupes.vmoperator.manager.events.PodChanged;
import org.jdrupes.vmoperator.manager.events.UpdateAssignment;
import org.jdrupes.vmoperator.manager.events.VmChannel;
import org.jdrupes.vmoperator.manager.events.VmDefChanged;
@ -140,7 +141,7 @@ public class VmMonitor extends
}
if (vmDef.data() != null) {
// New data, augment and save
addExtraData(channel.client(), vmDef, channel.vmDefinition());
addExtraData(vmDef, channel.vmDefinition());
channel.setVmDefinition(vmDef);
} else {
// Reuse cached (e.g. if deleted)
@ -166,7 +167,7 @@ public class VmMonitor extends
chgEvt = Event.onCompletion(chgEvt,
e -> channelManager.remove(e.vmDefinition().name()));
}
channel.pipeline().fire(chgEvt, channel);
channel.fire(chgEvt);
}
private VmDefinition getModel(K8sClient client, VmDefinition vmDef) {
@ -179,46 +180,56 @@ public class VmMonitor extends
}
@SuppressWarnings("PMD.AvoidDuplicateLiterals")
private void addExtraData(K8sClient client, VmDefinition vmDef,
VmDefinition prevState) {
private void addExtraData(VmDefinition vmDef, VmDefinition prevState) {
var extra = new VmExtraData(vmDef);
var prevExtra
= Optional.ofNullable(prevState).flatMap(VmDefinition::extra);
// Maintain (or initialize) the resetCount
extra.resetCount(
Optional.ofNullable(prevState).flatMap(VmDefinition::extra)
.map(VmExtraData::resetCount).orElse(0L));
extra.resetCount(prevExtra.map(VmExtraData::resetCount).orElse(0L));
// VM definition status changes before the pod terminates.
// This results in pod information being shown for a stopped
// VM which is irritating. So check condition first.
if (!vmDef.conditionStatus("Running").orElse(false)) {
// Maintain node info
prevExtra
.ifPresent(e -> extra.nodeInfo(e.nodeName(), e.nodeAddresses()));
}
/**
* On pod changed.
*
* @param event the event
* @param channel the channel
*/
@Handler
public void onPodChanged(PodChanged event, VmChannel channel) {
if (channel.vmDefinition().extra().isEmpty()) {
return;
}
// Get pod and extract node information.
var podSearch = new ListOptions();
podSearch.setLabelSelector("app.kubernetes.io/name=" + APP_NAME
+ ",app.kubernetes.io/component=" + APP_NAME
+ ",app.kubernetes.io/instance=" + vmDef.name());
try {
var podList
= K8sV1PodStub.list(client, namespace(), podSearch);
for (var podStub : podList) {
var nodeName = podStub.model().get().getSpec().getNodeName();
logger.finer(() -> "Adding node name " + nodeName
+ " to VM info for " + vmDef.name());
@SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
var addrs = new ArrayList<String>();
podStub.model().get().getStatus().getPodIPs().stream()
.map(ip -> ip.getIp()).forEach(addrs::add);
logger.finer(() -> "Adding node addresses " + addrs
+ " to VM info for " + vmDef.name());
extra.nodeInfo(nodeName, addrs);
var extra = channel.vmDefinition().extra().get();
var pod = event.pod();
if (event.type() == ResponseType.DELETED) {
// The status of a deleted pod is the status before deletion,
// i.e. the node info is still there.
extra.nodeInfo("", Collections.emptyList());
} else {
var nodeName = Optional
.ofNullable(pod.getSpec().getNodeName()).orElse("");
logger.finer(() -> "Adding node name " + nodeName
+ " to VM info for " + channel.vmDefinition().name());
@SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
var addrs = new ArrayList<String>();
Optional.ofNullable(pod.getStatus().getPodIPs())
.orElse(Collections.emptyList()).stream()
.map(ip -> ip.getIp()).forEach(addrs::add);
logger.finer(() -> "Adding node addresses " + addrs
+ " to VM info for " + channel.vmDefinition().name());
if (Objects.equals(nodeName, extra.nodeName())
&& Objects.equals(addrs, extra.nodeAddresses())) {
return;
}
} catch (ApiException e) {
logger.log(Level.WARNING, e,
() -> "Cannot access node information: " + e.getMessage());
extra.nodeInfo(nodeName, addrs);
}
channel.fire(new VmDefChanged(ResponseType.MODIFIED, false,
channel.vmDefinition()));
}
/**
@ -293,10 +304,8 @@ public class VmMonitor extends
// Assign to user
var chosenVm = vmQuery.get();
var vmPipeline = chosenVm.pipeline();
if (Optional.ofNullable(vmPipeline.fire(new UpdateAssignment(
vmPool, event.toUser()), chosenVm).get())
.orElse(false)) {
if (Optional.ofNullable(chosenVm.fire(new UpdateAssignment(
vmPool, event.toUser())).get()).orElse(false)) {
var vmDef = chosenVm.vmDefinition();
event.setResult(new VmData(vmDef, chosenVm));