import itertools
import time
from typing import Any, Dict, List, Optional, Union
from redis._parsers.helpers import pairs_to_dict
from redis.client import NEVER_DECODE, Pipeline
from redis.commands.search.hybrid_query import (
CombineResultsMethod,
HybridCursorQuery,
HybridPostProcessingConfig,
HybridQuery,
)
from redis.commands.search.hybrid_result import HybridCursorResult, HybridResult
from redis.utils import (
check_protocol_version,
decode_field_value,
deprecated_function,
experimental_method,
str_if_bytes,
)
from ..helpers import get_legacy_responses, get_protocol_version
from .aggregation import (
AggregateRequest,
AggregateResult,
Cursor,
)
from .document import Document
from .field import Field
from .index_definition import IndexDefinition
from .profile_information import ProfileInformation
from .query import Query
from .result import Result
from .suggestion import SuggestionParser
# Field type token used in FT.CREATE / FT.ALTER schemas.
NUMERIC = "NUMERIC"

# RediSearch command names, one constant per wire command.
CREATE_CMD = "FT.CREATE"
ALTER_CMD = "FT.ALTER"
SEARCH_CMD = "FT.SEARCH"
ADD_CMD = "FT.ADD"
ADDHASH_CMD = "FT.ADDHASH"
DROPINDEX_CMD = "FT.DROPINDEX"
EXPLAIN_CMD = "FT.EXPLAIN"
EXPLAINCLI_CMD = "FT.EXPLAINCLI"
DEL_CMD = "FT.DEL"
AGGREGATE_CMD = "FT.AGGREGATE"
PROFILE_CMD = "FT.PROFILE"
CURSOR_CMD = "FT.CURSOR"
SPELLCHECK_CMD = "FT.SPELLCHECK"
DICT_ADD_CMD = "FT.DICTADD"
DICT_DEL_CMD = "FT.DICTDEL"
DICT_DUMP_CMD = "FT.DICTDUMP"
MGET_CMD = "FT.MGET"
CONFIG_CMD = "FT.CONFIG"
TAGVALS_CMD = "FT.TAGVALS"
ALIAS_ADD_CMD = "FT.ALIASADD"
ALIAS_UPDATE_CMD = "FT.ALIASUPDATE"
ALIAS_DEL_CMD = "FT.ALIASDEL"
INFO_CMD = "FT.INFO"
SUGADD_COMMAND = "FT.SUGADD"
SUGDEL_COMMAND = "FT.SUGDEL"
SUGLEN_COMMAND = "FT.SUGLEN"
SUGGET_COMMAND = "FT.SUGGET"
SYNUPDATE_CMD = "FT.SYNUPDATE"
SYNDUMP_CMD = "FT.SYNDUMP"
HYBRID_CMD = "FT.HYBRID"

# Option/flag tokens appended to command argument lists.
NOOFFSETS = "NOOFFSETS"
NOFIELDS = "NOFIELDS"
NOHL = "NOHL"
NOFREQS = "NOFREQS"
MAXTEXTFIELDS = "MAXTEXTFIELDS"
TEMPORARY = "TEMPORARY"
STOPWORDS = "STOPWORDS"
SKIPINITIALSCAN = "SKIPINITIALSCAN"
WITHSCORES = "WITHSCORES"
FUZZY = "FUZZY"
WITHPAYLOADS = "WITHPAYLOADS"
class SearchCommands:
    """Search commands."""

    # Commands whose parsers require a ``query`` kwarg. When invoked as a
    # pipeline response-callback the kwarg is carried inside the options
    # dict that ``execute_command`` stored earlier. If the key is absent
    # (e.g. a raw ``execute_command("FT.SEARCH", ...)`` call) return the
    # response unparsed so we don't crash.
    _QUERY_REQUIRED_CMDS = frozenset(
        {SEARCH_CMD, AGGREGATE_CMD, CURSOR_CMD, HYBRID_CMD, PROFILE_CMD}
    )
def _init_module_callbacks(self):
    """Build the per-protocol module callback maps.

    Called from ``Search.__init__``, ``Pipeline.__init__`` and
    ``AsyncPipeline.__init__`` so the mapping lives in a single place
    rather than being duplicated across all three classes.

    Each map is keyed by command name and selected later by
    ``_parse_results`` based on the client's protocol version and
    ``legacy_responses`` setting.
    """
    # ``protocol=2`` + ``legacy_responses=True``: original RESP2 wire
    # parsers preserving the v5 Python shapes exactly.
    self._RESP2_MODULE_CALLBACKS = {
        INFO_CMD: self._parse_info,
        SEARCH_CMD: self._parse_search,
        HYBRID_CMD: self._parse_hybrid_search,
        AGGREGATE_CMD: self._parse_aggregate,
        PROFILE_CMD: self._parse_profile,
        SPELLCHECK_CMD: self._parse_spellcheck,
        CONFIG_CMD: self._parse_config_get,
        SYNDUMP_CMD: self._parse_syndump,
    }
    # Explicit ``protocol=3`` + ``legacy_responses=True`` keeps the
    # pre-existing native RESP3 surface. The only registered callback
    # is for experimental HYBRID, which normalizes the native shape.
    # FT.PROFILE stays on the old direct ``_parse_results`` special
    # case and is not registered as a response callback.
    self._RESP3_MODULE_CALLBACKS = {
        HYBRID_CMD: self._parse_hybrid_search_resp3_native,
    }
    # ``protocol=None`` + ``legacy_responses=True`` (the v8 default):
    # the wire is RESP3 but the Python surface mirrors RESP2 legacy
    # objects (``Result``, ``AggregateResult``, ``(result, profile)``
    # tuple, ...).
    self._RESP3_TO_RESP2_LEGACY_MODULE_CALLBACKS = {
        INFO_CMD: self._parse_info_resp3_to_legacy,
        SEARCH_CMD: self._parse_search_resp3,
        HYBRID_CMD: self._parse_hybrid_search_resp3,
        AGGREGATE_CMD: self._parse_aggregate_resp3,
        PROFILE_CMD: self._parse_profile_resp3,
        SPELLCHECK_CMD: self._parse_spellcheck_resp3,
        CONFIG_CMD: self._parse_config_get_resp3_to_legacy,
        SYNDUMP_CMD: self._parse_syndump_resp3,
    }
    # Search pipelines historically returned raw wire responses in
    # legacy mode. The default connection now uses RESP3 on the wire,
    # so these callbacks adapt only the default legacy pipeline case
    # back to the raw RESP2 pipeline shapes users saw prior v8.0.
    self._RESP3_TO_RESP2_LEGACY_PIPELINE_CALLBACKS = {
        SEARCH_CMD: self._pipeline_parse_search_resp3_to_legacy,
        HYBRID_CMD: self._pipeline_parse_hybrid_search_resp3_to_legacy,
    }
    # ``legacy_responses=False`` + RESP2 wire: enhanced RESP2 parsers
    # producing the unified shape (``attributes`` as list of dicts,
    # command-specific value normalisation where the approved shape
    # requires it).
    self._RESP2_UNIFIED_MODULE_CALLBACKS = {
        INFO_CMD: self._parse_info_unified,
        SEARCH_CMD: self._parse_search,
        HYBRID_CMD: self._parse_hybrid_search_unified,
        AGGREGATE_CMD: self._parse_aggregate,
        PROFILE_CMD: self._parse_profile_unified,
        SPELLCHECK_CMD: self._parse_spellcheck,
        CONFIG_CMD: self._parse_config_get_unified,
        SYNDUMP_CMD: self._parse_syndump_unified,
    }
    # ``legacy_responses=False`` + RESP3 wire: keeps the native RESP3
    # shape for commands whose unified shape diverges from the
    # RESP3-to-RESP2-legacy adapter (``FT.INFO`` keeps the native
    # nested dict, ``FT.PROFILE`` keeps profile data as a dict).
    self._RESP3_UNIFIED_MODULE_CALLBACKS = dict(
        self._RESP3_TO_RESP2_LEGACY_MODULE_CALLBACKS
    )
    self._RESP3_UNIFIED_MODULE_CALLBACKS[INFO_CMD] = self._parse_info_resp3
    self._RESP3_UNIFIED_MODULE_CALLBACKS[CONFIG_CMD] = self._parse_config_get_resp3
    self._RESP3_UNIFIED_MODULE_CALLBACKS[PROFILE_CMD] = (
        self._parse_profile_resp3_unified
    )
    self._RESP3_UNIFIED_MODULE_CALLBACKS[HYBRID_CMD] = (
        self._parse_hybrid_search_resp3_unified
    )
