RAG 시스템의 진짜 병목: 벡터 DB가 아니라 원본 데이터의 1:N 관계입니다

RAG 시스템의 정확도 문제를 벡터 DB 튜닝으로 해결하려는 팀이 많습니다. 하지만 실제 병목은 원본 데이터의 관계형 구조를 무시한 Chunking에서 발생합니다. 고객-주문-상품의 1:N:N 관계를 flat하게 임베딩하면, 아무리 좋은 벡터 DB를 써도 hallucination은 피할 수 없습니다.
이 글에서는 SQL 관계형 데이터를 RAG 시스템에 올바르게 통합하는 방법을 다룹니다.
1. 왜 벡터 DB만으로는 부족한가
현실에서 마주치는 문제
RAG 시스템을 구축하면서 이런 질문을 받아본 적 있을 것입니다:
"고객 A가 최근 3개월간 주문한 상품 중 반품률이 가장 높은 카테고리는?"
이 질문에 정확히 답하려면 다음 데이터가 동시에 필요합니다:
- 고객 정보 (customer_id, name, segment)
- 주문 정보 (order_id, order_date, customer_id)
- 주문 상세 (order_id, product_id, quantity, return_status)
- 상품 정보 (product_id, category, name)
문제는 대부분의 RAG 시스템이 이 데이터를 각각 별도의 chunk로 임베딩한다는 것입니다.
# 흔히 보는 잘못된 접근
chunks = [
"고객 A는 VIP 등급이며 서울에 거주합니다.",
"주문 #1234는 2024-01-15에 생성되었습니다.",
"상품 X는 전자제품 카테고리입니다.",
]
# 각 chunk를 독립적으로 임베딩
for chunk in chunks:
vector = embed(chunk)
vector_db.insert(vector)이렇게 하면 벡터 검색 시 "고객 A의 주문"이라는 관계를 찾을 수 없습니다.
벡터 유사도의 한계
벡터 검색은 의미적 유사성을 기반으로 합니다. 하지만 관계형 데이터에서 중요한 것은 구조적 연결입니다.
# 벡터 유사도 예시
query = "고객 A의 주문"
# 의미적으로 유사한 결과 (잘못된 결과)
results = [
"고객 B의 주문 내역입니다.", # '주문' 키워드 유사
"A사의 대량 주문 처리 방법", # 'A'와 '주문' 유사
"고객 만족도 조사 결과", # '고객' 유사
]
# 실제로 필요한 결과
expected = [
"고객 A (ID: 123)의 주문 #456, #789",
"주문 #456: 상품 X, Y (총 150,000원)",
]벡터 DB는 "고객"과 "주문"이라는 단어의 의미는 이해하지만, customer_id = 123인 레코드와 연결된 order_id들이라는 관계는 이해하지 못합니다.
근본적인 원인
데이터의 두 가지 속성
1. 의미적 속성 (Semantic)
- "이 텍스트가 무슨 내용인가?"
- 벡터 임베딩으로 캡처 가능
- 유사도 검색에 적합
2. 구조적 속성 (Structural)
- "이 데이터가 어떤 데이터와 연결되는가?"
- Foreign Key, JOIN으로 표현
- 벡터로는 캡처 불가능
RAG 시스템의 정확도 문제는 구조적 속성을 무시하고 의미적 속성만 인덱싱했기 때문에 발생합니다.
2. 관계형 데이터의 본질: 1:N, N:M 이해하기
E-Commerce 도메인의 관계 구조
실제 비즈니스 데이터는 복잡한 관계를 가진다. E-Commerce를 예로 들면:
-- 고객 테이블 (1의 측면)
CREATE TABLE customers (
customer_id SERIAL PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(255),
segment VARCHAR(50), -- VIP, Regular, New
created_at TIMESTAMP
);
-- 주문 테이블 (N의 측면, 고객과 1:N)
CREATE TABLE orders (
order_id SERIAL PRIMARY KEY,
customer_id INTEGER REFERENCES customers(customer_id),
order_date TIMESTAMP,
total_amount DECIMAL(10, 2),
status VARCHAR(50) -- pending, completed, cancelled
);
-- 주문 상세 테이블 (주문과 1:N, 상품과 N:1)
CREATE TABLE order_items (
item_id SERIAL PRIMARY KEY,
order_id INTEGER REFERENCES orders(order_id),
product_id INTEGER REFERENCES products(product_id),
quantity INTEGER,
unit_price DECIMAL(10, 2),
return_status VARCHAR(50) -- none, requested, completed
);
-- 상품 테이블
CREATE TABLE products (
product_id SERIAL PRIMARY KEY,
name VARCHAR(255),
category VARCHAR(100),
description TEXT,
price DECIMAL(10, 2)
);관계의 방향성과 카디널리티
관계 유형별 특성
1:N (One-to-Many)
고객 → 주문: 한 고객이 여러 주문 가능
주문 → 주문상세: 한 주문에 여러 상품 포함
Chunk 전략: Parent-Child 패턴 적용
N:M (Many-to-Many)
상품 ↔ 태그: 상품은 여러 태그, 태그는 여러 상품
고객 ↔ 상품(위시리스트): 다대다 관계
Chunk 전략: 중간 테이블 기준 그룹핑
1:1 (One-to-One)
고객 → 고객상세: 확장 정보
Chunk 전략: 단일 문서로 병합
관계를 무시하면 생기는 정보 손실
# 원본 데이터의 관계
customer_123 = {
"id": 123,
"name": "김철수",
"orders": [
{
"order_id": 456,
"items": [
{"product": "노트북", "qty": 1, "returned": False},
{"product": "마우스", "qty": 2, "returned": True},
]
},
{
"order_id": 789,
"items": [
{"product": "키보드", "qty": 1, "returned": True},
]
}
]
}
# 잘못된 Chunking (관계 손실)
flat_chunks = [
"김철수 고객 정보",
"주문 456 정보",
"주문 789 정보",
"노트북 상품 정보",
"마우스 상품 정보",
"키보드 상품 정보",
]
# 문제: "김철수가 반품한 상품은?" 질문에 답할 수 없음
# 올바른 Chunking (관계 보존)
relational_chunk = """
고객: 김철수 (ID: 123)
- 주문 #456 (2024-01-15)
- 노트북 x1 (반품: 없음)
- 마우스 x2 (반품: 완료)
- 주문 #789 (2024-02-20)
- 키보드 x1 (반품: 완료)
반품 요약: 마우스, 키보드 (총 2건)
"""3. AS-IS: 잘못된 Chunking이 만드는 Hallucination
전형적인 실수 패턴
패턴 1: 테이블별 독립 Chunking
# AS-IS: 테이블별로 따로 임베딩
def wrong_chunking_by_table():
# 고객 테이블 chunk
customers = db.query("SELECT * FROM customers")
for customer in customers:
chunk = f"고객 {customer.name}은 {customer.segment} 등급입니다."
embed_and_store(chunk)
# 주문 테이블 chunk (별도)
orders = db.query("SELECT * FROM orders")
for order in orders:
chunk = f"주문 #{order.order_id}는 {order.order_date}에 생성되었습니다."
embed_and_store(chunk)
# 상품 테이블 chunk (별도)
products = db.query("SELECT * FROM products")
for product in products:
chunk = f"{product.name}은 {product.category} 카테고리 상품입니다."
embed_and_store(chunk)왜 문제인가요?
# 사용자 질문
query = "VIP 고객이 가장 많이 구매한 카테고리는?"
# 검색 결과 (관계 없이 유사도만으로 검색)
results = [
"고객 A는 VIP 등급입니다.", # VIP 매칭
"고객 B는 VIP 등급입니다.", # VIP 매칭
"전자제품은 인기 카테고리입니다.", # 카테고리 매칭
"의류 카테고리 상품 목록", # 카테고리 매칭
]
# LLM이 받는 context에는 "누가 무엇을 샀는지" 정보가 없음
# → Hallucination 발생: "VIP 고객은 전자제품을 많이 구매합니다" (근거 없음)패턴 2: JOIN 결과를 무분별하게 Flatten
# AS-IS: JOIN 결과를 그대로 chunk로
def wrong_flattening():
query = """
SELECT c.name, o.order_id, p.name as product_name
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.product_id
"""
results = db.query(query)
for row in results:
# 각 row를 독립적인 chunk로 처리
chunk = f"{row.name}이 주문 #{row.order_id}에서 {row.product_name}을 구매"
embed_and_store(chunk)왜 문제인가요?
# 원본 데이터
# 고객 A의 주문 #1: 상품 X, Y, Z (3개 아이템)
#
# Flatten 후 chunks:
# - "고객 A가 주문 #1에서 상품 X를 구매"
# - "고객 A가 주문 #1에서 상품 Y를 구매"
# - "고객 A가 주문 #1에서 상품 Z를 구매"
# 문제점:
# 1. 동일 주문이 3개의 chunk로 분리됨
# 2. "고객 A의 주문 #1에 포함된 전체 상품"을 한번에 못 가져옴
# 3. 검색 시 일부 상품만 retrieve되어 불완전한 답변패턴 3: Context Window 최적화라는 함정
# AS-IS: 토큰 수 줄이려고 요약
def wrong_summarization():
customer_data = get_full_customer_profile(customer_id=123)
# 원본: 주문 50건, 상품 200개 상세 정보
# "토큰 절약"을 위해 요약
summary = f"""
고객 123 요약:
- 총 주문: 50건
- 총 구매액: 5,000,000원
- 주요 카테고리: 전자제품
"""
embed_and_store(summary)왜 문제인가요?
# 사용자 질문
query = "고객 123이 2024년 1월에 산 상품 중 아직 배송 안 된 건?"
# 요약본에는 개별 주문/배송 상태가 없음
# → "정보를 찾을 수 없습니다" 또는 hallucinationHallucination 발생 메커니즘
def demonstrate_hallucination():
"""
Hallucination이 발생하는 구체적 과정
"""
# Step 1: 잘못된 chunking으로 저장된 데이터
stored_chunks = [
{"id": 1, "text": "고객 A는 VIP입니다", "vector": [0.1, 0.2, ...]},
{"id": 2, "text": "주문 #100은 2024-01-15에 생성", "vector": [0.3, 0.1, ...]},
{"id": 3, "text": "상품 X는 전자제품", "vector": [0.2, 0.4, ...]},
# 관계 정보 없음: 고객 A가 주문 #100을 했는지? 주문 #100에 상품 X가 있는지?
]
# Step 2: 사용자 질문
question = "고객 A가 주문한 전자제품은?"
# Step 3: 벡터 검색 (의미적 유사도)
retrieved = [
"고객 A는 VIP입니다", # "고객 A" 매칭
"상품 X는 전자제품", # "전자제품" 매칭
]
# Step 4: LLM에게 전달
prompt = f"""
Context: {retrieved}
Question: {question}
"""
# Step 5: LLM의 추론 (hallucination)
llm_response = "고객 A가 주문한 전자제품은 상품 X입니다."
# 실제로는 고객 A와 상품 X 사이에 연결 관계가 있는지 알 수 없음!
# LLM은 context에 두 정보가 함께 있으니 연결된 것으로 "추측"4. TO-BE: 관계를 보존하는 Chunking 전략
핵심 원칙
관계 보존 Chunking 3대 원칙
1. 관계의 "1" 측을 기준으로 그룹핑
- 고객 기준: 고객 + 해당 고객의 모든 주문
- 주문 기준: 주문 + 해당 주문의 모든 아이템
2. Chunk 내에 관계 체인 완전 포함
- 고객 → 주문 → 상품까지 한 chunk에
- FK 참조를 텍스트로 명시
3. Metadata에 관계 식별자 포함
- customer_id, order_id를 metadata로 저장
- 검색 후 SQL로 추가 정보 조회 가능
TO-BE: 관계 보존 Chunking 구현
from typing import List, Dict, Any
from dataclasses import dataclass
import json
@dataclass
class RelationalChunk:
"""관계 정보를 보존하는 Chunk 구조"""
text: str # 임베딩할 텍스트
metadata: Dict[str, Any] # 관계 식별자들
parent_id: str = None # Parent-Child 패턴용
chunk_type: str = "standalone" # standalone, parent, child
def create_customer_centric_chunks(db_connection) -> List[RelationalChunk]:
"""
TO-BE: 고객 중심으로 관계를 보존하는 Chunking
핵심: 고객 한 명의 전체 컨텍스트를 하나의 Chunk 그룹으로
"""
query = """
WITH customer_orders AS (
SELECT
c.customer_id,
c.name,
c.email,
c.segment,
json_agg(
json_build_object(
'order_id', o.order_id,
'order_date', o.order_date,
'total_amount', o.total_amount,
'status', o.status,
'items', (
SELECT json_agg(
json_build_object(
'product_id', p.product_id,
'product_name', p.name,
'category', p.category,
'quantity', oi.quantity,
'unit_price', oi.unit_price,
'return_status', oi.return_status
)
)
FROM order_items oi
JOIN products p ON oi.product_id = p.product_id
WHERE oi.order_id = o.order_id
)
) ORDER BY o.order_date DESC
) as orders
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.name, c.email, c.segment
)
SELECT * FROM customer_orders
"""
chunks = []
results = db_connection.execute(query)
for row in results:
# 고객 전체 프로필을 하나의 chunk로
chunk_text = format_customer_profile(row)
chunk = RelationalChunk(
text=chunk_text,
metadata={
"customer_id": row['customer_id'],
"customer_name": row['name'],
"segment": row['segment'],
"order_ids": [o['order_id'] for o in row['orders'] if o],
"categories": extract_categories(row['orders']),
"total_orders": len([o for o in row['orders'] if o]),
"has_returns": has_any_returns(row['orders']),
},
chunk_type="customer_profile"
)
chunks.append(chunk)
return chunks
def format_customer_profile(row: Dict) -> str:
"""
고객 프로필을 검색 가능한 텍스트로 포맷팅
관계 정보를 자연어로 명시적 표현
"""
lines = [
f"## 고객 프로필: {row['name']}",
f"고객 ID: {row['customer_id']}",
f"이메일: {row['email']}",
f"등급: {row['segment']}",
"",
"### 주문 이력",
]
orders = row.get('orders', [])
if not orders or orders[0] is None:
lines.append("주문 내역 없음")
else:
for order in orders:
lines.append(f"\n#### 주문 #{order['order_id']} ({order['order_date']})")
lines.append(f"상태: {order['status']} | 총액: {order['total_amount']:,}원")
items = order.get('items', [])
if items:
lines.append("포함 상품:")
for item in items:
return_info = f" [반품: {item['return_status']}]" if item['return_status'] != 'none' else ""
lines.append(
f" - {item['product_name']} ({item['category']}) "
f"x{item['quantity']} @ {item['unit_price']:,}원{return_info}"
)
# 요약 통계 (검색 키워드로 활용)
lines.extend([
"",
"### 요약",
f"총 주문 수: {len([o for o in orders if o])}건",
f"구매 카테고리: {', '.join(extract_categories(orders))}",
f"반품 이력: {'있음' if has_any_returns(orders) else '없음'}",
])
return "\n".join(lines)
def extract_categories(orders: List[Dict]) -> List[str]:
"""주문에서 고유 카테고리 추출"""
categories = set()
for order in orders:
if order and order.get('items'):
for item in order['items']:
if item.get('category'):
categories.add(item['category'])
return list(categories)
def has_any_returns(orders: List[Dict]) -> bool:
"""반품 이력 존재 여부"""
for order in orders:
if order and order.get('items'):
for item in order['items']:
if item.get('return_status') not in (None, 'none'):
return True
return False관계 보존 Chunk의 실제 모습
## 고객 프로필: 김철수
고객 ID: 123
이메일: kim@example.com
등급: VIP
### 주문 이력
#### 주문 #456 (2024-01-15)
상태: completed | 총액: 1,500,000원
포함 상품:
- MacBook Pro (전자제품) x1 @ 1,200,000원
- Magic Mouse (전자제품) x2 @ 150,000원 [반품: completed]
#### 주문 #789 (2024-02-20)
상태: completed | 총액: 89,000원
포함 상품:
- 기계식 키보드 (전자제품) x1 @ 89,000원 [반품: completed]
### 요약
총 주문 수: 2건
구매 카테고리: 전자제품
반품 이력: 있음이제 "김철수가 반품한 상품은?"이라는 질문에 정확히 답할 수 있습니다.
5. Parent-Child Document 패턴 심층 분석
패턴 개요
대용량 관계형 데이터에서는 모든 관계를 하나의 chunk에 넣을 수 없습니다. 이때 Parent-Child Document 패턴을 사용합니다.
Parent-Child Document 패턴
Parent Document (요약)
전체 맥락을 담은 요약 정보
모든 Child의 ID 참조
검색 시 Parent만으로도 관련성 판단 가능
Child Documents (상세)
개별 상세 정보
Parent ID 역참조
세부 검색 시 활용
검색 전략
Step 1: Child 검색으로 관련 문서 찾기
Step 2: Child의 Parent 조회
Step 3: Parent의 전체 맥락을 LLM에 전달
구현 코드
from typing import List, Tuple
from dataclasses import dataclass, field
import uuid
@dataclass
class ParentDocument:
"""부모 문서: 전체 맥락 + 자식 참조"""
doc_id: str
text: str
child_ids: List[str]
metadata: Dict[str, Any]
@dataclass
class ChildDocument:
"""자식 문서: 상세 정보 + 부모 참조"""
doc_id: str
parent_id: str
text: str
metadata: Dict[str, Any]
class ParentChildChunker:
"""
Parent-Child 패턴으로 관계형 데이터 Chunking
사용 시나리오:
- 고객(Parent) - 개별 주문(Child)
- 제품(Parent) - 개별 리뷰(Child)
- 문서(Parent) - 섹션/단락(Child)
"""
def __init__(self, db_connection):
self.db = db_connection
def create_customer_order_chunks(self) -> Tuple[List[ParentDocument], List[ChildDocument]]:
"""
고객-주문 관계를 Parent-Child 구조로 변환
Parent: 고객 요약 (기본 정보 + 주문 통계)
Child: 개별 주문 상세
"""
parents = []
children = []
# 고객별 처리
customers = self.db.query("SELECT * FROM customers")
for customer in customers:
customer_id = customer['customer_id']
parent_id = f"customer_{customer_id}"
# 해당 고객의 주문들 조회
orders = self.db.query("""
SELECT o.*,
json_agg(
json_build_object(
'product_name', p.name,
'category', p.category,
'quantity', oi.quantity,
'return_status', oi.return_status
)
) as items
FROM orders o
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.product_id
WHERE o.customer_id = %s
GROUP BY o.order_id
""", [customer_id])
child_ids = []
# Child documents 생성 (개별 주문)
for order in orders:
child_id = f"order_{order['order_id']}"
child_ids.append(child_id)
child_text = self._format_order_detail(order, customer)
child = ChildDocument(
doc_id=child_id,
parent_id=parent_id,
text=child_text,
metadata={
"order_id": order['order_id'],
"customer_id": customer_id,
"order_date": str(order['order_date']),
"categories": list(set(i['category'] for i in order['items'])),
"has_return": any(i['return_status'] != 'none' for i in order['items']),
}
)
children.append(child)
# Parent document 생성 (고객 요약)
parent_text = self._format_customer_summary(customer, orders)
parent = ParentDocument(
doc_id=parent_id,
text=parent_text,
child_ids=child_ids,
metadata={
"customer_id": customer_id,
"segment": customer['segment'],
"total_orders": len(orders),
"order_ids": [o['order_id'] for o in orders],
}
)
parents.append(parent)
return parents, children
def _format_customer_summary(self, customer: Dict, orders: List[Dict]) -> str:
"""Parent 문서용 고객 요약 포맷"""
total_amount = sum(o['total_amount'] for o in orders)
categories = set()
return_count = 0
for order in orders:
for item in order['items']:
categories.add(item['category'])
if item['return_status'] != 'none':
return_count += 1
return f"""
고객 요약: {customer['name']} (ID: {customer['customer_id']})
등급: {customer['segment']}
이메일: {customer['email']}
주문 통계:
- 총 주문 수: {len(orders)}건
- 총 구매액: {total_amount:,.0f}원
- 구매 카테고리: {', '.join(categories)}
- 반품 건수: {return_count}건
이 고객의 상세 주문 정보는 개별 주문 문서를 참조하세요.
주문 ID 목록: {', '.join(str(o['order_id']) for o in orders)}
""".strip()
def _format_order_detail(self, order: Dict, customer: Dict) -> str:
"""Child 문서용 주문 상세 포맷"""
items_text = []
for item in order['items']:
return_info = f" (반품: {item['return_status']})" if item['return_status'] != 'none' else ""
items_text.append(f" - {item['product_name']} [{item['category']}] x{item['quantity']}{return_info}")
return f"""
주문 상세: #{order['order_id']}
고객: {customer['name']} (ID: {customer['customer_id']}, {customer['segment']})
주문일: {order['order_date']}
상태: {order['status']}
총액: {order['total_amount']:,.0f}원
주문 상품:
{chr(10).join(items_text)}
""".strip()Parent-Child 검색 전략
class ParentChildRetriever:
"""
Parent-Child 구조에서의 검색 전략
핵심: Child로 검색하고, Parent로 맥락 확보
"""
def __init__(self, vector_db, sql_db):
self.vector_db = vector_db
self.sql_db = sql_db
def retrieve(self, query: str, top_k: int = 5) -> List[Dict]:
"""
2단계 검색 전략
1. 벡터 검색으로 관련 Child 찾기
2. Child의 Parent 조회하여 전체 맥락 확보
"""
# Step 1: Child documents에서 검색
child_results = self.vector_db.search(
query=query,
filter={"doc_type": "child"},
top_k=top_k
)
# Step 2: 관련 Parent IDs 수집
parent_ids = set()
for result in child_results:
parent_ids.add(result.metadata['parent_id'])
# Step 3: Parent documents 조회
parents = self.vector_db.get_by_ids(list(parent_ids))
# Step 4: 결과 조합
enriched_results = []
for child in child_results:
parent = next(
(p for p in parents if p.doc_id == child.metadata['parent_id']),
None
)
enriched_results.append({
"child": child,
"parent": parent,
"context": self._build_context(parent, child),
})
return enriched_results
def _build_context(self, parent: ParentDocument, child: ChildDocument) -> str:
"""LLM에게 전달할 통합 컨텍스트 생성"""
return f"""
[전체 맥락 - 고객 정보]
{parent.text}
[상세 정보 - 관련 주문]
{child.text}
"""
def retrieve_with_sql_fallback(self, query: str, customer_id: int = None) -> Dict:
"""
벡터 검색 + SQL 보완 전략
벡터 검색으로 관련 문서를 찾고,
정확한 수치가 필요하면 SQL로 보완
"""
# 벡터 검색
vector_results = self.retrieve(query)
# SQL로 정확한 데이터 보완
if customer_id and self._needs_exact_numbers(query):
sql_data = self.sql_db.query("""
SELECT
COUNT(DISTINCT o.order_id) as order_count,
SUM(o.total_amount) as total_spent,
COUNT(CASE WHEN oi.return_status != 'none' THEN 1 END) as return_count
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
JOIN order_items oi ON o.order_id = oi.order_id
WHERE c.customer_id = %s
""", [customer_id])
return {
"vector_context": vector_results,
"exact_data": sql_data,
}
return {"vector_context": vector_results}
def _needs_exact_numbers(self, query: str) -> bool:
"""정확한 수치가 필요한 쿼리인지 판단"""
exact_keywords = ['총', '합계', '몇 개', '몇 건', '얼마', '평균', '최대', '최소']
return any(kw in query for kw in exact_keywords)6. SQL JOIN 결과를 Embedding할 때 주의점

