Provide "Running" condition in status.

This commit is contained in:
Michael Lipp 2023-09-14 14:14:07 +02:00
parent f50a4af46c
commit 74e05ce023
5 changed files with 353 additions and 15 deletions

View file

@ -9,6 +9,8 @@ spec:
- name: v1
served: true
storage: true
subresources:
status: {}
schema:
openAPIV3Schema:
type: object
@ -1372,10 +1374,28 @@ spec:
- vm
status:
type: object
default: {}
properties:
cpus:
description: >-
Number of CPUs currently in use.
type: integer
default: 0
ram:
description: >-
Amount of memory in use.
type: string
default: "0"
conditions:
description: >-
List of component conditions observed
default:
- type: Running
status: "False"
observedGeneration: 1
lastTransitionTime: "1970-01-01T00:00:00Z"
reason: Creation
message: "Creation of CR"
type: array
items:
type: object
@ -1383,6 +1403,12 @@ spec:
Information about the condition of a component. See
https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#spec-and-status
and https://github.com/kubernetes/apimachinery/blob/release-1.23/pkg/apis/meta/v1/types.go#L1432-L1492
required:
- type
- status
- lastTransitionTime
- reason
- message
properties:
type:
type: string
@ -1428,12 +1454,6 @@ spec:
Message is a human readable message indicating
details about the transition. This may be an empty string.
default: ""
required:
- type
- status
- lastTransitionTime
- reason
- message
# either Namespaced or Cluster
scope: Namespaced
names:

View file