def _parse_results(self, cmd, res, **kwargs):
    """Dispatch a raw reply for *cmd* to the parser that matches the
    client's protocol version and ``legacy_responses`` setting.

    Returns the reply unparsed when no parser is registered for *cmd*
    or when a required ``query`` kwarg is missing.
    """
    # Parsers for these commands need the originating query object; a
    # raw ``execute_command("FT.SEARCH", ...)`` call carries no "query"
    # kwarg, so pass the response through untouched.
    if cmd in self._QUERY_REQUIRED_CMDS and "query" not in kwargs:
        return res
    protocol = get_protocol_version(self.client)
    legacy = get_legacy_responses(self.client)
    if legacy:
        # Explicitly requested RESP3 keeps the native RESP3 surface;
        # note this branch checks the literal value, unlike the
        # ``check_protocol_version`` branch below which also covers
        # the default (``protocol=None``) case.
        if protocol in (3, "3"):
            # FT.PROFILE is special-cased here and deliberately not
            # registered in ``_RESP3_MODULE_CALLBACKS``.
            if cmd == PROFILE_CMD:
                return ProfileInformation(res)
            cb = self._RESP3_MODULE_CALLBACKS.get(cmd)
        elif check_protocol_version(protocol, 3):
            # RESP3 on the wire, RESP2-legacy Python shapes.
            cb = self._RESP3_TO_RESP2_LEGACY_MODULE_CALLBACKS.get(cmd)
        else:
            cb = self._RESP2_MODULE_CALLBACKS.get(cmd)
    else:
        if check_protocol_version(protocol, 3):
            cb = self._RESP3_UNIFIED_MODULE_CALLBACKS.get(cmd)
        else:
            cb = self._RESP2_UNIFIED_MODULE_CALLBACKS.get(cmd)
    if cb is None:
        return res
    return cb(res, **kwargs)
@staticmethod
def _resp3_get(mapping, key, default=None):
if not isinstance(mapping, dict):
return default
return mapping.get(key, mapping.get(key.encode(), default))
@staticmethod
def _flatten_resp3_mapping(mapping):
    """Flatten a RESP3 map into the RESP2 ``[key, value, ...]`` shape.

    Keys are decoded to ``str``; values pass through untouched.
    Non-dict inputs are returned unchanged.
    """
    if not isinstance(mapping, dict):
        return mapping
    return [
        part
        for key, value in mapping.items()
        for part in (str_if_bytes(key), value)
    ]
def _pipeline_parse_search_resp3_to_legacy(self, res, **kwargs):
"""Convert RESP3 FT.SEARCH pipeline output to raw RESP2 pipeline shape."""
query = kwargs.get("query")
if query is None or not isinstance(res, dict):
return res
output = [self._resp3_get(res, "total_results", 0)]
for item in self._resp3_get(res, "results", []):
output.append(self._resp3_get(item, "id"))
if query._with_scores:
output.append(self._resp3_get(item, "score"))
if query._with_payloads:
output.append(self._resp3_get(item, "payload"))
if not query._no_content:
output.append(
self._flatten_resp3_mapping(
self._resp3_get(item, "extra_attributes", {})
)
)
return output
def _pipeline_parse_hybrid_search_resp3_to_legacy(self, res, **kwargs):
    """Convert RESP3 FT.HYBRID pipeline output to the raw RESP2
    pipeline shape (a flat alternating key/value list).

    Non-dict responses are passed through untouched.
    """
    if not isinstance(res, dict):
        return res
    normalized = {str_if_bytes(key): value for key, value in res.items()}
    # Cursor replies reduce to the two cursor ids.
    if "cursor" in kwargs:
        return ["SEARCH", normalized.get("SEARCH"), "VSIM", normalized.get("VSIM")]
    flattened = []
    for entry in normalized.get("results", []):
        if isinstance(entry, dict):
            flattened.append(self._flatten_resp3_mapping(entry))
        else:
            flattened.append(entry)
    return [
        "total_results",
        normalized.get("total_results", 0),
        "results",
        flattened,
        "warnings",
        normalized.get("warnings", []),
        "execution_time",
        normalized.get("execution_time", 0),
    ]
# ---- RESP2 legacy parsers ----
def _parse_info(self, res, **kwargs):
    """Pair up the flat FT.INFO reply into a dict, passing every
    token through ``str_if_bytes``.
    """
    decoded = [str_if_bytes(token) for token in res]
    return dict(zip(decoded[::2], decoded[1::2]))
def _parse_search(self, res, **kwargs):
    """Build a legacy ``Result`` from a raw RESP2 FT.SEARCH reply."""
    query = kwargs["query"]
    return Result(
        res,
        not query._no_content,
        duration=kwargs["duration"],
        has_payload=query._with_payloads,
        with_scores=query._with_scores,
        field_encodings=query._return_fields_decode_as,
    )
def _parse_hybrid_search(self, res, **kwargs):
    """Parse a RESP2 FT.HYBRID reply into a ``HybridResult``, or a
    ``HybridCursorResult`` when a cursor is in play.
    """
    reply = pairs_to_dict(res, decode_keys=True)
    if "cursor" in kwargs:
        return HybridCursorResult(
            search_cursor_id=int(reply["SEARCH"]),
            vsim_cursor_id=int(reply["VSIM"]),
        )
    # Each result item arrives as a flat [key, value, ...] list;
    # convert every item to a dict.
    parsed_items = [
        pairs_to_dict(item, decode_keys=True) for item in reply["results"]
    ]
    return HybridResult(
        total_results=int(reply["total_results"]),
        results=parsed_items,
        warnings=reply["warnings"],
        execution_time=float(reply["execution_time"]),
    )
def _parse_aggregate(self, res, **kwargs):
    """Parse a RESP2 FT.AGGREGATE reply into an ``AggregateResult``."""
    query = kwargs["query"]
    has_cursor = kwargs["has_cursor"]
    return self._get_aggregate_result(res, query, has_cursor)
def _parse_profile(self, res, **kwargs):
    """Split a RESP2 FT.PROFILE reply into ``(result, ProfileInformation)``.

    ``res[0]`` holds the search/aggregate payload, ``res[1]`` the
    profile payload.
    """
    query = kwargs["query"]
    payload = res[0]
    if isinstance(query, AggregateRequest):
        parsed = self._get_aggregate_result(payload, query, query._cursor)
    else:
        parsed = Result(
            payload,
            not query._no_content,
            duration=kwargs["duration"],
            has_payload=query._with_payloads,
            with_scores=query._with_scores,
        )
    return parsed, ProfileInformation(res[1])
def _parse_spellcheck(self, res, **kwargs):
corrections = {}
if res == 0:
return corrections
for _correction in res:
if isinstance(_correction, int) and _correction == 0:
continue
if len(_correction) != 3:
continue
if not _correction[2]:
continue
if not _correction[2][0]:
continue
# For spellcheck output
# 1) 1) "TERM"
# 2) "{term1}"
# 3) 1) 1) "{score1}"
# 2) "{suggestion1}"
# 2) 1) "{score2}"
# 2) "{suggestion2}"
#
# Following dictionary will be made
# corrections = {
# '{term1}': [
# {'score': '{score1}', 'suggestion': '{suggestion1}'},
# {'score': '{score2}', 'suggestion': '{suggestion2}'}
# ]
# }
corrections[_correction[1]] = [
{"score": _item[0], "suggestion": _item[1]} for _item in _correction[2]
]
return corrections
def _parse_config_get(self, res, **kwargs):
return {kvs[0]: kvs[1] for kvs in res} if res else {}
def _parse_syndump(self, res, **kwargs):
return {res[i]: res[i + 1] for i in range(0, len(res), 2)}
# ---- RESP2 unified parsers (legacy_responses=False) ----

# Known FT.INFO attribute keys that are followed by a value
# (key-value pairs in the RESP2 flat list). Any other token in an
# attribute list is treated as a bare flag (e.g. SORTABLE, NOSTEM)
# by ``_normalize_info_attribute``.
_INFO_ATTR_PAIR_KEYS = frozenset(
    {"identifier", "attribute", "type", "WEIGHT", "SEPARATOR", "PHONETIC"}
)
@staticmethod
def _normalize_info_attribute(attr_list):
    """Convert a RESP2 flat attribute list into a RESP3-style dict.

    Tokens named in ``_INFO_ATTR_PAIR_KEYS`` consume the following
    token as their value; every other token is collected into the
    ``flags`` list.

    RESP2 input:  ``[identifier, name, attribute, alias, type, TEXT,
    WEIGHT, 1, SORTABLE, NOSTEM]``
    RESP3 output: ``{"identifier": name, "attribute": alias, "type":
    "TEXT", "WEIGHT": "1", "flags": ["SORTABLE", "NOSTEM"]}``
    """
    normalized = {}
    flags = []
    paired = SearchCommands._INFO_ATTR_PAIR_KEYS
    index, length = 0, len(attr_list)
    while index < length:
        token = str_if_bytes(attr_list[index])
        if token in paired and index + 1 < length:
            normalized[token] = str_if_bytes(attr_list[index + 1])
            index += 2
        else:
            flags.append(token)
            index += 1
    normalized["flags"] = flags
    return normalized
def _parse_info_unified(self, res, **kwargs):
    """Parse FT.INFO into the unified shape.

    ``attributes`` is normalised to a list of dicts so RESP2 output
    matches RESP3 output.
    """
    decoded = [str_if_bytes(token) for token in res]
    info = dict(zip(decoded[::2], decoded[1::2]))
    attributes = info.get("attributes")
    if isinstance(attributes, list):
        info["attributes"] = [
            self._normalize_info_attribute(attr) if isinstance(attr, list) else attr
            for attr in attributes
        ]
    return info
