Refactor QEMU socket connection handling and start vmop agent.

This commit is contained in:
Michael Lipp 2025-02-24 11:58:13 +01:00
parent 3012da3e87
commit e3b5f5a04d
10 changed files with 451 additions and 319 deletions

View file

@ -14,7 +14,7 @@ spec:
# repository: ghcr.io # repository: ghcr.io
# path: mnlipp/org.jdrupes.vmoperator.runner.qemu-alpine # path: mnlipp/org.jdrupes.vmoperator.runner.qemu-alpine
# version: "3.0.0" # version: "3.0.0"
source: registry.mnl.de/org/jdrupes/vm-operator/org.jdrupes.vmoperator.runner.qemu-arch:testing source: registry.mnl.de/org/jdrupes/vm-operator/org.jdrupes.vmoperator.runner.qemu-arch:feature-auto-login
pullPolicy: Always pullPolicy: Always
permissions: permissions:

View file

@ -0,0 +1,86 @@
/*
* VM-Operator
* Copyright (C) 2025 Michael N. Lipp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.jdrupes.vmoperator.runner.qemu;
import java.io.IOException;
import java.nio.file.Path;
import org.jdrupes.vmoperator.runner.qemu.events.VserportChangeEvent;
import org.jgrapes.core.Channel;
import org.jgrapes.core.annotation.Handler;
import org.jgrapes.util.events.ConfigurationUpdate;
/**
* A component that handles the communication with an agent
* running in the VM.
*
* If the log level for this class is set to fine, the messages
* exchanged on the socket are logged.
*/
public abstract class AgentConnector extends QemuConnector {
protected String channelId;
/**
* Instantiates a new agent connector.
*
* @param componentChannel the component channel
* @throws IOException Signals that an I/O exception has occurred.
*/
public AgentConnector(Channel componentChannel) throws IOException {
super(componentChannel);
}
/**
* As the initial configuration of this component depends on the
* configuration of the {@link Runner}, it doesn't have a handler
* for the {@link ConfigurationUpdate} event. The values are
* forwarded from the {@link Runner} instead.
*
* @param channelId the channel id
* @param socketPath the socket path
*/
/* default */ void configure(String channelId, Path socketPath) {
super.configure(socketPath);
this.channelId = channelId;
logger.fine(() -> getClass().getSimpleName() + " configured with"
+ " channelId=" + channelId);
}
/**
* When the virtual serial port with the configured channel id has
* been opened call {@link #agentConnected()}.
*
* @param event the event
*/
@Handler
public void onVserportChanged(VserportChangeEvent event) {
if (event.id().equals(channelId) && event.isOpen()) {
agentConnected();
}
}
/**
* Called when the agent in the VM opens the connection. The
* default implementation does nothing.
*/
@SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract")
protected void agentConnected() {
// Default is to do nothing.
}
}

View file