@ -18,6 +18,8 @@ dependencies {
implementation 'commons-cli:commons-cli:1.5.0'
implementation 'org.freemarker:freemarker:[2.3.32,2.4)'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:[2.15.1,3]'
runtimeOnly 'org.slf4j:slf4j-jdk14:[2.0.7,3)'
}
application {

View file

@ -66,6 +66,7 @@ import org.jgrapes.core.annotation.Handler;
import org.jgrapes.core.events.Start;
import org.jgrapes.core.events.Started;
import org.jgrapes.core.events.Stop;
import org.jgrapes.core.internal.EventProcessor;
import org.jgrapes.io.NioDispatcher;
import org.jgrapes.io.events.Input;
import org.jgrapes.io.events.ProcessExited;
@ -90,6 +91,13 @@ import org.jgrapes.util.events.WatchFile;
*
* ![Runner state diagram](RunnerStates.svg)
*
* The {@link Runner} associates an {@link EventProcessor} with the
* {@link Start} event. This "runner event processor" must be used
* for all events related to the application level function. Components
* that handle events from other sources (and thus event processors)
* must fire any resulting events on the runner event processor in order
* to maintain synchronization.
*
* @startuml RunnerStates.svg
* [*] --> Initializing
* Initializing -> Initializing: InitialConfiguration/configure Runner
@ -149,8 +157,13 @@ import org.jgrapes.util.events.WatchFile;
* error --> terminate
* StartingProcess --> terminate: ProcessExited
*
* state Stopped {
* state stopped <<entryPoint>>
*
* terminated --> [*]
* stopped --> [*]
* }
*
* terminated --> stopped
*
* @enduml
*
@ -211,6 +224,7 @@ public class Runner extends Component {
attach(new ProcessManager(channel()));
attach(new SocketConnector(channel()));
attach(qemuMonitor = new QemuMonitor(channel()));
attach(new StatusUpdater(channel()));
// Configuration store with file in /etc/opt (default)
File config = new File(cmdLine.getOptionValue('c',
@ -352,6 +366,11 @@ public class Runner extends Component {
return;
}
// Make sure to use thread specific client
// https://github.com/kubernetes-client/java/issues/100
io.kubernetes.client.openapi.Configuration.setDefaultApiClient(null);
// Prepare specific event pipeline to avoid concurrency.
rep = newEventPipeline();
event.setAssociated(EventPipeline.class, rep);
try {
@ -387,7 +406,8 @@ public class Runner extends Component {
@Handler
public void onStarted(Started event) {
state = State.STARTING;
fire(new RunnerStateChange(state));
rep.fire(new RunnerStateChange(state, "RunnerStarted",
"Runner has been started"));
// Start first process
if (config.vm.useTpm && swtpmDefinition != null) {
startProcess(swtpmDefinition);
@ -476,7 +496,7 @@ public class Runner extends Component {
*/
@Handler
public void onMonitorReady(MonitorReady event) {
fire(new RunnerConfigurationUpdate(config, state));
rep.fire(new RunnerConfigurationUpdate(config, state));
}
/**
@ -489,7 +509,8 @@ public class Runner extends Component {
if (state == State.STARTING) {
fire(new MonitorCommand(new QmpCont()));
state = State.RUNNING;
fire(new RunnerStateChange(state));
rep.fire(new RunnerStateChange(state, "VmStarted",
"Qemu has been configured and is continuing"));
}
}
@ -524,9 +545,22 @@ public class Runner extends Component {
* @param event the event
*/
@Handler(priority = 10_000)
public void onStop(Stop event) {
public void onStopFirst(Stop event) {
state = State.TERMINATING;
fire(new RunnerStateChange(state));
rep.fire(new RunnerStateChange(state, "VmTerminating",
"The VM is being shut down"));
}
/**
* On stop.
*
* @param event the event
*/
@Handler(priority = -10_000)
public void onStopLast(Stop event) {
state = State.STOPPED;
rep.fire(new RunnerStateChange(state, "VmStopped",
"The VM has been shut down"));
}
private void shutdown() {

View file

@ -0,0 +1,244 @@
/*
* VM-Operator
* Copyright (C) 2023 Michael N. Lipp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.jdrupes.vmoperator.runner.qemu;
import com.google.gson.JsonObject;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.ApiextensionsV1Api;
import io.kubernetes.client.openapi.models.V1CustomResourceDefinitionVersion;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesApi;
import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesObject;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.logging.Level;
import org.jdrupes.vmoperator.runner.qemu.events.RunnerConfigurationUpdate;
import org.jdrupes.vmoperator.runner.qemu.events.RunnerStateChange;
import org.jdrupes.vmoperator.runner.qemu.events.RunnerStateChange.State;
import static org.jdrupes.vmoperator.util.Constants.VM_OP_CRD_NAME;
import static org.jdrupes.vmoperator.util.Constants.VM_OP_GROUP;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Component;
import org.jgrapes.core.annotation.Handler;
import org.jgrapes.core.events.Start;
import org.jgrapes.util.events.ConfigurationUpdate;
import org.jgrapes.util.events.InitialConfiguration;
/**
* Updates the CR status.
*/
public class StatusUpdater extends Component {
private static final Set<State> RUNNING_STATES
= Set.of(State.RUNNING, State.TERMINATING);
private String namespace;
private String vmName;
private DynamicKubernetesApi vmCrApi;
private long observedGeneration;
/**
* Instantiates a new status updater.
*
* @param componentChannel the component channel
*/
public StatusUpdater(Channel componentChannel) {
super(componentChannel);
}
/**
* On configuration update.
*
* @param event the event
*/
@Handler
@SuppressWarnings("unchecked")
public void onConfigurationUpdate(ConfigurationUpdate event) {
event.structured("/Runner").ifPresent(c -> {
if (event instanceof InitialConfiguration) {
namespace = (String) c.get("namespace");
updateNamespace();
vmName = Optional.ofNullable((Map<String, String>) c.get("vm"))
.map(vm -> vm.get("name")).orElse(null);
}
});
}
private void updateNamespace() {
if (namespace == null) {
var path = Path
.of("/var/run/secrets/kubernetes.io/serviceaccount/namespace");
if (Files.isReadable(path)) {
try {
namespace = Files.lines(path).findFirst().orElse(null);
} catch (IOException e) {
logger.log(Level.WARNING, e,
() -> "Cannot read namespace.");
}
}
}
if (namespace == null) {
logger.warning(() -> "Namespace is unknown, some functions"
+ " won't be available.");
}
}
/**
* Handle the start event.
*
* @param event the event
*/
@Handler
@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis",
"PMD.AvoidInstantiatingObjectsInLoops" })
public void onStart(Start event) {
try {
var client = Config.defaultClient();
var extsApi = new ApiextensionsV1Api(client);
var crds = extsApi.listCustomResourceDefinition(null, null, null,
"metadata.name=" + VM_OP_CRD_NAME, null, null, null,
null, null, null);
if (crds.getItems().isEmpty()) {
logger.warning(() -> "CRD is unknown, status will not"
+ " be updated.");
return;
}
var crd = crds.getItems().get(0);
if (crd.getSpec().getVersions().stream()
.filter(v -> v.getSubresources() == null).findAny()
.isPresent()) {
logger.warning(() -> "You are using an old version of the CRD,"
+ " status will not be updated.");
return;
}
var crdPlural = crd.getSpec().getNames().getPlural();
var vmOpApiVersions = crd.getSpec().getVersions().stream()
.map(V1CustomResourceDefinitionVersion::getName).toList();
for (var apiVer : vmOpApiVersions) {
var api = new DynamicKubernetesApi(VM_OP_GROUP, apiVer,
crdPlural, client);
var res = api.get(namespace, vmName);
if (res.isSuccess()) {
vmCrApi = api;
observedGeneration
= res.getObject().getMetadata().getGeneration();
break;
}
}
if (vmCrApi == null) {
logger.warning(() -> "VM's CR is unknown, status will not"
+ " be updated.");
}
} catch (IOException | ApiException e) {
logger.log(Level.WARNING, e, () -> "Cannot access kubernetes: "
+ e.getMessage());
}
}
@SuppressWarnings("PMD.AvoidDuplicateLiterals")
private JsonObject currentStatus(DynamicKubernetesObject vmCr) {
return vmCr.getRaw().getAsJsonObject("status").deepCopy();
}
/**
* On runner state changed.
*
* @param event the event
8 * @throws ApiException the api exception
*/
@Handler
public void onRunnerStateChanged(RunnerStateChange event)
throws ApiException {
if (vmCrApi == null) {
return;
}
var vmCr = vmCrApi.get(namespace, vmName)
.throwsApiException().getObject();
vmCrApi.updateStatus(vmCr, from -> {
JsonObject status = currentStatus(from);
status.getAsJsonArray("conditions").asList().stream()
.map(cond -> (JsonObject) cond)
.forEach(cond -> {
if ("Running".equals(cond.get("type").getAsString())) {
updateRunningCondition(event, from, cond);
}
});
return status;
});
}
private void updateRunningCondition(RunnerStateChange event,
DynamicKubernetesObject from, JsonObject cond) {
boolean reportedRunning
= "True".equals(cond.get("status").getAsString());
if (RUNNING_STATES.contains(event.state())
&& !reportedRunning) {
cond.addProperty("status", "True");
cond.addProperty("lastTransitionTime",
Instant.now().toString());
}
if (!RUNNING_STATES.contains(event.state())
&& reportedRunning) {
cond.addProperty("status", "False");
cond.addProperty("lastTransitionTime",
Instant.now().toString());
}
cond.addProperty("reason", event.reason());
cond.addProperty("message", event.message());
cond.addProperty("observedGeneration",
from.getMetadata().getGeneration());
}
/**
* On runner configuration update.
*
* @param event the event
* @throws ApiException
*/
@Handler
public void onRunnerConfigurationUpdate(RunnerConfigurationUpdate event)
throws ApiException {
if (vmCrApi == null) {
return;
}
// A change of the runner configuration is typically caused
// by a new version of the CR. So we observe the new CR.
var vmCr = vmCrApi.get(namespace, vmName).throwsApiException()
.getObject();
if (vmCr.getMetadata().getGeneration() == observedGeneration) {
return;
}
vmCrApi.updateStatus(vmCr, from -> {
JsonObject status = currentStatus(from);
status.getAsJsonArray("conditions").asList().stream()
.map(cond -> (JsonObject) cond)
.filter(
cond -> "Running".equals(cond.get("type").getAsString()))
.forEach(cond -> cond.addProperty("observedGeneration",
from.getMetadata().getGeneration()));
return status;
});
}
}

View file

@ -19,6 +19,7 @@
package org.jdrupes.vmoperator.runner.qemu.events;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Components;
import org.jgrapes.core.Event;
/**
@ -30,19 +31,24 @@ public class RunnerStateChange extends Event<Void> {
* The state.
*/
public enum State {
INITIALIZING, STARTING, RUNNING, TERMINATING
INITIALIZING, STARTING, RUNNING, TERMINATING, STOPPED
}
private final State state;
private final String reason;
private final String message;
/**
* Instantiates a new runner state change.
*
* @param channels the channels
*/
public RunnerStateChange(State state, Channel... channels) {
public RunnerStateChange(State state, String reason, String message,
Channel... channels) {
super(channels);
this.state = state;
this.reason = reason;
this.message = message;
}
/**
@ -53,4 +59,36 @@ public class RunnerStateChange extends Event<Void> {
public State state() {
return state;
}
/**
* Gets the reason.
*
* @return the reason
*/
public String reason() {
return reason;
}
/**
* Gets the message.
*
* @return the message
*/
public String message() {
return message;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append(Components.objectName(this))
.append(" [").append(state).append(": ").append(reason);
if (channels() != null) {
builder.append(", channels=");
builder.append(Channel.toString(channels()));
}
builder.append(']');
return builder.toString();
}
}