Source code for langchain.storage.file_system
import os
import re
import time
from pathlib import Path
from typing import Iterator, List, Optional, Sequence, Tuple, Union
from langchain_core.stores import ByteStore
from langchain.storage.exceptions import InvalidKeyException
[docs]
class LocalFileStore(ByteStore):
    """BaseStore interface that works on the local file system.

    Each key is interpreted as a path relative to the store's root directory,
    and each value is stored as the raw bytes of the file at that path. Keys
    may only contain ASCII letters, digits and the characters ``_``, ``.``,
    ``-`` and ``/``, and must not resolve to a location outside the root
    directory; otherwise ``InvalidKeyException`` is raised.

    Examples:
        Create a LocalFileStore instance and perform operations on it:

        .. code-block:: python

            from langchain.storage import LocalFileStore

            # Instantiate the LocalFileStore with the root path
            file_store = LocalFileStore("/path/to/root")

            # Set values for keys
            file_store.mset([("key1", b"value1"), ("key2", b"value2")])

            # Get values for keys
            values = file_store.mget(["key1", "key2"])  # Returns [b"value1", b"value2"]

            # Delete keys
            file_store.mdelete(["key1"])

            # Iterate over keys
            for key in file_store.yield_keys():
                print(key)  # noqa: T201
    """
[docs]
def __init__(
    self,
    root_path: Union[str, Path],
    *,
    chmod_file: Optional[int] = None,
    chmod_dir: Optional[int] = None,
    update_atime: bool = False,
) -> None:
    """Implement the BaseStore interface for the local file system.

    Args:
        root_path: The root path of the file store. All keys are interpreted
            as paths relative to this root. It is stored as an absolute,
            lexically normalized path (``.`` and ``..`` components are
            collapsed; symlinks are not resolved).
        chmod_file: (optional, defaults to `None`) If specified, sets
            permissions for newly created files, overriding the current
            `umask` if needed.
        chmod_dir: (optional, defaults to `None`) If specified, sets
            permissions for newly created dirs, overriding the current
            `umask` if needed.
        update_atime: (optional, defaults to `False`) If `True`, updates the
            filesystem access time (but not the modified time) when a file
            is read. This allows MRU/LRU cache policies to be implemented
            for filesystems where access time updates are disabled.
    """
    # os.path.abspath() rather than Path.absolute(): absolute() keeps ".."
    # components, while key validation compares this root component by
    # component against an abspath()-normalized key path. An un-normalized
    # root such as "../data" would therefore make every key look as if it
    # escaped the store.
    self.root_path = Path(os.path.abspath(root_path))
    self.chmod_file = chmod_file
    self.chmod_dir = chmod_dir
    self.update_atime = update_atime
def _get_full_path(self, key: str) -> Path:
    """Get the full path for a given key relative to the root path.

    Args:
        key (str): The key relative to the root path. It may only contain
            ASCII letters, digits, ``_``, ``.``, ``-`` and ``/``.

    Returns:
        Path: The absolute, normalized path for the given key.

    Raises:
        InvalidKeyException: If the key is empty, contains a character
            outside the allowed set, or resolves to a location outside the
            root path (for example via ``..`` or a leading ``/``).
    """
    # fullmatch() instead of match() with "^...$": "$" also matches just
    # before a trailing newline, which would let a key like "name\n" through.
    if not re.fullmatch(r"[a-zA-Z0-9_.\-/]+", key):
        raise InvalidKeyException(f"Invalid characters in key: {key}")
    # Normalize the root the same way as the key path. commonpath() compares
    # components literally, so a root that still contains ".." would never
    # equal the common path and every key would be rejected.
    root = os.path.abspath(self.root_path)
    full_path = os.path.abspath(self.root_path / key)
    common_path = os.path.commonpath([root, full_path])
    if common_path != root:
        raise InvalidKeyException(
            f"Invalid key: {key}. Key should be relative to the full path. "
            f"{root} vs. {common_path} and full path of {full_path}"
        )
    return Path(full_path)