def _parse_hybrid_search_unified(self, res, **kwargs):
    """Parse a RESP2 FT.HYBRID reply into the unified ``HybridResult``
    shape, decoding fields per any configured return-field encodings.

    Cursor replies become a ``HybridCursorResult`` instead.
    """
    reply = pairs_to_dict(res, decode_keys=True)
    if "cursor" in kwargs:
        return HybridCursorResult(
            search_cursor_id=int(reply["SEARCH"]),
            vsim_cursor_id=int(reply["VSIM"]),
        )
    encodings = self._hybrid_field_encodings(**kwargs)
    parsed_items: List[Dict[str, Any]] = []
    for raw_item in reply["results"]:
        as_dict = pairs_to_dict(raw_item, decode_keys=True)
        parsed_items.append(
            {
                field: self._decode_hybrid_field_value(value, field, encodings)
                for field, value in as_dict.items()
            }
        )
    return HybridResult(
        total_results=int(reply["total_results"]),
        results=parsed_items,
        warnings=reply["warnings"],
        execution_time=float(reply["execution_time"]),
    )
def _parse_profile_unified(self, res, **kwargs):
    """Parse FT.PROFILE into ``(result, ProfileInformation)`` with
    the profile_data normalised to a dict on >= 7.9.0 servers.

    ``res`` is the two-element RESP2 reply: ``res[0]`` holds the
    search/aggregate payload, ``res[1]`` the profile payload.
    """
    query = kwargs["query"]
    if isinstance(query, AggregateRequest):
        result = self._get_aggregate_result(res[0], query, query._cursor)
    else:
        result = Result(
            res[0],
            not query._no_content,
            duration=kwargs["duration"],
            has_payload=query._with_payloads,
            with_scores=query._with_scores,
        )
    profile_data = res[1]
    # >= 7.9.0 servers return a flat ``[key, value, ...]`` list at the
    # top level; convert to dict to match the RESP3 profile shape.
    # < 7.9.0 servers return a list-of-pairs whose first element is
    # itself a list — leave as-is.
    if (
        isinstance(profile_data, list)
        and profile_data
        and isinstance(profile_data[0], (str, bytes))
    ):
        profile_data = pairs_to_dict(profile_data, decode_keys=True)
    return result, ProfileInformation(profile_data)
def _parse_config_get_unified(self, res, **kwargs):
    """FT.CONFIG GET list-of-pairs reply -> dict with decoded
    keys and values.
    """
    if not res:
        return {}
    return {str_if_bytes(entry[0]): str_if_bytes(entry[1]) for entry in res}
def _parse_syndump_unified(self, res, **kwargs):
    """FT.SYNDUMP reply -> ``{term: [synonyms]}`` with every token
    decoded to ``str``.
    """
    if not res:
        return {}
    dump = {}
    for i in range(0, len(res), 2):
        term = str_if_bytes(res[i])
        synonyms = res[i + 1]
        if isinstance(synonyms, list):
            dump[term] = [str_if_bytes(s) for s in synonyms]
        else:
            dump[term] = str_if_bytes(synonyms)
    return dump
# ---- RESP3 shared result parsers ----
def _parse_search_resp3(self, res, **kwargs):
    """Parse a RESP3 FT.SEARCH reply into a ``Result`` object."""
    query = kwargs.get("query")
    with_scores = getattr(query, "_with_scores", False)
    encodings = getattr(query, "_return_fields_decode_as", None)
    return Result.from_resp3(
        res,
        duration=kwargs.get("duration", 0),
        with_scores=with_scores,
        field_encodings=encodings,
    )
def _parse_aggregate_resp3(self, res, **kwargs):
    """Parse RESP3 FT.AGGREGATE response into an AggregateResult object.

    Rows are flattened back to the RESP2 ``[key, value, ...]`` shape
    consumers expect; when a cursor is in play the reply carries the
    cursor id alongside the data.
    """
    query = kwargs.get("query")
    has_cursor = kwargs.get("has_cursor", False)
    # When has_cursor is True, RESP3 returns [data_dict, cursor_id].
    cursor_id = 0
    if has_cursor and isinstance(res, list):
        data = res[0]
        cursor_id = res[1] if len(res) > 1 else 0
    else:
        data = res
    # NOTE: the RESP3 key here is singular "warning".
    warnings = [str_if_bytes(w) for w in data.get("warning", [])]
    total = data.get("total_results", 0)
    rows = []
    for result_item in data.get("results", []):
        extra_attrs = result_item.get("extra_attributes", {})
        # Convert dict to flat list [key, value, key, value, ...]
        # to match RESP2 row format consumers expect.
        flat = []
        for k, v in extra_attrs.items():
            flat.append(k)
            flat.append(v)
        rows.append(flat)
    cursor = None
    if has_cursor:
        # Reuse the caller's Cursor object (updating its id in place)
        # so repeated reads keep working; otherwise wrap the raw id.
        if isinstance(query, Cursor):
            query.cid = cursor_id
            cursor = query
        else:
            cursor = Cursor(cursor_id)
    return AggregateResult(rows, cursor, None, total=total, warnings=warnings)
# ---- RESP3 HYBRID parsers ----
def _parse_hybrid_search_resp3(self, res, **kwargs):
    """Parse a RESP3 FT.HYBRID reply into HybridResult/HybridCursorResult.

    Top-level keys are normalised to strings. Values are preserved
    as delivered by the wire (bytes when ``NEVER_DECODE`` is set,
    strings otherwise) so byte/str semantics match the RESP2 legacy
    parser.
    """
    reply = {str_if_bytes(key): value for key, value in res.items()}
    if "cursor" in kwargs:
        return HybridCursorResult(
            search_cursor_id=int(reply["SEARCH"]),
            vsim_cursor_id=int(reply["VSIM"]),
        )
    parsed_items: List[Dict[str, Any]] = []
    for item in reply.get("results", []):
        if isinstance(item, dict):
            parsed_items.append({str_if_bytes(k): v for k, v in item.items()})
        else:
            # Flat [key, value, ...] item (RESP2-shaped).
            parsed_items.append(pairs_to_dict(item, decode_keys=True))
    return HybridResult(
        total_results=int(reply.get("total_results", 0)),
        results=parsed_items,
        warnings=reply.get("warnings", []),
        execution_time=float(reply.get("execution_time", 0)),
    )
def _parse_hybrid_search_resp3_unified(self, res, **kwargs):
    """Parse RESP3 FT.HYBRID into the approved unified HybridResult.

    Keys are decoded to ``str`` and field values are decoded per any
    configured return-field encodings; cursor replies become a
    ``HybridCursorResult``.
    """
    res = {str_if_bytes(k): v for k, v in res.items()}
    if "cursor" in kwargs:
        return HybridCursorResult(
            search_cursor_id=int(res["SEARCH"]),
            vsim_cursor_id=int(res["VSIM"]),
        )
    field_encodings = self._hybrid_field_encodings(**kwargs)
    results: List[Dict[str, Any]] = []
    for res_item in res.get("results", []):
        if isinstance(res_item, dict):
            # Native RESP3 map item: decode keys, decode values on demand.
            results.append(
                {
                    str_if_bytes(key): self._decode_hybrid_field_value(
                        value, str_if_bytes(key), field_encodings
                    )
                    for key, value in res_item.items()
                }
            )
        else:
            # Flat [key, value, ...] item (RESP2-shaped).
            item_dict = pairs_to_dict(res_item, decode_keys=True)
            results.append(
                {
                    key: self._decode_hybrid_field_value(
                        value, key, field_encodings
                    )
                    for key, value in item_dict.items()
                }
            )
    return HybridResult(
        total_results=int(res.get("total_results", 0)),
        results=results,
        warnings=res.get("warnings", []),
        execution_time=float(res.get("execution_time", 0)),
    )
@staticmethod
def _hybrid_field_encodings(**kwargs):
encodings = {}
for source_name in ("query", "post_processing"):
source = kwargs.get(source_name)
source_encodings = getattr(source, "_return_fields_decode_as", None)
if source_encodings:
encodings.update(source_encodings)
return encodings or None
@staticmethod
def _decode_hybrid_field_value(value, key, field_encodings):
if not field_encodings or key not in field_encodings:
return value
return decode_field_value(value, key, field_encodings)
def _parse_hybrid_search_resp3_native(self, res, **kwargs):
    """Normalise RESP3 FT.HYBRID map keys while keeping the native shape.

    ``protocol=3`` + ``legacy_responses=True`` keeps the RESP3 dict
    surface, but HYBRID uses ``NEVER_DECODE`` so result values mirror
    legacy RESP2 bytes. Only structural keys are decoded so callers
    can keep using the native RESP3 key names.
    """
    reply = {str_if_bytes(key): value for key, value in res.items()}
    if "cursor" in kwargs:
        return reply
    if "results" in reply:
        normalized = []
        for item in reply["results"]:
            if isinstance(item, dict):
                normalized.append({str_if_bytes(k): v for k, v in item.items()})
            else:
                normalized.append(pairs_to_dict(item, decode_keys=True))
        reply["results"] = normalized
    if "warnings" in reply:
        reply["warnings"] = [str_if_bytes(w) for w in reply["warnings"]]
    return reply
