diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md new file mode 100644 index 000000000..95a2e2945 --- /dev/null +++ b/.github/copilot-instructions.md @@ -0,0 +1,57 @@ +GitHub Copilot Instructions — Impress + +These instructions define the project-wide baseline for work in this repository. +Branch- or subsystem-specific architecture belongs in `.github/instructions/*.instructions.md`. +When working in an area covered by one of those files, treat the matching +instruction file as the implementation-detail source of truth. + +PROJECT CONTEXT +This repository is metarhia/impress, a high-performance server runtime using: + +- worker_threads concurrency +- hot-reload and filesystem watch +- strict backward-compatibility requirements across the Metarhia ecosystem + +GENERAL RULES + +- Do not break existing public APIs or user-visible behavior without a clear reason. +- Preserve backward compatibility unless the change explicitly requires otherwise. +- Keep code, tests, and documentation in sync. +- Prefer minimal changes that preserve the current external behavior. +- Keep module boundaries intact; do not introduce unnecessary coupling. + +FILE OWNERSHIP + +- `.github/copilot-instructions.md` contains only repository-wide, branch-agnostic rules. +- `AGENTS.md` contains workflow/process rules for agents. +- `.github/instructions/*.instructions.md` contains branch- or subsystem-specific implementation details. + +BRANCH-AWARE INSTRUCTION SELECTION + +- Instruction files in `.github/instructions/` may include a `branch` field in their YAML frontmatter. +- A file with `branch: X` applies ONLY when the current Git branch is `X`. +- When multiple instruction files match an edited file's path, determine the current Git branch and follow ONLY the instruction file whose `branch` value matches. Ignore all non-matching branch-scoped files. +- Instruction files WITHOUT a `branch` field are general and apply to all branches. +- If no instruction file matches the current branch, do not invent constraints from non-matching files. + +WORKING RULES FOR INSTRUCTION FILES + +- Before changing files matched by an instruction file's `applyTo`, read that instruction file. +- Multiple instruction files may coexist for different modules, subsystems, or branches; keep each focused and scoped. +- If a branch-specific instruction file and the code diverge, update the instruction file to match the code in the current branch. + +SYNCING BRANCH-SPECIFIC INSTRUCTIONS + +- Branch-specific instruction files are stored on the `CopilotInstructions` branch and synced to feature branches as untracked files. +- They MUST NOT be committed on feature branches or included in pull requests. +- To sync, run: `.github/scripts/sync-instructions.ps1` (PowerShell) or the equivalent shell commands. +- First-time bootstrap (when the script is not yet present locally): + `git fetch origin CopilotInstructions; git checkout origin/CopilotInstructions -- .github/instructions/ .github/scripts/; git reset HEAD -- .github/instructions/ .github/scripts/` + +TESTING + +- Validate changes with the existing test suite when behavior may be affected. +- Update or add tests when behavior, integration, or configuration semantics change. + +FINAL RULE +This file defines only the baseline constraints for Impress. Do not place branch- or subsystem-specific architecture here; keep that in `.github/instructions/*.instructions.md`. diff --git a/VFS-SAB.md b/VFS-SAB.md new file mode 100644 index 000000000..8fb73f1c8 --- /dev/null +++ b/VFS-SAB.md @@ -0,0 +1,550 @@ +# SharedArrayBuffer Cache in Impress + +## Motivation + +Impress uses `worker_threads` to handle HTTP requests. Each worker serves static files (HTML, CSS, JS, images, etc.). Without shared memory, every worker keeps its own copy of every file � with 8 workers and 100 MiB of static assets, total consumption reaches 800 MiB. SharedArrayBuffer stores all files in shared memory accessible to all threads. + +## Benchmarks + +Baseline: no SAB (per-worker file copies). Delta columns show improvement relative to baseline � positive values are better even for latency and memory metrics. + +# Compare before-sab -> after-sab + +| File | Metric | Before | After | Delta | +| --- | --- | ---: | ---: | ---: | +| bench-64k.bin | RPS | 1767.14 | 1759.49 | -0.43% | +| bench-64k.bin | Throughput MB/s | 111.16 | 110.67 | -0.44% | +| bench-64k.bin | p95 ms | 330.00 | 309.00 | +6.36% | + +| bench-256k.bin | RPS | 507.04 | 507.04 | +0.00% | +| bench-256k.bin | Throughput MB/s | 126.97 | 126.97 | +0.00% | +| bench-256k.bin | p95 ms | 752.00 | 748.00 | +0.53% | + +| bench-1m.bin | RPS | 127.80 | 128.63 | +0.65% | +| bench-1m.bin | Throughput MB/s | 127.84 | 128.71 | +0.68% | +| bench-1m.bin | p95 ms | 1976.00 | 1993.00 | -0.86% | + +| bench-5m.bin | RPS | 83.12 | 114.00 | +37.15% | +| bench-5m.bin | Throughput MB/s | 415.72 | 570.22 | +37.16% | +| bench-5m.bin | p95 ms | 23172.00 | 13236.00 | +42.88% | + +| bench-10m.bin | RPS | 79.60 | 92.00 | +15.58% | +| bench-10m.bin | Throughput MB/s | 759.08 | 877.44 | +15.59% | +| bench-10m.bin | p95 ms | 25680.00 | 22631.00 | +11.87% | + +| System metric | Before | After | Delta | +| --- | ---: | ---: | ---: | +| CPU max % | 22.91 | 23.65 | -3.23% | +| Working set max MB | 5066.02 | 1210.59 | +76.10% | +| Private max MB | 5110.43 | 1358.33 | +73.42% | + +## Architecture + +The system is split into four modules behind one orchestrator: + +| Module | Location | Purpose | +|--------|----------|---------| +| **SharedCache** | `lib/cache/SharedCache.js` | Orchestration: watcher, ACK tracking, compaction dispatch, broadcast | +| **FilesystemCache** | `lib/cache/FilesystemCache.js` | Slab allocator with pooled SAB segments, extent-based allocation, compaction | +| **PlacementSource** | `lib/cache/PlacementSource.js` | Filesystem scanner, returns `{ stat, path }` per file | + +`SharedCache` owns the `FilesystemCache` instance and delegates all allocation, snapshot, projection, free, and compact operations to it. + +`FilesystemCache` has no dependencies on Node.js built-ins. This allows it to be used in a browser or in tests without mocks. + +## Limit mode: Slab Allocator + +The memory management model follows the Linux SLUB allocator principle: SharedArrayBuffer segments are **never returned to the OS**. Instead, they go through a lifecycle: + +``` +-----------� files deleted -----------� memory needed -----------� +� Active � ------------------> � Clean � ------------------> � Active � +� (data) � � (empty) � � (data) � +L----------- L----------- L----------- +``` + +**Why SABs are never freed:** V8 cannot reduce reserved virtual memory after a SharedArrayBuffer is deallocated. Recreating a SAB of the same size still allocates a new page. Retaining empty segments (`emptySegmentIds`) and reusing them completely eliminates allocation system calls. + +### Internal classes in FilesystemCache.js + +#### Pool + +Manages the memory budget and the set of SAB segments. + +``` +Pool ++-- segments: Map � all segments ++-- emptySegmentIds: Set — empty, ready for reuse ++-- limit: number � total budget (default 1 GiB) ++-- baseSegmentSize: number � segment size (default 64 MiB) ++-- totalUsed: number � total size of all SABs +� ++-- createBaseSegment() � takes a clean segment or creates a new SAB ++-- freeSegment(id) — marks an empty segment as clean +L-- getSegment(id) � access by ID +``` + +Key detail: `baseSegmentSize = Math.ceil(maxFileSize / configured) * configured`. The segment size is rounded up to the nearest multiple of the configured value that can fit `maxFileSize`. For example, with `configured = 64 MiB` and `maxFileSize = 100 MiB`, the segment size becomes `128 MiB` (2 ? 64). When `maxFileSize` is smaller than `configured` (the typical case, e.g. `10 MB` and `64 MiB`), the segment size stays at `configured`. The `limit` must be evenly divisible by the effective segment size � otherwise the remainder is wasted. There are no dedicated segments � a single segment type serves all files. + +#### SegmentRegistry + +Extent-based allocator within base segments. Each segment is tracked via: + +- **`partially: Map`** � boundary of written data (high water mark) +- **`empty: Map>`** � freed regions + +`allocate(size, limitReached=false)` algorithm: + +``` +1. Best-fit search across free extents of all segments + > found a match > return { segmentId, offset } + > exact size match > remove the extent + > larger than needed > shrink the extent + +2. Tail-append: find a segment where tail + size ? baseSegmentSize + > found > advance tail, return + +3. New segment (if !limitReached): + > pool.createBaseSegment() > registerSegment > tail = size + > budget exhausted > return null (file becomes a disk entry) +``` + +With `limitReached=true` (used by compact), step 3 is skipped � data is only moved into existing segments. + +`free()` inserts an extent into a sorted list and merges adjacent ones: + +``` +[100..200] + [200..300] > [100..300] // merge right +[0..100] + [100..300] > [0..300] // merge left +``` + +### Entry types (limit mode) + +**Shared entry** � file in SAB: +```js +{ kind: 'shared', segmentId, offset, length, stat } +``` +Zero-byte files are shared entries with `segmentId: 0, offset: 0, length: 0` � no segment is allocated. + +**Disk entry** � file on disk (size > maxFileSize, or budget exhausted): +```js +{ kind: 'disk', path, stat, data: null } +``` + +--- + + +## File loading + +### Initial load (main thread) + +``` +SharedCache.initialize() + > for each placement: + source.load() // PlacementSource scans the directory + cache.load(name, files) // Backend distributes files across SABs +``` + +If shared cache initialization fails (configuration, filesystem, or reader error), application startup is aborted � there is no fallback to per-worker static loading. Empty placements are valid: initialization succeeds with an empty index and zero allocated segments. + +In limit mode, `load()` sorts files by descending size � large files are placed first, reducing fragmentation. For each file, `#allocateEntry()` is called: + +1. `size > maxFileSize` > disk entry +2. No data and no reader > disk entry +3. `size === 0` > shared entry with `segmentId: 0, offset: 0, length: 0` (no segment allocated) +4. `registry.allocate(size)` > obtains `{ segmentId, offset }` � free space in a segment +5. `reader(path, sab, offset, size)` > reads the file from disk directly into SAB, bypassing the heap; if `data` is already in memory � copies via `Uint8Array.set(data)` + + +The reader is injected when SharedCache is created � it is `async (path, sab, offset, size) => void`. In Node.js it is implemented via `fh.read(Buffer.from(sab, offset, size))` � a Buffer view is created over the SharedArrayBuffer region, and `fs` writes data directly there. + +### Delivery to workers + +``` +workerData.sharedCache = cache.snapshot() +``` + +Limit mode snapshot: +```js +{ segments: [{ id, sab }, ...], filesystems: { placement: { entries: [...] } } } +``` + + +SharedArrayBuffer is passed via `workerData` � V8 transfers only a reference, no data copying occurs. + +## Worker-side projection + +```js +const { FilesystemCache } = require('./cache/FilesystemCache.js'); +const segmentsMap = new Map(); +for (const seg of sharedCache.segments) segmentsMap.set(seg.id, seg.sab); +const projectEntry = (entry) => FilesystemCache.projectEntry(entry, segmentsMap); +``` + +ACK: + +```js +const sendAck = (updateId) => parentPort.postMessage({ name: 'ack-update', updateId }); +``` + + +### Limit mode projection + +Each shared entry is projected into an object with an eager Buffer view: + +```js +{ data: Buffer.from(segmentsMap.get(segmentId), offset, length), stat } +``` + +Zero-byte entries (`length === 0`) are projected as `{ data: Buffer.alloc(0), stat }` without consulting `segmentsMap`. `free()` also skips zero-byte entries � they hold no segment allocation. + +`Buffer.from(sab, offset, length)` creates a lightweight view (~64 bytes descriptor) over the SAB region � no data copy. The view is created once at projection time. Since segments are never freed (slab retention), SAB references in `segmentsMap` live for the entire process lifetime. When a file is removed via `deleteFiles`, the projected object loses its last reference and is GC'd along with the Buffer view. Stale data in the segment is overwritten upon reuse. + + +### Common + +Disk entries in both modes are projected as `{ data: null, stat, path }`. + +## Hot-reload: epoch-based delta updates + +metawatch debounces filesystem events, collecting them into a batch during a quiet period. SharedCache uses **epoch coalescing** on top of this: all changes and deletions in a single metawatch batch are collected into one epoch, then flushed as minimal broadcasts. + +Routing from a filesystem event to a placement is done by the first path segment relative to application root, not by absolute-path prefix matching. This avoids collisions such as `static` vs `static2`. + +``` +metawatch SharedCache + � � + +-- debounce fs.watch events � + +-- 'before' ------------------> epoch = { updates, deletes, oldEntries } + +-- 'change' file1 -----------> push processChange() promise + +-- 'change' file2 -----------> push processChange() promise + +-- 'delete' file3 -----------> processDelete() (sync) + +-- 'after' -------------------> Promise.all > flushEpoch() +``` + +`#flushEpoch` delegates to `#flushEpochWithAck`. + +### Limit mode flush + +Sends at most `2 ? placements` messages (one `file-update` and one `file-delete` per placement), each carrying an `updateId`. All old entries are tracked against the **last** `updateId` � since `worker_threads` guarantees FIFO ordering, an ACK for the last message implies all prior messages have been processed. + +``` +flushEpochWithAck: + 1 file-update per placement (entries + newSegments + updateId) + 1 file-delete per placement (keys + updateId) + trackUpdate(lastUpdateId, all old entries) +``` + +1000 file changes > 1 broadcast with 1000 entries > N workers receive 1 message > N ACKs > 1 free cycle. + +### ACK protocol (limit mode only) + +Old entries are not freed immediately � a worker may be reading data at the moment of an update. Protocol: + +``` +Main thread Workers + � � + +-- file-update (updateId=5) -----> � + +-- file-delete (updateId=6) -----> � + � +-- apply update, ack 5 + � <-------------------- ack 5 (ignored, not tracked) + � +-- apply delete, ack 6 + � <-------------------- ack 6 -----+ + � ... all workers acked 6 ... � + +-- free(all oldEntries) � + +-- tryCompact() � + L------------------------------------- +``` + +If a worker crashes, the `worker.exit` event triggers `sharedCache.handleWorkerExit(id)`, which immediately removes the worker from all pending ACK sets. If it was the last expected worker, `free` is called right away. The new worker is restarted and receives a fresh `snapshot()`. + +There is no timeout-based forced free � a live worker will always eventually process its message queue and send an ACK. Forced free of a slow-but-alive worker would risk data corruption: the freed extent could be reused by another file while the worker's Buffer view still points to it. + +## Compaction (limit mode only) + +After entries are freed, `compact(threshold=0.3)` is called: + +1. Finds the base segment with the lowest utilization below `threshold` +2. Requires at least 2 base segments (a single segment has nowhere to compact to) +3. Attempts to move all files from the target segment into others (via `allocate(size, limitReached=true)`) +4. On success � updates indexes, groups moved files by placement, sends one `file-update` per affected placement, and tracks all `oldEntries` against the **last** `updateId` of the compaction batch +5. On failure � full rollback: restores extents and tail of the target segment + +Compaction uses the same batch-first ACK rule as epoch flush: workers may receive several `file-update` messages from one compaction, but memory is released only after the ACK for the last message in that batch. + +After compaction, the emptied segment automatically enters `emptySegmentIds` through the normal `free > freeSegment` cycle. + +``` +Before compaction: + +Segment 1: [fileA][____][fileB][________] utilization 20% +Segment 2: [fileC][fileD][______________] utilization 60% + +After: + +Segment 1: > clean (empty, ready for reuse) +Segment 2: [fileC][fileD][fileA][fileB][_] utilization 80% +``` + +## Serving (lib/static.js) + +`Static` is the worker-side serving layer. Each shared-cache placement creates a `Static` instance that holds projected `files` and handles HTTP responses. + +### Initialization + +`initServing(config)` is called after projection. It reads two config options: + +- **`streamThreshold`** � file size above which responses are streamed rather than written as a single buffer. Default: `'1 mb'`. Accepts any size unit (`sizeToBytes`). +- **`virtualFS`** � enables recursive virtual filesystem resolution. Default: `false`. + +When `virtualFS` is **off** (default): +- `search` = `lookup()` � exact match + `index.html` for directory paths +- `errorPage` � generates a minimal HTML page (`