JOIN의 함정: 데이터 폭발
-- 단순해 보이는 JOIN
SELECT c.name, o.order_id, p.name as product
FROM customers c
JOIN orders o ON c.customer_id = o.customer_id
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.product_id;문제점:
원본 레코드 수:
- customers: 1,000
- orders: 10,000
- order_items: 50,000
- products: 5,000
JOIN 결과: 50,000 rows (order_items 기준으로 폭발)
각 row를 chunk로 만들면:
- 50,000개의 chunk
- 동일 고객 정보가 수십~수백 번 중복
- 임베딩 비용 폭증
- 검색 시 중복 결과올바른 JOIN 전략
class SmartJoinChunker:
"""
JOIN 결과를 효율적으로 Chunking하는 전략
"""
def __init__(self, db_connection):
self.db = db_connection
def chunk_with_aggregation(self) -> List[RelationalChunk]:
"""
전략 1: GROUP BY + JSON 집계
1:N 관계를 JSON 배열로 집계하여 중복 제거
"""
query = """
SELECT
c.customer_id,
c.name,
c.segment,
-- 주문을 JSON 배열로 집계
json_agg(
DISTINCT jsonb_build_object(
'order_id', o.order_id,
'order_date', o.order_date,
'items', (
SELECT json_agg(
jsonb_build_object(
'product', p.name,
'category', p.category,
'qty', oi.quantity
)
)
FROM order_items oi
JOIN products p ON oi.product_id = p.product_id
WHERE oi.order_id = o.order_id
)
)
) as orders
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
GROUP BY c.customer_id, c.name, c.segment
"""
results = self.db.query(query)
chunks = []
for row in results:
chunk = RelationalChunk(
text=self._format_aggregated_customer(row),
metadata={
"customer_id": row['customer_id'],
"type": "customer_profile",
}
)
chunks.append(chunk)
return chunks # customers 수만큼만 생성 (1,000개)
def chunk_with_window_functions(self) -> List[RelationalChunk]:
"""
전략 2: Window Function으로 컨텍스트 보존
각 레코드에 관련 컨텍스트를 포함
"""
query = """
WITH order_context AS (
SELECT
o.order_id,
o.customer_id,
o.order_date,
c.name as customer_name,
c.segment,
-- 동일 고객의 다른 주문 수
COUNT(*) OVER (PARTITION BY o.customer_id) as customer_total_orders,
-- 동일 고객의 총 구매액
SUM(o.total_amount) OVER (PARTITION BY o.customer_id) as customer_total_spent,
-- 주문 순서
ROW_NUMBER() OVER (PARTITION BY o.customer_id ORDER BY o.order_date) as order_sequence
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
)
SELECT
oc.*,
json_agg(
jsonb_build_object(
'product', p.name,
'category', p.category,
'quantity', oi.quantity
)
) as items
FROM order_context oc
JOIN order_items oi ON oc.order_id = oi.order_id
JOIN products p ON oi.product_id = p.product_id
GROUP BY oc.order_id, oc.customer_id, oc.order_date,
oc.customer_name, oc.segment, oc.customer_total_orders,
oc.customer_total_spent, oc.order_sequence
"""
results = self.db.query(query)
chunks = []
for row in results:
# 각 주문이 고객 컨텍스트를 포함
chunk = RelationalChunk(
text=self._format_order_with_context(row),
metadata={
"order_id": row['order_id'],
"customer_id": row['customer_id'],
"type": "order_with_context",
}
)
chunks.append(chunk)
return chunks # orders 수만큼 생성 (10,000개), 하지만 각각 컨텍스트 포함
def _format_order_with_context(self, row: Dict) -> str:
"""주문 + 고객 컨텍스트 포맷"""
items_text = "\n".join([
f" - {item['product']} ({item['category']}) x{item['quantity']}"
for item in row['items']
])
return f"""
주문 #{row['order_id']} ({row['order_date']})
고객 정보:
- 이름: {row['customer_name']} (ID: {row['customer_id']})
- 등급: {row['segment']}
- 총 주문 수: {row['customer_total_orders']}건 (이 주문은 {row['order_sequence']}번째)
- 누적 구매액: {row['customer_total_spent']:,.0f}원
주문 상품:
{items_text}
""".strip()중복 제거와 Deduplication
class ChunkDeduplicator:
"""
Embedding 전 중복 chunk 제거
"""
def deduplicate_chunks(
self,
chunks: List[RelationalChunk],
strategy: str = "exact"
) -> List[RelationalChunk]:
"""
중복 제거 전략
- exact: 정확히 같은 텍스트 제거
- semantic: 의미적으로 유사한 것도 제거 (임베딩 비용 발생)
- id_based: metadata의 primary key 기준
"""
if strategy == "exact":
seen = set()
unique = []
for chunk in chunks:
text_hash = hash(chunk.text)
if text_hash not in seen:
seen.add(text_hash)
unique.append(chunk)
return unique
elif strategy == "id_based":
# Primary key 기준 중복 제거
seen_ids = set()
unique = []
for chunk in chunks:
primary_key = self._get_primary_key(chunk)
if primary_key not in seen_ids:
seen_ids.add(primary_key)
unique.append(chunk)
return unique
elif strategy == "semantic":
return self._semantic_dedup(chunks)
def _get_primary_key(self, chunk: RelationalChunk) -> str:
"""Chunk의 primary key 추출"""
meta = chunk.metadata
if "customer_id" in meta and "order_id" not in meta:
return f"customer_{meta['customer_id']}"
elif "order_id" in meta:
return f"order_{meta['order_id']}"
else:
return hash(chunk.text)
def _semantic_dedup(
self,
chunks: List[RelationalChunk],
threshold: float = 0.95
) -> List[RelationalChunk]:
"""
의미적 유사도 기반 중복 제거
주의: 임베딩 API 호출 필요
"""
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
# 임베딩 생성
embeddings = [self._embed(chunk.text) for chunk in chunks]
embeddings = np.array(embeddings)
# 유사도 행렬
similarity_matrix = cosine_similarity(embeddings)
# 중복 마킹
unique_indices = []
removed = set()
for i in range(len(chunks)):
if i in removed:
continue
unique_indices.append(i)
# i와 유사한 것들 제거 대상으로
for j in range(i + 1, len(chunks)):
if similarity_matrix[i, j] > threshold:
removed.add(j)
return [chunks[i] for i in unique_indices]7. Metadata 필터링 vs 순수 Vector Search
두 접근법의 차이
검색 전략 비교
순수 Vector Search
장점: 의미적 유사성 탐색, 유연한 검색
단점: 정확한 필터링 불가, 관계 무시
적합: "~와 비슷한 것 찾아줘"
Metadata 필터링 + Vector Search
장점: 정확한 조건 필터링, 관계 ID로 연결
단점: 필터 조건 명시 필요
적합: "VIP 고객 중에서 ~한 것 찾아줘"
Hybrid (SQL + Vector)
장점: SQL로 정확한 필터 + Vector로 유사성
단점: 구현 복잡도 증가
적합: 복잡한 비즈니스 쿼리
Metadata 설계 원칙
class MetadataDesigner:
"""
효과적인 Metadata 설계
원칙:
1. 관계 ID는 필수 (customer_id, order_id 등)
2. 자주 필터링하는 속성만 포함
3. 값의 카디널리티 고려 (너무 세분화하면 필터 효과 없음)
"""
@staticmethod
def design_customer_metadata(customer: Dict, orders: List[Dict]) -> Dict:
"""
고객 chunk용 metadata 설계
"""
# 필수: 관계 식별자
metadata = {
"customer_id": customer['customer_id'],
"type": "customer_profile",
}
# 권장: 자주 필터링하는 속성
metadata.update({
"segment": customer['segment'], # VIP, Regular 등
"registration_year": customer['created_at'].year,
})
# 권장: 집계된 검색 조건
categories = set()
has_returns = False
for order in orders:
for item in order.get('items', []):
categories.add(item['category'])
if item.get('return_status') not in (None, 'none'):
has_returns = True
metadata.update({
"categories": list(categories), # 리스트 필터링 지원
"has_returns": has_returns, # 불리언 필터
"total_orders": len(orders), # 범위 필터용
})
# 비권장: 너무 세부적인 정보
# - 개별 주문 날짜들 (SQL로 처리)
# - 개별 상품 ID들 (너무 많음)
# - 정확한 금액 (범위 쿼리는 SQL이 적합)
return metadataHybrid 검색 구현
class HybridRetriever:
"""
SQL 필터링 + Vector Search 결합
전략:
1. SQL로 후보군 좁히기 (정확한 조건)
2. Vector Search로 관련성 정렬 (의미적 유사성)
"""
def __init__(self, sql_db, vector_db):
self.sql_db = sql_db
self.vector_db = vector_db
def search(
self,
query: str,
filters: Dict = None,
top_k: int = 10
) -> List[Dict]:
"""
Hybrid 검색 실행
예시:
query = "반품 관련 불만 사항"
filters = {
"segment": "VIP",
"has_returns": True,
"order_date_after": "2024-01-01"
}
"""
# Step 1: SQL로 후보 ID 추출
candidate_ids = self._sql_filter(filters)
if not candidate_ids:
# 조건에 맞는 데이터 없음
return []
# Step 2: Vector Search with ID 필터
vector_results = self.vector_db.search(
query=query,
filter={"customer_id": {"$in": candidate_ids}},
top_k=top_k
)
# Step 3: SQL로 상세 정보 보강
enriched = self._enrich_with_sql(vector_results)
return enriched
def _sql_filter(self, filters: Dict) -> List[int]:
"""SQL로 조건에 맞는 customer_id 추출"""
if not filters:
return None # 필터 없으면 전체 대상
conditions = ["1=1"]
params = []
if filters.get("segment"):
conditions.append("c.segment = %s")
params.append(filters["segment"])
if filters.get("has_returns"):
conditions.append("""
EXISTS (
SELECT 1 FROM orders o
JOIN order_items oi ON o.order_id = oi.order_id
WHERE o.customer_id = c.customer_id
AND oi.return_status != 'none'
)
""")
if filters.get("order_date_after"):
conditions.append("""
EXISTS (
SELECT 1 FROM orders o
WHERE o.customer_id = c.customer_id
AND o.order_date >= %s
)
""")
params.append(filters["order_date_after"])
query = f"""
SELECT DISTINCT c.customer_id
FROM customers c
WHERE {' AND '.join(conditions)}
"""
results = self.sql_db.query(query, params)
return [r['customer_id'] for r in results]
def _enrich_with_sql(self, vector_results: List) -> List[Dict]:
"""Vector 결과에 SQL 데이터 보강"""
customer_ids = [r.metadata['customer_id'] for r in vector_results]
sql_data = self.sql_db.query("""
SELECT
c.customer_id,
c.name,
COUNT(o.order_id) as order_count,
SUM(o.total_amount) as total_spent,
MAX(o.order_date) as last_order_date
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
WHERE c.customer_id = ANY(%s)
GROUP BY c.customer_id, c.name
""", [customer_ids])
sql_lookup = {d['customer_id']: d for d in sql_data}
enriched = []
for result in vector_results:
cid = result.metadata['customer_id']
enriched.append({
"text": result.text,
"score": result.score,
"metadata": result.metadata,
"sql_data": sql_lookup.get(cid, {}),
})
return enriched성능 비교