# ---- RESP3 spellcheck parser ----
def _parse_spellcheck_resp3(self, res, **kwargs):
"""Parse RESP3 FT.SPELLCHECK response into unified format.
RESP3 format:
{"results": {"term": [{"suggestion": score}, ...], ...}}
Unified format (matches RESP2 parsed output):
{"term": [{"score": score_str, "suggestion": suggestion}, ...], ...}
"""
if not isinstance(res, dict):
return self._parse_spellcheck(res, **kwargs)
corrections = {}
results = res.get("results", {})
for term, suggestions in results.items():
if not suggestions:
continue
term_corrections = []
for suggestion_dict in suggestions:
for suggestion, score in suggestion_dict.items():
# Normalize score to match RESP2's string form: RESP3
# returns a float (e.g. ``0.0``) but RESP2 returns the
# string ``"0"``.
score_str = str(score)
if score_str.endswith(".0"):
score_str = score_str[:-2]
term_corrections.append(
{"score": score_str, "suggestion": str(suggestion)}
)
if term_corrections:
corrections[term] = term_corrections
return corrections
# ---- RESP3 profile parsers ----
def _extract_resp3_profile_parts(self, res, **kwargs):
    """Extract ``(result, profile_data_dict)`` from a RESP3 FT.PROFILE
    response. ``profile_data_dict`` has its keys/values normalised
    to strings but otherwise keeps the native RESP3 dict shape.
    """
    query = kwargs["query"]

    # RESP3 returns a dict with "Results" and "Profile" keys; the keys
    # may arrive decoded (str) or raw (bytes), and some servers index
    # them by position. Using ``is not None`` avoids dropping falsy
    # values such as empty dicts/lists.
    def _first_present(candidates):
        for candidate in candidates:
            value = res.get(candidate)
            if value is not None:
                return value
        return None

    results_data = _first_present(
        ("Results", b"Results", "results", b"results", 0)
    )
    profile_data = _first_present(
        ("Profile", b"Profile", "profile", b"profile", 1)
    )
    # On older servers (pre MOD-6816, e.g. Redis 7.2/7.4) the "Results"
    # value is a bare list of result-item dicts, not the wrapper dict
    # ``{"total_results": N, "results": [...], "warning": [...]}``.
    # Wrap the list so downstream parsers receive the expected format.
    if isinstance(results_data, list):
        results_data = {
            "total_results": len(results_data),
            "results": results_data,
        }
    if isinstance(query, AggregateRequest):
        result = self._parse_aggregate_resp3(
            results_data, query=query, has_cursor=bool(query._cursor)
        )
    else:
        result = Result.from_resp3(
            results_data,
            duration=kwargs.get("duration", 0),
            with_scores=getattr(query, "_with_scores", False),
        )
    return result, self._to_string_recursive(profile_data)
def _parse_profile_resp3(self, res, **kwargs):
    """Parse RESP3 FT.PROFILE response into ``(result, ProfileInformation)``.

    RESP3 format (aligned, RediSearch >= MOD-6816):
        {"Results": {search/aggregate result dict},
         "Profile": {profile information dict}}

    Older RediSearch versions may return a list (same as RESP2) even
    when the connection uses RESP3. In that case we delegate to the
    RESP2 ``_parse_profile`` parser.
    """
    if isinstance(res, list):
        return self._parse_profile(res, **kwargs)
    result, profile_data = self._extract_resp3_profile_parts(res, **kwargs)
    # Convert the RESP3 profile dict to the RESP2 list shape so
    # consumers see the same structure as the RESP2 wire path.
    # Post-7.9.0 servers return a top-level ``{"Shards": ...,
    # "Coordinator": ...}`` dict which RESP2 wires as a flat
    # alternating ``[key, value, key, value]`` list. Pre-7.9.0
    # servers return a nested list-of-pairs.
    if isinstance(profile_data, dict):
        flat_top = "Shards" in profile_data or "Coordinator" in profile_data
        profile_data = self._resp3_profile_dict_to_list(
            profile_data, flat_top=flat_top
        )
    return result, ProfileInformation(profile_data)
def _parse_profile_resp3_unified(self, res, **kwargs):
    """Parse RESP3 FT.PROFILE for the unified shape.

    Redis < 7.9.0 returns RESP2 profile data as a nested list-of-pairs
    while RESP3 returns the same data as a dict; that pre-7.9 RESP3
    dict is converted back to the RESP2 list shape so the unified
    surface is protocol-independent. Redis >= 7.9.0 coordinator
    profiles contain ``Shards``/``Coordinator`` keys and stay as
    dicts, matching the RESP2 unified parser.
    """
    # List replies are RESP2-shaped even on a RESP3 connection.
    if isinstance(res, list):
        return self._parse_profile_unified(res, **kwargs)
    result, profile_data = self._extract_resp3_profile_parts(res, **kwargs)
    is_coordinator_shape = isinstance(profile_data, dict) and (
        "Shards" in profile_data or "Coordinator" in profile_data
    )
    if isinstance(profile_data, dict) and not is_coordinator_shape:
        profile_data = self._resp3_profile_dict_to_list(profile_data)
    return result, ProfileInformation(profile_data)
@staticmethod
def _resp3_profile_dict_to_list(data, flat_top=False):
"""Convert a RESP3 profile dict into the RESP2 wire list shape.
Pre-7.9.0 servers serialise the RESP2 profile as a list of
``[key, value]`` pairs at the top level, with nested structures
as flat alternating key-value lists. Post-7.9.0 servers serialise
the top level itself as a flat ``[key, value, key, value]`` list
(with ``"Shards"``/``"Coordinator"`` keys). ``flat_top`` selects
between these two top-level shapes; nested dicts always use the
flat alternating form.
Key structural difference: when a dict value is a list of dicts
(e.g. ``"Child iterators": [{...}, {...}]``), RESP2 expands each
dict as a separate sibling element in the parent flat list rather
than keeping them nested inside a single value.
"""
def _is_list_of_dicts(obj):
return isinstance(obj, list) and obj and isinstance(obj[0], dict)
def _convert(obj, top_level=False):
if isinstance(obj, dict):
if top_level and not flat_top:
result = []
for k, v in obj.items():
entry = [k]
if _is_list_of_dicts(v):
for item in v:
entry.append(_convert(item))
else:
entry.append(_convert(v))
result.append(entry)
return result
else:
result = []
for k, v in obj.items():
result.append(k)
if _is_list_of_dicts(v):
for item in v:
result.append(_convert(item))
else:
result.append(_convert(v))
return result
elif isinstance(obj, list):
return [_convert(item) for item in obj]
return obj
return _convert(data, top_level=True)
# ---- RESP3 structural parsers ----
@staticmethod
def _to_string_recursive(obj):
    """Recursively convert bytes keys/values to strings in nested
    structures. Non-bytes scalars (int, float, None, bool) pass
    through unchanged.
    """
    convert = SearchCommands._to_string_recursive
    if isinstance(obj, bytes):
        return str_if_bytes(obj)
    if isinstance(obj, list):
        return [convert(item) for item in obj]
    if isinstance(obj, dict):
        return {
            (str_if_bytes(key) if isinstance(key, bytes) else key): convert(value)
            for key, value in obj.items()
        }
    return obj
def _parse_info_resp3(self, res, **kwargs):
    """Parse a RESP3 FT.INFO reply, normalising bytes to strings
    throughout the nested structure.
    """
    info = self._to_string_recursive(res)
    return info
@staticmethod
def _flatten_info_attribute(attr_dict):
"""Convert a RESP3-style attribute dict back into the RESP2 flat
list (key-value pairs followed by bare flag tokens).
"""
flat = []
flags = attr_dict.get("flags") or []
for k, v in attr_dict.items():
if k == "flags":
continue
flat.append(k)
flat.append(v)
flat.extend(flags)
return flat
def _parse_info_resp3_to_legacy(self, res, **kwargs):
    """Parse RESP3 FT.INFO into the legacy RESP2 flat shape so
    ``legacy_responses=True`` on a RESP3 wire matches RESP2 output.
    """
    info = self._to_string_recursive(res)
    attributes = info.get("attributes")
    if isinstance(attributes, list):
        # Each RESP3 attribute dict is flattened back to the RESP2
        # list-of-tokens form; anything else passes through.
        info["attributes"] = [
            self._flatten_info_attribute(attr) if isinstance(attr, dict) else attr
            for attr in attributes
        ]
    return info
def _parse_config_get_resp3(self, res, **kwargs):
    """Parse a RESP3 FT.CONFIG GET reply, normalising bytes to
    strings.
    """
    if not res:
        return {}
    return {str_if_bytes(key): str_if_bytes(value) for key, value in res.items()}