404 Not Found

`) + +When `virtualFS` is **on**: +- `search` = `find()` � walks up the directory tree looking for `index.html`, `.virtual.html`, `.{code}.html` +- `errorPage` � searches for custom error pages (`.404.html`, `.416.html`) in the file tree + +### Serve flow + +``` +serve(url, transport) + � + +- 1. Fast exact-hit (file has data + stat, not internal) + � +-- Range request? > validate > stream or subarray > 206 + � +-- size > streamThreshold? > createSABStream() > 200 + � L-- small file > transport.write(data) > 200 + � + +- 2. Recursive search via lookup() or find() + � +-- file with data + stat (e.g. index.html via directory path) > write directly + � L-- file with data only (status/virtual pages) > write directly + � Range and streaming intentionally omitted � always small HTML files + � + +- 3. Disk fallback (uncached or oversized) + � +-- Range request? > validate > fs.createReadStream(options) > 206 + � L-- fs.createReadStream() > 200 + � + L- 4. 404 +``` + +### SAB streaming + +When a cached file exceeds `streamThreshold`, it is sent via `createSABStream()` which reads 64 KiB chunks from the SAB Buffer view. This applies to both full responses and range requests. Files below the threshold are written as a single buffer (or `subarray` for ranges). + +### Range requests + +Supported in the exact-hit and disk fallback paths: +- Valid range > 206 Partial Content (stream or subarray depending on size vs threshold) +- Invalid range (`start >= end`, `start >= size`, `end >= size`) > 416 Range Not Satisfiable + +Range requests reaching the recursive search path (step 2) are ignored by design � those paths resolve only small HTML files that are always served in full. + +### Disk fallback + +Files with `data: null` (oversized or budget-exhausted) are served from disk via `fs.createReadStream()`, with range support via `start`/`end` options. + +## Configuration + +```js +// config/cache.js +({ + maxFileSize: '10 mb', // files larger than this > disk entry + streamThreshold: '1 mb', // files larger than this > streamed in chunks (default '1 mb') + virtualFS: false, // enable recursive virtual FS resolution (default false) + placements: [ + { name: 'static' }, + { name: 'resources' }, + { name: 'assets', ext: ['.png', '.jpg', '.woff2'] }, + ], + // Limit mode (slab allocator options): + sab: { + limit: '1 gib', // total SAB budget (must be divisible by segment size) + baseSegmentSize: '64 mib', // single segment size (must be ? maxFileSize) + }, +}); +``` + +All size values support both binary (KiB, MiB, GiB) and decimal (KB, MB, GB) units. + +The entire `cache` section is optional � when absent, all defaults apply (`mode: 'limit'`, `maxFileSize: '10 mb'`, `streamThreshold: '1 mb'`, `virtualFS: false`, `sab.limit: '1 gib'`, `sab.baseSegmentSize: '64 mib'`, placements: `static` + `resources`). + +**Important (limit mode):** `limit` must be evenly divisible by the effective `baseSegmentSize`. Otherwise the remainder is wasted � Pool cannot create a segment smaller than `baseSegmentSize`. The effective segment size is `Math.ceil(maxFileSize / configured) * configured`. + +## Safety invariants + +**Common (both modes):** +- Workers **never write** to SharedArrayBuffer +- All worker Buffer views are zero-copy descriptors over shared memory + +**Limit mode:** +- Old memory is freed **only after ACK** from all workers or worker exit +- SAB references in worker `segmentsMap` live for the entire process lifetime (slab retention) +- All worker Buffer views reference SABs from a **single** `segmentsMap` (not a copy) +- SAB segments are **never returned to the OS** � only reused +- Total memory usage is always ? `limit` + +## Patterns and influences + +The cache design draws on several well-known systems patterns: + +- **SLUB slab allocator** (Linux kernel) � segments are never returned to the OS; empty segments are marked clean and reused, eliminating allocation system calls (limit mode) +- **Extent-based allocation** (ext4, XFS) � free space tracked as `{ offset, length }` extents with best-fit search and adjacent merge on free (limit mode) +- **Event coalescing / group commit** (PostgreSQL WAL, Nagle's algorithm) � metawatch debounces fs events into batches, SharedCache coalesces each batch into minimal broadcasts via epoch flush (both modes) +- **Copy-on-write update** (MVCC) � file updates allocate a new extent, old data lives until all workers ACK; readers never see partial writes (limit mode) +- **Dependency injection** � LimitCache accepts an injectable `reader` function, keeping it free of Node.js built-in dependencies for cross-platform use + +## Data flow diagram + +``` + -----------------------------------� + � Main Thread � + � � + � SharedCache (orchestrator) � + � +-- FilesystemCache � + � � +-- Pool � + � � � L-- SAB segments � + � � L-- SegmentRegistry � + � � L-- extents/tails � + � +-- PlacementSource[] � + � L-- Watcher � + L----------T------------------------ + � + snapshot / file-update / file-delete + � + -------------------+------------------� + � � � + --------------� --------------� --------------� + � Worker 1 � � Worker 2 � � Worker N � + � � � � � � + � mode detect � � mode detect � � mode detect � + � project() � � project() � � project() � + � � � � � � + � place.files � � place.files � � place.files � + � (views) � � (views) � � (views) � + L-------------- L-------------- L-------------- + � � � + L---- Buffer.from(sab, ...) ----------- + zero-copy data access +``` + +## Integration Guide + +The cache module (`lib/cache/`) is self-contained: it depends only on `metawatch` and `metautil` from the Metarhia ecosystem and has no knowledge of the application framework structure or worker lifecycle. + +### Install dependencies + +```sh +npm install metawatch metautil +``` + +### Create and initialize SharedCache + +```js +const { SharedCache } = require('./lib/cache/SharedCache.js'); +const { Worker } = require('node:worker_threads'); + +// threads Map must be created before SharedCache so the closures capture it +const threads = new Map(); + +const cache = new SharedCache({ + // SAB budget — all optional, defaults shown + limit: '1 gib', + baseSegmentSize: '64 mib', + maxFileSize: '10 mb', + + // DirectoryWatcher debounce timeout, ms — optional + watchTimeout: 2000, + + // Directories to serve under `dir` — optional, default: static + resources + placements: [ + { name: 'static' }, + { name: 'resources' }, + { name: 'assets', ext: ['.png', '.jpg', '.woff2'] }, + ], + + // Application root directory (placements live here) + dir: '/path/to/app', + + // Console-compatible logger + console, + + // Called once per message batch to all active workers + broadcast: (data) => { + for (const thread of threads.values()) thread.postMessage(data); + }, + + // Returns iterable of active worker IDs (used to track pending ACKs) + getWorkerIds: () => threads.keys(), +}); + +await cache.initialize(); // scan placements, load files into SAB +cache.watch(); // start filesystem watcher +``` + +### Deliver snapshot to a new worker + +Pass the snapshot in `workerData` before creating the worker. The snapshot contains SAB references — V8 transfers only descriptors, no data is copied. + +```js +const workerData = { + sharedCache: cache.snapshot(), + // ... other workerData fields +}; +const worker = new Worker(workerPath, { workerData }); +threads.set(workerId, worker); + +worker.on('message', (msg) => { + if (msg.name === 'ack-update') cache.handleAck(msg.updateId, workerId); +}); + +worker.on('exit', () => { + cache.handleWorkerExit(workerId); + threads.delete(workerId); +}); +``` + +### Worker side + +```js +const { workerData, parentPort } = require('node:worker_threads'); +const { FilesystemCache } = require('./cache/FilesystemCache.js'); + +const { sharedCache } = workerData; + +// Build segment map from initial snapshot +const segmentsMap = new Map(); +for (const seg of sharedCache.segments) segmentsMap.set(seg.id, seg.sab); + +// Project a placement into a files Map (key -> { data: Buffer|null, stat, path? }) +const files = FilesystemCache.project(sharedCache.filesystems['static'], segmentsMap); + +// Send ACK after applying each update +const sendAck = (updateId) => + parentPort.postMessage({ name: 'ack-update', updateId }); + +// Handle delta messages from main thread +parentPort.on('message', (msg) => { + if (msg.name === 'file-update') { + // Register new segments before projecting entries + for (const seg of msg.newSegments) segmentsMap.set(seg.id, seg.sab); + for (const [key, entry] of msg.updates) { + files.set(key, FilesystemCache.projectEntry(entry, segmentsMap)); + } + sendAck(msg.updateId); + } else if (msg.name === 'file-delete') { + for (const key of msg.keys) files.delete(key); + sendAck(msg.updateId); + } +}); +``` + +### Message protocol reference + +| Message (main → worker) | Fields | Notes | +|---|---|---| +| `file-update` | `target`, `updateId`, `updates: [[key, entry], ...]`, `newSegments: [{id, sab}]` | Apply entries then ACK | +| `file-delete` | `target`, `updateId`, `keys: [string]` | Delete keys then ACK | + +| Message (worker → main) | Fields | Notes | +|---|---|---| +| `ack-update` | `updateId` | Sent after applying any message with `updateId` | diff --git a/impress.js b/impress.js index f3db1d7fa..d2309d126 100644 --- a/impress.js +++ b/impress.js @@ -11,6 +11,7 @@ const { Pool, isError } = require('metautil'); const { loadSchema } = require('metaschema'); const { Logger } = require('metalog'); const { Planner } = require('./lib/planner.js'); +const { SharedCache } = require('./lib/cache/SharedCache.js'); const CONFIG_SECTIONS = ['log', 'scale', 'server', 'sessions']; const PATH = process.cwd(); @@ -59,7 +60,15 @@ const broadcast = (app, data) => { }; const startWorker = async (app, kind, port, id = ++impress.lastWorkerId) => { - const workerData = { id, kind, root: app.root, path: app.path, port }; + const sharedCache = app.sharedCache.snapshot(); + const workerData = { + id, + kind, + root: app.root, + path: app.path, + port, + sharedCache, + }; const execArgv = [...process.execArgv, `--test-reporter=${REPORTER_PATH}`]; const options = { trackUnmanagedFds: true, workerData, execArgv }; const worker = new Worker(WORKER_PATH, options); @@ -74,6 +83,7 @@ const startWorker = async (app, kind, port, id = ++impress.lastWorkerId) => { }); worker.on('exit', (code) => { + app.sharedCache.handleWorkerExit(id); if (code !== 0) startWorker(app, kind, port, id); else app.threads.delete(id); if (impress.initialization) exit('Can not start Application server', 1); @@ -128,6 +138,10 @@ const startWorker = async (app, kind, port, id = ++impress.lastWorkerId) => { terminate: ({ code }) => { process.emit('TERMINATE', code); }, + + 'ack-update': ({ updateId }) => { + app.sharedCache.handleAck(updateId, id); + }, }; worker.on('message', (msg) => { @@ -175,9 +189,40 @@ const loadApplication = async (root, dir, master) => { impress.config = config; } const { balancer, ports = [], workers = {} } = config.server; + const { cache = {} } = config; const threads = new Map(); + const cacheOptions = { + limit: cache.sab?.limit, + baseSegmentSize: cache.sab?.baseSegmentSize, + maxFileSize: cache.maxFileSize, + watchTimeout: config.server.timeouts.watch, + placements: cache.placements, + dir, + console: impress.console, + broadcast: (data) => { + for (const thread of threads.values()) thread.postMessage(data); + }, + getWorkerIds: () => threads.keys(), + }; + const sharedCache = new SharedCache(cacheOptions); + try { + await sharedCache.initialize(); + } catch (error) { + error.message = `Shared cache init failed: ${error.message}`; + throw error; + } + const pool = new Pool({ timeout: workers.wait }); - const app = { root, path: dir, config, threads, pool, ready: 0 }; + const app = { + root, + path: dir, + config, + threads, + pool, + ready: 0, + sharedCache, + }; + sharedCache.watch(); if (balancer) await startWorker(app, 'balancer', balancer); for (const port of ports) await startWorker(app, 'server', port); const poolSize = workers.pool || 0; diff --git a/lib/application.js b/lib/application.js index 6efc01a04..7298100e7 100644 --- a/lib/application.js +++ b/lib/application.js @@ -55,6 +55,7 @@ class Application extends EventEmitter { this.watcher = null; this.semaphore = null; this.server = null; + this.sharedCache = null; } absolute(relative) { @@ -70,14 +71,16 @@ class Application extends EventEmitter { } } - async load({ invoke }) { + async load({ invoke, sharedCache }) { + if (sharedCache) { + this.sharedCache = sharedCache; + this.applySharedCache(sharedCache); + } this.startWatch(); this.createSandbox(); this.sandbox.application.invoke = invoke; this.sandbox.application.emit('loading'); await this.parallel([ - this.static.load(), - this.resources.load(), this.cert.load(), (async () => { await this.schemas.load(); @@ -94,6 +97,25 @@ class Application extends EventEmitter { await this.start(); } + applySharedCache(sharedCache) { + const { projectEntry, config } = this; + const { filesystems } = sharedCache; + for (const name of Object.keys(filesystems)) { + const index = filesystems[name]; + const entries = + index.entries instanceof Map ? index.entries : new Map(index.entries); + const files = new Map(); + for (const [key, entry] of entries) { + files.set(key, projectEntry(entry)); + } + const place = this[name]; + if (!place) continue; + place.setFiles(files); + if (place.initServing) place.initServing(config); + } + sharedCache.segments = null; + } + async start() { const { sandbox, config, cert, mode } = this; const { kind, port } = workerData; @@ -198,11 +220,15 @@ class Application extends EventEmitter { startWatch() { const timeout = this.config.server.timeouts.watch; this.watcher = new DirectoryWatcher({ timeout }); + const shared = this.sharedCache + ? new Set(Object.keys(this.sharedCache.filesystems)) + : new Set(); this.watcher.on('change', (filePath) => { const relPath = filePath.substring(this.path.length + 1); const sepIndex = relPath.indexOf(node.path.sep); const place = relPath.substring(0, sepIndex); + if (shared.has(place)) return; node.fs.stat(filePath, (error, stat) => { if (error) return; if (stat.isDirectory()) return void this[place].load(filePath); @@ -215,6 +241,7 @@ class Application extends EventEmitter { const relPath = filePath.substring(this.path.length + 1); const sepIndex = relPath.indexOf(node.path.sep); const place = relPath.substring(0, sepIndex); + if (shared.has(place)) return; this[place].delete(filePath); if (threadId === 1) this.console.debug('Deleted: /' + relPath); }); diff --git a/lib/cache/FilesystemCache.js b/lib/cache/FilesystemCache.js new file mode 100644 index 000000000..848923c91 --- /dev/null +++ b/lib/cache/FilesystemCache.js @@ -0,0 +1,398 @@ +'use strict'; + +const DEFAULT_LIMIT = 1024 * 1024 * 1024; +const DEFAULT_BASE_SEGMENT_SIZE = 64 * 1024 * 1024; +const DEFAULT_MAX_FILE_SIZE = 10 * 1024 * 1024; + +class Pool { + constructor(limit, baseSegmentSize) { + this.limit = limit; + this.baseSegmentSize = baseSegmentSize; + this.segments = new Map(); + this.emptySegmentIds = new Set(); + this.totalUsed = 0; + this.nextSegmentId = 1; + } + + canAllocate(size) { + return this.totalUsed + size <= this.limit; + } + + createBaseSegment() { + for (const id of this.emptySegmentIds) { + this.emptySegmentIds.delete(id); + return this.segments.get(id); + } + const size = this.baseSegmentSize; + if (!this.canAllocate(size)) return null; + const id = this.nextSegmentId++; + const sab = new SharedArrayBuffer(size); + const segment = { id, sab }; + this.segments.set(id, segment); + this.totalUsed += size; + return segment; + } + + freeSegment(id) { + if (!this.segments.has(id)) return false; + if (this.emptySegmentIds.has(id)) return true; + this.emptySegmentIds.add(id); + return true; + } + + getSegment(id) { + return this.segments.get(id) || null; + } + + getSegmentsSnapshot() { + const result = []; + for (const seg of this.segments.values()) { + result.push({ id: seg.id, sab: seg.sab }); + } + return result; + } +} + +class SegmentRegistry { + constructor(pool) { + this.pool = pool; + this.empty = new Map(); + this.partially = new Map(); + } + + register(segmentId) { + this.empty.set(segmentId, []); + this.partially.set(segmentId, 0); + } + + allocate(size, limitReached = false) { + let bestFit = null; + for (const [segmentId, extents] of this.empty) { + for (let i = 0; i < extents.length; i++) { + const extent = extents[i]; + if (extent.length < size) continue; + if (!bestFit || extent.length < bestFit.extent.length) { + bestFit = { segmentId, index: i, extent }; + } + } + } + if (bestFit) { + const { segmentId, index, extent } = bestFit; + const offset = extent.offset; + const extents = this.empty.get(segmentId); + if (extent.length === size) { + extents.splice(index, 1); + } else { + extents[index] = { + offset: extent.offset + size, + length: extent.length - size, + }; + } + return { segmentId, offset }; + } + for (const [segmentId, tail] of this.partially) { + if (tail + size <= this.pool.baseSegmentSize) { + this.partially.set(segmentId, tail + size); + return { segmentId, offset: tail }; + } + } + if (limitReached) return null; + const segment = this.pool.createBaseSegment(); + if (!segment) return null; + this.register(segment.id); + this.partially.set(segment.id, size); + return { segmentId: segment.id, offset: 0 }; + } + + free(segmentId, offset, length) { + const extents = this.empty.get(segmentId); + if (!extents) return; + const newExtent = { offset, length }; + let insertIndex = extents.findIndex((e) => e.offset > offset); + if (insertIndex === -1) insertIndex = extents.length; + extents.splice(insertIndex, 0, newExtent); + SegmentRegistry.mergeSiblings(extents, insertIndex); + } + + static mergeSiblings(extents, index) { + if (index + 1 < extents.length) { + const current = extents[index]; + const next = extents[index + 1]; + if (current.offset + current.length === next.offset) { + current.length += next.length; + extents.splice(index + 1, 1); + } + } + if (index > 0) { + const prev = extents[index - 1]; + const current = extents[index]; + if (prev.offset + prev.length === current.offset) { + prev.length += current.length; + extents.splice(index, 1); + } + } + } + + isUsed(segmentId) { + const tail = this.partially.get(segmentId); + if (!tail) return 0; + const extents = this.empty.get(segmentId); + if (!extents) return tail; + let free = 0; + for (const e of extents) free += e.length; + return tail - free; + } + + isEmpty(segmentId) { + return this.isUsed(segmentId) === 0; + } + + unregister(segmentId) { + this.empty.delete(segmentId); + this.partially.delete(segmentId); + } +} + +class FilesystemCache { + constructor(options = {}) { + const limit = options.limit || DEFAULT_LIMIT; + const maxFileSize = options.maxFileSize || DEFAULT_MAX_FILE_SIZE; + const configured = options.baseSegmentSize || DEFAULT_BASE_SEGMENT_SIZE; + const baseSegmentSize = Math.ceil(maxFileSize / configured) * configured; + this.maxFileSize = maxFileSize; + this.baseSegmentSize = baseSegmentSize; + this.reader = options.reader || null; + this.pool = new Pool(limit, baseSegmentSize); + this.registry = new SegmentRegistry(this.pool); + this.filesystems = {}; + } + + get totalUsed() { + return this.pool.totalUsed; + } + + getSegment(id) { + return this.pool.getSegment(id); + } + + async load(name, filesMap) { + const entries = new Map(); + const segmentIds = new Set(); + const candidates = [...filesMap.entries()]; + candidates.sort((a, b) => (b[1].stat?.size || 0) - (a[1].stat?.size || 0)); + for (const [key, file] of candidates) { + const entry = await this.#allocateEntry(file, segmentIds); + entries.set(key, entry); + } + const index = { entries, segmentIds }; + this.filesystems[name] = index; + return index; + } + + async allocate(name, key, file) { + let index = this.filesystems[name]; + if (!index) { + index = { entries: new Map(), segmentIds: new Set() }; + this.filesystems[name] = index; + } + const entry = await this.#allocateEntry(file, index.segmentIds); + index.entries.set(key, entry); + return entry; + } + + remove(fsId, filename) { + const index = this.filesystems[fsId]; + if (!index) return null; + const entry = index.entries.get(filename); + if (!entry) return null; + index.entries.delete(filename); + return entry; + } + + free(entry) { + if (!entry || entry.kind !== 'shared' || entry.length === 0) return; + const { segmentId } = entry; + const segment = this.pool.getSegment(segmentId); + if (!segment) return; + this.registry.free(segmentId, entry.offset, entry.length); + if (this.registry.isEmpty(segmentId)) { + this.registry.unregister(segmentId); + this.pool.freeSegment(segmentId); + } + } + + compact(threshold = 0.3) { + let target = null; + let minUtil = threshold; + let baseCount = 0; + for (const [segmentId, tail] of this.registry.partially) { + if (tail === 0) continue; + const segment = this.pool.getSegment(segmentId); + if (!segment) continue; + baseCount++; + const used = this.registry.isUsed(segmentId); + const util = used / this.baseSegmentSize; + if (util < minUtil) { + minUtil = util; + target = segmentId; + } + } + if (baseCount < 2 || !target) return null; + const items = []; + for (const name of Object.keys(this.filesystems)) { + for (const [key, entry] of this.filesystems[name].entries) { + if (entry.kind === 'shared' && entry.segmentId === target) { + items.push({ name, key, entry }); + } + } + } + if (items.length === 0) return null; + const savedTail = this.registry.partially.get(target); + const savedExtents = this.registry.empty.get(target).map((e) => ({ ...e })); + this.registry.unregister(target); + const moved = []; + let success = true; + for (const { name, key, entry } of items) { + const allocation = this.registry.allocate(entry.length, true); + if (!allocation) { + success = false; + break; + } + const oldSab = this.pool.getSegment(target).sab; + const src = new Uint8Array(oldSab, entry.offset, entry.length); + const dstSeg = this.pool.getSegment(allocation.segmentId); + new Uint8Array(dstSeg.sab, allocation.offset, entry.length).set(src); + moved.push({ + name, + key, + oldEntry: entry, + newEntry: { + kind: 'shared', + segmentId: allocation.segmentId, + offset: allocation.offset, + length: entry.length, + stat: entry.stat, + }, + }); + } + if (!success) { + for (const { newEntry } of moved) { + this.registry.free( + newEntry.segmentId, + newEntry.offset, + newEntry.length, + ); + } + this.registry.register(target); + this.registry.empty.set(target, savedExtents); + this.registry.partially.set(target, savedTail); + return null; + } + const updates = []; + const oldEntries = []; + const newSegmentIds = new Set(); + for (const { name, key, oldEntry, newEntry } of moved) { + this.filesystems[name].entries.set(key, newEntry); + this.filesystems[name].segmentIds.add(newEntry.segmentId); + newSegmentIds.add(newEntry.segmentId); + updates.push({ name, key, entry: newEntry, oldEntry }); + oldEntries.push(oldEntry); + } + for (const name of Object.keys(this.filesystems)) { + this.filesystems[name].segmentIds.delete(target); + } + const newSegments = []; + for (const id of newSegmentIds) { + const seg = this.pool.getSegment(id); + if (seg) newSegments.push({ id: seg.id, sab: seg.sab }); + } + return { updates, oldEntries, newSegments }; + } + + snapshot() { + const segments = this.pool.getSegmentsSnapshot(); + const filesystems = {}; + for (const name of Object.keys(this.filesystems)) { + const { entries } = this.filesystems[name]; + filesystems[name] = { entries: [...entries] }; + } + return { segments, filesystems }; + } + + stats() { + const segs = [...this.pool.segments.values()]; + const cleanCount = this.pool.emptySegmentIds.size; + const lines = segs.map((s) => { + const used = this.registry.isUsed(s.id); + const pct = ((used / this.baseSegmentSize) * 100).toFixed(1); + const mark = this.pool.emptySegmentIds.has(s.id) ? ' [empty]' : ''; + return ` seg ${s.id}: ${used}/${this.baseSegmentSize} (${pct}%)${mark}`; + }); + return { + segmentCount: segs.length, + cleanCount, + totalUsed: this.pool.totalUsed, + lines, + }; + } + + async #allocateEntry(file, segmentIds) { + const { data, stat, path: filePath } = file; + const size = stat?.size || 0; + if (size > this.maxFileSize) { + return { kind: 'disk', path: filePath, stat, data: null }; + } + if (!data && !this.reader) { + return { kind: 'disk', path: filePath, stat, data: null }; + } + if (size === 0) { + return { kind: 'shared', segmentId: 0, offset: 0, length: 0, stat }; + } + const allocation = this.registry.allocate(size); + if (!allocation) return { kind: 'disk', path: filePath, stat, data: null }; + const { segmentId, offset } = allocation; + const segment = this.pool.getSegment(segmentId); + await this.#writeToSegment(segment.sab, offset, size, data, filePath); + segmentIds.add(segmentId); + return { kind: 'shared', segmentId, offset, length: size, stat }; + } + + async #writeToSegment(sab, offset, size, data, filePath) { + if (size === 0) return; + if (data) { + const view = new Uint8Array(sab, offset, size); + view.set(data); + return; + } + if (this.reader) { + await this.reader(filePath, sab, offset, size); + return; + } + throw new Error(`No reader and no data for: ${filePath}`); + } + + static project(index, segmentsMap) { + const files = new Map(); + const entries = + index.entries instanceof Map ? index.entries : new Map(index.entries); + for (const [key, entry] of entries) { + files.set(key, FilesystemCache.projectEntry(entry, segmentsMap)); + } + return files; + } + + static projectEntry(entry, segmentsMap) { + if (entry.kind === 'shared') { + const { segmentId, offset, length } = entry; + if (length === 0) { + return { data: Buffer.alloc(0), stat: entry.stat }; + } + const sab = segmentsMap.get(segmentId); + const data = Buffer.from(sab, offset, length); + return { data, stat: entry.stat }; + } + return { data: null, stat: entry.stat, path: entry.path }; + } +} + +module.exports = { FilesystemCache }; diff --git a/lib/cache/PlacementSource.js b/lib/cache/PlacementSource.js new file mode 100644 index 000000000..2ec09148d --- /dev/null +++ b/lib/cache/PlacementSource.js @@ -0,0 +1,61 @@ +'use strict'; + +const path = require('node:path'); +const fsp = require('node:fs/promises'); +const metautil = require('metautil'); + +const WIN = process.platform === 'win32'; + +const toKey = WIN + ? (filePath, base) => { + const key = filePath.substring(base.length); + return metautil.replace(key, path.sep, '/'); + } + : (filePath, base) => filePath.substring(base.length); + +class PlacementSource { + constructor(name, dir, watcher, options = {}) { + this.name = name; + this.path = path.join(dir, name); + this.watcher = watcher; + this.files = new Map(); + this.ext = options.ext || null; + } + + getKey(filePath) { + return toKey(filePath, this.path); + } + + async load(targetPath = this.path) { + this.watcher.watch(targetPath); + let entries; + try { + entries = await fsp.readdir(targetPath, { withFileTypes: true }); + } catch { + return; + } + for (const entry of entries) { + const filePath = path.join(targetPath, entry.name); + if (entry.isDirectory()) { + await this.load(filePath); + } else { + await this.change(filePath); + } + } + } + + async change(filePath) { + const ext = metautil.fileExt(filePath); + if (this.ext && !this.ext.includes(ext)) return; + try { + const stat = await fsp.stat(filePath); + const key = this.getKey(filePath); + this.files.set(key, { stat, path: filePath }); + } catch { + const key = this.getKey(filePath); + this.files.delete(key); + } + } +} + +module.exports = { PlacementSource }; diff --git a/lib/cache/SharedCache.js b/lib/cache/SharedCache.js new file mode 100644 index 000000000..ca7bcc1ee --- /dev/null +++ b/lib/cache/SharedCache.js @@ -0,0 +1,292 @@ +'use strict'; + +const path = require('node:path'); +const fsp = require('node:fs/promises'); +const { DirectoryWatcher } = require('metawatch'); +const metautil = require('metautil'); +const { FilesystemCache } = require('./FilesystemCache.js'); +const { PlacementSource } = require('./PlacementSource.js'); + +const DEFAULT_PLACEMENTS = [{ name: 'static' }, { name: 'resources' }]; + +class SharedCache { + constructor({ limit, baseSegmentSize, maxFileSize, watchTimeout, placements, dir, console, broadcast, getWorkerIds }) { + const reader = async (filePath, sab, offset, size) => { + const fh = await fsp.open(filePath, 'r'); + try { + const buf = Buffer.from(sab, offset, size); + await fh.read(buf, 0, size, 0); + } finally { + await fh.close(); + } + }; + + this.cache = new FilesystemCache({ + limit: metautil.sizeToBytes(limit || '1 gib'), + baseSegmentSize: metautil.sizeToBytes(baseSegmentSize || '64 mib'), + maxFileSize: metautil.sizeToBytes(maxFileSize || '10 mb'), + reader, + }); + + this.placements = placements || DEFAULT_PLACEMENTS; + this.dir = dir; + this.watchTimeout = watchTimeout; + this.console = console; + this.broadcast = broadcast; + this.getWorkerIds = getWorkerIds; + this.sources = {}; + this.watcher = null; + this.nextUpdateId = 0; + this.pendingFrees = new Map(); + this.#afterAck = (pending) => { + this.#freeEntries(pending); + }; + } + + #afterAck; + + async initialize() { + this.watcher = new DirectoryWatcher({ timeout: this.watchTimeout }); + for (const placement of this.placements) { + const opts = placement.ext ? { ext: placement.ext } : {}; + this.sources[placement.name] = new PlacementSource( + placement.name, + this.dir, + this.watcher, + opts, + ); + } + for (const name of Object.keys(this.sources)) { + const source = this.sources[name]; + await source.load(); + await this.cache.load(name, source.files); + } + } + + snapshot() { + return this.cache.snapshot(); + } + + handleAck(updateId, workerId) { + if (!this.pendingFrees) return; + const pending = this.pendingFrees.get(updateId); + if (!pending) return; + pending.workerIds.delete(workerId); + if (pending.workerIds.size === 0) { + this.#afterAck(pending); + this.pendingFrees.delete(updateId); + } + } + + handleWorkerExit(workerId) { + if (!this.pendingFrees) return; + for (const [updateId, pending] of this.pendingFrees) { + pending.workerIds.delete(workerId); + if (pending.workerIds.size === 0) { + this.#afterAck(pending); + this.pendingFrees.delete(updateId); + } + } + } + + watch() { + const { sources, cache } = this; + + const sourcesByName = new Map(Object.entries(sources)); + + const findSource = (filePath) => { + const relPath = path.relative(this.dir, filePath); + if (!relPath || relPath.startsWith('..')) return null; + if (path.isAbsolute(relPath)) return null; + const sepIndex = relPath.indexOf(path.sep); + const name = sepIndex === -1 ? relPath : relPath.substring(0, sepIndex); + const source = sourcesByName.get(name); + return source ? { name, source } : null; + }; + + let epoch = null; + + const processChange = async (ep, name, source, filePath) => { + const stat = await fsp.stat(filePath).catch(() => null); + if (!stat) return; + if (stat.isDirectory()) { + const before = new Set(source.files.keys()); + await source.load(filePath); + for (const [key, file] of source.files) { + if (before.has(key)) continue; + const newEntry = await cache.allocate(name, key, file); + const group = + ep.updates[name] || + (ep.updates[name] = { entries: [], segmentIds: new Set() }); + group.entries.push([key, newEntry]); + if (newEntry.kind === 'shared' && newEntry.segmentId) { + group.segmentIds.add(newEntry.segmentId); + } + } + return; + } + await source.change(filePath); + const key = source.getKey(filePath); + const file = source.files.get(key); + if (!file) return; + const oldEntry = cache.filesystems[name]?.entries.get(key); + const newEntry = await cache.allocate(name, key, file); + const group = + ep.updates[name] || + (ep.updates[name] = { entries: [], segmentIds: new Set() }); + group.entries.push([key, newEntry]); + if (newEntry.kind === 'shared' && newEntry.segmentId) { + group.segmentIds.add(newEntry.segmentId); + } + if (oldEntry && oldEntry.kind === 'shared') { + ep.oldEntries.push(oldEntry); + } + }; + + const processDelete = (ep, name, source, filePath) => { + const prefix = source.getKey(filePath); + const keys = []; + const exactEntry = source.files.get(prefix); + if (exactEntry) { + source.files.delete(prefix); + keys.push(prefix); + } + const dirPrefix = prefix.endsWith('/') ? prefix : prefix + '/'; + for (const key of source.files.keys()) { + if (key.startsWith(dirPrefix)) { + source.files.delete(key); + keys.push(key); + } + } + if (keys.length === 0) return; + const group = ep.deletes[name] || (ep.deletes[name] = []); + for (const key of keys) { + group.push(key); + const old = cache.remove(name, key); + if (old && old.kind === 'shared') ep.oldEntries.push(old); + } + }; + + this.watcher.on('before', () => { + epoch = { updates: {}, deletes: {}, oldEntries: [], promises: [] }; + }); + + this.watcher.on('change', (filePath) => { + const entry = findSource(filePath); + const ep = epoch; + if (entry && ep) { + ep.promises.push(processChange(ep, entry.name, entry.source, filePath)); + } + }); + + this.watcher.on('delete', (filePath) => { + const entry = findSource(filePath); + const ep = epoch; + if (entry && ep) processDelete(ep, entry.name, entry.source, filePath); + }); + + this.watcher.on('after', () => { + const current = epoch; + epoch = null; + if (!current) return; + Promise.all(current.promises) + .then(() => this.#flushEpoch(current)) + .catch((err) => this.console.error(`[cache] epoch: ${err.message}`)); + }); + } + + #flushEpoch(epoch) { + return this.#flushEpochWithAck(epoch); + } + + #flushEpochWithAck(epoch) { + const { updates, deletes, oldEntries } = epoch; + let lastUpdateId = 0; + for (const name of Object.keys(updates)) { + const { entries, segmentIds } = updates[name]; + if (entries.length === 0) continue; + const newSegments = []; + for (const id of segmentIds) { + const seg = this.cache.getSegment(id); + if (seg) newSegments.push({ id: seg.id, sab: seg.sab }); + } + lastUpdateId = ++this.nextUpdateId; + this.#broadcast({ + name: 'file-update', + target: name, + updateId: lastUpdateId, + updates: entries, + newSegments, + }); + } + for (const name of Object.keys(deletes)) { + const keys = deletes[name]; + if (keys.length === 0) continue; + lastUpdateId = ++this.nextUpdateId; + this.#broadcast({ + name: 'file-delete', + target: name, + updateId: lastUpdateId, + keys, + }); + } + if (oldEntries.length > 0 && lastUpdateId > 0) { + this.#trackUpdate(lastUpdateId, oldEntries); + } + } + + #broadcast(data) { + this.broadcast(data); + } + + #freeEntries(pending) { + if (!pending) return; + for (const entry of pending.entries) this.cache.free(entry); + const { segmentCount, cleanCount, totalUsed, lines } = this.cache.stats(); + const count = pending.entries.length; + this.console.debug( + `[cache] freeEntries: ${count} entries freed, ` + + `${segmentCount} segments (${cleanCount} clean), ` + + `totalUsed=${totalUsed}\n${lines.join('\n')}`, + ); + this.#tryCompact(); + } + + #tryCompact() { + const result = this.cache.compact(); + if (!result) { + this.console.debug('[cache] compact: no target found'); + return; + } + this.console.info( + `[cache] compact: moved ${result.updates.length} files, ` + + `freed ${result.oldEntries.length} old entries`, + ); + const byName = {}; + for (const { name, key, entry } of result.updates) { + if (!byName[name]) byName[name] = []; + byName[name].push([key, entry]); + } + let lastUpdateId = 0; + for (const name of Object.keys(byName)) { + lastUpdateId = ++this.nextUpdateId; + this.#broadcast({ + name: 'file-update', + target: name, + updateId: lastUpdateId, + updates: byName[name], + newSegments: result.newSegments, + }); + } + if (result.oldEntries.length > 0 && lastUpdateId > 0) { + this.#trackUpdate(lastUpdateId, result.oldEntries); + } + } + + #trackUpdate(updateId, entries) { + const workerIds = new Set(this.getWorkerIds()); + this.pendingFrees.set(updateId, { workerIds, entries }); + } +} + +module.exports = { SharedCache }; diff --git a/lib/cert.js b/lib/cert.js index 614879202..841974af7 100644 --- a/lib/cert.js +++ b/lib/cert.js @@ -1,26 +1,65 @@ 'use strict'; const { node, metarhia, wt } = require('./deps.js'); -const { Static } = require('./static.js'); +const { Place } = require('./place.js'); -class Cert extends Static { +const WIN = process.platform === 'win32'; +const MAX_FILE_SIZE = '10 mb'; + +class Cert extends Place { constructor(name, application, options = {}) { - super(name, application, options); + super(name, application); + this.files = new Map(); this.domains = new Map(); + this.ext = options.ext; + this.maxFileSize = -1; } get(key) { return this.domains.get(key); } + getKey(filePath) { + const key = filePath.substring(this.path.length); + if (WIN) return metarhia.metautil.replace(key, node.path.sep, '/'); + return key; + } + + delete(filePath) { + const key = this.getKey(filePath); + this.files.delete(key); + } + + async change(filePath) { + if (this.maxFileSize === -1) { + const maxFileSize = this.application.config?.cache?.maxFileSize; + const size = maxFileSize || MAX_FILE_SIZE; + this.maxFileSize = metarhia.metautil.sizeToBytes(size); + } + const ext = metarhia.metautil.fileExt(filePath); + if (this.ext && !this.ext.includes(ext)) return; + try { + const stat = await node.fsp.stat(filePath); + const key = this.getKey(filePath); + if (stat.size > this.maxFileSize) { + this.files.set(key, { data: null, stat }); + } else { + const data = await node.fsp.readFile(filePath); + this.files.set(key, { data, stat }); + } + } catch { + this.delete(filePath); + } + } + async before(changes) { const folders = new Set(); for (const [name, event] of changes) { const dir = node.path.dirname(name); const folder = node.path.basename(dir); folders.add(folder); - if (event === 'change') await super.change(name); - if (event === 'datele') super.delete(name); + if (event === 'change') await this.change(name); + if (event === 'datele') this.delete(name); } await this.init([...folders]); changes.length = 0; diff --git a/lib/static.js b/lib/static.js index e074afdd6..f221be6d8 100644 --- a/lib/static.js +++ b/lib/static.js @@ -1,69 +1,91 @@ 'use strict'; const { node, metarhia } = require('./deps.js'); -const { Place } = require('./place.js'); const { join } = node.path.posix; +const { Readable } = node.stream; -const WIN = process.platform === 'win32'; -const MAX_FILE_SIZE = '10 mb'; +const CHUNK_SIZE = 65536; const STATUS_CACHE = new Map(); const status = (code) => { let file = STATUS_CACHE.get(code); if (file) return file; - const status = node.http.STATUS_CODES[code] || 'Unknown error'; + const statusText = node.http.STATUS_CODES[code] || 'Unknown error'; const data = Buffer.from(` -${code} ${status} -

