Handle conflict properly.

This commit is contained in:
Michael Lipp 2025-02-02 12:10:22 +01:00
parent 21108771d9
commit d5e589709f
3 changed files with 100 additions and 82 deletions

View file

@ -193,42 +193,41 @@ public class K8sGenericStub<O extends KubernetesObject,
} }
/** /**
* Updates the object's status, retrying for the given number of times * Updates the object's status.
* if the update fails due to a conflict.
* *
* @param object the current state of the object (passed to `status`) * @param object the current state of the object (passed to `status`)
* @param status function that returns the new status * @param status function that returns the new status
* @param retries the retries * @return the updated model or empty if the object was not found
* @return the updated model or empty if not successful
* @throws ApiException the api exception * @throws ApiException the api exception
*/ */
@SuppressWarnings("PMD.AssignmentInOperand") @SuppressWarnings("PMD.AssignmentInOperand")
public Optional<O> updateStatus(O object, public Optional<O> updateStatus(O object, Function<O, Object> status)
Function<O, Object> status, int retries) throws ApiException { throws ApiException {
while (true) { return K8s.optional(api.updateStatus(object, status));
try {
return K8s.optional(api.updateStatus(object, status));
} catch (ApiException e) {
if (HttpURLConnection.HTTP_CONFLICT != e.getCode()
|| retries-- <= 0) {
throw e;
}
}
}
} }
/** /**
* Updates the object's status, retrying up to 16 times if there * Gets the object and updates the status. In case of conflict, retries
* is a conflict. * up to `retries` times.
* *
* @param object the current state of the object (passed to `status`) * @param status the status
* @param status function that returns the new status * @param retries the retries in case of conflict
* @return the updated model or empty if not successful * @return the updated model or empty if the object was not found
* @throws ApiException the api exception * @throws ApiException the api exception
*/ */
public Optional<O> updateStatus(O object, @SuppressWarnings({ "PMD.AssignmentInOperand", "PMD.UnusedAssignment" })
Function<O, Object> status) throws ApiException { public Optional<O> updateStatus(Function<O, Object> status, int retries)
return updateStatus(object, status, 16); throws ApiException {
try {
return updateStatus(api.get(namespace, name).throwsApiException()
.getObject(), status);
} catch (ApiException e) {
if (HttpURLConnection.HTTP_CONFLICT != e.getCode()
|| retries-- <= 0) {
throw e;
}
}
return Optional.empty();
} }
/** /**
@ -241,8 +240,7 @@ public class K8sGenericStub<O extends KubernetesObject,
*/ */
public Optional<O> updateStatus(Function<O, Object> status) public Optional<O> updateStatus(Function<O, Object> status)
throws ApiException { throws ApiException {
return updateStatus( return updateStatus(status, 16);
api.get(namespace, name).throwsApiException().getObject(), status);
} }
/** /**

View file

@ -24,6 +24,7 @@ import io.kubernetes.client.custom.V1Patch;
import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration; import io.kubernetes.client.openapi.Configuration;
import java.io.IOException; import java.io.IOException;
import java.net.HttpURLConnection;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.Instant; import java.time.Instant;
@ -211,7 +212,10 @@ public class Controller extends Component {
} }
/** /**
* Update the assignment information in the status of the VM CR. * Attempt to Update the assignment information in the status of the
* VM CR. Returns true if successful. The handler does not attempt
* retries, because in case of failure it will be necessary to
* re-evaluate the chosen VM.
* *
* @param event the event * @param event the event
* @param channel the channel * @param channel the channel
@ -220,18 +224,27 @@ public class Controller extends Component {
@Handler @Handler
public void onUpdatedAssignment(UpdateAssignment event, VmChannel channel) public void onUpdatedAssignment(UpdateAssignment event, VmChannel channel)
throws ApiException { throws ApiException {
var vmDef = channel.vmDefinition(); try {
var vmStub = VmDefinitionStub.get(channel.client(), var vmDef = channel.vmDefinition();
new GroupVersionKind(VM_OP_GROUP, "", VM_OP_KIND_VM), var vmStub = VmDefinitionStub.get(channel.client(),
vmDef.namespace(), vmDef.name()); new GroupVersionKind(VM_OP_GROUP, "", VM_OP_KIND_VM),
vmStub.updateStatus(from -> { vmDef.namespace(), vmDef.name());
JsonObject status = from.statusJson(); if (vmStub.updateStatus(vmDef, from -> {
var assignment = GsonPtr.to(status).to("assignment"); JsonObject status = from.statusJson();
assignment.set("pool", event.usedPool()); var assignment = GsonPtr.to(status).to("assignment");
assignment.set("user", event.toUser()); assignment.set("pool", event.usedPool());
assignment.set("lastUsed", Instant.now().toString()); assignment.set("user", event.toUser());
return status; assignment.set("lastUsed", Instant.now().toString());
}); return status;
event.setResult(true); }).isPresent()) {
event.setResult(true);
}
} catch (ApiException e) {
// Log exceptions except for conflict, which can be expected
if (HttpURLConnection.HTTP_CONFLICT != e.getCode()) {
throw e;
}
}
event.setResult(false);
} }
} }

View file

@ -256,49 +256,56 @@ public class VmMonitor extends
@SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
public void onAssignVm(AssignVm event) public void onAssignVm(AssignVm event)
throws ApiException, InterruptedException { throws ApiException, InterruptedException {
// Search for existing assignment. while (true) {
var assignedVm = channelManager.channels().stream() // Search for existing assignment.
.filter(c -> c.vmDefinition().assignedFrom() var vmQuery = channelManager.channels().stream()
.map(p -> p.equals(event.fromPool())).orElse(false)) .filter(c -> c.vmDefinition().assignedFrom()
.filter(c -> c.vmDefinition().assignedTo() .map(p -> p.equals(event.fromPool())).orElse(false))
.map(u -> u.equals(event.toUser())).orElse(false)) .filter(c -> c.vmDefinition().assignedTo()
.findFirst(); .map(u -> u.equals(event.toUser())).orElse(false))
if (assignedVm.isPresent()) { .findFirst();
var vmDef = assignedVm.get().vmDefinition(); if (vmQuery.isPresent()) {
event.setResult(new VmData(vmDef, assignedVm.get())); var vmDef = vmQuery.get().vmDefinition();
return; event.setResult(new VmData(vmDef, vmQuery.get()));
return;
}
// Get the pool definition for checking possible assignment
VmPool vmPool = newEventPipeline().fire(new GetPools()
.withName(event.fromPool())).get().stream().findFirst()
.orElse(null);
if (vmPool == null) {
return;
}
// Find available VM.
vmQuery = channelManager.channels().stream()
.filter(c -> vmPool.isAssignable(c.vmDefinition()))
.sorted(Comparator.comparing((VmChannel c) -> c.vmDefinition()
.assignmentLastUsed().orElse(Instant.ofEpochSecond(0)))
.thenComparing(preferRunning))
.findFirst();
// None found
if (vmQuery.isEmpty()) {
return;
}
// Assign to user
var chosenVm = vmQuery.get();
var vmPipeline = chosenVm.pipeline();
if (Optional.ofNullable(vmPipeline.fire(new UpdateAssignment(
vmPool.name(), event.toUser()), chosenVm).get())
.orElse(false)) {
var vmDef = chosenVm.vmDefinition();
event.setResult(new VmData(vmDef, chosenVm));
// Make sure that a newly assigned VM is running.
chosenVm.pipeline().fire(new ModifyVm(vmDef.name(),
"state", "Running", chosenVm));
return;
}
} }
// Get the pool definition assignability check
VmPool vmPool = newEventPipeline().fire(new GetPools()
.withName(event.fromPool())).get().stream().findFirst()
.orElse(null);
if (vmPool == null) {
return;
}
// Find available VM.
assignedVm = channelManager.channels().stream()
.filter(c -> vmPool.isAssignable(c.vmDefinition()))
.sorted(Comparator.comparing((VmChannel c) -> c.vmDefinition()
.assignmentLastUsed().orElse(Instant.ofEpochSecond(0)))
.thenComparing(preferRunning))
.findFirst();
// None found
if (assignedVm.isEmpty()) {
return;
}
// Assign to user
assignedVm.get().pipeline().fire(new UpdateAssignment(vmPool.name(),
event.toUser()), assignedVm.get()).get();
var vmDef = assignedVm.get().vmDefinition();
event.setResult(new VmData(vmDef, assignedVm.get()));
// Make sure that a newly assigned VM is running.
assignedVm.get().pipeline().fire(new ModifyVm(vmDef.name(),
"state", "Running", assignedVm.get()));
} }
private static Comparator<VmChannel> preferRunning private static Comparator<VmChannel> preferRunning