Use VmChannel's event pipeline to update assignment.
This commit is contained in:
parent
9986e4c8bf
commit
54747b25e8
4 changed files with 169 additions and 84 deletions
|
|
@ -134,6 +134,78 @@ public class VmPool {
|
|||
return vms;
|
||||
}
|
||||
|
||||
/**
|
||||
* Collect all permissions for the given user with the given roles.
|
||||
*
|
||||
* @param user the user
|
||||
* @param roles the roles
|
||||
* @return the sets the
|
||||
*/
|
||||
public Set<Permission> permissionsFor(String user,
|
||||
Collection<String> roles) {
|
||||
return permissions.stream()
|
||||
.filter(g -> DataPath.get(g, "user").map(u -> u.equals(user))
|
||||
.orElse(false)
|
||||
|| DataPath.get(g, "role").map(roles::contains).orElse(false))
|
||||
.map(g -> DataPath.<Set<Permission>> get(g, "may")
|
||||
.orElse(Collections.emptySet()).stream())
|
||||
.flatMap(Function.identity()).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the given VM belongs to the pool and is not in use.
|
||||
*
|
||||
* @param vmDef the vm def
|
||||
* @return true, if is assignable
|
||||
*/
|
||||
@SuppressWarnings("PMD.SimplifyBooleanReturns")
|
||||
public boolean isAssignable(VmDefinition vmDef) {
|
||||
// Check if the VM is in the pool
|
||||
if (!vmDef.pools().contains(name)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check if the VM is not in use
|
||||
if (vmDef.consoleConnected()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// If not assigned, it's usable
|
||||
if (vmDef.assignedTo().isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Check if it is to be retained
|
||||
if (vmDef.assignmentLastUsed()
|
||||
.map(this::retainUntil)
|
||||
.map(ru -> Instant.now().isBefore(ru)).orElse(false)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Additional check in case lastUsed has not been updated
|
||||
// by PoolMonitor#onVmDefChanged() yet ("race condition")
|
||||
if (vmDef.condition("ConsoleConnected")
|
||||
.map(cc -> cc.getLastTransitionTime().toInstant())
|
||||
.map(this::retainUntil)
|
||||
.map(ru -> Instant.now().isBefore(ru)).orElse(false)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the instant until which an assignment should be retained.
|
||||
*
|
||||
* @param lastUsed the last used
|
||||
* @return the instant
|
||||
*/
|
||||
public Instant retainUntil(Instant lastUsed) {
|
||||
if (retention.startsWith("P")) {
|
||||
return lastUsed.plus(Duration.parse(retention));
|
||||
}
|
||||
return Instant.parse(retention);
|
||||
}
|
||||
|
||||
/**
|
||||
* To string.
|
||||
*
|
||||
|
|
@ -158,35 +230,4 @@ public class VmPool {
|
|||
builder.append(']');
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Collect all permissions for the given user with the given roles.
|
||||
*
|
||||
* @param user the user
|
||||
* @param roles the roles
|
||||
* @return the sets the
|
||||
*/
|
||||
public Set<Permission> permissionsFor(String user,
|
||||
Collection<String> roles) {
|
||||
return permissions.stream()
|
||||
.filter(g -> DataPath.get(g, "user").map(u -> u.equals(user))
|
||||
.orElse(false)
|
||||
|| DataPath.get(g, "role").map(roles::contains).orElse(false))
|
||||
.map(g -> DataPath.<Set<Permission>> get(g, "may")
|
||||
.orElse(Collections.emptySet()).stream())
|
||||
.flatMap(Function.identity()).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the instant until which an assignment should be retained.
|
||||
*
|
||||
* @param lastUsed the last used
|
||||
* @return the instant
|
||||
*/
|
||||
public Instant retainUntil(Instant lastUsed) {
|
||||
if (retention.startsWith("P")) {
|
||||
return lastUsed.plus(Duration.parse(retention));
|
||||
}
|
||||
return Instant.parse(retention);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,60 @@
|
|||
/*
|
||||
* VM-Operator
|
||||
* Copyright (C) 2024 Michael N. Lipp
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of the
|
||||
* License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package org.jdrupes.vmoperator.manager.events;
|
||||
|
||||
import org.jgrapes.core.Event;
|
||||
|
||||
/**
|
||||
* Note the assignment to a user in the VM status.
|
||||
*/
|
||||
@SuppressWarnings("PMD.DataClass")
|
||||
public class UpdateAssignment extends Event<Boolean> {
|
||||
|
||||
private final String usedPool;
|
||||
private final String toUser;
|
||||
|
||||
/**
|
||||
* Instantiates a new event.
|
||||
*
|
||||
* @param usedPool the used pool
|
||||
* @param toUser the to user
|
||||
*/
|
||||
public UpdateAssignment(String usedPool, String toUser) {
|
||||
this.usedPool = usedPool;
|
||||
this.toUser = toUser;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the pool to assign from.
|
||||
*
|
||||
* @return the pool
|
||||
*/
|
||||
public String usedPool() {
|
||||
return usedPool;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the user to assign to.
|
||||
*
|
||||
* @return the to user
|
||||
*/
|
||||
public String toUser() {
|
||||
return toUser;
|
||||
}
|
||||
}
|
||||
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.jdrupes.vmoperator.manager;
|
||||
|
||||
import com.google.gson.JsonObject;
|
||||
import io.kubernetes.client.apimachinery.GroupVersionKind;
|
||||
import io.kubernetes.client.custom.V1Patch;
|
||||
import io.kubernetes.client.openapi.ApiException;
|
||||
|
|
@ -25,16 +26,20 @@ import io.kubernetes.client.openapi.Configuration;
|
|||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Instant;
|
||||
import java.util.logging.Level;
|
||||
import static org.jdrupes.vmoperator.common.Constants.VM_OP_GROUP;
|
||||
import static org.jdrupes.vmoperator.common.Constants.VM_OP_KIND_VM;
|
||||
import org.jdrupes.vmoperator.common.K8sClient;
|
||||
import org.jdrupes.vmoperator.common.K8sDynamicStub;
|
||||
import org.jdrupes.vmoperator.common.VmDefinitionStub;
|
||||
import org.jdrupes.vmoperator.manager.events.ChannelManager;
|
||||
import org.jdrupes.vmoperator.manager.events.Exit;
|
||||
import org.jdrupes.vmoperator.manager.events.ModifyVm;
|
||||
import org.jdrupes.vmoperator.manager.events.UpdateAssignment;
|
||||
import org.jdrupes.vmoperator.manager.events.VmChannel;
|
||||
import org.jdrupes.vmoperator.manager.events.VmDefChanged;
|
||||
import org.jdrupes.vmoperator.util.GsonPtr;
|
||||
import org.jgrapes.core.Channel;
|
||||
import org.jgrapes.core.Component;
|
||||
import org.jgrapes.core.annotation.Handler;
|
||||
|
|
@ -204,4 +209,29 @@ public class Controller extends Component {
|
|||
() -> "Cannot patch definition for Vm " + vmStub.name());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the assignment information in the status of the VM CR.
|
||||
*
|
||||
* @param event the event
|
||||
* @param channel the channel
|
||||
* @throws ApiException the api exception
|
||||
*/
|
||||
@Handler
|
||||
public void onUpdatedAssignment(UpdateAssignment event, VmChannel channel)
|
||||
throws ApiException {
|
||||
var vmDef = channel.vmDefinition();
|
||||
var vmStub = VmDefinitionStub.get(channel.client(),
|
||||
new GroupVersionKind(VM_OP_GROUP, "", VM_OP_KIND_VM),
|
||||
vmDef.namespace(), vmDef.name());
|
||||
vmStub.updateStatus(from -> {
|
||||
JsonObject status = from.status();
|
||||
var assignment = GsonPtr.to(status).to("assignment");
|
||||
assignment.set("pool", event.usedPool());
|
||||
assignment.set("user", event.toUser());
|
||||
assignment.set("lastUsed", Instant.now().toString());
|
||||
return status;
|
||||
});
|
||||
event.setResult(true);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,8 +18,6 @@
|
|||
|
||||
package org.jdrupes.vmoperator.manager;
|
||||
|
||||
import com.google.gson.JsonObject;
|
||||
import io.kubernetes.client.apimachinery.GroupVersionKind;
|
||||
import io.kubernetes.client.openapi.ApiException;
|
||||
import io.kubernetes.client.openapi.models.V1ObjectMeta;
|
||||
import io.kubernetes.client.util.Watch;
|
||||
|
|
@ -55,9 +53,9 @@ import org.jdrupes.vmoperator.manager.events.GetPools;
|
|||
import org.jdrupes.vmoperator.manager.events.GetVms;
|
||||
import org.jdrupes.vmoperator.manager.events.GetVms.VmData;
|
||||
import org.jdrupes.vmoperator.manager.events.ModifyVm;
|
||||
import org.jdrupes.vmoperator.manager.events.UpdateAssignment;
|
||||
import org.jdrupes.vmoperator.manager.events.VmChannel;
|
||||
import org.jdrupes.vmoperator.manager.events.VmDefChanged;
|
||||
import org.jdrupes.vmoperator.util.GsonPtr;
|
||||
import org.jgrapes.core.Channel;
|
||||
import org.jgrapes.core.Event;
|
||||
import org.jgrapes.core.annotation.Handler;
|
||||
|
|
@ -277,7 +275,7 @@ public class VmMonitor extends
|
|||
return;
|
||||
}
|
||||
|
||||
// Get the pool definition for retention time calculations
|
||||
// Get the pool definition assignability check
|
||||
VmPool vmPool = newEventPipeline().fire(new GetPools()
|
||||
.withName(event.fromPool())).get().stream().findFirst()
|
||||
.orElse(null);
|
||||
|
|
@ -286,9 +284,8 @@ public class VmMonitor extends
|
|||
}
|
||||
|
||||
// Find available VM.
|
||||
var pool = vmPool;
|
||||
assignedVm = channelManager.channels().stream()
|
||||
.filter(c -> isAssignable(pool, c.vmDefinition()))
|
||||
.filter(c -> vmPool.isAssignable(c.vmDefinition()))
|
||||
.sorted(Comparator.comparing((VmChannel c) -> c.vmDefinition()
|
||||
.assignmentLastUsed().orElse(Instant.ofEpochSecond(0)))
|
||||
.thenComparing(preferRunning))
|
||||
|
|
@ -300,23 +297,14 @@ public class VmMonitor extends
|
|||
}
|
||||
|
||||
// Assign to user
|
||||
assignedVm.get().pipeline().fire(new UpdateAssignment(vmPool.name(),
|
||||
event.toUser()), assignedVm.get()).get();
|
||||
var vmDef = assignedVm.get().vmDefinition();
|
||||
var vmStub = VmDefinitionStub.get(client(),
|
||||
new GroupVersionKind(VM_OP_GROUP, "", VM_OP_KIND_VM),
|
||||
vmDef.namespace(), vmDef.name());
|
||||
vmStub.updateStatus(from -> {
|
||||
JsonObject status = from.status();
|
||||
var assignment = GsonPtr.to(status).to("assignment");
|
||||
assignment.set("pool", event.fromPool());
|
||||
assignment.set("user", event.toUser());
|
||||
assignment.set("lastUsed", Instant.now().toString());
|
||||
return status;
|
||||
});
|
||||
event.setResult(new VmData(vmDef, assignedVm.get()));
|
||||
|
||||
// Make sure that a newly assigned VM is running.
|
||||
fire(new ModifyVm(vmDef.name(), "state", "Running",
|
||||
assignedVm.get()));
|
||||
assignedVm.get().pipeline().fire(new ModifyVm(vmDef.name(),
|
||||
"state", "Running", assignedVm.get()));
|
||||
}
|
||||
|
||||
private static Comparator<VmChannel> preferRunning
|
||||
|
|
@ -332,38 +320,4 @@ public class VmMonitor extends
|
|||
}
|
||||
};
|
||||
|
||||
@SuppressWarnings("PMD.SimplifyBooleanReturns")
|
||||
private boolean isAssignable(VmPool pool, VmDefinition vmDef) {
|
||||
// Check if the VM is in the pool
|
||||
if (!vmDef.pools().contains(pool.name())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check if the VM is not in use
|
||||
if (vmDef.consoleConnected()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// If not assigned, it's usable
|
||||
if (vmDef.assignedTo().isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Check if it is to be retained
|
||||
if (vmDef.assignmentLastUsed()
|
||||
.map(lu -> pool.retainUntil(lu))
|
||||
.map(ru -> Instant.now().isBefore(ru)).orElse(false)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Additional check in case lastUsed has not been updated
|
||||
// by PoolMonitor#onVmDefChanged() yet ("race condition")
|
||||
if (vmDef.condition("ConsoleConnected")
|
||||
.map(cc -> cc.getLastTransitionTime().toInstant())
|
||||
.map(t -> pool.retainUntil(t))
|
||||
.map(ru -> Instant.now().isBefore(ru)).orElse(false)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue