Prepare release v3.4

This commit is contained in:
Michael Lipp 2024-10-06 10:05:09 +00:00
parent 31a3f79e2a
commit 54445ef531
12 changed files with 274 additions and 504 deletions

View file

@ -0,0 +1,113 @@
/*
* VM-Operator
* Copyright (C) 2024 Michael N. Lipp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.jdrupes.vmoperator.manager.events;
import java.util.Collection;
import java.util.Optional;
import java.util.Set;
import org.jgrapes.core.Channel;
/**
* Supports the lookup of a channel by a name (an id). As a convenience,
* it is possible to additionally associate arbitrary data with the entry
* (and thus with the channel). Note that this interface defines a
* read-only view of the dictionary.
*
* @param <K> the key type
* @param <C> the channel type
* @param <A> the type of the associated data
*/
public interface ChannelDictionary<K, C extends Channel, A> {
/**
* Combines the channel and the associated data.
*
* @param <C> the channel type
* @param <A> the type of the associated data
* @param channel the channel
* @param associated the associated
*/
@SuppressWarnings("PMD.ShortClassName")
public record Value<C extends Channel, A>(C channel, A associated) {
}
/**
* Returns all known keys.
*
* @return the keys
*/
Set<K> keys();
/**
* Return all known values.
*
* @return the collection
*/
Collection<Value<C, A>> values();
/**
* Returns the channel and associates data registered for the key
* or an empty optional if no entry exists.
*
* @param key the key
* @return the result
*/
Optional<Value<C, A>> value(K key);
/**
* Return all known channels.
*
* @return the collection
*/
default Collection<C> channels() {
return values().stream().map(v -> v.channel).toList();
}
/**
* Returns the channel registered for the key or an empty optional
* if no mapping exists.
*
* @param key the key
* @return the optional
*/
default Optional<C> channel(K key) {
return value(key).map(b -> b.channel);
}
/**
* Returns all known associated data.
*
* @return the collection
*/
default Collection<A> associated() {
return values().stream()
.filter(v -> v.associated() != null)
.map(v -> v.associated).toList();
}
/**
* Return the data associated with the entry for the channel.
*
* @param key the key
* @return the data
*/
default Optional<A> associated(K key) {
return value(key).map(b -> b.associated);
}
}

View file

