Codexity Part 8: The Complete Answer Engine

Seven chapters of components. Time to weld them together.

This final chapter contains the complete, runnable source code for Codexity. Every module from the series, integrated, tested, and ready to deploy. By the end, you will run a single command and have a working answer engine that accepts questions, searches the web, scrapes pages, and streams cited answers.

Complete Codexity Architecture

Project Structure (Final)

codexity/
├── main.py
├── config.py
├── models.py
├── llm_client.py
├── query_rewriter.py
├── searcher.py
├── scraper.py
├── content_processor.py
├── synthesizer.py
├── pyproject.toml
├── Dockerfile
├── docker-compose.yml
├── .env.example
└── models/
    └── (download GGUF model here)

Complete Source: config.py

from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    # LLM
    model_path: str = "./models/qwen2.5-7b-instruct-q4_k_m.gguf"
    context_length: int = 8192
    max_tokens: int = 2048

    # Search
    max_search_results: int = 8
    max_queries: int = 3

    # Scraping
    scrape_timeout: int = 15
    max_concurrent_scrapes: int = 5
    use_playwright: bool = True

    # Content processing
    chunk_size: int = 512
    chunk_overlap: int = 50
    top_k_chunks: int = 10
    max_chunks_per_source: int = 3

    # Server
    host: str = "0.0.0.0"
    port: int = 8000
    log_level: str = "info"

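    # Pydantic v2 style config. The explicit protected_namespaces avoids the
    # warning that a field named model_path can otherwise trigger.
    model_config = {"env_file": ".env", "protected_namespaces": ("settings_",)}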

settings = Settings()
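
These defaults interact: the prompt holds up to top_k_chunks chunks of chunk_size words each, and the answer needs room for its output tokens, all inside context_length. A rough budget check is sketched below. It assumes about 1.3 tokens per English word, which is a rule of thumb rather than a measurement for this model's tokenizer, and the helper names and the 300-token overhead are hypothetical.

```python
# Rough context-budget check for the defaults in config.py.
# ASSUMPTION: ~1.3 tokens per English word (13 tokens per 10 words). The real
# ratio depends on the tokenizer and the text, so treat this as an estimate.
TOKENS_PER_10_WORDS = 13


def estimate_prompt_tokens(chunk_size: int, top_k_chunks: int, overhead: int = 300) -> int:
    """Estimate prompt tokens when every selected chunk is full.

    `overhead` is a guess covering the system prompt, source list, and question.
    """
    chunk_tokens = chunk_size * top_k_chunks * TOKENS_PER_10_WORDS // 10
    return chunk_tokens + overhead


def fits_context(chunk_size: int, top_k_chunks: int, max_tokens: int, context_length: int) -> bool:
    """True if the estimated prompt plus the requested output fits the window."""
    return estimate_prompt_tokens(chunk_size, top_k_chunks) + max_tokens <= context_length


if __name__ == "__main__":
    # Defaults: 10 chunks of 512 words, 2048 output tokens, 8192-token window.
    print(estimate_prompt_tokens(512, 10), fits_context(512, 10, 2048, 8192))
    # A smaller retrieval budget under the same assumptions.
    print(estimate_prompt_tokens(512, 6), fits_context(512, 6, 2048, 8192))
```

Under these assumptions the worst case for the defaults lands above the 8192-token window. Many chunks are shorter than chunk_size, so real prompts may fit, but if generation fails on long pages, lowering TOP_K_CHUNKS or CHUNK_SIZE is the first thing to try.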

Complete Source: models.py

from pydantic import BaseModel

class SearchResult(BaseModel):
    title: str
    url: str
    snippet: str

class ScrapedPage(BaseModel):
    url: str
    title: str
    content: str
    success: bool

class TextChunk(BaseModel):
    text: str
    source_url: str
    source_title: str
    relevance_score: float = 0.0

class SourceReference(BaseModel):
    index: int
    title: str
    url: str

Complete Source: llm_client.py

import asyncio
from llama_cpp import Llama

from config import settings

_llm: Llama | None = None

def get_llm() -> Llama:
    global _llm
    if _llm is None:
        _llm = Llama(
            model_path=settings.model_path,
            n_ctx=settings.context_length,
            n_threads=4,
            n_gpu_layers=0,
            verbose=False,
        )
    return _llm

def generate(prompt: str, max_tokens: int = 512, temperature: float = 0.1) -> str:
    llm = get_llm()
    response = llm.create_chat_completion(
        messages=[{"role": "user", "content": prompt}],
        max_tokens=max_tokens,
        temperature=temperature,
    )
    return response["choices"][0]["message"]["content"]

async def generate_streaming(
    prompt: str,
    system: str = "",
    max_tokens: int = 2048,
):
    llm = get_llm()
    messages = []
    if system:
        messages.append({"role": "system", "content": system})
    messages.append({"role": "user", "content": prompt})

    for chunk in llm.create_chat_completion(
        messages=messages,
        max_tokens=max_tokens,
        temperature=0.3,
        top_p=0.9,
        repeat_penalty=1.1,
        stream=True,
    ):
        delta = chunk["choices"][0].get("delta", {})
        if "content" in delta:
            yield delta["content"]
            await asyncio.sleep(0)
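
generate_streaming iterates a blocking generator inside a coroutine. The asyncio.sleep(0) call lets other tasks run between tokens, but the event loop still waits while each token is computed. One possible alternative is to run the blocking iterator in a worker thread and pass items to the loop through a queue. The sketch below uses only the standard library with a stand-in token source; iterate_in_thread and fake_token_stream are hypothetical names, not part of Codexity or llama-cpp-python.

```python
import asyncio
import threading
from collections.abc import AsyncIterator, Callable, Iterator

_DONE = object()  # sentinel marking the end of the stream


async def iterate_in_thread(
    make_iter: Callable[[], Iterator[str]],
) -> AsyncIterator[str]:
    """Consume a blocking iterator in a worker thread, yielding items asynchronously."""
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()

    def worker() -> None:
        try:
            for item in make_iter():
                # asyncio.Queue is not thread-safe, so hop back onto the loop.
                loop.call_soon_threadsafe(queue.put_nowait, item)
        finally:
            # Always signal completion, even if the iterator raised.
            loop.call_soon_threadsafe(queue.put_nowait, _DONE)

    threading.Thread(target=worker, daemon=True).start()

    while True:
        item = await queue.get()  # the loop stays free while the thread works
        if item is _DONE:
            break
        yield item


def fake_token_stream() -> Iterator[str]:
    """Stand-in for a blocking token generator (hypothetical, not llama-cpp)."""
    yield from ["Hello", ",", " world"]


async def main() -> list[str]:
    return [token async for token in iterate_in_thread(fake_token_stream)]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

This sketch does not propagate worker exceptions to the consumer and does not stop the thread if the client disconnects. If several requests can run at once, access to the shared model would likely also need to be serialized, which is not addressed here.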

Complete Source: query_rewriter.py

import json
import re

from llm_client import generate
from config import settings

REWRITE_PROMPT = """You are a search query optimizer. Given a user question, generate {max_queries} specific search queries that will find the most relevant information.

Rules:
- Each query should be 4-8 words
- Include the current year (2026) when time-relevance matters
- Use specific technical terms over conversational language
- For comparisons, generate one query per option plus one comparison query
- Output ONLY a JSON array of strings, nothing else

User question: {question}

JSON array:"""

def rewrite_query(question: str) -> list[str]:
    prompt = REWRITE_PROMPT.format(
        question=question,
        max_queries=settings.max_queries,
    )
    raw = generate(prompt, max_tokens=200, temperature=0.1)
    return parse_queries(raw, question)

def parse_queries(raw: str, fallback: str) -> list[str]:
    try:
        cleaned = raw.strip()
        match = re.search(r'\[.*?\]', cleaned, re.DOTALL)
        if match:
            queries = json.loads(match.group())
            # Require a non-empty list so an empty "[]" falls through to the fallbacks
            if isinstance(queries, list) and queries and all(isinstance(q, str) for q in queries):
                return queries[:settings.max_queries]
    except (json.JSONDecodeError, ValueError):
        pass

    lines = [
        line.strip().strip('"-,').strip()
        for line in raw.strip().split('\n')
        if line.strip() and not line.strip().startswith('{')
    ]
    queries = [l for l in lines if len(l) > 5]

    if queries:
        return queries[:settings.max_queries]

    return [fallback]
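
Small models rarely return clean JSON, so the parser works in stages: find a JSON array, fall back to line splitting, fall back to the original question. The sketch below is a simplified, self-contained version of that idea for experimenting with sample outputs; extract_queries is a hypothetical name and the sample strings are made up.

```python
import json
import re


def extract_queries(raw: str, fallback: str, limit: int = 3) -> list[str]:
    """Pull search queries out of noisy model output."""
    # Stage 1: the first [...] span, parsed as a JSON array of strings.
    match = re.search(r"\[.*?\]", raw, re.DOTALL)
    if match:
        try:
            parsed = json.loads(match.group())
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, list) and parsed and all(isinstance(q, str) for q in parsed):
            return parsed[:limit]

    # Stage 2: treat each non-trivial line as a query, stripping list markers.
    lines = [line.strip().strip('"-,').strip() for line in raw.splitlines()]
    queries = [line for line in lines if len(line) > 5]

    # Stage 3: give up and search for the original question.
    return queries[:limit] if queries else [fallback]


if __name__ == "__main__":
    print(extract_queries('Here you go:\n["python asyncio tutorial", "asyncio event loop"]', "q"))
    print(extract_queries("- python asyncio tutorial\n- asyncio event loop explained", "q"))
    print(extract_queries("ok", "what is asyncio"))
```

The non-greedy pattern stops at the first closing bracket, so a query that itself contains a bracket will fail stage 1 and drop to the line-based fallback.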

Complete Source: searcher.py

import asyncio
from urllib.parse import urlparse

from duckduckgo_search import DDGS

from models import SearchResult
from config import settings

async def search_parallel(queries: list[str]) -> list[SearchResult]:
    # Inside a coroutine, get_running_loop() is the supported way to reach the loop
    loop = asyncio.get_running_loop()
    tasks = [
        loop.run_in_executor(None, _search_sync, q)
        for q in queries
    ]
    results_lists = await asyncio.gather(*tasks, return_exceptions=True)

    all_results = []
    for results in results_lists:
        if isinstance(results, Exception):
            continue
        all_results.extend(results)

    return deduplicate_smart(all_results)

def _search_sync(query: str) -> list[SearchResult]:
    try:
        ddgs = DDGS()
        results = ddgs.text(
            query,
            max_results=settings.max_search_results,
            region="wt-wt",
        )
        return [
            SearchResult(
                title=r.get("title", ""),
                url=r.get("href", ""),
                snippet=r.get("body", ""),
            )
            for r in results
            if r.get("href")
        ]
    except Exception as e:
        print(f"Search failed for '{query}': {e}")
        return []

def deduplicate_smart(
    results: list[SearchResult],
    max_per_domain: int = 3,
) -> list[SearchResult]:
    seen_urls: set[str] = set()
    domain_counts: dict[str, int] = {}
    unique = []

    for result in results:
        normalized = normalize_url(result.url)
        if normalized in seen_urls:
            continue

        domain = urlparse(result.url).netloc
        count = domain_counts.get(domain, 0)
        if count >= max_per_domain:
            continue

        seen_urls.add(normalized)
        domain_counts[domain] = count + 1
        unique.append(result)

    return unique

def normalize_url(url: str) -> str:
    url = url.split('#')[0].rstrip('/')
    if '?' in url:
        base, params = url.split('?', 1)
        clean_params = '&'.join(
            p for p in params.split('&')
            if not p.startswith(('utm_', 'ref=', 'source='))
        )
        url = f"{base}?{clean_params}" if clean_params else base
    return url
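
To see what the URL normalization buys, here is a standalone copy of normalize_url applied to three made-up URLs. The unique_urls helper is hypothetical and only mirrors the seen-set part of deduplicate_smart, without the per-domain cap.

```python
def normalize_url(url: str) -> str:
    """Same logic as normalize_url in searcher.py, copied so this runs alone."""
    url = url.split("#")[0].rstrip("/")
    if "?" in url:
        base, params = url.split("?", 1)
        clean_params = "&".join(
            p for p in params.split("&")
            if not p.startswith(("utm_", "ref=", "source="))
        )
        url = f"{base}?{clean_params}" if clean_params else base
    return url


def unique_urls(urls: list[str]) -> list[str]:
    """Keep the first URL seen for each normalized form (hypothetical helper)."""
    seen: set[str] = set()
    kept = []
    for url in urls:
        key = normalize_url(url)
        if key not in seen:
            seen.add(key)
            kept.append(url)
    return kept


URLS = [
    "https://example.com/post/#comments",                         # fragment + trailing slash
    "https://example.com/post?utm_source=news&utm_medium=email",  # tracking params only
    "https://example.com/post?id=7&ref=home",                     # real param + tracking param
]

if __name__ == "__main__":
    for url in URLS:
        print(normalize_url(url))
    print(unique_urls(URLS))
```

The first two URLs collapse to the same key, so only one of them is scraped. The third keeps its id parameter and counts as a separate page.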

Complete Source: scraper.py

import asyncio

import httpx
from bs4 import BeautifulSoup
from readability import Document

from models import ScrapedPage
from config import settings

HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/125.0.0.0 Safari/537.36"
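    ),
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.9",
    # Accept-Encoding is left to httpx, which advertises only the encodings it
    # can decode; forcing "br" without a Brotli package risks unreadable bodies.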
}

async def scrape_urls(urls: list[str]) -> list[ScrapedPage]:
    semaphore = asyncio.Semaphore(settings.max_concurrent_scrapes)

    async with httpx.AsyncClient(http2=True, timeout=settings.scrape_timeout) as client:
        tasks = [_scrape_one(url, client, semaphore) for url in urls]
        pages = await asyncio.gather(*tasks, return_exceptions=True)
        return [
            p for p in pages
            if isinstance(p, ScrapedPage) and p.success
        ]

async def _scrape_one(
    url: str,
    client: httpx.AsyncClient,
    semaphore: asyncio.Semaphore,
) -> ScrapedPage | None:
    async with semaphore:
        # Tier 1: httpx
        page = await _scrape_httpx(url, client)
        if page is not None:
            return page

        # Tier 2: Playwright (if enabled)
        if settings.use_playwright:
            page = await _scrape_playwright(url)
            return page

        return None

async def _scrape_httpx(url: str, client: httpx.AsyncClient) -> ScrapedPage | None:
    try:
        response = await client.get(url, headers=HEADERS, follow_redirects=True)
        if response.status_code != 200:
            return None

        html = response.text
        if _needs_javascript(html):
            return None

        content = extract_content(html)
        title = extract_title(html)

        if len(content) < 100:
            return None

        return ScrapedPage(url=url, title=title, content=content, success=True)

    # httpx.HTTPError is the base class for timeouts, connection, read, and
    # redirect errors, so any of them falls through to the Playwright tier.
    except httpx.HTTPError:
        return None

async def _scrape_playwright(url: str) -> ScrapedPage | None:
    try:
        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            browser = await p.chromium.launch(
                headless=True,
                args=[
                    "--disable-blink-features=AutomationControlled",
                    "--disable-dev-shm-usage",
                    "--no-sandbox",
                ],
            )
            context = await browser.new_context(
                user_agent=HEADERS["User-Agent"],
                viewport={"width": 1920, "height": 1080},
            )
            page = await context.new_page()
            await page.route(
                "**/*.{png,jpg,jpeg,gif,svg,woff,woff2,ttf,css}",
                lambda route: route.abort(),
            )

            await page.goto(url, wait_until="domcontentloaded", timeout=15000)
            await page.wait_for_timeout(2000)

            html = await page.content()
            await browser.close()

            content = extract_content(html)
            title = extract_title(html)

            if len(content) < 100:
                return None

            return ScrapedPage(url=url, title=title, content=content, success=True)

    except Exception:
        return None

def _needs_javascript(html: str) -> bool:
    if len(html) < 1000:
        return True

    soup = BeautifulSoup(html, "lxml")
    text = soup.get_text(strip=True)
    if len(text) < 200:
        return True

    body = soup.find("body")
    if body:
        children = [c for c in body.children if c.name and c.name != "script"]
        if len(children) <= 2 and body.find_all("script"):
            return True

    return False

def extract_content(html: str) -> str:
    doc = Document(html)
    article_html = doc.summary()

    soup = BeautifulSoup(article_html, "lxml")
    for tag in soup.find_all(["nav", "footer", "header", "aside", "form"]):
        tag.decompose()
    for tag in soup.find_all(class_=lambda c: c and any(
        x in c.lower() for x in ["sidebar", "cookie", "newsletter", "popup", "modal", "ad-"]
    )):
        tag.decompose()

    text = soup.get_text(separator="\n", strip=True)
    lines = [line for line in text.split("\n") if line.strip()]
    return "\n".join(lines)

def extract_title(html: str) -> str:
    soup = BeautifulSoup(html, "lxml")
    og = soup.find("meta", property="og:title")
    if og and og.get("content"):
        return og["content"]
    if soup.title and soup.title.string:
        return soup.title.string.strip()
    h1 = soup.find("h1")
    if h1:
        return h1.get_text(strip=True)
    return ""
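
The concurrency cap in scrape_urls comes from one pattern: start every task at once with gather, and let a shared semaphore decide how many are inside the network call at any moment. A minimal sketch of that pattern with a fake fetch, standard library only; run_bounded and fake_fetch are hypothetical names.

```python
import asyncio


async def run_bounded(n_tasks: int, limit: int) -> int:
    """Run n_tasks fake fetches, at most `limit` at once; return peak concurrency."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def fake_fetch(i: int) -> int:
        nonlocal active, peak
        async with semaphore:          # waits here once `limit` tasks are inside
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for network I/O
            active -= 1
            return i

    results = await asyncio.gather(*(fake_fetch(i) for i in range(n_tasks)))
    assert list(results) == list(range(n_tasks))  # gather preserves input order
    return peak


if __name__ == "__main__":
    print("peak concurrency:", asyncio.run(run_bounded(12, 5)))
```

Because gather preserves input order, results line up with the URL list even though the fetches finish out of order.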

Complete Source: content_processor.py

from rank_bm25 import BM25Okapi

from models import ScrapedPage, TextChunk, SourceReference
from config import settings

async def process_content(
    pages: list[ScrapedPage],
    query: str,
) -> tuple[str, list[SourceReference]]:
    chunks = _pages_to_chunks(pages)
    scored = _score_chunks(chunks, query)
    selected = _select_top_chunks(scored)
    context, sources = _build_context(selected)
    return context, sources

def _pages_to_chunks(pages: list[ScrapedPage]) -> list[TextChunk]:
    all_chunks = []
    for page in pages:
        for chunk_text in _chunk_text(page.content):
            all_chunks.append(TextChunk(
                text=chunk_text,
                source_url=page.url,
                source_title=page.title,
            ))
    return all_chunks

def _chunk_text(text: str) -> list[str]:
    words = text.split()
    size = settings.chunk_size
    overlap = settings.chunk_overlap

    if len(words) <= size:
        return [text]

    chunks = []
    start = 0
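    step = max(1, size - overlap)  # guard against overlap >= size
    while start < len(words):
        end = start + size
        chunks.append(" ".join(words[start:end]))
        if end >= len(words):
            break  # this window reached the end; a further one would only repeat it
        start += step
    return chunks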

def _score_chunks(chunks: list[TextChunk], query: str) -> list[TextChunk]:
    if not chunks:
        return []

    corpus = [c.text.lower().split() for c in chunks]
    bm25 = BM25Okapi(corpus)
    scores = bm25.get_scores(query.lower().split())

    for chunk, score in zip(chunks, scores):
        chunk.relevance_score = float(score)

    return sorted(chunks, key=lambda c: c.relevance_score, reverse=True)

def _select_top_chunks(chunks: list[TextChunk]) -> list[TextChunk]:
    selected = []
    source_counts: dict[str, int] = {}

    for chunk in chunks:
        url = chunk.source_url
        count = source_counts.get(url, 0)
        if count >= settings.max_chunks_per_source:
            continue

        selected.append(chunk)
        source_counts[url] = count + 1

        if len(selected) >= settings.top_k_chunks:
            break

    return selected

def _build_context(chunks: list[TextChunk]) -> tuple[str, list[SourceReference]]:
    source_map: dict[str, int] = {}
    sources: list[SourceReference] = []
    counter = 1

    for chunk in chunks:
        if chunk.source_url not in source_map:
            source_map[chunk.source_url] = counter
            sources.append(SourceReference(
                index=counter,
                title=chunk.source_title,
                url=chunk.source_url,
            ))
            counter += 1

    parts = []
    for chunk in chunks:
        num = source_map[chunk.source_url]
        parts.append(f"[Source {num}]\n{chunk.text}")

    return "\n\n".join(parts), sources
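
The sliding window is easier to follow with small numbers. The sketch below is a standalone variant of the chunker that stops once a window reaches the end of the text, run with a window of 4 words and an overlap of 1; chunk_words is a hypothetical name and the input is synthetic.

```python
def chunk_words(text: str, size: int, overlap: int) -> list[str]:
    """Split text into windows of `size` words, each sharing `overlap` words
    with the previous window."""
    if not 0 <= overlap < size:
        raise ValueError("overlap must be non-negative and smaller than size")

    words = text.split()
    if len(words) <= size:
        return [text]

    chunks = []
    step = size - overlap
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + size]))
        if start + size >= len(words):
            break  # the window reached the end of the text
    return chunks


if __name__ == "__main__":
    sample = " ".join(f"w{i}" for i in range(10))  # "w0 w1 ... w9"
    for chunk in chunk_words(sample, size=4, overlap=1):
        print(chunk)
```

Each window starts on the last word of the previous one, so a sentence that straddles a boundary still appears whole in at least one chunk when the overlap is large enough to cover it.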

Complete Source: synthesizer.py

import re

from llm_client import generate_streaming
from models import SourceReference

SYSTEM_PROMPT = """You are a search assistant that answers questions using provided sources.

Rules:
- Base your answer ONLY on the provided sources
- Cite sources using [1], [2], etc. matching the source numbers
- Every factual claim must have a citation
- If sources disagree, mention both perspectives with their citations
- Write clear, direct paragraphs
- Do not make up information not in the sources
- If the sources do not contain enough information, say so"""

async def synthesize(
    query: str,
    context: str,
    sources: list[SourceReference],
):
    source_list = "\n".join(
        f"[{s.index}] {s.title} ({s.url})" for s in sources
    )

    prompt = f"""Sources:
{source_list}

Context:
{context}

Question: {query}

Answer:"""

    full_answer = ""
    async for token in generate_streaming(
        prompt=prompt,
        system=SYSTEM_PROMPT,
        max_tokens=2048,
    ):
        full_answer += token
        yield token

    # Post-process: log any citation numbers that do not map to a provided source
    max_source = max(s.index for s in sources) if sources else 0
    cited = re.findall(r'\[(\d+)\]', full_answer)
    invalid_refs = [int(n) for n in cited if not 1 <= int(n) <= max_source]
    if invalid_refs:
        print(f"Warning: generated invalid citation refs: {invalid_refs}")
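
The citation check is worth pulling out into something you can test on its own. A minimal sketch, assuming sources are numbered 1 through source_count as in _build_context; find_invalid_citations is a hypothetical name and the sample answer is made up.

```python
import re


def find_invalid_citations(answer: str, source_count: int) -> list[int]:
    """Return cited numbers that do not map to a source in 1..source_count."""
    cited = [int(n) for n in re.findall(r"\[(\d+)\]", answer)]
    # A set removes repeats; sorting makes the result stable for logging.
    return sorted({n for n in cited if not 1 <= n <= source_count})


if __name__ == "__main__":
    answer = "FastAPI is built on Starlette [1][3]. It is very fast [4][0]."
    print(find_invalid_citations(answer, source_count=3))
```

With three sources, [4] and [0] are flagged while [1] and [3] pass. The check only proves a citation points at a real source, not that the source supports the claim.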

Complete Source: main.py

import asyncio
import json
import logging

from fastapi import FastAPI, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from sse_starlette.sse import EventSourceResponse

from config import settings
from query_rewriter import rewrite_query
from searcher import search_parallel
from scraper import scrape_urls
from content_processor import process_content
from synthesizer import synthesize

logging.basicConfig(level=settings.log_level.upper())
logger = logging.getLogger("codexity")

app = FastAPI(title="Codexity", version="1.0.0")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET"],
    allow_headers=["*"],
)

async def search_pipeline(query: str, request: Request):
    try:
        # Phase 1: Rewrite
        yield _event("status", {"step": "rewriting_query"})
        queries = rewrite_query(query)
        yield _event("status", {"step": "queries_ready", "queries": queries})

        # Phase 2: Search
        if await request.is_disconnected():
            return
        yield _event("status", {"step": "searching"})
        search_results = await search_parallel(queries)
        if not search_results:
            yield _event("error", {"message": "No search results found"})
            yield _event("done", {})
            return
        yield _event("sources_preview", {
            "count": len(search_results),
        })

        # Phase 3: Scrape
        if await request.is_disconnected():
            return
        yield _event("status", {"step": "scraping"})
        urls = [r.url for r in search_results]
        pages = await scrape_urls(urls)
        if not pages:
            yield _event("error", {"message": "Failed to scrape any pages"})
            yield _event("done", {})
            return
        yield _event("status", {
            "step": "scraping_done",
            "scraped": len(pages),
            "total": len(urls),
        })

        # Phase 4: Process
        if await request.is_disconnected():
            return
        yield _event("status", {"step": "processing"})
        context, sources = await process_content(pages, query)
        yield _event("sources", {
            "sources": [
                {"index": s.index, "title": s.title, "url": s.url}
                for s in sources
            ],
        })

        # Phase 5: Synthesize
        if await request.is_disconnected():
            return
        yield _event("status", {"step": "generating"})
        async for token in synthesize(query, context, sources):
            if await request.is_disconnected():
                return
            yield _event("token", {"text": token})

        yield _event("done", {})

    except Exception:
        logger.exception(f"Pipeline error for query: {query}")
        yield _event("error", {"message": "An internal error occurred"})
        yield _event("done", {})

def _event(event_type: str, data: dict) -> dict:
    return {"event": event_type, "data": json.dumps(data)}

@app.get("/search")
async def search(request: Request, q: str = Query(..., min_length=1, max_length=500)):
    async def generator():
        async for evt in search_pipeline(q, request):
            yield evt

    return EventSourceResponse(
        generator(),
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )

@app.get("/health")
async def health():
    return {"status": "ok"}

Docker Deployment

# Dockerfile
FROM python:3.12-slim

RUN apt-get update && apt-get install -y \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

COPY pyproject.toml .
RUN pip install --no-cache-dir .

# Install Playwright browser
RUN pip install playwright && playwright install chromium --with-deps

COPY . .

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

# docker-compose.yml
services:
  codexity:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - ./models:/app/models
      - ./.env:/app/.env
    environment:
      - MODEL_PATH=/app/models/qwen2.5-7b-instruct-q4_k_m.gguf
    restart: unless-stopped

# .env.example
MODEL_PATH=./models/qwen2.5-7b-instruct-q4_k_m.gguf
CONTEXT_LENGTH=8192
MAX_TOKENS=2048
MAX_SEARCH_RESULTS=8
MAX_QUERIES=3
SCRAPE_TIMEOUT=15
MAX_CONCURRENT_SCRAPES=5
USE_PLAYWRIGHT=true
CHUNK_SIZE=512
CHUNK_OVERLAP=50
TOP_K_CHUNKS=10
MAX_CHUNKS_PER_SOURCE=3
LOG_LEVEL=info

Running It

Local Development

# Download a model
mkdir -p models
wget -O models/qwen2.5-7b-instruct-q4_k_m.gguf \
  "https://huggingface.co/Qwen/Qwen2.5-7B-Instruct-GGUF/resolve/main/qwen2.5-7b-instruct-q4_k_m.gguf"

# Install dependencies
pip install -e .
playwright install chromium

# Copy env
cp .env.example .env

# Run
uvicorn main:app --reload --host 0.0.0.0 --port 8000

Docker

docker compose up --build

Testing

# Simple query
curl -N "http://localhost:8000/search?q=what+is+fastapi"

# Complex comparison
curl -N "http://localhost:8000/search?q=postgresql+vs+mongodb+for+startups"

# Health check
curl http://localhost:8000/health

End-to-End Trace

Here is what happens when you run curl -N "http://localhost:8000/search?q=what+is+asyncio":

event: status
data: {"step": "rewriting_query"}

event: status
data: {"step": "queries_ready", "queries": ["Python asyncio tutorial 2026", "asyncio event loop explained", "Python async await concurrency"]}

event: status
data: {"step": "searching"}

event: sources_preview
data: {"count": 16}

event: status
data: {"step": "scraping"}

event: status
data: {"step": "scraping_done", "scraped": 11, "total": 16}

event: status
data: {"step": "processing"}

event: sources
data: {"sources": [{"index": 1, "title": "Python asyncio Documentation", "url": "https://docs.python.org/3/library/asyncio.html"}, ...]}

event: status
data: {"step": "generating"}

event: token
data: {"text": "Python"}

event: token
data: {"text": "'s"}

event: token
data: {"text": " asyncio"}

... (hundreds of token events)

event: token
data: {"text": " [1][3]."}

event: done
data: {}

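Total time: 7-20 seconds on CPU, matching the latency breakdown under Performance Tuning. GPU offload shortens only the generation phase, so search and scraping still account for 4-10 seconds of every request. Because tokens stream as they are generated, the user sees the first token once those earlier phases finish, typically within 5-10 seconds.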
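
On the client side, the trace above is just blocks of event: and data: lines separated by blank lines. A minimal parser sketch, standard library only, that turns raw stream text into events and reassembles the answer. It handles only the two fields used here; parse_sse and collect_answer are hypothetical names, and a real client would also deal with comments, id:, retry:, and multi-line data.

```python
import json


def parse_sse(stream_text: str) -> list[tuple[str, dict]]:
    """Split raw SSE text into (event, data) pairs."""
    events = []
    # Normalize CRLF line endings, then split on the blank line between events.
    for block in stream_text.replace("\r\n", "\n").split("\n\n"):
        event_type, data = "message", None
        for line in block.splitlines():
            if line.startswith("event:"):
                event_type = line[len("event:"):].strip()
            elif line.startswith("data:"):
                data = json.loads(line[len("data:"):].strip())
        if data is not None:
            events.append((event_type, data))
    return events


def collect_answer(events: list[tuple[str, dict]]) -> str:
    """Concatenate the text of every token event, in order."""
    return "".join(data["text"] for event, data in events if event == "token")


SAMPLE = (
    'event: status\ndata: {"step": "generating"}\n\n'
    'event: token\ndata: {"text": "Python"}\n\n'
    'event: token\ndata: {"text": "\'s"}\n\n'
    'event: token\ndata: {"text": " asyncio"}\n\n'
    'event: done\ndata: {}\n\n'
)

if __name__ == "__main__":
    parsed = parse_sse(SAMPLE)
    print([event for event, _ in parsed])
    print(collect_answer(parsed))
```

Tokens carry their own leading spaces, so the client concatenates them as-is instead of joining with a separator.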

Performance Tuning

Latency breakdown for a typical query:

Phase                      Time
Query rewriting            100-300ms
Web search (parallel)      1-2s
Scraping (5 concurrent)    3-8s
Content processing         50-200ms
Answer generation (CPU)    3-10s
Total                      7-20s
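
The total row is the sum of the phase ranges, and the same arithmetic gives a floor for time to first token: everything before generation has to finish first. A short sketch using the numbers from the table, in milliseconds; the dictionary keys and function names are hypothetical.

```python
# Phase ranges from the latency table, in milliseconds: (low, high).
PHASES_MS = {
    "query_rewriting": (100, 300),
    "web_search": (1000, 2000),
    "scraping": (3000, 8000),
    "content_processing": (50, 200),
    "answer_generation_cpu": (3000, 10000),
}


def total_range(phases: dict[str, tuple[int, int]]) -> tuple[int, int]:
    """Sum the low and high ends of every phase."""
    low = sum(lo for lo, _ in phases.values())
    high = sum(hi for _, hi in phases.values())
    return low, high


def before_generation(phases: dict[str, tuple[int, int]]) -> tuple[int, int]:
    """Time spent before the model starts generating: a floor for first token."""
    rest = {k: v for k, v in phases.items() if not k.startswith("answer_generation")}
    return total_range(rest)


if __name__ == "__main__":
    print("total (ms):", total_range(PHASES_MS))
    print("before generation (ms):", before_generation(PHASES_MS))
```

Scraping is the largest pre-generation cost in both the best and the worst case, which is why it is one of the two levers worth pulling.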

The two biggest levers:

GPU offload. Set n_gpu_layers=-1 in llm_client.py. Answer generation drops from 3-10s to 0.5-2s.

Skip Playwright. Set USE_PLAYWRIGHT=false. Most technical content is server-rendered. You lose access to SPA-only sites but cut scraping time significantly.

What You Built

A backend that takes a natural language question and returns a cited, streaming answer sourced from the live web. Nine Python files. No paid APIs. Runs on hardware you own.

The architecture follows the same search-then-synthesize pattern as answer engines such as Perplexity, You.com, and Phind. The models are smaller. The infrastructure is simpler. But the pipeline has the same stages: query rewriting, web search, scraping, content processing, LLM synthesis, and SSE streaming.

Fork it. Extend it. Add a frontend. Swap in a bigger model. Point it at different search engines. The components are modular and the contracts between them are typed.

The source speaks for itself.
