The client library is so not thread safe, adapt usage.

See https://github.com/kubernetes-client/java/issues/100
This commit is contained in:
Michael Lipp 2023-08-15 15:54:40 +02:00
parent 6cf5ecadc2
commit f56fe228aa
5 changed files with 40 additions and 18 deletions

View file

@ -33,6 +33,9 @@ import java.io.StringWriter;
import java.util.Map; import java.util.Map;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.jdrupes.vmoperator.manager.VmDefChanged.Type; import org.jdrupes.vmoperator.manager.VmDefChanged.Type;
import org.yaml.snakeyaml.LoaderOptions;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;
/** /**
* Delegee for reconciling the config map * Delegee for reconciling the config map
@ -85,7 +88,8 @@ import org.jdrupes.vmoperator.manager.VmDefChanged.Type;
fmTemplate.process(model, out); fmTemplate.process(model, out);
// Avoid Yaml.load due to // Avoid Yaml.load due to
// https://github.com/kubernetes-client/java/issues/2741 // https://github.com/kubernetes-client/java/issues/2741
var mapDef = Dynamics.newFromYaml(out.toString()); var mapDef = Dynamics.newFromYaml(
new Yaml(new SafeConstructor(new LoaderOptions())), out.toString());
// Apply and maybe force pod update // Apply and maybe force pod update
var newState = K8s.apply(cmApi, mapDef, out.toString()); var newState = K8s.apply(cmApi, mapDef, out.toString());

View file

@ -20,7 +20,6 @@ package org.jdrupes.vmoperator.manager;
import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration; import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.util.Config;
import java.io.IOException; import java.io.IOException;
import org.jgrapes.core.Channel; import org.jgrapes.core.Channel;
import org.jgrapes.core.Component; import org.jgrapes.core.Component;
@ -53,7 +52,8 @@ public class Controller extends Component {
*/ */
@Handler(priority = 100) @Handler(priority = 100)
public void onStart(Start event) throws IOException, ApiException { public void onStart(Start event) throws IOException, ApiException {
var client = Config.defaultClient(); // Make sure to use thread specific client
Configuration.setDefaultApiClient(client); // https://github.com/kubernetes-client/java/issues/100
Configuration.setDefaultApiClient(null);
} }
} }

View file