def _parse_config_get_resp3_to_legacy(self, res, **kwargs):
"""Parse RESP3 FT.CONFIG GET back to the RESP2 legacy dict shape."""
return dict(res) if res else {}
def _parse_syndump_resp3(self, res, **kwargs):
    """Parse a RESP3 FT.SYNDUMP reply, normalising bytes to strings."""
    if not res:
        return {}
    return self._to_string_recursive(res)
def batch_indexer(self, chunk_size=100):
    """
    Create a new batch indexer from the client with a given chunk size
    """
    indexer = self.BatchIndexer(self, chunk_size=chunk_size)
    return indexer
def create_index(
    self,
    fields: List[Field],
    no_term_offsets: bool = False,
    no_field_flags: bool = False,
    stopwords: Optional[List[str]] = None,
    definition: Optional[IndexDefinition] = None,
    max_text_fields=False,
    temporary=None,
    no_highlight: bool = False,
    no_term_frequencies: bool = False,
    skip_initial_scan: bool = False,
):
    """
    Creates the search index. The index must not already exist.

    For more information, see https://redis.io/commands/ft.create/

    Args:
        fields: A list of Field objects.
        no_term_offsets: If `true`, term offsets will not be saved in the index.
        no_field_flags: If true, field flags that allow searching in specific fields
                        will not be saved.
        stopwords: If provided, the index will be created with this custom stopword
                   list. The list can be empty.
        definition: If provided, the index will be created with this custom index
                    definition.
        max_text_fields: If true, indexes will be encoded as if there were more than
                         32 text fields, allowing for additional fields beyond 32.
        temporary: Creates a lightweight temporary index which will expire after the
                   specified period of inactivity. The internal idle timer is reset
                   whenever the index is searched or added to.
        no_highlight: If true, disables highlighting support. Also implied by
                      `no_term_offsets`.
        no_term_frequencies: If true, term frequencies will not be saved in the
                             index.
        skip_initial_scan: If true, the initial scan and indexing will be skipped.
    """
    args = [CREATE_CMD, self.index_name]
    if definition is not None:
        args += definition.args
    if max_text_fields:
        args.append(MAXTEXTFIELDS)
    if temporary is not None and isinstance(temporary, int):
        args += [TEMPORARY, temporary]
    # Simple on/off flags, in the order FT.CREATE expects them.
    toggles = (
        (no_term_offsets, NOOFFSETS),
        (no_highlight, NOHL),
        (no_field_flags, NOFIELDS),
        (no_term_frequencies, NOFREQS),
        (skip_initial_scan, SKIPINITIALSCAN),
    )
    args += [token for enabled, token in toggles if enabled]
    if stopwords is not None and isinstance(stopwords, (list, tuple, set)):
        args += [STOPWORDS, len(stopwords)]
        if stopwords:
            args += list(stopwords)
    args.append("SCHEMA")
    try:
        args += list(itertools.chain(*(f.redis_args() for f in fields)))
    except TypeError:
        # A single Field was passed instead of an iterable of fields.
        args += fields.redis_args()
    return self.execute_command(*args)
def alter_schema_add(self, fields: Union[Field, List[Field]]):
    """
    Alter the existing search index by adding new fields. The index
    must already exist.

    ### Parameters:

    - **fields**: a list of Field objects to add for the index

    For more information see `FT.ALTER <https://redis.io/commands/ft.alter>`_.
    """  # noqa
    args = [ALTER_CMD, self.index_name, "SCHEMA", "ADD"]
    try:
        args += list(
            itertools.chain.from_iterable(f.redis_args() for f in fields)
        )
    except TypeError:
        # A single Field was passed instead of an iterable of fields.
        args += fields.redis_args()
    return self.execute_command(*args)
def dropindex(self, delete_documents: bool = False):
    """
    Drop the index if it exists.
    Replaced `drop_index` in RediSearch 2.0.
    Default behavior was changed to not delete the indexed documents.

    ### Parameters:

    - **delete_documents**: If `True`, all documents will be deleted.

    For more information see `FT.DROPINDEX <https://redis.io/commands/ft.dropindex>`_.
    """  # noqa
    args = [DROPINDEX_CMD, self.index_name]
    # Only an explicit boolean ``True`` opts in to deleting documents.
    if delete_documents is True:
        args.append("DD")
    return self.execute_command(*args)
def _add_document(
    self,
    doc_id,
    conn=None,
    nosave=False,
    score=1.0,
    payload=None,
    replace=False,
    partial=False,
    language=None,
    no_create=False,
    **fields,
):
    """
    Internal add_document used for both batch and single doc indexing
    """
    # PARTIAL and NOCREATE are only valid together with REPLACE.
    replace = replace or partial or no_create
    args = [ADD_CMD, self.index_name, doc_id, score]
    if nosave:
        args.append("NOSAVE")
    if payload is not None:
        args += ["PAYLOAD", payload]
    if replace:
        args.append("REPLACE")
    if partial:
        args.append("PARTIAL")
    if no_create:
        args.append("NOCREATE")
    if language:
        args += ["LANGUAGE", language]
    args.append("FIELDS")
    args += list(itertools.chain(*fields.items()))
    # Batch indexing routes through an explicit pipeline connection.
    target = conn if conn is not None else self
    return target.execute_command(*args)
def _add_document_hash(
    self, doc_id, conn=None, score=1.0, language=None, replace=False
):
    """
    Internal add_document_hash used for both batch and single doc indexing
    """
    args = [ADDHASH_CMD, self.index_name, doc_id, score]
    if replace:
        args.append("REPLACE")
    if language:
        args += ["LANGUAGE", language]
    # Batch indexing routes through an explicit pipeline connection.
    target = conn if conn is not None else self
    return target.execute_command(*args)
@deprecated_function(
    version="2.0.0", reason="deprecated since redisearch 2.0, call hset instead"
)
def add_document(
    self,
    doc_id: str,
    nosave: bool = False,
    score: float = 1.0,
    payload: Optional[bool] = None,
    replace: bool = False,
    partial: bool = False,
    language: Optional[str] = None,
    no_create: bool = False,
    **fields: List[str],
):
    """
    Add a single document to the index.

    Args:
        doc_id: the id of the saved document.
        nosave: if set to true, we just index the document, and don't
                save a copy of it. This means that searches will just
                return ids.
        score: the document ranking, between 0.0 and 1.0
        payload: optional inner-index payload we can save for fast
                 access in scoring functions
        replace: if True, and the document already is in the index,
                 we perform an update and reindex the document
        partial: if True, the fields specified will be added to the
                 existing document.
                 This has the added benefit that any fields specified
                 with `no_index` will not be reindexed again.
                 Implies `replace`
        language: Specify the language used for document tokenization.
        no_create: if True, the document is only updated and reindexed
                   if it already exists. If the document does not
                   exist, an error will be returned. Implies `replace`
        fields: kwargs dictionary of the document fields to be saved
                and/or indexed.
                NOTE: Geo points should be encoded as strings of "lon,lat"
    """  # noqa
    # Thin deprecated wrapper: all the argument handling lives in
    # ``_add_document``.
    return self._add_document(
        doc_id,
        conn=None,
        nosave=nosave,
        score=score,
        payload=payload,
        replace=replace,
        partial=partial,
        language=language,
        no_create=no_create,
        **fields,
    )
@deprecated_function(
    version="2.0.0", reason="deprecated since redisearch 2.0, call hset instead"
)
def add_document_hash(self, doc_id, score=1.0, language=None, replace=False):
    """
    Add a hash document to the index.

    ### Parameters

    - **doc_id**: the document's id. This has to be an existing HASH key
      in Redis that will hold the fields the index needs.
    - **score**: the document ranking, between 0.0 and 1.0
    - **replace**: if True, and the document already is in the index, we
      perform an update and reindex the document
    - **language**: Specify the language used for document tokenization.
    """  # noqa
    return self._add_document_hash(
        doc_id,
        conn=None,
        score=score,
        language=language,
        replace=replace,
    )
@deprecated_function(version="2.0.0", reason="deprecated since redisearch 2.0")
def delete_document(self, doc_id, conn=None, delete_actual_document=False):
    """
    Delete a document from index.
    Returns 1 if the document was deleted, 0 if not.

    ### Parameters

    - **delete_actual_document**: if set to True, RediSearch also delete
      the actual document if it is in the index
    """  # noqa
    args = [DEL_CMD, self.index_name, doc_id]
    if delete_actual_document:
        args.append("DD")
    target = conn if conn is not None else self
    return target.execute_command(*args)
def load_document(self, id, field_encodings: Optional[Dict[str, Any]] = None):
    """
    Load a single document by id.

    - **field_encodings**: optional dict mapping field names to encodings.
      If a field's encoding is ``None`` the raw bytes value is preserved
      (useful for binary data such as vectors).
    """
    raw = self.client.hgetall(id)
    fields = {}
    for k, v in raw.items():
        # Decode the key once instead of calling str_if_bytes twice per field.
        key = str_if_bytes(k)
        fields[key] = decode_field_value(v, key, field_encodings)
    # An "id" field would collide with the explicit keyword argument below.
    fields.pop("id", None)
    return Document(id=id, **fields)