@ -27,53 +27,24 @@ import java.util.function.Function;
import org.jgrapes.core.Channel;
/**
* A channel manager that maintains mappings from a key to a channel.
* As a convenience, it is possible to additionally associate arbitrary
* data with the entry (and thus with the channel).
* Provides an actively managed implementation of the {@link ChannelDictionary}.
*
* The manager should be used by a component that defines channels for
* housekeeping. It can be shared between this component and another
* component, preferably using the {@link #fixed()} view for the
* second component. Alternatively, the second component can use a
* {@link ChannelCache} to track the mappings using events.
* The {@link ChannelManager} can be used for housekeeping by any component
* that creates channels. It can be shared between this component and
* some other component, preferably passing it as {@link ChannelDictionary}
* (the read-only view) to the second component. Alternatively, the other
* component can use a {@link ChannelTracker} to track the mappings using
* events.
*
* @param <K> the key type
* @param <C> the channel type
* @param <A> the type of the associated data
*/
public class ChannelManager<K, C extends Channel, A> {
public class ChannelManager<K, C extends Channel, A>
implements ChannelDictionary<K, C, A> {
private final Map<K, Both<C, A>> channels = new ConcurrentHashMap<>();
private final Map<K, Value<C, A>> entries = new ConcurrentHashMap<>();
private final Function<K, C> supplier;
private ChannelManager<K, C, A> readOnly;
/**
* Combines the channel and the associated data.
*
* @param <C> the generic type
* @param <A> the generic type
*/
@SuppressWarnings("PMD.ShortClassName")
public static class Both<C extends Channel, A> {
/** The channel. */
public C channel;
/** The associated. */
public A associated;
/**
* Instantiates a new both.
*
* @param channel the channel
* @param associated the associated
*/
public Both(C channel, A associated) {
super();
this.channel = channel;
this.associated = associated;
}
}
/**
* Instantiates a new channel manager.
@ -91,6 +62,21 @@ public class ChannelManager<K, C extends Channel, A> {
this(k -> null);
}
@Override
public Set<K> keys() {
return entries.keySet();
}
/**
* Return all known values.
*
* @return the collection
*/
@Override
public Collection<Value<C, A>> values() {
return entries.values();
}
/**
* Returns the channel and associates data registered for the key
* or an empty optional if no mapping exists.
@ -98,10 +84,8 @@ public class ChannelManager<K, C extends Channel, A> {
* @param key the key
* @return the result
*/
public Optional<Both<C, A>> both(K key) {
synchronized (channels) {
return Optional.ofNullable(channels.get(key));
}
public Optional<Value<C, A>> value(K key) {
return Optional.ofNullable(entries.get(key));
}
/**
@ -113,7 +97,7 @@ public class ChannelManager<K, C extends Channel, A> {
* @return the channel manager
*/
public ChannelManager<K, C, A> put(K key, C channel, A associated) {
channels.put(key, new Both<>(channel, associated));
entries.put(key, new Value<>(channel, associated));
return this;
}
@ -129,17 +113,6 @@ public class ChannelManager<K, C extends Channel, A> {
return this;
}
/**
* Returns the channel registered for the key or an empty optional
* if no mapping exists.
*
* @param key the key
* @return the optional
*/
public Optional<C> channel(K key) {
return both(key).map(b -> b.channel);
}
/**
* Returns the {@link Channel} for the given name, creating it using
* the supplier passed to the constructor if it doesn't exist yet.
@ -147,8 +120,8 @@ public class ChannelManager<K, C extends Channel, A> {
* @param key the key
* @return the channel
*/
public Optional<C> getChannel(K key) {
return getChannel(key, supplier);
public C channelGet(K key) {
return computeIfAbsent(key, supplier);
}
/**
@ -161,17 +134,9 @@ public class ChannelManager<K, C extends Channel, A> {
*/
@SuppressWarnings({ "PMD.AssignmentInOperand",
"PMD.DataflowAnomalyAnalysis" })
public Optional<C> getChannel(K key, Function<K, C> supplier) {
synchronized (channels) {
return Optional
.of(Optional.ofNullable(channels.get(key))
.map(v -> v.channel)
.orElseGet(() -> {
var channel = supplier.apply(key);
channels.put(key, new Both<>(channel, null));
return channel;
}));
}
public C computeIfAbsent(K key, Function<K, C> supplier) {
return entries.computeIfAbsent(key,
k -> new Value<>(supplier.apply(k), null)).channel();
}
/**
@ -183,121 +148,17 @@ public class ChannelManager<K, C extends Channel, A> {
* @return the channel manager
*/
public ChannelManager<K, C, A> associate(K key, A data) {
synchronized (channels) {
Optional.ofNullable(channels.get(key))
.ifPresent(v -> v.associated = data);
}
Optional.ofNullable(entries.computeIfPresent(key,
(k, existing) -> new Value<>(existing.channel(), data)));
return this;
}
/**
* Return the data associated with the entry for the channel.
*
* @param key the key
* @return the data
*/
public Optional<A> associated(K key) {
return both(key).map(b -> b.associated);
}
/**
* Returns all associated data.
*
* @return the collection
*/
public Collection<A> associated() {
synchronized (channels) {
return channels.values().stream()
.filter(v -> v.associated != null)
.map(v -> v.associated).toList();
}
}
/**
* Removes the channel with the given name.
*
* @param name the name
*/
public void remove(String name) {
synchronized (channels) {
channels.remove(name);
}
}
/**
* Returns all known keys.
*
* @return the sets the
*/
public Set<K> keys() {
return channels.keySet();
}
/**
* Returns a read only view of this channel manager. The methods
* that usually create a new entry refrain from doing so. The
* methods that change the value of channel and {@link #remove(String)}
* do nothing. The associated data, however, can still be changed.
*
* @return the channel manager
*/
public ChannelManager<K, C, A> fixed() {
if (readOnly == null) {
readOnly = new ChannelManager<>(supplier) {
@Override
public Optional<Both<C, A>> both(K key) {
return ChannelManager.this.both(key);
}
@Override
public ChannelManager<K, C, A> put(K key, C channel,
A associated) {
return associate(key, associated);
}
@Override
public Optional<C> getChannel(K key) {
return ChannelManager.this.channel(key);
}
@Override
public Optional<C> getChannel(K key, Function<K, C> supplier) {
return ChannelManager.this.channel(key);
}
@Override
public ChannelManager<K, C, A> associate(K key, A data) {
return ChannelManager.this.associate(key, data);
}
@Override
public Optional<A> associated(K key) {
return ChannelManager.this.associated(key);
}
@Override
public Collection<A> associated() {
return ChannelManager.this.associated();
}
@Override
public void remove(String name) {
// Do nothing
}
@Override
public Set<K> keys() {
return ChannelManager.this.keys();
}
@Override
public ChannelManager<K, C, A> fixed() {
return ChannelManager.this.fixed();
}
};
}
return readOnly;
entries.remove(name);
}
}

View file

@ -19,6 +19,7 @@
package org.jdrupes.vmoperator.manager.events;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
@ -27,20 +28,30 @@ import java.util.concurrent.ConcurrentHashMap;
import org.jgrapes.core.Channel;
/**
* A channel manager that tracks mappings from a key to a channel using
* "add/remove" (or "open/close") events and the channels on which they
* are delivered.
* Used to track mapping from a key to a channel. Entries must
* be maintained by handlers for "add/remove" (or "open/close")
* events delivered on the channels that are to be
* made available by the tracker.
*
* The channels are stored in the dictionary using {@link WeakReference}s.
* Removing entries is therefore best practice but not an absolute necessity
* as entries for cleared references are removed when one of the methods
* {@link #values()}, {@link #channels()} or {@link #associated()} is called.
*
* @param <K> the key type
* @param <C> the channel type
* @param <A> the type of the associated data
*/
public class ChannelCache<K, C extends Channel, A> {
public class ChannelTracker<K, C extends Channel, A>
implements ChannelDictionary<K, C, A> {
private final Map<K, Data<C, A>> channels = new ConcurrentHashMap<>();
private final Map<K, Data<C, A>> entries = new ConcurrentHashMap<>();
/**
* Helper
* Combines the channel and associated data.
*
* @param <C> the generic type
* @param <A> the generic type
*/
@SuppressWarnings("PMD.ShortClassName")
private static class Data<C extends Channel, A> {
@ -57,32 +68,24 @@ public class ChannelCache<K, C extends Channel, A> {
}
}
/**
* Combines the channel and the associated data.
*
* @param <C> the generic type
* @param <A> the generic type
*/
@SuppressWarnings("PMD.ShortClassName")
public static class Both<C extends Channel, A> {
@Override
public Set<K> keys() {
return entries.keySet();
}
/** The channel. */
public C channel;
/** The associated. */
public A associated;
/**
* Instantiates a new both.
*
* @param channel the channel
* @param associated the associated
*/
public Both(C channel, A associated) {
super();
this.channel = channel;
this.associated = associated;
@Override
public Collection<Value<C, A>> values() {
var result = new ArrayList<Value<C, A>>();
for (var itr = entries.entrySet().iterator(); itr.hasNext();) {
var value = itr.next().getValue();
var channel = value.channel.get();
if (channel == null) {
itr.remove();
continue;
}
result.add(new Value<>(channel, value.associated));
}
return result;
}
/**
@ -92,20 +95,18 @@ public class ChannelCache<K, C extends Channel, A> {
* @param key the key
* @return the result
*/
public Optional<Both<C, A>> both(K key) {
synchronized (channels) {
var value = channels.get(key);
if (value == null) {
return Optional.empty();
}
var channel = value.channel.get();
if (channel == null) {
// Cleanup old reference
channels.remove(key);
return Optional.empty();
}
return Optional.of(new Both<>(channel, value.associated));
public Optional<Value<C, A>> value(K key) {
var value = entries.get(key);
if (value == null) {
return Optional.empty();
}
var channel = value.channel.get();
if (channel == null) {
// Cleanup old reference
entries.remove(key);
return Optional.empty();
}
return Optional.of(new Value<>(channel, value.associated));
}
/**
@ -116,10 +117,10 @@ public class ChannelCache<K, C extends Channel, A> {
* @param associated the associated
* @return the channel manager
*/
public ChannelCache<K, C, A> put(K key, C channel, A associated) {
public ChannelTracker<K, C, A> put(K key, C channel, A associated) {
Data<C, A> data = new Data<>(channel);
data.associated = associated;
channels.put(key, data);
entries.put(key, data);
return this;
}
@ -130,22 +131,11 @@ public class ChannelCache<K, C extends Channel, A> {
* @param channel the channel
* @return the channel manager
*/
public ChannelCache<K, C, A> put(K key, C channel) {
public ChannelTracker<K, C, A> put(K key, C channel) {
put(key, channel, null);
return this;
}
/**
* Returns the channel registered for the key or an empty optional
* if no mapping exists.
*
* @param key the key
* @return the optional
*/
public Optional<C> channel(K key) {
return both(key).map(b -> b.channel);
}
/**
* Associate the entry for the channel with the given data. The entry
* for the channel must already exist.
@ -154,54 +144,18 @@ public class ChannelCache<K, C extends Channel, A> {
* @param data the data
* @return the channel manager
*/
public ChannelCache<K, C, A> associate(K key, A data) {
synchronized (channels) {
Optional.ofNullable(channels.get(key))
.ifPresent(v -> v.associated = data);
}
public ChannelTracker<K, C, A> associate(K key, A data) {
Optional.ofNullable(entries.get(key))
.ifPresent(v -> v.associated = data);
return this;
}
/**
* Return the data associated with the entry for the channel.
*
* @param key the key
* @return the data
*/
public Optional<A> associated(K key) {
return both(key).map(b -> b.associated);
}
/**
* Returns all associated data.
*
* @return the collection
*/
public Collection<A> associated() {
synchronized (channels) {
return channels.values().stream()
.filter(v -> v.channel.get() != null && v.associated != null)
.map(v -> v.associated).toList();
}
}
/**
* Removes the channel with the given name.
*
* @param name the name
*/
public void remove(String name) {
synchronized (channels) {
channels.remove(name);
}
}
/**
* Returns all known keys.
*
* @return the sets the
*/
public Set<K> keys() {
return channels.keySet();
entries.remove(name);
}
}

View file

@ -1,76 +0,0 @@
/*
* VM-Operator
* Copyright (C) 2024 Michael N. Lipp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.jdrupes.vmoperator.manager.events;
import io.kubernetes.client.openapi.models.V1Service;
import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Components;
import org.jgrapes.core.Event;
/**
* Indicates that a service has changed.
*/
@SuppressWarnings("PMD.DataClass")
public class ServiceChanged extends Event<Void> {
private final ResponseType type;
private final V1Service service;
/**
* Initializes a new service changed event.
*
* @param type the type
* @param service the service
*/
public ServiceChanged(ResponseType type, V1Service service) {
this.type = type;
this.service = service;
}
/**
* Returns the type.
*
* @return the type
*/
public ResponseType type() {
return type;
}
/**
* Gets the service.
*
* @return the service
*/
public V1Service service() {
return service;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(Components.objectName(this)).append(" [")
.append(service.getMetadata().getName()).append(' ').append(type);
if (channels() != null) {
builder.append(", channels=").append(Channel.toString(channels()));
}
builder.append(']');
return builder.toString();
}
}