Merge branch 'cleanup/v3.4' into 'main'
Some checks failed
Java CI with Gradle / build (push) Has been cancelled

Prepare release v3.4

See merge request org/jdrupes/vm-operator!5
This commit is contained in:
Michael Lipp 2024-10-06 10:05:10 +00:00
commit 1bc63abadf
12 changed files with 274 additions and 504 deletions

View file

@ -0,0 +1,113 @@
/*
* VM-Operator
* Copyright (C) 2024 Michael N. Lipp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.jdrupes.vmoperator.manager.events;
import java.util.Collection;
import java.util.Optional;
import java.util.Set;
import org.jgrapes.core.Channel;
/**
* Supports the lookup of a channel by a name (an id). As a convenience,
* it is possible to additionally associate arbitrary data with the entry
* (and thus with the channel). Note that this interface defines a
* read-only view of the dictionary.
*
* @param <K> the key type
* @param <C> the channel type
* @param <A> the type of the associated data
*/
public interface ChannelDictionary<K, C extends Channel, A> {
/**
* Combines the channel and the associated data.
*
* @param <C> the channel type
* @param <A> the type of the associated data
* @param channel the channel
* @param associated the associated
*/
@SuppressWarnings("PMD.ShortClassName")
public record Value<C extends Channel, A>(C channel, A associated) {
}
/**
* Returns all known keys.
*
* @return the keys
*/
Set<K> keys();
/**
* Return all known values.
*
* @return the collection
*/
Collection<Value<C, A>> values();
/**
* Returns the channel and associates data registered for the key
* or an empty optional if no entry exists.
*
* @param key the key
* @return the result
*/
Optional<Value<C, A>> value(K key);
/**
* Return all known channels.
*
* @return the collection
*/
default Collection<C> channels() {
return values().stream().map(v -> v.channel).toList();
}
/**
* Returns the channel registered for the key or an empty optional
* if no mapping exists.
*
* @param key the key
* @return the optional
*/
default Optional<C> channel(K key) {
return value(key).map(b -> b.channel);
}
/**
* Returns all known associated data.
*
* @return the collection
*/
default Collection<A> associated() {
return values().stream()
.filter(v -> v.associated() != null)
.map(v -> v.associated).toList();
}
/**
* Return the data associated with the entry for the channel.
*
* @param key the key
* @return the data
*/
default Optional<A> associated(K key) {
return value(key).map(b -> b.associated);
}
}

View file