@deprecated_function(version="2.0.0", reason="deprecated since redisearch 2.0")
def get(self, *ids):
    """
    Return the full contents of multiple documents.

    ### Parameters

    - **ids**: the ids of the saved documents.
    """
    return self.execute_command(MGET_CMD, self.index_name, *ids)
def info(self):
    """
    Get info and stats about the current index, including the number of
    documents, memory consumption, etc.

    For more information see `FT.INFO <https://redis.io/commands/ft.info>`_.
    """
    raw = self.execute_command(INFO_CMD, self.index_name)
    return self._parse_results(INFO_CMD, raw)
def get_params_args(
    self, query_params: Optional[Dict[str, Union[str, int, float, bytes]]]
):
    """Serialize ``query_params`` into a ``PARAMS`` argument list.

    Returns an empty list when there is nothing to send (``None`` or an
    empty mapping); otherwise ``["PARAMS", 2 * len(params), k1, v1, ...]``.
    """
    if not query_params:
        return []
    args = ["PARAMS", 2 * len(query_params)]
    for key, value in query_params.items():
        args += [key, value]
    return args
def _mk_query_args(
    self, query, query_params: Optional[Dict[str, Union[str, int, float, bytes]]]
):
    """Normalize ``query`` into a Query object and build its argument list.

    Returns ``(args, query)`` where ``args`` starts with the index name,
    followed by the query arguments and any PARAMS substitutions.
    """
    if isinstance(query, str):
        # Promote a plain text query to a Query object.
        query = Query(query)
    elif not isinstance(query, Query):
        raise ValueError(f"Bad query type {type(query)}")
    args = [self.index_name, *query.get_args(), *self.get_params_args(query_params)]
    return args, query
def search(
    self,
    query: Union[str, Query],
    query_params: Union[Dict[str, Union[str, int, float, bytes]], None] = None,
):
    """
    Search the index for a given query, and return a result of documents.

    ### Parameters

    - **query**: the search query. Either a text for simple queries with
      default parameters, or a Query object for complex queries.
      See RediSearch's documentation on query format

    For more information see `FT.SEARCH <https://redis.io/commands/ft.search>`_.
    """  # noqa
    args, query = self._mk_query_args(query, query_params=query_params)
    start = time.monotonic()

    options = {}
    # Under RESP2 the raw reply is kept as bytes for the result parser.
    if not check_protocol_version(get_protocol_version(self.client), 3):
        options[NEVER_DECODE] = True
    if isinstance(self, Pipeline):
        options["query"] = query
        options["duration"] = 0

    res = self.execute_command(SEARCH_CMD, *args, **options)
    if isinstance(res, Pipeline):
        return res

    duration = (time.monotonic() - start) * 1000.0
    return self._parse_results(SEARCH_CMD, res, query=query, duration=duration)
@experimental_method()
def hybrid_search(
    self,
    query: HybridQuery,
    combine_method: Optional[CombineResultsMethod] = None,
    post_processing: Optional[HybridPostProcessingConfig] = None,
    params_substitution: Optional[Dict[str, Union[str, int, float, bytes]]] = None,
    timeout: Optional[int] = None,
    cursor: Optional[HybridCursorQuery] = None,
) -> Union[HybridResult, HybridCursorResult, Pipeline]:
    """
    Execute a hybrid search using both text and vector queries.

    Args:
    - **query**: HybridQuery object containing the text and vector queries
    - **combine_method**: CombineResultsMethod object containing the
      combine method and parameters
    - **post_processing**: HybridPostProcessingConfig object containing
      the post processing configuration
    - **params_substitution**: Dict[str, Union[str, int, float, bytes]]
      containing the parameters substitution
    - **timeout**: int - the timeout in milliseconds
    - **cursor**: HybridCursorQuery object - the cursor configuration

    For more information see `FT.HYBRID <https://redis.io/commands/ft.hybrid>`.
    """
    pieces = [HYBRID_CMD, self.index_name, *query.get_args()]
    options = {}

    if combine_method:
        pieces += combine_method.get_args()
    if post_processing:
        pieces += post_processing.build_args()
        options["post_processing"] = post_processing
    if params_substitution:
        pieces += self.get_params_args(params_substitution)
    if timeout:
        pieces += ["TIMEOUT", timeout]
    if cursor:
        options["cursor"] = True
        pieces += cursor.build_args()

    # Preserve HYBRID result values as bytes by default, matching the
    # legacy RESP2 Search surface; selected LOAD fields can opt into
    # decoding through HybridPostProcessingConfig.load(..., decode_field=True).
    options[NEVER_DECODE] = True
    options["query"] = query

    res = self.execute_command(*pieces, **options)
    if isinstance(res, Pipeline):
        return res
    return self._parse_results(HYBRID_CMD, res, **options)
def explain(
    self,
    query: Union[str, Query],
    query_params: Optional[Dict[str, Union[str, int, float, bytes]]] = None,
):
    """Return the execution plan for a complex query.

    For more information see `FT.EXPLAIN <https://redis.io/commands/ft.explain>`_.
    """  # noqa
    args, _ = self._mk_query_args(query, query_params=query_params)
    return self.execute_command(EXPLAIN_CMD, *args)
def explain_cli(self, query: Union[str, Query]):  # noqa
    """Deliberately unsupported; FT.EXPLAINCLI is a CLI-only convenience."""
    raise NotImplementedError("EXPLAINCLI will not be implemented.")
def aggregate(
    self,
    query: Union[AggregateRequest, Cursor],
    query_params: Optional[Dict[str, Union[str, int, float, bytes]]] = None,
):
    """
    Issue an aggregation query.

    ### Parameters

    **query**: This can be either an `AggregateRequest`, or a `Cursor`

    An `AggregateResult` object is returned. You can access the rows from
    its `rows` property, which will always yield the rows of the result.

    For more information see `FT.AGGREGATE <https://redis.io/commands/ft.aggregate>`_.
    """  # noqa
    if isinstance(query, AggregateRequest):
        has_cursor = bool(query._cursor)
        cmd = [AGGREGATE_CMD, self.index_name, *query.build_args()]
    elif isinstance(query, Cursor):
        # A Cursor continues a previous WITHCURSOR aggregation.
        has_cursor = True
        cmd = [CURSOR_CMD, "READ", self.index_name, *query.build_args()]
    else:
        raise ValueError("Bad query", query)
    cmd.extend(self.get_params_args(query_params))

    raw = self.execute_command(*cmd)
    return self._parse_results(
        AGGREGATE_CMD, raw, query=query, has_cursor=has_cursor
    )
def _get_aggregate_result(
    self, raw: List, query: Union[AggregateRequest, Cursor], has_cursor: bool
):
    """Convert a raw FT.AGGREGATE / FT.CURSOR reply into an AggregateResult."""
    cursor = None
    if has_cursor:
        # Cursor replies are [rows, cursor_id]; reuse an existing Cursor
        # object when the caller passed one, otherwise create a new one.
        if isinstance(query, Cursor):
            query.cid = raw[1]
            cursor = query
        else:
            cursor = Cursor(raw[1])
        raw = raw[0]

    schema = None
    rows = raw[1:]
    if isinstance(query, AggregateRequest) and query._with_schema:
        # WITHSCHEMA puts the schema first, then the count, then the rows.
        schema = raw[0]
        rows = raw[2:]

    return AggregateResult(rows, cursor, schema)
def profile(
    self,
    query: Union[Query, AggregateRequest],
    limited: bool = False,
    query_params: Optional[Dict[str, Union[str, int, float, bytes]]] = None,
) -> Union[
    tuple[Union[Result, AggregateResult], ProfileInformation],
    ProfileInformation,
]:
    """
    Perform a search or aggregate command and collect performance
    information.

    ### Parameters

    **query**: This can be either an `AggregateRequest` or `Query`.
    **limited**: If set to True, removes details of reader iterator.
    **query_params**: Define one or more value parameters.
    Each parameter has a name and a value.
    """
    started = time.monotonic()
    # cmd[2] is a placeholder replaced with SEARCH or AGGREGATE below.
    cmd = [PROFILE_CMD, self.index_name, ""]
    if limited:
        cmd.append("LIMITED")
    cmd.append("QUERY")

    if isinstance(query, AggregateRequest):
        cmd[2] = "AGGREGATE"
        cmd.extend(query.build_args())
    elif isinstance(query, Query):
        cmd[2] = "SEARCH"
        cmd.extend(query.get_args())
        cmd.extend(self.get_params_args(query_params))
    else:
        raise ValueError("Must provide AggregateRequest object or Query object.")

    res = self.execute_command(*cmd)
    elapsed_ms = (time.monotonic() - started) * 1000.0
    return self._parse_results(PROFILE_CMD, res, query=query, duration=elapsed_ms)
