Improve tracking.

This commit is contained in:
Michael Lipp 2024-11-13 23:45:54 +01:00
parent 9773207307
commit 4d447717c2
6 changed files with 375 additions and 178 deletions

View file

@ -0,0 +1,159 @@
/*
* VM-Operator
* Copyright (C) 2024 Michael N. Lipp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.jdrupes.vmoperator.runner.qemu;
import com.google.gson.JsonObject;
import io.kubernetes.client.apimachinery.GroupVersionKind;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.EventsV1Event;
import java.io.IOException;
import java.util.logging.Level;
import static org.jdrupes.vmoperator.common.Constants.APP_NAME;
import static org.jdrupes.vmoperator.common.Constants.VM_OP_GROUP;
import static org.jdrupes.vmoperator.common.Constants.VM_OP_KIND_VM;
import org.jdrupes.vmoperator.common.K8s;
import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.VmDefinitionStub;
import org.jdrupes.vmoperator.runner.qemu.events.Exit;
import org.jdrupes.vmoperator.runner.qemu.events.SpiceDisconnectedEvent;
import org.jdrupes.vmoperator.runner.qemu.events.SpiceInitializedEvent;
import org.jgrapes.core.Channel;
import org.jgrapes.core.annotation.Handler;
import org.jgrapes.core.events.Start;
/**
* A (sub)component that updates the console status in the CR status.
* Created as child of {@link StatusUpdater}.
*/
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
public class ConsoleTracker extends VmDefUpdater {
private final K8sClient apiClient;
private VmDefinitionStub vmStub;
private String mainChannelClientHost;
private long mainChannelClientPort;
/**
* Instantiates a new status updater.
*
* @param componentChannel the component channel
*/
@SuppressWarnings("PMD.ConstructorCallsOverridableMethod")
public ConsoleTracker(Channel componentChannel) {
super(componentChannel);
apiClient = (K8sClient) io.kubernetes.client.openapi.Configuration
.getDefaultApiClient();
}
/**
* Handle the start event.
*
* @param event the event
* @throws IOException
* @throws ApiException
*/
@Handler
public void onStart(Start event) {
if (namespace == null) {
return;
}
try {
vmStub = VmDefinitionStub.get(apiClient,
new GroupVersionKind(VM_OP_GROUP, "", VM_OP_KIND_VM),
namespace, vmName);
} catch (ApiException e) {
logger.log(Level.SEVERE, e,
() -> "Cannot access VM object, terminating.");
event.cancel(true);
fire(new Exit(1));
}
}
/**
* On spice connected.
*
* @param event the event
* @throws ApiException the api exception
*/
@Handler
@SuppressWarnings({ "PMD.AvoidLiteralsInIfCondition",
"PMD.AvoidDuplicateLiterals" })
public void onSpiceInitialized(SpiceInitializedEvent event)
throws ApiException {
if (vmStub == null) {
return;
}
// Only process connections using main channel.
if (event.channelType() != 1) {
return;
}
mainChannelClientHost = event.clientHost();
mainChannelClientPort = event.clientPort();
vmStub.updateStatus(from -> {
JsonObject status = from.status();
status.addProperty("consoleClient", event.clientHost());
updateCondition(apiClient, from, status, "ConsoleConnected",
true, "Connection from " + event.clientHost(), null);
return status;
});
// Log event
var evt = new EventsV1Event()
.reportingController(VM_OP_GROUP + "/" + APP_NAME)
.action("ConsoleConnectionUpdate")
.reason("Connection from " + event.clientHost());
K8s.createEvent(apiClient, vmStub.model().get(), evt);
}
/**
* On spice disconnected.
*
* @param event the event
* @throws ApiException the api exception
*/
@Handler
@SuppressWarnings("PMD.AvoidDuplicateLiterals")
public void onSpiceDisconnected(SpiceDisconnectedEvent event)
throws ApiException {
if (vmStub == null) {
return;
}
// Only process disconnects from main channel.
if (!event.clientHost().equals(mainChannelClientHost)
|| event.clientPort() != mainChannelClientPort) {
return;
}
vmStub.updateStatus(from -> {
JsonObject status = from.status();
status.addProperty("consoleClient", "");
updateCondition(apiClient, from, status, "ConsoleConnected",
false, event.clientHost() + " has disconnected", null);
return status;
});
// Log event
var evt = new EventsV1Event()
.reportingController(VM_OP_GROUP + "/" + APP_NAME)
.action("ConsoleConnectionUpdate")
.reason("Disconnected from " + event.clientHost());
K8s.createEvent(apiClient, vmStub.model().get(), evt);
}
}

View file