@ -27,53 +27,24 @@ import java.util.function.Function;
import org.jgrapes.core.Channel; import org.jgrapes.core.Channel;
/** /**
* A channel manager that maintains mappings from a key to a channel. * Provides an actively managed implementation of the {@link ChannelDictionary}.
* As a convenience, it is possible to additionally associate arbitrary
* data with the entry (and thus with the channel).
* *
* The manager should be used by a component that defines channels for * The {@link ChannelManager} can be used for housekeeping by any component
* housekeeping. It can be shared between this component and another * that creates channels. It can be shared between this component and
* component, preferably using the {@link #fixed()} view for the * some other component, preferably passing it as {@link ChannelDictionary}
* second component. Alternatively, the second component can use a * (the read-only view) to the second component. Alternatively, the other
* {@link ChannelCache} to track the mappings using events. * component can use a {@link ChannelTracker} to track the mappings using
* events.
* *
* @param <K> the key type * @param <K> the key type
* @param <C> the channel type * @param <C> the channel type
* @param <A> the type of the associated data * @param <A> the type of the associated data
*/ */
public class ChannelManager<K, C extends Channel, A> { public class ChannelManager<K, C extends Channel, A>
implements ChannelDictionary<K, C, A> {
private final Map<K, Both<C, A>> channels = new ConcurrentHashMap<>(); private final Map<K, Value<C, A>> entries = new ConcurrentHashMap<>();
private final Function<K, C> supplier; private final Function<K, C> supplier;
private ChannelManager<K, C, A> readOnly;
/**
* Combines the channel and the associated data.
*
* @param <C> the generic type
* @param <A> the generic type
*/
@SuppressWarnings("PMD.ShortClassName")
public static class Both<C extends Channel, A> {
/** The channel. */
public C channel;
/** The associated. */
public A associated;
/**
* Instantiates a new both.
*
* @param channel the channel
* @param associated the associated
*/
public Both(C channel, A associated) {
super();
this.channel = channel;
this.associated = associated;
}
}
/** /**
* Instantiates a new channel manager. * Instantiates a new channel manager.
@ -91,6 +62,21 @@ public class ChannelManager<K, C extends Channel, A> {
this(k -> null); this(k -> null);
} }
@Override
public Set<K> keys() {
return entries.keySet();
}
/**
* Return all known values.
*
* @return the collection
*/
@Override
public Collection<Value<C, A>> values() {
return entries.values();
}
/** /**
* Returns the channel and associates data registered for the key * Returns the channel and associates data registered for the key
* or an empty optional if no mapping exists. * or an empty optional if no mapping exists.
@ -98,10 +84,8 @@ public class ChannelManager<K, C extends Channel, A> {
* @param key the key * @param key the key
* @return the result * @return the result
*/ */
public Optional<Both<C, A>> both(K key) { public Optional<Value<C, A>> value(K key) {
synchronized (channels) { return Optional.ofNullable(entries.get(key));
return Optional.ofNullable(channels.get(key));
}
} }
/** /**
@ -113,7 +97,7 @@ public class ChannelManager<K, C extends Channel, A> {
* @return the channel manager * @return the channel manager
*/ */
public ChannelManager<K, C, A> put(K key, C channel, A associated) { public ChannelManager<K, C, A> put(K key, C channel, A associated) {
channels.put(key, new Both<>(channel, associated)); entries.put(key, new Value<>(channel, associated));
return this; return this;
} }
@ -129,17 +113,6 @@ public class ChannelManager<K, C extends Channel, A> {
return this; return this;
} }
/**
* Returns the channel registered for the key or an empty optional
* if no mapping exists.
*
* @param key the key
* @return the optional
*/
public Optional<C> channel(K key) {
return both(key).map(b -> b.channel);
}
/** /**
* Returns the {@link Channel} for the given name, creating it using * Returns the {@link Channel} for the given name, creating it using
* the supplier passed to the constructor if it doesn't exist yet. * the supplier passed to the constructor if it doesn't exist yet.
@ -147,8 +120,8 @@ public class ChannelManager<K, C extends Channel, A> {
* @param key the key * @param key the key
* @return the channel * @return the channel
*/ */
public Optional<C> getChannel(K key) { public C channelGet(K key) {
return getChannel(key, supplier); return computeIfAbsent(key, supplier);
} }
/** /**
@ -161,17 +134,9 @@ public class ChannelManager<K, C extends Channel, A> {
*/ */
@SuppressWarnings({ "PMD.AssignmentInOperand", @SuppressWarnings({ "PMD.AssignmentInOperand",
"PMD.DataflowAnomalyAnalysis" }) "PMD.DataflowAnomalyAnalysis" })
public Optional<C> getChannel(K key, Function<K, C> supplier) { public C computeIfAbsent(K key, Function<K, C> supplier) {
synchronized (channels) { return entries.computeIfAbsent(key,
return Optional k -> new Value<>(supplier.apply(k), null)).channel();
.of(Optional.ofNullable(channels.get(key))
.map(v -> v.channel)
.orElseGet(() -> {
var channel = supplier.apply(key);
channels.put(key, new Both<>(channel, null));
return channel;
}));
}
} }
/** /**
@ -183,121 +148,17 @@ public class ChannelManager<K, C extends Channel, A> {
* @return the channel manager * @return the channel manager
*/ */
public ChannelManager<K, C, A> associate(K key, A data) { public ChannelManager<K, C, A> associate(K key, A data) {
synchronized (channels) { Optional.ofNullable(entries.computeIfPresent(key,
Optional.ofNullable(channels.get(key)) (k, existing) -> new Value<>(existing.channel(), data)));
.ifPresent(v -> v.associated = data);
}
return this; return this;
} }
/**
* Return the data associated with the entry for the channel.
*
* @param key the key
* @return the data
*/
public Optional<A> associated(K key) {
return both(key).map(b -> b.associated);
}
/**
* Returns all associated data.
*
* @return the collection
*/
public Collection<A> associated() {
synchronized (channels) {
return channels.values().stream()
.filter(v -> v.associated != null)
.map(v -> v.associated).toList();
}
}
/** /**
* Removes the channel with the given name. * Removes the channel with the given name.
* *
* @param name the name * @param name the name
*/ */
public void remove(String name) { public void remove(String name) {
synchronized (channels) { entries.remove(name);
channels.remove(name);
}
}
/**
* Returns all known keys.
*
* @return the sets the
*/
public Set<K> keys() {
return channels.keySet();
}
/**
* Returns a read only view of this channel manager. The methods
* that usually create a new entry refrain from doing so. The
* methods that change the value of channel and {@link #remove(String)}
* do nothing. The associated data, however, can still be changed.
*
* @return the channel manager
*/
public ChannelManager<K, C, A> fixed() {
if (readOnly == null) {
readOnly = new ChannelManager<>(supplier) {
@Override
public Optional<Both<C, A>> both(K key) {
return ChannelManager.this.both(key);
}
@Override
public ChannelManager<K, C, A> put(K key, C channel,
A associated) {
return associate(key, associated);
}
@Override
public Optional<C> getChannel(K key) {
return ChannelManager.this.channel(key);
}
@Override
public Optional<C> getChannel(K key, Function<K, C> supplier) {
return ChannelManager.this.channel(key);
}
@Override
public ChannelManager<K, C, A> associate(K key, A data) {
return ChannelManager.this.associate(key, data);
}
@Override
public Optional<A> associated(K key) {
return ChannelManager.this.associated(key);
}
@Override
public Collection<A> associated() {
return ChannelManager.this.associated();
}
@Override
public void remove(String name) {
// Do nothing
}
@Override
public Set<K> keys() {
return ChannelManager.this.keys();
}
@Override
public ChannelManager<K, C, A> fixed() {
return ChannelManager.this.fixed();
}
};
}
return readOnly;
} }
} }

View file

