feat: add tenacity retry in opensearch #10917
Conversation
Important: Review skipped. Auto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI.

Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Refactored embedding generation in the OpenSearch multimodal component to replace the concurrent ThreadPoolExecutor with a rate-limit-aware, multi-tier retry mechanism using tenacity. Introduces sequential processing for IBM/watsonx models with inter-request delays and parallel processing for others, with distinct retry policies for rate-limit versus generic errors. Existing ingestion, indexing, and mapping logic remains unchanged.

Changes
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Pre-merge checks and finishing touches

Important: Pre-merge checks failed. Please resolve all errors before merging. Addressing warnings is optional.

❌ Failed checks (1 warning, 3 inconclusive)
✅ Passed checks (3 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Pull request overview
This PR refactors the embedding generation logic in the OpenSearch multimodal component to improve reliability and handle rate limits more effectively. The changes replace manual retry/threading logic with the tenacity library for robust retry behavior and implement model-specific concurrency strategies.
Key Changes:
- Introduced tenacity-based retry decorators with separate strategies for rate limit errors (5 attempts, exponential backoff 2-30s) and other errors (3 attempts, exponential backoff 1-8s)
- Implemented sequential embedding with 0.6s delays for IBM/Watsonx models and parallel embedding for other models
- Enhanced error logging with detailed retry information and failure tracking
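As a rough, stdlib-only sketch of the two-tier retry policy described in the key changes (the PR itself uses tenacity decorators; `embed_fn`, `is_rate_limit_error`, and the backoff arithmetic below are illustrative stand-ins, not the PR's code):

```python
import time


def is_rate_limit_error(exc: Exception) -> bool:
    # Stand-in for the PR's detection: match HTTP 429 / "rate limit" markers.
    msg = str(exc).lower()
    return "429" in msg or "rate limit" in msg


def embed_with_retry(embed_fn, text, sleep=time.sleep):
    """Two-tier retry: up to 5 attempts with 2-30s exponential backoff for
    rate-limit errors, up to 3 attempts with 1-8s backoff for other errors."""
    rate_limit_attempts = 0
    other_attempts = 0
    while True:
        try:
            return embed_fn(text)
        except Exception as exc:
            if is_rate_limit_error(exc):
                rate_limit_attempts += 1
                if rate_limit_attempts >= 5:
                    raise
                # 2, 4, 8, 16 seconds, capped at 30
                delay = min(2 * 2 ** (rate_limit_attempts - 1), 30)
            else:
                other_attempts += 1
                if other_attempts >= 3:
                    raise
                # 1, 2 seconds, capped at 8
                delay = min(2 ** (other_attempts - 1), 8)
            sleep(delay)
```

The injectable `sleep` makes the pacing testable without real delays; tenacity's `wait_exponential`/`stop_after_attempt` play the equivalent role in the actual change.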
```python
        if is_ibm:
            # Sequential processing with inter-request delay for IBM models
            inter_request_delay = 0.6  # ~1.67 req/s, safely under 2 req/s limit
```
The comment mentions a '2 req/s limit' for IBM models, but this constraint is not documented elsewhere in the code or PR description. Consider adding a reference to the IBM/Watsonx API documentation or rate limit specification to help future maintainers understand the basis for this value.
Suggested change:

```diff
-            inter_request_delay = 0.6  # ~1.67 req/s, safely under 2 req/s limit
+            inter_request_delay = 0.6  # ~1.67 req/s, safely under 2 req/s limit (see IBM/Watsonx rate limits: https://cloud.ibm.com/docs/watsonx?topic=watsonx-llm-api-reference#rate-limits)
```
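Sketched as a stand-alone loop, the pacing behind this delay looks roughly like the following (a hypothetical illustration; `embed_chunk` is a placeholder for the real embedding call, and the 2 req/s IBM limit is the assumption the comment states, not a verified figure):

```python
import time

INTER_REQUEST_DELAY = 0.6  # ~1.67 req/s, below the assumed 2 req/s limit


def embed_sequentially(texts, embed_chunk, sleep=time.sleep):
    """Embed chunks one at a time, sleeping between requests
    so the request rate stays under the provider's limit."""
    vectors = []
    for i, text in enumerate(texts):
        vectors.append(embed_chunk(text))
        if i < len(texts) - 1:  # no need to sleep after the last request
            sleep(INTER_REQUEST_DELAY)
    return vectors
```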
| """Check if exception is retryable but not a rate limit error.""" | ||
| # Retry on most exceptions except for specific non-retryable ones | ||
| # Add other non-retryable exceptions here if needed | ||
| return not is_rate_limit_error(exception) | ||
|
|
The function is_other_retryable_error returns True for all non-rate-limit exceptions, including those that should not be retried (e.g., authentication errors, validation errors, or permanent failures). This could lead to unnecessary retry attempts on non-recoverable errors. Consider explicitly checking for retryable error types or patterns, such as timeout errors or temporary service unavailability (5xx status codes), and returning False for known non-retryable errors.
| """Check if exception is retryable but not a rate limit error.""" | |
| # Retry on most exceptions except for specific non-retryable ones | |
| # Add other non-retryable exceptions here if needed | |
| return not is_rate_limit_error(exception) | |
| """Check if exception is retryable but not a rate limit error. | |
| Retry only on transient errors (timeouts, connection errors, 5xx except 429). | |
| Do not retry on authentication, validation, or other permanent errors (4xx except 429). | |
| """ | |
| # If it's a rate limit error, handled separately | |
| if is_rate_limit_error(exception): | |
| return False | |
| # Check for OpenSearch RequestError with 4xx status codes (except 429) | |
| if isinstance(exception, RequestError): | |
| status_code = getattr(exception, "status_code", None) | |
| if status_code is not None: | |
| # 400, 401, 403, 404, etc. are not retryable | |
| if status_code in {400, 401, 403, 404, 422}: | |
| return False | |
| # 429 is handled above, 5xx are retryable | |
| if 500 <= status_code < 600: | |
| return True | |
| # If status_code is not set, fall back to message | |
| error_str = str(exception).lower() | |
| if any(code in error_str for code in ["400", "401", "403", "404", "422"]): | |
| return False | |
| if any(code in error_str for code in ["500", "502", "503", "504"]): | |
| return True | |
| # Check for common transient error types | |
| if isinstance(exception, (TimeoutError, ConnectionError)): | |
| return True | |
| # Check for 5xx in exception message | |
| error_str = str(exception).lower() | |
| if any(code in error_str for code in ["500", "502", "503", "504"]): | |
| return True | |
| # Authentication, permission, validation errors (not retryable) | |
| if any(term in error_str for term in ["authentication", "unauthorized", "forbidden", "permission", "invalid", "validation"]): | |
| return False | |
| # Default: do not retry | |
| return False |
```python
        # For IBM models, use sequential processing with rate limiting
        # For other models, use parallel processing
        vectors: list[list[float]] = [None] * len(texts)
```
The vectors list is initialized with None values, but the code doesn't validate that all None values are replaced with actual embeddings before proceeding. If any chunk fails to embed (despite retries) and raises an exception that's caught elsewhere, the resulting list could contain None values. Consider adding validation after the embedding loop to ensure all elements are populated, or handle the case where vectors[idx] might remain None.
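A defensive post-loop check along these lines could close the gap the comment describes (a hypothetical helper, not code from the PR; the names mirror the snippet above):

```python
def validate_embeddings(vectors, texts):
    """Raise if any chunk failed to produce an embedding, rather than
    silently passing None values downstream to the bulk indexer."""
    missing = [i for i, v in enumerate(vectors) if v is None]
    if missing:
        raise RuntimeError(
            f"Embedding failed for {len(missing)}/{len(texts)} chunks "
            f"(first missing indices: {missing[:10]})"
        )
```

Called right after the embedding loop, this turns a latent `None` in the index payload into an immediate, descriptive error.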
```python
def is_rate_limit_error(exception: Exception) -> bool:
    """Check if exception is a rate limit error (429)."""
```
The string-based error detection is fragile and may miss rate limit errors that use different formatting or phrasing. Consider checking for specific exception types (e.g., HTTPError with status code 429) or exception attributes instead of relying solely on string matching. This would make the error detection more reliable and maintainable.
| """Check if exception is a rate limit error (429).""" | |
| """Check if exception is a rate limit error (HTTP 429).""" | |
| # Check for OpenSearch RequestError with status_code 429 | |
| if isinstance(exception, RequestError): | |
| # Some RequestError instances have a status_code attribute | |
| status_code = getattr(exception, "status_code", None) | |
| if status_code == 429: | |
| return True | |
| # Fallback: string matching for other cases |
```python
        self.log(metadatas)
```
```diff
-        # Generate embeddings (threaded for concurrency) with retries
-        def embed_chunk(chunk_text: str) -> list[float]:
-            return selected_embedding.embed_documents([chunk_text])[0]
-
-        vectors: list[list[float]] | None = None
-        last_exception: Exception | None = None
-        delay = 1.0
-        attempts = 0
-        max_attempts = 3
-
-        while attempts < max_attempts:
-            attempts += 1
+        # Generate embeddings with rate-limit-aware retry logic using tenacity
+        from tenacity import (
+            retry,
+            retry_if_exception,
+            stop_after_attempt,
+            wait_exponential,
+        )
```
The tenacity import is placed within the method body rather than at the module level. This violates Python's PEP 8 style guide, which recommends placing imports at the top of the file. Move this import to the module-level imports section to improve code organization and reduce import overhead on repeated method calls.
Actionable comments posted: 0
🧹 Nitpick comments (3)
src/lfx/src/lfx/components/elastic/opensearch_multimodal.py (3)
868-910: Tighten tenacity retry predicates and verify API assumptions

The retry predicates are very broad right now:

- `is_other_retryable_error` returns `True` for any exception that isn't detected as a rate-limit error, so you'll retry on `ValueError`, `TypeError`, etc., which are usually permanent/logic errors rather than transient issues. That means 3 unnecessary attempts on many hard failures.
- The predicates also implicitly include non-application exceptions if they bubble up (e.g., `KeyboardInterrupt`/`SystemExit`), which you typically don't want to retry.

Consider constraining retries to known transient classes (network/HTTP/timeouts from the embedding provider), for example via `retry_if_exception_type` or a predicate that checks `isinstance(e, (ConnectionError, TimeoutError, OpenAIError, ...))` and excludes obvious programming errors. You can still keep a separate predicate for 429s.

Also, `before_sleep` assumes `retry_state.next_action.sleep` and `retry_state.outcome.exception()` are always present/valid for your tenacity version. Please double-check these attributes in the tenacity version used in this project and adjust or guard against `None` if needed.

Finally, you may want to import tenacity at module level rather than inside `_add_documents_to_vector_store` to avoid repeated local imports and to make dependency usage more discoverable.
911-927: Nested retry decorators work but are non-obvious; consider clarifying or simplifying

Stacking two `@retry` decorators like:

```python
@retry_on_rate_limit
@retry_on_other_errors
def _embed(...):
    ...
```

does achieve the intended behavior (rate-limit errors handled by the outer policy, all other exceptions by the inner one), but it's subtle and non-obvious to future readers.

Two lightweight options:

- Add a brief comment above `_embed` explaining how the two retry layers interact (outer handles 429/rate-limit with 5 attempts and long backoff; inner handles non-429 with 3 attempts and short backoff).
- Or wrap this into a small helper (e.g., `_embed_with_retry(text: str)`) with a single `try/except` that routes exceptions through the appropriate tenacity `Retrying` instance, making the control flow clearer.

The current implementation is logically sound; this is mostly about maintainability and reducing cognitive load for the next person reading this.
936-960: Concurrency block is reasonable; add minor robustness for vectors/max_workers

The concurrency logic for IBM vs non-IBM models looks good overall, but there are a couple of small robustness nits:

- `vectors: list[list[float]] = [None] * len(texts)` conflicts with the type hint (it's actually `list[None | list[float]]` until filled). If you run static typing, this will be flagged; you could initialize with a more accurate type (e.g., `vectors: list[list[float] | None] = [None] * len(texts)` and narrow later) or build `vectors` by appending in order.
- `max_workers = min(max(len(texts), 1), 8)` works, but reads a bit oddly. Something like `max_workers = max(1, min(len(texts), 8))` is equivalent and clearer about the invariant `1 <= max_workers <= 8`.
- Given you already short-circuit on `if not docs: return` earlier, `texts` should never be empty here; adding a quick `if not texts: return` just before initializing `vectors` would make this block safer against any future refactor that might desync `docs` and `texts`.

These are minor readability/defensiveness tweaks; the core concurrency behavior looks fine.
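The clamping equivalence that nitpick claims is easy to sanity-check (hypothetical helper names; both forms bound the worker count to 1..8):

```python
def max_workers_original(num_texts: int, cap: int = 8) -> int:
    # The PR's form: clamp from below first, then from above.
    return min(max(num_texts, 1), cap)


def max_workers_clearer(num_texts: int, cap: int = 8) -> int:
    # Suggested form: reads as "at least 1, at most cap, else one per chunk".
    return max(1, min(num_texts, cap))
```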
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
src/lfx/src/lfx/components/elastic/opensearch_multimodal.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/lfx/src/lfx/components/elastic/opensearch_multimodal.py (3)
src/backend/tests/unit/components/embeddings/test_embeddings_with_models.py (2)
- embed_documents (25-27)
- embed_documents (240-241)

src/lfx/src/lfx/base/embeddings/embeddings_class.py (1)
- embed_documents (36-45)

src/backend/tests/unit/components/vectorstores/test_opensearch_multimodal.py (1)
- embed_documents (34-36)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: CodeQL analysis (python)
- GitHub Check: Agent
- GitHub Check: Update Starter Projects
- GitHub Check: Update Component Index
Codecov Report

✅ All modified and coverable lines are covered by tests.

Additional details and impacted files:

```
@@ Coverage Diff @@
##             main   #10917      +/-   ##
==========================================
- Coverage   32.56%   32.55%   -0.01%
==========================================
  Files        1371     1371
  Lines       63493    63542      +49
  Branches     9383     9397      +14
==========================================
+ Hits        20675    20686      +11
- Misses      41778    41816      +38
  Partials     1040     1040
```
Flags with carried forward coverage won't be shown.
This pull request refactors the embedding generation logic in the `_add_documents_to_vector_store` method of `opensearch_multimodal.py` to improve reliability and efficiency, especially when handling rate limits and different embedding model providers. The main changes include switching to the `tenacity` library for robust, rate-limit-aware retries and optimizing concurrency based on the embedding model type.

Embedding and Retry Logic Improvements:

- Replaced the manual retry loop with `tenacity`-based decorators, providing separate retry strategies for rate limit errors (longer backoff, more attempts) and other retryable errors (shorter backoff, fewer attempts).

Concurrency and Model-Specific Handling:

- Sized the thread pool (`max_workers`) based on the number of text chunks and model type, improving performance while respecting provider constraints.

Summary by CodeRabbit
Release Notes