feat: reimplement websocket to use hono

This commit is contained in:
Aarnav Tale 2025-03-24 10:52:29 -04:00
parent c066b3064d
commit 9a1051b9af
13 changed files with 337 additions and 351 deletions

View File

@ -16,7 +16,7 @@ export async function loader({
// We shouldn't session invalidate if Headscale is down // We shouldn't session invalidate if Headscale is down
if (healthy) { if (healthy) {
try { try {
await context.client.get('/api/v1/apikey', session.get('api_key')!); await context.client.get('v1/apikey', session.get('api_key')!);
} catch (error) { } catch (error) {
if (error instanceof ResponseError) { if (error instanceof ResponseError) {
log.debug('api', 'API Key validation failed %o', error); log.debug('api', 'API Key validation failed %o', error);

View File

@ -1,7 +1,7 @@
import type { ActionFunctionArgs } from 'react-router'; import type { ActionFunctionArgs } from 'react-router';
import type { LoadContext } from '~/server'; import type { LoadContext } from '~/server';
import log from '~/utils/log';
import { send } from '~/utils/res'; import { send } from '~/utils/res';
import log from '~server/utils/log';
// TODO: Turn this into the same thing as dns-actions like machine-actions!!! // TODO: Turn this into the same thing as dns-actions like machine-actions!!!
export async function menuAction({ export async function menuAction({
@ -24,16 +24,13 @@ export async function menuAction({
switch (method) { switch (method) {
case 'delete': { case 'delete': {
await context.client.delete( await context.client.delete(`v1/node/${id}`, session.get('api_key')!);
`/api/v1/node/${id}`,
session.get('api_key')!,
);
return { message: 'Machine removed' }; return { message: 'Machine removed' };
} }
case 'expire': { case 'expire': {
await context.client.post( await context.client.post(
`/api/v1/node/${id}/expire`, `v1/node/${id}/expire`,
session.get('api_key')!, session.get('api_key')!,
); );
return { message: 'Machine expired' }; return { message: 'Machine expired' };
@ -51,7 +48,7 @@ export async function menuAction({
const name = String(data.get('name')); const name = String(data.get('name'));
await context.client.post( await context.client.post(
`/api/v1/node/${id}/rename/${name}`, `v1/node/${id}/rename/${name}`,
session.get('api_key')!, session.get('api_key')!,
); );
return { message: 'Machine renamed' }; return { message: 'Machine renamed' };
@ -72,7 +69,7 @@ export async function menuAction({
const postfix = enabled ? 'enable' : 'disable'; const postfix = enabled ? 'enable' : 'disable';
await context.client.post( await context.client.post(
`/api/v1/routes/${route}/${postfix}`, `v1/routes/${route}/${postfix}`,
session.get('api_key')!, session.get('api_key')!,
); );
return { message: 'Route updated' }; return { message: 'Route updated' };
@ -95,7 +92,7 @@ export async function menuAction({
await Promise.all( await Promise.all(
routes.map(async (route) => { routes.map(async (route) => {
await context.client.post( await context.client.post(
`/api/v1/routes/${route}/${postfix}`, `v1/routes/${route}/${postfix}`,
session.get('api_key')!, session.get('api_key')!,
); );
}), }),
@ -156,7 +153,7 @@ export async function menuAction({
return { message: 'Tags updated' }; return { message: 'Tags updated' };
} catch (error) { } catch (error) {
log.debug('APIC', 'Failed to update tags: %s', error); log.debug('api', 'Failed to update tags: %s', error);
return send( return send(
{ message: 'Failed to update tags' }, { message: 'Failed to update tags' },
{ {

View File

@ -152,6 +152,8 @@ export default function MachineRow({
</Menu> </Menu>
</div> </div>
</td> </td>
{/* We pass undefined when agents are not enabled */}
{isAgent !== undefined ? (
<td className="py-2"> <td className="py-2">
{stats !== undefined ? ( {stats !== undefined ? (
<> <>
@ -164,6 +166,7 @@ export default function MachineRow({
<p className="text-sm opacity-50">Unknown</p> <p className="text-sm opacity-50">Unknown</p>
)} )}
</td> </td>
) : undefined}
<td className="py-2"> <td className="py-2">
<span <span
className={cn( className={cn(

View File

@ -8,7 +8,7 @@ import Tooltip from '~/components/Tooltip';
import type { LoadContext } from '~/server'; import type { LoadContext } from '~/server';
import type { Machine, Route, User } from '~/types'; import type { Machine, Route, User } from '~/types';
import cn from '~/utils/cn'; import cn from '~/utils/cn';
import useAgent from '~/utils/useAgent'; import useAgent from '~/utils/use-agent';
import { menuAction } from './action'; import { menuAction } from './action';
import MachineRow from './components/machine'; import MachineRow from './components/machine';
import NewMachine from './dialogs/new'; import NewMachine from './dialogs/new';
@ -44,9 +44,7 @@ export async function loader({
magic, magic,
server: context.config.headscale.url, server: context.config.headscale.url,
publicServer: context.config.headscale.public_url, publicServer: context.config.headscale.public_url,
// TODO: Fix this LOL agents: context.agents?.tailnetIDs(),
agents: ['test'],
// agents: [...(hp_getSingletonUnsafe('ws_agents') ?? []).keys()],
}; };
} }
@ -100,7 +98,10 @@ export default function Page() {
) : undefined} ) : undefined}
</div> </div>
</th> </th>
{/* We only want to show the version column if there are agents */}
{data.agents !== undefined ? (
<th className="uppercase text-xs font-bold pb-2">Version</th> <th className="uppercase text-xs font-bold pb-2">Version</th>
) : undefined}
<th className="uppercase text-xs font-bold pb-2">Last Seen</th> <th className="uppercase text-xs font-bold pb-2">Last Seen</th>
</tr> </tr>
</thead> </thead>
@ -120,7 +121,9 @@ export default function Page() {
users={data.users} users={data.users}
magic={data.magic} magic={data.magic}
stats={stats?.[machine.nodeKey]} stats={stats?.[machine.nodeKey]}
isAgent={data.agents.includes(machine.id)} // If we pass undefined, the column will not be rendered
// This is useful for when there are no agents configured
isAgent={data.agents?.includes(machine.id)}
/> />
))} ))}
</tbody> </tbody>

View File

@ -16,5 +16,6 @@ server
│ ├── config-loader.ts: Loads the Headscale configuration (if available). │ ├── config-loader.ts: Loads the Headscale configuration (if available).
│ ├── config-schema.ts: Defines the schema for the Headscale configuration. │ ├── config-schema.ts: Defines the schema for the Headscale configuration.
├── web/ ├── web/
│ ├── agent.ts: Handles setting up the agent WebSocket if needed.
│ ├── oidc.ts: Loads and validates an OIDC configuration (if available). │ ├── oidc.ts: Loads and validates an OIDC configuration (if available).
│ ├── sessions.ts: Initializes the session store and methods to manage it. │ ├── sessions.ts: Initializes the session store and methods to manage it.

View File

@ -1,4 +1,5 @@
import { constants, access, readFile, writeFile } from 'node:fs/promises'; import { constants, access, readFile, writeFile } from 'node:fs/promises';
import { setTimeout } from 'node:timers/promises';
import { type } from 'arktype'; import { type } from 'arktype';
import { Document, parseDocument } from 'yaml'; import { Document, parseDocument } from 'yaml';
import log from '~/utils/log'; import log from '~/utils/log';
@ -17,6 +18,7 @@ class HeadscaleConfig {
private document?: Document; private document?: Document;
private access: 'rw' | 'ro' | 'no'; private access: 'rw' | 'ro' | 'no';
private path?: string; private path?: string;
private writeLock = false;
constructor( constructor(
access: 'rw' | 'ro' | 'no', access: 'rw' | 'ro' | 'no',
@ -100,8 +102,17 @@ class HeadscaleConfig {
'Writing updated Headscale configuration to %s', 'Writing updated Headscale configuration to %s',
this.path, this.path,
); );
// We need to lock the writeLock so that we don't try to write
// to the file while we're already writing to it
while (this.writeLock) {
await setTimeout(100);
}
this.writeLock = true;
await writeFile(this.path, this.document.toString(), 'utf8'); await writeFile(this.path, this.document.toString(), 'utf8');
this.config = config; this.config = config;
this.writeLock = false;
return; return;
} }
} }

View File

@ -1,10 +1,13 @@
import { versions } from 'node:process'; import { versions } from 'node:process';
import type { UpgradeWebSocket } from 'hono/ws';
import { createHonoServer } from 'react-router-hono-server/node'; import { createHonoServer } from 'react-router-hono-server/node';
import type { WebSocket } from 'ws';
import log from '~/utils/log'; import log from '~/utils/log';
import { configureConfig, configureLogger, envVariables } from './config/env'; import { configureConfig, configureLogger, envVariables } from './config/env';
import { loadConfig } from './config/loader'; import { loadConfig } from './config/loader';
import { createApiClient } from './headscale/api-client'; import { createApiClient } from './headscale/api-client';
import { loadHeadscaleConfig } from './headscale/config-loader'; import { loadHeadscaleConfig } from './headscale/config-loader';
import { loadAgentSocket } from './web/agent';
import { createOidcClient } from './web/oidc'; import { createOidcClient } from './web/oidc';
import { createSessionStorage } from './web/sessions'; import { createSessionStorage } from './web/sessions';
@ -49,6 +52,12 @@ const appLoadContext = {
config.headscale.tls_cert_path, config.headscale.tls_cert_path,
), ),
agents: await loadAgentSocket(
config.server.agent.authkey,
config.server.agent.cache_path,
config.server.agent.ttl,
),
oidc: config.oidc ? await createOidcClient(config.oidc) : undefined, oidc: config.oidc ? await createOidcClient(config.oidc) : undefined,
}; };
@ -65,7 +74,14 @@ export default await createHonoServer({
return appLoadContext; return appLoadContext;
}, },
configure(server) {}, configure(server, { upgradeWebSocket }) {
if (appLoadContext.agents !== undefined) {
// We need this since we cannot pass the WSEvents context
(upgradeWebSocket as UpgradeWebSocket<WebSocket>)(
appLoadContext.agents.configureSocket,
);
}
},
listeningListener(info) { listeningListener(info) {
console.log(`Server is listening on http://localhost:${info.port}`); console.log(`Server is listening on http://localhost:${info.port}`);
}, },

273
app/server/web/agent.ts Normal file
View File

@ -0,0 +1,273 @@
import { createHash } from 'node:crypto';
import { open, readFile, writeFile } from 'node:fs/promises';
import { setTimeout } from 'node:timers/promises';
import { getConnInfo } from '@hono/node-server/conninfo';
import { type } from 'arktype';
import type { Context } from 'hono';
import type { WSContext, WSEvents } from 'hono/ws';
import { WebSocket } from 'ws';
import { HostInfo } from '~/types';
import log from '~/utils/log';
export async function loadAgentSocket(
authkey: string,
path: string,
ttl: number,
) {
if (authkey.length === 0) {
return;
}
try {
const handle = await open(path, 'w');
log.info('agent', 'Using agent cache file at %s', path);
await handle.close();
} catch (error) {
log.info('agent', 'Agent cache file not accessible at %s', path);
log.debug('agent', 'Error details: %s', error);
return;
}
const cache = new TimedCache<HostInfo>(ttl, path);
return new AgentManager(cache, authkey);
}
class AgentManager {
private cache: TimedCache<HostInfo>;
private agents: Map<string, WSContext>;
private timers: Map<string, NodeJS.Timeout>;
private authkey: string;
constructor(cache: TimedCache<HostInfo>, authkey: string) {
this.cache = cache;
this.authkey = authkey;
this.agents = new Map();
this.timers = new Map();
}
tailnetIDs() {
return Array.from(this.agents.keys());
}
// Request data from all connected agents
// This does not return anything, but caches the data which then needs to be
// queried by the caller separately.
requestData(nodeList: string[]) {
const NodeIDs = [...new Set(nodeList)];
NodeIDs.map((node) => {
log.debug('agent', 'Requesting agent data for %s', node);
});
for (const agent of this.agents.values()) {
agent.send(JSON.stringify({ NodeIDs }));
}
}
// Since we are using Node, Hono is built on 'ws' WebSocket types.
configureSocket(c: Context): WSEvents<WebSocket> {
return {
onOpen: (_, ws) => {
const id = c.req.header('x-headplane-tailnet-id');
if (!id) {
log.warn(
'agent',
'Rejecting an agent WebSocket connection without a tailnet ID',
);
ws.close(1008, 'ERR_INVALID_TAILNET_ID');
return;
}
const auth = c.req.header('authorization');
if (auth !== `Bearer ${this.authkey}`) {
log.warn('agent', 'Rejecting an unauthorized WebSocket connection');
const info = getConnInfo(c);
if (info.remote.address) {
log.warn('agent', 'Agent source IP: %s', info.remote.address);
}
ws.close(1008, 'ERR_UNAUTHORIZED');
return;
}
const pinger = setInterval(() => {
if (ws.readyState !== 1) {
clearInterval(pinger);
return;
}
ws.raw?.ping();
}, 30000);
this.agents.set(id, ws);
this.timers.set(id, pinger);
},
onClose: () => {
const id = c.req.header('x-headplane-tailnet-id');
if (!id) {
return;
}
clearInterval(this.timers.get(id));
this.agents.delete(id);
},
onError: (event, ws) => {
const id = c.req.header('x-headplane-tailnet-id');
if (!id) {
return;
}
clearInterval(this.timers.get(id));
if (event instanceof ErrorEvent) {
log.error('agent', 'WebSocket error: %s', event.message);
}
log.debug('agent', 'Closing agent WebSocket connection');
ws.close(1011, 'ERR_INTERNAL_ERROR');
},
// This is where we receive the data from the agent
// Requests are made in the AgentManager.requestData function
onMessage: (event, ws) => {
const id = c.req.header('x-headplane-tailnet-id');
if (!id) {
return;
}
const data = JSON.parse(event.data.toString());
log.debug('agent', 'Received agent data from %s', id);
for (const [node, info] of Object.entries<HostInfo>(data)) {
this.cache.set(node, info);
log.debug('agent', 'Cached HostInfo for %s', node);
}
},
};
}
}
const diskSchema = type({
key: 'string',
value: 'unknown',
expires: 'number?',
}).array();
// A persistent HashMap with a TTL for each key
class TimedCache<V> {
private _cache = new Map<string, V>();
private _timings = new Map<string, number>();
// Default TTL is 1 minute
private defaultTTL: number;
private filePath: string;
private writeLock = false;
// Last flush ID is essentially a hash of the flush contents
// Prevents unnecessary flushing if nothing has changed
private lastFlushId = '';
constructor(defaultTTL: number, filePath: string) {
this.defaultTTL = defaultTTL;
this.filePath = filePath;
// Load the cache from disk and then queue flushes every 10 seconds
this.load().then(() => {
setInterval(() => this.flush(), 10000);
});
}
set(key: string, value: V, ttl: number = this.defaultTTL) {
this._cache.set(key, value);
this._timings.set(key, Date.now() + ttl);
}
get(key: string) {
const value = this._cache.get(key);
if (!value) {
return;
}
const expires = this._timings.get(key);
if (!expires || expires < Date.now()) {
this._cache.delete(key);
this._timings.delete(key);
return;
}
return value;
}
// Map into a Record without any TTLs
toJSON() {
const result: Record<string, V> = {};
for (const [key, value] of this._cache.entries()) {
result[key] = value;
}
return result;
}
// WARNING: This function expects that this.filePath is NOT ENOENT
private async load() {
const data = await readFile(this.filePath, 'utf-8');
const cache = () => {
try {
return JSON.parse(data);
} catch (e) {
return undefined;
}
};
const diskData = cache();
if (diskData === undefined) {
log.error('agent', 'Failed to load cache at %s', this.filePath);
return;
}
const cacheData = diskSchema(diskData);
if (cacheData instanceof type.errors) {
log.debug('agent', 'Failed to load cache at %s', this.filePath);
log.debug('agent', 'Error details: %s', cacheData.toString());
// Skip loading the cache (it should be overwritten soon)
return;
}
for (const { key, value, expires } of diskData) {
this._cache.set(key, value);
this._timings.set(key, expires);
}
log.info('agent', 'Loaded cache from %s', this.filePath);
}
private async flush() {
const data = Array.from(this._cache.entries()).map(([key, value]) => {
return { key, value, expires: this._timings.get(key) };
});
if (data.length === 0) {
return;
}
// Calculate the hash of the data
const dumpData = JSON.stringify(data);
const sha = createHash('sha256').update(dumpData).digest('hex');
if (sha === this.lastFlushId) {
return;
}
// We need to lock the writeLock so that we don't try to write
// to the file while we're already writing to it
while (this.writeLock) {
await setTimeout(100);
}
this.writeLock = true;
await writeFile(this.filePath, dumpData, 'utf-8');
log.debug('agent', 'Flushed cache to %s', this.filePath);
this.lastFlushId = sha;
this.writeLock = false;
}
}

View File

@ -1,6 +1,6 @@
import log from '~/utils/log';
import { hp_getConfig } from '~server/context/global'; import { hp_getConfig } from '~server/context/global';
import { HeadplaneConfig } from '~server/context/parser'; import type { HeadplaneConfig } from '~server/context/parser';
import log from '~server/utils/log';
import { Integration } from './abstract'; import { Integration } from './abstract';
// import dockerIntegration from './docker'; // import dockerIntegration from './docker';
// import kubernetesIntegration from './kubernetes'; // import kubernetesIntegration from './kubernetes';

View File

@ -1,32 +0,0 @@
class Mutex {
private locked = false;
private queue: (() => void)[] = [];
constructor(locked: boolean) {
this.locked = locked;
}
acquire() {
return new Promise<void>((resolve) => {
if (!this.locked) {
this.locked = true;
resolve();
} else {
this.queue.push(resolve);
}
});
}
release() {
if (this.queue.length > 0) {
const next = this.queue.shift();
next?.();
} else {
this.locked = false;
}
}
}
export default function mutex(locked = false) {
return new Mutex(locked);
}

View File

@ -1,11 +1,6 @@
import { readFile } from 'node:fs/promises';
import * as client from 'openid-client'; import * as client from 'openid-client';
import { Configuration } from 'openid-client'; import { Configuration } from 'openid-client';
import { hp_getSingleton, hp_setSingleton } from '~server/context/global'; import log from '~/utils/log';
import { HeadplaneConfig } from '~server/context/parser';
import log from '~server/utils/log';
type OidcConfig = NonNullable<HeadplaneConfig['oidc']>;
// We try our best to infer the callback URI of our Headplane instance // We try our best to infer the callback URI of our Headplane instance
// By default it is always /<base_path>/oidc/callback // By default it is always /<base_path>/oidc/callback
@ -35,72 +30,6 @@ export function getRedirectUri(req: Request) {
return url.href; return url.href;
} }
let oidcSecret: string | undefined = undefined;
export function getOidcSecret() {
return oidcSecret;
}
async function resolveClientSecret(oidc: OidcConfig) {
if (!oidc.client_secret && !oidc.client_secret_path) {
return;
}
if (oidc.client_secret_path) {
// We need to interpolate environment variables into the path
// Path formatting can be like ${ENV_NAME}/path/to/secret
let path = oidc.client_secret_path;
const matches = path.match(/\${(.*?)}/g);
if (matches) {
for (const match of matches) {
const env = match.slice(2, -1);
const value = process.env[env];
if (!value) {
log.error('CFGX', 'Environment variable %s is not set', env);
return;
}
log.debug('CFGX', 'Interpolating %s with %s', match, value);
path = path.replace(match, value);
}
}
try {
log.debug('CFGX', 'Reading client secret from %s', path);
const secret = await readFile(path, 'utf-8');
if (secret.trim().length === 0) {
log.error('CFGX', 'Empty OIDC client secret');
return;
}
oidcSecret = secret;
} catch (error) {
log.error('CFGX', 'Failed to read client secret from %s', path);
log.error('CFGX', 'Error: %s', error);
log.debug('CFGX', 'Error details: %o', error);
}
}
if (oidc.client_secret) {
oidcSecret = oidc.client_secret;
}
}
function clientAuthMethod(
method: string,
): (secret: string) => client.ClientAuth {
switch (method) {
case 'client_secret_post':
return client.ClientSecretPost;
case 'client_secret_basic':
return client.ClientSecretBasic;
case 'client_secret_jwt':
return client.ClientSecretJwt;
default:
throw new Error('Invalid client authentication method');
}
}
export async function beginAuthFlow( export async function beginAuthFlow(
config: Configuration, config: Configuration,
redirect_uri: string, redirect_uri: string,
@ -243,60 +172,3 @@ export function formatError(error: unknown) {
}, },
}; };
} }
export async function testOidc(oidc: OidcConfig) {
await resolveClientSecret(oidc);
if (!oidcSecret) {
log.debug(
'OIDC',
'Cannot validate OIDC configuration without a client secret',
);
return false;
}
log.debug('OIDC', 'Discovering OIDC configuration from %s', oidc.issuer);
const secret = await resolveClientSecret(oidc);
const config = await client.discovery(
new URL(oidc.issuer),
oidc.client_id,
oidc.client_secret,
clientAuthMethod(oidc.token_endpoint_auth_method)(oidcSecret),
);
const meta = config.serverMetadata();
if (meta.authorization_endpoint === undefined) {
return false;
}
log.debug('OIDC', 'Authorization endpoint: %s', meta.authorization_endpoint);
log.debug('OIDC', 'Token endpoint: %s', meta.token_endpoint);
if (meta.response_types_supported) {
if (meta.response_types_supported.includes('code') === false) {
log.error('OIDC', 'OIDC server does not support code flow');
return false;
}
} else {
log.warn('OIDC', 'OIDC server does not advertise response_types_supported');
}
if (meta.token_endpoint_auth_methods_supported) {
if (
meta.token_endpoint_auth_methods_supported.includes(
oidc.token_endpoint_auth_method,
) === false
) {
log.error(
'OIDC',
'OIDC server does not support %s',
oidc.token_endpoint_auth_method,
);
return false;
}
}
log.debug('OIDC', 'OIDC configuration is valid');
hp_setSingleton('oidc_client', config);
return true;
}

View File

@ -1,158 +0,0 @@
// Handlers for the Local Agent on the server side
import { readFile, writeFile } from 'node:fs/promises';
import { setTimeout as pSetTimeout } from 'node:timers/promises';
import type { LoaderFunctionArgs } from 'react-router';
import { WebSocket } from 'ws';
import type { HostInfo } from '~/types';
import log from '~server/utils/log';
// Essentially a HashMap which invalidates entries after a certain time.
// It also is capable of syncing as a compressed file to disk.
class TimedCache<K, V> {
private _cache = new Map<K, V>();
private _timeCache = new Map<K, number>();
private defaultTTL: number;
private filepath: string;
private writeLock = false;
constructor(defaultTTL: number, filepath: string) {
this.defaultTTL = defaultTTL;
this.filepath = filepath;
}
async set(key: K, value: V, ttl: number = this.defaultTTL) {
this._cache.set(key, value);
this._timeCache.set(key, Date.now() + ttl);
await this.syncToFile();
}
async get(key: K) {
const entry = this._cache.get(key);
if (!entry) {
return;
}
const expires = this._timeCache.get(key);
if (!expires || expires < Date.now()) {
this._cache.delete(key);
this._timeCache.delete(key);
await this.syncToFile();
return;
}
return entry;
}
async loadFromFile() {
try {
const data = await readFile(this.filepath, 'utf-8');
const cache = JSON.parse(data);
for (const { key, value, expires } of cache) {
this._cache.set(key, value);
this._timeCache.set(key, expires);
}
} catch (e) {
if (e.code === 'ENOENT') {
log.debug('CACH', 'Cache file not found, creating new cache');
return;
}
log.error('CACH', 'Failed to load cache from file', e);
}
}
private async syncToFile() {
while (this.writeLock) {
await pSetTimeout(100);
}
this.writeLock = true;
const data = Array.from(this._cache.entries()).map(([key, value]) => {
return { key, value, expires: this._timeCache.get(key) };
});
await writeFile(this.filepath, JSON.stringify(data), 'utf-8');
await this.loadFromFile();
this.writeLock = false;
}
}
let cache: TimedCache<string, HostInfo> | undefined;
export async function initAgentCache(defaultTTL: number, filepath: string) {
cache = new TimedCache(defaultTTL, filepath);
await pSetTimeout(500);
await cache.loadFromFile();
}
let agentSocket: WebSocket | undefined;
// TODO: Actually type this?
export function initAgentSocket(context: LoaderFunctionArgs['context']) {
if (!context.ws) {
return;
}
const client = context.ws.clients.values().next().value;
agentSocket = client;
}
// Check the cache and then attempt the websocket query
// If we aren't connected to an agent, then debug log and return the cache
export async function queryAgent(nodes: string[]) {
return;
if (!cache) {
log.error('CACH', 'Cache not initialized');
return;
}
const cached: Record<string, HostInfo> = {};
await Promise.all(
nodes.map(async (node) => {
const cachedData = await cache?.get(node);
if (cachedData) {
cached[node] = cachedData;
}
}),
);
const uncached = nodes.filter((node) => !cached[node]);
// No need to query the agent if we have all the data cached
if (uncached.length === 0) {
return cached;
}
// We don't have an agent socket, so we can't query the agent
// and we just return the cached values available instead
if (!agentSocket) {
return cached;
}
agentSocket?.send(JSON.stringify({ NodeIDs: uncached }));
// biome-ignore lint: bruh
const returnData = await new Promise<Record<string, HostInfo> | void>(
(resolve, reject) => {
const timeout = setTimeout(() => {
agentSocket?.removeAllListeners('message');
resolve();
}, 3000);
agentSocket?.on('message', async (message: string) => {
const data = JSON.parse(message.toString());
if (Object.keys(data).length === 0) {
resolve();
}
agentSocket?.removeAllListeners('message');
resolve(data);
});
},
);
// if (returnData) {
// for await (const [node, info] of Object.entries(returnData)) {
// await cache?.set(node, info);
// }
// }
return returnData ? { ...cached, ...returnData } : cached;
}