@ -19,6 +19,7 @@
package org.jdrupes.vmoperator.manager.events; package org.jdrupes.vmoperator.manager.events;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
@ -27,20 +28,30 @@ import java.util.concurrent.ConcurrentHashMap;
import org.jgrapes.core.Channel; import org.jgrapes.core.Channel;
/** /**
* A channel manager that tracks mappings from a key to a channel using * Used to track mapping from a key to a channel. Entries must
* "add/remove" (or "open/close") events and the channels on which they * be maintained by handlers for "add/remove" (or "open/close")
* are delivered. * events delivered on the channels that are to be
* made available by the tracker.
*
* The channels are stored in the dictionary using {@link WeakReference}s.
* Removing entries is therefore best practice but not an absolute necessity
* as entries for cleared references are removed when one of the methods
* {@link #values()}, {@link #channels()} or {@link #associated()} is called.
* *
* @param <K> the key type * @param <K> the key type
* @param <C> the channel type * @param <C> the channel type
* @param <A> the type of the associated data * @param <A> the type of the associated data
*/ */
public class ChannelCache<K, C extends Channel, A> { public class ChannelTracker<K, C extends Channel, A>
implements ChannelDictionary<K, C, A> {
private final Map<K, Data<C, A>> channels = new ConcurrentHashMap<>(); private final Map<K, Data<C, A>> entries = new ConcurrentHashMap<>();
/** /**
* Helper * Combines the channel and associated data.
*
* @param <C> the generic type
* @param <A> the generic type
*/ */
@SuppressWarnings("PMD.ShortClassName") @SuppressWarnings("PMD.ShortClassName")
private static class Data<C extends Channel, A> { private static class Data<C extends Channel, A> {
@ -57,32 +68,24 @@ public class ChannelCache<K, C extends Channel, A> {
} }
} }
/** @Override
* Combines the channel and the associated data. public Set<K> keys() {
* return entries.keySet();
* @param <C> the generic type
* @param <A> the generic type
*/
@SuppressWarnings("PMD.ShortClassName")
public static class Both<C extends Channel, A> {
/** The channel. */
public C channel;
/** The associated. */
public A associated;
/**
* Instantiates a new both.
*
* @param channel the channel
* @param associated the associated
*/
public Both(C channel, A associated) {
super();
this.channel = channel;
this.associated = associated;
} }
@Override
public Collection<Value<C, A>> values() {
var result = new ArrayList<Value<C, A>>();
for (var itr = entries.entrySet().iterator(); itr.hasNext();) {
var value = itr.next().getValue();
var channel = value.channel.get();
if (channel == null) {
itr.remove();
continue;
}
result.add(new Value<>(channel, value.associated));
}
return result;
} }
/** /**
@ -92,20 +95,18 @@ public class ChannelCache<K, C extends Channel, A> {
* @param key the key * @param key the key
* @return the result * @return the result
*/ */
public Optional<Both<C, A>> both(K key) { public Optional<Value<C, A>> value(K key) {
synchronized (channels) { var value = entries.get(key);
var value = channels.get(key);
if (value == null) { if (value == null) {
return Optional.empty(); return Optional.empty();
} }
var channel = value.channel.get(); var channel = value.channel.get();
if (channel == null) { if (channel == null) {
// Cleanup old reference // Cleanup old reference
channels.remove(key); entries.remove(key);
return Optional.empty(); return Optional.empty();
} }
return Optional.of(new Both<>(channel, value.associated)); return Optional.of(new Value<>(channel, value.associated));
}
} }
/** /**
@ -116,10 +117,10 @@ public class ChannelCache<K, C extends Channel, A> {
* @param associated the associated * @param associated the associated
* @return the channel manager * @return the channel manager
*/ */
public ChannelCache<K, C, A> put(K key, C channel, A associated) { public ChannelTracker<K, C, A> put(K key, C channel, A associated) {
Data<C, A> data = new Data<>(channel); Data<C, A> data = new Data<>(channel);
data.associated = associated; data.associated = associated;
channels.put(key, data); entries.put(key, data);
return this; return this;
} }
@ -130,22 +131,11 @@ public class ChannelCache<K, C extends Channel, A> {
* @param channel the channel * @param channel the channel
* @return the channel manager * @return the channel manager
*/ */
public ChannelCache<K, C, A> put(K key, C channel) { public ChannelTracker<K, C, A> put(K key, C channel) {
put(key, channel, null); put(key, channel, null);
return this; return this;
} }
/**
* Returns the channel registered for the key or an empty optional
* if no mapping exists.
*
* @param key the key
* @return the optional
*/
public Optional<C> channel(K key) {
return both(key).map(b -> b.channel);
}
/** /**
* Associate the entry for the channel with the given data. The entry * Associate the entry for the channel with the given data. The entry
* for the channel must already exist. * for the channel must already exist.
@ -154,54 +144,18 @@ public class ChannelCache<K, C extends Channel, A> {
* @param data the data * @param data the data
* @return the channel manager * @return the channel manager
*/ */
public ChannelCache<K, C, A> associate(K key, A data) { public ChannelTracker<K, C, A> associate(K key, A data) {
synchronized (channels) { Optional.ofNullable(entries.get(key))
Optional.ofNullable(channels.get(key))
.ifPresent(v -> v.associated = data); .ifPresent(v -> v.associated = data);
}
return this; return this;
} }
/**
* Return the data associated with the entry for the channel.
*
* @param key the key
* @return the data
*/
public Optional<A> associated(K key) {
return both(key).map(b -> b.associated);
}
/**
* Returns all associated data.
*
* @return the collection
*/
public Collection<A> associated() {
synchronized (channels) {
return channels.values().stream()
.filter(v -> v.channel.get() != null && v.associated != null)
.map(v -> v.associated).toList();
}
}
/** /**
* Removes the channel with the given name. * Removes the channel with the given name.
* *
* @param name the name * @param name the name
*/ */
public void remove(String name) { public void remove(String name) {
synchronized (channels) { entries.remove(name);
channels.remove(name);
}
}
/**
* Returns all known keys.
*
* @return the sets the
*/
public Set<K> keys() {
return channels.keySet();
} }
} }

View file

@ -1,76 +0,0 @@
/*
* VM-Operator
* Copyright (C) 2024 Michael N. Lipp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.jdrupes.vmoperator.manager.events;
import io.kubernetes.client.openapi.models.V1Service;
import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Components;
import org.jgrapes.core.Event;
/**
* Indicates that a service has changed.
*/
@SuppressWarnings("PMD.DataClass")
public class ServiceChanged extends Event<Void> {
private final ResponseType type;
private final V1Service service;
/**
* Initializes a new service changed event.
*
* @param type the type
* @param service the service
*/
public ServiceChanged(ResponseType type, V1Service service) {
this.type = type;
this.service = service;
}
/**
* Returns the type.
*
* @return the type
*/
public ResponseType type() {
return type;
}
/**
* Gets the service.
*
* @return the service
*/
public V1Service service() {
return service;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(Components.objectName(this)).append(" [")
.append(service.getMetadata().getName()).append(' ').append(type);
if (channels() != null) {
builder.append(", channels=").append(Channel.toString(channels()));
}
builder.append(']');
return builder.toString();
}
}

