RAG(検索拡張生成)システム完全実装ガイド2026

Tech Trends AI
- 15 minutes read - 3173 wordsRAG(検索拡張生成)システム完全実装ガイド2026
RAG(Retrieval-Augmented Generation)は、大規模言語モデルの知識を外部データで補強し、より精度の高い回答を生成する技術です。本記事では、実用的なRAGシステムをゼロから構築する方法を包括的に解説します。
目次
- RAGシステムの基本概念
- システム設計と要件定義
- 環境構築と必要ライブラリ
- 文書処理パイプラインの実装
- ベクトルデータベースの構築
- 検索・生成エンジンの実装
- Webアプリケーションとしての実装
- 性能最適化とチューニング
- 監視・運用・保守
- トラブルシューティング
RAGシステムの基本概念
RAGの動作原理
RAGシステムは以下の3つのステップで動作します:
- 検索(Retrieval): ユーザーの質問に関連する文書を検索
- 拡張(Augmented): 検索結果を生成モデルのコンテキストに追加
- 生成(Generation): 拡張されたコンテキストから回答を生成
アーキテクチャ概要
# RAGシステムの基本構造
class RAGSystem:
def __init__(self):
self.document_processor = DocumentProcessor()
self.embedding_model = EmbeddingModel()
self.vector_store = VectorDatabase()
self.retriever = Retriever()
self.generator = LLMGenerator()
def process_query(self, query: str) -> str:
# 1. クエリをベクトル化
query_embedding = self.embedding_model.embed(query)
# 2. 関連文書を検索
relevant_docs = self.retriever.retrieve(query_embedding)
# 3. コンテキストを構築
context = self.build_context(relevant_docs, query)
# 4. 回答を生成
response = self.generator.generate(context)
return response
システム設計と要件定義
機能要件
- 文書インデックス機能: PDF、テキスト、Webページの取り込み
- 意味検索機能: 自然言語による高精度な検索
- 回答生成機能: コンテキストに基づいた回答生成
- 管理機能: 文書の追加・削除・更新
非機能要件
- スケーラビリティ: 大量文書への対応
- レスポンス時間: 5秒以内の回答生成
- 精度: 80%以上の回答精度
- 可用性: 99.9%のシステム稼働率
システム構成図
graph TB
A[ユーザー] --> B[WebAPI]
B --> C[RAGエンジン]
C --> D[文書処理]
C --> E[ベクトル検索]
C --> F[LLM生成]
D --> G[ベクトルDB]
E --> G
F --> H[回答]
H --> B
B --> A
環境構築と必要ライブラリ
基本環境
# Python仮想環境の作成
python -m venv venv
source venv/bin/activate # Linux/Mac
# venv\Scripts\activate # Windows
# 基本パッケージのインストール
pip install --upgrade pip
必要ライブラリ一覧
# requirements.txt
langchain==0.1.0
langchain-community==0.0.10
openai==1.10.0
chromadb==0.4.22
sentence-transformers==2.2.2
pypdf==3.17.4
python-dotenv==1.0.0
fastapi==0.104.1
uvicorn==0.24.0
streamlit==1.29.0
tiktoken==0.5.2
numpy==1.24.3
pandas==2.0.3
# パッケージインストール
pip install -r requirements.txt
環境変数設定
# .env ファイル
OPENAI_API_KEY=your_openai_api_key_here
CHROMA_PERSIST_DIRECTORY=./chroma_db
EMBEDDING_MODEL=sentence-transformers/all-MiniLM-L6-v2
LLM_MODEL=gpt-3.5-turbo
MAX_TOKENS=1000
TEMPERATURE=0.1
文書処理パイプラインの実装
文書読み込みクラス
import os
from typing import List, Dict
from pathlib import Path
from langchain.document_loaders import PyPDFLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
class DocumentProcessor:
def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
separators=["\n\n", "\n", " ", ""]
)
def load_documents(self, file_path: str) -> List[Document]:
"""ファイルからドキュメントを読み込む"""
file_extension = Path(file_path).suffix.lower()
if file_extension == '.pdf':
loader = PyPDFLoader(file_path)
elif file_extension == '.txt':
loader = TextLoader(file_path, encoding='utf-8')
else:
raise ValueError(f"Unsupported file type: {file_extension}")
documents = loader.load()
return documents
def split_documents(self, documents: List[Document]) -> List[Document]:
"""文書をチャンクに分割"""
chunks = self.text_splitter.split_documents(documents)
# メタデータの追加
for i, chunk in enumerate(chunks):
chunk.metadata.update({
'chunk_id': i,
'chunk_size': len(chunk.page_content)
})
return chunks
def process_directory(self, directory_path: str) -> List[Document]:
"""ディレクトリ内の全ファイルを処理"""
all_chunks = []
for file_path in Path(directory_path).glob("**/*"):
if file_path.is_file() and file_path.suffix.lower() in ['.pdf', '.txt']:
try:
print(f"Processing: {file_path}")
documents = self.load_documents(str(file_path))
chunks = self.split_documents(documents)
all_chunks.extend(chunks)
except Exception as e:
print(f"Error processing {file_path}: {e}")
return all_chunks
# 使用例
processor = DocumentProcessor()
documents = processor.process_directory("./data/documents")
print(f"Processed {len(documents)} chunks")
高度な文書処理
import re
from typing import Optional
class AdvancedDocumentProcessor(DocumentProcessor):
def __init__(self, **kwargs):
super().__init__(**kwargs)
def preprocess_text(self, text: str) -> str:
"""テキストの前処理"""
# 余分な空白の削除
text = re.sub(r'\s+', ' ', text)
# 特殊文字の正規化
text = text.replace('\u00a0', ' ') # non-breaking space
text = text.replace('\u2019', "'") # right single quotation mark
text = text.replace('\u201c', '"') # left double quotation mark
text = text.replace('\u201d', '"') # right double quotation mark
return text.strip()
def extract_metadata(self, document: Document) -> Dict:
"""文書からメタデータを抽出"""
content = document.page_content
metadata = document.metadata.copy()
# タイトルの抽出(最初の行や# で始まる行)
lines = content.split('\n')
title = None
for line in lines[:5]: # 最初の5行から探す
line = line.strip()
if line.startswith('#'):
title = line.lstrip('#').strip()
break
elif len(line) > 10 and len(line) < 100:
title = line
break
if title:
metadata['title'] = title
# キーワードの抽出(簡易版)
keywords = self.extract_keywords(content)
metadata['keywords'] = keywords
return metadata
def extract_keywords(self, text: str, top_k: int = 5) -> List[str]:
"""簡易キーワード抽出"""
# 単語の出現頻度をカウント
words = re.findall(r'\b[a-zA-Zあ-ん一-龯]{3,}\b', text)
word_freq = {}
for word in words:
word = word.lower()
word_freq[word] = word_freq.get(word, 0) + 1
# 頻出語を返す
sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
return [word for word, freq in sorted_words[:top_k]]
ベクトルデータベースの構築
ChromaDBセットアップ
import chromadb
from chromadb.config import Settings
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from typing import List, Optional
class VectorStoreManager:
def __init__(self,
persist_directory: str = "./chroma_db",
embedding_model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
self.persist_directory = persist_directory
self.embedding_model = HuggingFaceEmbeddings(
model_name=embedding_model_name,
model_kwargs={'device': 'cpu'},
encode_kwargs={'normalize_embeddings': True}
)
# ChromaDBクライアントの初期化
self.client = chromadb.PersistentClient(path=persist_directory)
# Langchainベクトルストアの初期化
self.vector_store = Chroma(
client=self.client,
embedding_function=self.embedding_model,
persist_directory=persist_directory
)
def add_documents(self, documents: List[Document], collection_name: str = "default"):
"""文書をベクトルストアに追加"""
try:
# メタデータの準備
texts = [doc.page_content for doc in documents]
metadatas = [doc.metadata for doc in documents]
# 文書の追加
self.vector_store.add_texts(
texts=texts,
metadatas=metadatas
)
print(f"Added {len(documents)} documents to vector store")
return True
except Exception as e:
print(f"Error adding documents: {e}")
return False
def similarity_search(self,
query: str,
k: int = 5,
filter_metadata: Optional[Dict] = None) -> List[Document]:
"""類似度検索"""
try:
results = self.vector_store.similarity_search(
query=query,
k=k,
filter=filter_metadata
)
return results
except Exception as e:
print(f"Error during search: {e}")
return []
def similarity_search_with_score(self,
query: str,
k: int = 5) -> List[tuple]:
"""スコア付き類似度検索"""
try:
results = self.vector_store.similarity_search_with_score(
query=query,
k=k
)
return results
except Exception as e:
print(f"Error during scored search: {e}")
return []
def delete_collection(self):
"""コレクションの削除"""
try:
self.vector_store.delete_collection()
print("Collection deleted successfully")
except Exception as e:
print(f"Error deleting collection: {e}")
# 使用例
vector_manager = VectorStoreManager()
# 文書の追加
documents = processor.process_directory("./data/documents")
vector_manager.add_documents(documents)
# 検索の実行
results = vector_manager.similarity_search("機械学習の基本概念", k=3)
for doc in results:
print(f"Content: {doc.page_content[:200]}...")
print(f"Metadata: {doc.metadata}")
print("---")
高度なベクトル検索
from langchain.retrievers import MultiQueryRetriever
from langchain.llms import OpenAI
class AdvancedRetriever:
def __init__(self, vector_store: Chroma, llm):
self.vector_store = vector_store
self.llm = llm
# マルチクエリレトリーバーの設定
self.multi_query_retriever = MultiQueryRetriever.from_llm(
retriever=vector_store.as_retriever(search_kwargs={"k": 5}),
llm=llm
)
def hybrid_search(self, query: str, alpha: float = 0.5) -> List[Document]:
"""ハイブリッド検索(意味検索とキーワード検索の組み合わせ)"""
# 意味検索
semantic_results = self.vector_store.similarity_search(query, k=10)
# キーワード検索(簡易実装)
keyword_results = self.keyword_search(query)
# 結果の統合
combined_results = self.combine_results(semantic_results, keyword_results, alpha)
return combined_results[:5]
def keyword_search(self, query: str) -> List[Document]:
"""キーワード検索(簡易実装)"""
# 実際の実装では、より高度なキーワード検索が必要
keywords = query.lower().split()
# 全文書から関連文書を検索
all_docs = self.vector_store.similarity_search("", k=1000) # 全取得は非効率的
relevant_docs = []
for doc in all_docs:
content_lower = doc.page_content.lower()
score = sum(1 for keyword in keywords if keyword in content_lower)
if score > 0:
relevant_docs.append((doc, score))
# スコア順にソート
relevant_docs.sort(key=lambda x: x[1], reverse=True)
return [doc for doc, score in relevant_docs[:10]]
def combine_results(self, semantic_results: List[Document],
keyword_results: List[Document], alpha: float) -> List[Document]:
"""検索結果の統合"""
# 重複排除と重み付け結合
doc_scores = {}
# 意味検索結果のスコア
for i, doc in enumerate(semantic_results):
doc_id = id(doc) # 簡易的なID
doc_scores[doc_id] = {
'doc': doc,
'semantic_score': (len(semantic_results) - i) / len(semantic_results),
'keyword_score': 0
}
# キーワード検索結果のスコア
for i, doc in enumerate(keyword_results):
doc_id = id(doc)
if doc_id in doc_scores:
doc_scores[doc_id]['keyword_score'] = (len(keyword_results) - i) / len(keyword_results)
else:
doc_scores[doc_id] = {
'doc': doc,
'semantic_score': 0,
'keyword_score': (len(keyword_results) - i) / len(keyword_results)
}
# 統合スコアの計算
for doc_id in doc_scores:
semantic_score = doc_scores[doc_id]['semantic_score']
keyword_score = doc_scores[doc_id]['keyword_score']
doc_scores[doc_id]['combined_score'] = alpha * semantic_score + (1 - alpha) * keyword_score
# スコア順にソート
sorted_docs = sorted(doc_scores.values(), key=lambda x: x['combined_score'], reverse=True)
return [item['doc'] for item in sorted_docs]
検索・生成エンジンの実装
RAGエンジンのコア実装
import openai
from langchain.llms import OpenAI
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
from typing import List, Dict, Optional
class RAGEngine:
def __init__(self,
vector_store_manager: VectorStoreManager,
openai_api_key: str,
model_name: str = "gpt-3.5-turbo",
temperature: float = 0.1):
self.vector_store_manager = vector_store_manager
openai.api_key = openai_api_key
# LLMの初期化
self.llm = OpenAI(
model_name=model_name,
temperature=temperature,
openai_api_key=openai_api_key
)
# プロンプトテンプレートの定義
self.prompt_template = PromptTemplate(
template="""以下のコンテキスト情報を使用して、質問に対して正確で有用な回答を提供してください。
コンテキストに含まれていない情報については「提供された情報では回答できません」と述べてください。
コンテキスト:
{context}
質問: {question}
回答:""",
input_variables=["context", "question"]
)
# RetrievalQAチェーンの設定
self.qa_chain = RetrievalQA.from_chain_type(
llm=self.llm,
chain_type="stuff",
retriever=self.vector_store_manager.vector_store.as_retriever(
search_kwargs={"k": 5}
),
chain_type_kwargs={"prompt": self.prompt_template},
return_source_documents=True
)
def query(self, question: str, return_sources: bool = True) -> Dict:
"""質問に対する回答を生成"""
try:
# 関連文書の検索
relevant_docs = self.vector_store_manager.similarity_search(question, k=5)
# コンテキストの構築
context = self.build_context(relevant_docs)
# 回答の生成
prompt = self.prompt_template.format(context=context, question=question)
response = self.llm(prompt)
result = {
'question': question,
'answer': response.strip(),
'context_length': len(context),
'num_sources': len(relevant_docs)
}
if return_sources:
result['sources'] = [
{
'content': doc.page_content[:200] + '...',
'metadata': doc.metadata
}
for doc in relevant_docs
]
return result
except Exception as e:
return {
'question': question,
'answer': f"エラーが発生しました: {e}",
'error': str(e)
}
def build_context(self, documents: List[Document]) -> str:
"""関連文書からコンテキストを構築"""
context_parts = []
for i, doc in enumerate(documents, 1):
context_parts.append(f"文書{i}:\n{doc.page_content}\n")
return "\n".join(context_parts)
def batch_query(self, questions: List[str]) -> List[Dict]:
"""複数質問の一括処理"""
results = []
for question in questions:
result = self.query(question)
results.append(result)
return results
# 使用例
rag_engine = RAGEngine(
vector_store_manager=vector_manager,
openai_api_key=os.getenv("OPENAI_API_KEY")
)
# 質問の実行
result = rag_engine.query("機械学習の主要なアルゴリズムは何ですか?")
print(f"質問: {result['question']}")
print(f"回答: {result['answer']}")
print(f"ソース数: {result['num_sources']}")
高度な回答生成
class AdvancedRAGEngine(RAGEngine):
def __init__(self, **kwargs):
super().__init__(**kwargs)
# 複数のプロンプトテンプレート
self.prompt_templates = {
'default': self.prompt_template,
'analytical': PromptTemplate(
template="""以下の情報を分析し、構造的で詳細な回答を提供してください。
分析対象:
{context}
質問: {question}
以下の形式で回答してください:
1. 概要
2. 詳細説明
3. 具体例(該当する場合)
4. 注意点や制限事項
回答:""",
input_variables=["context", "question"]
),
'comparative': PromptTemplate(
template="""以下の情報を基に、比較分析を行ってください。
比較対象情報:
{context}
質問: {question}
以下の観点から比較してください:
- 特徴
- 利点・欠点
- 適用場面
- 推奨事項
回答:""",
input_variables=["context", "question"]
)
}
def intelligent_query(self, question: str, query_type: str = 'auto') -> Dict:
"""知的クエリ処理"""
# クエリタイプの自動判定
if query_type == 'auto':
query_type = self.detect_query_type(question)
# 適切なプロンプトテンプレートの選択
template = self.prompt_templates.get(query_type, self.prompt_templates['default'])
# 関連文書の検索(重要度順)
relevant_docs = self.enhanced_retrieval(question)
# コンテキストの構築
context = self.build_enhanced_context(relevant_docs, question)
# 回答の生成
prompt = template.format(context=context, question=question)
response = self.llm(prompt)
return {
'question': question,
'answer': response.strip(),
'query_type': query_type,
'context_length': len(context),
'sources': [
{
'content': doc.page_content[:300] + '...',
'metadata': doc.metadata,
'relevance_score': getattr(doc, 'relevance_score', 0)
}
for doc in relevant_docs
]
}
def detect_query_type(self, question: str) -> str:
"""質問タイプの自動判定"""
question_lower = question.lower()
# 比較を求める質問
if any(word in question_lower for word in ['比較', 'vs', '違い', 'どちらが', '差']):
return 'comparative'
# 分析を求める質問
if any(word in question_lower for word in ['分析', '詳しく', '詳細', '構造', '仕組み']):
return 'analytical'
return 'default'
def enhanced_retrieval(self, question: str, top_k: int = 5) -> List[Document]:
"""強化された文書検索"""
# 通常の類似度検索
docs_with_scores = self.vector_store_manager.similarity_search_with_score(question, k=top_k*2)
# スコアの正規化と追加情報の付与
enhanced_docs = []
for doc, score in docs_with_scores[:top_k]:
doc.relevance_score = 1 - score # 距離をスコアに変換
enhanced_docs.append(doc)
return enhanced_docs
def build_enhanced_context(self, documents: List[Document], question: str) -> str:
"""強化されたコンテキスト構築"""
context_parts = []
for i, doc in enumerate(documents, 1):
relevance_score = getattr(doc, 'relevance_score', 0)
context_parts.append(
f"文書{i} (関連度: {relevance_score:.2f}):\n{doc.page_content}\n"
)
return "\n".join(context_parts)
Webアプリケーションとしての実装
FastAPI バックエンド
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
import uvicorn
app = FastAPI(title="RAG API", version="1.0.0")
# レスポンスモデル
class QueryRequest(BaseModel):
question: str
query_type: Optional[str] = "auto"
return_sources: Optional[bool] = True
class QueryResponse(BaseModel):
question: str
answer: str
query_type: Optional[str] = None
context_length: int
num_sources: int
sources: Optional[List[dict]] = None
class HealthResponse(BaseModel):
status: str
message: str
# グローバル変数(実際の本番環境では適切な依存性注入を使用)
rag_engine = None
@app.on_event("startup")
async def startup_event():
"""アプリケーション起動時の処理"""
global rag_engine
try:
# RAGエンジンの初期化
vector_manager = VectorStoreManager()
rag_engine = AdvancedRAGEngine(
vector_store_manager=vector_manager,
openai_api_key=os.getenv("OPENAI_API_KEY")
)
print("RAG Engine initialized successfully")
except Exception as e:
print(f"Error initializing RAG Engine: {e}")
raise
@app.get("/health", response_model=HealthResponse)
async def health_check():
"""ヘルスチェック"""
if rag_engine is None:
raise HTTPException(status_code=503, detail="RAG Engine not initialized")
return HealthResponse(status="healthy", message="RAG API is running")
@app.post("/query", response_model=QueryResponse)
async def process_query(request: QueryRequest):
"""クエリ処理エンドポイント"""
if rag_engine is None:
raise HTTPException(status_code=503, detail="RAG Engine not initialized")
try:
# RAGエンジンによる処理
result = rag_engine.intelligent_query(
question=request.question,
query_type=request.query_type
)
return QueryResponse(**result)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Query processing failed: {e}")
@app.post("/upload")
async def upload_document(file_path: str):
"""文書アップロードエンドポイント"""
try:
# 文書処理
processor = DocumentProcessor()
documents = processor.load_documents(file_path)
chunks = processor.split_documents(documents)
# ベクトルストアへの追加
vector_manager = VectorStoreManager()
success = vector_manager.add_documents(chunks)
if success:
return {"message": f"Successfully uploaded {len(chunks)} chunks"}
else:
raise HTTPException(status_code=500, detail="Failed to add documents")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Upload failed: {e}")
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
Streamlit フロントエンド
import streamlit as st
import requests
import json
from typing import Dict
# ページ設定
st.set_page_config(
page_title="RAG System Demo",
page_icon="🤖",
layout="wide"
)
# タイトル
st.title("🤖 RAG System Demo")
st.markdown("検索拡張生成(RAG)システムのデモンストレーション")
# サイドバー設定
st.sidebar.header("設定")
api_url = st.sidebar.text_input(
"API URL",
value="http://localhost:8000"
)
query_type = st.sidebar.selectbox(
"クエリタイプ",
["auto", "default", "analytical", "comparative"]
)
return_sources = st.sidebar.checkbox("ソース情報を表示", value=True)
# メイン画面
col1, col2 = st.columns([2, 1])
with col1:
st.header("質問入力")
# 質問入力
question = st.text_area(
"質問を入力してください:",
height=100,
placeholder="例: 機械学習の主要なアルゴリズムについて教えてください"
)
# クエリ実行ボタン
if st.button("回答を生成", type="primary"):
if not question.strip():
st.error("質問を入力してください")
else:
with st.spinner("回答を生成中..."):
try:
# API呼び出し
response = requests.post(
f"{api_url}/query",
json={
"question": question,
"query_type": query_type,
"return_sources": return_sources
}
)
if response.status_code == 200:
result = response.json()
# 回答表示
st.header("回答")
st.write(result["answer"])
# メタ情報表示
col_meta1, col_meta2, col_meta3 = st.columns(3)
with col_meta1:
st.metric("クエリタイプ", result.get("query_type", "default"))
with col_meta2:
st.metric("コンテキスト長", result["context_length"])
with col_meta3:
st.metric("ソース数", result["num_sources"])
# ソース情報表示
if return_sources and result.get("sources"):
st.header("参照ソース")
for i, source in enumerate(result["sources"], 1):
with st.expander(f"ソース {i}"):
st.write(source["content"])
if source.get("metadata"):
st.json(source["metadata"])
else:
st.error(f"API Error: {response.status_code}")
st.write(response.text)
except Exception as e:
st.error(f"エラーが発生しました: {e}")
with col2:
st.header("システム情報")
# ヘルスチェック
try:
health_response = requests.get(f"{api_url}/health")
if health_response.status_code == 200:
st.success("✅ システム正常")
else:
st.error("❌ システム異常")
except:
st.error("❌ API接続不可")
# 使用例
st.header("使用例")
examples = [
"機械学習のアルゴリズムを比較してください",
"ディープラーニングについて詳しく説明してください",
"PythonとRの違いは何ですか?",
"データサイエンスのワークフローを教えてください"
]
for example in examples:
if st.button(f"📝 {example[:20]}...", key=example):
st.session_state.example_question = example
# セッション状態の例文を質問欄に反映
if 'example_question' in st.session_state:
question = st.session_state.example_question
del st.session_state.example_question
# フッター
st.markdown("---")
st.markdown("RAG System Demo v1.0 | Built with Streamlit & FastAPI")
性能最適化とチューニング
キャッシュシステム
import redis
import hashlib
import json
from typing import Optional, Dict, Any
class CacheManager:
def __init__(self, redis_host: str = "localhost", redis_port: int = 6379,
redis_db: int = 0, ttl: int = 3600):
self.redis_client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
decode_responses=True
)
self.ttl = ttl
def _generate_cache_key(self, query: str, **kwargs) -> str:
"""クエリからキャッシュキーを生成"""
cache_data = {"query": query, **kwargs}
cache_string = json.dumps(cache_data, sort_keys=True)
return hashlib.md5(cache_string.encode()).hexdigest()
def get(self, query: str, **kwargs) -> Optional[Dict]:
"""キャッシュから結果を取得"""
cache_key = self._generate_cache_key(query, **kwargs)
try:
cached_result = self.redis_client.get(cache_key)
if cached_result:
return json.loads(cached_result)
except Exception as e:
print(f"Cache get error: {e}")
return None
def set(self, query: str, result: Dict, **kwargs) -> bool:
"""結果をキャッシュに保存"""
cache_key = self._generate_cache_key(query, **kwargs)
try:
self.redis_client.setex(
cache_key,
self.ttl,
json.dumps(result)
)
return True
except Exception as e:
print(f"Cache set error: {e}")
return False
class OptimizedRAGEngine(AdvancedRAGEngine):
def __init__(self, cache_manager: Optional[CacheManager] = None, **kwargs):
super().__init__(**kwargs)
self.cache_manager = cache_manager
def query_with_cache(self, question: str, **kwargs) -> Dict:
"""キャッシュ機能付きクエリ処理"""
# キャッシュから結果を確認
if self.cache_manager:
cached_result = self.cache_manager.get(question, **kwargs)
if cached_result:
cached_result['from_cache'] = True
return cached_result
# キャッシュにない場合は新規処理
result = self.intelligent_query(question, **kwargs)
result['from_cache'] = False
# 結果をキャッシュに保存
if self.cache_manager:
self.cache_manager.set(question, result, **kwargs)
return result
パフォーマンス監視
import time
import psutil
import threading
from dataclasses import dataclass
from typing import List
from datetime import datetime
@dataclass
class PerformanceMetrics:
timestamp: datetime
query_time: float
memory_usage: float
cpu_usage: float
cache_hit_rate: float
class PerformanceMonitor:
def __init__(self):
self.metrics: List[PerformanceMetrics] = []
self.cache_hits = 0
self.total_queries = 0
self.lock = threading.Lock()
def record_query(self, execution_time: float, from_cache: bool = False):
"""クエリ実行のメトリクスを記録"""
with self.lock:
self.total_queries += 1
if from_cache:
self.cache_hits += 1
# システムメトリクスの取得
memory_usage = psutil.virtual_memory().percent
cpu_usage = psutil.cpu_percent()
cache_hit_rate = self.cache_hits / self.total_queries if self.total_queries > 0 else 0
# メトリクスの記録
metric = PerformanceMetrics(
timestamp=datetime.now(),
query_time=execution_time,
memory_usage=memory_usage,
cpu_usage=cpu_usage,
cache_hit_rate=cache_hit_rate
)
self.metrics.append(metric)
# 古いメトリクスの削除(直近1000件のみ保持)
if len(self.metrics) > 1000:
self.metrics = self.metrics[-1000:]
def get_average_metrics(self, last_n: int = 100) -> Dict:
"""直近のメトリクス平均値を取得"""
recent_metrics = self.metrics[-last_n:] if len(self.metrics) >= last_n else self.metrics
if not recent_metrics:
return {}
avg_query_time = sum(m.query_time for m in recent_metrics) / len(recent_metrics)
avg_memory_usage = sum(m.memory_usage for m in recent_metrics) / len(recent_metrics)
avg_cpu_usage = sum(m.cpu_usage for m in recent_metrics) / len(recent_metrics)
return {
'avg_query_time': avg_query_time,
'avg_memory_usage': avg_memory_usage,
'avg_cpu_usage': avg_cpu_usage,
'cache_hit_rate': recent_metrics[-1].cache_hit_rate if recent_metrics else 0,
'total_queries': self.total_queries
}
# 監視機能付きRAGエンジン
class MonitoredRAGEngine(OptimizedRAGEngine):
def __init__(self, performance_monitor: PerformanceMonitor, **kwargs):
super().__init__(**kwargs)
self.performance_monitor = performance_monitor
def query_with_monitoring(self, question: str, **kwargs) -> Dict:
"""監視機能付きクエリ処理"""
start_time = time.time()
try:
# クエリ実行
result = self.query_with_cache(question, **kwargs)
# 実行時間の計算
execution_time = time.time() - start_time
# メトリクスの記録
self.performance_monitor.record_query(
execution_time,
result.get('from_cache', False)
)
# 結果にメトリクス情報を追加
result['execution_time'] = execution_time
result['performance_metrics'] = self.performance_monitor.get_average_metrics()
return result
except Exception as e:
execution_time = time.time() - start_time
self.performance_monitor.record_query(execution_time, False)
raise e
監視・運用・保守
ログ設定
import logging
import sys
from logging.handlers import RotatingFileHandler
from pathlib import Path
def setup_logging():
"""ログシステムのセットアップ"""
# ログディレクトリの作成
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)
# ログフォーマットの定義
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# ルートロガーの設定
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
# コンソールハンドラー
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
root_logger.addHandler(console_handler)
# ファイルハンドラー(ローテーション)
file_handler = RotatingFileHandler(
log_dir / "rag_system.log",
maxBytes=10*1024*1024, # 10MB
backupCount=5
)
file_handler.setFormatter(formatter)
root_logger.addHandler(file_handler)
# エラー専用ハンドラー
error_handler = RotatingFileHandler(
log_dir / "errors.log",
maxBytes=5*1024*1024, # 5MB
backupCount=3
)
error_handler.setLevel(logging.ERROR)
error_handler.setFormatter(formatter)
root_logger.addHandler(error_handler)
# 使用例
setup_logging()
logger = logging.getLogger(__name__)
class LoggingRAGEngine(MonitoredRAGEngine):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.logger = logging.getLogger(self.__class__.__name__)
def query_with_logging(self, question: str, **kwargs) -> Dict:
"""ログ機能付きクエリ処理"""
self.logger.info(f"Query received: {question[:100]}...")
try:
result = self.query_with_monitoring(question, **kwargs)
self.logger.info(
f"Query processed successfully. "
f"Execution time: {result.get('execution_time', 0):.2f}s, "
f"From cache: {result.get('from_cache', False)}"
)
return result
except Exception as e:
self.logger.error(f"Query processing failed: {e}", exc_info=True)
raise
ヘルスチェック機能
from typing import Dict, List
import asyncio
class HealthChecker:
def __init__(self, rag_engine, vector_store_manager):
self.rag_engine = rag_engine
self.vector_store_manager = vector_store_manager
def check_vector_store(self) -> Dict:
"""ベクトルストアのヘルスチェック"""
try:
# 簡単な検索テスト
test_results = self.vector_store_manager.similarity_search("test", k=1)
return {
'status': 'healthy',
'details': f'Vector store accessible, {len(test_results)} results returned'
}
except Exception as e:
return {
'status': 'unhealthy',
'details': f'Vector store error: {e}'
}
def check_llm(self) -> Dict:
"""LLMのヘルスチェック"""
try:
# 簡単なテストクエリ
test_response = self.rag_engine.llm("Hello")
return {
'status': 'healthy',
'details': 'LLM responding normally'
}
except Exception as e:
return {
'status': 'unhealthy',
'details': f'LLM error: {e}'
}
def comprehensive_health_check(self) -> Dict:
"""総合ヘルスチェック"""
checks = {
'vector_store': self.check_vector_store(),
'llm': self.check_llm(),
'system': self.check_system_resources()
}
overall_status = 'healthy' if all(
check['status'] == 'healthy' for check in checks.values()
) else 'unhealthy'
return {
'overall_status': overall_status,
'checks': checks,
'timestamp': datetime.now().isoformat()
}
def check_system_resources(self) -> Dict:
"""システムリソースのチェック"""
memory_usage = psutil.virtual_memory().percent
cpu_usage = psutil.cpu_percent(interval=1)
disk_usage = psutil.disk_usage('/').percent
status = 'healthy'
warnings = []
if memory_usage > 90:
status = 'unhealthy'
warnings.append(f'High memory usage: {memory_usage}%')
if cpu_usage > 90:
status = 'unhealthy'
warnings.append(f'High CPU usage: {cpu_usage}%')
if disk_usage > 90:
status = 'unhealthy'
warnings.append(f'High disk usage: {disk_usage}%')
return {
'status': status,
'details': {
'memory_usage': f'{memory_usage}%',
'cpu_usage': f'{cpu_usage}%',
'disk_usage': f'{disk_usage}%',
'warnings': warnings
}
}
トラブルシューティング
一般的な問題と対策
class RAGTroubleshooter:
def __init__(self, rag_engine, vector_store_manager):
self.rag_engine = rag_engine
self.vector_store_manager = vector_store_manager
self.logger = logging.getLogger(self.__class__.__name__)
def diagnose_poor_results(self, question: str, expected_keywords: List[str] = None) -> Dict:
"""回答品質の問題を診断"""
diagnosis = {
'issue': 'poor_results',
'checks': [],
'recommendations': []
}
# 1. 関連文書の存在確認
relevant_docs = self.vector_store_manager.similarity_search(question, k=10)
if len(relevant_docs) == 0:
diagnosis['checks'].append({
'name': 'document_availability',
'status': 'failed',
'details': 'No relevant documents found'
})
diagnosis['recommendations'].append(
'Add more relevant documents to the knowledge base'
)
else:
diagnosis['checks'].append({
'name': 'document_availability',
'status': 'passed',
'details': f'Found {len(relevant_docs)} relevant documents'
})
# 2. キーワードの存在確認
if expected_keywords:
for keyword in expected_keywords:
keyword_found = any(
keyword.lower() in doc.page_content.lower()
for doc in relevant_docs
)
diagnosis['checks'].append({
'name': f'keyword_presence_{keyword}',
'status': 'passed' if keyword_found else 'failed',
'details': f'Keyword "{keyword}" {"found" if keyword_found else "not found"}'
})
if not keyword_found:
diagnosis['recommendations'].append(
f'Consider adding documents containing "{keyword}"'
)
# 3. チャンクサイズの確認
if relevant_docs:
avg_chunk_size = sum(len(doc.page_content) for doc in relevant_docs) / len(relevant_docs)
if avg_chunk_size < 100:
diagnosis['checks'].append({
'name': 'chunk_size',
'status': 'warning',
'details': f'Small average chunk size: {avg_chunk_size:.0f} characters'
})
diagnosis['recommendations'].append(
'Consider increasing chunk size for better context'
)
elif avg_chunk_size > 2000:
diagnosis['checks'].append({
'name': 'chunk_size',
'status': 'warning',
'details': f'Large average chunk size: {avg_chunk_size:.0f} characters'
})
diagnosis['recommendations'].append(
'Consider decreasing chunk size for more precise retrieval'
)
else:
diagnosis['checks'].append({
'name': 'chunk_size',
'status': 'passed',
'details': f'Appropriate chunk size: {avg_chunk_size:.0f} characters'
})
return diagnosis
def diagnose_slow_response(self) -> Dict:
"""レスポンス速度の問題を診断"""
diagnosis = {
'issue': 'slow_response',
'checks': [],
'recommendations': []
}
# ベクトルストアのサイズ確認
try:
# ChromaDBの場合のサンプル検索で速度測定
start_time = time.time()
test_results = self.vector_store_manager.similarity_search("test query", k=5)
search_time = time.time() - start_time
if search_time > 2.0:
diagnosis['checks'].append({
'name': 'vector_search_speed',
'status': 'failed',
'details': f'Slow vector search: {search_time:.2f}s'
})
diagnosis['recommendations'].extend([
'Consider indexing optimization',
'Reduce vector database size',
'Use faster embedding model'
])
else:
diagnosis['checks'].append({
'name': 'vector_search_speed',
'status': 'passed',
'details': f'Good vector search speed: {search_time:.2f}s'
})
except Exception as e:
diagnosis['checks'].append({
'name': 'vector_search_speed',
'status': 'error',
'details': f'Error measuring search speed: {e}'
})
# システムリソースの確認
memory_usage = psutil.virtual_memory().percent
cpu_usage = psutil.cpu_percent()
if memory_usage > 80:
diagnosis['checks'].append({
'name': 'memory_usage',
'status': 'failed',
'details': f'High memory usage: {memory_usage}%'
})
diagnosis['recommendations'].append('Increase system memory or optimize memory usage')
if cpu_usage > 80:
diagnosis['checks'].append({
'name': 'cpu_usage',
'status': 'failed',
'details': f'High CPU usage: {cpu_usage}%'
})
diagnosis['recommendations'].append('Optimize CPU usage or scale horizontally')
return diagnosis
def auto_fix_common_issues(self) -> Dict:
"""一般的な問題の自動修復"""
fixes_applied = []
try:
# 1. 空のベクトルストアの確認と修復
test_results = self.vector_store_manager.similarity_search("test", k=1)
if not test_results:
self.logger.warning("Empty vector store detected")
# 自動的にサンプル文書を追加(実際の実装では適切な文書を使用)
# fixes_applied.append("Added sample documents to empty vector store")
# 2. キャッシュのクリア(メモリ使用量が高い場合)
memory_usage = psutil.virtual_memory().percent
if memory_usage > 85:
if hasattr(self.rag_engine, 'cache_manager') and self.rag_engine.cache_manager:
# キャッシュクリアの実装(実際のRedisキーに基づく)
fixes_applied.append("Cleared cache to reduce memory usage")
return {
'status': 'success',
'fixes_applied': fixes_applied
}
except Exception as e:
return {
'status': 'error',
'error': str(e),
'fixes_applied': fixes_applied
}
# 統合診断機能
def run_comprehensive_diagnosis(rag_engine, vector_store_manager,
test_question: str = "機械学習とは何ですか?") -> Dict:
"""包括的な診断の実行"""
troubleshooter = RAGTroubleshooter(rag_engine, vector_store_manager)
diagnosis_report = {
'timestamp': datetime.now().isoformat(),
'test_question': test_question,
'results': {}
}
# 1. 基本ヘルスチェック
health_checker = HealthChecker(rag_engine, vector_store_manager)
diagnosis_report['results']['health_check'] = health_checker.comprehensive_health_check()
# 2. 回答品質の診断
diagnosis_report['results']['quality_diagnosis'] = troubleshooter.diagnose_poor_results(
test_question,
expected_keywords=['機械学習', 'アルゴリズム', 'データ']
)
# 3. パフォーマンス診断
diagnosis_report['results']['performance_diagnosis'] = troubleshooter.diagnose_slow_response()
# 4. 自動修復の試行
diagnosis_report['results']['auto_fix'] = troubleshooter.auto_fix_common_issues()
return diagnosis_report
まとめ
この実装ガイドでは、RAGシステムの完全な構築方法を解説しました。重要なポイント:
✅ 実装のベストプラクティス
- モジュラー設計: 各コンポーネントを独立して開発・テスト
- 段階的実装: 基本機能から始めて徐々に高度な機能を追加
- 監視・ログ: 本番運用を考慮した監視機能の実装
- エラーハンドリング: 堅牢なエラー処理とトラブルシューティング
🚀 次のステップ
- スケーラビリティ: 分散処理やロードバランシングの実装
- セキュリティ: 認証・認可機能の追加
- 高度なRAG: マルチモーダルRAGやエージェントベースRAGの実装
- MLOps統合: モデル管理とデプロイメントパイプラインの構築
このガイドを参考に、あなたのユースケースに最適なRAGシステムを構築してください。
本記事は2026年2月時点の情報に基づいています。ライブラリのバージョンやAPIの仕様は変更される可能性があります。