Add launcher coordinator IPC and startup reservation

Introduce a launcher coordinator to reserve startup ownership and prevent duplicate host launches. Adds a NamedPipe-based IPC server/client (LauncherCoordinatorIpcServer/Client), coordinator messages/models, and PublicShellStatus/activation types for richer shell reporting. Enhances StartupAttemptRecord and StartupAttemptRegistry to track coordinator pid/pipe, heartbeat, reserved-before-host-start, and public IPC status, plus new reservation/heartbeat APIs and takeover logic. Wire coordinator into App and LauncherFlowCoordinator to attach secondary launchers, publish coordinator status, probe existing hosts, and include more detailed launch result details. Also adds unit tests and docs describing coordinator and startup visuals behavior.
This commit is contained in:
lincube
2026-04-23 09:45:05 +08:00
parent 33591a0a63
commit 927dc8d1fd
19 changed files with 1637 additions and 23 deletions

View File

@@ -9,6 +9,7 @@ namespace LanMountainDesktop.Launcher.Services;
internal sealed class StartupAttemptRegistry
{
private static readonly TimeSpan CoordinatorHeartbeatTimeout = TimeSpan.FromSeconds(10);
private static readonly JsonSerializerOptions SerializerOptions = new()
{
WriteIndented = true
@@ -45,12 +46,14 @@ internal sealed class StartupAttemptRegistry
{
AttemptId = Guid.NewGuid().ToString("N"),
HostPid = hostPid,
CoordinatorPid = Environment.ProcessId,
LaunchSource = launchSource,
SuccessPolicy = successPolicy,
LastObservedStage = stage,
LastObservedMessage = message ?? string.Empty,
StartedAtUtc = DateTimeOffset.UtcNow,
UpdatedAtUtc = DateTimeOffset.UtcNow,
HeartbeatAtUtc = DateTimeOffset.UtcNow,
State = StartupAttemptState.Pending
};
@@ -63,6 +66,148 @@ internal sealed class StartupAttemptRegistry
return Clone(record);
}
public bool TryReserveCoordinator(
string launchSource,
string successPolicy,
string coordinatorPipeName,
out StartupAttemptRecord reservedAttempt,
out StartupAttemptRecord? activeCoordinatorAttempt)
{
StartupAttemptRecord? reserved = null;
StartupAttemptRecord? active = null;
ExecuteWithLock(() =>
{
var existing = LoadUnsafe();
if (existing is not null && IsCoordinatorLive(existing))
{
active = Clone(existing);
return;
}
if (existing is not null && IsRecoverableCoordinatorAttempt(existing))
{
existing.CoordinatorPid = Environment.ProcessId;
existing.CoordinatorPipeName = coordinatorPipeName;
existing.HeartbeatAtUtc = DateTimeOffset.UtcNow;
existing.UpdatedAtUtc = DateTimeOffset.UtcNow;
if (existing.HostPid <= 0)
{
existing.ReservedBeforeHostStart = true;
}
if (existing.State == StartupAttemptState.DetachedWaiting)
{
existing.State = StartupAttemptState.SoftTimeout;
}
_ownedAttemptId = existing.AttemptId;
SaveUnsafe(existing);
reserved = Clone(existing);
return;
}
var now = DateTimeOffset.UtcNow;
var record = new StartupAttemptRecord
{
AttemptId = Guid.NewGuid().ToString("N"),
HostPid = 0,
CoordinatorPid = Environment.ProcessId,
CoordinatorPipeName = coordinatorPipeName,
LaunchSource = launchSource,
SuccessPolicy = successPolicy,
LastObservedStage = StartupStage.Initializing,
LastObservedMessage = "Launcher coordinator reserved startup ownership.",
StartedAtUtc = now,
UpdatedAtUtc = now,
HeartbeatAtUtc = now,
ReservedBeforeHostStart = true,
State = StartupAttemptState.Pending
};
_ownedAttemptId = record.AttemptId;
SaveUnsafe(record);
reserved = Clone(record);
});
reservedAttempt = reserved ?? new StartupAttemptRecord();
activeCoordinatorAttempt = active;
return reserved is not null;
}
public StartupAttemptRecord? GetOwnedAttempt()
{
StartupAttemptRecord? result = null;
if (string.IsNullOrWhiteSpace(_ownedAttemptId))
{
return null;
}
ExecuteWithLock(() =>
{
var record = LoadUnsafe();
if (record is not null && string.Equals(record.AttemptId, _ownedAttemptId, StringComparison.Ordinal))
{
result = Clone(record);
}
});
return result;
}
public StartupAttemptRecord? TryGetLiveCoordinatorAttempt()
{
StartupAttemptRecord? result = null;
ExecuteWithLock(() =>
{
var record = LoadUnsafe();
if (record is not null && IsCoordinatorLive(record))
{
result = Clone(record);
}
});
return result;
}
public StartupAttemptRecord? TryGetLatestAttempt()
{
StartupAttemptRecord? result = null;
ExecuteWithLock(() =>
{
var record = LoadUnsafe();
if (record is not null)
{
result = Clone(record);
}
});
return result;
}
public StartupAttemptRecord AssignOwnedHostProcess(
int hostPid,
StartupStage stage,
string? message)
{
StartupAttemptRecord? result = null;
UpdateOwned(record =>
{
record.HostPid = hostPid;
record.LastObservedStage = stage;
record.LastObservedMessage = message ?? record.LastObservedMessage;
record.ReservedBeforeHostStart = false;
result = Clone(record);
});
return result ?? StartOwnedAttempt(
hostPid,
string.Empty,
string.Empty,
stage,
message);
}
public bool AdoptAttempt(string attemptId)
{
if (string.IsNullOrWhiteSpace(attemptId))
@@ -120,7 +265,11 @@ internal sealed class StartupAttemptRegistry
public void MarkOwnedIpcConnected()
{
UpdateOwned(record => record.IpcConnected = true);
UpdateOwned(record =>
{
record.IpcConnected = true;
record.PublicIpcConnected = true;
});
}
public void UpdateOwnedStage(StartupStage stage, string? message, bool ipcConnected)
@@ -132,10 +281,25 @@ internal sealed class StartupAttemptRegistry
if (ipcConnected)
{
record.IpcConnected = true;
record.PublicIpcConnected = true;
}
});
}
public void UpdateOwnedCoordinatorHeartbeat(LauncherCoordinatorStatus status)
{
UpdateOwned(record =>
{
record.CoordinatorPid = Environment.ProcessId;
record.HeartbeatAtUtc = DateTimeOffset.UtcNow;
record.LastObservedStage = status.LastObservedStage;
record.LastObservedMessage = status.LastObservedMessage;
record.IpcConnected = status.PublicIpcConnected;
record.PublicIpcConnected = status.PublicIpcConnected;
record.ShellStatus = status.ShellStatus?.ShellState ?? status.State;
});
}
public void MarkOwnedSoftTimeout(string? message)
{
UpdateOwned(record =>
@@ -267,6 +431,38 @@ internal sealed class StartupAttemptRegistry
return TryGetLiveProcess(record.HostPid, out _);
}
private static bool IsRecoverableCoordinatorAttempt(StartupAttemptRecord record)
{
if (record.State is not (StartupAttemptState.Pending or StartupAttemptState.SoftTimeout or StartupAttemptState.DetachedWaiting))
{
return false;
}
if (record.HostPid <= 0)
{
return true;
}
return TryGetLiveProcess(record.HostPid, out _);
}
private static bool IsCoordinatorLive(StartupAttemptRecord record)
{
if (record.State is not (StartupAttemptState.Pending or StartupAttemptState.SoftTimeout or StartupAttemptState.DetachedWaiting))
{
return false;
}
if (record.CoordinatorPid <= 0 ||
string.IsNullOrWhiteSpace(record.CoordinatorPipeName) ||
DateTimeOffset.UtcNow - record.HeartbeatAtUtc > CoordinatorHeartbeatTimeout)
{
return false;
}
return TryGetLiveProcess(record.CoordinatorPid, out _);
}
private static bool TryGetLiveProcess(int processId, out Process? process)
{
process = null;
@@ -300,13 +496,19 @@ internal sealed class StartupAttemptRegistry
{
AttemptId = record.AttemptId,
HostPid = record.HostPid,
CoordinatorPid = record.CoordinatorPid,
CoordinatorPipeName = record.CoordinatorPipeName,
StartedAtUtc = record.StartedAtUtc,
UpdatedAtUtc = record.UpdatedAtUtc,
HeartbeatAtUtc = record.HeartbeatAtUtc,
LaunchSource = record.LaunchSource,
SuccessPolicy = record.SuccessPolicy,
LastObservedStage = record.LastObservedStage,
LastObservedMessage = record.LastObservedMessage,
IpcConnected = record.IpcConnected,
PublicIpcConnected = record.PublicIpcConnected,
ShellStatus = record.ShellStatus,
ReservedBeforeHostStart = record.ReservedBeforeHostStart,
State = record.State
};
}