@ -27,22 +27,13 @@ import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.EventsV1Event;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.logging.Level;
import java.util.stream.Collectors;
import static org.jdrupes.vmoperator.common.Constants.APP_NAME;
import static org.jdrupes.vmoperator.common.Constants.VM_OP_GROUP;
import static org.jdrupes.vmoperator.common.Constants.VM_OP_KIND_VM;
import org.jdrupes.vmoperator.common.K8s;
import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.K8sDynamicModel;
import org.jdrupes.vmoperator.common.VmDefinitionModel;
import org.jdrupes.vmoperator.common.VmDefinitionStub;
import org.jdrupes.vmoperator.runner.qemu.events.BalloonChangeEvent;
@ -53,28 +44,21 @@ import org.jdrupes.vmoperator.runner.qemu.events.HotpluggableCpuStatus;
import org.jdrupes.vmoperator.runner.qemu.events.RunnerStateChange;
import org.jdrupes.vmoperator.runner.qemu.events.RunnerStateChange.RunState;
import org.jdrupes.vmoperator.runner.qemu.events.ShutdownEvent;
import org.jdrupes.vmoperator.runner.qemu.events.SpiceConnectedEvent;
import org.jdrupes.vmoperator.runner.qemu.events.SpiceDisconnectedEvent;
import org.jdrupes.vmoperator.util.GsonPtr;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Component;
import org.jgrapes.core.annotation.Handler;
import org.jgrapes.core.events.HandlingError;
import org.jgrapes.core.events.Start;
import org.jgrapes.util.events.ConfigurationUpdate;
import org.jgrapes.util.events.InitialConfiguration;
/**
* Updates the CR status.
*/
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
public class StatusUpdater extends Component {
public class StatusUpdater extends VmDefUpdater {
private static final Set<RunState> RUNNING_STATES
= Set.of(RunState.RUNNING, RunState.TERMINATING);
private String namespace;
private String vmName;
private K8sClient apiClient;
private long observedGeneration;
private boolean guestShutdownStops;
@ -98,6 +82,7 @@ public class StatusUpdater extends Component {
() -> "Cannot access events API, terminating.");
fire(new Exit(1));
}
attach(new ConsoleTracker(componentChannel));
}
/**
@ -114,43 +99,6 @@ public class StatusUpdater extends Component {
}
}
/**
* On configuration update.
*
* @param event the event
*/
@Handler
@SuppressWarnings("unchecked")
public void onConfigurationUpdate(ConfigurationUpdate event) {
event.structured("/Runner").ifPresent(c -> {
if (event instanceof InitialConfiguration) {
namespace = (String) c.get("namespace");
updateNamespace();
vmName = Optional.ofNullable((Map<String, String>) c.get("vm"))
.map(vm -> vm.get("name")).orElse(null);
}
});
}
private void updateNamespace() {
if (namespace == null) {
var path = Path
.of("/var/run/secrets/kubernetes.io/serviceaccount/namespace");
if (Files.isReadable(path)) {
try {
namespace = Files.lines(path).findFirst().orElse(null);
} catch (IOException e) {
logger.log(Level.WARNING, e,
() -> "Cannot read namespace.");
}
}
}
if (namespace == null) {
logger.warning(() -> "Namespace is unknown, some functions"
+ " won't be available.");
}
}
/**
* Handle the start event.
*
@ -238,13 +186,9 @@ public class StatusUpdater extends Component {
}
vmStub.updateStatus(vmDef, from -> {
JsonObject status = from.status();
status.getAsJsonArray("conditions").asList().stream()
.map(cond -> (JsonObject) cond)
.forEach(cond -> {
if ("Running".equals(cond.get("type").getAsString())) {
updateRunningCondition(event, from, cond);
}
});
boolean running = RUNNING_STATES.contains(event.runState());
updateCondition(apiClient, vmDef, vmDef.status(), "Running",
running, event.reason(), event.message());
if (event.runState() == RunState.STARTING) {
status.addProperty("ram", GsonPtr.to(from.data())
.getAsString("spec", "vm", "maximumRam").orElse("0"));
@ -253,6 +197,13 @@ public class StatusUpdater extends Component {
status.addProperty("ram", "0");
status.addProperty("cpus", 0);
}
// In case console connection was still present
if (!running) {
status.addProperty("consoleClient", "");
updateCondition(apiClient, from, status, "ConsoleConnected",
false, "VM has stopped", null);
}
return status;
});
@ -278,29 +229,6 @@ public class StatusUpdater extends Component {
K8s.createEvent(apiClient, vmDef, evt);
}
private void updateRunningCondition(RunnerStateChange event,
K8sDynamicModel from, JsonObject cond) {
@SuppressWarnings("PMD.AvoidDuplicateLiterals")
boolean reportedRunning
= "True".equals(cond.get("status").getAsString());
if (RUNNING_STATES.contains(event.runState())
&& !reportedRunning) {
cond.addProperty("status", "True");
cond.addProperty("lastTransitionTime",
Instant.now().toString());
}
if (!RUNNING_STATES.contains(event.runState())
&& reportedRunning) {
cond.addProperty("status", "False");
cond.addProperty("lastTransitionTime",
Instant.now().toString());
}
cond.addProperty("reason", event.reason());
cond.addProperty("message", event.message());
cond.addProperty("observedGeneration",
from.getMetadata().getGeneration());
}
/**
* On ballon change.
*
@ -369,91 +297,4 @@ public class StatusUpdater extends Component {
public void onShutdown(ShutdownEvent event) throws ApiException {
shutdownByGuest = event.byGuest();
}
/**
* On spice connected.
*
* @param event the event
* @throws ApiException the api exception
*/
@Handler
public void onSpiceConnected(SpiceConnectedEvent event)
throws ApiException {
if (vmStub == null) {
return;
}
vmStub.updateStatus(from -> {
JsonObject status = from.status();
status.addProperty("consoleClient", event.clientHost());
updateConsoleConnectedCondition(from, status, true);
return status;
});
// Log event
var evt = new EventsV1Event()
.reportingController(VM_OP_GROUP + "/" + APP_NAME)
.action("ConsoleConnectionUpdate")
.reason("Connection from " + event.clientHost());
K8s.createEvent(apiClient, vmStub.model().get(), evt);
}
/**
* On spice disconnected.
*
* @param event the event
* @throws ApiException the api exception
*/
@Handler
public void onSpiceDisconnected(SpiceDisconnectedEvent event)
throws ApiException {
if (vmStub == null) {
return;
}
vmStub.updateStatus(from -> {
JsonObject status = from.status();
status.addProperty("consoleClient", "");
updateConsoleConnectedCondition(from, status, false);
return status;
});
// Log event
var evt = new EventsV1Event()
.reportingController(VM_OP_GROUP + "/" + APP_NAME)
.action("ConsoleConnectionUpdate")
.reason("Disconnected from " + event.clientHost());
K8s.createEvent(apiClient, vmStub.model().get(), evt);
}
private void updateConsoleConnectedCondition(VmDefinitionModel from,
JsonObject status, boolean connected) {
// Optimize, as we can get this several times
var current = status.getAsJsonArray("conditions").asList().stream()
.map(cond -> (JsonObject) cond)
.filter(cond -> "ConsoleConnected"
.equals(cond.get("type").getAsString()))
.findFirst()
.map(cond -> "True".equals(cond.get("status").getAsString()));
if (current.isPresent() && current.get() == connected) {
return;
}
// Do update
final var condition = Map.of("type", "ConsoleConnected",
"status", connected ? "True" : "False",
"observedGeneration", from.getMetadata().getGeneration(),
"reason", connected ? "Connected" : "Disconnected",
"lastTransitionTime", Instant.now().toString());
List<Object> toReplace = new ArrayList<>(List.of(condition));
List<Object> newConds
= status.getAsJsonArray("conditions").asList().stream()
.map(cond -> (JsonObject) cond)
.map(cond -> "ConsoleConnected"
.equals(cond.get("type").getAsString())
? toReplace.remove(0)
: cond)
.collect(Collectors.toCollection(() -> new ArrayList<>()));
newConds.addAll(toReplace);
status.add("conditions",
apiClient.getJSON().getGson().toJsonTree(newConds));
}
}