@ -39,7 +39,7 @@ import org.jdrupes.vmoperator.util.FsdUtils;
/** /**
* The configuration information from the configuration file. * The configuration information from the configuration file.
*/ */
@SuppressWarnings("PMD.ExcessivePublicCount") @SuppressWarnings({ "PMD.ExcessivePublicCount", "PMD.TooManyFields" })
public class Configuration implements Dto { public class Configuration implements Dto {
private static final String CI_INSTANCE_ID = "instance-id"; private static final String CI_INSTANCE_ID = "instance-id";
@ -67,9 +67,6 @@ public class Configuration implements Dto {
/** The monitor socket. */ /** The monitor socket. */
public Path monitorSocket; public Path monitorSocket;
/** The guest agent socket socket. */
public Path guestAgentSocket;
/** The firmware rom. */ /** The firmware rom. */
public Path firmwareRom; public Path firmwareRom;
@ -344,7 +341,6 @@ public class Configuration implements Dto {
runtimeDir.toFile().mkdir(); runtimeDir.toFile().mkdir();
swtpmSocket = runtimeDir.resolve("swtpm-sock"); swtpmSocket = runtimeDir.resolve("swtpm-sock");
monitorSocket = runtimeDir.resolve("monitor.sock"); monitorSocket = runtimeDir.resolve("monitor.sock");
guestAgentSocket = runtimeDir.resolve("org.qemu.guest_agent.0");
} }
if (!Files.isDirectory(runtimeDir) || !Files.isWritable(runtimeDir)) { if (!Files.isDirectory(runtimeDir) || !Files.isWritable(runtimeDir)) {
logger.severe(() -> String.format( logger.severe(() -> String.format(

View file

@ -19,58 +19,26 @@
package org.jdrupes.vmoperator.runner.qemu; package org.jdrupes.vmoperator.runner.qemu;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException; import java.io.IOException;
import java.io.Writer;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.UnixDomainSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.logging.Level; import java.util.logging.Level;
import org.jdrupes.vmoperator.runner.qemu.commands.QmpCommand; import org.jdrupes.vmoperator.runner.qemu.commands.QmpCommand;
import org.jdrupes.vmoperator.runner.qemu.commands.QmpGuestGetOsinfo; import org.jdrupes.vmoperator.runner.qemu.commands.QmpGuestGetOsinfo;
import org.jdrupes.vmoperator.runner.qemu.events.GuestAgentCommand; import org.jdrupes.vmoperator.runner.qemu.events.GuestAgentCommand;
import org.jdrupes.vmoperator.runner.qemu.events.OsinfoEvent; import org.jdrupes.vmoperator.runner.qemu.events.OsinfoEvent;
import org.jdrupes.vmoperator.runner.qemu.events.VserportChangeEvent;
import org.jgrapes.core.Channel; import org.jgrapes.core.Channel;
import org.jgrapes.core.Component;
import org.jgrapes.core.EventPipeline;
import org.jgrapes.core.annotation.Handler; import org.jgrapes.core.annotation.Handler;
import org.jgrapes.core.events.Start;
import org.jgrapes.core.events.Stop;
import org.jgrapes.io.events.Closed;
import org.jgrapes.io.events.ConnectError;
import org.jgrapes.io.events.Input;
import org.jgrapes.io.events.OpenSocketConnection;
import org.jgrapes.io.util.ByteBufferWriter;
import org.jgrapes.io.util.LineCollector;
import org.jgrapes.net.SocketIOChannel;
import org.jgrapes.net.events.ClientConnected;
import org.jgrapes.util.events.ConfigurationUpdate;
/** /**
* A component that handles the communication over the guest agent * A component that handles the communication with the guest agent.
* socket.
* *
* If the log level for this class is set to fine, the messages * If the log level for this class is set to fine, the messages
* exchanged on the monitor socket are logged. * exchanged on the monitor socket are logged.
*/ */
@SuppressWarnings("PMD.DataflowAnomalyAnalysis") public class GuestAgentClient extends AgentConnector {
public class GuestAgentClient extends Component {
private static ObjectMapper mapper = new ObjectMapper();
private EventPipeline rep;
private Path socketPath;
private List<Map<String, String>> guestAgentCmds;
private String guestAgentCmd;
private SocketIOChannel gaChannel;
private final Queue<QmpCommand> executing = new LinkedList<>(); private final Queue<QmpCommand> executing = new LinkedList<>();
/** /**
@ -79,135 +47,36 @@ public class GuestAgentClient extends Component {
* @param componentChannel the component channel * @param componentChannel the component channel
* @throws IOException Signals that an I/O exception has occurred. * @throws IOException Signals that an I/O exception has occurred.
*/ */
@SuppressWarnings({ "PMD.AssignmentToNonFinalStatic",
"PMD.ConstructorCallsOverridableMethod" })
public GuestAgentClient(Channel componentChannel) throws IOException { public GuestAgentClient(Channel componentChannel) throws IOException {
super(componentChannel); super(componentChannel);
} }
/** /**
* As the initial configuration of this component depends on the * When the agent has connected, request the OS information.
* configuration of the {@link Runner}, it doesn't have a handler
* for the {@link ConfigurationUpdate} event. The values are
* forwarded from the {@link Runner} instead.
*
* @param socketPath the socket path
* @param guestAgentCmds the guest agent cmds
*/ */
@SuppressWarnings("PMD.EmptyCatchBlock") @Override
/* default */ void configure(Path socketPath, ArrayNode guestAgentCmds) { protected void agentConnected() {
this.socketPath = socketPath; fire(new GuestAgentCommand(new QmpGuestGetOsinfo()));
try {
this.guestAgentCmds = mapper.convertValue(guestAgentCmds,
mapper.constructType(getClass()
.getDeclaredField("guestAgentCmds").getGenericType()));
} catch (IllegalArgumentException | NoSuchFieldException
| SecurityException e) {
// Cannot happen
}
} }
/** /**
* Handle the start event. * Process agent input.
* *
* @param event the event * @param line the line
* @throws IOException Signals that an I/O exception has occurred. * @throws IOException Signals that an I/O exception has occurred.
*/ */
@Handler @Override
public void onStart(Start event) throws IOException { protected void processInput(String line) throws IOException {
rep = event.associated(EventPipeline.class).get();
if (socketPath == null) {
return;
}
Files.deleteIfExists(socketPath);
}
/**
* When the virtual serial port "channel0" has been opened,
* establish the connection by opening the socket.
*
* @param event the event
*/
@Handler
public void onVserportChanged(VserportChangeEvent event) {
if ("channel0".equals(event.id()) && event.isOpen()) {
fire(new OpenSocketConnection(
UnixDomainSocketAddress.of(socketPath))
.setAssociated(GuestAgentClient.class, this));
}
}
/**
* Check if this is from opening the monitor socket and if true,
* save the socket in the context and associate the channel with
* the context. Then send the initial message to the socket.
*
* @param event the event
* @param channel the channel
*/
@SuppressWarnings("resource")
@Handler
public void onClientConnected(ClientConnected event,
SocketIOChannel channel) {
event.openEvent().associated(GuestAgentClient.class).ifPresent(qm -> {
gaChannel = channel;
channel.setAssociated(GuestAgentClient.class, this);
channel.setAssociated(Writer.class, new ByteBufferWriter(
channel).nativeCharset());
channel.setAssociated(LineCollector.class,
new LineCollector()
.consumer(line -> {
try {
processGuestAgentInput(line);
} catch (IOException e) {
throw new UndeclaredThrowableException(e);
}
}));
fire(new GuestAgentCommand(new QmpGuestGetOsinfo()));
});
}
/**
* Called when a connection attempt fails.
*
* @param event the event
* @param channel the channel
*/
@Handler
public void onConnectError(ConnectError event, SocketIOChannel channel) {
event.event().associated(GuestAgentClient.class).ifPresent(qm -> {
rep.fire(new Stop());
});
}
/**
* Handle data from qemu monitor connection.
*
* @param event the event
* @param channel the channel
*/
@Handler
public void onInput(Input<?> event, SocketIOChannel channel) {
if (channel.associated(GuestAgentClient.class).isEmpty()) {
return;
}
channel.associated(LineCollector.class).ifPresent(collector -> {
collector.feed(event);
});
}
private void processGuestAgentInput(String line)
throws IOException {
logger.fine(() -> "guest agent(in): " + line); logger.fine(() -> "guest agent(in): " + line);
try { try {
var response = mapper.readValue(line, ObjectNode.class); var response = mapper.readValue(line, ObjectNode.class);
if (response.has("return") || response.has("error")) { if (response.has("return") || response.has("error")) {
QmpCommand executed = executing.poll(); QmpCommand executed = executing.poll();
logger.fine( logger.fine(() -> String.format("(Previous \"guest agent(in)\""
() -> String.format("(Previous \"guest agent(in)\" is " + " is result from executing %s)", executed));
+ "result from executing %s)", executed));
if (executed instanceof QmpGuestGetOsinfo) { if (executed instanceof QmpGuestGetOsinfo) {
processOsInfo(response); var osInfo = new OsinfoEvent(response.get("return"));
rep().fire(osInfo);
} }
} }
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
@ -215,48 +84,17 @@ public class GuestAgentClient extends Component {
} }
} }
private void processOsInfo(ObjectNode response) {
var osInfo = new OsinfoEvent(response.get("return"));
var osId = osInfo.osinfo().get("id").asText();
for (var cmdDef : guestAgentCmds) {
if (osId.equals(cmdDef.get("osId"))
|| "*".equals(cmdDef.get("osId"))) {
guestAgentCmd = cmdDef.get("executable");
break;
}
}
if (guestAgentCmd == null) {
logger.warning(() -> "No guest agent command for OS " + osId);
} else {
logger.fine(() -> "Guest agent command for OS " + osId
+ " is " + guestAgentCmd);
}
rep.fire(osInfo);
}
/**
* On closed.
*
* @param event the event
*/
@Handler
@SuppressWarnings({ "PMD.AvoidSynchronizedStatement",
"PMD.AvoidDuplicateLiterals" })
public void onClosed(Closed<?> event, SocketIOChannel channel) {
channel.associated(QemuMonitor.class).ifPresent(qm -> {
gaChannel = null;
});
}
/** /**
* On guest agent command. * On guest agent command.
* *
* @param event the event * @param event the event
*/ */
@Handler @Handler
@SuppressWarnings({ "PMD.AvoidLiteralsInIfCondition", @SuppressWarnings("PMD.AvoidSynchronizedStatement")
"PMD.AvoidSynchronizedStatement" })
public void onGuestAgentCommand(GuestAgentCommand event) { public void onGuestAgentCommand(GuestAgentCommand event) {
if (qemuChannel() == null) {
return;
}
var command = event.command(); var command = event.command();
logger.fine(() -> "guest agent(out): " + command.toString()); logger.fine(() -> "guest agent(out): " + command.toString());
String asText; String asText;
@ -268,7 +106,7 @@ public class GuestAgentClient extends Component {
return; return;
} }
synchronized (executing) { synchronized (executing) {
gaChannel.associated(Writer.class).ifPresent(writer -> { writer().ifPresent(writer -> {
try { try {
executing.add(command); executing.add(command);
writer.append(asText).append('\n').flush(); writer.append(asText).append('\n').flush();

View file

@ -0,0 +1,234 @@
/*
* VM-Operator
* Copyright (C) 2025 Michael N. Lipp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.jdrupes.vmoperator.runner.qemu;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.Writer;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.UnixDomainSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Component;
import org.jgrapes.core.EventPipeline;
import org.jgrapes.core.annotation.Handler;
import org.jgrapes.core.events.Start;
import org.jgrapes.core.events.Stop;
import org.jgrapes.io.events.Closed;
import org.jgrapes.io.events.ConnectError;
import org.jgrapes.io.events.Input;
import org.jgrapes.io.events.OpenSocketConnection;
import org.jgrapes.io.util.ByteBufferWriter;
import org.jgrapes.io.util.LineCollector;
import org.jgrapes.net.SocketIOChannel;
import org.jgrapes.net.events.ClientConnected;
import org.jgrapes.util.events.ConfigurationUpdate;
import org.jgrapes.util.events.FileChanged;
import org.jgrapes.util.events.WatchFile;
/**
* A component that handles the communication with QEMU over a socket.
*
* If the log level for this class is set to fine, the messages
* exchanged on the socket are logged.
*/
public abstract class QemuConnector extends Component {
@SuppressWarnings("PMD.FieldNamingConventions")
protected static final ObjectMapper mapper = new ObjectMapper();
private EventPipeline rep;
private Path socketPath;
private SocketIOChannel qemuChannel;
/**
* Instantiates a new QEMU connector.
*
* @param componentChannel the component channel
* @throws IOException Signals that an I/O exception has occurred.
*/
public QemuConnector(Channel componentChannel) throws IOException {
super(componentChannel);
}
/**
* As the initial configuration of this component depends on the
* configuration of the {@link Runner}, it doesn't have a handler
* for the {@link ConfigurationUpdate} event. The values are
* forwarded from the {@link Runner} instead.
*
* @param socketPath the socket path
*/
/* default */ void configure(Path socketPath) {
this.socketPath = socketPath;
logger.fine(() -> getClass().getSimpleName()
+ " configured with socketPath=" + socketPath);
}
/**
* Note the runner's event processor and delete the socket.
*
* @param event the event
* @throws IOException Signals that an I/O exception has occurred.
*/
@Handler
public void onStart(Start event) throws IOException {
rep = event.associated(EventPipeline.class).get();
if (socketPath == null) {
return;
}
Files.deleteIfExists(socketPath);
fire(new WatchFile(socketPath));
}
/**
* Return the runner's event pipeline.
*
* @return the event pipeline
*/
protected EventPipeline rep() {
return rep;
}
/**
* Watch for the creation of the swtpm socket and start the
* qemu process if it has been created.
*
* @param event the event
*/
@Handler
public void onFileChanged(FileChanged event) {
if (event.change() == FileChanged.Kind.CREATED
&& event.path().equals(socketPath)) {
// qemu running, open socket
fire(new OpenSocketConnection(
UnixDomainSocketAddress.of(socketPath))
.setAssociated(getClass(), this));
}
}
/**
* Check if this is from opening the agent socket and if true,
* save the socket in the context and associate the channel with
* the context.
*
* @param event the event
* @param channel the channel
*/
@SuppressWarnings("resource")
@Handler
public void onClientConnected(ClientConnected event,
SocketIOChannel channel) {
event.openEvent().associated(getClass()).ifPresent(qm -> {
qemuChannel = channel;
channel.setAssociated(getClass(), this);
channel.setAssociated(Writer.class, new ByteBufferWriter(
channel).nativeCharset());
channel.setAssociated(LineCollector.class,
new LineCollector()
.consumer(line -> {
try {
processInput(line);
} catch (IOException e) {
throw new UndeclaredThrowableException(e);
}
}));
socketConnected();
});
}
/**
* Return the QEMU channel if the connection has been established.
*
* @return the socket IO channel
*/
protected Optional<SocketIOChannel> qemuChannel() {
return Optional.ofNullable(qemuChannel);
}
/**
* Return the {@link Writer} for the connection if the connection
* has been established.
*
* @return the optional
*/
protected Optional<Writer> writer() {
return qemuChannel().flatMap(c -> c.associated(Writer.class));
}
/**
* Called when the connector has been connected to the socket.
*/
@SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract")
protected void socketConnected() {
// Default is to do nothing.
}
/**
* Called when a connection attempt fails.
*
* @param event the event
* @param channel the channel
*/
@Handler
public void onConnectError(ConnectError event, SocketIOChannel channel) {
event.event().associated(getClass()).ifPresent(qm -> {
rep.fire(new Stop());
});
}
/**
* Handle data from the socket connection.
*
* @param event the event
* @param channel the channel
*/
@Handler
public void onInput(Input<?> event, SocketIOChannel channel) {
if (channel.associated(getClass()).isEmpty()) {
return;
}
channel.associated(LineCollector.class).ifPresent(collector -> {
collector.feed(event);
});
}
/**
* Process agent input.
*
* @param line the line
* @throws IOException Signals that an I/O exception has occurred.
*/
protected abstract void processInput(String line) throws IOException;
/**
* On closed.
*
* @param event the event
* @param channel the channel
*/
@Handler
public void onClosed(Closed<?> event, SocketIOChannel channel) {
channel.associated(getClass()).ifPresent(qm -> {
qemuChannel = null;
});
}
}

View file

@ -19,13 +19,8 @@
package org.jdrupes.vmoperator.runner.qemu; package org.jdrupes.vmoperator.runner.qemu;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException; import java.io.IOException;
import java.io.Writer;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.UnixDomainSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
@ -42,24 +37,13 @@ import org.jdrupes.vmoperator.runner.qemu.events.MonitorReady;
import org.jdrupes.vmoperator.runner.qemu.events.MonitorResult; import org.jdrupes.vmoperator.runner.qemu.events.MonitorResult;
import org.jdrupes.vmoperator.runner.qemu.events.PowerdownEvent; import org.jdrupes.vmoperator.runner.qemu.events.PowerdownEvent;
import org.jgrapes.core.Channel; import org.jgrapes.core.Channel;
import org.jgrapes.core.Component;
import org.jgrapes.core.Components; import org.jgrapes.core.Components;
import org.jgrapes.core.Components.Timer; import org.jgrapes.core.Components.Timer;
import org.jgrapes.core.EventPipeline;
import org.jgrapes.core.annotation.Handler; import org.jgrapes.core.annotation.Handler;
import org.jgrapes.core.events.Start;
import org.jgrapes.core.events.Stop; import org.jgrapes.core.events.Stop;
import org.jgrapes.io.events.Closed; import org.jgrapes.io.events.Closed;
import org.jgrapes.io.events.ConnectError;
import org.jgrapes.io.events.Input;
import org.jgrapes.io.events.OpenSocketConnection;
import org.jgrapes.io.util.ByteBufferWriter;
import org.jgrapes.io.util.LineCollector;
import org.jgrapes.net.SocketIOChannel; import org.jgrapes.net.SocketIOChannel;
import org.jgrapes.net.events.ClientConnected;
import org.jgrapes.util.events.ConfigurationUpdate; import org.jgrapes.util.events.ConfigurationUpdate;
import org.jgrapes.util.events.FileChanged;
import org.jgrapes.util.events.WatchFile;
/** /**
* A component that handles the communication over the Qemu monitor * A component that handles the communication over the Qemu monitor
@ -69,14 +53,9 @@ import org.jgrapes.util.events.WatchFile;
* exchanged on the monitor socket are logged. * exchanged on the monitor socket are logged.
*/ */
@SuppressWarnings("PMD.DataflowAnomalyAnalysis") @SuppressWarnings("PMD.DataflowAnomalyAnalysis")
public class QemuMonitor extends Component { public class QemuMonitor extends QemuConnector {
private static ObjectMapper mapper = new ObjectMapper();
private EventPipeline rep;
private Path socketPath;
private int powerdownTimeout; private int powerdownTimeout;
private SocketIOChannel monitorChannel;
private final Queue<QmpCommand> executing = new LinkedList<>(); private final Queue<QmpCommand> executing = new LinkedList<>();
private Instant powerdownStartedAt; private Instant powerdownStartedAt;
private Stop suspendedStop; private Stop suspendedStop;
@ -84,7 +63,7 @@ public class QemuMonitor extends Component {
private boolean powerdownConfirmed; private boolean powerdownConfirmed;
/** /**
* Instantiates a new qemu monitor. * Instantiates a new QEMU monitor.
* *
* @param componentChannel the component channel * @param componentChannel the component channel
* @param configDir the config dir * @param configDir the config dir
@ -111,109 +90,26 @@ public class QemuMonitor extends Component {
* @param powerdownTimeout * @param powerdownTimeout
*/ */
/* default */ void configure(Path socketPath, int powerdownTimeout) { /* default */ void configure(Path socketPath, int powerdownTimeout) {
this.socketPath = socketPath; super.configure(socketPath);
this.powerdownTimeout = powerdownTimeout; this.powerdownTimeout = powerdownTimeout;
} }
/** /**
* Handle the start event. * When the socket is connected, send the capabilities command.
*
* @param event the event
* @throws IOException Signals that an I/O exception has occurred.
*/ */
@Handler @Override
public void onStart(Start event) throws IOException { protected void socketConnected() {
rep = event.associated(EventPipeline.class).get();
if (socketPath == null) {
return;
}
Files.deleteIfExists(socketPath);
fire(new WatchFile(socketPath));
}
/**
* Watch for the creation of the swtpm socket and start the
* qemu process if it has been created.
*
* @param event the event
*/
@Handler
public void onFileChanged(FileChanged event) {
if (event.change() == FileChanged.Kind.CREATED
&& event.path().equals(socketPath)) {
// qemu running, open socket
fire(new OpenSocketConnection(
UnixDomainSocketAddress.of(socketPath))
.setAssociated(QemuMonitor.class, this));
}
}
/**
* Check if this is from opening the monitor socket and if true,
* save the socket in the context and associate the channel with
* the context. Then send the initial message to the socket.
*
* @param event the event
* @param channel the channel
*/
@SuppressWarnings("resource")
@Handler
public void onClientConnected(ClientConnected event,
SocketIOChannel channel) {
event.openEvent().associated(QemuMonitor.class).ifPresent(qm -> {
monitorChannel = channel;
channel.setAssociated(QemuMonitor.class, this);
channel.setAssociated(Writer.class, new ByteBufferWriter(
channel).nativeCharset());
channel.setAssociated(LineCollector.class,
new LineCollector()
.consumer(line -> {
try {
processMonitorInput(line);
} catch (IOException e) {
throw new UndeclaredThrowableException(e);
}
}));
fire(new MonitorCommand(new QmpCapabilities())); fire(new MonitorCommand(new QmpCapabilities()));
});
} }
/** @Override
* Called when a connection attempt fails. protected void processInput(String line)
*
* @param event the event
* @param channel the channel
*/
@Handler
public void onConnectError(ConnectError event, SocketIOChannel channel) {
event.event().associated(QemuMonitor.class).ifPresent(qm -> {
rep.fire(new Stop());
});
}
/**
* Handle data from qemu monitor connection.
*
* @param event the event
* @param channel the channel
*/
@Handler
public void onInput(Input<?> event, SocketIOChannel channel) {
if (channel.associated(QemuMonitor.class).isEmpty()) {
return;
}
channel.associated(LineCollector.class).ifPresent(collector -> {
collector.feed(event);
});
}
private void processMonitorInput(String line)
throws IOException { throws IOException {
logger.fine(() -> "monitor(in): " + line); logger.fine(() -> "monitor(in): " + line);
try { try {
var response = mapper.readValue(line, ObjectNode.class); var response = mapper.readValue(line, ObjectNode.class);
if (response.has("QMP")) { if (response.has("QMP")) {
rep.fire(new MonitorReady()); rep().fire(new MonitorReady());
return; return;
} }
if (response.has("return") || response.has("error")) { if (response.has("return") || response.has("error")) {
@ -221,11 +117,11 @@ public class QemuMonitor extends Component {
logger.fine( logger.fine(
() -> String.format("(Previous \"monitor(in)\" is result " () -> String.format("(Previous \"monitor(in)\" is result "
+ "from executing %s)", executed)); + "from executing %s)", executed));
rep.fire(MonitorResult.from(executed, response)); rep().fire(MonitorResult.from(executed, response));
return; return;
} }
if (response.has("event")) { if (response.has("event")) {
MonitorEvent.from(response).ifPresent(rep::fire); MonitorEvent.from(response).ifPresent(rep()::fire);
} }
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new IOException(e); throw new IOException(e);
@ -241,8 +137,8 @@ public class QemuMonitor extends Component {
@SuppressWarnings({ "PMD.AvoidSynchronizedStatement", @SuppressWarnings({ "PMD.AvoidSynchronizedStatement",
"PMD.AvoidDuplicateLiterals" }) "PMD.AvoidDuplicateLiterals" })
public void onClosed(Closed<?> event, SocketIOChannel channel) { public void onClosed(Closed<?> event, SocketIOChannel channel) {
super.onClosed(event, channel);
channel.associated(QemuMonitor.class).ifPresent(qm -> { channel.associated(QemuMonitor.class).ifPresent(qm -> {
monitorChannel = null;
synchronized (this) { synchronized (this) {
if (powerdownTimer != null) { if (powerdownTimer != null) {
powerdownTimer.cancel(); powerdownTimer.cancel();
@ -275,7 +171,7 @@ public class QemuMonitor extends Component {
return; return;
} }
synchronized (executing) { synchronized (executing) {
monitorChannel.associated(Writer.class).ifPresent(writer -> { writer().ifPresent(writer -> {
try { try {
executing.add(command); executing.add(command);
writer.append(asText).append('\n').flush(); writer.append(asText).append('\n').flush();
@ -295,7 +191,7 @@ public class QemuMonitor extends Component {
@Handler(priority = 100) @Handler(priority = 100)
@SuppressWarnings("PMD.AvoidSynchronizedStatement") @SuppressWarnings("PMD.AvoidSynchronizedStatement")
public void onStop(Stop event) { public void onStop(Stop event) {
if (monitorChannel != null) { if (qemuChannel() != null) {
// We have a connection to Qemu, attempt ACPI shutdown. // We have a connection to Qemu, attempt ACPI shutdown.
event.suspendHandling(); event.suspendHandling();
suspendedStop = event; suspendedStop = event;

View file

@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import freemarker.core.ParseException; import freemarker.core.ParseException;
@ -198,7 +197,6 @@ public class Runner extends Component {
private static final String QEMU = "qemu"; private static final String QEMU = "qemu";
private static final String SWTPM = "swtpm"; private static final String SWTPM = "swtpm";
private static final String CLOUD_INIT_IMG = "cloudInitImg"; private static final String CLOUD_INIT_IMG = "cloudInitImg";
private static final String GUEST_AGENT_CMDS = "guestAgentCmds";
private static final String TEMPLATE_DIR private static final String TEMPLATE_DIR
= "/opt/" + APP_NAME.replace("-", "") + "/templates"; = "/opt/" + APP_NAME.replace("-", "") + "/templates";
private static final String DEFAULT_TEMPLATE private static final String DEFAULT_TEMPLATE
@ -222,6 +220,7 @@ public class Runner extends Component {
private CommandDefinition qemuDefinition; private CommandDefinition qemuDefinition;
private final QemuMonitor qemuMonitor; private final QemuMonitor qemuMonitor;
private final GuestAgentClient guestAgentClient; private final GuestAgentClient guestAgentClient;
private final VmopAgentClient vmopAgentClient;
private Integer resetCounter; private Integer resetCounter;
private RunState state = RunState.INITIALIZING; private RunState state = RunState.INITIALIZING;
@ -280,6 +279,7 @@ public class Runner extends Component {
attach(new SocketConnector(channel())); attach(new SocketConnector(channel()));
attach(qemuMonitor = new QemuMonitor(channel(), configDir)); attach(qemuMonitor = new QemuMonitor(channel(), configDir));
attach(guestAgentClient = new GuestAgentClient(channel())); attach(guestAgentClient = new GuestAgentClient(channel()));
attach(vmopAgentClient = new VmopAgentClient(channel()));
attach(new StatusUpdater(channel())); attach(new StatusUpdater(channel()));
attach(new YamlConfigurationStore(channel(), configFile, false)); attach(new YamlConfigurationStore(channel(), configFile, false));
fire(new WatchFile(configFile.toPath())); fire(new WatchFile(configFile.toPath()));
@ -350,16 +350,12 @@ public class Runner extends Component {
.map(d -> new CommandDefinition(CLOUD_INIT_IMG, d)) .map(d -> new CommandDefinition(CLOUD_INIT_IMG, d))
.orElse(null); .orElse(null);
logger.finest(() -> cloudInitImgDefinition.toString()); logger.finest(() -> cloudInitImgDefinition.toString());
var guestAgentCmds = (ArrayNode) tplData.get(GUEST_AGENT_CMDS);
if (guestAgentCmds != null) {
logger.finest(
() -> "GuestAgentCmds: " + guestAgentCmds.toString());
}
// Forward some values to child components // Forward some values to child components
qemuMonitor.configure(config.monitorSocket, qemuMonitor.configure(config.monitorSocket,
config.vm.powerdownTimeout); config.vm.powerdownTimeout);
guestAgentClient.configure(config.guestAgentSocket, guestAgentCmds); configureAgentClient(guestAgentClient, "guest-agent-socket");
configureAgentClient(vmopAgentClient, "vmop-agent-socket");
} catch (IllegalArgumentException | IOException | TemplateException e) { } catch (IllegalArgumentException | IOException | TemplateException e) {
logger.log(Level.SEVERE, e, () -> "Invalid configuration: " logger.log(Level.SEVERE, e, () -> "Invalid configuration: "
+ e.getMessage()); + e.getMessage());
@ -484,6 +480,36 @@ public class Runner extends Component {
} }
} }
@SuppressWarnings("PMD.CognitiveComplexity")
private void configureAgentClient(AgentConnector client, String chardev) {
String id = null;
Path path = null;
for (var arg : qemuDefinition.command) {
if (arg.startsWith("virtserialport,")
&& arg.contains("chardev=" + chardev)) {
for (var prop : arg.split(",")) {
if (prop.startsWith("id=")) {
id = prop.substring(3);
}
}
}
if (arg.startsWith("socket,")
&& arg.contains("id=" + chardev)) {
for (var prop : arg.split(",")) {
if (prop.startsWith("path=")) {
path = Path.of(prop.substring(5));
}
}
}
}
if (id == null || path == null) {
logger.warning(() -> "Definition of chardev " + chardev
+ " missing in runner template.");
return;
}
client.configure(id, path);
}
/** /**
* Handle the started event. * Handle the started event.
* *

View file

@ -0,0 +1,48 @@
/*
* VM-Operator
* Copyright (C) 2025 Michael N. Lipp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.jdrupes.vmoperator.runner.qemu;
import java.io.IOException;
import org.jgrapes.core.Channel;
/**
* A component that handles the communication over the vmop agent
* socket.
*
* If the log level for this class is set to fine, the messages
* exchanged on the socket are logged.
*/
public class VmopAgentClient extends AgentConnector {
/**
* Instantiates a new VM operator agent client.
*
* @param componentChannel the component channel
* @throws IOException Signals that an I/O exception has occurred.
*/
public VmopAgentClient(Channel componentChannel) throws IOException {
super(componentChannel);
}
@Override
protected void processInput(String line) throws IOException {
// TODO Auto-generated method stub
}
}

View file

@ -122,11 +122,16 @@
# Best explanation found: # Best explanation found:
# https://fedoraproject.org/wiki/Features/VirtioSerial # https://fedoraproject.org/wiki/Features/VirtioSerial
- [ "-device", "virtio-serial-pci,id=virtio-serial0" ] - [ "-device", "virtio-serial-pci,id=virtio-serial0" ]
# - Guest agent serial connection. MUST have id "channel0"! # - Guest agent serial connection.
- [ "-device", "virtserialport,id=channel0,name=org.qemu.guest_agent.0,\ - [ "-device", "virtserialport,id=channel0,name=org.qemu.guest_agent.0,\
chardev=guest-agent-socket" ] chardev=guest-agent-socket" ]
- [ "-chardev","socket,id=guest-agent-socket,\ - [ "-chardev","socket,id=guest-agent-socket,\
path=${ runtimeDir }/org.qemu.guest_agent.0,server=on,wait=off" ] path=${ runtimeDir }/org.qemu.guest_agent.0,server=on,wait=off" ]
# - VM operator agent serial connection.
- [ "-device", "virtserialport,id=channel1,name=org.jdrupes.vmop_agent.0,\
chardev=vmop-agent-socket" ]
- [ "-chardev","socket,id=vmop-agent-socket,\
path=${ runtimeDir }/org.jdrupes.vmop_agent.0,server=on,wait=off" ]
# * USB Hub and devices (more in SPICE configuration below) # * USB Hub and devices (more in SPICE configuration below)
# https://qemu-project.gitlab.io/qemu/system/devices/usb.html # https://qemu-project.gitlab.io/qemu/system/devices/usb.html
# https://github.com/qemu/qemu/blob/master/hw/usb/hcd-xhci.c # https://github.com/qemu/qemu/blob/master/hw/usb/hcd-xhci.c
@ -233,7 +238,3 @@
</#list> </#list>
</#if> </#if>
</#if> </#if>
"guestAgentCmds":
- "osId": "*"
"executable": "/usr/local/libexec/vm-operator-cmd"

View file

@ -26,6 +26,13 @@ layout: vm-operator
still accepted for backward compatibility until the next major version, still accepted for backward compatibility until the next major version,
but should be updated. but should be updated.
* The standard [template](./runner.html#stand-alone-configuration) used
to generate the QEMU command has been updated. Unless you have enabled
automatic updates of the template in the VM definition, you have to
update the template manually. If you're using your own template, you
have to add a virtual serial port (see the git history of the standard
template for the required addition).
## To version 3.4.0 ## To version 3.4.0
Starting with this version, the VM-Operator no longer uses a stateful set Starting with this version, the VM-Operator no longer uses a stateful set