Unified Extraction Framework
Philosophy: Every data source deserves the same thorough treatment. Build once, plug in anything.
Overview
The Unified Extraction Framework is a source-agnostic pipeline that transforms raw content from ANY data source into rich, searchable, interconnected knowledge. Whether it's a Discord message, incident report, shift note, or email - every piece of content flows through the same comprehensive extraction process.
ANY DATA SOURCE
↓
┌─────────────────────────────────────────────────────────────┐
│ RABS UNIFIED EXTRACTION FRAMEWORK │
│ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Discord │ │ Incidents │ │Shift Reports│ ... │
│ │ Adapter │ │ Adapter │ │ Adapter │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └────────────┬────┴────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────────────┐
│ │ UNIFIED EXTRACTION PIPELINE │
│ │ │
│ │ Pass 1: Semantic Units → vector search │
│ │ Pass 1: Source Metadata → tooltips, keywords │
│ │ Pass 2: Verification → quality control │
│ │ Pass 2.5: Enrichment → entities, sentiment │
│ │ Pass 2.6: Contact Intel → YP3000 evidence │
│ │ Pass 3: Embeddings → 3 vector strategies │
│ │ Pass 4: Translations → 8 languages │
│ └─────────────────────────────────────────────────────────┘
│ ↓ │
│ ┌─────────────────────────────────────────────────────────┐
│ │ UNIFIED OUTPUT │
│ │ │
│ │ • embeddings.sources (tooltip, keywords, importance)│
│ │ • embeddings.units (semantic chunks, vectors) │
│ │ • yp3000_pending_evidence (contact intelligence) │
│ │ • source_translations (multilingual search) │
│ └─────────────────────────────────────────────────────────┘
└─────────────────────────────────────────────────────────────┘
What Gets Extracted (One Pass)
Every piece of content automatically receives:
Source-Level Metadata
| Field | Purpose | Example |
|---|---|---|
| tooltip_summary | Human-readable hover preview | "Brett asking about roster swap for Tuesday" |
| keywords | Text search (tsvector indexed) | "brett roster swap tuesday sil" |
| importance_score | Priority/filtering (0.1-1.0) | 0.3 (routine) or 0.9 (urgent) |
| conversation_role | Thread context | initiates, responds, resolves, escalates |
| action_items | Task extraction | [{action, owner, due, confidence}] |
| pii_categories | Privacy compliance | ["phone", "medical", "address"] |
| tone_markers | Communication style | {formality, urgency, emotion} |
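To make the constraints in the table concrete, here is a minimal sketch of a validator for a source-level metadata record. The function name `validateSourceMetadata`, the error wording, and the `tone_markers` values in the example are assumptions for illustration; only the field names, the 0.1-1.0 range, and the four role names come from the table.

```javascript
// Hypothetical helper, not taken from the framework's code: checks a
// source-level metadata record against the constraints listed in the table.
const CONVERSATION_ROLES = ['initiates', 'responds', 'resolves', 'escalates'];

function validateSourceMetadata(meta) {
  const errors = [];
  const score = meta.importance_score;

  if (typeof meta.tooltip_summary !== 'string' || meta.tooltip_summary.trim() === '') {
    errors.push('tooltip_summary must be a non-empty string');
  }
  // Written as a positive range check so that NaN is rejected too.
  if (!(typeof score === 'number' && score >= 0.1 && score <= 1.0)) {
    errors.push('importance_score must be a number between 0.1 and 1.0');
  }
  if (!CONVERSATION_ROLES.includes(meta.conversation_role)) {
    errors.push('conversation_role must be one of: ' + CONVERSATION_ROLES.join(', '));
  }
  if (!Array.isArray(meta.action_items)) {
    errors.push('action_items must be an array');
  }
  if (!Array.isArray(meta.pii_categories)) {
    errors.push('pii_categories must be an array');
  }
  return errors;
}

const exampleMeta = {
  tooltip_summary: 'Brett asking about roster swap for Tuesday',
  keywords: 'brett roster swap tuesday sil',
  importance_score: 0.3,
  conversation_role: 'initiates',
  action_items: [],
  pii_categories: [],
  // Placeholder values; the real vocabulary for tone markers is not specified here.
  tone_markers: { formality: 'casual', urgency: 'low', emotion: 'neutral' },
};

console.log(validateSourceMetadata(exampleMeta).length); // 0
```

A check like this would most naturally sit in the verification pass, so that malformed metadata is caught before it is written to embeddings.sources.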
Unit-Level Extraction
| Field | Purpose | Example |
|---|---|---|
| text_for_embedding | Optimized for vector search | Self-contained semantic chunk |
| standalone_summary | Complete context-resolved sentence | Full meaning without source |
| unit_keywords | Per-unit searchable terms | Specific to this chunk |
| unit_importance | Relative importance | Within source context |
| domain_tags | Category classification | roster, incidents, medical |
| entity_mentions | Who's involved | Staff: Jane, Participant: Sarah |
| sentiment/intent | Why was this said | {tone, purpose, expected_action} |
YP3000 Contact Intelligence
Automatically detected and fed to the evidence system:
- Phone numbers with ownership signals ("my new number is...")
- Email addresses
- Nicknames ("call me Bob")
- Relationships ("my wife Sarah")
Detection is temporally aware: old messages (e.g. from 2018) don't pollute current data.
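One way to picture ownership signals and temporal awareness together is the sketch below. It is not the framework's detector: the phrase pattern, the function names `detectOwnedPhone` and `recencyWeight`, and the one-year half-life are all assumptions chosen for illustration. The sample number comes from a range reserved for fictional use.

```javascript
// Hypothetical sketch. The phrase pattern and the default half-life are
// illustrative assumptions, not the framework's actual rules.
const OWNERSHIP_PHONE = /\bmy (?:new )?(?:number|mobile|phone) is[:\s]+(\+?\d[\d\s-]{6,}\d)/i;

// Returns the phone number the author claims as their own (spaces and
// hyphens removed), or null when no ownership phrase is present.
function detectOwnedPhone(text) {
  const match = OWNERSHIP_PHONE.exec(text);
  if (!match) return null;
  return match[1].replace(/[\s-]/g, '');
}

// Exponential decay: evidence loses half its weight every `halfLifeDays`.
function recencyWeight(messageDate, now, halfLifeDays = 365) {
  const ageDays = Math.max(0, (now - messageDate) / 86400000);
  return Math.pow(0.5, ageDays / halfLifeDays);
}

const referenceDate = new Date('2026-01-25T00:00:00Z');
const ownedPhone = detectOwnedPhone('Hi all, my new number is 0491 570 006');
const recentWeight = recencyWeight(new Date('2026-01-01T00:00:00Z'), referenceDate);
const oldWeight = recencyWeight(new Date('2018-01-25T00:00:00Z'), referenceDate);

console.log(ownedPhone);               // 0491570006
console.log(recentWeight > oldWeight); // true
```

With a weighting of this kind, a number mentioned in 2018 still counts as evidence, but a single recent "my new number is..." message outweighs it.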
Multilingual Translations
8 languages configured for cross-language search:
- Spanish, Arabic, Vietnamese, Chinese (Simplified)
- Tagalog, Hindi, Nepali, Korean
Staff can search in their native language and still find English content.
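As a configuration sketch, the language list could be expressed as below. The language names are taken from the list above; the constant name, the helper, and the choice of BCP 47 tags are assumptions, and the framework may identify languages differently.

```javascript
// Hypothetical config. Language names come from this document; the
// BCP 47 tags and identifiers are assumptions.
const TRANSLATION_LANGUAGES = [
  { tag: 'es', name: 'Spanish' },
  { tag: 'ar', name: 'Arabic' },
  { tag: 'vi', name: 'Vietnamese' },
  { tag: 'zh-Hans', name: 'Chinese (Simplified)' },
  { tag: 'tl', name: 'Tagalog' },
  { tag: 'hi', name: 'Hindi' },
  { tag: 'ne', name: 'Nepali' },
  { tag: 'ko', name: 'Korean' },
];

// True when a translation target with this tag is configured.
function isConfiguredLanguage(tag) {
  return TRANSLATION_LANGUAGES.some((language) => language.tag === tag);
}

console.log(TRANSLATION_LANGUAGES.length); // 8
console.log(isConfiguredLanguage('vi'));   // true
```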
Adapter Pattern
Adding a new data source requires only:
// Example: Shift Reports Adapter
const shiftReportAdapter = {
  origin_system: 'shift_reports',
  container_type: 'staff_report',
  // Map source fields to framework schema
  mapToSource: (shiftReport) => ({
    external_id: shiftReport.id,
    author_id: shiftReport.staff_id,
    author_display: shiftReport.staff_name,
    created_at: shiftReport.shift_date,
    raw_text: shiftReport.notes,
    channel_or_context: shiftReport.location,
    // ... framework handles the rest
  })
};
The framework then automatically:
- Extracts semantic units
- Generates tooltips and keywords
- Calculates importance
- Extracts action items
- Detects contact info → YP3000
- Generates embeddings (3 strategies)
- Translates to configured languages
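The ordering of these steps can be sketched as a simple pass runner. This is an assumption about structure, not the framework's implementation: `runPipeline` is a hypothetical name, and the three pass bodies are trivial placeholders standing in for the real extraction, metadata, and verification logic.

```javascript
// Hypothetical sketch of an ordered pass runner. Pass names loosely follow
// the pipeline diagram; the pass bodies are placeholders, not real extraction.
function runPipeline(source, passes) {
  let record = { ...source, completed_passes: [] };
  for (const pass of passes) {
    // Each pass sees everything produced so far and returns the fields it adds.
    const added = pass.run(record);
    record = {
      ...record,
      ...added,
      completed_passes: [...record.completed_passes, pass.name],
    };
  }
  return record;
}

const demoPasses = [
  // Placeholder: split on sentence boundaries instead of real semantic chunking.
  { name: 'semantic_units', run: (r) => ({ units: r.raw_text.split(/(?<=[.!?])\s+/) }) },
  // Placeholder: truncate instead of generating a real summary.
  { name: 'source_metadata', run: (r) => ({ tooltip_summary: r.raw_text.slice(0, 60) }) },
  // Placeholder: a single sanity check instead of real verification.
  { name: 'verification', run: (r) => ({ verified: r.units.length > 0 }) },
];

const pipelineResult = runPipeline(
  { origin_system: 'shift_reports', raw_text: 'Bus broke down. Staff arranged taxis.' },
  demoPasses
);

console.log(pipelineResult.completed_passes.join(' > ')); // semantic_units > source_metadata > verification
console.log(pipelineResult.units.length);                 // 2
```

Because every pass receives the same record shape, nothing in the runner depends on origin_system, which is the property the adapter pattern relies on.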
Deputy Shift Reports (Confirmed Data Source)
Discovery Date: 2026-01-25
Deputy stores rich structured shift reports via CustomFieldData linked to each Timesheet. Each shift report contains 12-13 structured questions:
Fields Available
| Field | Type | Content |
|---|---|---|
| Participant Notes | Text | Per-person updates - THE GOLD |
| [Location] Attendance | Multi-select | Which participants were present |
| Vehicles | Dropdown | Which vehicle used |
| DSW Owned Vehicle KMs | Number | Odometer start/stop |
| Private Car KMs | Number + Photo | Personal vehicle tracking |
| Shift Title | Dropdown | Group Home, Centre Based, 1:1, Saturday Adventure |
| Shift Description | Text | Location details, instructions |
| Attachments | Files | Photos from shift |
| Employee Comment | Text | Brief summary |
Sample Participant Notes
"Karen was repeating herself more than usual. Karen was fixated on having a shower and getting into her pyjamas..."
"Ryan was prompted several times to have a shower and get ready for his dad to arrive. He did but needed lots of prompting to do so in a timely manner."
"Alex required constant prompting to partake in the activities. Alex was then transported to Newmarket Hotel - he ordered his own lunch without prompting from staff"
Volume Estimate
- 50+ timesheets per week with substantial content
- CustomFieldData table: 60,000+ records (estimated)
- Date range: 2018 to present
API Access
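// Get timesheets with structured data
// (assumes deputyApi.post resolves to the array of matching Timesheet records)
const timesheets = await deputyApi.post('/resource/Timesheet/QUERY', {
  search: { s1: { field: 'Date', type: 'ge', data: startDate } },
  join: ['EmployeeObject', 'OperationalUnitObject'],
  max: 200
});
// Get custom field data for each timesheet that has a shift report attached
for (const ts of timesheets) {
  if (!ts.CustomFieldData) continue;
  const customData = await deputyApi.get(`/resource/CustomFieldData/${ts.CustomFieldData}`);
  // ... pass customData to the shift report adapter
}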
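Since the query above caps each call at 200 records and the history goes back to 2018, a backfill has to page through results. The sketch below only builds the request bodies. Both helper names are hypothetical, and the use of a `start` record offset is an assumption that should be confirmed against Deputy's API documentation before relying on it.

```javascript
// Hypothetical helper: builds the request body for one page of the
// Timesheet QUERY call. Treating `start` as a record offset is an
// assumption to verify against Deputy's documentation.
function buildTimesheetQuery(startDate, page, pageSize = 200) {
  return {
    search: { s1: { field: 'Date', type: 'ge', data: startDate } },
    join: ['EmployeeObject', 'OperationalUnitObject'],
    start: page * pageSize,
    max: pageSize,
  };
}

// A caller would request page 0, 1, 2, ... and stop once a page comes
// back with fewer than `pageSize` records.
function isLastPage(records, pageSize = 200) {
  return records.length < pageSize;
}

console.log(buildTimesheetQuery('2018-01-01', 2).start); // 400
```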
Extraction Value
- Links incidents to context: "Karen had an episode" → shift report shows she was "repeating herself more"
- Verifies attendance: Who was actually there that day
- Cross-references Discord: Shift mentions "bus problems" → Discord shows scramble to find alternatives
- YP3000 evidence: Staff-participant relationships confirmed by attendance records
Other Future Data Sources
Already designed to plug in:
- Email → contact intel goldmine, thread context
- SMS/TextMagic → phone verification, short message semantics
- Call Transcripts → relationship mapping, contact updates
- HR Documents → policy extraction, compliance tracking
- Training Records → competency mapping
- Vehicle Logs → incident correlation
- CCTV Alerts → event context enrichment
Resilient Worker Architecture
Source-Agnostic Workers
The worker system is completely source-agnostic from day one. Workers don't know or care what they're processing - they just pull from the unified queue and run the extraction pipeline.
┌─────────────────────────────────────────────────────────────┐
│ WORKER POOL │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Worker 1 │ │ Worker 2 │ │ Worker 3 │ │ Worker N │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │ │
│ └─────────────┴──────┬──────┴─────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ UNIFIED EXTRACTION QUEUE │ │
│ │ │ │
│ │ source_id │ origin_system │ priority │ status │ │
│ │ ──────────┼───────────────┼──────────┼──────── │ │
│ │ abc-123 │ discord │ 1 │ pending │ │
│ │ def-456 │ incidents │ 1 │ pending │ │
│ │ ghi-789 │ shift_reports │ 2 │ backfill │ │
│ │ ... │ ... │ ... │ ... │ │
│ └─────────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ UNIFIED EXTRACTION PIPELINE │ │
│ │ (same for ALL source types) │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
Why Source-Agnostic?
- Simpler scaling - Add workers, not worker types
- Consistent quality - Every source gets identical treatment
- Future-proof - New sources just need an adapter, not new workers
- Load balancing - Workers grab whatever's next, no idle specialists
- Easier debugging - One pipeline to troubleshoot
Worker Behavior
Workers are designed to:
- Survive restarts - cursor position saved in DB
- Prioritize new content - process recent first (priority 1)
- Backfill in gaps - work through history when idle (priority 2)
- Shut down gracefully - save state before stopping
- Resume seamlessly - pick up exactly where they left off
Server Start
↓
┌─────────────────────────────────────────────────────────────┐
│ WORKER LIFECYCLE │
│ │
│ 1. Load cursor positions from DB │
│ 2. Check for NEW items (any source) → process immediately │
│ 3. If no new items → grab BACKFILL item (oldest first) │
│ 4. Process through unified pipeline │
│ 5. Save cursor every N items (checkpoint) │
│ 6. On SIGTERM → finish current item → save cursor → exit │
│ 7. On restart → resume from saved cursor │
│ │
└─────────────────────────────────────────────────────────────┘
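The lifecycle can be sketched in memory as follows. This is a simplified illustration, not the worker implementation: `pickNext` and `runWorker` are hypothetical names, the queue is a plain array rather than a database table, checkpoints are only counted rather than written, and ordering items of equal priority by `created_at` is an assumption.

```javascript
// Hypothetical in-memory sketch of the lifecycle above. A real worker would
// read the queue and cursors from the database.
function pickNext(queue) {
  // Steps 2-3: lowest priority number first (1 = new, 2 = backfill),
  // then oldest first within the same priority (an assumption for priority 1).
  const waiting = queue.filter((item) => item.status !== 'done');
  waiting.sort((a, b) => a.priority - b.priority || a.created_at.localeCompare(b.created_at));
  return waiting[0] || null;
}

function runWorker(queue, processItem, { checkpointEvery = 2, shouldStop = () => false } = {}) {
  const progress = { processed: 0, checkpoints: 0 };
  let item;
  while ((item = pickNext(queue)) !== null) {
    processItem(item); // Step 4: unified pipeline
    item.status = 'done';
    progress.processed += 1;
    if (progress.processed % checkpointEvery === 0) progress.checkpoints += 1; // Step 5
    if (shouldStop()) break; // Step 6: only stop between items, never mid-item
  }
  // Final save on exit, unless the last item already triggered a checkpoint.
  if (progress.processed % checkpointEvery !== 0) progress.checkpoints += 1;
  return progress;
}

const demoQueue = [
  { source_id: 'ghi-789', origin_system: 'shift_reports', priority: 2, status: 'backfill', created_at: '2018-03-01T00:00:00Z' },
  { source_id: 'abc-123', origin_system: 'discord', priority: 1, status: 'pending', created_at: '2026-01-24T09:00:00Z' },
  { source_id: 'def-456', origin_system: 'incidents', priority: 1, status: 'pending', created_at: '2026-01-24T08:00:00Z' },
];
const processedOrder = [];
const workerProgress = runWorker(demoQueue, (item) => processedOrder.push(item.source_id));

console.log(processedOrder.join(', '));  // def-456, abc-123, ghi-789
console.log(workerProgress.checkpoints); // 2
```

In a Node.js process, `shouldStop` would typically read a flag set by a `process.on('SIGTERM', ...)` handler, so the worker finishes its current item before saving and exiting.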
Processing Cursors Table
embeddings.processing_cursors
├── source_type -- 'discord', 'incidents', 'shift_reports', etc
├── forward_cursor -- timestamp of last NEW item processed
├── backfill_cursor -- timestamp of oldest BACKFILL item processed
├── status -- 'running', 'paused', 'completed'
├── items_processed -- total count
├── last_checkpoint -- when cursor was last saved
└── worker_id -- which worker is handling this (if locked)
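A cursor update consistent with these columns might look like the sketch below. The function name `advanceCursor` and the update rule are assumptions: the forward cursor only ever moves later, and the backfill cursor records the oldest item processed so far, as the column comments describe. Timestamps are assumed to be ISO 8601 UTC strings in a single format, so plain string comparison orders them correctly.

```javascript
// Hypothetical sketch: column names follow the table above; the update
// rule is an assumption about how the two cursors are meant to behave.
function advanceCursor(cursor, itemTimestamp, mode, checkpointTime) {
  const next = {
    ...cursor,
    items_processed: cursor.items_processed + 1,
    last_checkpoint: checkpointTime,
  };
  if (mode === 'forward') {
    // Never move the forward cursor back in time.
    if (cursor.forward_cursor === null || itemTimestamp > cursor.forward_cursor) {
      next.forward_cursor = itemTimestamp;
    }
  } else if (mode === 'backfill') {
    // Track the oldest item processed so far.
    if (cursor.backfill_cursor === null || itemTimestamp < cursor.backfill_cursor) {
      next.backfill_cursor = itemTimestamp;
    }
  } else {
    throw new Error('unknown mode: ' + mode);
  }
  return next;
}

let discordCursor = {
  source_type: 'discord',
  forward_cursor: null,
  backfill_cursor: null,
  status: 'running',
  items_processed: 0,
  last_checkpoint: null,
  worker_id: 'worker-1',
};
discordCursor = advanceCursor(discordCursor, '2026-01-24T09:00:00Z', 'forward', '2026-01-24T09:00:05Z');
discordCursor = advanceCursor(discordCursor, '2019-06-01T00:00:00Z', 'backfill', '2026-01-24T09:00:06Z');

console.log(discordCursor.forward_cursor);  // 2026-01-24T09:00:00Z
console.log(discordCursor.backfill_cursor); // 2019-06-01T00:00:00Z
```

Returning a new object rather than mutating the cursor keeps each checkpoint a single, explicit write.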
Adding a New Source
When a new data source is added:
- Create adapter - maps source fields to embeddings.sources
- Insert to queue - adapter ingests items with origin_system set
- Workers handle it - automatically, no code changes needed
// That's it. Workers will process it like everything else.
await ingestSource({
  origin_system: 'shift_reports', // New source type
  container_type: 'staff_report',
  external_id: report.id,
  raw_text: report.notes,
  // ... framework handles everything else
});
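For intuition about what an ingest call might do before workers take over, here is an in-memory stand-in. It is not the real `ingestSource`: `createIngestor` is a hypothetical name, and both the deduplication key (origin_system plus external_id) and the `backfill` option are assumptions. The priority and status values mirror the queue diagram.

```javascript
// Hypothetical in-memory stand-in for the ingest step. Deduplicating on
// origin_system + external_id is an assumption, as is the `backfill` option.
function createIngestor() {
  const queueByKey = new Map();

  function ingest(source, { backfill = false } = {}) {
    if (!source.origin_system || !source.external_id) {
      throw new Error('origin_system and external_id are required');
    }
    const key = `${source.origin_system}:${source.external_id}`;
    // Re-ingesting the same item returns the existing entry instead of queueing twice.
    if (queueByKey.has(key)) {
      return { queued: false, entry: queueByKey.get(key) };
    }
    const entry = {
      origin_system: source.origin_system,
      external_id: source.external_id,
      priority: backfill ? 2 : 1,
      status: backfill ? 'backfill' : 'pending',
    };
    queueByKey.set(key, entry);
    return { queued: true, entry };
  }

  return { ingest, size: () => queueByKey.size };
}

const ingestor = createIngestor();
// 'ts-1001' is a placeholder identifier.
const sampleReport = { origin_system: 'shift_reports', external_id: 'ts-1001', raw_text: '...' };
const firstIngest = ingestor.ingest(sampleReport);
const secondIngest = ingestor.ingest(sampleReport);

console.log(firstIngest.queued, secondIngest.queued, ingestor.size()); // true false 1
```

Making ingestion idempotent in this way would let an adapter re-run a historical import after a failure without creating duplicate queue entries.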
Quality Over Speed
The framework prioritizes:
- Thoroughness over processing speed
- Precision over cost savings
- Completeness over partial extraction
One comprehensive pass through 150k messages is better than multiple incomplete passes later.
Database Schema
Core Tables
- embeddings.sources - Raw content + extracted metadata
- embeddings.units - Semantic chunks with vectors
- embeddings.source_translations - Multilingual versions
Supporting Tables
- embeddings.extraction_queue - Processing queue with cursors
- embeddings.processing_cursors - Resume positions per source type
- core_source.yp3000_pending_evidence - Contact intel accumulation
Related Documentation
- YP3000 Identity System - Identity resolution
- Embedding & Semantic Search - Vector search
- Embedding V2 Linked Context - Linked groups
Framework implemented: 2026-01-24
Location: backend/services/embedding-extraction.js