View file

@ -0,0 +1,141 @@
/*
* VM-Operator
* Copyright (C) 2024 Michael N. Lipp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.jdrupes.vmoperator.runner.qemu;
import com.google.gson.JsonObject;
import io.kubernetes.client.openapi.ApiClient;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Level;
import java.util.stream.Collectors;
import org.jdrupes.vmoperator.common.VmDefinitionModel;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Component;
import org.jgrapes.core.annotation.Handler;
import org.jgrapes.util.events.ConfigurationUpdate;
import org.jgrapes.util.events.InitialConfiguration;
/**
* Updates the CR status.
*/
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
public class VmDefUpdater extends Component {
protected String namespace;
protected String vmName;
/**
* Instantiates a new status updater.
*
* @param componentChannel the component channel
*/
@SuppressWarnings("PMD.ConstructorCallsOverridableMethod")
public VmDefUpdater(Channel componentChannel) {
super(componentChannel);
}
/**
* On configuration update.
*
* @param event the event
*/
@Handler
@SuppressWarnings("unchecked")
public void onConfigurationUpdate(ConfigurationUpdate event) {
event.structured("/Runner").ifPresent(c -> {
if (event instanceof InitialConfiguration) {
namespace = (String) c.get("namespace");
updateNamespace();
vmName = Optional.ofNullable((Map<String, String>) c.get("vm"))
.map(vm -> vm.get("name")).orElse(null);
}
});
}
private void updateNamespace() {
if (namespace == null) {
var path = Path
.of("/var/run/secrets/kubernetes.io/serviceaccount/namespace");
if (Files.isReadable(path)) {
try {
namespace = Files.lines(path).findFirst().orElse(null);
} catch (IOException e) {
logger.log(Level.WARNING, e,
() -> "Cannot read namespace.");
}
}
}
if (namespace == null) {
logger.warning(() -> "Namespace is unknown, some functions"
+ " won't be available.");
}
}
/**
* Update condition.
*
* @param apiClient the api client
* @param from the vM definition
* @param status the current status
* @param type the condition type
* @param state the new state
* @param reason the reason for the change
*/
protected void updateCondition(ApiClient apiClient, VmDefinitionModel from,
JsonObject status, String type, boolean state, String reason,
String message) {
// Optimize, as we can get this several times
var current = status.getAsJsonArray("conditions").asList().stream()
.map(cond -> (JsonObject) cond)
.filter(cond -> type.equals(cond.get("type").getAsString()))
.findFirst()
.map(cond -> "True".equals(cond.get("status").getAsString()));
if (current.isPresent() && current.get() == state) {
return;
}
// Do update
final var condition = new HashMap<>(Map.of("type", type,
"status", state ? "True" : "False",
"observedGeneration", from.getMetadata().getGeneration(),
"reason", reason,
"lastTransitionTime", Instant.now().toString()));
if (message != null) {
condition.put("message", message);
}
List<Object> toReplace = new ArrayList<>(List.of(condition));
List<Object> newConds
= status.getAsJsonArray("conditions").asList().stream()
.map(cond -> (JsonObject) cond)
.map(cond -> type.equals(cond.get("type").getAsString())
? toReplace.remove(0)
: cond)
.collect(Collectors.toCollection(() -> new ArrayList<>()));
newConds.addAll(toReplace);
status.add("conditions",
apiClient.getJSON().getGson().toJsonTree(newConds));
}
}

