Notes 15: Advanced RAG - Advanced Techniques


Table of Contents

  1. Introduction to Advanced RAG
  2. Basic RAG vs Advanced RAG
  3. Query Understanding
  4. Retrieval Optimization
  5. Re-ranking и Fusion
  6. Multi-hop Reasoning
  7. Adaptive RAG
  8. Contextual Compression
  9. Knowledge Graph Integration
  10. Practical Examples
  11. Best Practices

Introduction to Advanced RAG

What is Advanced RAG?

Definition: advanced techniques for improving the quality of Retrieval-Augmented Generation systems

Problems with basic RAG:

Basic RAG:
Query → Embed → Search → Retrieve top-k → LLM → Answer

Problems:
❌ May retrieve the wrong documents
❌ May return irrelevant chunks
❌ Does not understand complex queries
❌ Cannot perform multi-step reasoning
❌ Performs poorly on rare, long-tail questions

The Advanced RAG approach:

Advanced RAG:
Query → Understand intent → Smart retrieval → Re-rank → 
Fuse multiple sources → Compress context → LLM → Answer

Improvements:
✅ Understands what the user actually needs
✅ Finds better documents
✅ Can combine multiple sources
✅ Multi-hop reasoning
✅ Handles rare and complex queries

History of RAG

2020: RAG fundamentals (Lewis et al.)
- Simple retriever + LLM
- BERT-based embeddings

2021-2022: Dense retrieval (DPR, ColBERT)
- Better embeddings
- More accurate search

2023: Advanced RAG techniques
- Query expansion
- Multi-modal retrieval
- Reranking

2024: Hybrid RAG
- Text + Knowledge graphs
- Multi-hop reasoning
- Adaptive retrieval

2025: Reasoning-augmented RAG
- Integration with o1/o3
- Complex reasoning over documents

Basic RAG vs Advanced RAG

Architectural Comparison

BASIC RAG:

User Query
    ↓
Embed Query
    ↓
Vector Search
    ↓
Top-K Retrieval
    ↓
LLM Generation
    ↓
Answer

Simple and fast, but often wrong


ADVANCED RAG:

User Query
    ↓
Query Understanding (NER, intent detection)
    ↓
Query Expansion (paraphrasing, decomposition)
    ↓
Multi-source Retrieval:
├─ Vector search
├─ BM25 (keyword search)
├─ Knowledge graph
└─ Structured data
    ↓
Re-ranking (cross-encoder)
    ↓
Fusion (combine results)
    ↓
Contextual Compression
    ↓
LLM Generation
    ↓
Answer Verification
    ↓
Answer

Improvement Metrics

| Metric            | Basic RAG | Advanced RAG | Change          |
|-------------------|-----------|--------------|-----------------|
| Retrieval Success | 70%       | 92%          | +22 pp          |
| Answer Accuracy   | 75%       | 88%          | +13 pp          |
| Latency           | 2 sec     | 5 sec        | +150% (slower)  |
| Hallucination     | 15%       | 5%           | -67%            |
| Context Quality   | 6/10      | 9/10         | +50%            |

Takeaway: quality goes up, but so does latency
Mitigation: smart caching and parallel processing (see the sketch below)
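
Since Advanced RAG issues several retrieval calls per user query (expanded queries, multiple sources), one common mitigation is to run those calls concurrently. A minimal sketch using Python's standard concurrent.futures; the retriever.retrieve(query, top_k) interface and the variable names in the usage comment are assumptions matching the examples later in this note:

from concurrent.futures import ThreadPoolExecutor

def parallel_retrieve(retriever, queries: list[str], top_k: int = 5) -> list[dict]:
    """
    Run one retrieval call per query variant concurrently.

    Assumes retriever.retrieve(query, top_k) is I/O-bound (vector DB or search API),
    so threads are enough; CPU-bound retrievers would need a process pool instead.
    """
    results = []
    with ThreadPoolExecutor(max_workers=max(1, min(8, len(queries)))) as pool:
        futures = [pool.submit(retriever.retrieve, q, top_k) for q in queries]
        for future in futures:
            results.extend(future.result())
    return results

# Usage (hypothetical names): latency becomes roughly the slowest single call
# instead of the sum of all calls
# all_results = parallel_retrieve(hybrid_retriever, expanded_queries, top_k=10)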

Query Understanding

Intent Detection

Idea: understand what the user is actually looking for

from enum import Enum

class QueryIntent(Enum):
    FACTUAL = "factual_qa"       # "What is N8N?"
    ANALYTICAL = "analysis"       # "Compare X vs Y"
    INSTRUCTIONAL = "how_to"     # "How to use N8N?"
    REASONING = "reasoning"       # "Why is X better?"
    CREATIVE = "creative"         # "Generate ideas for..."

def detect_intent(query: str, llm) -> QueryIntent:
    """
    Determine what type of query this is
    """

    prompt = f"""
    Classify this query intent:

    Query: {query}

    Options:
    - FACTUAL: Simple fact lookup
    - ANALYTICAL: Comparison/analysis
    - INSTRUCTIONAL: How-to/tutorial
    - REASONING: Why/explain
    - CREATIVE: Ideation/generation

    Intent:
    """

    response = llm.predict(prompt).strip().upper()
    # Take only the first token in case the model returns extra text
    return QueryIntent[response.split()[0]]

# Usage
intent = detect_intent("How does N8N handle workflows?", llm)
# → INSTRUCTIONAL

Named Entity Recognition (NER)

from spacy import load

nlp = load("en_core_web_lg")

def extract_entities(query: str) -> dict:
    """
    Extract important entities from query
    """

    doc = nlp(query)

    entities = {
        "PERSON": [],
        "ORG": [],
        "PRODUCT": [],
        "LOCATION": [],
        "DATE": []
    }

    for ent in doc.ents:
        if ent.label_ in entities:
            entities[ent.label_].append(ent.text)

    return entities

# Usage
query = "How does OpenAI's GPT-4 work in production?"
entities = extract_entities(query)
# → {"PERSON": [], "ORG": ["OpenAI"], "PRODUCT": ["GPT-4"]}

Query Expansion

Idea: generate several variants of the question to improve retrieval

def expand_query(query: str, llm, num_expansions: int = 3) -> list[str]:
    """
    Generate multiple versions of the query
    """

    prompt = f"""
    Generate {num_expansions} paraphrases of this query:

    Original: {query}

    Paraphrases (one per line):
    """

    response = llm.predict(prompt)
    paraphrases = [line.strip() for line in response.split('\n') if line.strip()]

    return [query] + paraphrases  # Include original

# Usage
original = "How to use N8N for automation?"
expanded = expand_query(original, llm)
# → [
#    "How to use N8N for automation?",
#    "How can I use N8N to automate workflows?",
#    "Guide to using N8N for task automation",
#    "What is the process of automating tasks with N8N?"
# ]

# Now we can run the search for every variant
results = []
for query_variant in expanded:
    results.extend(vector_search(query_variant, top_k=2))
# More relevant results overall!
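
The pipeline diagram above also lists query decomposition alongside paraphrasing: splitting one compound question into simpler sub-questions that are retrieved separately. A minimal sketch in the same style as expand_query; the prompt wording and the llm.predict interface are assumptions, and the example output is purely illustrative:

def decompose_query(query: str, llm, max_subquestions: int = 3) -> list[str]:
    """
    Break a compound question into simpler sub-questions for separate retrieval
    """

    prompt = f"""
    Break this question into at most {max_subquestions} simpler sub-questions.
    If it is already simple, return it unchanged.

    Question: {query}

    Sub-questions (one per line):
    """

    response = llm.predict(prompt)
    # Drop empty lines and leading list markers
    subquestions = [line.strip("- ").strip() for line in response.split('\n') if line.strip()]

    return subquestions[:max_subquestions] or [query]

# Usage (illustrative output)
# decompose_query("How do N8N and Zapier differ in pricing and self-hosting?", llm)
# → ["How is N8N priced?", "How is Zapier priced?", "Can N8N or Zapier be self-hosted?"]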

Retrieval Optimization

Hybrid Retrieval (Vector + BM25)

from sklearn.metrics.pairwise import cosine_similarity
from rank_bm25 import BM25Okapi

class HybridRetriever:
    def __init__(self, documents, embeddings_model):
        self.documents = documents
        self.embeddings = embeddings_model

        # Vector search
        self.document_embeddings = [
            embeddings_model.embed(doc) for doc in documents
        ]

        # BM25 search
        tokenized_docs = [doc.split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized_docs)

    def retrieve(self, query: str, top_k: int = 5) -> list[dict]:
        """
        Retrieve using both vector and keyword search
        """

        # Vector search
        query_embedding = self.embeddings.embed(query)
        vector_scores = cosine_similarity(
            [query_embedding],
            self.document_embeddings
        )[0]

        # BM25 search
        bm25_scores = self.bm25.get_scores(query.split())

        # Min-max normalize both score sets and combine (epsilon guards against division by zero)
        eps = 1e-9
        vector_scores_norm = (vector_scores - vector_scores.min()) / (vector_scores.max() - vector_scores.min() + eps)
        bm25_scores_norm = (bm25_scores - bm25_scores.min()) / (bm25_scores.max() - bm25_scores.min() + eps)

        combined_scores = 0.7 * vector_scores_norm + 0.3 * bm25_scores_norm

        # Get top-k
        top_indices = combined_scores.argsort()[-top_k:][::-1]

        return [
            {
                "document": self.documents[i],
                "score": combined_scores[i],
                "vector_score": vector_scores_norm[i],
                "bm25_score": bm25_scores_norm[i]
            }
            for i in top_indices
        ]

# Usage
retriever = HybridRetriever(documents, embeddings_model)
results = retriever.retrieve("How to automate workflows?", top_k=5)

Multi-modal Retrieval

from sentence_transformers import util

class MultimodalRetriever:
    def __init__(self, documents, images, embeddings_model):
        self.documents = documents
        self.images = images
        self.embeddings = embeddings_model

        # Embed all modalities
        self.doc_embeddings = [embeddings_model.embed(doc) for doc in documents]
        self.img_embeddings = [embeddings_model.embed(img) for img in images]

    def retrieve(self, query: str, top_k: int = 5) -> list[dict]:
        """
        Retrieve both text and images
        """

        query_embedding = self.embeddings.embed(query)

        # Search documents (convert scores to numpy so argsort + reverse slicing work)
        doc_scores = util.cos_sim(query_embedding, self.doc_embeddings)[0].cpu().numpy()
        top_docs = doc_scores.argsort()[-top_k:][::-1]

        # Search images
        img_scores = util.cos_sim(query_embedding, self.img_embeddings)[0].cpu().numpy()
        top_imgs = img_scores.argsort()[-top_k:][::-1]

        results = []
        for idx in top_docs:
            results.append({
                "type": "text",
                "content": self.documents[idx],
                "score": doc_scores[idx]
            })

        for idx in top_imgs:
            results.append({
                "type": "image",
                "content": self.images[idx],
                "score": img_scores[idx]
            })

        return sorted(results, key=lambda x: x["score"], reverse=True)[:top_k]

Re-ranking and Fusion

Cross-Encoder Re-ranking

from sentence_transformers import CrossEncoder

class RAGReranker:
    def __init__(self):
        self.model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

    def rerank(self, query: str, documents: list[str], top_k: int = 5) -> list[dict]:
        """
        Re-rank documents using cross-encoder
        """

        # Pair query with each document
        pairs = [[query, doc] for doc in documents]

        # Get scores
        scores = self.model.predict(pairs)

        # Sort by score
        ranked = sorted(
            zip(documents, scores),
            key=lambda x: x[1],
            reverse=True
        )

        return [
            {
                "document": doc,
                "score": float(score)
            }
            for doc, score in ranked[:top_k]
        ]

# Usage
reranker = RAGReranker()

# First retrieve with fast method (100 docs)
initial_results = vector_search(query, top_k=100)

# Then rerank (slower but more accurate)
final_results = reranker.rerank(query, initial_results, top_k=5)

Result Fusion (RRF - Reciprocal Rank Fusion)

def reciprocal_rank_fusion(result_lists: list[list[dict]], k: int = 60) -> list[dict]:
    """
    Combine results from multiple retrievers
    """

    fused_scores = {}

    for result_list in result_lists:
        for rank, result in enumerate(result_list, 1):
            # Key on the document text itself; id() would differ between equal copies of the same text
            doc_id = result["document"]

            # RRF formula: 1 / (k + rank)
            rrf_score = 1 / (k + rank)

            if doc_id not in fused_scores:
                fused_scores[doc_id] = {"document": result["document"], "score": 0}

            fused_scores[doc_id]["score"] += rrf_score

    # Sort by combined score
    return sorted(
        fused_scores.values(),
        key=lambda x: x["score"],
        reverse=True
    )

# Usage
vector_results = vector_search(query, top_k=10)
bm25_results = bm25_search(query, top_k=10)
knowledge_graph_results = kg_search(query, top_k=10)

# Combine all
fused = reciprocal_rank_fusion([vector_results, bm25_results, knowledge_graph_results])

Multi-hop Reasoning

Idea: complex questions require several retrieval steps

class MultiHopRAG:
    def __init__(self, retriever, llm):
        self.retriever = retriever
        self.llm = llm

    def answer_multi_hop(self, query: str, max_hops: int = 3) -> dict:
        """
        Answer complex questions through multiple retrieval steps
        """

        context = []
        current_query = query

        for hop in range(max_hops):
            # 1. Retrieve for current query
            results = self.retriever.retrieve(current_query, top_k=3)
            context.extend([r["document"] for r in results])

            # 2. Check if we have enough info
            enough_info = self.llm.predict(f"""
                Based on this context, can we answer: {query}?

                Context: {' '.join(context)}

                Answer YES or NO and suggest next search:
            """)

            if "YES" in enough_info:
                break

            # 3. Extract next search query
            next_query = self.llm.predict(f"""
                What should we search next to answer: {query}?

                Current context: {' '.join(context)}

                Next search query:
            """)

            current_query = next_query.strip()

        # Generate final answer
        answer = self.llm.predict(f"""
            Answer this question: {query}

            Using context: {' '.join(context)}

            Answer:
        """)

        return {
            "query": query,
            "hops": min(hop + 1, max_hops),
            "context": context,
            "answer": answer
        }

# Example
# Q: "What year was the founder of N8N born?"
# Hop 1: Search "Who founded N8N?" → founder's name
# Hop 2: Search "<founder's name> birth year" → birth year
# Answer: the birth year found in hop 2

Adaptive RAG

Idea: choose the retrieval strategy based on the query type

class AdaptiveRAG:
    def __init__(self, retriever, llm, knowledge_graph=None):
        self.retriever = retriever
        self.llm = llm
        self.kg = knowledge_graph

    def retrieve_adaptively(self, query: str) -> dict:
        """
        Choose retrieval strategy based on query type
        """

        # 1. Determine complexity
        complexity = self.llm.predict(f"""
            Is this query simple, moderate, or complex?
            Query: {query}
            Answer: (simple/moderate/complex)
        """).strip()

        # 2. Determine knowledge type needed
        needs_facts = "factual" in query.lower() or "what" in query.lower()
        needs_reasoning = "why" in query.lower() or "how" in query.lower()
        needs_comparison = "vs" in query.lower() or "compare" in query.lower()

        # 3. Choose strategy
        if complexity == "simple" and needs_facts:
            # Fast path: simple keyword search
            strategy = "fast_retrieval"
            results = self.retriever.retrieve(query, top_k=3, method="bm25")

        elif complexity == "moderate":
            # Balanced path: hybrid search + reranking
            strategy = "hybrid_retrieval"
            results = self.retriever.retrieve(query, top_k=10, method="hybrid")
            results = rerank(query, results, top_k=5)

        elif needs_comparison:
            # Comparison path: run a separate search for each entity mentioned in the query
            strategy = "comparison_retrieval"
            entities = extract_entities(query)
            entity_names = [name for names in entities.values() for name in names]
            results = []
            for entity in entity_names:
                results.extend(self.retriever.retrieve(entity, top_k=3))

        elif self.kg and needs_reasoning:
            # Knowledge graph path: structured reasoning
            strategy = "kg_retrieval"
            results = self.kg.retrieve(query)

        else:
            # Complex path: multi-hop
            strategy = "multi_hop_retrieval"
            results = multi_hop_retrieval(query)

        return {
            "strategy": strategy,
            "complexity": complexity,
            "results": results
        }

# Usage
rag = AdaptiveRAG(retriever, llm, knowledge_graph)
result = rag.retrieve_adaptively("Compare N8N and Zapier")
# → Automatically chooses comparison_retrieval strategy

Contextual Compression

Idea: compress the context so that the LLM only sees relevant material

class ContextCompressor:
    def __init__(self, llm):
        self.llm = llm

    def compress(self, query: str, documents: list[str]) -> str:
        """
        Extract only relevant parts from documents
        """

        combined = "\n\n".join([f"[Doc {i+1}]\n{doc}" for i, doc in enumerate(documents)])

        prompt = f"""
        Extract only the parts relevant to: "{query}"

        Documents:
        {combined}

        Compressed context (only relevant parts):
        """

        return self.llm.predict(prompt)

    def summarize(self, documents: list[str]) -> str:
        """
        Summarize documents while keeping relevance
        """

        combined = "\n\n".join(documents)

        prompt = f"""
        Summarize these documents in 3-5 sentences:

        {combined}

        Summary:
        """

        return self.llm.predict(prompt)

# Usage
compressor = ContextCompressor(llm)

# Instead of passing full documents
full_context = "\n".join(documents)  # Can be 10K tokens!

# Compress first
compressed = compressor.compress(query, documents)  # Now 2K tokens

response = llm.predict(f"""
Answer: {query}
Context: {compressed}
""")
# Token savings: ~80%!

Knowledge Graph Integration

class KnowledgeGraphRAG:
    def __init__(self, kg, embeddings_model):
        self.kg = kg  # Neo4j, etc.
        self.embeddings = embeddings_model

    def retrieve_with_kg(self, query: str) -> dict:
        """
        Combine semantic search with structured knowledge
        """

        # 1. Extract entities from query
        entities = extract_entities(query)

        # 2. Find related entities in KG
        related_entities = []
        for entity in entities["PRODUCT"]:
            related = self.kg.query(f"""
                MATCH (n {{name: '{entity}'}})-[r]->(m)
                RETURN m.name, r.type
                LIMIT 10
            """)
            related_entities.extend(related)

        # 3. Retrieve documents for main + related entities
        documents = []
        for entity in entities["PRODUCT"] + [e[0] for e in related_entities]:
            docs = vector_search(entity, top_k=3)
            documents.extend(docs)

        # 4. Build knowledge context
        kg_context = f"""
        Key entities: {', '.join(entities['PRODUCT'])}

        Related concepts:
        """
        for entity, rel_type in related_entities[:5]:
            kg_context += f"\n- {entity} ({rel_type})"

        return {
            "entities": entities,
            "documents": documents,
            "kg_context": kg_context
        }

# Usage
kg_rag = KnowledgeGraphRAG(neo4j_connection, embeddings)
result = kg_rag.retrieve_with_kg("How does N8N integrate with Zapier?")
# → Finds both direct documents AND related services from KG

Practical Examples

Example 1: Production RAG Pipeline

import hashlib
import json

from redis import Redis  # redis-py client; assumes a Redis instance is available for caching

class ProductionRAG:
    def __init__(self, config: dict):
        self.retriever = HybridRetriever(...)
        self.reranker = RAGReranker()
        self.compressor = ContextCompressor(...)
        self.llm = OpenAI(...)
        self.cache = Redis()

    def query(self, user_query: str) -> dict:
        """Full RAG pipeline"""

        # 1. Check cache
        cache_key = hashlib.md5(user_query.encode()).hexdigest()
        cached = self.cache.get(cache_key)
        if cached:
            return json.loads(cached)

        # 2. Understand query
        intent = detect_intent(user_query, self.llm)
        expanded_queries = expand_query(user_query, self.llm)

        # 3. Retrieve for each expanded query (sequential here; can be parallelized as sketched earlier)
        all_results = []
        for q in expanded_queries:
            results = self.retriever.retrieve(q, top_k=20)
            all_results.extend(results)

        # 4. Re-rank
        reranked = self.reranker.rerank(user_query, all_results, top_k=10)

        # 5. Compress context
        documents = [r["document"] for r in reranked]
        compressed_context = self.compressor.compress(user_query, documents)

        # 6. Generate answer
        prompt = f"""
        Question: {user_query}

        Context:
        {compressed_context}

        Answer:
        """

        answer = self.llm.predict(prompt)

        # 7. Verify answer
        verification = self.llm.predict(f"""
            Is this answer supported by the context?
            Answer: {answer}
            Context: {compressed_context}

            YES or NO:
        """)

        result = {
            "query": user_query,
            "intent": intent.value,
            "answer": answer,
            "verified": "YES" in verification,
            "source_count": len(reranked),
            "sources": documents[:3]
        }

        # 8. Cache result
        self.cache.setex(cache_key, 3600, json.dumps(result))

        return result

Example 2: Multi-hop RAG System

import json

class InvestigativeRAG:
    """For complex research questions"""

    def __init__(self, retriever, llm):
        self.retriever = retriever
        self.llm = llm
        self.research_path = []

    def research(self, research_question: str, max_depth: int = 5) -> dict:
        """
        Conduct multi-step research
        """

        findings = {
            "question": research_question,
            "steps": [],
            "final_answer": "",
            "evidence": []
        }

        current_questions = [research_question]

        for depth in range(max_depth):
            step_findings = []
            next_questions = []

            for q in current_questions:
                # Retrieve
                results = self.retriever.retrieve(q, top_k=5)

                # Ask follow-up questions
                followup = self.llm.predict(f"""
                    To answer "{q}", what else should we know?

                    Current findings: {results}

                    Next questions (3 max, one per line):
                """)

                step_findings.append({
                    "question": q,
                    "results": results,
                    "followup": followup
                })

                # Keep at most 3 non-empty follow-up questions
                next_questions.extend(
                    [fq.strip() for fq in followup.split('\n') if fq.strip()][:3]
                )

            findings["steps"].append({
                "depth": depth + 1,
                "findings": step_findings
            })

            if not next_questions:
                break

            current_questions = next_questions

        # Final synthesis
        all_evidence = []
        for step in findings["steps"]:
            for finding in step["findings"]:
                all_evidence.extend(finding["results"])

        findings["final_answer"] = self.llm.predict(f"""
            Based on this research:

            {json.dumps(findings, indent=2)}

            Answer the original question: {research_question}

            Final answer:
        """)

        findings["evidence"] = all_evidence

        return findings

Best Practices

Do's ✅

  1. Use hybrid retrieval: Vector + BM25 beats either one alone

  2. Always re-rank results: a cross-encoder gives better quality

  3. Compress the context before the LLM: fewer tokens means cheaper and faster

  4. Cache results: the same questions come up again and again

  5. Monitor retrieval quality: track how often search actually succeeds (see the sketch after the Don'ts list)

Don'ts ❌

  1. Don't pass the entire context to the LLM: compress it before sending

  2. Don't ignore special query types: use adaptive strategies

  3. Don't rely on vector search alone: the hybrid approach works better

  4. Don't forget about latency: Advanced RAG is slower, so use caching and parallelism
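
A lightweight way to monitor retrieval quality (Do #5) is to keep a small labeled set of query → relevant-document pairs and periodically compute a hit rate at k. A minimal sketch, assuming a retriever.retrieve(query, top_k) interface returning dicts with a "document" field, as in the examples above:

def retrieval_hit_rate(retriever, labeled_set: list[dict], top_k: int = 5) -> float:
    """
    Fraction of labeled queries for which at least one known-relevant
    document shows up in the top-k retrieved results.

    labeled_set items look like: {"query": "...", "relevant": ["doc text", ...]}
    """
    if not labeled_set:
        return 0.0

    hits = 0
    for example in labeled_set:
        results = retriever.retrieve(example["query"], top_k=top_k)
        retrieved_docs = {r["document"] for r in results}
        if any(doc in retrieved_docs for doc in example["relevant"]):
            hits += 1

    return hits / len(labeled_set)

# Usage: run on a schedule and alert if the hit rate drops after re-indexing
# hit_rate = retrieval_hit_rate(hybrid_retriever, labeled_examples, top_k=5)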

Advanced RAG Checklist

Retrieval:
☑️ Hybrid retrieval is configured
☑️ Query expansion is implemented
☑️ Multi-source retrieval is working

Processing:
☑️ Re-ranking is applied
☑️ Contextual compression is working
☑️ Result fusion is configured

Quality:
☑️ Retrieval quality is monitored
☑️ Answer verification is enabled
☑️ Hallucination detection is working

Performance:
☑️ Caching is implemented
☑️ Parallel processing is applied
☑️ Latency is optimized

Maintenance:
☑️ Documents are updated regularly
☑️ Metrics are tracked
☑️ A/B testing is conducted

Created: December 2025
Version: 1.0
Author: Pavel
Applications: Information Retrieval, Knowledge Integration, Production Search Systems
