Simplify pool management.

This commit is contained in:
Michael Lipp 2025-01-15 21:58:08 +01:00
parent 4943baf3e3
commit bd5227fda3
2 changed files with 53 additions and 71 deletions

View file

@ -36,10 +36,20 @@ import org.jdrupes.vmoperator.util.DataPath;
public class VmPool {
private String name;
private boolean defined;
private List<Grant> permissions = Collections.emptyList();
private final Set<String> vms
= Collections.synchronizedSet(new HashSet<>());
/**
* Instantiates a new vm pool.
*
* @param name the name
*/
public VmPool(String name) {
this.name = name;
}
/**
* Returns the name.
*
@ -58,6 +68,24 @@ public class VmPool {
this.name = name;
}
/**
* Checks if is defined.
*
* @return the result
*/
public boolean isDefined() {
return defined;
}
/**
* Sets if is.
*
* @param defined the defined to set
*/
public void setDefined(boolean defined) {
this.defined = defined;
}
/**
* Permissions granted for a VM from the pool.
*

View file

@ -19,17 +19,13 @@
package org.jdrupes.vmoperator.manager;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.util.Watch;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import static org.jdrupes.vmoperator.common.Constants.VM_OP_GROUP;
import org.jdrupes.vmoperator.common.K8s;
import org.jdrupes.vmoperator.common.K8sClient;
@ -56,8 +52,6 @@ import org.jgrapes.core.events.Attached;
public class PoolMonitor extends
AbstractMonitor<K8sDynamicModel, K8sDynamicModels, Channel> {
private final ReentrantLock pendingLock = new ReentrantLock();
private final Map<String, Set<String>> pending = new ConcurrentHashMap<>();
private final Map<String, VmPool> pools = new ConcurrentHashMap<>();
private EventPipeline poolPipeline;
@ -107,18 +101,13 @@ public class PoolMonitor extends
// When pool is deleted, save VMs in pending
if (type == ResponseType.DELETED) {
try {
pendingLock.lock();
Optional.ofNullable(pools.get(poolName)).ifPresent(
p -> {
pending.computeIfAbsent(poolName, k -> Collections
.synchronizedSet(new HashSet<>())).addAll(p.vms());
pools.remove(poolName);
poolPipeline.fire(new VmPoolChanged(p, true));
});
} finally {
pendingLock.unlock();
}
Optional.ofNullable(pools.get(poolName)).ifPresent(pool -> {
pool.setDefined(false);
if (pool.vms().isEmpty()) {
pools.remove(poolName);
}
poolPipeline.fire(new VmPoolChanged(pool, true));
});
return;
}
@ -135,32 +124,13 @@ public class PoolMonitor extends
}
}
// Convert to VM pool
var vmPool = client().getJSON().getGson().fromJson(
GsonPtr.to(poolModel.data()).to("spec").get(),
VmPool.class);
V1ObjectMeta metadata = response.object.getMetadata();
vmPool.setName(metadata.getName());
// If modified, merge changes and notify
if (type == ResponseType.MODIFIED && pools.containsKey(poolName)) {
pools.get(poolName).setPermissions(vmPool.permissions());
poolPipeline.fire(new VmPoolChanged(vmPool));
return;
}
// Add new pool
try {
pendingLock.lock();
Optional.ofNullable(pending.get(poolName)).ifPresent(s -> {
vmPool.vms().addAll(s);
});
pending.remove(poolName);
pools.put(poolName, vmPool);
poolPipeline.fire(new VmPoolChanged(vmPool));
} finally {
pendingLock.unlock();
}
// Get pool and merge changes
var vmPool = pools.computeIfAbsent(poolName, k -> new VmPool(poolName));
var newData = client().getJSON().getGson().fromJson(
GsonPtr.to(poolModel.data()).to("spec").get(), VmPool.class);
vmPool.setPermissions(newData.permissions());
vmPool.setDefined(true);
poolPipeline.fire(new VmPoolChanged(vmPool));
}
/**
@ -173,35 +143,19 @@ public class PoolMonitor extends
String vmName = event.vmDefinition().name();
switch (event.type()) {
case ADDED:
try {
pendingLock.lock();
event.vmDefinition().<List<String>> fromSpec("pools")
.orElse(Collections.emptyList()).stream().forEach(p -> {
if (pools.containsKey(p)) {
pools.get(p).vms().add(vmName);
} else {
pending.computeIfAbsent(p, k -> Collections
.synchronizedSet(new HashSet<>())).add(vmName);
}
poolPipeline.fire(new VmPoolChanged(pools.get(p)));
});
} finally {
pendingLock.unlock();
}
event.vmDefinition().<List<String>> fromSpec("pools")
.orElse(Collections.emptyList()).stream().forEach(p -> {
pools.computeIfAbsent(p, k -> new VmPool(p))
.vms().add(vmName);
poolPipeline.fire(new VmPoolChanged(pools.get(p)));
});
break;
case DELETED:
try {
pendingLock.lock();
pools.values().stream().forEach(p -> {
if (p.vms().remove(vmName)) {
poolPipeline.fire(new VmPoolChanged(p));
}
});
// Should not be necessary, but just in case
pending.values().stream().forEach(s -> s.remove(vmName));
} finally {
pendingLock.unlock();
}
pools.values().stream().forEach(p -> {
if (p.vms().remove(vmName)) {
poolPipeline.fire(new VmPoolChanged(p));
}
});
break;
default:
break;