@ -28,6 +28,9 @@ import java.io.StringWriter;
import java.util.Map; import java.util.Map;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.jdrupes.vmoperator.manager.VmDefChanged.Type; import org.jdrupes.vmoperator.manager.VmDefChanged.Type;
import org.yaml.snakeyaml.LoaderOptions;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;
/** /**
* Delegee for reconciling the service * Delegee for reconciling the service
@ -79,7 +82,8 @@ import org.jdrupes.vmoperator.manager.VmDefChanged.Type;
fmTemplate.process(model, out); fmTemplate.process(model, out);
// Avoid Yaml.load due to // Avoid Yaml.load due to
// https://github.com/kubernetes-client/java/issues/2741 // https://github.com/kubernetes-client/java/issues/2741
var mapDef = Dynamics.newFromYaml(out.toString()); var mapDef = Dynamics.newFromYaml(
new Yaml(new SafeConstructor(new LoaderOptions())), out.toString());
// Apply // Apply
K8s.apply(svcApi, mapDef, out.toString()); K8s.apply(svcApi, mapDef, out.toString());

View file

@ -30,6 +30,9 @@ import java.io.StringWriter;
import java.util.Map; import java.util.Map;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.jdrupes.vmoperator.manager.VmDefChanged.Type; import org.jdrupes.vmoperator.manager.VmDefChanged.Type;
import org.yaml.snakeyaml.LoaderOptions;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;
/** /**
* Delegee for reconciling the stateful set (effectively the pod). * Delegee for reconciling the stateful set (effectively the pod).
@ -88,7 +91,8 @@ import org.jdrupes.vmoperator.manager.VmDefChanged.Type;
fmTemplate.process(model, out); fmTemplate.process(model, out);
// Avoid Yaml.load due to // Avoid Yaml.load due to
// https://github.com/kubernetes-client/java/issues/2741 // https://github.com/kubernetes-client/java/issues/2741
var stsDef = Dynamics.newFromYaml(out.toString()); var stsDef = Dynamics.newFromYaml(
new Yaml(new SafeConstructor(new LoaderOptions())), out.toString());
// If exists apply changes only when transitioning state // If exists apply changes only when transitioning state
// or not running. // or not running.

View file

@ -21,7 +21,6 @@ package org.jdrupes.vmoperator.manager;
import com.google.gson.reflect.TypeToken; import com.google.gson.reflect.TypeToken;
import io.kubernetes.client.openapi.ApiClient; import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.ApisApi; import io.kubernetes.client.openapi.apis.ApisApi;
import io.kubernetes.client.openapi.apis.CustomObjectsApi; import io.kubernetes.client.openapi.apis.CustomObjectsApi;
import io.kubernetes.client.openapi.models.V1APIGroup; import io.kubernetes.client.openapi.models.V1APIGroup;
@ -29,6 +28,7 @@ import io.kubernetes.client.openapi.models.V1APIResource;
import io.kubernetes.client.openapi.models.V1GroupVersionForDiscovery; import io.kubernetes.client.openapi.models.V1GroupVersionForDiscovery;
import io.kubernetes.client.openapi.models.V1Namespace; import io.kubernetes.client.openapi.models.V1Namespace;
import io.kubernetes.client.openapi.models.V1ObjectMeta; import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.Watch; import io.kubernetes.client.util.Watch;
import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesApi; import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesApi;
import io.kubernetes.client.util.generic.options.ListOptions; import io.kubernetes.client.util.generic.options.ListOptions;
@ -59,7 +59,6 @@ import org.jgrapes.util.events.ConfigurationUpdate;
@SuppressWarnings("PMD.DataflowAnomalyAnalysis") @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
public class VmWatcher extends Component { public class VmWatcher extends Component {
private ApiClient client;
private String namespaceToWatch; private String namespaceToWatch;
private final Map<String, VmChannel> channels = new ConcurrentHashMap<>(); private final Map<String, VmChannel> channels = new ConcurrentHashMap<>();
@ -115,7 +114,7 @@ public class VmWatcher extends Component {
.fine(() -> "Controlling namespace \"" + namespaceToWatch + "\"."); .fine(() -> "Controlling namespace \"" + namespaceToWatch + "\".");
// Get all our API versions // Get all our API versions
client = Configuration.getDefaultApiClient(); var client = Config.defaultClient();
var apis = new ApisApi(client).getAPIVersions(); var apis = new ApisApi(client).getAPIVersions();
var vmOpApiVersions = apis.getGroups().stream() var vmOpApiVersions = apis.getGroups().stream()
.filter(g -> g.getName().equals(VM_OP_GROUP)).findFirst() .filter(g -> g.getName().equals(VM_OP_GROUP)).findFirst()
@ -124,7 +123,7 @@ public class VmWatcher extends Component {
// Remove left overs // Remove left overs
var coa = new CustomObjectsApi(client); var coa = new CustomObjectsApi(client);
purge(coa, vmOpApiVersions); purge(client, coa, vmOpApiVersions);
// Start a watcher thread for each existing CRD version. // Start a watcher thread for each existing CRD version.
// The watcher will send us an "ADDED" for each existing VM. // The watcher will send us an "ADDED" for each existing VM.
@ -133,14 +132,14 @@ public class VmWatcher extends Component {
.getResources().stream() .getResources().stream()
.filter(r -> Constants.VM_OP_KIND_VM.equals(r.getKind())) .filter(r -> Constants.VM_OP_KIND_VM.equals(r.getKind()))
.findFirst() .findFirst()
.ifPresent(crd -> watchVmDefs(coa, crd, version)); .ifPresent(crd -> watchVmDefs(crd, version));
} }
} }
@SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops", @SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops",
"PMD.CognitiveComplexity" }) "PMD.CognitiveComplexity" })
private void purge(CustomObjectsApi coa, List<String> vmOpApiVersions) private void purge(ApiClient client, CustomObjectsApi coa,
throws ApiException { List<String> vmOpApiVersions) throws ApiException {
// Get existing CRs (VMs) // Get existing CRs (VMs)
Set<String> known = new HashSet<>(); Set<String> known = new HashSet<>();
for (var version : vmOpApiVersions) { for (var version : vmOpApiVersions) {
@ -149,7 +148,7 @@ public class VmWatcher extends Component {
.getResources().stream() .getResources().stream()
.filter(r -> Constants.VM_OP_KIND_VM.equals(r.getKind())) .filter(r -> Constants.VM_OP_KIND_VM.equals(r.getKind()))
.findFirst() .findFirst()
.ifPresent(crd -> known.addAll(getKnown(crd, version))); .ifPresent(crd -> known.addAll(getKnown(client, crd, version)));
} }
ListOptions opts = new ListOptions(); ListOptions opts = new ListOptions();
@ -184,7 +183,8 @@ public class VmWatcher extends Component {
} }
} }
private Set<String> getKnown(V1APIResource crd, String version) { private Set<String> getKnown(ApiClient client, V1APIResource crd,
String version) {
Set<String> result = new HashSet<>(); Set<String> result = new HashSet<>();
var api = new DynamicKubernetesApi(VM_OP_GROUP, version, var api = new DynamicKubernetesApi(VM_OP_GROUP, version,
crd.getName(), client); crd.getName(), client);
@ -197,13 +197,14 @@ public class VmWatcher extends Component {
return result; return result;
} }
private void watchVmDefs(CustomObjectsApi coa, V1APIResource crd, private void watchVmDefs(V1APIResource crd, String version) {
String version) {
@SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
var watcher = new Thread(() -> { var watcher = new Thread(() -> {
try { try {
// Watch sometimes terminates without apparent reason. // Watch sometimes terminates without apparent reason.
while (true) { while (true) {
var client = Config.defaultClient();
var coa = new CustomObjectsApi(client);
var call = coa.listNamespacedCustomObjectCall(VM_OP_GROUP, var call = coa.listNamespacedCustomObjectCall(VM_OP_GROUP,
version, namespaceToWatch, crd.getName(), null, false, version, namespaceToWatch, crd.getName(), null, false,
null, null, null, null, null, null, null, true, null); null, null, null, null, null, null, null, true, null);
@ -233,7 +234,16 @@ public class VmWatcher extends Component {
Watch.Response<V1Namespace> item) { Watch.Response<V1Namespace> item) {
V1ObjectMeta metadata = item.object.getMetadata(); V1ObjectMeta metadata = item.object.getMetadata();
VmChannel channel = channels.computeIfAbsent(metadata.getName(), VmChannel channel = channels.computeIfAbsent(metadata.getName(),
k -> new VmChannel(channel(), newEventPipeline(), client)); k -> {
try {
return new VmChannel(channel(), newEventPipeline(),
Config.defaultClient());
} catch (IOException e) {
logger.log(Level.SEVERE, e, () -> "Failed to create client"
+ " for handling changes: " + e.getMessage());
return null;
}
});
channel.pipeline().fire(new VmDefChanged(VmDefChanged.Type channel.pipeline().fire(new VmDefChanged(VmDefChanged.Type
.valueOf(item.type), vmsCrd, item.object), channel); .valueOf(item.type), vmsCrd, item.object), channel);
} }