From 5efef2a083f9366cb622e5473bcebac3e1d66190 Mon Sep 17 00:00:00 2001 From: "Michael N. Lipp" Date: Sat, 23 Nov 2024 12:54:29 +0100 Subject: [PATCH] Deliver events on dedicated pipeline. --- .../vmoperator/manager/PoolManager.java | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/PoolManager.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/PoolManager.java index f47c569..fb1de27 100644 --- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/PoolManager.java +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/PoolManager.java @@ -43,10 +43,14 @@ import org.jdrupes.vmoperator.manager.events.VmDefChanged; import org.jdrupes.vmoperator.manager.events.VmPoolChanged; import org.jdrupes.vmoperator.util.GsonPtr; import org.jgrapes.core.Channel; +import org.jgrapes.core.EventPipeline; import org.jgrapes.core.annotation.Handler; +import org.jgrapes.core.events.Attached; /** - * Watches for changes of VM pools. + * Watches for changes of VM pools. Reports the changes using + * {@link VmPoolChanged} events fired on a special pipeline to + * avoid concurrent change informations. */ @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" }) public class PoolManager extends @@ -55,6 +59,7 @@ public class PoolManager extends private final ReentrantLock pendingLock = new ReentrantLock(); private final Map> pending = new ConcurrentHashMap<>(); private final Map pools = new ConcurrentHashMap<>(); + private EventPipeline poolPipeline; /** * Instantiates a new VM pool manager. @@ -67,6 +72,19 @@ public class PoolManager extends K8sDynamicModels.class); } + /** + * On attached. + * + * @param event the event + */ + @Handler + @SuppressWarnings("PMD.CompareObjectsWithEquals") + public void onAttached(Attached event) { + if (event.node() == this) { + poolPipeline = newEventPipeline(); + } + } + @Override protected void prepareMonitoring() throws IOException, ApiException { client(new K8sClient()); @@ -96,7 +114,7 @@ public class PoolManager extends pending.computeIfAbsent(poolName, k -> Collections .synchronizedSet(new HashSet<>())).addAll(p.vms()); pools.remove(poolName); - fire(new VmPoolChanged(p, true)); + poolPipeline.fire(new VmPoolChanged(p, true)); }); } finally { pendingLock.unlock(); @@ -138,7 +156,7 @@ public class PoolManager extends }); pending.remove(poolName); pools.put(poolName, vmPool); - fire(new VmPoolChanged(vmPool)); + poolPipeline.fire(new VmPoolChanged(vmPool)); } finally { pendingLock.unlock(); } @@ -164,7 +182,7 @@ public class PoolManager extends pending.computeIfAbsent(p, k -> Collections .synchronizedSet(new HashSet<>())).add(vmName); } - fire(new VmPoolChanged(pools.get(p))); + poolPipeline.fire(new VmPoolChanged(pools.get(p))); }); } finally { pendingLock.unlock(); @@ -175,7 +193,7 @@ public class PoolManager extends pendingLock.lock(); pools.values().stream().forEach(p -> { if (p.vms().remove(vmName)) { - fire(new VmPoolChanged(p)); + poolPipeline.fire(new VmPoolChanged(p)); } }); // Should not be necessary, but just in case