View file

@ -27,14 +27,11 @@ import io.kubernetes.client.util.generic.options.ListOptions;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level; import java.util.logging.Level;
import org.jdrupes.vmoperator.common.K8s; import org.jdrupes.vmoperator.common.K8s;
import org.jdrupes.vmoperator.common.K8sClient; import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.K8sObserver; import org.jdrupes.vmoperator.common.K8sObserver;
import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
import org.jdrupes.vmoperator.manager.events.ChannelManager;
import org.jdrupes.vmoperator.manager.events.Exit; import org.jdrupes.vmoperator.manager.events.Exit;
import org.jgrapes.core.Channel; import org.jgrapes.core.Channel;
import org.jgrapes.core.Component; import org.jgrapes.core.Component;
@ -45,7 +42,11 @@ import org.jgrapes.core.events.Stop;
import org.jgrapes.util.events.ConfigurationUpdate; import org.jgrapes.util.events.ConfigurationUpdate;
/** /**
* A base class for monitoring VM related resources. * A base class for monitoring VM related resources. When started,
* it creates observers for all versions of the the {@link APIResource}
* configured by {@link #context(APIResource)}. The APIResource is not
* passed to the constructor because in some cases it has to be
* evaluated lazily.
* *
* @param <O> the object type for the context * @param <O> the object type for the context
* @param <L> the object list type for the context * @param <L> the object list type for the context
@ -61,15 +62,17 @@ public abstract class AbstractMonitor<O extends KubernetesObject,
private String namespace; private String namespace;
private ListOptions options = new ListOptions(); private ListOptions options = new ListOptions();
private final AtomicInteger observerCounter = new AtomicInteger(0); private final AtomicInteger observerCounter = new AtomicInteger(0);
private ChannelManager<String, C, ?> channelManager;
/** /**
* Initializes the instance. * Initializes the instance.
* *
* @param componentChannel the component channel * @param componentChannel the component channel
* @param objectClass the class of the Kubernetes object to watch
* @param objectListClass the class of the list of Kubernetes objects
* to watch
*/ */
protected AbstractMonitor(Channel componentChannel, Class<O> objectClass, protected AbstractMonitor(Channel componentChannel,
Class<L> objectListClass) { Class<O> objectClass, Class<L> objectListClass) {
super(componentChannel); super(componentChannel);
this.objectClass = objectClass; this.objectClass = objectClass;
this.objectListClass = objectListClass; this.objectListClass = objectListClass;
@ -155,27 +158,6 @@ public abstract class AbstractMonitor<O extends KubernetesObject,
return this; return this;
} }
/**
* Returns the channel manager.
*
* @return the context
*/
public ChannelManager<String, C, ?> channelManager() {
return channelManager;
}
/**
* Sets the channel manager.
*
* @param channelManager the channel manager
* @return the abstract monitor
*/
public AbstractMonitor<O, L, C>
channelManager(ChannelManager<String, C, ?> channelManager) {
this.channelManager = channelManager;
return this;
}
/** /**
* Looks for a key "namespace" in the configuration and, if found, * Looks for a key "namespace" in the configuration and, if found,
* sets the namespace to its value. * sets the namespace to its value.
@ -193,7 +175,7 @@ public abstract class AbstractMonitor<O extends KubernetesObject,
} }
/** /**
* Handle the start event. Configures the namespace invokes * Handle the start event. Configures the namespace, invokes
* {@link #prepareMonitoring()} and starts the observers. * {@link #prepareMonitoring()} and starts the observers.
* *
* @param event the event * @param event the event
@ -239,9 +221,6 @@ public abstract class AbstractMonitor<O extends KubernetesObject,
K8s.preferred(context, version), namespace, options) K8s.preferred(context, version), namespace, options)
.handler((c, r) -> { .handler((c, r) -> {
handleChange(c, r); handleChange(c, r);
if (ResponseType.valueOf(r.type) == ResponseType.DELETED) {
channelManager.remove(r.object.getMetadata().getName());
}
}).onTerminated((o, t) -> { }).onTerminated((o, t) -> {
if (observerCounter.decrementAndGet() == 0) { if (observerCounter.decrementAndGet() == 0) {
unregisterAsGenerator(); unregisterAsGenerator();
@ -255,7 +234,8 @@ public abstract class AbstractMonitor<O extends KubernetesObject,
/** /**
* Invoked by {@link #onStart(Start)} after the namespace has * Invoked by {@link #onStart(Start)} after the namespace has
* been configured and before starting the observer. * been configured and before starting the observer. This is
* the last opportunity to invoke {@link #context(APIResource)}.
* *
* @throws IOException Signals that an I/O exception has occurred. * @throws IOException Signals that an I/O exception has occurred.
* @throws ApiException the api exception * @throws ApiException the api exception
@ -272,14 +252,4 @@ public abstract class AbstractMonitor<O extends KubernetesObject,
* @param change the change * @param change the change
*/ */
protected abstract void handleChange(K8sClient client, Response<O> change); protected abstract void handleChange(K8sClient client, Response<O> change);
/**
* Returns the {@link Channel} for the given name.
*
* @param name the name
* @return the channel used for events related to the specified object
*/
protected Optional<C> channel(String name) {
return channelManager.getChannel(name);
}
} }

View file

@ -100,9 +100,8 @@ public class Controller extends Component {
return null; return null;
} }
}); });
attach(new VmMonitor(channel()).channelManager(chanMgr)); attach(new VmMonitor(channel(), chanMgr));
attach(new DisplaySecretMonitor(channel()) attach(new DisplaySecretMonitor(channel(), chanMgr));
.channelManager(chanMgr.fixed()));
// Currently, we don't use the IP assigned by the load balancer // Currently, we don't use the IP assigned by the load balancer
// to access the VM's console. Might change in the future. // to access the VM's console. Might change in the future.
// attach(new ServiceMonitor(channel()).channelManager(chanMgr)); // attach(new ServiceMonitor(channel()).channelManager(chanMgr));

