Source code for langchain.storage.file_system

import os
import re
import time
from pathlib import Path
from typing import Iterator, List, Optional, Sequence, Tuple, Union

from langchain_core.stores import ByteStore

from langchain.storage.exceptions import InvalidKeyException


[docs] class LocalFileStore(ByteStore): """BaseStore interface that works on the local file system. Examples: Create a LocalFileStore instance and perform operations on it: .. code-block:: python from langchain.storage import LocalFileStore # Instantiate the LocalFileStore with the root path file_store = LocalFileStore("/path/to/root") # Set values for keys file_store.mset([("key1", b"value1"), ("key2", b"value2")]) # Get values for keys values = file_store.mget(["key1", "key2"]) # Returns [b"value1", b"value2"] # Delete keys file_store.mdelete(["key1"]) # Iterate over keys for key in file_store.yield_keys(): print(key) # noqa: T201 """
[docs] def __init__( self, root_path: Union[str, Path], *, chmod_file: Optional[int] = None, chmod_dir: Optional[int] = None, update_atime: bool = False, ) -> None: """Implement the BaseStore interface for the local file system. Args: root_path (Union[str, Path]): The root path of the file store. All keys are interpreted as paths relative to this root. chmod_file: (optional, defaults to `None`) If specified, sets permissions for newly created files, overriding the current `umask` if needed. chmod_dir: (optional, defaults to `None`) If specified, sets permissions for newly created dirs, overriding the current `umask` if needed. update_atime: (optional, defaults to `False`) If `True`, updates the filesystem access time (but not the modified time) when a file is read. This allows MRU/LRU cache policies to be implemented for filesystems where access time updates are disabled. """ self.root_path = Path(root_path).absolute() self.chmod_file = chmod_file self.chmod_dir = chmod_dir self.update_atime = update_atime
def _get_full_path(self, key: str) -> Path: """Get the full path for a given key relative to the root path. Args: key (str): The key relative to the root path. Returns: Path: The full path for the given key. """ if not re.match(r"^[a-zA-Z0-9_.\-/]+$", key): raise InvalidKeyException(f"Invalid characters in key: {key}") full_path = os.path.abspath(self.root_path / key) common_path = os.path.commonpath([str(self.root_path), full_path]) if common_path != str(self.root_path): raise InvalidKeyException( f"Invalid key: {key}. Key should be relative to the full path." f"{self.root_path} vs. {common_path} and full path of {full_path}" ) return Path(full_path) def _mkdir_for_store(self, dir: Path) -> None: """Makes a store directory path (including parents) with specified permissions This is needed because `Path.mkdir()` is restricted by the current `umask`, whereas the explicit `os.chmod()` used here is not. Args: dir: (Path) The store directory to make Returns: None """ if not dir.exists(): self._mkdir_for_store(dir.parent) dir.mkdir(exist_ok=True) if self.chmod_dir is not None: os.chmod(dir, self.chmod_dir)
[docs] def mget(self, keys: Sequence[str]) -> List[Optional[bytes]]: """Get the values associated with the given keys. Args: keys: A sequence of keys. Returns: A sequence of optional values associated with the keys. If a key is not found, the corresponding value will be None. """ values: List[Optional[bytes]] = [] for key in keys: full_path = self._get_full_path(key) if full_path.exists(): value = full_path.read_bytes() values.append(value) if self.update_atime: # update access time only; preserve modified time os.utime(full_path, (time.time(), os.stat(full_path).st_mtime)) else: values.append(None) return values
[docs] def mset(self, key_value_pairs: Sequence[Tuple[str, bytes]]) -> None: """Set the values for the given keys. Args: key_value_pairs: A sequence of key-value pairs. Returns: None """ for key, value in key_value_pairs: full_path = self._get_full_path(key) self._mkdir_for_store(full_path.parent) full_path.write_bytes(value) if self.chmod_file is not None: os.chmod(full_path, self.chmod_file)
[docs] def mdelete(self, keys: Sequence[str]) -> None: """Delete the given keys and their associated values. Args: keys (Sequence[str]): A sequence of keys to delete. Returns: None """ for key in keys: full_path = self._get_full_path(key) if full_path.exists(): full_path.unlink()
[docs] def yield_keys(self, prefix: Optional[str] = None) -> Iterator[str]: """Get an iterator over keys that match the given prefix. Args: prefix (Optional[str]): The prefix to match. Returns: Iterator[str]: An iterator over keys that match the given prefix. """ prefix_path = self._get_full_path(prefix) if prefix else self.root_path for file in prefix_path.rglob("*"): if file.is_file(): relative_path = file.relative_to(self.root_path) yield str(relative_path)