def spellcheck(self, query, distance=None, include=None, exclude=None):
    """
    Issue a spellcheck query.

    Args:
        query: search query.
        distance: the maximal Levenshtein distance for spelling
                  suggestions (default: 1, max: 4).
        include: specifies an inclusion custom dictionary.
        exclude: specifies an exclusion custom dictionary.

    For more information see `FT.SPELLCHECK <https://redis.io/commands/ft.spellcheck>`_.
    """  # noqa
    cmd = [SPELLCHECK_CMD, self.index_name, query]
    if distance:
        cmd += ["DISTANCE", distance]
    if include:
        cmd += ["TERMS", "INCLUDE", include]
    if exclude:
        cmd += ["TERMS", "EXCLUDE", exclude]

    raw = self.execute_command(*cmd)
    return self._parse_results(SPELLCHECK_CMD, raw)
def dict_add(self, name: str, *terms: List[str]):
    """Add terms to a dictionary.

    ### Parameters

    - **name**: Dictionary name.
    - **terms**: List of items for adding to the dictionary.

    For more information see `FT.DICTADD <https://redis.io/commands/ft.dictadd>`_.
    """  # noqa
    return self.execute_command(DICT_ADD_CMD, name, *terms)
def dict_del(self, name: str, *terms: List[str]):
    """Delete terms from a dictionary.

    ### Parameters

    - **name**: Dictionary name.
    - **terms**: List of items for removing from the dictionary.

    For more information see `FT.DICTDEL <https://redis.io/commands/ft.dictdel>`_.
    """  # noqa
    return self.execute_command(DICT_DEL_CMD, name, *terms)
def dict_dump(self, name: str):
    """Dump all terms in the given dictionary.

    ### Parameters

    - **name**: Dictionary name.

    For more information see `FT.DICTDUMP <https://redis.io/commands/ft.dictdump>`_.
    """  # noqa
    return self.execute_command(DICT_DUMP_CMD, name)
@deprecated_function(
    version="8.0.0",
    reason="deprecated since Redis 8.0, call config_set from core module instead",
)
def config_set(self, option: str, value: str) -> bool:
    """Set runtime configuration option.

    ### Parameters

    - **option**: the name of the configuration option.
    - **value**: a value for the configuration option.

    For more information see `FT.CONFIG SET <https://redis.io/commands/ft.config-set>`_.
    """  # noqa
    raw = self.execute_command(CONFIG_CMD, "SET", option, value)
    return raw == "OK"
@deprecated_function(
    version="8.0.0",
    reason="deprecated since Redis 8.0, call config_get from core module instead",
)
def config_get(self, option: str) -> str:
    """Get runtime configuration option value.

    ### Parameters

    - **option**: the name of the configuration option.

    For more information see `FT.CONFIG GET <https://redis.io/commands/ft.config-get>`_.
    """  # noqa
    raw = self.execute_command(CONFIG_CMD, "GET", option)
    return self._parse_results(CONFIG_CMD, raw)
def tagvals(self, tagfield: str):
    """
    Return a list of all possible tag values.

    ### Parameters

    - **tagfield**: Tag field name

    For more information see `FT.TAGVALS <https://redis.io/commands/ft.tagvals>`_.
    """  # noqa
    return self.execute_command(TAGVALS_CMD, self.index_name, tagfield)
def aliasadd(self, alias: str):
    """
    Alias a search index - will fail if alias already exists.

    ### Parameters

    - **alias**: Name of the alias to create

    For more information see `FT.ALIASADD <https://redis.io/commands/ft.aliasadd>`_.
    """  # noqa
    return self.execute_command(ALIAS_ADD_CMD, alias, self.index_name)
def aliasupdate(self, alias: str):
    """
    Update an alias - will fail if alias does not already exist.

    ### Parameters

    - **alias**: Name of the alias to update

    For more information see `FT.ALIASUPDATE <https://redis.io/commands/ft.aliasupdate>`_.
    """  # noqa
    return self.execute_command(ALIAS_UPDATE_CMD, alias, self.index_name)
def aliasdel(self, alias: str):
    """
    Remove an alias from a search index.

    ### Parameters

    - **alias**: Name of the alias to delete

    For more information see `FT.ALIASDEL <https://redis.io/commands/ft.aliasdel>`_.
    """  # noqa
    return self.execute_command(ALIAS_DEL_CMD, alias)
def sugadd(self, key, *suggestions, **kwargs):
    """
    Add suggestion terms to the AutoCompleter engine. Each suggestion has
    a score and string.

    If kwargs["increment"] is true and the terms are already in the
    server's dictionary, we increment their scores.

    For more information see `FT.SUGADD <https://redis.io/commands/ft.sugadd/>`_.
    """  # noqa
    # If Transaction is not False it will MULTI/EXEC which will error
    pipe = self.pipeline(transaction=False)
    increment = bool(kwargs.get("increment"))
    for sug in suggestions:
        args = [SUGADD_COMMAND, key, sug.string, sug.score]
        if increment:
            args.append("INCR")
        if sug.payload:
            args += ["PAYLOAD", sug.payload]
        pipe.execute_command(*args)
    # The last reply is the current size of the suggestion dictionary.
    return pipe.execute()[-1]
def suglen(self, key: str) -> int:
    """
    Return the number of entries in the AutoCompleter index.

    For more information see `FT.SUGLEN <https://redis.io/commands/ft.suglen>`_.
    """  # noqa
    return self.execute_command(SUGLEN_COMMAND, key)
def sugdel(self, key: str, string: str) -> int:
    """
    Delete a string from the AutoCompleter index.
    Returns 1 if the string was found and deleted, 0 otherwise.

    For more information see `FT.SUGDEL <https://redis.io/commands/ft.sugdel>`_.
    """  # noqa
    return self.execute_command(SUGDEL_COMMAND, key, string)
def sugget(
    self,
    key: str,
    prefix: str,
    fuzzy: bool = False,
    num: int = 10,
    with_scores: bool = False,
    with_payloads: bool = False,
) -> List[SuggestionParser]:
    """
    Get a list of suggestions from the AutoCompleter, for a given prefix.

    Parameters:

    prefix : str
        The prefix we are searching. **Must be valid ascii or utf-8**
    fuzzy : bool
        If set to true, the prefix search is done in fuzzy mode.
        **NOTE**: Running fuzzy searches on short (<3 letters) prefixes
        can be very slow, and even scan the entire index.
    with_scores : bool
        If set to true, we also return the (refactored) score of
        each suggestion.
        This is normally not needed, and is NOT the original score
        inserted into the index.
    with_payloads : bool
        Return suggestion payloads
    num : int
        The maximum number of results we return. Note that we might
        return less. The algorithm trims irrelevant suggestions.

    Returns:

    list:
        A list of Suggestion objects. If with_scores was False, the
        score of all suggestions is 1.

    For more information see `FT.SUGGET <https://redis.io/commands/ft.sugget>`_.
    """  # noqa
    args = [SUGGET_COMMAND, key, prefix, "MAX", num]
    for enabled, token in (
        (fuzzy, FUZZY),
        (with_scores, WITHSCORES),
        (with_payloads, WITHPAYLOADS),
    ):
        if enabled:
            args.append(token)

    raw = self.execute_command(*args)
    if not raw:
        return []
    return list(SuggestionParser(with_scores, with_payloads, raw))
def synupdate(self, groupid: str, skipinitial: bool = False, *terms: List[str]):
    """
    Update a synonym group.

    The command is used to create or update a synonym group with
    additional terms.
    Only documents which were indexed after the update will be affected.

    Parameters:

    groupid :
        Synonym group id.
    skipinitial : bool
        If set to true, we do not scan and index.
    terms :
        The terms.

    For more information see `FT.SYNUPDATE <https://redis.io/commands/ft.synupdate>`_.
    """  # noqa
    cmd = [SYNUPDATE_CMD, self.index_name, groupid]
    if skipinitial:
        cmd.append("SKIPINITIALSCAN")
    cmd.extend(terms)
    return self.execute_command(*cmd)
def syndump(self):
    """
    Dump the contents of a synonym group.

    The command is used to dump the synonyms data structure.
    Returns a list of synonym terms and their synonym group ids.

    For more information see `FT.SYNDUMP <https://redis.io/commands/ft.syndump>`_.
    """  # noqa
    raw = self.execute_command(SYNDUMP_CMD, self.index_name)
    return self._parse_results(SYNDUMP_CMD, raw)
class AsyncSearchCommands(SearchCommands):
    async def info(self):
        """
        Get info and stats about the current index, including the number of
        documents, memory consumption, etc.

        For more information see `FT.INFO <https://redis.io/commands/ft.info>`_.
        """
        raw = await self.execute_command(INFO_CMD, self.index_name)
        return self._parse_results(INFO_CMD, raw)
