From d5e589709fb96d106af137c57ffb59dca2b893c6 Mon Sep 17 00:00:00 2001 From: "Michael N. Lipp" Date: Sun, 2 Feb 2025 12:10:22 +0100 Subject: [PATCH] Handle conflict properly. --- .../vmoperator/common/K8sGenericStub.java | 50 +++++----- .../vmoperator/manager/Controller.java | 41 ++++++--- .../jdrupes/vmoperator/manager/VmMonitor.java | 91 ++++++++++--------- 3 files changed, 100 insertions(+), 82 deletions(-) diff --git a/org.jdrupes.vmoperator.common/src/org/jdrupes/vmoperator/common/K8sGenericStub.java b/org.jdrupes.vmoperator.common/src/org/jdrupes/vmoperator/common/K8sGenericStub.java index 0689a97..688f43f 100644 --- a/org.jdrupes.vmoperator.common/src/org/jdrupes/vmoperator/common/K8sGenericStub.java +++ b/org.jdrupes.vmoperator.common/src/org/jdrupes/vmoperator/common/K8sGenericStub.java @@ -193,42 +193,41 @@ public class K8sGenericStub updateStatus(O object, - Function status, int retries) throws ApiException { - while (true) { - try { - return K8s.optional(api.updateStatus(object, status)); - } catch (ApiException e) { - if (HttpURLConnection.HTTP_CONFLICT != e.getCode() - || retries-- <= 0) { - throw e; - } - } - } + public Optional updateStatus(O object, Function status) + throws ApiException { + return K8s.optional(api.updateStatus(object, status)); } /** - * Updates the object's status, retrying up to 16 times if there - * is a conflict. + * Gets the object and updates the status. In case of conflict, retries + * up to `retries` times. * - * @param object the current state of the object (passed to `status`) - * @param status function that returns the new status - * @return the updated model or empty if not successful + * @param status the status + * @param retries the retries in case of conflict + * @return the updated model or empty if the object was not found * @throws ApiException the api exception */ - public Optional updateStatus(O object, - Function status) throws ApiException { - return updateStatus(object, status, 16); + @SuppressWarnings({ "PMD.AssignmentInOperand", "PMD.UnusedAssignment" }) + public Optional updateStatus(Function status, int retries) + throws ApiException { + try { + return updateStatus(api.get(namespace, name).throwsApiException() + .getObject(), status); + } catch (ApiException e) { + if (HttpURLConnection.HTTP_CONFLICT != e.getCode() + || retries-- <= 0) { + throw e; + } + } + return Optional.empty(); } /** @@ -241,8 +240,7 @@ public class K8sGenericStub updateStatus(Function status) throws ApiException { - return updateStatus( - api.get(namespace, name).throwsApiException().getObject(), status); + return updateStatus(status, 16); } /** diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java index 3e25a08..80ff0f7 100644 --- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java @@ -24,6 +24,7 @@ import io.kubernetes.client.custom.V1Patch; import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.Configuration; import java.io.IOException; +import java.net.HttpURLConnection; import java.nio.file.Files; import java.nio.file.Path; import java.time.Instant; @@ -211,7 +212,10 @@ public class Controller extends Component { } /** - * Update the assignment information in the status of the VM CR. + * Attempt to Update the assignment information in the status of the + * VM CR. Returns true if successful. The handler does not attempt + * retries, because in case of failure it will be necessary to + * re-evaluate the chosen VM. * * @param event the event * @param channel the channel @@ -220,18 +224,27 @@ public class Controller extends Component { @Handler public void onUpdatedAssignment(UpdateAssignment event, VmChannel channel) throws ApiException { - var vmDef = channel.vmDefinition(); - var vmStub = VmDefinitionStub.get(channel.client(), - new GroupVersionKind(VM_OP_GROUP, "", VM_OP_KIND_VM), - vmDef.namespace(), vmDef.name()); - vmStub.updateStatus(from -> { - JsonObject status = from.statusJson(); - var assignment = GsonPtr.to(status).to("assignment"); - assignment.set("pool", event.usedPool()); - assignment.set("user", event.toUser()); - assignment.set("lastUsed", Instant.now().toString()); - return status; - }); - event.setResult(true); + try { + var vmDef = channel.vmDefinition(); + var vmStub = VmDefinitionStub.get(channel.client(), + new GroupVersionKind(VM_OP_GROUP, "", VM_OP_KIND_VM), + vmDef.namespace(), vmDef.name()); + if (vmStub.updateStatus(vmDef, from -> { + JsonObject status = from.statusJson(); + var assignment = GsonPtr.to(status).to("assignment"); + assignment.set("pool", event.usedPool()); + assignment.set("user", event.toUser()); + assignment.set("lastUsed", Instant.now().toString()); + return status; + }).isPresent()) { + event.setResult(true); + } + } catch (ApiException e) { + // Log exceptions except for conflict, which can be expected + if (HttpURLConnection.HTTP_CONFLICT != e.getCode()) { + throw e; + } + } + event.setResult(false); } } diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmMonitor.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmMonitor.java index e506d44..17e3c58 100644 --- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmMonitor.java +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmMonitor.java @@ -256,49 +256,56 @@ public class VmMonitor extends @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") public void onAssignVm(AssignVm event) throws ApiException, InterruptedException { - // Search for existing assignment. - var assignedVm = channelManager.channels().stream() - .filter(c -> c.vmDefinition().assignedFrom() - .map(p -> p.equals(event.fromPool())).orElse(false)) - .filter(c -> c.vmDefinition().assignedTo() - .map(u -> u.equals(event.toUser())).orElse(false)) - .findFirst(); - if (assignedVm.isPresent()) { - var vmDef = assignedVm.get().vmDefinition(); - event.setResult(new VmData(vmDef, assignedVm.get())); - return; + while (true) { + // Search for existing assignment. + var vmQuery = channelManager.channels().stream() + .filter(c -> c.vmDefinition().assignedFrom() + .map(p -> p.equals(event.fromPool())).orElse(false)) + .filter(c -> c.vmDefinition().assignedTo() + .map(u -> u.equals(event.toUser())).orElse(false)) + .findFirst(); + if (vmQuery.isPresent()) { + var vmDef = vmQuery.get().vmDefinition(); + event.setResult(new VmData(vmDef, vmQuery.get())); + return; + } + + // Get the pool definition for checking possible assignment + VmPool vmPool = newEventPipeline().fire(new GetPools() + .withName(event.fromPool())).get().stream().findFirst() + .orElse(null); + if (vmPool == null) { + return; + } + + // Find available VM. + vmQuery = channelManager.channels().stream() + .filter(c -> vmPool.isAssignable(c.vmDefinition())) + .sorted(Comparator.comparing((VmChannel c) -> c.vmDefinition() + .assignmentLastUsed().orElse(Instant.ofEpochSecond(0))) + .thenComparing(preferRunning)) + .findFirst(); + + // None found + if (vmQuery.isEmpty()) { + return; + } + + // Assign to user + var chosenVm = vmQuery.get(); + var vmPipeline = chosenVm.pipeline(); + if (Optional.ofNullable(vmPipeline.fire(new UpdateAssignment( + vmPool.name(), event.toUser()), chosenVm).get()) + .orElse(false)) { + var vmDef = chosenVm.vmDefinition(); + event.setResult(new VmData(vmDef, chosenVm)); + + // Make sure that a newly assigned VM is running. + chosenVm.pipeline().fire(new ModifyVm(vmDef.name(), + "state", "Running", chosenVm)); + return; + } } - - // Get the pool definition assignability check - VmPool vmPool = newEventPipeline().fire(new GetPools() - .withName(event.fromPool())).get().stream().findFirst() - .orElse(null); - if (vmPool == null) { - return; - } - - // Find available VM. - assignedVm = channelManager.channels().stream() - .filter(c -> vmPool.isAssignable(c.vmDefinition())) - .sorted(Comparator.comparing((VmChannel c) -> c.vmDefinition() - .assignmentLastUsed().orElse(Instant.ofEpochSecond(0))) - .thenComparing(preferRunning)) - .findFirst(); - - // None found - if (assignedVm.isEmpty()) { - return; - } - - // Assign to user - assignedVm.get().pipeline().fire(new UpdateAssignment(vmPool.name(), - event.toUser()), assignedVm.get()).get(); - var vmDef = assignedVm.get().vmDefinition(); - event.setResult(new VmData(vmDef, assignedVm.get())); - - // Make sure that a newly assigned VM is running. - assignedVm.get().pipeline().fire(new ModifyVm(vmDef.name(), - "state", "Running", assignedVm.get())); } private static Comparator preferRunning