from __future__ import annotations
import json
import logging
import warnings
from typing import (
Any,
AsyncIterator,
Callable,
Dict,
Iterator,
List,
Optional,
Union,
)
from astrapy.authentication import TokenProvider
from astrapy.db import AstraDB, AsyncAstraDB
from langchain_core.document_loaders import BaseLoader
from langchain_core.documents import Document
from langchain_astradb.utils.astradb import (
SetupMode,
_AstraDBCollectionEnvironment,
)
logger = logging.getLogger(__name__)
_NOT_SET = object()
[docs]
class AstraDBLoader(BaseLoader):
[docs]
def __init__(
self,
collection_name: str,
*,
token: Optional[Union[str, TokenProvider]] = None,
api_endpoint: Optional[str] = None,
environment: Optional[str] = None,
astra_db_client: Optional[AstraDB] = None,
async_astra_db_client: Optional[AsyncAstraDB] = None,
namespace: Optional[str] = None,
filter_criteria: Optional[Dict[str, Any]] = None,
projection: Optional[Dict[str, Any]] = _NOT_SET, # type: ignore[assignment]
find_options: Optional[Dict[str, Any]] = None,
limit: Optional[int] = None,
nb_prefetched: int = _NOT_SET, # type: ignore[assignment]
page_content_mapper: Callable[[Dict], str] = json.dumps,
metadata_mapper: Optional[Callable[[Dict], Dict[str, Any]]] = None,
) -> None:
"""Load DataStax Astra DB documents.
Args:
collection_name: name of the Astra DB collection to use.
token: API token for Astra DB usage, either in the form of a string
or a subclass of `astrapy.authentication.TokenProvider`.
If not provided, the environment variable
ASTRA_DB_APPLICATION_TOKEN is inspected.
api_endpoint: full URL to the API endpoint, such as
`https://<DB-ID>-us-east1.apps.astra.datastax.com`. If not provided,
the environment variable ASTRA_DB_API_ENDPOINT is inspected.
environment: a string specifying the environment of the target Data API.
If omitted, defaults to "prod" (Astra DB production).
Other values are in `astrapy.constants.Environment` enum class.
astra_db_client:
*DEPRECATED starting from version 0.3.5.*
*Please use 'token', 'api_endpoint' and optionally 'environment'.*
you can pass an already-created 'astrapy.db.AstraDB' instance
(alternatively to 'token', 'api_endpoint' and 'environment').
async_astra_db_client:
*DEPRECATED starting from version 0.3.5.*
*Please use 'token', 'api_endpoint' and optionally 'environment'.*
you can pass an already-created 'astrapy.db.AsyncAstraDB' instance
(alternatively to 'token', 'api_endpoint' and 'environment').
namespace: namespace (aka keyspace) where the collection resides.
If not provided, the environment variable ASTRA_DB_KEYSPACE is
inspected. Defaults to the database's "default namespace".
filter_criteria: Criteria to filter documents.
projection: Specifies the fields to return. If not provided, reads
fall back to the Data API default projection.
find_options: Additional options for the query.
*DEPRECATED starting from version 0.3.5.*
*For limiting, please use `limit`. Other options are ignored.*
limit: a maximum number of documents to return in the read query.
nb_prefetched: Max number of documents to pre-fetch.
*IGNORED starting from v. 0.3.5: astrapy v1.0+ does not support it.*
page_content_mapper: Function applied to collection documents to create
the `page_content` of the LangChain Document. Defaults to `json.dumps`.
"""
astra_db_env = _AstraDBCollectionEnvironment(
collection_name=collection_name,
token=token,
api_endpoint=api_endpoint,
environment=environment,
astra_db_client=astra_db_client,
async_astra_db_client=async_astra_db_client,
namespace=namespace,
setup_mode=SetupMode.OFF,
)
self.astra_db_env = astra_db_env
self.filter = filter_criteria
self._projection: Optional[Dict[str, Any]] = (
projection if projection is not _NOT_SET else {"*": True}
)
# warning if 'prefetched' passed
if nb_prefetched is not _NOT_SET:
warnings.warn(
(
"Parameter 'nb_prefetched' is not supported by the Data API "
"client and will be ignored in reading document."
),
UserWarning,
)
# normalizing limit and options and deprecations
_find_options = find_options.copy() if find_options else {}
if "limit" in _find_options:
if limit is not None:
raise ValueError(
"Duplicate 'limit' directive supplied. Please remove it "
"from the 'find_options' map parameter."
)
else:
warnings.warn(
(
"Passing 'limit' as part of the 'find_options' "
"dictionary is deprecated starting from version 0.3.5. "
"Please switch to passing 'limit=<number>' "
"directly in the constructor."
),
DeprecationWarning,
)
self.limit = _find_options.pop("limit", limit)
if _find_options:
warnings.warn(
(
"Unknown keys passed in the 'find_options' dictionary. "
"This parameter is deprecated starting from version 0.3.5."
),
DeprecationWarning,
)
#
self.nb_prefetched = nb_prefetched
self.page_content_mapper = page_content_mapper
self.metadata_mapper = metadata_mapper or (
lambda _: {
"namespace": self.astra_db_env.database.namespace,
"api_endpoint": self.astra_db_env.database.api_endpoint,
"collection": collection_name,
}
)
def _to_langchain_doc(self, doc: Dict[str, Any]) -> Document:
return Document(
page_content=self.page_content_mapper(doc),
metadata=self.metadata_mapper(doc),
)
[docs]
def lazy_load(self) -> Iterator[Document]:
for doc in self.astra_db_env.collection.find(
filter=self.filter,
projection=self._projection,
limit=self.limit,
# prefetch: not available at the moment (silently ignored)
# prefetched=self.nb_prefetched,
):
yield self._to_langchain_doc(doc)
[docs]
async def aload(self) -> List[Document]:
"""Load data into Document objects."""
return [doc async for doc in self.alazy_load()]
[docs]
async def alazy_load(self) -> AsyncIterator[Document]:
async for doc in self.astra_db_env.async_collection.find(
filter=self.filter,
projection=self._projection,
limit=self.limit,
# prefetch: not available at the moment (silently ignored):
# prefetched=self.nb_prefetched,
):
yield self._to_langchain_doc(doc)