async def search(
    self,
    query: Union[str, Query],
    query_params: Optional[Dict[str, Union[str, int, float, bytes]]] = None,
):
    """
    Search the index for a given query, and return a result of documents.

    ### Parameters

    - **query**: the search query. Either a text for simple queries with
      default parameters, or a Query object for complex queries.
      See RediSearch's documentation on query format

    For more information see `FT.SEARCH <https://redis.io/commands/ft.search>`_.
    """  # noqa
    args, query = self._mk_query_args(query, query_params=query_params)
    start = time.monotonic()

    options = {}
    # Under RESP2 the raw reply is kept as bytes for the result parser.
    if not check_protocol_version(get_protocol_version(self.client), 3):
        options[NEVER_DECODE] = True
    if isinstance(self, Pipeline):
        options["query"] = query
        options["duration"] = 0

    res = await self.execute_command(SEARCH_CMD, *args, **options)
    if isinstance(res, Pipeline):
        return res

    duration = (time.monotonic() - start) * 1000.0
    return self._parse_results(SEARCH_CMD, res, query=query, duration=duration)
@experimental_method()
async def hybrid_search(
    self,
    query: HybridQuery,
    combine_method: Optional[CombineResultsMethod] = None,
    post_processing: Optional[HybridPostProcessingConfig] = None,
    params_substitution: Optional[Dict[str, Union[str, int, float, bytes]]] = None,
    timeout: Optional[int] = None,
    cursor: Optional[HybridCursorQuery] = None,
) -> Union[HybridResult, HybridCursorResult, Pipeline]:
    """
    Execute a hybrid search using both text and vector queries.

    Args:
    - **query**: HybridQuery object containing the text and vector queries
    - **combine_method**: CombineResultsMethod object containing the
      combine method and parameters
    - **post_processing**: HybridPostProcessingConfig object containing
      the post processing configuration
    - **params_substitution**: Dict[str, Union[str, int, float, bytes]]
      containing the parameters substitution
    - **timeout**: int - the timeout in milliseconds
    - **cursor**: HybridCursorQuery object - the cursor configuration

    For more information see `FT.HYBRID <https://redis.io/commands/ft.hybrid>`.
    """
    pieces = [HYBRID_CMD, self.index_name, *query.get_args()]
    options = {}

    if combine_method:
        pieces += combine_method.get_args()
    if post_processing:
        pieces += post_processing.build_args()
        options["post_processing"] = post_processing
    if params_substitution:
        pieces += self.get_params_args(params_substitution)
    if timeout:
        pieces += ["TIMEOUT", timeout]
    if cursor:
        options["cursor"] = True
        pieces += cursor.build_args()

    # Preserve HYBRID result values as bytes by default, matching the
    # legacy RESP2 Search surface; selected LOAD fields can opt into
    # decoding through HybridPostProcessingConfig.load(..., decode_field=True).
    options[NEVER_DECODE] = True
    options["query"] = query

    res = await self.execute_command(*pieces, **options)
    if isinstance(res, Pipeline):
        return res
    return self._parse_results(HYBRID_CMD, res, **options)
async def aggregate(
    self,
    # Fixed annotation: callers pass an AggregateRequest (or a Cursor to
    # continue one), never an AggregateResult — see the isinstance checks
    # below and the sync SearchCommands.aggregate signature.
    query: Union[AggregateRequest, Cursor],
    query_params: Optional[Dict[str, Union[str, int, float, bytes]]] = None,
):
    """
    Issue an aggregation query.

    ### Parameters

    **query**: This can be either an `AggregateRequest`, or a `Cursor`

    An `AggregateResult` object is returned. You can access the rows from
    its `rows` property, which will always yield the rows of the result.

    For more information see `FT.AGGREGATE <https://redis.io/commands/ft.aggregate>`_.
    """  # noqa
    if isinstance(query, AggregateRequest):
        has_cursor = bool(query._cursor)
        cmd = [AGGREGATE_CMD, self.index_name] + query.build_args()
    elif isinstance(query, Cursor):
        has_cursor = True
        cmd = [CURSOR_CMD, "READ", self.index_name] + query.build_args()
    else:
        raise ValueError("Bad query", query)
    cmd += self.get_params_args(query_params)

    raw = await self.execute_command(*cmd)
    return self._parse_results(
        AGGREGATE_CMD, raw, query=query, has_cursor=has_cursor
    )
async def spellcheck(self, query, distance=None, include=None, exclude=None):
    """
    Issue a spellcheck query.

    ### Parameters

    **query**: search query.
    **distance**: the maximal Levenshtein distance for spelling
    suggestions (default: 1, max: 4).
    **include**: specifies an inclusion custom dictionary.
    **exclude**: specifies an exclusion custom dictionary.

    For more information see `FT.SPELLCHECK <https://redis.io/commands/ft.spellcheck>`_.
    """  # noqa
    cmd = [SPELLCHECK_CMD, self.index_name, query]
    if distance:
        cmd += ["DISTANCE", distance]
    if include:
        cmd += ["TERMS", "INCLUDE", include]
    if exclude:
        cmd += ["TERMS", "EXCLUDE", exclude]

    raw = await self.execute_command(*cmd)
    return self._parse_results(SPELLCHECK_CMD, raw)
@deprecated_function(
    version="8.0.0",
    reason="deprecated since Redis 8.0, call config_set from core module instead",
)
async def config_set(self, option: str, value: str) -> bool:
    """Set runtime configuration option.

    ### Parameters

    - **option**: the name of the configuration option.
    - **value**: a value for the configuration option.

    For more information see `FT.CONFIG SET <https://redis.io/commands/ft.config-set>`_.
    """  # noqa
    raw = await self.execute_command(CONFIG_CMD, "SET", option, value)
    return raw == "OK"
@deprecated_function(
    version="8.0.0",
    reason="deprecated since Redis 8.0, call config_get from core module instead",
)
async def config_get(self, option: str) -> str:
    """Get runtime configuration option value.

    ### Parameters

    - **option**: the name of the configuration option.

    For more information see `FT.CONFIG GET <https://redis.io/commands/ft.config-get>`_.
    """  # noqa
    # Removed a dead `res = {}` assignment that was immediately overwritten.
    res = await self.execute_command(CONFIG_CMD, "GET", option)
    return self._parse_results(CONFIG_CMD, res)
async def load_document(self, id, field_encodings: Optional[Dict[str, Any]] = None):
    """
    Load a single document by id.

    - **field_encodings**: optional dict mapping field names to encodings.
      If a field's encoding is ``None`` the raw bytes value is preserved
      (useful for binary data such as vectors).
    """
    raw = await self.client.hgetall(id)
    fields = {}
    for k, v in raw.items():
        # Decode the key once instead of calling str_if_bytes twice per field.
        key = str_if_bytes(k)
        fields[key] = decode_field_value(v, key, field_encodings)
    # An "id" field would collide with the explicit keyword argument below.
    fields.pop("id", None)
    return Document(id=id, **fields)
async def sugadd(self, key, *suggestions, **kwargs):
    """
    Add suggestion terms to the AutoCompleter engine. Each suggestion has
    a score and string.

    If kwargs["increment"] is true and the terms are already in the
    server's dictionary, we increment their scores.

    For more information see `FT.SUGADD <https://redis.io/commands/ft.sugadd>`_.
    """  # noqa
    # If Transaction is not False it will MULTI/EXEC which will error
    pipe = self.pipeline(transaction=False)
    increment = bool(kwargs.get("increment"))
    for sug in suggestions:
        args = [SUGADD_COMMAND, key, sug.string, sug.score]
        if increment:
            args.append("INCR")
        if sug.payload:
            args += ["PAYLOAD", sug.payload]
        pipe.execute_command(*args)
    # The last reply is the current size of the suggestion dictionary.
    replies = await pipe.execute()
    return replies[-1]
async def sugget(
    self,
    key: str,
    prefix: str,
    fuzzy: bool = False,
    num: int = 10,
    with_scores: bool = False,
    with_payloads: bool = False,
) -> List[SuggestionParser]:
    """
    Get a list of suggestions from the AutoCompleter, for a given prefix.

    Parameters:

    prefix : str
        The prefix we are searching. **Must be valid ascii or utf-8**
    fuzzy : bool
        If set to true, the prefix search is done in fuzzy mode.
        **NOTE**: Running fuzzy searches on short (<3 letters) prefixes
        can be very slow, and even scan the entire index.
    with_scores : bool
        If set to true, we also return the (refactored) score of
        each suggestion.
        This is normally not needed, and is NOT the original score
        inserted into the index.
    with_payloads : bool
        Return suggestion payloads
    num : int
        The maximum number of results we return. Note that we might
        return less. The algorithm trims irrelevant suggestions.

    Returns:

    list:
        A list of Suggestion objects. If with_scores was False, the
        score of all suggestions is 1.

    For more information see `FT.SUGGET <https://redis.io/commands/ft.sugget>`_.
    """  # noqa
    args = [SUGGET_COMMAND, key, prefix, "MAX", num]
    for enabled, token in (
        (fuzzy, FUZZY),
        (with_scores, WITHSCORES),
        (with_payloads, WITHPAYLOADS),
    ):
        if enabled:
            args.append(token)

    raw = await self.execute_command(*args)
    if not raw:
        return []
    return list(SuggestionParser(with_scores, with_payloads, raw))