def _mkdir_for_store(self, dir: Path) -> None:
    """Create a store directory, and any missing parents, with the configured mode.

    `Path.mkdir()` is limited by the current `umask`, so each directory
    created here is followed by an explicit `os.chmod()` when `chmod_dir`
    is set. Directories that already exist are left untouched.

    Args:
        dir: (Path) The store directory to make

    Returns:
        None
    """
    if dir.exists():
        # Nothing to create; permissions of existing directories are kept.
        return
    # Make sure every missing ancestor exists before creating this level.
    self._mkdir_for_store(dir.parent)
    dir.mkdir(exist_ok=True)
    if self.chmod_dir is None:
        return
    os.chmod(dir, self.chmod_dir)
[docs]
def mget(self, keys: Sequence[str]) -> List[Optional[bytes]]:
    """Get the values associated with the given keys.

    Args:
        keys: A sequence of keys.

    Returns:
        A list of optional values, in the same order as ``keys``.
        If a key is not found, the corresponding value will be None. A key
        whose path is a directory (for example the parent of other keys) is
        treated as not found.
    """
    values: List[Optional[bytes]] = []
    for key in keys:
        full_path = self._get_full_path(key)
        # is_file() rather than exists(): exists() is also true for a
        # directory, and reading one raises instead of reporting "not
        # found". Only regular files are keys, as in yield_keys().
        if not full_path.is_file():
            values.append(None)
            continue
        values.append(full_path.read_bytes())
        if self.update_atime:
            # Update access time only. Integer nanoseconds are used so the
            # modified time is written back exactly; going through the
            # float st_mtime loses sub-microsecond precision.
            os.utime(
                full_path,
                ns=(time.time_ns(), os.stat(full_path).st_mtime_ns),
            )
    return values
[docs]
def mset(self, key_value_pairs: Sequence[Tuple[str, bytes]]) -> None:
    """Store each value as a file under the path derived from its key.

    Missing parent directories are created first. When `chmod_file` is set,
    it is applied to every file after it has been written.

    Args:
        key_value_pairs: A sequence of key-value pairs.

    Returns:
        None
    """
    file_mode = self.chmod_file
    for key, payload in key_value_pairs:
        target = self._get_full_path(key)
        self._mkdir_for_store(target.parent)
        target.write_bytes(payload)
        if file_mode is None:
            continue
        # Explicit chmod so the requested mode is not limited by the umask.
        os.chmod(target, file_mode)
[docs]
def mdelete(self, keys: Sequence[str]) -> None:
    """Delete the given keys and their associated values.

    Keys that do not exist are ignored. A key whose path is a directory
    (for example the parent of other keys) is not a stored value and is
    ignored as well.

    Args:
        keys (Sequence[str]): A sequence of keys to delete.

    Returns:
        None
    """
    for key in keys:
        full_path = self._get_full_path(key)
        # is_file() rather than exists(): exists() is also true for a
        # directory, and unlink() on a directory raises. Only regular files
        # are keys, as in yield_keys().
        if full_path.is_file():
            full_path.unlink()
[docs]
def yield_keys(self, prefix: Optional[str] = None) -> Iterator[str]:
    """Iterate over the keys stored below the given prefix.

    The prefix is resolved as a path under the root, and that location is
    searched recursively. With no prefix (or an empty one) the whole root
    is searched. Only regular files are reported; each key is the file's
    path relative to the root, converted with ``str()``.

    Args:
        prefix (Optional[str]): The prefix to match.

    Returns:
        Iterator[str]: An iterator over keys that match the given prefix.
    """
    if prefix:
        search_root = self._get_full_path(prefix)
    else:
        search_root = self.root_path
    # Directories also show up in the recursive walk; keep regular files only.
    regular_files = (entry for entry in search_root.rglob("*") if entry.is_file())
    for entry in regular_files:
        yield str(entry.relative_to(self.root_path))