Use one event processor for "main loop" events.

This commit is contained in:
Michael Lipp 2023-08-07 11:58:18 +02:00
parent 3e43d02931
commit 37e8f90cd1
2 changed files with 15 additions and 9 deletions

View file

@ -42,6 +42,7 @@ import org.jgrapes.core.Channel;
import org.jgrapes.core.Component; import org.jgrapes.core.Component;
import org.jgrapes.core.Components; import org.jgrapes.core.Components;
import org.jgrapes.core.Components.Timer; import org.jgrapes.core.Components.Timer;
import org.jgrapes.core.EventPipeline;
import org.jgrapes.core.annotation.Handler; import org.jgrapes.core.annotation.Handler;
import org.jgrapes.core.events.Start; import org.jgrapes.core.events.Start;
import org.jgrapes.core.events.Stop; import org.jgrapes.core.events.Stop;
@ -69,7 +70,7 @@ public class QemuMonitor extends Component {
private static ObjectMapper mapper = new ObjectMapper(); private static ObjectMapper mapper = new ObjectMapper();
@SuppressWarnings("PMD.UseConcurrentHashMap") private EventPipeline rep;
private Path socketPath; private Path socketPath;
private int powerdownTimeout; private int powerdownTimeout;
private SocketIOChannel monitorChannel; private SocketIOChannel monitorChannel;
@ -113,6 +114,7 @@ public class QemuMonitor extends Component {
*/ */
@Handler @Handler
public void onStart(Start event) throws IOException { public void onStart(Start event) throws IOException {
rep = event.associated(EventPipeline.class).get();
if (socketPath == null) { if (socketPath == null) {
return; return;
} }
@ -175,7 +177,7 @@ public class QemuMonitor extends Component {
@Handler @Handler
public void onConnectError(ConnectError event, SocketIOChannel channel) { public void onConnectError(ConnectError event, SocketIOChannel channel) {
event.event().associated(QemuMonitor.class).ifPresent(qm -> { event.event().associated(QemuMonitor.class).ifPresent(qm -> {
fire(new Stop()); rep.fire(new Stop());
}); });
} }
@ -201,7 +203,7 @@ public class QemuMonitor extends Component {
try { try {
var response = mapper.readValue(line, ObjectNode.class); var response = mapper.readValue(line, ObjectNode.class);
if (response.has("QMP")) { if (response.has("QMP")) {
fire(new MonitorReady()); rep.fire(new MonitorReady());
return; return;
} }
if (response.has("return") || response.has("error")) { if (response.has("return") || response.has("error")) {
@ -209,11 +211,11 @@ public class QemuMonitor extends Component {
logger.fine( logger.fine(
() -> String.format("(Previous \"monitor(in)\" is result " () -> String.format("(Previous \"monitor(in)\" is result "
+ "from executing %s)", executed)); + "from executing %s)", executed));
fire(MonitorResult.from(executed, response)); rep.fire(MonitorResult.from(executed, response));
return; return;
} }
if (response.has("event")) { if (response.has("event")) {
MonitorEvent.from(response).ifPresent(this::fire); MonitorEvent.from(response).ifPresent(rep::fire);
} }
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
throw new IOException(e); throw new IOException(e);

View file

@ -60,6 +60,7 @@ import org.jdrupes.vmoperator.util.ExtendedObjectWrapper;
import org.jdrupes.vmoperator.util.FsdUtils; import org.jdrupes.vmoperator.util.FsdUtils;
import org.jgrapes.core.Component; import org.jgrapes.core.Component;
import org.jgrapes.core.Components; import org.jgrapes.core.Components;
import org.jgrapes.core.EventPipeline;
import org.jgrapes.core.TypedIdKey; import org.jgrapes.core.TypedIdKey;
import org.jgrapes.core.annotation.Handler; import org.jgrapes.core.annotation.Handler;
import org.jgrapes.core.events.Start; import org.jgrapes.core.events.Start;
@ -167,6 +168,7 @@ public class Runner extends Component {
private static final String SAVED_TEMPLATE = "VM.ftl.yaml"; private static final String SAVED_TEMPLATE = "VM.ftl.yaml";
private static final String FW_VARS = "fw-vars.fd"; private static final String FW_VARS = "fw-vars.fd";
private EventPipeline rep;
private final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory()); private final ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
private final JsonNode defaults; private final JsonNode defaults;
@SuppressWarnings("PMD.UseConcurrentHashMap") @SuppressWarnings("PMD.UseConcurrentHashMap")
@ -236,7 +238,7 @@ public class Runner extends Component {
} }
logger.fine(() -> "Updating configuration"); logger.fine(() -> "Updating configuration");
var newConf = yamlMapper.convertValue(c, Configuration.class); var newConf = yamlMapper.convertValue(c, Configuration.class);
fire(new ConfigureQemu(newConf, state)); rep.fire(new ConfigureQemu(newConf, state));
}); });
} }
@ -338,8 +340,10 @@ public class Runner extends Component {
* *
* @param event the event * @param event the event
*/ */
@Handler @Handler(priority = 100)
public void onStart(Start event) { public void onStart(Start event) {
rep = newEventPipeline();
event.setAssociated(EventPipeline.class, rep);
try { try {
if (config == null) { if (config == null) {
// Missing configuration, fail // Missing configuration, fail
@ -499,11 +503,11 @@ public class Runner extends Component {
logger.severe(() -> "Process " + procDef.name logger.severe(() -> "Process " + procDef.name
+ " has exited with value " + event.exitValue() + " has exited with value " + event.exitValue()
+ " during startup."); + " during startup.");
fire(new Stop()); rep.fire(new Stop());
return; return;
} }
if (procDef.equals(qemuDefinition) && state == State.RUNNING) { if (procDef.equals(qemuDefinition) && state == State.RUNNING) {
fire(new Stop()); rep.fire(new Stop());
} }
logger.info(() -> "Process " + procDef.name logger.info(() -> "Process " + procDef.name
+ " has exited with value " + event.exitValue()); + " has exited with value " + event.exitValue());