${code} ${status}

`); +${code} ${statusText} +

${code} ${statusText}

`); file = { data, stat: null, code }; STATUS_CACHE.set(code, file); return file; }; -class Static extends Place { - constructor(name, application, options = {}) { - super(name, application); +const createSABStream = (data, options = {}) => { + const sab = data.buffer; + const base = data.byteOffset; + const total = data.byteLength; + const start = base + (options.start ?? 0); + const end = base + (options.end ?? total - 1); + let offset = start; + return new Readable({ + read() { + if (offset > end) return void this.push(null); + const chunkEnd = Math.min(offset + CHUNK_SIZE, end + 1); + this.push(Buffer.from(sab, offset, chunkEnd - offset)); + offset = chunkEnd; + }, + }); +}; + +class Static { + constructor(name, application) { + this.name = name; + this.path = application.absolute(name); this.files = new Map(); - this.ext = options.ext; - this.maxFileSize = -1; + this.streamThreshold = Infinity; } get(key) { return this.files.get(key); } - getKey(filePath) { - const key = filePath.substring(this.path.length); - if (WIN) return metarhia.metautil.replace(key, node.path.sep, '/'); - return key; + setFiles(filesMap) { + this.files = filesMap; } - delete(filePath) { - const key = this.getKey(filePath); - this.files.delete(key); + updateFiles(updates) { + for (const [key, file] of updates) { + this.files.set(key, file); + } } - async change(filePath) { - if (this.maxFileSize === -1) { - const maxFileSize = this.application.config?.cache?.maxFileSize; - const size = maxFileSize || MAX_FILE_SIZE; - this.maxFileSize = metarhia.metautil.sizeToBytes(size); + deleteFiles(keys) { + for (const key of keys) { + this.files.delete(key); } - const ext = metarhia.metautil.fileExt(filePath); - if (this.ext && !this.ext.includes(ext)) return; - try { - const stat = await node.fsp.stat(filePath); - const key = this.getKey(filePath); - if (stat.size > this.maxFileSize) { - this.files.set(key, { data: null, stat }); - } else { - const data = await node.fsp.readFile(filePath); - this.files.set(key, { data, stat }); - } - } catch { - this.delete(filePath); + } + + initServing(config) { + const cacheConfig = config?.cache || {}; + const { sizeToBytes } = metarhia.metautil; + this.streamThreshold = sizeToBytes(cacheConfig.streamThreshold || '1 mb'); + if (cacheConfig.virtualFS) { + this.search = this.find; + this.errorPage = (code, path) => this.find(path, code); + } else { + this.search = this.lookup; + this.errorPage = (code) => status(code); + } + } + + lookup(filePath) { + let file = this.files.get(filePath); + if (file) return { ...file, code: 200 }; + if (filePath.endsWith('/')) { + file = this.files.get(join(filePath, 'index.html')); + if (file) return { ...file, code: 200 }; } + return null; } find(path, code, parent = false) { @@ -92,20 +114,64 @@ class Static extends Place { return this.find(filePath, code, true); } + async serve(url, transport) { const [filePath] = metarhia.metautil.split(url, '?'); const fileExt = metarhia.metautil.fileExt(filePath); - let file = this.find(filePath); - if (file.data && file.stat) { + + // Fast exact-hit path for ordinary cached files + const exact = this.get(filePath); + const internal = node.path.basename(filePath).startsWith('.'); + if (exact && exact.data && exact.stat && !internal) { + const { data } = exact; + const size = data.byteLength; + const { headers } = transport.req; + if (headers.range) { + const range = metarhia.metautil.parseRange(headers.range); + const { start, end = size - 1 } = range; + if (start >= end || start >= size || end >= size) { + const err = this.errorPage(416, filePath); + return void transport.write(err.data, 416, fileExt); + } + if (size > this.streamThreshold) { + const readable = createSABStream(data, { start, end }); + const options = { start, end, size }; + return void transport.write(readable, 206, fileExt, options); + } + const slice = data.subarray(start, end + 1); + const options = { start, end, size }; + return void transport.write(slice, 206, fileExt, options); + } + if (size > this.streamThreshold) { + const readable = createSABStream(data); + const options = { size }; + return void transport.write(readable, 200, fileExt, options); + } + return void transport.write(data, 200, fileExt); + } + + // Recursive search (index, virtual, status pages). + // Range and streaming are intentionally omitted here — these paths only + // resolve small HTML files (index.html, .virtual.html, .NNN.html). + // Direct file requests always hit the exact-hit path above which supports + // full Range/streaming. find()/virtualFS are candidates for deprecation. + const file = this.search(filePath); + if (file && file.data && file.stat) { if (file.code === -1) return void transport.write(file.data, 200, 'html'); return void transport.write(file.data, file.code, fileExt); } - const absPath = join(this.path, url); + if (file && file.data) { + const ext = file.code === -1 ? 'html' : fileExt; + const statusCode = file.code === -1 ? 200 : file.code || 404; + return void transport.write(file.data, statusCode, ext); + } + + // Disk fallback: uncached or oversized file + const absPath = join(this.path, filePath); if (absPath.startsWith(this.path)) { - let { stat } = file; - if (!stat) stat = await node.fsp.stat(absPath).catch(() => null); - if (stat && stat.isFile()) { - const { size } = stat; + const fsStat = await node.fsp.stat(absPath).catch(() => null); + if (fsStat && fsStat.isFile()) { + const { size } = file?.stat ?? fsStat; const options = { size }; let code = 200; const { headers } = transport.req; @@ -113,8 +179,8 @@ class Static extends Place { const range = metarhia.metautil.parseRange(headers.range); const { start, end = size - 1 } = range; if (start >= end || start >= size || end >= size) { - file = this.find(filePath, 416); - return void transport.write(file.data, 416, fileExt); + const err = this.errorPage(416, filePath); + return void transport.write(err.data, 416, fileExt); } options.start = start; options.end = end; @@ -124,8 +190,10 @@ class Static extends Place { return void transport.write(readable, code, fileExt, options); } } - if (file.code === -1) return void transport.write(file.data, 200, 'html'); - return void transport.write(file.data, 404); + + // 404 + const err = this.errorPage(404, filePath); + return void transport.write(err.data, 404); } } diff --git a/lib/worker.js b/lib/worker.js index 4f15e98d2..9e544338f 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -5,6 +5,24 @@ const { parentPort, threadId, workerData } = wt; const application = require('./application.js'); +// Pre-initialize shared cache projection infrastructure +const segmentsMap = new Map(); +const sharedCache = workerData.sharedCache; +let projectEntry = null; + +if (sharedCache) { + const { FilesystemCache } = require('./cache/FilesystemCache.js'); + for (const seg of sharedCache.segments) segmentsMap.set(seg.id, seg.sab); + projectEntry = (entry) => FilesystemCache.projectEntry(entry, segmentsMap); +} +application.segmentsMap = segmentsMap; +application.projectEntry = projectEntry; + +// Send ACK after worker applies a cache update +const sendAck = sharedCache + ? (updateId) => parentPort.postMessage({ name: 'ack-update', updateId }) + : () => {}; + const logError = (type) => async (err) => { const error = metarhia.metautil.isError(err) ? err : new Error('Unknown'); if (error.name === 'ExperimentalWarning') return; @@ -41,6 +59,27 @@ const invoke = async ({ method, args, exclusive = false }) => { }; const handlers = { + 'file-update': (msg) => { + const { target, updateId, updates, newSegments } = msg; + if (newSegments) { + for (const seg of newSegments) segmentsMap.set(seg.id, seg.sab); + } + const projected = new Map(); + for (const [key, entry] of updates) { + projected.set(key, projectEntry(entry)); + } + const place = application[target]; + if (place) place.updateFiles(projected); + sendAck(updateId); + }, + + 'file-delete': (msg) => { + const { target, updateId, keys } = msg; + const place = application[target]; + if (place) place.deleteFiles(keys); + sendAck(updateId); + }, + ready: async () => { application.emit('ready'); }, @@ -113,7 +152,7 @@ parentPort.on('message', async (msg) => { process.exit(0); } - await application.load({ invoke }); + await application.load({ invoke, sharedCache: workerData.sharedCache }); console.info(`Application started in worker ${threadId}`); parentPort.postMessage({ name: 'started', kind: workerData.kind }); })().catch(logError(`Can not start worker ${threadId}`)); diff --git a/schemas/config/cache.js b/schemas/config/cache.js index 4d44a550a..56245606c 100644 --- a/schemas/config/cache.js +++ b/schemas/config/cache.js @@ -1,5 +1,23 @@ ({ size: 'size', maxFileSize: 'size', + streamThreshold: { type: 'size', required: false }, + virtualFS: { type: 'boolean', required: false }, avoid: { array: 'string', required: false }, + placements: { + array: { + schema: { + name: 'string', + ext: { array: 'string', required: false }, + }, + }, + required: false, + }, + sab: { + schema: { + limit: 'size', + baseSegmentSize: 'size', + }, + required: false, + }, }); diff --git a/test/cache-shared.js b/test/cache-shared.js new file mode 100644 index 000000000..8ae4dd765 --- /dev/null +++ b/test/cache-shared.js @@ -0,0 +1,160 @@ +'use strict'; + +const { test } = require('node:test'); +const assert = require('node:assert'); +const path = require('node:path'); +const { Static } = require('../lib/static.js'); +const { FilesystemCache } = require('../lib/cache/FilesystemCache.js'); + +const root = process.cwd(); + +const application = { + path: path.join(root, 'test'), + watcher: { watch() {} }, + absolute(relative) { + return path.join(this.path, relative); + }, +}; + +// --- FilesystemCache (limit backend) --- + +test('FilesystemCache - should load files into segments', async () => { + const seg = 1024; + const options = { limit: seg, maxFileSize: seg, baseSegmentSize: seg }; + const cache = new FilesystemCache(options); + const filesMap = new Map(); + const data = Buffer.from('hello world'); + const stat = { size: data.byteLength }; + filesMap.set('/test.js', { data, stat, path: '/test.js' }); + await cache.load('static', filesMap); + const index = cache.filesystems.static; + assert.ok(index); + const entry = index.entries.get('/test.js'); + assert.strictEqual(entry.kind, 'shared'); + assert.strictEqual(entry.length, data.byteLength); +}); + +test('FilesystemCache - project creates Buffer view', async () => { + const seg = 1024; + const options = { limit: seg, maxFileSize: seg, baseSegmentSize: seg }; + const cache = new FilesystemCache(options); + const data = Buffer.from('test data'); + const stat = { size: data.byteLength }; + const filesMap = new Map([['/f.js', { data, stat, path: '/f.js' }]]); + await cache.load('static', filesMap); + const snap = cache.snapshot(); + const segmentsMap = new Map(); + for (const seg of snap.segments) segmentsMap.set(seg.id, seg.sab); + const files = FilesystemCache.project(snap.filesystems.static, segmentsMap); + const file = files.get('/f.js'); + assert.ok(file.data instanceof Buffer); + assert.deepStrictEqual(file.data, data); + assert.strictEqual(file.stat, stat); +}); + +test('FilesystemCache - zero-byte files stay shared without segments', async () => { + const seg = 1024; + const options = { limit: seg, maxFileSize: seg, baseSegmentSize: seg }; + const cache = new FilesystemCache(options); + const stat = { size: 0 }; + const filesMap = new Map([['/empty.txt', { data: Buffer.alloc(0), stat, path: '/empty.txt' }]]); + await cache.load('static', filesMap); + const entry = cache.filesystems.static.entries.get('/empty.txt'); + assert.deepStrictEqual(entry, { + kind: 'shared', + segmentId: 0, + offset: 0, + length: 0, + stat, + }); + const snapshot = cache.snapshot(); + assert.deepStrictEqual(snapshot.segments, []); +}); + +test('FilesystemCache - projectEntry returns empty Buffer for zero-byte files', () => { + const stat = { size: 0 }; + const file = FilesystemCache.projectEntry( + { kind: 'shared', segmentId: 0, offset: 0, length: 0, stat }, + new Map(), + ); + assert.ok(file.data instanceof Buffer); + assert.strictEqual(file.data.length, 0); + assert.strictEqual(file.stat, stat); +}); + +// --- Static (worker side) --- + +test('Static setFiles - populate from projected entries', () => { + const st = new Static('lib', application); + const data = Buffer.from('hello world'); + const stat = { size: data.byteLength }; + const files = new Map([['/index.html', { data, stat }]]); + st.setFiles(files); + assert.strictEqual(st.files.size, 1); + const file = st.get('/index.html'); + assert.ok(file.data instanceof Buffer); + assert.deepStrictEqual(file.data, data); +}); + +test('Static updateFiles - updates entries', () => { + const st = new Static('lib', application); + const data1 = Buffer.from('version 1'); + const stat1 = { size: data1.byteLength }; + st.setFiles(new Map([['/f.js', { data: data1, stat: stat1 }]])); + const data2 = Buffer.from('version 2 updated'); + const stat2 = { size: data2.byteLength }; + st.updateFiles(new Map([['/f.js', { data: data2, stat: stat2 }]])); + const file = st.get('/f.js'); + assert.deepStrictEqual(file.data, data2); +}); + +test('Static deleteFiles - removes entries by keys', () => { + const st = new Static('lib', application); + const sab = new SharedArrayBuffer(4); + new Uint8Array(sab).set([1, 2, 3, 4]); + const data = Buffer.from(sab, 0, 4); + const stat = { size: 4 }; + st.setFiles( + new Map([ + ['/a.js', { data, stat }], + ['/b.js', { data, stat }], + ]), + ); + assert.strictEqual(st.files.size, 2); + st.deleteFiles(['/a.js']); + assert.strictEqual(st.files.size, 1); + assert.strictEqual(st.get('/a.js'), undefined); + assert.ok(st.get('/b.js')); +}); + +test('Static - disk entry has null data', () => { + const st = new Static('lib', application); + const stat = { size: 20000000 }; + st.setFiles(new Map([['/big.bin', { data: null, stat, path: '/big.bin' }]])); + const file = st.get('/big.bin'); + assert.strictEqual(file.data, null); + assert.strictEqual(file.stat.size, 20000000); +}); + +test('Static - SAB data is zero-copy view', () => { + const sab = new SharedArrayBuffer(5); + new Uint8Array(sab).set([10, 20, 30, 40, 50]); + const data = Buffer.from(sab, 0, 5); + const stat = { size: 5 }; + const st = new Static('lib', application); + st.setFiles(new Map([['/f.bin', { data, stat }]])); + const file = st.get('/f.bin'); + assert.strictEqual(file.data.buffer, sab); +}); + +test('Static setFiles clears previous entries', () => { + const st = new Static('lib', application); + const data = Buffer.from([1, 2]); + const stat = { size: 2 }; + st.setFiles(new Map([['/old.js', { data, stat }]])); + assert.strictEqual(st.files.size, 1); + st.setFiles(new Map([['/new.js', { data, stat }]])); + assert.strictEqual(st.files.size, 1); + assert.strictEqual(st.get('/old.js'), undefined); + assert.ok(st.get('/new.js')); +}); diff --git a/test/static.js b/test/static.js index 7d3373f8c..9b8d534cc 100644 --- a/test/static.js +++ b/test/static.js @@ -3,6 +3,7 @@ const { test } = require('node:test'); const assert = require('node:assert'); const path = require('node:path'); +const { Readable } = require('node:stream'); const { Static } = require('../lib/static.js'); const root = process.cwd(); @@ -15,20 +16,209 @@ const application = { }, }; -test('lib/static load - should load static files correctly', async () => { - const cache = new Static('lib', application); - assert.strictEqual(cache.files instanceof Map, true); - assert.strictEqual(cache.files.size, 0); - assert.strictEqual(cache.ext, undefined); - assert.strictEqual(cache.maxFileSize, -1); - assert.strictEqual(cache.get('/example/add.js'), undefined); - - await cache.load(); - assert.strictEqual(cache.files.size, 13); - const file = cache.get('/example/add.js'); - assert.strictEqual(file.data instanceof Buffer, true); - assert.strictEqual(file.data.length, 158); - assert.strictEqual(cache.get('/example/unknown.js'), undefined); - assert.strictEqual(cache.ext, undefined); - assert.strictEqual(cache.maxFileSize, 10000000); +// Capture transport.write() calls +const makeTransport = (headers = {}) => { + const result = {}; + return { + req: { headers }, + write(data, code, ext, options) { + result.data = data; + result.code = code; + result.ext = ext; + result.options = options; + }, + result, + }; +}; + +// Build a SAB-backed file entry matching the projection shape from cache backends +const makeSABFile = (raw) => { + const sab = new SharedArrayBuffer(raw.byteLength); + new Uint8Array(sab).set(raw); + const data = Buffer.from(sab, 0, raw.byteLength); + const stat = { size: raw.byteLength, isFile: () => true }; + return { data, stat }; +}; + +// Build a Static with serving initialized; override threshold after if needed +const makeStatic = () => { + const st = new Static('lib', application); + st.initServing({}); + return st; +}; + +// --- Constructor --- + +test('lib/static - should create Static correctly', () => { + const st = new Static('lib', application); + assert.strictEqual(st.files instanceof Map, true); + assert.strictEqual(st.files.size, 0); + assert.strictEqual(st.get('/example/add.js'), undefined); +}); + +// --- initServing --- + +test('lib/static - initServing sets search function and finite streamThreshold', () => { + const st = makeStatic(); + assert.strictEqual(typeof st.search, 'function'); + assert.ok(Number.isFinite(st.streamThreshold)); + assert.ok(st.streamThreshold > 0); +}); + +// --- serve: exact-hit, small file (200 + Buffer) --- + +test('lib/static serve - exact-hit small file returns buffer and 200', async () => { + const st = makeStatic(); + const file = makeSABFile(Buffer.from('hello')); + st.setFiles(new Map([['/f.txt', file]])); + const t = makeTransport(); + await st.serve('/f.txt', t); + assert.strictEqual(t.result.code, 200); + assert.ok(Buffer.isBuffer(t.result.data)); + assert.strictEqual(t.result.ext, 'txt'); + assert.deepStrictEqual(t.result.data, file.data); +}); + +// --- serve: exact-hit, large file above threshold (200 + Readable) --- + +test('lib/static serve - large file above threshold streams as Readable', async () => { + const st = makeStatic(); + st.streamThreshold = 100; + const raw = Buffer.alloc(200, 0x41); // 200 bytes > threshold + const file = makeSABFile(raw); + st.setFiles(new Map([['/big.bin', file]])); + const t = makeTransport(); + await st.serve('/big.bin', t); + assert.strictEqual(t.result.code, 200); + assert.ok(t.result.data instanceof Readable); + assert.deepStrictEqual(t.result.options, { size: 200 }); +}); + +test('lib/static serve - streamed response delivers correct bytes', async () => { + const st = makeStatic(); + st.streamThreshold = 100; + const raw = Buffer.alloc(200, 0x42); + const file = makeSABFile(raw); + st.setFiles(new Map([['/big.bin', file]])); + const t = makeTransport(); + await st.serve('/big.bin', t); + const chunks = []; + for await (const chunk of t.result.data) chunks.push(chunk); + assert.deepStrictEqual(Buffer.concat(chunks), raw); +}); + +// --- serve: exact-hit, Range request, small file (206 + subarray) --- + +test('lib/static serve - range request on small file returns 206 with subarray', async () => { + const st = makeStatic(); + const raw = Buffer.from('0123456789'); + const file = makeSABFile(raw); + st.setFiles(new Map([['/data.bin', file]])); + const t = makeTransport({ range: 'bytes=2-5' }); + await st.serve('/data.bin', t); + assert.strictEqual(t.result.code, 206); + assert.ok(Buffer.isBuffer(t.result.data)); + assert.strictEqual(t.result.data.toString(), '2345'); + assert.deepStrictEqual(t.result.options, { start: 2, end: 5, size: 10 }); +}); + +// --- serve: exact-hit, Range request, large file (206 + Readable) --- + +test('lib/static serve - range request on large file returns 206 with Readable', async () => { + const st = makeStatic(); + st.streamThreshold = 100; + const raw = Buffer.alloc(200, 0x43); + const file = makeSABFile(raw); + st.setFiles(new Map([['/big.bin', file]])); + const t = makeTransport({ range: 'bytes=0-99' }); + await st.serve('/big.bin', t); + assert.strictEqual(t.result.code, 206); + assert.ok(t.result.data instanceof Readable); + assert.deepStrictEqual(t.result.options, { start: 0, end: 99, size: 200 }); +}); + +// --- serve: invalid range → 416 --- + +test('lib/static serve - invalid range (start >= size) returns 416', async () => { + const st = makeStatic(); + const file = makeSABFile(Buffer.from('hello')); + st.setFiles(new Map([['/f.txt', file]])); + const t = makeTransport({ range: 'bytes=10-20' }); // start 10 >= size 5 + await st.serve('/f.txt', t); + assert.strictEqual(t.result.code, 416); + assert.ok(Buffer.isBuffer(t.result.data)); +}); + +// --- serve: query string is stripped --- + +test('lib/static serve - query string is stripped from path', async () => { + const st = makeStatic(); + const file = makeSABFile(Buffer.from('body {}')); + st.setFiles(new Map([['/style.css', file]])); + const t = makeTransport(); + await st.serve('/style.css?v=42', t); + assert.strictEqual(t.result.code, 200); + assert.deepStrictEqual(t.result.data, file.data); +}); + +// --- serve: zero-byte file --- + +test('lib/static serve - zero-byte SAB file is served without error', async () => { + const st = makeStatic(); + const sab = new SharedArrayBuffer(0); + const data = Buffer.from(sab, 0, 0); + const stat = { size: 0, isFile: () => true }; + st.setFiles(new Map([['/empty.txt', { data, stat }]])); + const t = makeTransport(); + await st.serve('/empty.txt', t); + assert.strictEqual(t.result.code, 200); + assert.ok(Buffer.isBuffer(t.result.data)); + assert.strictEqual(t.result.data.byteLength, 0); +}); + +// --- serve: recursive lookup — directory → index.html --- + +test('lib/static serve - directory path resolves to index.html (200, full buffer)', async () => { + const st = makeStatic(); + const file = makeSABFile(Buffer.from('')); + st.setFiles(new Map([['/dir/index.html', file]])); + const t = makeTransport(); + await st.serve('/dir/', t); + assert.strictEqual(t.result.code, 200); + assert.deepStrictEqual(t.result.data, file.data); +}); + +test('lib/static serve - index.html via directory lookup ignores Range header', async () => { + // Verifies the documented intentional omission of Range handling in recursive path + const st = makeStatic(); + const file = makeSABFile(Buffer.from('')); + st.setFiles(new Map([['/dir/index.html', file]])); + const t = makeTransport({ range: 'bytes=0-3' }); + await st.serve('/dir/', t); + // Must be 200, NOT 206 — recursive path skips Range handling by design + assert.strictEqual(t.result.code, 200); + assert.deepStrictEqual(t.result.data, file.data); +}); + +// --- serve: internal files (starting with '.') bypass exact-hit path --- + +test('lib/static serve - internal file bypasses exact-hit and ignores Range', async () => { + const st = makeStatic(); + const file = makeSABFile(Buffer.from('

custom 404

')); + st.setFiles(new Map([['/dir/.404.html', file]])); + const t = makeTransport({ range: 'bytes=0-3' }); + await st.serve('/dir/.404.html', t); + // Internal file goes through lookup, not exact-hit — no Range handling → no 206 + assert.strictEqual(t.result.code, 200); + assert.ok(Buffer.isBuffer(t.result.data)); +}); + +// --- serve: 404 --- + +test('lib/static serve - returns 404 for missing file', async () => { + const st = makeStatic(); + const t = makeTransport(); + await st.serve('/not-found.txt', t); + assert.strictEqual(t.result.code, 404); + assert.ok(Buffer.isBuffer(t.result.data)); });