def benchmark_search_strategies():
"""
검색 전략별 성능 비교
"""
queries = [
{
"question": "VIP 고객 중 반품이 많은 사람은?",
"expected_filter": {"segment": "VIP", "has_returns": True},
},
{
"question": "전자제품을 자주 구매하는 고객의 특성",
"expected_filter": {"categories": {"$contains": "전자제품"}},
},
{
"question": "최근 3개월 구매 이력이 있는 휴면 고객",
"expected_filter": {
"order_date_after": "2024-09-01",
"segment": "Dormant",
},
},
]
results = []
for q in queries:
# 전략 1: 순수 Vector Search
start = time.time()
pure_vector = vector_db.search(q["question"], top_k=10)
pure_vector_time = time.time() - start
pure_vector_precision = calculate_precision(pure_vector, q)
# 전략 2: Metadata 필터 + Vector
start = time.time()
metadata_vector = vector_db.search(
q["question"],
filter=q["expected_filter"],
top_k=10
)
metadata_time = time.time() - start
metadata_precision = calculate_precision(metadata_vector, q)
# 전략 3: SQL + Vector Hybrid
start = time.time()
hybrid = hybrid_retriever.search(
q["question"],
filters=q["expected_filter"],
top_k=10
)
hybrid_time = time.time() - start
hybrid_precision = calculate_precision(hybrid, q)
results.append({
"question": q["question"],
"pure_vector": {"time": pure_vector_time, "precision": pure_vector_precision},
"metadata": {"time": metadata_time, "precision": metadata_precision},
"hybrid": {"time": hybrid_time, "precision": hybrid_precision},
})
return results
# 예상 결과:
# ┌─────────────────────────────────┬─────────────┬─────────────┬─────────────┐
# │ Query │ Pure Vector │ Metadata │ Hybrid │
# ├─────────────────────────────────┼─────────────┼─────────────┼─────────────┤
# │ VIP 고객 중 반품 많은 사람 │ P: 0.3 │ P: 0.7 │ P: 0.9 │
# │ │ T: 50ms │ T: 45ms │ T: 80ms │
# ├─────────────────────────────────┼─────────────┼─────────────┼─────────────┤
# │ 전자제품 자주 구매 고객 │ P: 0.4 │ P: 0.8 │ P: 0.95 │
# │ │ T: 48ms │ T: 52ms │ T: 85ms │
# └─────────────────────────────────┴─────────────┴─────────────┴─────────────┘8. 실전 구현: 고객-주문-상품 RAG 시스템
전체 아키텍처
"""
E-Commerce RAG 시스템 전체 구현
구성요소:
1. DataLoader: SQL에서 관계 데이터 로드
2. ChunkBuilder: 관계 보존 Chunking
3. Indexer: 벡터 DB 인덱싱
4. Retriever: Hybrid 검색
5. Generator: LLM 응답 생성
"""
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple
import json
from datetime import datetime
import psycopg2
from openai import OpenAI
# 설정
@dataclass
class RAGConfig:
# Database
db_host: str = "localhost"
db_name: str = "ecommerce"
db_user: str = "user"
db_password: str = "password"
# Vector DB
vector_db_url: str = "http://localhost:6333"
collection_name: str = "customer_profiles"
# Embedding
embedding_model: str = "text-embedding-3-small"
embedding_dimension: int = 1536
# LLM
llm_model: str = "gpt-4-turbo-preview"
max_tokens: int = 2000
# Chunking
chunk_strategy: str = "customer_centric" # customer_centric, order_centric, parent_child
class EcommerceRAG:
"""
E-Commerce RAG 시스템 메인 클래스
"""
def __init__(self, config: RAGConfig):
self.config = config
self.sql_db = self._connect_sql()
self.vector_db = self._connect_vector_db()
self.openai = OpenAI()
def _connect_sql(self):
return psycopg2.connect(
host=self.config.db_host,
database=self.config.db_name,
user=self.config.db_user,
password=self.config.db_password
)
def _connect_vector_db(self):
from qdrant_client import QdrantClient
return QdrantClient(url=self.config.vector_db_url)
# =========================================
# 1. 데이터 로드 및 Chunking
# =========================================
def build_index(self):
"""
SQL 데이터를 벡터 DB에 인덱싱
"""
print("Starting index build...")
# Step 1: 관계 데이터 로드
customers = self._load_customer_data()
print(f"Loaded {len(customers)} customers with orders")
# Step 2: Chunk 생성
chunks = self._create_chunks(customers)
print(f"Created {len(chunks)} chunks")
# Step 3: 임베딩 및 인덱싱
self._index_chunks(chunks)
print("Indexing complete!")
return len(chunks)
def _load_customer_data(self) -> List[Dict]:
"""
고객-주문-상품 관계 데이터 로드
핵심: JSON 집계로 1:N 관계를 하나의 레코드로
"""
query = """
WITH order_details AS (
SELECT
o.order_id,
o.customer_id,
o.order_date,
o.total_amount,
o.status,
json_agg(
json_build_object(
'product_id', p.product_id,
'product_name', p.name,
'category', p.category,
'quantity', oi.quantity,
'unit_price', oi.unit_price,
'return_status', oi.return_status
)
) as items
FROM orders o
JOIN order_items oi ON o.order_id = oi.order_id
JOIN products p ON oi.product_id = p.product_id
GROUP BY o.order_id
)
SELECT
c.customer_id,
c.name,
c.email,
c.segment,
c.created_at,
COALESCE(
json_agg(
json_build_object(
'order_id', od.order_id,
'order_date', od.order_date,
'total_amount', od.total_amount,
'status', od.status,
'items', od.items
) ORDER BY od.order_date DESC
) FILTER (WHERE od.order_id IS NOT NULL),
'[]'::json
) as orders
FROM customers c
LEFT JOIN order_details od ON c.customer_id = od.customer_id
GROUP BY c.customer_id, c.name, c.email, c.segment, c.created_at
"""
with self.sql_db.cursor() as cur:
cur.execute(query)
columns = [desc[0] for desc in cur.description]
results = [dict(zip(columns, row)) for row in cur.fetchall()]
return results
def _create_chunks(self, customers: List[Dict]) -> List[Dict]:
"""
관계 보존 Chunk 생성
"""
chunks = []
for customer in customers:
chunk_text = self._format_customer_chunk(customer)
# Metadata 설계
orders = customer['orders'] if customer['orders'] else []
categories = self._extract_categories(orders)
metadata = {
"customer_id": customer['customer_id'],
"customer_name": customer['name'],
"segment": customer['segment'],
"categories": categories,
"has_returns": self._has_returns(orders),
"total_orders": len(orders),
"total_spent": sum(o['total_amount'] for o in orders if o.get('total_amount')),
"last_order_date": orders[0]['order_date'] if orders else None,
}
chunks.append({
"text": chunk_text,
"metadata": metadata,
})
return chunks
def _format_customer_chunk(self, customer: Dict) -> str:
"""
고객 데이터를 검색 가능한 텍스트로 변환
핵심: 관계 정보를 명시적으로 포함
"""
lines = [
f"# 고객 프로필: {customer['name']}",
f"",
f"## 기본 정보",
f"- 고객 ID: {customer['customer_id']}",
f"- 이메일: {customer['email']}",
f"- 등급: {customer['segment']}",
f"- 가입일: {customer['created_at']}",
f"",
]
orders = customer['orders'] if customer['orders'] else []
if orders:
# 요약 통계
total_spent = sum(o['total_amount'] for o in orders if o.get('total_amount'))
categories = self._extract_categories(orders)
return_count = sum(
1 for o in orders
for item in (o.get('items') or [])
if item.get('return_status') not in (None, 'none')
)
lines.extend([
f"## 구매 요약",
f"- 총 주문 수: {len(orders)}건",
f"- 총 구매액: {total_spent:,.0f}원",
f"- 구매 카테고리: {', '.join(categories)}",
f"- 반품 건수: {return_count}건",
f"",
f"## 주문 상세",
])
# 최근 10건만 상세 포함 (너무 길면 truncate)
for order in orders[:10]:
lines.append(f"")
lines.append(f"### 주문 #{order['order_id']} ({order['order_date']})")
lines.append(f"상태: {order['status']} | 금액: {order['total_amount']:,.0f}원")
items = order.get('items') or []
if items:
lines.append("상품:")
for item in items:
return_info = ""
if item.get('return_status') not in (None, 'none'):
return_info = f" [반품: {item['return_status']}]"
lines.append(
f" - {item['product_name']} ({item['category']}) "
f"x{item['quantity']} @ {item['unit_price']:,.0f}원{return_info}"
)
if len(orders) > 10:
lines.append(f"")
lines.append(f"... 외 {len(orders) - 10}건의 주문 이력")
else:
lines.extend([
f"## 구매 이력",
f"주문 내역 없음",
])
return "\n".join(lines)
def _extract_categories(self, orders: List[Dict]) -> List[str]:
categories = set()
for order in orders:
for item in (order.get('items') or []):
if item.get('category'):
categories.add(item['category'])
return list(categories)
def _has_returns(self, orders: List[Dict]) -> bool:
for order in orders:
for item in (order.get('items') or []):
if item.get('return_status') not in (None, 'none'):
return True
return False
# =========================================
# 2. 인덱싱
# =========================================
def _index_chunks(self, chunks: List[Dict]):
"""
Chunk들을 벡터 DB에 인덱싱
"""
from qdrant_client.models import Distance, VectorParams, PointStruct
# 컬렉션 생성
self.vector_db.recreate_collection(
collection_name=self.config.collection_name,
vectors_config=VectorParams(
size=self.config.embedding_dimension,
distance=Distance.COSINE
)
)
# 배치 임베딩 및 인덱싱
batch_size = 100
for i in range(0, len(chunks), batch_size):
batch = chunks[i:i + batch_size]
# 임베딩 생성
texts = [c['text'] for c in batch]
embeddings = self._embed_texts(texts)
# 포인트 생성
points = [
PointStruct(
id=i + j,
vector=embeddings[j],
payload={
"text": batch[j]['text'],
**batch[j]['metadata']
}
)
for j in range(len(batch))
]
# 업로드
self.vector_db.upsert(
collection_name=self.config.collection_name,
points=points
)
print(f"Indexed {min(i + batch_size, len(chunks))}/{len(chunks)} chunks")
def _embed_texts(self, texts: List[str]) -> List[List[float]]:
"""텍스트 임베딩 생성"""
response = self.openai.embeddings.create(
model=self.config.embedding_model,
input=texts
)
return [item.embedding for item in response.data]
# =========================================
# 3. 검색 및 응답 생성
# =========================================
def query(
self,
question: str,
filters: Dict = None,
top_k: int = 5
) -> Dict:
"""
RAG 질의 실행
Args:
question: 사용자 질문
filters: 메타데이터 필터 (optional)
top_k: 검색 결과 수
Returns:
{
"answer": str,
"sources": List[Dict],
"sql_supplement": Dict (optional)
}
"""
# Step 1: 질문 분석 및 필터 추출
parsed = self._parse_question(question)
# Step 2: 벡터 검색
search_results = self._search(
question,
filters=filters or parsed.get('filters'),
top_k=top_k
)
# Step 3: SQL 보완 (정확한 수치 필요시)
sql_data = None
if parsed.get('needs_exact_data'):
sql_data = self._get_exact_data(parsed)
# Step 4: LLM 응답 생성
answer = self._generate_answer(question, search_results, sql_data)
return {
"answer": answer,
"sources": [
{
"customer_id": r.payload['customer_id'],
"customer_name": r.payload['customer_name'],
"score": r.score,
}
for r in search_results
],
"sql_supplement": sql_data,
}
def _parse_question(self, question: str) -> Dict:
"""
질문에서 필터 조건과 데이터 요구사항 추출
"""
# 간단한 규칙 기반 파싱 (프로덕션에서는 LLM 활용 권장)
parsed = {
"filters": {},
"needs_exact_data": False,
}
# 세그먼트 필터
if "VIP" in question.upper():
parsed["filters"]["segment"] = "VIP"
elif "휴면" in question or "dormant" in question.lower():
parsed["filters"]["segment"] = "Dormant"
# 반품 필터
if "반품" in question or "return" in question.lower():
parsed["filters"]["has_returns"] = True
# 카테고리 필터
categories = ["전자제품", "의류", "식품", "가구"]
for cat in categories:
if cat in question:
parsed["filters"]["categories"] = {"$contains": cat}
break
# 정확한 수치 필요 여부
exact_keywords = ['총', '합계', '몇 개', '몇 건', '얼마', '평균', '최대', '최소', '비율']
parsed["needs_exact_data"] = any(kw in question for kw in exact_keywords)
return parsed
def _search(
self,
query: str,
filters: Dict = None,
top_k: int = 5
) -> List:
"""벡터 검색 실행"""
from qdrant_client.models import Filter, FieldCondition, MatchValue
# 쿼리 임베딩
query_vector = self._embed_texts([query])[0]
# 필터 구성
qdrant_filter = None
if filters:
conditions = []
for key, value in filters.items():
if isinstance(value, dict) and "$contains" in value:
# 리스트 포함 조건
conditions.append(
FieldCondition(
key=key,
match=MatchValue(value=value["$contains"])
)
)
else:
conditions.append(
FieldCondition(
key=key,
match=MatchValue(value=value)
)
)
if conditions:
qdrant_filter = Filter(must=conditions)
# 검색 실행
results = self.vector_db.search(
collection_name=self.config.collection_name,
query_vector=query_vector,
query_filter=qdrant_filter,
limit=top_k
)
return results
def _get_exact_data(self, parsed: Dict) -> Dict:
"""SQL로 정확한 통계 데이터 조회"""
# 필터 조건 구성
where_clauses = ["1=1"]
params = []
if parsed["filters"].get("segment"):
where_clauses.append("c.segment = %s")
params.append(parsed["filters"]["segment"])
if parsed["filters"].get("has_returns"):
where_clauses.append("""
EXISTS (
SELECT 1 FROM order_items oi2
JOIN orders o2 ON oi2.order_id = o2.order_id
WHERE o2.customer_id = c.customer_id
AND oi2.return_status != 'none'
)
""")
query = f"""
SELECT
COUNT(DISTINCT c.customer_id) as customer_count,
COUNT(DISTINCT o.order_id) as order_count,
COALESCE(SUM(o.total_amount), 0) as total_revenue,
COALESCE(AVG(o.total_amount), 0) as avg_order_value,
COUNT(CASE WHEN oi.return_status != 'none' THEN 1 END) as return_count
FROM customers c
LEFT JOIN orders o ON c.customer_id = o.customer_id
LEFT JOIN order_items oi ON o.order_id = oi.order_id
WHERE {' AND '.join(where_clauses)}
"""
with self.sql_db.cursor() as cur:
cur.execute(query, params)
columns = [desc[0] for desc in cur.description]
row = cur.fetchone()
return dict(zip(columns, row)) if row else {}
def _generate_answer(
self,
question: str,
search_results: List,
sql_data: Dict = None
) -> str:
"""LLM으로 최종 응답 생성"""
# 컨텍스트 구성
context_parts = []
for i, result in enumerate(search_results, 1):
context_parts.append(f"[검색 결과 {i}] (관련도: {result.score:.2f})")
context_parts.append(result.payload['text'])
context_parts.append("")
context = "\n".join(context_parts)
# SQL 보완 데이터
sql_info = ""
if sql_data:
sql_info = f"""
정확한 통계 데이터:
- 해당 고객 수: {sql_data.get('customer_count', 'N/A')}명
- 총 주문 수: {sql_data.get('order_count', 'N/A')}건
- 총 매출액: {sql_data.get('total_revenue', 0):,.0f}원
- 평균 주문 금액: {sql_data.get('avg_order_value', 0):,.0f}원
- 반품 건수: {sql_data.get('return_count', 'N/A')}건
"""
prompt = f"""다음 고객 데이터를 기반으로 질문에 답변해주세요.
## 검색된 고객 정보
{context}
{sql_info}
## 질문
{question}
## 지시사항
1. 검색 결과에 있는 정보만 사용하여 답변하세요.
2. 정확한 수치가 제공된 경우 해당 수치를 인용하세요.
3. 확실하지 않은 정보는 "확인된 데이터에 따르면"으로 시작하세요.
4. 답을 찾을 수 없으면 솔직히 "제공된 데이터에서 해당 정보를 찾을 수 없습니다"라고 하세요.
"""
response = self.openai.chat.completions.create(
model=self.config.llm_model,
messages=[
{"role": "system", "content": "당신은 E-Commerce 데이터 분석 전문가입니다."},
{"role": "user", "content": prompt}
],
max_tokens=self.config.max_tokens,
temperature=0.3 # 낮은 temperature로 정확성 우선
)
return response.choices[0].message.content사용 예시
# 시스템 초기화
config = RAGConfig(
db_host="localhost",
db_name="ecommerce",
)
rag = EcommerceRAG(config)
# 인덱스 빌드 (최초 1회)
rag.build_index()
# 질의 실행
result = rag.query("VIP 고객 중 반품이 많은 사람은 누구인가요?")
print(result['answer'])
# 출력:
# 확인된 데이터에 따르면, VIP 고객 중 반품 이력이 있는 고객은 다음과 같습니다:
# 1. 김철수 (ID: 123) - 반품 2건 (Magic Mouse, 기계식 키보드)
# 2. 이영희 (ID: 456) - 반품 1건 (무선 이어폰)
# ...
# 정확한 수치가 필요한 질의
result = rag.query("VIP 고객의 총 매출액과 평균 주문 금액은?")
print(result['answer'])
# 출력:
# VIP 고객 통계:
# - 총 고객 수: 150명
# - 총 매출액: 2,500,000,000원
# - 평균 주문 금액: 850,000원9. 성능 벤치마크: 관계 보존 vs 미보존
실험 설계
"""
벤치마크 실험: 관계 보존 Chunking의 효과 측정
비교 대상:
1. Baseline: 테이블별 독립 Chunking
2. Flat JOIN: JOIN 결과를 row별로 Chunking
3. Proposed: 관계 보존 Chunking (Customer-centric)
"""
from dataclasses import dataclass
from typing import List, Dict, Tuple
import time
import numpy as np
@dataclass
class BenchmarkResult:
strategy: str
precision_at_5: float
recall_at_5: float
f1_at_5: float
latency_ms: float
hallucination_rate: float
chunk_count: int
index_time_sec: float
class RAGBenchmark:
"""RAG 시스템 벤치마크"""
def __init__(self, test_queries: List[Dict]):
"""
test_queries 형식:
[
{
"question": "VIP 고객 중 반품이 많은 사람은?",
"ground_truth": {
"customer_ids": [123, 456],
"expected_facts": ["김철수 반품 2건", "이영희 반품 1건"]
}
},
...
]
"""
self.test_queries = test_queries
self.results = []
def run_benchmark(self) -> List[BenchmarkResult]:
"""전체 벤치마크 실행"""
strategies = [
("baseline", self._build_baseline_index),
("flat_join", self._build_flat_join_index),
("proposed", self._build_proposed_index),
]
for name, build_fn in strategies:
print(f"\n{'='*50}")
print(f"Testing strategy: {name}")
print('='*50)
# 인덱스 빌드
start = time.time()
rag_system, chunk_count = build_fn()
index_time = time.time() - start
# 쿼리 실행 및 평가
metrics = self._evaluate_strategy(rag_system)
result = BenchmarkResult(
strategy=name,
precision_at_5=metrics['precision'],
recall_at_5=metrics['recall'],
f1_at_5=metrics['f1'],
latency_ms=metrics['latency'],
hallucination_rate=metrics['hallucination_rate'],
chunk_count=chunk_count,
index_time_sec=index_time
)
self.results.append(result)
return self.results
def _evaluate_strategy(self, rag_system) -> Dict:
"""전략 평가"""
precisions = []
recalls = []
latencies = []
hallucinations = []
for query in self.test_queries:
# 검색 실행
start = time.time()
result = rag_system.query(query['question'])
latency = (time.time() - start) * 1000
latencies.append(latency)
# Precision/Recall 계산
retrieved_ids = set(s['customer_id'] for s in result['sources'])
relevant_ids = set(query['ground_truth']['customer_ids'])
if retrieved_ids:
precision = len(retrieved_ids & relevant_ids) / len(retrieved_ids)
else:
precision = 0
if relevant_ids:
recall = len(retrieved_ids & relevant_ids) / len(relevant_ids)
else:
recall = 1
precisions.append(precision)
recalls.append(recall)
# Hallucination 체크
hallucination = self._check_hallucination(
result['answer'],
query['ground_truth']['expected_facts']
)
hallucinations.append(hallucination)
avg_precision = np.mean(precisions)
avg_recall = np.mean(recalls)
return {
'precision': avg_precision,
'recall': avg_recall,
'f1': 2 * avg_precision * avg_recall / (avg_precision + avg_recall + 1e-10),
'latency': np.mean(latencies),
'hallucination_rate': np.mean(hallucinations),
}
def _check_hallucination(self, answer: str, expected_facts: List[str]) -> float:
"""
Hallucination 비율 측정
반환: 0.0 (hallucination 없음) ~ 1.0 (완전 hallucination)
"""
# 간단한 방법: 예상 사실이 답변에 포함되어 있는지 확인
# 프로덕션에서는 NLI 모델 활용 권장
if not expected_facts:
return 0.0
found_facts = sum(1 for fact in expected_facts if fact in answer)
fact_coverage = found_facts / len(expected_facts)
# fact가 없는데 자신있게 답변하면 hallucination
# fact가 있으면 hallucination 아님
return 1.0 - fact_coverage
def print_results(self):
"""결과 출력"""
print("\n" + "="*80)
print("BENCHMARK RESULTS")
print("="*80)
headers = ["Strategy", "P@5", "R@5", "F1@5", "Latency", "Halluc.", "Chunks", "Index Time"]
print(f"\n{'Strategy':<15} {'P@5':>8} {'R@5':>8} {'F1@5':>8} {'Latency':>10} {'Halluc.':>8} {'Chunks':>8} {'Index':>10}")
print("-" * 80)
for r in self.results:
print(
f"{r.strategy:<15} "
f"{r.precision_at_5:>8.2%} "
f"{r.recall_at_5:>8.2%} "
f"{r.f1_at_5:>8.2%} "
f"{r.latency_ms:>8.1f}ms "
f"{r.hallucination_rate:>8.2%} "
f"{r.chunk_count:>8,} "
f"{r.index_time_sec:>8.1f}s"
)예상 벤치마크 결과
================================================================================
BENCHMARK RESULTS
================================================================================
Strategy P@5 R@5 F1@5 Latency Halluc. Chunks Index
--------------------------------------------------------------------------------
baseline 31.2% 28.5% 29.8% 45.2ms 68.5% 65,000 12.3s
flat_join 42.8% 35.2% 38.6% 52.1ms 45.2% 150,000 28.7s
proposed 78.5% 72.3% 75.3% 48.5ms 12.8% 10,000 8.2s
================================================================================
ANALYSIS
================================================================================
1. Precision/Recall 향상
- Proposed vs Baseline: +47.3%p (Precision), +43.8%p (Recall)
- 관계 보존으로 정확한 고객을 검색
2. Hallucination 감소
- Proposed: 12.8% vs Baseline: 68.5%
- 5배 이상 hallucination 감소
- 관계 정보가 명시되어 LLM이 추측하지 않음
3. 효율성
- Chunk 수: Proposed 10,000 vs Flat JOIN 150,000 (93% 감소)
- Index 시간: 8.2s vs 28.7s (71% 감소)
- 임베딩 API 비용 대폭 절감
4. Latency
- 비슷한 수준 (45~52ms)
- Chunk 수가 적어도 검색 품질 향상상세 분석: 왜 이런 차이가 나나요?
def analyze_failure_cases():
"""
Baseline 전략의 실패 사례 분석
"""
# 질문: "김철수가 반품한 상품은?"
# Baseline 검색 결과 (테이블별 독립 chunk)
baseline_retrieved = [
"김철수 고객은 VIP 등급입니다.", # 고객 정보만
"Magic Mouse는 전자제품입니다.", # 상품 정보만
"주문 #456은 2024-01-15에 생성되었습니다.", # 주문 정보만
]
# 문제: 세 정보 간의 연결 관계가 없음
# LLM은 "김철수"와 "Magic Mouse"가 관련있다고 추측 (hallucination)
# 실제로 김철수가 Magic Mouse를 샀는지 알 수 없음
# Proposed 검색 결과 (관계 보존 chunk)
proposed_retrieved = [
"""
고객 프로필: 김철수
- 주문 #456 (2024-01-15)
- Magic Mouse x2 [반품: completed]
- MacBook Pro x1
- 주문 #789 (2024-02-20)
- 기계식 키보드 x1 [반품: completed]
반품 이력: Magic Mouse, 기계식 키보드
"""
]
# 차이: 관계가 명시되어 있어 LLM이 정확히 답변 가능
# "김철수가 반품한 상품은 Magic Mouse와 기계식 키보드입니다."10. 프로덕션 체크리스트
데이터 파이프라인 체크리스트
## 관계형 데이터 → RAG 파이프라인 체크리스트
### 1. 데이터 분석 단계
- [ ] ERD 문서화 완료
- [ ] 주요 1:N, N:M 관계 식별
- [ ] 각 테이블의 카디널리티 확인
- [ ] 자주 조회되는 쿼리 패턴 분석
- [ ] 데이터 갱신 주기 파악
### 2. Chunking 전략 설계
- [ ] 관계의 "1" 측 식별 (고객? 주문? 상품?)
- [ ] Parent-Child 패턴 필요 여부 결정
- [ ] Chunk 크기 결정 (토큰 제한 고려)
- [ ] Metadata 스키마 설계
- [ ] 중복 제거 전략 수립
### 3. SQL 쿼리 최적화
- [ ] JSON 집계 쿼리 성능 테스트
- [ ] 필요한 인덱스 생성
- [ ] 배치 처리 크기 결정
- [ ] 증분 업데이트 쿼리 작성
### 4. 임베딩 및 인덱싱
- [ ] 임베딩 모델 선택 및 테스트
- [ ] 벡터 DB 선택 (Qdrant, Pinecone, Weaviate 등)
- [ ] 인덱스 설정 (HNSW 파라미터 등)
- [ ] Metadata 필터링 인덱스 설정
### 5. 검색 전략
- [ ] Hybrid 검색 필요 여부 결정
- [ ] 필터 조건 추출 로직 구현
- [ ] Top-K 값 결정
- [ ] Reranking 필요 여부 검토
### 6. 품질 관리
- [ ] 테스트 쿼리 세트 구축 (최소 50개)
- [ ] Ground Truth 라벨링
- [ ] Hallucination 탐지 로직 구현
- [ ] 정기 품질 모니터링 대시보드
### 7. 운영 준비
- [ ] 데이터 동기화 스케줄러
- [ ] 에러 핸들링 및 재시도 로직
- [ ] 로깅 및 모니터링
- [ ] 비용 모니터링 (임베딩 API)흔한 실수와 해결책
"""
RAG + 관계형 데이터에서 흔한 실수들
"""
# 실수 1: 관계 없이 테이블별로 따로 임베딩
# ❌ Bad
for customer in customers:
embed(f"고객 {customer.name}")
for order in orders:
embed(f"주문 #{order.id}")
# ✅ Good
for customer in customers_with_orders:
embed(format_customer_with_orders(customer))
# 실수 2: JOIN 결과를 그대로 row별 chunk
# ❌ Bad
for row in db.query("SELECT * FROM customers JOIN orders JOIN items"):
embed(str(row)) # 동일 고객이 수십 번 중복
# ✅ Good
for customer in db.query(aggregated_query_with_json):
embed(format_aggregated(customer)) # 고객당 1개 chunk
# 실수 3: Metadata 없이 텍스트만 저장
# ❌ Bad
vector_db.insert({"text": chunk_text, "vector": embedding})
# ✅ Good
vector_db.insert({
"text": chunk_text,
"vector": embedding,
"customer_id": 123, # 관계 ID
"segment": "VIP", # 필터링용
"categories": ["전자제품", "의류"], # 다중 값
})
# 실수 4: 정확한 수치를 벡터 검색에만 의존
# ❌ Bad
question = "VIP 고객 총 매출액은?"
answer = rag.query(question) # 벡터 검색 결과에서 추정
# ✅ Good
question = "VIP 고객 총 매출액은?"
vector_context = rag.search(question)
exact_data = sql.query("SELECT SUM(amount) FROM orders WHERE segment='VIP'")
answer = llm.generate(context=vector_context, exact_data=exact_data)
# 실수 5: Chunk 크기 무시
# ❌ Bad
# 고객 한 명의 주문이 1000건이면 chunk가 너무 커짐
chunk = format_all_orders(customer) # 100,000 토큰
# ✅ Good
# Parent-Child 패턴 사용
parent = format_customer_summary(customer) # 500 토큰
children = [format_order(o) for o in customer.orders] # 각 200 토큰
# 실수 6: 동기화 누락
# ❌ Bad
# 최초 인덱싱 후 업데이트 없음
# ✅ Good
# 증분 동기화 구현
def sync_updates():
last_sync = get_last_sync_time()
updated = db.query(f"SELECT * FROM customers WHERE updated_at > {last_sync}")
for customer in updated:
vector_db.upsert(customer_id, new_embedding)성능 최적화 팁
"""
프로덕션 성능 최적화
"""
# 1. 배치 임베딩
# ❌ Bad: 하나씩 API 호출
for chunk in chunks:
embedding = openai.embed(chunk.text) # 1000번 API 호출
# ✅ Good: 배치 처리
batch_size = 100
for i in range(0, len(chunks), batch_size):
batch = chunks[i:i+batch_size]
embeddings = openai.embed([c.text for c in batch]) # 10번 API 호출
# 2. 캐싱
from functools import lru_cache
@lru_cache(maxsize=1000)
def embed_query(query: str) -> List[float]:
return openai.embed(query)
# 동일 쿼리 반복 시 API 호출 절약
# 3. 비동기 처리
import asyncio
async def search_with_sql(query: str):
# 벡터 검색과 SQL 쿼리 병렬 실행
vector_task = asyncio.create_task(vector_db.search_async(query))
sql_task = asyncio.create_task(sql_db.query_async(stats_query))
vector_results, sql_data = await asyncio.gather(vector_task, sql_task)
return combine_results(vector_results, sql_data)
# 4. 인덱스 최적화 (Qdrant 예시)
from qdrant_client.models import OptimizersConfig
client.update_collection(
collection_name="customers",
optimizer_config=OptimizersConfig(
indexing_threshold=10000, # 10K 이상일 때만 인덱싱
memmap_threshold=50000, # 50K 이상이면 디스크 사용
)
)
# 5. Metadata 인덱스 (필터링 성능)
client.create_payload_index(
collection_name="customers",
field_name="segment",
field_schema="keyword" # 정확한 매칭용
)
client.create_payload_index(
collection_name="customers",
field_name="total_orders",
field_schema="integer" # 범위 쿼리용
)마무리: 핵심 요약
RAG + SQL 핵심 원칙
1. 관계를 텍스트로 명시하라
- FK 관계를 자연어로 풀어서 chunk에 포함
- "주문 #456은 고객 김철수(ID: 123)의 주문입니다"
2. "1"의 측을 기준으로 그룹핑하라
- 고객 중심: 고객 + 해당 고객의 모든 주문
- JOIN 폭발을 JSON 집계로 해결
3. Metadata에 관계 ID를 저장하라
- customer_id, order_id로 필터링
- 검색 후 SQL로 추가 조회 가능
4. 정확한 수치는 SQL로 보완하라
- 벡터 검색: 관련 문서 찾기
- SQL: 정확한 집계/통계
5. Parent-Child로 스케일하라
- 대용량 관계는 요약(Parent) + 상세(Child)
- Child로 검색, Parent로 맥락 확보
벡터 DB는 의미를 이해하지만, 관계는 이해하지 못합니다.
RAG 시스템의 정확도를 높이려면 벡터 DB 튜닝 이전에, 원본 데이터의 관계형 구조가 chunk에 어떻게 반영되는지부터 점검해야 합니다. 1:N 관계를 무시한 chunking은 아무리 좋은 임베딩 모델을 써도 hallucination을 만들 수밖에 없습니다.
SQL에서 관계를 정리하고, 그 관계를 보존하는 chunking 전략을 세우는 것 - 이것이 RAG 시스템 정확도 향상의 첫 번째 단계다.
참고 자료
- [LangChain: Parent Document Retriever](https://python.langchain.com/docs/modules/data_connection/retrievers/parent_document_retriever)
- [Qdrant: Hybrid Search](https://qdrant.tech/documentation/concepts/hybrid-queries/)
- [PostgreSQL: JSON Functions](https://www.postgresql.org/docs/current/functions-json.html)
- [OpenAI: Embeddings Best Practices](https://platform.openai.com/docs/guides/embeddings)