View file

@ -35,7 +35,7 @@ public class MonitorEvent extends Event<Void> {
*/
public enum Kind {
READY, POWERDOWN, DEVICE_TRAY_MOVED, BALLOON_CHANGE, SHUTDOWN,
SPICE_CONNECTED, SPICE_DISCONNECTED
SPICE_CONNECTED, SPICE_INITIALIZED, SPICE_DISCONNECTED
}
private final Kind kind;
@ -64,13 +64,14 @@ public class MonitorEvent extends Event<Void> {
return Optional
.of(new ShutdownEvent(kind, response.get(EVENT_DATA)));
case SPICE_CONNECTED:
return Optional
.of(new SpiceConnectedEvent(kind,
response.get(EVENT_DATA)));
return Optional.of(new SpiceConnectedEvent(kind,
response.get(EVENT_DATA)));
case SPICE_INITIALIZED:
return Optional.of(new SpiceInitializedEvent(kind,
response.get(EVENT_DATA)));
case SPICE_DISCONNECTED:
return Optional
.of(new SpiceDisconnectedEvent(kind,
response.get(EVENT_DATA)));
return Optional.of(new SpiceDisconnectedEvent(kind,
response.get(EVENT_DATA)));
default:
return Optional
.of(new MonitorEvent(kind, response.get(EVENT_DATA)));

View file

@ -43,4 +43,13 @@ public class SpiceEvent extends MonitorEvent {
public String clientHost() {
return data().get("client").get("host").asText();
}
/**
* Returns the client's port.
*
* @return the client's port number
*/
public long clientPort() {
return data().get("client").get("port").asLong();
}
}

View file

@ -0,0 +1,46 @@
/*
* VM-Operator
* Copyright (C) 2023 Michael N. Lipp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.jdrupes.vmoperator.runner.qemu.events;
import com.fasterxml.jackson.databind.JsonNode;
/**
* Signals a connection from a client.
*/
public class SpiceInitializedEvent extends SpiceEvent {
/**
* Instantiates a new spice connected event.
*
* @param kind the kind
* @param data the data
*/
public SpiceInitializedEvent(Kind kind, JsonNode data) {
super(kind, data);
}
/**
* Returns the channel type.
*
* @return the channel type
*/
public int channelType() {
return data().get("client").get("channel-type").asInt();
}
}