diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index ec9deb15bf..deed185604 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -3,6 +3,7 @@ """ import asyncio +import json from functools import lru_cache from lightrag.utils import logger, get_pinyin_sort_key import aiofiles @@ -229,6 +230,7 @@ class InsertTextRequest(BaseModel): Attributes: text: The text content to be inserted into the RAG system file_source: Source of the text (optional) + metadata: Additional metadata about the document (optional) """ text: str = Field( @@ -238,6 +240,9 @@ class InsertTextRequest(BaseModel): file_source: Optional[str] = Field( default=None, min_length=0, description="File Source" ) + metadata: Optional[dict[str, Any]] = Field( + default=None, description="Additional metadata about the document" + ) @field_validator("text", mode="after") @classmethod @@ -254,6 +259,7 @@ def normalize_source_before(cls, file_source: Optional[str]) -> str: "example": { "text": "This is a sample text to be inserted into the RAG system.", "file_source": "Source of the text (optional)", + "metadata": {"author": "John Doe", "category": "research"}, } } ) @@ -265,6 +271,7 @@ class InsertTextsRequest(BaseModel): Attributes: texts: List of text contents to be inserted into the RAG system file_sources: Sources of the texts (optional) + metadata: Single metadata dict (applied to all texts) or list of metadata dicts (one per text) """ texts: list[str] = Field( @@ -274,6 +281,10 @@ class InsertTextsRequest(BaseModel): file_sources: Optional[list[str]] = Field( default=None, min_length=0, description="Sources of the texts" ) + metadata: Optional[dict[str, Any] | list[dict[str, Any]]] = Field( + default=None, + description="Single metadata dict (applied to all texts) or list of metadata dicts (one per text)", + ) @field_validator("texts", mode="after") @classmethod @@ -300,6 +311,7 @@ def normalize_sources_before( "file_sources": [ "First file source (optional)", ], + "metadata": {"author": "John Doe", "year": 2026}, } } ) @@ -311,14 +323,17 @@ class InsertResponse(BaseModel): Attributes: status: Status of the operation (success, duplicated, partial_success, failure) message: Detailed message describing the operation result - track_id: Tracking ID for monitoring processing status + track_id: Tracking ID for monitoring processing status (optional for non-processing operations like metadata update) """ status: Literal["success", "duplicated", "partial_success", "failure"] = Field( description="Status of the operation" ) message: str = Field(description="Message describing the operation result") - track_id: str = Field(description="Tracking ID for monitoring processing status") + track_id: Optional[str] = Field( + default=None, + description="Tracking ID for monitoring processing status (None for operations that don't require tracking)", + ) model_config = ConfigDict( json_schema_extra={ @@ -331,6 +346,23 @@ class InsertResponse(BaseModel): ) +class UpdateMetadataRequest(BaseModel): + """Request model for updating document metadata + + Attributes: + metadata: New metadata dictionary to replace existing metadata + """ + + metadata: dict[str, Any] = Field( + description="New metadata to replace existing document metadata" + ) + + class Config: + json_schema_extra = { + "example": {"metadata": {"author": "John Doe", "year": 2026}} + } + + class ClearDocumentsResponse(BaseModel): """Response model for document clearing operation @@ -489,7 +521,7 @@ class DocStatusResponse(BaseModel): "track_id": "upload_20250729_170612_abc123", "chunks_count": 12, "error": None, - "metadata": {"author": "John Doe", "year": 2025}, + "metadata": {"author": "John Doe", "year": 2026}, "file_path": "research_paper.pdf", } } @@ -595,7 +627,7 @@ class TrackStatusResponse(BaseModel): "track_id": "upload_20250729_170612_abc123", "chunks_count": 12, "error": None, - "metadata": {"author": "John Doe", "year": 2025}, + "metadata": {"author": "John Doe", "year": 2026}, "file_path": "research_paper.pdf", } ], @@ -708,7 +740,7 @@ class PaginatedDocsResponse(BaseModel): "track_id": "upload_20250729_170612_abc123", "chunks_count": 12, "error_msg": None, - "metadata": {"author": "John Doe", "year": 2025}, + "metadata": {"author": "John Doe", "year": 2026}, "file_path": "research_paper.pdf", } ], @@ -1226,8 +1258,46 @@ def escape_sheet_title(title: str) -> str: return "\n".join(content_parts) +# System-reserved metadata fields that cannot be modified by users +RESERVED_METADATA_FIELDS = { + "is_duplicate", + "original_doc_id", + "original_track_id", + "error_type", + "processing_start_time", + "processing_end_time", +} + + +def reorder_metadata(metadata: dict[str, Any] | None) -> dict[str, Any]: + """Reorder metadata to show system fields first, then user fields. + + Args: + metadata: Metadata dictionary (can be None) + + Returns: + Reordered metadata dictionary with system fields first + """ + if not metadata: + return metadata or {} + + # Separate reserved and user fields + reserved_fields = { + k: v for k, v in metadata.items() if k in RESERVED_METADATA_FIELDS + } + user_fields = { + k: v for k, v in metadata.items() if k not in RESERVED_METADATA_FIELDS + } + + # Return with system fields first, then user fields + return {**reserved_fields, **user_fields} + + async def pipeline_enqueue_file( - rag: LightRAG, file_path: Path, track_id: str = None + rag: LightRAG, + file_path: Path, + track_id: str = None, + metadata: dict[str, Any] = None, ) -> tuple[bool, str]: """Add a file to the queue for processing @@ -1235,6 +1305,7 @@ async def pipeline_enqueue_file( rag: LightRAG instance file_path: Path to the saved file track_id: Optional tracking ID, if not provided will be generated + metadata: Optional metadata to attach to the document Returns: tuple: (success: bool, track_id: str) """ @@ -1599,7 +1670,10 @@ async def pipeline_enqueue_file( try: await rag.apipeline_enqueue_documents( - content, file_paths=file_path.name, track_id=track_id + content, + file_paths=file_path.name, + track_id=track_id, + metadata=metadata, ) logger.info( @@ -1683,17 +1757,23 @@ async def pipeline_enqueue_file( logger.error(f"Error deleting file {file_path}: {str(e)}") -async def pipeline_index_file(rag: LightRAG, file_path: Path, track_id: str = None): +async def pipeline_index_file( + rag: LightRAG, + file_path: Path, + track_id: str = None, + metadata: dict[str, Any] = None, +): """Index a file with track_id Args: rag: LightRAG instance file_path: Path to the saved file track_id: Optional tracking ID + metadata: Optional metadata to attach to the document """ try: success, returned_track_id = await pipeline_enqueue_file( - rag, file_path, track_id + rag, file_path, track_id, metadata ) if success: await rag.apipeline_process_enqueue_documents() @@ -1742,6 +1822,7 @@ async def pipeline_index_texts( texts: List[str], file_sources: List[str] = None, track_id: str = None, + metadata: dict[str, Any] | list[dict[str, Any]] | None = None, ): """Index a list of texts with track_id @@ -1750,6 +1831,7 @@ async def pipeline_index_texts( texts: The texts to index file_sources: Sources of the texts track_id: Optional tracking ID + metadata: Single metadata dict (applied to all texts) or list of metadata dicts (one per text) """ if not texts: return @@ -1767,7 +1849,10 @@ async def pipeline_index_texts( ) await rag.apipeline_enqueue_documents( - input=texts, file_paths=normalized_file_sources, track_id=track_id + input=texts, + file_paths=normalized_file_sources, + track_id=track_id, + metadata=metadata, ) await rag.apipeline_process_enqueue_documents() @@ -2116,7 +2201,9 @@ async def scan_for_new_documents(background_tasks: BackgroundTasks): "/upload", response_model=InsertResponse, dependencies=[Depends(combined_auth)] ) async def upload_to_input_dir( - background_tasks: BackgroundTasks, file: UploadFile = File(...) + background_tasks: BackgroundTasks, + file: UploadFile = File(...), + metadata: str | None = None, ): """ Upload a file to the input directory and index it. @@ -2130,6 +2217,11 @@ async def upload_to_input_dir( - Set to `None` or `0` for unlimited upload size - Returns HTTP 413 (Request Entity Too Large) if file exceeds limit + **Metadata Support:** + - Optional `metadata` form field accepts a JSON string + - Example: `{"author": "John Doe", "year": 2026}` + - Metadata is stored with the document and can be retrieved later + **Duplicate Detection Behavior:** This endpoint handles two types of duplicate scenarios differently: @@ -2160,6 +2252,7 @@ async def upload_to_input_dir( Args: background_tasks: FastAPI BackgroundTasks for async processing file (UploadFile): The file to be uploaded. It must have an allowed extension. + metadata (str): Optional JSON string containing metadata for the document Returns: InsertResponse: A response object containing the upload status and a message. @@ -2167,9 +2260,26 @@ async def upload_to_input_dir( - status="duplicated": Filename already exists (see track_id for existing document) Raises: - HTTPException: If the file type is not supported (400), file too large (413), or other errors occur (500). + HTTPException: If the file type is not supported (400), file too large (413), + invalid metadata JSON (400), or other errors occur (500). """ try: + # Parse metadata if provided + parsed_metadata = None + if metadata: + try: + parsed_metadata = json.loads(metadata) + if not isinstance(parsed_metadata, dict): + raise HTTPException( + status_code=400, + detail="Metadata must be a JSON object (dict)", + ) + except json.JSONDecodeError as e: + raise HTTPException( + status_code=400, + detail=f"Invalid metadata JSON: {str(e)}", + ) + # Sanitize filename to prevent Path Traversal attacks safe_filename = sanitize_filename(file.filename, doc_manager.input_dir) @@ -2264,7 +2374,9 @@ async def upload_to_input_dir( track_id = generate_track_id("upload") # Add to background tasks and get track_id - background_tasks.add_task(pipeline_index_file, rag, file_path, track_id) + background_tasks.add_task( + pipeline_index_file, rag, file_path, track_id, parsed_metadata + ) return InsertResponse( status="success", @@ -2346,6 +2458,7 @@ async def insert_text( [request.text], file_sources=[request.file_source], track_id=track_id, + metadata=request.metadata, ) return InsertResponse( @@ -2429,6 +2542,7 @@ async def insert_texts( request.texts, file_sources=request.file_sources, track_id=track_id, + metadata=request.metadata, ) return InsertResponse( @@ -3220,6 +3334,147 @@ async def get_document_status_counts() -> StatusCountsResponse: logger.error(traceback.format_exc()) raise HTTPException(status_code=500, detail=str(e)) + @router.get( + "/{document_id}", + response_model=DocStatusResponse, + dependencies=[Depends(combined_auth)], + ) + async def get_document_by_id(document_id: str) -> DocStatusResponse: + """ + Get a single document by ID. + + This endpoint retrieves the processing status and details of a specific document + using its document ID. + + Args: + document_id: The unique identifier for the document + + Returns: + DocStatusResponse: A response object containing the document details + + Raises: + HTTPException: If document is not found (404) or an error occurs (500). + """ + try: + doc = await rag.doc_status.get_by_id(document_id) + if doc is None: + raise HTTPException( + status_code=404, + detail=f"Document with ID '{document_id}' not found", + ) + + # Convert DocProcessingStatus to DocStatusResponse format + # Reorder metadata to ensure system fields appear first + return DocStatusResponse( + id=document_id, + content_summary=doc.get("content_summary"), + content_length=doc.get("content_length"), + file_path=doc.get("file_path"), + status=doc.get("status"), + created_at=doc.get("created_at"), + updated_at=doc.get("updated_at"), + track_id=doc.get("track_id"), + chunks_count=doc.get("chunks_count"), + error_msg=doc.get("error_msg"), + metadata=reorder_metadata(doc.get("metadata")), + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error retrieving document {document_id}: {str(e)}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=str(e)) + + @router.patch( + "/{document_id}/metadata", + response_model=InsertResponse, + dependencies=[Depends(combined_auth)], + ) + async def update_document_metadata( + document_id: str, request: UpdateMetadataRequest + ) -> InsertResponse: + """ + Update metadata for a specific document. + + This endpoint allows updating the metadata of an existing document without + reprocessing the document content. User metadata can be updated, but system-reserved + fields are protected and will be preserved. + + **Reserved System Fields (cannot be modified):** + - `is_duplicate` - System flag for duplicate documents + - `original_doc_id` - Original document ID for duplicates + - `original_track_id` - Original tracking ID for duplicates + - `processing_start_time` - Processing start timestamp + - `processing_end_time` - Processing end timestamp + + Args: + document_id: The unique identifier for the document + request: Request body containing the new metadata dictionary + + Returns: + InsertResponse: A response indicating success or failure + + Raises: + HTTPException: If document is not found (404), metadata is invalid (400), + attempts to modify reserved fields (400), or an error occurs (500). + """ + try: + # Validate metadata is a dict + if not isinstance(request.metadata, dict): + raise HTTPException( + status_code=400, detail="Metadata must be a dictionary" + ) + + # Check for attempts to modify reserved fields + reserved_fields_in_request = ( + set(request.metadata.keys()) & RESERVED_METADATA_FIELDS + ) + if reserved_fields_in_request: + raise HTTPException( + status_code=400, + detail=f"Cannot modify reserved system fields: {', '.join(sorted(reserved_fields_in_request))}. " + f"Reserved fields: {', '.join(sorted(RESERVED_METADATA_FIELDS))}", + ) + + # Check if document exists + doc = await rag.doc_status.get_by_id(document_id) + if doc is None: + raise HTTPException( + status_code=404, + detail=f"Document with ID '{document_id}' not found", + ) + + # Get existing metadata and preserve reserved fields + existing_metadata = doc.get("metadata", {}) + reserved_metadata = { + key: existing_metadata[key] + for key in RESERVED_METADATA_FIELDS + if key in existing_metadata + } + + # Merge preserved reserved fields and user metadata + merged_metadata = {**reserved_metadata, **request.metadata} + + # Update metadata (doc is a dict, not an object) via upsert + doc["metadata"] = merged_metadata + await rag.doc_status.upsert({document_id: doc}) + + return InsertResponse( + status="success", + message=f"Metadata updated for document '{document_id}'", + track_id=None, + ) + + except HTTPException: + raise + except Exception as e: + logger.error( + f"Error updating metadata for document {document_id}: {str(e)}" + ) + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=str(e)) + @router.post( "/reprocess_failed", response_model=ReprocessResponse, diff --git a/lightrag/api/routers/query_routes.py b/lightrag/api/routers/query_routes.py index 1a9405f89e..638348aa0b 100644 --- a/lightrag/api/routers/query_routes.py +++ b/lightrag/api/routers/query_routes.py @@ -105,6 +105,11 @@ class QueryRequest(BaseModel): description="If True, includes actual chunk text content in references. Only applies when include_references=True. Useful for evaluation and debugging.", ) + include_metadata: Optional[bool] = Field( + default=False, + description="If True, retrieves document metadata for each chunk using the full_doc_id. Metadata is looked up on-demand from document storage.", + ) + stream: Optional[bool] = Field( default=True, description="If True, enables streaming output for real-time responses. Only affects /query/stream endpoint.", @@ -152,6 +157,10 @@ class ReferenceItem(BaseModel): default=None, description="List of chunk contents from this file (only present when include_chunk_content=True)", ) + metadata: Optional[Dict[str, Any]] = Field( + default=None, + description="Document metadata (only present when include_metadata=True)", + ) class QueryResponse(BaseModel): @@ -221,6 +230,10 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60): "items": {"type": "string"}, "description": "List of chunk contents from this file (only included when include_chunk_content=True)", }, + "metadata": { + "type": ["object", "null"], + "description": "Document metadata (included when include_metadata=True)", + }, }, }, "description": "Reference list (only included when include_references=True)", @@ -277,6 +290,32 @@ def create_query_routes(rag, api_key: Optional[str] = None, top_k: int = 60): "response": "Artificial Intelligence (AI) is a branch of computer science that aims to create intelligent machines capable of performing tasks that typically require human intelligence, such as learning, reasoning, and problem-solving." }, }, + "with_metadata": { + "summary": "Response with metadata", + "description": "Example response when include_references=True and include_metadata=True", + "value": { + "response": "Artificial Intelligence (AI) is a branch of computer science that aims to create intelligent machines capable of performing tasks that typically require human intelligence, such as learning, reasoning, and problem-solving.", + "references": [ + { + "reference_id": "1", + "file_path": "/documents/ai_overview.pdf", + "metadata": { + "author": "Jane Doe", + "category": "research", + "institution": "AI Research Institute", + }, + }, + { + "reference_id": "2", + "file_path": "/documents/machine_learning.txt", + "metadata": { + "author": "John Doe", + "year": 2026, + }, + }, + ], + }, + }, "different_modes": { "summary": "Different query modes", "description": "Examples of responses from different query modes", @@ -421,26 +460,46 @@ async def query_text(request: QueryRequest): if not response_content: response_content = "No relevant context found for the query." - # Enrich references with chunk content if requested - if request.include_references and request.include_chunk_content: + # Enrich references with chunk content and/or metadata if requested + if request.include_references and ( + request.include_chunk_content or request.include_metadata + ): chunks = data.get("chunks", []) - # Create a mapping from reference_id to chunk content + # Create mappings from reference_id to chunk content and metadata ref_id_to_content = {} + ref_id_to_metadata = {} + for chunk in chunks: ref_id = chunk.get("reference_id", "") - content = chunk.get("content", "") - if ref_id and content: - # Collect chunk content; join later to avoid quadratic string concatenation - ref_id_to_content.setdefault(ref_id, []).append(content) - - # Add content to references + if ref_id: + # Collect chunk content if requested + if request.include_chunk_content: + content = chunk.get("content", "") + if content: + ref_id_to_content.setdefault(ref_id, []).append(content) + + # Collect metadata if requested (use first non-None metadata for each ref_id) + if ( + request.include_metadata + and ref_id not in ref_id_to_metadata + ): + metadata = chunk.get("metadata") + if metadata is not None: + ref_id_to_metadata[ref_id] = metadata + + # Add content and/or metadata to references enriched_references = [] for ref in references: ref_copy = ref.copy() ref_id = ref.get("reference_id", "") + if ref_id in ref_id_to_content: # Keep content as a list of chunks (one file may have multiple chunks) ref_copy["content"] = ref_id_to_content[ref_id] + + if ref_id in ref_id_to_metadata: + ref_copy["metadata"] = ref_id_to_metadata[ref_id] + enriched_references.append(ref_copy) references = enriched_references @@ -478,6 +537,11 @@ async def query_text(request: QueryRequest): "description": "Multiple NDJSON lines when stream=True, include_references=True, and include_chunk_content=True. First line contains references with content arrays (one file may have multiple chunks), subsequent lines contain response chunks.", "value": '{"references": [{"reference_id": "1", "file_path": "/documents/ai_overview.pdf", "content": ["Artificial Intelligence (AI) represents a transformative field...", "AI systems can be categorized into narrow AI and general AI..."]}, {"reference_id": "2", "file_path": "/documents/ml_basics.txt", "content": ["Machine learning is a subset of AI that enables computers to learn..."]}]}\n{"response": "Artificial Intelligence (AI) is a branch of computer science"}\n{"response": " that aims to create intelligent machines capable of performing"}\n{"response": " tasks that typically require human intelligence."}', }, + "streaming_with_metadata": { + "summary": "Streaming mode with metadata (stream=true, include_metadata=true)", + "description": "Multiple NDJSON lines when stream=True, include_references=True, and include_metadata=True. First line contains references with metadata objects, subsequent lines contain response chunks.", + "value": '{"references": [{"reference_id": "1", "file_path": "/documents/ai_overview.pdf", "metadata": {"author": "Jane Doe", "category": "research", "institution": "AI Research Institute"}}, {"reference_id": "2", "file_path": "/documents/ml_basics.txt", "metadata": {"author": "John Doe", "year": 2026}}]}\n{"response": "Artificial Intelligence (AI) is a branch of computer science"}\n{"response": " that aims to create intelligent machines capable of performing"}\n{"response": " tasks that typically require human intelligence."}', + }, "streaming_without_references": { "summary": "Streaming mode without references (stream=true)", "description": "Multiple NDJSON lines when stream=True and include_references=False. Only response chunks are sent.", @@ -674,27 +738,49 @@ async def stream_generator(): references = result.get("data", {}).get("references", []) llm_response = result.get("llm_response", {}) - # Enrich references with chunk content if requested - if request.include_references and request.include_chunk_content: + # Enrich references with chunk content and/or metadata if requested + if request.include_references and ( + request.include_chunk_content or request.include_metadata + ): data = result.get("data", {}) chunks = data.get("chunks", []) - # Create a mapping from reference_id to chunk content + # Create mappings from reference_id to chunk content and metadata ref_id_to_content = {} + ref_id_to_metadata = {} + for chunk in chunks: ref_id = chunk.get("reference_id", "") - content = chunk.get("content", "") - if ref_id and content: - # Collect chunk content - ref_id_to_content.setdefault(ref_id, []).append(content) - - # Add content to references + if ref_id: + # Collect chunk content if requested + if request.include_chunk_content: + content = chunk.get("content", "") + if content: + ref_id_to_content.setdefault(ref_id, []).append( + content + ) + + # Collect metadata if requested (use first non-None metadata for each ref_id) + if ( + request.include_metadata + and ref_id not in ref_id_to_metadata + ): + metadata = chunk.get("metadata") + if metadata is not None: + ref_id_to_metadata[ref_id] = metadata + + # Add content and/or metadata to references enriched_references = [] for ref in references: ref_copy = ref.copy() ref_id = ref.get("reference_id", "") + if ref_id in ref_id_to_content: # Keep content as a list of chunks (one file may have multiple chunks) ref_copy["content"] = ref_id_to_content[ref_id] + + if ref_id in ref_id_to_metadata: + ref_copy["metadata"] = ref_id_to_metadata[ref_id] + enriched_references.append(ref_copy) references = enriched_references @@ -804,9 +890,13 @@ async def stream_generator(): "file_path": {"type": "string"}, "chunk_id": {"type": "string"}, "reference_id": {"type": "string"}, + "metadata": { + "type": ["object", "null"], + "description": "Document metadata (included when include_metadata=True)", + }, }, }, - "description": "Retrieved text chunks from vector database", + "description": "Retrieved text chunks from vector database. Set include_metadata=true to include document metadata.", }, "references": { "type": "array", @@ -1001,6 +1091,60 @@ async def stream_generator(): }, }, }, + "with_metadata": { + "summary": "Query with document metadata", + "description": "Example response when include_metadata=True is specified in the query request", + "value": { + "status": "success", + "message": "Query executed successfully", + "data": { + "entities": [ + { + "entity_name": "Neural Networks", + "entity_type": "TECHNOLOGY", + "description": "Computational models inspired by biological neural networks", + "source_id": "chunk-123", + "file_path": "/documents/ai_basics.pdf", + "reference_id": "1", + } + ], + "relationships": [], + "chunks": [ + { + "content": "Neural networks are computational models that mimic the way biological neural networks work...", + "file_path": "/documents/ai_basics.pdf", + "chunk_id": "chunk-123", + "reference_id": "1", + "metadata": { + "author": "Jane Doe", + "category": "research", + "institution": "AI Research Institute", + }, + } + ], + "references": [ + { + "reference_id": "1", + "file_path": "/documents/ai_basics.pdf", + } + ], + }, + "metadata": { + "query_mode": "local", + "keywords": { + "high_level": ["neural", "networks"], + "low_level": ["computation", "model"], + }, + "processing_info": { + "total_entities_found": 5, + "total_relations_found": 0, + "entities_after_truncation": 1, + "relations_after_truncation": 0, + "final_chunks_count": 1, + }, + }, + }, + }, }, } }, diff --git a/lightrag/base.py b/lightrag/base.py index b6380e7eb5..19013bac46 100644 --- a/lightrag/base.py +++ b/lightrag/base.py @@ -162,6 +162,12 @@ class QueryParam: Default is True to enable reranking when rerank model is available. """ + include_metadata: bool = False + """If True, retrieves document metadata for each chunk using the full_doc_id. + Metadata is looked up on-demand from document storage. This allows queries to + include document-level metadata without storing it with every chunk. + """ + include_references: bool = False """If True, includes reference list in the response for supported endpoints. This parameter controls whether the API response includes a references field diff --git a/lightrag/lightrag.py b/lightrag/lightrag.py index 264c6cbf8a..34618f36fd 100644 --- a/lightrag/lightrag.py +++ b/lightrag/lightrag.py @@ -170,6 +170,30 @@ def _normalize_string_list(raw_values: Any, context: str = "") -> list[str]: return result +_TRANSIENT_RESET_METADATA_FIELDS = { + "processing_start_time", + "processing_end_time", + "error_type", +} + + +def _metadata_for_pending_reset(status_doc: "DocProcessingStatus") -> dict[str, Any]: + """Return metadata to keep when resetting a doc back to PENDING. + + Keeps user-defined metadata and non-transient system keys while removing + processing/error fields that should not survive a reset. + """ + existing_metadata = status_doc.metadata + if not isinstance(existing_metadata, dict): + return {} + + return { + key: value + for key, value in existing_metadata.items() + if key not in _TRANSIENT_RESET_METADATA_FIELDS + } + + @final @dataclass class LightRAG: @@ -1207,6 +1231,7 @@ async def ainsert( ids: str | list[str] | None = None, file_paths: str | list[str] | None = None, track_id: str | None = None, + metadata: dict[str, Any] | list[dict[str, Any]] | None = None, ) -> str: """Async Insert documents with checkpoint support @@ -1219,6 +1244,7 @@ async def ainsert( ids: list of unique document IDs, if not provided, MD5 hash IDs will be generated file_paths: list of file paths corresponding to each document, used for citation track_id: tracking ID for monitoring processing status, if not provided, will be generated + metadata: single metadata dict (applied to all docs) or list of metadata dicts (one per document) Returns: str: tracking ID for monitoring processing status @@ -1227,7 +1253,9 @@ async def ainsert( if track_id is None: track_id = generate_track_id("insert") - await self.apipeline_enqueue_documents(input, ids, file_paths, track_id) + await self.apipeline_enqueue_documents( + input, ids, file_paths, track_id, metadata + ) await self.apipeline_process_enqueue_documents( split_by_character, split_by_character_only ) @@ -1312,6 +1340,7 @@ async def apipeline_enqueue_documents( ids: list[str] | None = None, file_paths: str | list[str] | None = None, track_id: str | None = None, + metadata: dict[str, Any] | list[dict[str, Any]] | None = None, ) -> str: """ Pipeline for Processing Documents @@ -1326,6 +1355,7 @@ async def apipeline_enqueue_documents( ids: list of unique document IDs, if not provided, MD5 hash IDs will be generated file_paths: list of file paths corresponding to each document, used for citation track_id: tracking ID for monitoring processing status, if not provided, will be generated with "enqueue" prefix + metadata: single metadata dict (applied to all docs) or list of metadata dicts (one per document) Returns: str: tracking ID for monitoring processing status @@ -1356,6 +1386,24 @@ async def apipeline_enqueue_documents( # If no file paths provided, use placeholder file_paths = ["unknown_source"] * len(input) + # Process metadata parameter + if metadata is not None: + if isinstance(metadata, dict): + # Single dict: apply to all documents + metadata_list = [metadata.copy() for _ in range(len(input))] + elif isinstance(metadata, list): + # List of dicts: validate length matches + if len(metadata) != len(input): + raise ValueError( + "Number of metadata dicts must match the number of documents" + ) + metadata_list = metadata + else: + raise ValueError("metadata must be a dict or list of dicts") + else: + # No metadata provided + metadata_list = [None] * len(input) + # 1. Validate ids if provided or generate MD5 hash IDs and remove duplicate contents if ids is not None: # Check if the number of IDs matches the number of documents @@ -1368,31 +1416,42 @@ async def apipeline_enqueue_documents( # Generate contents dict and remove duplicates in one pass unique_contents = {} - for id_, doc, path in zip(ids, input, file_paths): + for id_, doc, path, meta in zip(ids, input, file_paths, metadata_list): cleaned_content = sanitize_text_for_encoding(doc) if cleaned_content not in unique_contents: - unique_contents[cleaned_content] = (id_, path) + unique_contents[cleaned_content] = (id_, path, meta) # Reconstruct contents with unique content contents = { - id_: {"content": content, "file_path": file_path} - for content, (id_, file_path) in unique_contents.items() + id_: {"content": content, "file_path": file_path, "metadata": meta} + for content, (id_, file_path, meta) in unique_contents.items() } else: # Clean input text and remove duplicates in one pass unique_content_with_paths = {} - for doc, path in zip(input, file_paths): + for doc, path, meta in zip(input, file_paths, metadata_list): cleaned_content = sanitize_text_for_encoding(doc) if cleaned_content not in unique_content_with_paths: - unique_content_with_paths[cleaned_content] = path + unique_content_with_paths[cleaned_content] = (path, meta) + else: + # Log when duplicate content has different metadata + existing_path, existing_meta = unique_content_with_paths[ + cleaned_content + ] + if meta != existing_meta: + logger.debug( + f"Duplicate content detected: keeping first metadata {existing_meta}, " + f"discarding {meta} (same content, different metadata)" + ) # Generate contents dict of MD5 hash IDs and documents with paths contents = { compute_mdhash_id(content, prefix="doc-"): { "content": content, "file_path": path, + "metadata": meta, } - for content, path in unique_content_with_paths.items() + for content, (path, meta) in unique_content_with_paths.items() } # 2. Generate document initial status (without content) @@ -1407,6 +1466,7 @@ async def apipeline_enqueue_documents( "file_path" ], # Store file path in document status "track_id": track_id, # Store track_id in document status + "metadata": content_data.get("metadata") or {}, # Store user metadata } for id_, content_data in contents.items() } @@ -1438,6 +1498,17 @@ async def apipeline_enqueue_documents( # Create a new record with unique ID for this duplicate attempt dup_record_id = compute_mdhash_id(f"{doc_id}-{track_id}", prefix="dup-") + # Merge user metadata with system duplicate metadata + user_metadata = new_docs.get(doc_id, {}).get("metadata", {}) + system_metadata = { + "is_duplicate": True, + "original_doc_id": doc_id, + "original_track_id": existing_track_id, + } + merged_metadata = { + **user_metadata, + **system_metadata, + } # System metadata takes priority duplicate_docs[dup_record_id] = { "status": DocStatus.FAILED, "content_summary": f"[DUPLICATE] Original document: {doc_id}", @@ -1449,11 +1520,7 @@ async def apipeline_enqueue_documents( "file_path": file_path, "track_id": track_id, # Use current track_id for tracking "error_msg": f"Content already exists. Original doc_id: {doc_id}, Status: {existing_status}", - "metadata": { - "is_duplicate": True, - "original_doc_id": doc_id, - "original_track_id": existing_track_id, - }, + "metadata": merged_metadata, } # Store duplicate records in doc_status @@ -1540,6 +1607,13 @@ async def apipeline_enqueue_error_documents( doc_id_content = f"{file_path}-{error_description}" doc_id = compute_mdhash_id(doc_id_content, prefix="error-") + # Preserve any existing user metadata and add system fields + existing_metadata = error_file.get("metadata", {}) + error_metadata = { + **existing_metadata, + "error_type": "file_extraction_error", + } + error_docs[doc_id] = { "status": DocStatus.FAILED, "content_summary": error_description, @@ -1551,9 +1625,7 @@ async def apipeline_enqueue_error_documents( "updated_at": current_time, "file_path": file_path, "track_id": track_id, - "metadata": { - "error_type": "file_extraction_error", - }, + "metadata": error_metadata, } # Store error documents in doc_status @@ -1681,7 +1753,7 @@ async def _validate_and_fix_document_consistency( "track_id": getattr(status_doc, "track_id", ""), # Clear any error messages and processing metadata "error_msg": "", - "metadata": {}, + "metadata": _metadata_for_pending_reset(status_doc), } # Update the status in to_process_docs as well @@ -1958,6 +2030,12 @@ def get_failed_chunk_snapshot() -> tuple[list[str], int]: # Process document in two stages # Stage 1: Process text chunks and docs (parallel execution) + # Preserve user metadata and add system fields + existing_metadata = status_doc.metadata or {} + updated_metadata = { + **existing_metadata, + "processing_start_time": processing_start_time, + } doc_status_task = asyncio.create_task( self.doc_status.upsert( { @@ -1975,9 +2053,7 @@ def get_failed_chunk_snapshot() -> tuple[list[str], int]: ).isoformat(), "file_path": file_path, "track_id": status_doc.track_id, # Preserve existing track_id - "metadata": { - "processing_start_time": processing_start_time - }, + "metadata": updated_metadata, } } ) @@ -2057,6 +2133,14 @@ def get_failed_chunk_snapshot() -> tuple[list[str], int]: get_failed_chunk_snapshot() ) + # Preserve user metadata and add system fields + existing_metadata = status_doc.metadata or {} + failed_metadata = { + **existing_metadata, + "processing_start_time": processing_start_time, + "processing_end_time": processing_end_time, + } + # Update document status to failed await self.doc_status.upsert( { @@ -2073,10 +2157,7 @@ def get_failed_chunk_snapshot() -> tuple[list[str], int]: ).isoformat(), "file_path": file_path, "track_id": status_doc.track_id, # Preserve existing track_id - "metadata": { - "processing_start_time": processing_start_time, - "processing_end_time": processing_end_time, - }, + "metadata": failed_metadata, } } ) @@ -2116,6 +2197,14 @@ def get_failed_chunk_snapshot() -> tuple[list[str], int]: # Record processing end time processing_end_time = int(time.time()) + # Preserve user metadata and add system fields + existing_metadata = status_doc.metadata or {} + success_metadata = { + **existing_metadata, + "processing_start_time": processing_start_time, + "processing_end_time": processing_end_time, + } + await self.doc_status.upsert( { doc_id: { @@ -2130,10 +2219,7 @@ def get_failed_chunk_snapshot() -> tuple[list[str], int]: ).isoformat(), "file_path": file_path, "track_id": status_doc.track_id, # Preserve existing track_id - "metadata": { - "processing_start_time": processing_start_time, - "processing_end_time": processing_end_time, - }, + "metadata": success_metadata, } } ) @@ -2189,6 +2275,14 @@ def get_failed_chunk_snapshot() -> tuple[list[str], int]: get_failed_chunk_snapshot() ) + # Preserve user metadata and add system fields + existing_metadata = status_doc.metadata or {} + merge_failed_metadata = { + **existing_metadata, + "processing_start_time": processing_start_time, + "processing_end_time": processing_end_time, + } + # Update document status to failed await self.doc_status.upsert( { @@ -2203,10 +2297,7 @@ def get_failed_chunk_snapshot() -> tuple[list[str], int]: "updated_at": datetime.now().isoformat(), "file_path": file_path, "track_id": status_doc.track_id, # Preserve existing track_id - "metadata": { - "processing_start_time": processing_start_time, - "processing_end_time": processing_end_time, - }, + "metadata": merge_failed_metadata, } } ) @@ -2643,7 +2734,8 @@ async def aquery_data( "content": str, # Document chunk content "file_path": str, # Origin file path "chunk_id": str, # Unique chunk identifier - "reference_id": str # Reference identifier for citations + "reference_id": str, # Reference identifier for citations + "metadata": dict | None # Document metadata (only when include_metadata=True) } ], "references": [ @@ -2722,6 +2814,7 @@ async def aquery_data( model_func=param.model_func, user_prompt=param.user_prompt, enable_rerank=param.enable_rerank, + include_metadata=param.include_metadata, ) query_result = None @@ -2739,6 +2832,7 @@ async def aquery_data( hashing_kv=self.llm_response_cache, system_prompt=None, chunks_vdb=self.chunks_vdb, + doc_status_storage=self.doc_status, ) elif data_param.mode == "naive": logger.debug(f"[aquery_data] Using naive_query for mode: {data_param.mode}") @@ -2749,16 +2843,19 @@ async def aquery_data( global_config, hashing_kv=self.llm_response_cache, system_prompt=None, + doc_status_storage=self.doc_status, ) elif data_param.mode == "bypass": logger.debug("[aquery_data] Using bypass mode") # bypass mode returns empty data using convert_to_user_format - empty_raw_data = convert_to_user_format( + empty_raw_data = await convert_to_user_format( [], # no entities [], # no relationships [], # no chunks [], # no references "bypass", + doc_status_storage=self.doc_status, + include_metadata=data_param.include_metadata, ) query_result = QueryResult(content="", raw_data=empty_raw_data) else: @@ -2836,6 +2933,7 @@ async def aquery_llm( hashing_kv=self.llm_response_cache, system_prompt=system_prompt, chunks_vdb=self.chunks_vdb, + doc_status_storage=self.doc_status, ) elif param.mode == "naive": query_result = await naive_query( @@ -2845,6 +2943,7 @@ async def aquery_llm( global_config, hashing_kv=self.llm_response_cache, system_prompt=system_prompt, + doc_status_storage=self.doc_status, ) elif param.mode == "bypass": # Bypass mode: directly use LLM without knowledge retrieval diff --git a/lightrag/operate.py b/lightrag/operate.py index 20af067908..81bde18ef0 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -3092,6 +3092,7 @@ async def kg_query( hashing_kv: BaseKVStorage | None = None, system_prompt: str | None = None, chunks_vdb: BaseVectorStorage = None, + doc_status_storage: BaseKVStorage | None = None, ) -> QueryResult | None: """ Execute knowledge graph query and return unified QueryResult object. @@ -3107,6 +3108,7 @@ async def kg_query( hashing_kv: Cache storage system_prompt: System prompt chunks_vdb: Document chunks vector database + doc_status_storage: Document status storage for metadata retrieval (optional) Returns: QueryResult | None: Unified query result object containing: @@ -3166,6 +3168,7 @@ async def kg_query( text_chunks_db, query_param, chunks_vdb, + doc_status_storage, ) if context_result is None: @@ -3475,6 +3478,7 @@ async def _get_vector_context( "content": result["content"], "created_at": result.get("created_at", None), "file_path": result.get("file_path", "unknown_source"), + "full_doc_id": result.get("full_doc_id"), "source_type": "vector", # Mark the source type "chunk_id": result.get("id"), # Add chunk_id for deduplication } @@ -3983,21 +3987,36 @@ async def _build_context_str( chunk_tracking: dict = None, entity_id_to_original: dict = None, relation_id_to_original: dict = None, + doc_status_storage: BaseKVStorage | None = None, ) -> tuple[str, dict[str, Any]]: """ Build the final LLM context string with token processing. This includes dynamic token calculation and final chunk truncation. + + Args: + entities_context: List of entity context dicts + relations_context: List of relation context dicts + merged_chunks: List of merged chunk dicts + query: Query string + query_param: Query parameters (includes include_metadata flag) + global_config: Global configuration + chunk_tracking: Chunk tracking information + entity_id_to_original: Mapping from entity IDs to original data + relation_id_to_original: Mapping from relation IDs to original data + doc_status_storage: Document status storage for metadata lookup """ tokenizer = global_config.get("tokenizer") if not tokenizer: logger.error("Missing tokenizer, cannot build LLM context") # Return empty raw data structure when no tokenizer - empty_raw_data = convert_to_user_format( + empty_raw_data = await convert_to_user_format( [], [], [], [], query_param.mode, + doc_status_storage=doc_status_storage, + include_metadata=query_param.include_metadata, ) empty_raw_data["status"] = "failure" empty_raw_data["message"] = "Missing tokenizer, cannot build LLM context." @@ -4100,12 +4119,14 @@ async def _build_context_str( # not necessary to use LLM to generate a response if not entities_context and not relations_context and not chunks_context: # Return empty raw data structure when no entities/relations - empty_raw_data = convert_to_user_format( + empty_raw_data = await convert_to_user_format( [], [], [], [], query_param.mode, + doc_status_storage=doc_status_storage, + include_metadata=query_param.include_metadata, ) empty_raw_data["status"] = "failure" empty_raw_data["message"] = "Query returned empty dataset." @@ -4140,7 +4161,7 @@ async def _build_context_str( logger.debug( f"[_build_context_str] Converting to user format: {len(entities_context)} entities, {len(relations_context)} relations, {len(truncated_chunks)} chunks" ) - final_data = convert_to_user_format( + final_data = await convert_to_user_format( entities_context, relations_context, truncated_chunks, @@ -4148,6 +4169,8 @@ async def _build_context_str( query_param.mode, entity_id_to_original, relation_id_to_original, + doc_status_storage=doc_status_storage, + include_metadata=query_param.include_metadata, ) logger.debug( f"[_build_context_str] Final data after conversion: {len(final_data.get('entities', []))} entities, {len(final_data.get('relationships', []))} relationships, {len(final_data.get('chunks', []))} chunks" @@ -4166,12 +4189,25 @@ async def _build_query_context( text_chunks_db: BaseKVStorage, query_param: QueryParam, chunks_vdb: BaseVectorStorage = None, + doc_status_storage: BaseKVStorage | None = None, ) -> QueryContextResult | None: """ Main query context building function using the new 4-stage architecture: 1. Search -> 2. Truncate -> 3. Merge chunks -> 4. Build LLM context Returns unified QueryContextResult containing both context and raw_data. + + Args: + query: Query string + ll_keywords: Low-level keywords + hl_keywords: High-level keywords + knowledge_graph_inst: Knowledge graph storage + entities_vdb: Entity vector database + relationships_vdb: Relationship vector database + text_chunks_db: Text chunks storage + query_param: Query parameters (includes include_metadata flag) + chunks_vdb: Document chunks vector database + doc_status_storage: Document status storage for metadata lookup """ if not query: @@ -4238,6 +4274,7 @@ async def _build_query_context( chunk_tracking=search_result["chunk_tracking"], entity_id_to_original=truncation_result["entity_id_to_original"], relation_id_to_original=truncation_result["relation_id_to_original"], + doc_status_storage=doc_status_storage, ) # Convert keywords strings to lists and add complete metadata to raw_data @@ -4877,6 +4914,7 @@ async def naive_query( global_config: dict[str, str], hashing_kv: BaseKVStorage | None = None, system_prompt: str | None = None, + doc_status_storage: BaseKVStorage | None = None, ) -> QueryResult | None: """ Execute naive query and return unified QueryResult object. @@ -4884,10 +4922,11 @@ async def naive_query( Args: query: Query string chunks_vdb: Document chunks vector database - query_param: Query parameters + query_param: Query parameters (includes include_metadata flag) global_config: Global configuration hashing_kv: Cache storage system_prompt: System prompt + doc_status_storage: Document status storage for metadata lookup (optional) Returns: QueryResult | None: Unified query result object containing: @@ -4979,12 +5018,14 @@ async def naive_query( logger.info(f"Final context: {len(processed_chunks_with_ref_ids)} chunks") # Build raw data structure for naive mode using processed chunks with reference IDs - raw_data = convert_to_user_format( + raw_data = await convert_to_user_format( [], # naive mode has no entities [], # naive mode has no relationships processed_chunks_with_ref_ids, reference_list, "naive", + doc_status_storage=doc_status_storage, + include_metadata=query_param.include_metadata, ) # Add complete metadata for naive mode diff --git a/lightrag/utils.py b/lightrag/utils.py index fc8bc401c9..6f503d191f 100644 --- a/lightrag/utils.py +++ b/lightrag/utils.py @@ -3114,7 +3114,7 @@ def create_prefixed_exception(original_exception: Exception, prefix: str) -> Exc ) -def convert_to_user_format( +async def convert_to_user_format( entities_context: list[dict], relations_context: list[dict], chunks: list[dict], @@ -3122,8 +3122,25 @@ def convert_to_user_format( query_mode: str, entity_id_to_original: dict = None, relation_id_to_original: dict = None, + doc_status_storage=None, + include_metadata: bool = False, ) -> dict[str, Any]: - """Convert internal data format to user-friendly format using original database data""" + """Convert internal data format to user-friendly format using original database data + + Args: + entities_context: List of entity context dicts + relations_context: List of relation context dicts + chunks: List of chunk dicts (with full_doc_id) + references: List of reference dicts + query_mode: Query mode string + entity_id_to_original: Mapping of entity IDs to original data + relation_id_to_original: Mapping of relation IDs to original data + doc_status_storage: Document status storage for metadata lookup (optional) + include_metadata: If True, retrieves metadata for each chunk using full_doc_id + + Returns: + dict containing formatted data with optional metadata + """ # Convert entities format using original data when available formatted_entities = [] @@ -3201,6 +3218,28 @@ def convert_to_user_format( } ) + # Fetch metadata if requested and doc_status_storage is provided + doc_id_to_metadata = {} + if include_metadata and doc_status_storage is not None: + # Collect unique full_doc_ids from chunks + unique_doc_ids = set() + for chunk in chunks: + full_doc_id = chunk.get("full_doc_id") + if full_doc_id: + unique_doc_ids.add(full_doc_id) + + # Batch lookup metadata for all unique document IDs + if unique_doc_ids: + for doc_id in unique_doc_ids: + try: + doc_data = await doc_status_storage.get_by_id(doc_id) + if doc_data and "metadata" in doc_data: + doc_id_to_metadata[doc_id] = doc_data.get("metadata") + except Exception as e: + logger.warning( + f"[convert_to_user_format] Failed to fetch metadata for doc_id {doc_id}: {e}" + ) + # Convert chunks format (chunks already contain complete data) formatted_chunks = [] for i, chunk in enumerate(chunks): @@ -3210,6 +3249,15 @@ def convert_to_user_format( "file_path": chunk.get("file_path", "unknown_source"), "chunk_id": chunk.get("chunk_id", ""), } + + # Add metadata if requested and available + if include_metadata: + full_doc_id = chunk.get("full_doc_id") + if full_doc_id and full_doc_id in doc_id_to_metadata: + chunk_data["metadata"] = doc_id_to_metadata[full_doc_id] + else: + chunk_data["metadata"] = None + formatted_chunks.append(chunk_data) logger.debug( diff --git a/lightrag_webui/src/api/lightrag.ts b/lightrag_webui/src/api/lightrag.ts index 7831a4c7fc..4bb06a820e 100644 --- a/lightrag_webui/src/api/lightrag.ts +++ b/lightrag_webui/src/api/lightrag.ts @@ -849,6 +849,21 @@ export const deleteDocuments = async ( return response.data } +export const getDocumentById = async (documentId: string): Promise => { + const response = await axiosInstance.get(`/documents/${documentId}`) + return response.data +} + +export const updateDocumentMetadata = async ( + documentId: string, + metadata: Record +): Promise => { + const response = await axiosInstance.patch(`/documents/${documentId}/metadata`, { + metadata + }) + return response.data +} + export const getAuthStatus = async (): Promise => { try { // Add a timeout to the request to prevent hanging diff --git a/lightrag_webui/src/components/documents/MetadataEditorDialog.tsx b/lightrag_webui/src/components/documents/MetadataEditorDialog.tsx new file mode 100644 index 0000000000..e165d0acbe --- /dev/null +++ b/lightrag_webui/src/components/documents/MetadataEditorDialog.tsx @@ -0,0 +1,316 @@ +import { useState, useCallback, useEffect } from 'react' +import Button from '@/components/ui/Button' +import { + Dialog, + DialogContent, + DialogDescription, + DialogHeader, + DialogTitle, + DialogTrigger, + DialogFooter +} from '@/components/ui/Dialog' +import Input from '@/components/ui/Input' +import { toast } from 'sonner' +import { errorMessage } from '@/lib/utils' +import { updateDocumentMetadata } from '@/api/lightrag' + +import { PencilIcon, PlusIcon, XIcon, LockIcon } from 'lucide-react' +import { useTranslation } from 'react-i18next' + +// System-reserved metadata fields that cannot be modified +const RESERVED_METADATA_FIELDS = new Set([ + 'is_duplicate', + 'original_doc_id', + 'original_track_id', + 'error_type', + 'processing_start_time', + 'processing_end_time' +]) + +// Simple Label component +const Label = ({ + htmlFor, + className, + children, + ...props +}: React.LabelHTMLAttributes) => ( + +) + +interface MetadataRow { + id: string + key: string + value: string + isReserved: boolean +} + +interface MetadataEditorDialogProps { + documentId: string + documentName?: string + currentMetadata?: Record + onMetadataUpdated?: () => Promise + triggerButton?: React.ReactNode +} + +export default function MetadataEditorDialog({ + documentId, + documentName, + currentMetadata = {}, + onMetadataUpdated, + triggerButton +}: MetadataEditorDialogProps) { + const { t } = useTranslation() + const [open, setOpen] = useState(false) + const [rows, setRows] = useState([]) + const [isSaving, setIsSaving] = useState(false) + + // Initialize rows from current metadata when dialog opens + useEffect(() => { + if (open) { + // Separate reserved and user fields, keeping both in original order + const entries = Object.entries(currentMetadata) + const reservedEntries = entries.filter(([key]) => RESERVED_METADATA_FIELDS.has(key)) + const userEntries = entries.filter(([key]) => !RESERVED_METADATA_FIELDS.has(key)) + + // Reserved fields first, then user fields (both in original order) + const orderedEntries = [...reservedEntries, ...userEntries] + + const initialRows: MetadataRow[] = orderedEntries.map( + ([key, value], index) => ({ + id: `initial-${index}`, + key, + value: typeof value === 'object' ? JSON.stringify(value) : String(value), + isReserved: RESERVED_METADATA_FIELDS.has(key) + }) + ) + // Always start with at least one empty row if no metadata exists + if (initialRows.length === 0) { + initialRows.push({ id: 'new-0', key: '', value: '', isReserved: false }) + } + setRows(initialRows) + } + }, [open, currentMetadata]) + + // Add a new empty row + const handleAddRow = useCallback(() => { + const newRow: MetadataRow = { + id: `new-${Date.now()}`, + key: '', + value: '', + isReserved: false + } + setRows((prev) => [...prev, newRow]) + }, []) + + // Delete a row (only for non-reserved fields) + const handleDeleteRow = useCallback((rowId: string) => { + setRows((prev) => { + const newRows = prev.filter((row) => row.id !== rowId) + // Ensure at least one row remains + if (newRows.length === 0) { + return [{ id: `new-${Date.now()}`, key: '', value: '', isReserved: false }] + } + return newRows + }) + }, []) + + // Update a row's key or value (only for non-reserved fields) + const handleUpdateRow = useCallback((rowId: string, field: 'key' | 'value', newValue: string) => { + setRows((prev) => + prev.map((row) => + row.id === rowId ? { ...row, [field]: newValue } : row + ) + ) + }, []) + + // Save metadata + const handleSave = useCallback(async () => { + // Validate: no duplicate keys, no empty keys for non-empty values + const metadata: Record = {} + const keys = new Set() + + for (const row of rows) { + // Skip reserved fields - they are read-only + if (row.isReserved) { + continue + } + + const trimmedKey = row.key.trim() + const trimmedValue = row.value.trim() + + // Skip completely empty rows + if (!trimmedKey && !trimmedValue) { + continue + } + + // Check for empty key with non-empty value + if (!trimmedKey && trimmedValue) { + toast.error(t('documentPanel.metadataEditor.errorEmptyKey')) + return + } + + // Check for duplicate keys + if (keys.has(trimmedKey)) { + toast.error(t('documentPanel.metadataEditor.errorDuplicateKey', { key: trimmedKey })) + return + } + + // Check if user is trying to use a reserved field name + if (RESERVED_METADATA_FIELDS.has(trimmedKey)) { + toast.error(t('documentPanel.metadataEditor.errorReservedKey', { key: trimmedKey })) + return + } + + keys.add(trimmedKey) + + // Try to parse value as JSON, otherwise use as string + try { + metadata[trimmedKey] = JSON.parse(trimmedValue) + } catch { + metadata[trimmedKey] = trimmedValue + } + } + + setIsSaving(true) + try { + await updateDocumentMetadata(documentId, metadata) + toast.success(t('documentPanel.metadataEditor.success')) + + // Refresh document list if provided + if (onMetadataUpdated) { + await onMetadataUpdated() + } + + // Close dialog + setOpen(false) + } catch (err) { + toast.error(t('documentPanel.metadataEditor.error', { error: errorMessage(err) })) + } finally { + setIsSaving(false) + } + }, [rows, documentId, t, onMetadataUpdated]) + + const defaultTrigger = ( + + ) + + return ( + + + {triggerButton || defaultTrigger} + + e.preventDefault()}> + + + + {t('documentPanel.metadataEditor.title')} + + + {documentName + ? t('documentPanel.metadataEditor.descriptionWithName', { name: documentName }) + : t('documentPanel.metadataEditor.description', { id: documentId })} + + + +
+
+ + +
+
+ + {rows.map((row) => ( +
+
+ ) => + handleUpdateRow(row.id, 'key', e.target.value) + } + placeholder={t('documentPanel.metadataEditor.keyPlaceholder')} + disabled={isSaving || row.isReserved} + className={`font-mono text-sm ${row.isReserved ? 'bg-muted cursor-not-allowed' : ''}`} + /> + {row.isReserved && ( + + )} +
+ ) => + handleUpdateRow(row.id, 'value', e.target.value) + } + placeholder={t('documentPanel.metadataEditor.valuePlaceholder')} + disabled={isSaving || row.isReserved} + className={`font-mono text-sm ${row.isReserved ? 'bg-muted cursor-not-allowed' : ''}`} + /> + {row.isReserved ? ( +
+ +
+ ) : ( + + )} +
+ ))} + + +
+ + + + + +
+
+ ) +} diff --git a/lightrag_webui/src/features/DocumentManager.tsx b/lightrag_webui/src/features/DocumentManager.tsx index a646a39886..d6789f1325 100644 --- a/lightrag_webui/src/features/DocumentManager.tsx +++ b/lightrag_webui/src/features/DocumentManager.tsx @@ -17,6 +17,7 @@ import Checkbox from '@/components/ui/Checkbox' import UploadDocumentsDialog from '@/components/documents/UploadDocumentsDialog' import ClearDocumentsDialog from '@/components/documents/ClearDocumentsDialog' import DeleteDocumentsDialog from '@/components/documents/DeleteDocumentsDialog' +import MetadataEditorDialog from '@/components/documents/MetadataEditorDialog' import PaginationControls from '@/components/ui/PaginationControls' import { @@ -1370,6 +1371,9 @@ export default function DocumentManager() { )} + + {t('documentPanel.documentManager.columns.actions')} + {t('documentPanel.documentManager.columns.select')} @@ -1461,6 +1465,16 @@ export default function DocumentManager() { {new Date(doc.updated_at).toLocaleString()} + + { + await handleIntelligentRefresh(pagination.page) + }} + /> +