# Architecture

## System Overview
```mermaid
graph LR
    subgraph Sources
        SO[StackOverflow]
        RD[Reddit]
        MD[Medium]
        DT[Dev.to]
        HN[Hashnode]
        YT[YouTube]
    end
    subgraph "Pipeline Service"
        direction TB
        SCAN[Scan Stage]
        EXTRACT[Extract Stage]
        ANALYZE[Analyze Stage]
        GEN[Generate Stage]
        API[REST API]
    end
    subgraph Storage
        PG[(PostgreSQL + pgvector)]
    end
    subgraph UI
        NEXT[Next.js WebUI]
    end
    SO & RD & MD & DT & HN & YT --> SCAN
    SCAN --> PG
    PG --> EXTRACT --> ANALYZE --> GEN --> PG
    PG --> API --> NEXT
```
## Pipeline Stages
Topics flow through four stages, each managed by the PostgreSQL job queue:
```mermaid
stateDiagram-v2
    [*] --> scanned: Scan discovers topic
    scanned --> extracted: Extract full content
    extracted --> analyzed: Pass 7-stage filter
    analyzed --> [*]: Available in UI
    extracted --> rejected: Failed filter
    rejected --> [*]
```
### Stage 1: Scan
The `ScanOrchestrator` runs on a cron schedule (default: 6 AM daily, configurable via `topicscanner.pipeline.scan-cron`). For each enabled category:
- Get category keywords and configured source types
- For each source, get the `SourceScanner` from `ScannerRegistry`
- Call `scanner.scan(ScanRequest)` — returns a list of `ScanResult`
- Persist new topics via `INSERT ... ON CONFLICT DO NOTHING` (URL + source dedup)
- Enqueue `EXTRACT` job for each new topic
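The loop above can be sketched in plain Java. This is a minimal sketch: `SourceScanner` and `ScannerRegistry` are simplified stand-ins for the real interfaces, and an in-memory set stands in for the `INSERT ... ON CONFLICT DO NOTHING` dedup.

```java
import java.util.*;

// Simplified stand-ins for the real pipeline types.
interface SourceScanner {
    String getSourceType();
    List<String> scan(String keyword); // real code takes a ScanRequest and returns ScanResults
}

class ScannerRegistry {
    private final Map<String, SourceScanner> scanners = new HashMap<>();
    void register(SourceScanner s) { scanners.put(s.getSourceType(), s); }
    Optional<SourceScanner> getScanner(String type) { return Optional.ofNullable(scanners.get(type)); }
}

public class ScanSketch {
    public static void main(String[] args) {
        ScannerRegistry registry = new ScannerRegistry();
        registry.register(new SourceScanner() {
            public String getSourceType() { return "reddit"; }
            public List<String> scan(String keyword) {
                // Second URL duplicates the first, so dedup should drop it.
                return List.of("https://reddit.com/r/java/1", "https://reddit.com/r/java/1");
            }
        });

        Set<String> seen = new HashSet<>();        // stands in for ON CONFLICT DO NOTHING
        List<String> enqueued = new ArrayList<>(); // stands in for the EXTRACT job queue
        for (String type : List.of("reddit", "youtube")) { // configured source types
            registry.getScanner(type).ifPresent(scanner -> {
                for (String url : scanner.scan("java")) {
                    if (seen.add(url)) {           // URL dedup
                        enqueued.add(url);         // enqueue EXTRACT job for each new topic
                    }
                }
            });
        }
        System.out.println(enqueued.size()); // duplicate dropped, unregistered source skipped
    }
}
```

Sources without a registered scanner are silently skipped, which is why the `youtube` entry contributes nothing here.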
### Stage 2: Extract
The `ExtractStageHandler` fetches full content from each discovered URL:
- Try the scanner's custom `extractContent(url)` method first
- Fall back to source-specific extractors (YouTube transcripts, SO Q&A)
- Fall back to generic web extraction (Jsoup readability-style)
- Store extracted content, enqueue `ANALYZE` job
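The fallback chain maps naturally onto `Optional.or`. A sketch with hypothetical extractor methods — the real handler delegates to scanner and extractor classes rather than static methods:

```java
import java.util.*;

public class ExtractSketch {
    // Hypothetical extractors: each returns content, or empty on failure.
    static Optional<String> scannerExtract(String url) { return Optional.empty(); }      // custom extractContent(url)
    static Optional<String> sourceSpecific(String url) { return Optional.empty(); }      // e.g. YouTube transcripts
    static Optional<String> genericWeb(String url) { return Optional.of("article text"); } // Jsoup-style fallback

    static Optional<String> extract(String url) {
        // Try each extractor in order; the first non-empty result wins.
        return scannerExtract(url)
                .or(() -> sourceSpecific(url))
                .or(() -> genericWeb(url));
    }

    public static void main(String[] args) {
        System.out.println(extract("https://example.com/post").orElse("FAILED"));
    }
}
```

Because `Optional.or` takes a supplier, the cheaper extractors short-circuit the chain: later extractors are never invoked once one succeeds.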
### Stage 3: Analyze
The `AnalyzeStageHandler` runs the 7-stage filter pipeline:
- URL dedup (order 10) — reject if URL already analyzed
- Negative keywords (order 20) — reject if matches exclusion terms
- Language (order 30) — reject non-English content (ASCII ratio + stop words)
- Content length (order 40) — reject if < 200 chars
- Quality (order 45) — score based on structure (headings, code blocks, paragraphs)
- Relevance (order 50) — LLM scores relevance to category keywords (0.0–1.0)
- Embedding dedup (order 70) — pgvector cosine similarity rejects near-duplicates
Filters are ordered cheapest-first. The pipeline short-circuits on the first failure.
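The cheapest-first, short-circuiting evaluation can be illustrated with a simplified subset of the chain. The filter names and order values follow the list above; the predicate bodies are illustrative approximations, not the real implementations:

```java
import java.util.*;
import java.util.function.Predicate;

public class FilterPipelineSketch {
    record Filter(int order, String name, Predicate<String> passes) {}

    public static void main(String[] args) {
        // Deliberately registered out of order; sorting restores cheapest-first.
        List<Filter> filters = new ArrayList<>(List.of(
            new Filter(40, "content-length", text -> text.length() >= 200),
            new Filter(20, "negative-keywords", text -> !text.contains("hiring")),
            new Filter(30, "language", text ->
                text.chars().filter(c -> c < 128).count() * 10 >= text.length() * 9L)
        ));
        filters.sort(Comparator.comparingInt(Filter::order));

        String topic = "We are hiring!"; // trips the exclusion-term filter
        String rejectedBy = null;
        for (Filter f : filters) {
            if (!f.passes().test(topic)) {
                rejectedBy = f.name(); // short-circuit: later (costlier) filters never run
                break;
            }
        }
        System.out.println(rejectedBy);
    }
}
```

The negative-keyword check (order 20) fires before the length check (order 40) ever runs, so the expensive LLM relevance call is reached only by topics that survive every cheap filter.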
### Stage 4: Generate
Content generation is triggered on demand via the Content Studio UI or API. The `ContentGenerationService`:
- Loads the topic and up to 5 related topics from the same group
- Retrieves the user's composite writing style profile
- Finds similar user-uploaded content via pgvector (RAG context)
- Builds format-specific prompts with style + RAG context
- Calls the LLM with the `GENERATION` task type
- Stores the result in the `generated_content` table
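The prompt-assembly step might look roughly like this — the method name, field layout, and template text are hypothetical, not the real prompt templates:

```java
import java.util.List;

public class PromptSketch {
    // Hypothetical assembly of the user prompt from style profile + RAG context.
    static String buildUserPrompt(String format, String topic, String styleJson, List<String> ragSnippets) {
        StringBuilder sb = new StringBuilder();
        sb.append("Write a ").append(format).append(" about: ").append(topic).append("\n\n");
        sb.append("Match this writing style profile:\n").append(styleJson).append("\n\n");
        sb.append("Reference material from the user's own content:\n");
        for (String snippet : ragSnippets) sb.append("- ").append(snippet).append("\n");
        return sb.toString();
    }

    public static void main(String[] args) {
        String prompt = buildUserPrompt("blog post", "Virtual threads in Java 21",
                "{\"tone\": \"conversational\"}", List.of("snippet one", "snippet two"));
        // Both the style profile and the RAG snippets end up in the final prompt.
        System.out.println(prompt.contains("conversational") && prompt.contains("snippet two"));
    }
}
```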
Scanner System¶
```mermaid
classDiagram
    class SourceScanner {
        <<interface>>
        +getSourceType() String
        +getDisplayName() String
        +scan(ScanRequest) List~ScanResult~
        +extractContent(url) Optional~String~
    }
    class ScannerRegistry {
        -scanners Map
        +register(SourceScanner)
        +getScanner(type) Optional
        +getAllScanners() Collection
    }
    SourceScanner <|.. RedditScanner
    SourceScanner <|.. StackOverflowScanner
    SourceScanner <|.. MediumScanner
    SourceScanner <|.. DevToScanner
    SourceScanner <|.. HashnodeScanner
    SourceScanner <|.. YouTubeScanner
    ScannerRegistry o-- SourceScanner
```
Built-in scanners are Spring `@Component` beans auto-discovered via classpath scanning. External plugins are JARs in the `plugins/` directory, loaded via `ServiceLoader`.
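The plugin path can be sketched with the JDK's `java.util.ServiceLoader`. A sketch only: `ScannerPlugin` is a stand-in for the real `SourceScanner` SPI, and the `plugins/` scan is simplified:

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.*;
import java.util.ServiceLoader;

public class PluginLoaderSketch {
    // Hypothetical plugin SPI; the real system loads SourceScanner implementations.
    public interface ScannerPlugin { String getSourceType(); }

    static URL toUrl(Path p) {
        try { return p.toUri().toURL(); } catch (Exception e) { throw new RuntimeException(e); }
    }

    public static void main(String[] args) throws Exception {
        Path pluginDir = Path.of("plugins");
        URL[] jars = new URL[0];
        if (Files.isDirectory(pluginDir)) {
            try (var stream = Files.list(pluginDir)) {
                jars = stream.filter(p -> p.toString().endsWith(".jar"))
                             .map(PluginLoaderSketch::toUrl)
                             .toArray(URL[]::new);
            }
        }
        // Child classloader over the plugin JARs; ServiceLoader discovers implementations
        // declared in META-INF/services/<interface FQN> inside each JAR.
        var loader = new URLClassLoader(jars, PluginLoaderSketch.class.getClassLoader());
        int count = 0;
        for (ScannerPlugin plugin : ServiceLoader.load(ScannerPlugin.class, loader)) {
            count++;
        }
        System.out.println(count); // 0 here: no plugin JARs present in this sketch
    }
}
```

A plugin JAR only needs to ship the implementation class plus a one-line `META-INF/services` descriptor naming it; no registration code changes on the host side.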
## LLM Abstraction
```mermaid
classDiagram
    class LLMService {
        <<interface>>
        +getProvider() String
        +complete(taskType, system, user) LLMResponse
        +embed(texts) List~float[]~
        +isAvailable() boolean
    }
    class LLMTaskType {
        <<enum>>
        RELEVANCE
        CLASSIFICATION
        SUMMARIZATION
        EMBEDDING
        GENERATION
    }
    LLMService <|.. OllamaLLMService
    LLMService <|.. OpenAILLMService
    LLMService <|.. ClaudeLLMService
    LLMService <|.. FallbackLLMService
    FallbackLLMService o-- LLMService : primary
    FallbackLLMService o-- LLMService : fallback
```
Each provider supports task-specific models — you can use a small model for relevance scoring and a large model for content generation. The `FallbackLLMService` wraps a primary and a fallback provider, trying the primary first and switching to the fallback on failure.
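The wrapper pattern is small enough to sketch in full. This uses a simplified `LLMService` with only `complete`; the real interface also exposes `embed` and `isAvailable`:

```java
public class FallbackSketch {
    // Simplified stand-in for the LLMService interface.
    interface LLMService {
        String complete(String taskType, String system, String user) throws Exception;
    }

    // Try the primary provider; on any failure, retry the same call against the fallback.
    record FallbackLLMService(LLMService primary, LLMService fallback) implements LLMService {
        public String complete(String taskType, String system, String user) throws Exception {
            try {
                return primary.complete(taskType, system, user);
            } catch (Exception primaryFailure) {
                return fallback.complete(taskType, system, user);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        LLMService broken = (t, s, u) -> { throw new Exception("primary down"); };
        LLMService working = (t, s, u) -> "ok from fallback";
        LLMService service = new FallbackLLMService(broken, working);
        System.out.println(service.complete("RELEVANCE", "sys", "user"));
    }
}
```

Because `FallbackLLMService` implements the same interface it wraps, callers stay oblivious to which provider actually answered.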
## Job Queue
The pipeline uses PostgreSQL as a job queue instead of Kafka:
```sql
-- Claim next job (atomic, skip locked)
UPDATE pipeline_jobs
SET status = 'PROCESSING', started_at = NOW(), updated_at = NOW()
WHERE id = (
    SELECT id FROM pipeline_jobs
    WHERE stage = ? AND status = 'PENDING'
    ORDER BY created_at ASC
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
RETURNING *;
```
The `PipelineOrchestrator` polls each stage on a configurable interval (default: 30 seconds). Stale jobs (stuck in `PROCESSING` for more than 10 minutes) are automatically reset to `PENDING`.
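The `FOR UPDATE SKIP LOCKED` claim guarantees that concurrent workers never grab the same row. The same invariant can be simulated in memory — a sketch, with a `synchronized` method standing in for the database row lock:

```java
import java.util.*;
import java.util.concurrent.*;

public class JobClaimSketch {
    enum Status { PENDING, PROCESSING }
    static class Job {
        final int id; Status status = Status.PENDING;
        Job(int id) { this.id = id; }
    }

    // In-memory analogue of the claim query: synchronization plays the role of the
    // row lock, so two workers can never claim the same job.
    static synchronized Optional<Job> claimNext(List<Job> jobs) {
        for (Job j : jobs) {                   // ORDER BY created_at ASC
            if (j.status == Status.PENDING) {
                j.status = Status.PROCESSING;  // SET status = 'PROCESSING'
                return Optional.of(j);
            }
        }
        return Optional.empty();               // no PENDING job for this stage
    }

    public static void main(String[] args) throws Exception {
        List<Job> jobs = List.of(new Job(1), new Job(2));
        Set<Integer> claimed = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; i++) {
            pool.submit(() -> claimNext(jobs).ifPresent(j -> claimed.add(j.id)));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(claimed.size()); // both workers claimed distinct jobs
    }
}
```

In PostgreSQL the lock is released on commit rather than method exit, and `SKIP LOCKED` lets a second worker move past a locked row instead of blocking — but the observable outcome is the same: each job is claimed exactly once.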
## Content Generation Flow
```mermaid
sequenceDiagram
    participant User
    participant API
    participant Gen as ContentGenerationService
    participant Style as StyleAnalysisService
    participant RAG as UserContentService
    participant LLM
    User->>API: POST /api/generate {topicId, outputFormat}
    API->>Gen: generate(topicId, format)
    Gen->>Gen: Load topic + related topics
    Gen->>Style: getCompositeStyleProfile()
    Style-->>Gen: style JSON
    Gen->>RAG: findSimilarContent(topic text, 3)
    RAG-->>Gen: similar user content
    Gen->>LLM: complete(GENERATION, systemPrompt, userPrompt)
    LLM-->>Gen: generated text
    Gen->>Gen: Store in generated_content
    Gen-->>API: content ID
    API-->>User: {id, generated_text, ...}
```