View file

@ -44,6 +44,7 @@ import org.jdrupes.vmoperator.common.K8sV1SecretStub;
import static org.jdrupes.vmoperator.manager.Constants.COMP_DISPLAY_SECRET; import static org.jdrupes.vmoperator.manager.Constants.COMP_DISPLAY_SECRET;
import static org.jdrupes.vmoperator.manager.Constants.DATA_DISPLAY_PASSWORD; import static org.jdrupes.vmoperator.manager.Constants.DATA_DISPLAY_PASSWORD;
import static org.jdrupes.vmoperator.manager.Constants.DATA_PASSWORD_EXPIRY; import static org.jdrupes.vmoperator.manager.Constants.DATA_PASSWORD_EXPIRY;
import org.jdrupes.vmoperator.manager.events.ChannelDictionary;
import org.jdrupes.vmoperator.manager.events.GetDisplayPassword; import org.jdrupes.vmoperator.manager.events.GetDisplayPassword;
import org.jdrupes.vmoperator.manager.events.VmChannel; import org.jdrupes.vmoperator.manager.events.VmChannel;
import org.jdrupes.vmoperator.manager.events.VmDefChanged; import org.jdrupes.vmoperator.manager.events.VmDefChanged;
@ -68,14 +69,18 @@ public class DisplaySecretMonitor
private int passwordValidity = 10; private int passwordValidity = 10;
private final List<PendingGet> pendingGets private final List<PendingGet> pendingGets
= Collections.synchronizedList(new LinkedList<>()); = Collections.synchronizedList(new LinkedList<>());
private final ChannelDictionary<String, VmChannel, ?> channelDictionary;
/** /**
* Instantiates a new display secrets monitor. * Instantiates a new display secrets monitor.
* *
* @param componentChannel the component channel * @param componentChannel the component channel
* @param channelDictionary the channel dictionary
*/ */
public DisplaySecretMonitor(Channel componentChannel) { public DisplaySecretMonitor(Channel componentChannel,
ChannelDictionary<String, VmChannel, ?> channelDictionary) {
super(componentChannel, V1Secret.class, V1SecretList.class); super(componentChannel, V1Secret.class, V1SecretList.class);
this.channelDictionary = channelDictionary;
context(K8sV1SecretStub.CONTEXT); context(K8sV1SecretStub.CONTEXT);
ListOptions options = new ListOptions(); ListOptions options = new ListOptions();
options.setLabelSelector("app.kubernetes.io/name=" + APP_NAME + "," options.setLabelSelector("app.kubernetes.io/name=" + APP_NAME + ","
@ -116,7 +121,7 @@ public class DisplaySecretMonitor
if (vmName == null) { if (vmName == null) {
return; return;
} }
var channel = channel(vmName).orElse(null); var channel = channelDictionary.channel(vmName).orElse(null);
if (channel == null || channel.vmDefinition() == null) { if (channel == null || channel.vmDefinition() == null) {
return; return;
} }
@ -248,6 +253,7 @@ public class DisplaySecretMonitor
* @param channel the channel * @param channel the channel
*/ */
@Handler @Handler
@SuppressWarnings("PMD.AvoidSynchronizedStatement")
public void onVmDefChanged(VmDefChanged event, Channel channel) { public void onVmDefChanged(VmDefChanged event, Channel channel) {
synchronized (pendingGets) { synchronized (pendingGets) {
String vmName = event.vmDefinition().metadata().getName(); String vmName = event.vmDefinition().metadata().getName();

View file

@ -1,74 +0,0 @@
/*
* VM-Operator
* Copyright (C) 2024 Michael N. Lipp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.jdrupes.vmoperator.manager;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1Service;
import io.kubernetes.client.openapi.models.V1ServiceList;
import io.kubernetes.client.util.Watch.Response;
import io.kubernetes.client.util.generic.options.ListOptions;
import java.io.IOException;
import static org.jdrupes.vmoperator.common.Constants.APP_NAME;
import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
import org.jdrupes.vmoperator.common.K8sV1ServiceStub;
import org.jdrupes.vmoperator.manager.events.ServiceChanged;
import org.jdrupes.vmoperator.manager.events.VmChannel;
import org.jgrapes.core.Channel;
/**
* Watches for changes of services.
*/
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
public class ServiceMonitor
extends AbstractMonitor<V1Service, V1ServiceList, VmChannel> {
/**
* Instantiates a new display secrets monitor.
*
* @param componentChannel the component channel
*/
public ServiceMonitor(Channel componentChannel) {
super(componentChannel, V1Service.class, V1ServiceList.class);
context(K8sV1ServiceStub.CONTEXT);
ListOptions options = new ListOptions();
options.setLabelSelector("app.kubernetes.io/name=" + APP_NAME);
options(options);
}
@Override
protected void prepareMonitoring() throws IOException, ApiException {
client(new K8sClient());
}
@Override
protected void handleChange(K8sClient client, Response<V1Service> change) {
String vmName = change.object.getMetadata().getLabels()
.get("app.kubernetes.io/instance");
if (vmName == null) {
return;
}
var channel = channel(vmName).orElse(null);
if (channel == null || channel.vmDefinition() == null) {
return;
}
channel.pipeline().fire(new ServiceChanged(
ResponseType.valueOf(change.type), change.object), channel);
}
}

View file

@ -43,10 +43,12 @@ import org.jdrupes.vmoperator.common.VmDefinitionStub;
import static org.jdrupes.vmoperator.manager.Constants.APP_NAME; import static org.jdrupes.vmoperator.manager.Constants.APP_NAME;
import static org.jdrupes.vmoperator.manager.Constants.VM_OP_KIND_VM; import static org.jdrupes.vmoperator.manager.Constants.VM_OP_KIND_VM;
import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME; import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME;
import org.jdrupes.vmoperator.manager.events.ChannelManager;
import org.jdrupes.vmoperator.manager.events.VmChannel; import org.jdrupes.vmoperator.manager.events.VmChannel;
import org.jdrupes.vmoperator.manager.events.VmDefChanged; import org.jdrupes.vmoperator.manager.events.VmDefChanged;
import org.jdrupes.vmoperator.util.GsonPtr; import org.jdrupes.vmoperator.util.GsonPtr;
import org.jgrapes.core.Channel; import org.jgrapes.core.Channel;
import org.jgrapes.core.Event;
/** /**
* Watches for changes of VM definitions. * Watches for changes of VM definitions.
@ -55,14 +57,19 @@ import org.jgrapes.core.Channel;
public class VmMonitor extends public class VmMonitor extends
AbstractMonitor<VmDefinitionModel, VmDefinitionModels, VmChannel> { AbstractMonitor<VmDefinitionModel, VmDefinitionModels, VmChannel> {
private final ChannelManager<String, VmChannel, ?> channelManager;
/** /**
* Instantiates a new VM definition watcher. * Instantiates a new VM definition watcher.
* *
* @param componentChannel the component channel * @param componentChannel the component channel
* @param channelManager the channel manager
*/ */
public VmMonitor(Channel componentChannel) { public VmMonitor(Channel componentChannel,
ChannelManager<String, VmChannel, ?> channelManager) {
super(componentChannel, VmDefinitionModel.class, super(componentChannel, VmDefinitionModel.class,
VmDefinitionModels.class); VmDefinitionModels.class);
this.channelManager = channelManager;
} }
@Override @Override
@ -107,10 +114,7 @@ public class VmMonitor extends
protected void handleChange(K8sClient client, protected void handleChange(K8sClient client,
Watch.Response<VmDefinitionModel> response) { Watch.Response<VmDefinitionModel> response) {
V1ObjectMeta metadata = response.object.getMetadata(); V1ObjectMeta metadata = response.object.getMetadata();
VmChannel channel = channel(metadata.getName()).orElse(null); VmChannel channel = channelManager.channelGet(metadata.getName());
if (channel == null) {
return;
}
// Get full definition and associate with channel as backup // Get full definition and associate with channel as backup
var vmDef = response.object; var vmDef = response.object;
@ -132,13 +136,24 @@ public class VmMonitor extends
() -> "Cannot get model for " + response.object.getMetadata()); () -> "Cannot get model for " + response.object.getMetadata());
return; return;
} }
if (ResponseType.valueOf(response.type) == ResponseType.DELETED) {
channelManager.remove(metadata.getName());
}
// Create and fire event // Create and fire changed event. Remove channel from channel
// manager on completion.
channel.pipeline() channel.pipeline()
.fire(new VmDefChanged(ResponseType.valueOf(response.type), .fire(Event.onCompletion(
new VmDefChanged(ResponseType.valueOf(response.type),
channel.setGeneration( channel.setGeneration(
response.object.getMetadata().getGeneration()), response.object.getMetadata().getGeneration()),
vmDef), channel); vmDef),
e -> {
if (e.type() == ResponseType.DELETED) {
channelManager
.remove(e.vmDefinition().metadata().getName());
}
}), channel);
} }
private VmDefinitionModel getModel(K8sClient client, private VmDefinitionModel getModel(K8sClient client,

View file

@ -51,7 +51,8 @@ public class TimeSeries {
* @param numbers the numbers * @param numbers the numbers
* @return the time series * @return the time series
*/ */
@SuppressWarnings("PMD.AvoidLiteralsInIfCondition") @SuppressWarnings({ "PMD.AvoidLiteralsInIfCondition",
"PMD.AvoidSynchronizedStatement" })
public TimeSeries add(Instant time, Number... numbers) { public TimeSeries add(Instant time, Number... numbers) {
var newEntry = new Entry(time, numbers); var newEntry = new Entry(time, numbers);
boolean nothingNew = false; boolean nothingNew = false;
@ -83,6 +84,7 @@ public class TimeSeries {
* *
* @return the list * @return the list
*/ */
@SuppressWarnings("PMD.AvoidSynchronizedStatement")
public List<Entry> entries() { public List<Entry> entries() {
synchronized (data) { synchronized (data) {
return new ArrayList<>(data); return new ArrayList<>(data);

View file

@ -30,14 +30,14 @@ import java.math.BigDecimal;
import java.math.BigInteger; import java.math.BigInteger;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.HashSet; import java.util.EnumSet;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import org.jdrupes.json.JsonBeanDecoder; import org.jdrupes.json.JsonBeanDecoder;
import org.jdrupes.json.JsonDecodeException; import org.jdrupes.json.JsonDecodeException;
import org.jdrupes.vmoperator.common.K8sObserver; import org.jdrupes.vmoperator.common.K8sObserver;
import org.jdrupes.vmoperator.common.VmDefinitionModel; import org.jdrupes.vmoperator.common.VmDefinitionModel;
import org.jdrupes.vmoperator.manager.events.ChannelCache; import org.jdrupes.vmoperator.manager.events.ChannelTracker;
import org.jdrupes.vmoperator.manager.events.ModifyVm; import org.jdrupes.vmoperator.manager.events.ModifyVm;
import org.jdrupes.vmoperator.manager.events.VmChannel; import org.jdrupes.vmoperator.manager.events.VmChannel;
import org.jdrupes.vmoperator.manager.events.VmDefChanged; import org.jdrupes.vmoperator.manager.events.VmDefChanged;
@ -68,8 +68,8 @@ public class VmConlet extends FreeMarkerConlet<VmConlet.VmsModel> {
private static final Set<RenderMode> MODES = RenderMode.asSet( private static final Set<RenderMode> MODES = RenderMode.asSet(
RenderMode.Preview, RenderMode.View); RenderMode.Preview, RenderMode.View);
private final ChannelCache<String, VmChannel, private final ChannelTracker<String, VmChannel,
VmDefinitionModel> channelManager = new ChannelCache<>(); VmDefinitionModel> channelTracker = new ChannelTracker<>();
private final TimeSeries summarySeries = new TimeSeries(Duration.ofDays(1)); private final TimeSeries summarySeries = new TimeSeries(Duration.ofDays(1));
private Summary cachedSummary; private Summary cachedSummary;
@ -128,7 +128,7 @@ public class VmConlet extends FreeMarkerConlet<VmConlet.VmsModel> {
protected Set<RenderMode> doRenderConlet(RenderConletRequestBase<?> event, protected Set<RenderMode> doRenderConlet(RenderConletRequestBase<?> event,
ConsoleConnection channel, String conletId, VmsModel conletState) ConsoleConnection channel, String conletId, VmsModel conletState)
throws Exception { throws Exception {
Set<RenderMode> renderedAs = new HashSet<>(); Set<RenderMode> renderedAs = EnumSet.noneOf(RenderMode.class);
boolean sendVmInfos = false; boolean sendVmInfos = false;
if (event.renderAs().contains(RenderMode.Preview)) { if (event.renderAs().contains(RenderMode.Preview)) {
Template tpl Template tpl
@ -160,7 +160,7 @@ public class VmConlet extends FreeMarkerConlet<VmConlet.VmsModel> {
sendVmInfos = true; sendVmInfos = true;
} }
if (sendVmInfos) { if (sendVmInfos) {
for (var vmDef : channelManager.associated()) { for (var vmDef : channelTracker.associated()) {
var def var def
= JsonBeanDecoder.create(vmDef.data().toString()) = JsonBeanDecoder.create(vmDef.data().toString())
.readObject(); .readObject();
@ -188,7 +188,7 @@ public class VmConlet extends FreeMarkerConlet<VmConlet.VmsModel> {
throws JsonDecodeException, IOException { throws JsonDecodeException, IOException {
var vmName = event.vmDefinition().getMetadata().getName(); var vmName = event.vmDefinition().getMetadata().getName();
if (event.type() == K8sObserver.ResponseType.DELETED) { if (event.type() == K8sObserver.ResponseType.DELETED) {
channelManager.remove(vmName); channelTracker.remove(vmName);
for (var entry : conletIdsByConsoleConnection().entrySet()) { for (var entry : conletIdsByConsoleConnection().entrySet()) {
for (String conletId : entry.getValue()) { for (String conletId : entry.getValue()) {
entry.getKey().respond(new NotifyConletView(type(), entry.getKey().respond(new NotifyConletView(type(),
@ -198,7 +198,7 @@ public class VmConlet extends FreeMarkerConlet<VmConlet.VmsModel> {
} else { } else {
var vmDef = new VmDefinitionModel(channel.client().getJSON() var vmDef = new VmDefinitionModel(channel.client().getJSON()
.getGson(), cleanup(event.vmDefinition().data())); .getGson(), cleanup(event.vmDefinition().data()));
channelManager.put(vmName, channel, vmDef); channelTracker.put(vmName, channel, vmDef);
var def = JsonBeanDecoder.create(vmDef.data().toString()) var def = JsonBeanDecoder.create(vmDef.data().toString())
.readObject(); .readObject();
for (var entry : conletIdsByConsoleConnection().entrySet()) { for (var entry : conletIdsByConsoleConnection().entrySet()) {
@ -321,7 +321,7 @@ public class VmConlet extends FreeMarkerConlet<VmConlet.VmsModel> {
return cachedSummary; return cachedSummary;
} }
Summary summary = new Summary(); Summary summary = new Summary();
for (var vmDef : channelManager.associated()) { for (var vmDef : channelTracker.associated()) {
summary.totalVms += 1; summary.totalVms += 1;
var status = GsonPtr.to(vmDef.data()).to("status"); var status = GsonPtr.to(vmDef.data()).to("status");
summary.usedCpus += status.getAsInt("cpus").orElse(0); summary.usedCpus += status.getAsInt("cpus").orElse(0);
@ -347,7 +347,7 @@ public class VmConlet extends FreeMarkerConlet<VmConlet.VmsModel> {
throws Exception { throws Exception {
event.stop(); event.stop();
var vmName = event.params().asString(0); var vmName = event.params().asString(0);
var vmChannel = channelManager.channel(vmName).orElse(null); var vmChannel = channelTracker.channel(vmName).orElse(null);
if (vmChannel == null) { if (vmChannel == null) {
return; return;
} }

View file

@ -53,7 +53,7 @@ import org.jdrupes.vmoperator.common.K8sDynamicModel;
import org.jdrupes.vmoperator.common.K8sObserver; import org.jdrupes.vmoperator.common.K8sObserver;
import org.jdrupes.vmoperator.common.VmDefinitionModel; import org.jdrupes.vmoperator.common.VmDefinitionModel;
import org.jdrupes.vmoperator.common.VmDefinitionModel.Permission; import org.jdrupes.vmoperator.common.VmDefinitionModel.Permission;
import org.jdrupes.vmoperator.manager.events.ChannelCache; import org.jdrupes.vmoperator.manager.events.ChannelTracker;
import org.jdrupes.vmoperator.manager.events.GetDisplayPassword; import org.jdrupes.vmoperator.manager.events.GetDisplayPassword;
import org.jdrupes.vmoperator.manager.events.ModifyVm; import org.jdrupes.vmoperator.manager.events.ModifyVm;
import org.jdrupes.vmoperator.manager.events.ResetVm; import org.jdrupes.vmoperator.manager.events.ResetVm;
@ -122,8 +122,8 @@ public class VmViewer extends FreeMarkerConlet<VmViewer.ViewerModel> {
RenderMode.Preview, RenderMode.Edit); RenderMode.Preview, RenderMode.Edit);
private static final Set<RenderMode> MODES_FOR_GENERATED = RenderMode.asSet( private static final Set<RenderMode> MODES_FOR_GENERATED = RenderMode.asSet(
RenderMode.Preview, RenderMode.StickyPreview); RenderMode.Preview, RenderMode.StickyPreview);
private final ChannelCache<String, VmChannel, private final ChannelTracker<String, VmChannel,
VmDefinitionModel> channelManager = new ChannelCache<>(); VmDefinitionModel> channelTracker = new ChannelTracker<>();
private static ObjectMapper objectMapper private static ObjectMapper objectMapper
= new ObjectMapper().registerModule(new JavaTimeModule()); = new ObjectMapper().registerModule(new JavaTimeModule());
private Class<?> preferredIpVersion = Inet4Address.class; private Class<?> preferredIpVersion = Inet4Address.class;
@ -349,7 +349,7 @@ public class VmViewer extends FreeMarkerConlet<VmViewer.ViewerModel> {
// Remove conlet if definition has been removed // Remove conlet if definition has been removed
if (model.vmName() != null if (model.vmName() != null
&& !channelManager.associated(model.vmName()).isPresent()) { && !channelTracker.associated(model.vmName()).isPresent()) {
channel.respond( channel.respond(
new DeleteConlet(conletId, Collections.emptySet())); new DeleteConlet(conletId, Collections.emptySet()));
return Collections.emptySet(); return Collections.emptySet();
@ -357,7 +357,7 @@ public class VmViewer extends FreeMarkerConlet<VmViewer.ViewerModel> {
// Don't render if user has not at least one permission // Don't render if user has not at least one permission
if (model.vmName() != null if (model.vmName() != null
&& channelManager.associated(model.vmName()) && channelTracker.associated(model.vmName())
.map(d -> permissions(d, channel.session()).isEmpty()) .map(d -> permissions(d, channel.session()).isEmpty())
.orElse(true)) { .orElse(true)) {
return Collections.emptySet(); return Collections.emptySet();
@ -395,7 +395,7 @@ public class VmViewer extends FreeMarkerConlet<VmViewer.ViewerModel> {
} }
private List<String> accessibleVms(ConsoleConnection channel) { private List<String> accessibleVms(ConsoleConnection channel) {
return channelManager.associated().stream() return channelTracker.associated().stream()
.filter(d -> !permissions(d, channel.session()).isEmpty()) .filter(d -> !permissions(d, channel.session()).isEmpty())
.map(d -> d.getMetadata().getName()).sorted().toList(); .map(d -> d.getMetadata().getName()).sorted().toList();
} }
@ -419,7 +419,7 @@ public class VmViewer extends FreeMarkerConlet<VmViewer.ViewerModel> {
if (Strings.isNullOrEmpty(model.vmName())) { if (Strings.isNullOrEmpty(model.vmName())) {
return; return;
} }
channelManager.associated(model.vmName()).ifPresent(vmDef -> { channelTracker.associated(model.vmName()).ifPresent(vmDef -> {
try { try {
var def = JsonBeanDecoder.create(vmDef.data().toString()) var def = JsonBeanDecoder.create(vmDef.data().toString())
.readObject(); .readObject();
@ -465,9 +465,9 @@ public class VmViewer extends FreeMarkerConlet<VmViewer.ViewerModel> {
.remove("managedFields"); .remove("managedFields");
var vmName = vmDef.getMetadata().getName(); var vmName = vmDef.getMetadata().getName();
if (event.type() == K8sObserver.ResponseType.DELETED) { if (event.type() == K8sObserver.ResponseType.DELETED) {
channelManager.remove(vmName); channelTracker.remove(vmName);
} else { } else {
channelManager.put(vmName, channel, vmDef); channelTracker.put(vmName, channel, vmDef);
} }
for (var entry : conletIdsByConsoleConnection().entrySet()) { for (var entry : conletIdsByConsoleConnection().entrySet()) {
var connection = entry.getKey(); var connection = entry.getKey();
@ -502,12 +502,12 @@ public class VmViewer extends FreeMarkerConlet<VmViewer.ViewerModel> {
// Handle command for selected VM // Handle command for selected VM
var both = Optional.ofNullable(model.vmName()) var both = Optional.ofNullable(model.vmName())
.flatMap(vm -> channelManager.both(vm)); .flatMap(vm -> channelTracker.value(vm));
if (both.isEmpty()) { if (both.isEmpty()) {
return; return;
} }
var vmChannel = both.get().channel; var vmChannel = both.get().channel();
var vmDef = both.get().associated; var vmDef = both.get().associated();
var vmName = vmDef.metadata().getName(); var vmName = vmDef.metadata().getName();
var perms = permissions(vmDef, channel.session()); var perms = permissions(vmDef, channel.session());
var resourceBundle = resourceBundle(channel.locale()); var resourceBundle = resourceBundle(channel.locale());
@ -556,7 +556,7 @@ public class VmViewer extends FreeMarkerConlet<VmViewer.ViewerModel> {
private void openConsole(String vmName, ConsoleConnection connection, private void openConsole(String vmName, ConsoleConnection connection,
ViewerModel model, String password) { ViewerModel model, String password) {
var vmDef = channelManager.associated(vmName).orElse(null); var vmDef = channelTracker.associated(vmName).orElse(null);
if (vmDef == null) { if (vmDef == null) {
return; return;
} }