Add assignment based on last usage.

This commit is contained in:
Michael Lipp 2025-01-25 13:35:51 +01:00
parent 877d4c69cd
commit 5d722abd2e
9 changed files with 186 additions and 32 deletions

View file

@ -18,21 +18,26 @@
package org.jdrupes.vmoperator.manager;
import com.google.gson.JsonObject;
import io.kubernetes.client.apimachinery.GroupVersionKind;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.util.Watch;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import static org.jdrupes.vmoperator.common.Constants.VM_OP_GROUP;
import static org.jdrupes.vmoperator.common.Constants.VM_OP_KIND_VM;
import org.jdrupes.vmoperator.common.K8s;
import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.K8sDynamicModel;
import org.jdrupes.vmoperator.common.K8sDynamicModels;
import org.jdrupes.vmoperator.common.K8sDynamicStub;
import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
import org.jdrupes.vmoperator.common.VmDefinitionStub;
import org.jdrupes.vmoperator.common.VmPool;
import static org.jdrupes.vmoperator.manager.Constants.VM_OP_KIND_VM_POOL;
import org.jdrupes.vmoperator.manager.events.GetPools;
@ -129,6 +134,7 @@ public class PoolMonitor extends
var vmPool = pools.computeIfAbsent(poolName, k -> new VmPool(poolName));
var newData = client().getJSON().getGson().fromJson(
GsonPtr.to(poolModel.data()).to("spec").get(), VmPool.class);
vmPool.setRetention(newData.retention());
vmPool.setPermissions(newData.permissions());
vmPool.setDefined(true);
poolPipeline.fire(new VmPoolChanged(vmPool));
@ -138,13 +144,15 @@ public class PoolMonitor extends
* Track VM definition changes.
*
* @param event the event
* @throws ApiException
*/
@Handler
public void onVmDefChanged(VmDefChanged event) {
String vmName = event.vmDefinition().name();
public void onVmDefChanged(VmDefChanged event) throws ApiException {
final var vmDef = event.vmDefinition();
final String vmName = vmDef.name();
switch (event.type()) {
case ADDED:
event.vmDefinition().<List<String>> fromSpec("pools")
vmDef.<List<String>> fromSpec("pools")
.orElse(Collections.emptyList()).stream().forEach(p -> {
pools.computeIfAbsent(p, k -> new VmPool(p))
.vms().add(vmName);
@ -157,10 +165,34 @@ public class PoolMonitor extends
poolPipeline.fire(new VmPoolChanged(p));
}
});
break;
return;
default:
break;
}
// Sync last usage to console state change if user matches
var assignedTo = vmDef.assignedTo().orElse(null);
if (assignedTo == null || !assignedTo
.equals(vmDef.<String> fromStatus("consoleUser").orElse(null))) {
return;
}
var lastUsed
= vmDef.assignmentLastUsed().orElse(Instant.ofEpochSecond(0));
var conChange = vmDef.condition("ConsoleConnected")
.map(c -> c.getLastTransitionTime().toInstant())
.orElse(Instant.ofEpochSecond(0));
if (!conChange.isAfter(lastUsed)) {
return;
}
var vmStub = VmDefinitionStub.get(client(),
new GroupVersionKind(VM_OP_GROUP, "", VM_OP_KIND_VM),
vmDef.namespace(), vmDef.name());
vmStub.updateStatus(from -> {
JsonObject status = from.status();
var assignment = GsonPtr.to(status).to("assignment");
assignment.set("lastUsed", conChange.toString());
return status;
});
}
/**

View file

@ -1,6 +1,6 @@
/*
* VM-Operator
* Copyright (C) 2023,2024 Michael N. Lipp
* Copyright (C) 2023,2025 Michael N. Lipp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
@ -25,7 +25,9 @@ import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.util.Watch;
import io.kubernetes.client.util.generic.options.ListOptions;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Optional;
import java.util.Set;
import java.util.logging.Level;
@ -43,10 +45,12 @@ import org.jdrupes.vmoperator.common.VmDefinition;
import org.jdrupes.vmoperator.common.VmDefinitionModel;
import org.jdrupes.vmoperator.common.VmDefinitionModels;
import org.jdrupes.vmoperator.common.VmDefinitionStub;
import org.jdrupes.vmoperator.common.VmPool;
import static org.jdrupes.vmoperator.manager.Constants.APP_NAME;
import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME;
import org.jdrupes.vmoperator.manager.events.AssignVm;
import org.jdrupes.vmoperator.manager.events.ChannelManager;
import org.jdrupes.vmoperator.manager.events.GetPools;
import org.jdrupes.vmoperator.manager.events.GetVms;
import org.jdrupes.vmoperator.manager.events.GetVms.VmData;
import org.jdrupes.vmoperator.manager.events.ModifyVm;
@ -245,28 +249,59 @@ public class VmMonitor extends
*
* @param event the event
* @throws ApiException the api exception
* @throws InterruptedException
*/
@Handler
public void onAssignVm(AssignVm event) throws ApiException {
// Search for existing assignment.
var assignedVm = channelManager.channels().stream()
.filter(c -> c.vmDefinition().assignedFrom()
.map(p -> p.equals(event.fromPool())).orElse(false))
.filter(c -> c.vmDefinition().assignedTo()
.map(u -> u.equals(event.toUser())).orElse(false))
.findFirst();
if (assignedVm.isPresent()) {
event.setResult(new VmData(assignedVm.get().vmDefinition(),
assignedVm.get()));
return;
}
@SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
public void onAssignVm(AssignVm event)
throws ApiException, InterruptedException {
VmPool vmPool = null;
while (true) {
// Search for existing assignment.
var assignedVm = channelManager.channels().stream()
.filter(c -> c.vmDefinition().assignedFrom()
.map(p -> p.equals(event.fromPool())).orElse(false))
.filter(c -> c.vmDefinition().assignedTo()
.map(u -> u.equals(event.toUser())).orElse(false))
.findFirst();
if (assignedVm.isPresent()) {
var vmDef = assignedVm.get().vmDefinition();
event.setResult(new VmData(vmDef, assignedVm.get()));
return;
}
// Find available VM.
assignedVm = channelManager.channels().stream()
.filter(c -> c.vmDefinition().pools().contains(event.fromPool()))
.filter(c -> c.vmDefinition().assignedTo().isEmpty())
.findFirst();
if (assignedVm.isPresent()) {
// Get the pool definition for retention time calculations
if (vmPool == null) {
vmPool = newEventPipeline().fire(new GetPools()
.withName(event.fromPool())).get().stream().findFirst()
.orElse(null);
if (vmPool == null) {
return;
}
}
// Find available VM.
var pool = vmPool;
assignedVm = channelManager.channels().stream()
.filter(c -> c.vmDefinition().pools()
.contains(event.fromPool()))
.filter(c -> !c.vmDefinition()
.conditionStatus("ConsoleConnected").orElse(false))
.filter(c -> c.vmDefinition().assignedTo().isEmpty()
|| pool.retainUntil(c.vmDefinition()
.<String> fromStatus("assignment", "lastUsed")
.map(Instant::parse).orElse(Instant.ofEpochSecond(0)))
.isBefore(Instant.now()))
.sorted(Comparator.comparing(c -> c.vmDefinition()
.assignmentLastUsed().orElse(Instant.ofEpochSecond(0))))
.findFirst();
// None found
if (assignedVm.isEmpty()) {
return;
}
// Assign to user
var vmDef = assignedVm.get().vmDefinition();
var vmStub = VmDefinitionStub.get(client(),
new GroupVersionKind(VM_OP_GROUP, "", VM_OP_KIND_VM),
@ -276,14 +311,13 @@ public class VmMonitor extends
var assignment = GsonPtr.to(status).to("assignment");
assignment.set("pool", event.fromPool());
assignment.set("user", event.toUser());
assignment.set("lastUsed", Instant.now().toString());
return status;
});
// Always start a newly assigned VM.
// Make sure that a newly assigned VM is running.
fire(new ModifyVm(vmDef.name(), "state", "Running",
assignedVm.get()));
event.setResult(new VmData(vmDef, assignedVm.get()));
}
}
}