Unified Extraction Framework

Philosophy: Every data source deserves the same thorough treatment. Build once, plug in anything.

Overview

The Unified Extraction Framework is a source-agnostic pipeline that transforms raw content from ANY data source into rich, searchable, interconnected knowledge. Whether it is a Discord message, incident report, shift note, or email, every piece of content flows through the same comprehensive extraction process.

ANY DATA SOURCE

┌───────────────────────────────────────────────────────────────┐
│               RABS UNIFIED EXTRACTION FRAMEWORK               │
│                                                               │
│    ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │
│    │   Discord   │  │  Incidents  │  │Shift Reports│  ...     │
│    │   Adapter   │  │   Adapter   │  │   Adapter   │          │
│    └──────┬──────┘  └──────┬──────┘  └──────┬──────┘          │
│           │                │                │                 │
│           └────────────────┼────────────────┘                 │
│                            ↓                                  │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                UNIFIED EXTRACTION PIPELINE                │  │
│  │                                                           │  │
│  │  Pass 1:   Semantic Units   → vector search               │  │
│  │  Pass 1:   Source Metadata  → tooltips, keywords          │  │
│  │  Pass 2:   Verification     → quality control             │  │
│  │  Pass 2.5: Enrichment       → entities, sentiment         │  │
│  │  Pass 2.6: Contact Intel    → YP3000 evidence             │  │
│  │  Pass 3:   Embeddings       → 3 vector strategies         │  │
│  │  Pass 4:   Translations     → 8 languages                 │  │
│  └───────────────────────────────────────────────────────────┘  │
│                            ↓                                  │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                      UNIFIED OUTPUT                       │  │
│  │                                                           │  │
│  │  • embeddings.sources      (tooltip, keywords, importance)│  │
│  │  • embeddings.units        (semantic chunks, vectors)     │  │
│  │  • yp3000_pending_evidence (contact intelligence)         │  │
│  │  • source_translations     (multilingual search)          │  │
│  └───────────────────────────────────────────────────────────┘  │
└───────────────────────────────────────────────────────────────┘

What Gets Extracted (One Pass)

Every piece of content automatically receives:

Source-Level Metadata

| Field | Purpose | Example |
|---|---|---|
| tooltip_summary | Human-readable hover preview | "Brett asking about roster swap for Tuesday" |
| keywords | Text search (tsvector indexed) | "brett roster swap tuesday sil" |
| importance_score | Priority/filtering (0.1-1.0) | 0.3 (routine) or 0.9 (urgent) |
| conversation_role | Thread context | initiates, responds, resolves, escalates |
| action_items | Task extraction | [{action, owner, due, confidence}] |
| pii_categories | Privacy compliance | ["phone", "medical", "address"] |
| tone_markers | Communication style | {formality, urgency, emotion} |

Unit-Level Extraction

| Field | Purpose | Example |
|---|---|---|
| text_for_embedding | Optimized for vector search | Self-contained semantic chunk |
| standalone_summary | Complete context-resolved sentence | Full meaning without source |
| unit_keywords | Per-unit searchable terms | Specific to this chunk |
| unit_importance | Relative importance | Within source context |
| domain_tags | Category classification | roster, incidents, medical |
| entity_mentions | Who's involved | Staff: Jane, Participant: Sarah |
| sentiment/intent | Why was this said | {tone, purpose, expected_action} |

YP3000 Contact Intelligence

Automatically detected and fed to the evidence system:

  • Phone numbers with ownership signals ("my new number is...")
  • Email addresses
  • Nicknames ("call me Bob")
  • Relationships ("my wife Sarah")

Extraction is temporally aware: contact details found in old messages (for example, from 2018) do not pollute current data.
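
One way to picture the detection and temporal weighting described here is the sketch below. It is only an illustration: the regular expressions, the `extractContactEvidence` name, the output fields, and the two-year half-life are all assumptions, not the framework's actual rules.

```javascript
// Hypothetical sketch: detect contact signals and down-weight old messages.
// Patterns, field names, and the two-year half-life are illustrative assumptions.
const PATTERNS = {
  email: /[\w.+-]+@[\w-]+(?:\.[\w-]+)+/g,
  phone: /\+?\d[\d\s-]{7,}\d/g,
  nickname: /\bcall me (\w+)/gi,
};

const MS_PER_YEAR = 365.25 * 24 * 3600 * 1000;

function extractContactEvidence(text, createdAt, now = new Date()) {
  const ageYears = Math.max((now - new Date(createdAt)) / MS_PER_YEAR, 0);
  // Exponential decay: evidence loses half its weight every two years.
  const recencyWeight = Math.pow(0.5, ageYears / 2);
  const evidence = [];
  for (const [kind, pattern] of Object.entries(PATTERNS)) {
    for (const match of text.matchAll(pattern)) {
      // Use the capture group when the pattern has one (nickname), else the whole match.
      evidence.push({ kind, value: (match[1] || match[0]).trim(), recencyWeight });
    }
  }
  return evidence;
}
```

Under these assumptions, a phone number mentioned today carries full weight, while the same number in a message from 2018 carries only a small fraction of it, so a downstream evidence store could prefer the newer signal.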

Multilingual Translations

8 languages configured for cross-language search:

  • Spanish, Arabic, Vietnamese, Chinese (Simplified)
  • Tagalog, Hindi, Nepali, Korean

Staff can search in their native language and find English content.

Adapter Pattern

Adding a new data source requires only:

// Example: Shift Reports Adapter
const shiftReportAdapter = {
  origin_system: 'shift_reports',
  container_type: 'staff_report',

  // Map source fields to framework schema
  mapToSource: (shiftReport) => ({
    external_id: shiftReport.id,
    author_id: shiftReport.staff_id,
    author_display: shiftReport.staff_name,
    created_at: shiftReport.shift_date,
    raw_text: shiftReport.notes,
    channel_or_context: shiftReport.location,
    // ... framework handles the rest
  })
};

The framework then automatically:

  1. Extracts semantic units
  2. Generates tooltips and keywords
  3. Calculates importance
  4. Extracts action items
  5. Detects contact info → YP3000
  6. Generates embeddings (3 strategies)
  7. Translates to configured languages
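
To make the adapter contract concrete, here is a minimal sketch of how a generic helper might turn any adapter plus a raw record into a row ready for the pipeline. The `toSourceRow` helper and the list of required fields are assumptions for illustration; the source does not show the framework's own entry point.

```javascript
// Hypothetical helper: combine an adapter and a raw record into a source row.
// `toSourceRow` and REQUIRED_FIELDS are assumptions, not the framework's API.
const REQUIRED_FIELDS = ['external_id', 'raw_text', 'created_at'];

function toSourceRow(adapter, record) {
  const mapped = adapter.mapToSource(record);
  const missing = REQUIRED_FIELDS.filter((f) => mapped[f] == null || mapped[f] === '');
  if (missing.length > 0) {
    throw new Error(`${adapter.origin_system}: missing ${missing.join(', ')}`);
  }
  // Adapter-level constants are stamped onto every row it produces.
  return {
    origin_system: adapter.origin_system,
    container_type: adapter.container_type,
    ...mapped,
  };
}

// Usage with a cut-down shift report adapter.
const exampleAdapter = {
  origin_system: 'shift_reports',
  container_type: 'staff_report',
  mapToSource: (r) => ({ external_id: r.id, created_at: r.shift_date, raw_text: r.notes }),
};
const row = toSourceRow(exampleAdapter, { id: 42, shift_date: '2026-01-20', notes: 'Quiet shift.' });
```

Validating at this boundary keeps the pipeline itself free of source-specific checks, which is the point of the adapter pattern.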

Deputy Shift Reports (Confirmed Data Source)

Discovery Date: 2026-01-25

Deputy stores rich structured shift reports via CustomFieldData linked to each Timesheet. Each shift report contains 12-13 structured questions:

Fields Available

| Field | Type | Content |
|---|---|---|
| Participant Notes | Text | Per-person updates - THE GOLD |
| [Location] Attendance | Multi-select | Which participants were present |
| Vehicles | Dropdown | Which vehicle used |
| DSW Owned Vehicle KMs | Number | Odometer start/stop |
| Private Car KMs | Number + Photo | Personal vehicle tracking |
| Shift Title | Dropdown | Group Home, Centre Based, 1:1, Saturday Adventure |
| Shift Description | Text | Location details, instructions |
| Attachments | Files | Photos from shift |
| Employee Comment | Text | Brief summary |

Sample Participant Notes

"Karen was repeating herself more than usual. Karen was fixated on having a shower and getting into her pyjamas..."

"Ryan was prompted several times to have a shower and get ready for his dad to arrive. He did but needed lots of prompting to do so in a timely manner."

"Alex required constant prompting to partake in the activities. Alex was then transported to Newmarket Hotel - he ordered his own lunch without prompting from staff"

Volume Estimate

  • 50+ timesheets per week with substantial content
  • CustomFieldData table: 60,000+ records (estimated)
  • Date range: 2018 to present

API Access

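// Get timesheets with structured data
// (assumes deputyApi.post resolves to the array of matching timesheets)
const timesheets = await deputyApi.post('/resource/Timesheet/QUERY', {
  search: { s1: { field: 'Date', type: 'ge', data: startDate } },
  join: ['EmployeeObject', 'OperationalUnitObject'],
  max: 200
});

// Get custom field data for each timesheet that has a shift report attached
for (const ts of timesheets) {
  if (!ts.CustomFieldData) continue;
  const customData = await deputyApi.get(`/resource/CustomFieldData/${ts.CustomFieldData}`);
  // ... pass customData to the shift report adapter
}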
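
Because a single query is capped by `max`, a backfill from 2018 onward would need to page through results. The sketch below shows one possible approach. It assumes that `deputyApi.post` resolves to an array of records and that the QUERY body accepts a `start` offset alongside `max`; both should be checked against the Deputy API documentation and the local `deputyApi` wrapper. `fetchAllTimesheets` is a hypothetical name.

```javascript
// Hypothetical pagination helper. Assumes deputyApi.post resolves to an array
// and that the QUERY body accepts a `start` offset alongside `max`.
async function fetchAllTimesheets(deputyApi, startDate, pageSize = 200) {
  const all = [];
  for (let start = 0; ; start += pageSize) {
    const page = await deputyApi.post('/resource/Timesheet/QUERY', {
      search: { s1: { field: 'Date', type: 'ge', data: startDate } },
      join: ['EmployeeObject', 'OperationalUnitObject'],
      start,
      max: pageSize,
    });
    all.push(...page);
    // A short page means there are no more records to fetch.
    if (page.length < pageSize) break;
  }
  return all;
}
```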

Extraction Value

  • Links incidents to context: "Karen had an episode" → shift report shows she was "repeating herself more"
  • Verifies attendance: Who was actually there that day
  • Cross-references Discord: Shift mentions "bus problems" → Discord shows scramble to find alternatives
  • YP3000 evidence: Staff-participant relationships confirmed by attendance records

Other Future Data Sources

Already designed to plug in:

  • Email → contact intel goldmine, thread context
  • SMS/TextMagic → phone verification, short message semantics
  • Call Transcripts → relationship mapping, contact updates
  • HR Documents → policy extraction, compliance tracking
  • Training Records → competency mapping
  • Vehicle Logs → incident correlation
  • CCTV Alerts → event context enrichment

Resilient Worker Architecture

Source-Agnostic Workers

The worker system is completely source-agnostic from day one. Workers don't know or care what they're processing - they just pull from the unified queue and run the extraction pipeline.

┌───────────────────────────────────────────────────────────────┐
│                          WORKER POOL                          │
│                                                               │
│    ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐     │
│    │ Worker 1 │  │ Worker 2 │  │ Worker 3 │  │ Worker N │     │
│    └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘     │
│         │             │             │             │           │
│         └─────────────┴──────┬──────┴─────────────┘           │
│                              ↓                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                 UNIFIED EXTRACTION QUEUE                  │  │
│  │                                                           │  │
│  │  source_id │ origin_system │ priority │ status            │  │
│  │  ──────────┼───────────────┼──────────┼──────────         │  │
│  │  abc-123   │ discord       │ 1        │ pending           │  │
│  │  def-456   │ incidents     │ 1        │ pending           │  │
│  │  ghi-789   │ shift_reports │ 2        │ backfill          │  │
│  │  ...       │ ...           │ ...      │ ...               │  │
│  └───────────────────────────────────────────────────────────┘  │
│                              ↓                                │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │                UNIFIED EXTRACTION PIPELINE                │  │
│  │                (same for ALL source types)                │  │
│  └───────────────────────────────────────────────────────────┘  │
└───────────────────────────────────────────────────────────────┘

Why Source-Agnostic?

  1. Simpler scaling - Add workers, not worker types
  2. Consistent quality - Every source gets identical treatment
  3. Future-proof - New sources just need an adapter, not new workers
  4. Load balancing - Workers grab whatever's next, no idle specialists
  5. Easier debugging - One pipeline to troubleshoot

Worker Behavior

Workers are designed to:

  • Survive restarts - cursor position saved in DB
  • Prioritize new content - process recent first (priority 1)
  • Backfill in gaps - work through history when idle (priority 2)
  • Shut down gracefully - save state before stopping
  • Resume seamlessly - pick up exactly where they left off

Server Start

┌───────────────────────────────────────────────────────────────┐
│                       WORKER LIFECYCLE                        │
│                                                               │
│  1. Load cursor positions from DB                             │
│  2. Check for NEW items (any source) → process immediately    │
│  3. If no new items → grab BACKFILL item (oldest first)       │
│  4. Process through unified pipeline                          │
│  5. Save cursor every N items (checkpoint)                    │
│  6. On SIGTERM → finish current item → save cursor → exit     │
│  7. On restart → resume from saved cursor                     │
│                                                               │
└───────────────────────────────────────────────────────────────┘
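
The lifecycle can be sketched as a small loop. This is an in-memory illustration under stated assumptions: `createWorker`, the `cursorStore` interface (`load`/`save`), the `done` flag on queue items, and the cursor fields are hypothetical stand-ins for the database-backed queue and cursor table, and ordering within a priority level is assumed to be oldest first.

```javascript
// Hypothetical in-memory sketch of the worker lifecycle. A real worker would
// read the queue and cursors from the database; every name here is an assumption.
function createWorker({ queue, cursorStore, processItem, checkpointEvery = 10 }) {
  let stopping = false;
  // Steps 1 and 7: resume from the saved cursor if one exists.
  const cursor = cursorStore.load() || { items_processed: 0, last_source_id: null };

  // Steps 2-3: new items (priority 1) first, then backfill, oldest first.
  function nextItem() {
    return queue
      .filter((item) => !item.done)
      .sort((a, b) => a.priority - b.priority || a.created_at.localeCompare(b.created_at))[0] || null;
  }

  async function run() {
    let item;
    while (!stopping && (item = nextItem())) {
      await processItem(item); // Step 4: same pipeline for every source
      item.done = true;
      cursor.items_processed += 1;
      cursor.last_source_id = item.source_id;
      // Step 5: checkpoint every N items.
      if (cursor.items_processed % checkpointEvery === 0) cursorStore.save({ ...cursor });
    }
    cursorStore.save({ ...cursor }); // Step 6: save state before exiting
    return cursor;
  }

  // Step 6: the flag is only checked between items, so the current item finishes.
  return { run, stop: () => { stopping = true; } };
}
```

In a Node.js process, graceful shutdown could then be wired with `process.on('SIGTERM', () => worker.stop())`, letting the in-flight item complete before the final cursor save.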

Processing Cursors Table

embeddings.processing_cursors
├── source_type -- 'discord', 'incidents', 'shift_reports', etc
├── forward_cursor -- timestamp of last NEW item processed
├── backfill_cursor -- timestamp of oldest BACKFILL item processed
├── status -- 'running', 'paused', 'completed'
├── items_processed -- total count
├── last_checkpoint -- when cursor was last saved
└── worker_id -- which worker is handling this (if locked)

Adding a New Source

When a new data source is added:

  1. Create adapter - maps source fields to embeddings.sources
  2. Insert to queue - adapter ingests items with origin_system set
  3. Workers handle it - automatically, no code changes needed

// That's it. Workers will process it like everything else.
await ingestSource({
  origin_system: 'shift_reports', // New source type
  container_type: 'staff_report',
  external_id: report.id,
  raw_text: report.notes,
  // ... framework handles everything else
});
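
For intuition about what an ingest call has to do, here is a minimal in-memory stand-in. It is not the real `ingestSource`: the `createIngestor` factory, the duplicate check on `origin_system` plus `external_id`, and the `backfill` option are assumptions. The priority and status values follow the queue diagram (1/pending for new items, 2/backfill for history).

```javascript
// Hypothetical in-memory stand-in for ingestSource. The real function presumably
// writes to embeddings.sources and embeddings.extraction_queue instead.
function createIngestor(queue = []) {
  const seen = new Set();

  function ingestSource(source, { backfill = false } = {}) {
    const key = `${source.origin_system}:${source.external_id}`;
    if (seen.has(key)) return null; // already queued, keep ingest idempotent
    seen.add(key);
    const entry = {
      source_id: key,
      origin_system: source.origin_system,
      priority: backfill ? 2 : 1,
      status: backfill ? 'backfill' : 'pending',
    };
    queue.push(entry);
    return entry;
  }

  return { ingestSource, queue };
}
```

Nothing in the queue entry depends on the source type beyond the `origin_system` label, which is what lets workers stay source-agnostic.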

Quality Over Speed

The framework prioritizes:

  1. Thoroughness over processing speed
  2. Precision over cost savings
  3. Completeness over partial extraction

One comprehensive pass through 150k messages is better than multiple incomplete passes later.

Database Schema

Core Tables

  • embeddings.sources - Raw content + extracted metadata
  • embeddings.units - Semantic chunks with vectors
  • embeddings.source_translations - Multilingual versions

Supporting Tables

  • embeddings.extraction_queue - Processing queue with cursors
  • embeddings.processing_cursors - Resume positions per source type
  • core_source.yp3000_pending_evidence - Contact intel accumulation

Framework implemented: 2026-01-24
Location: backend/services/embedding-extraction.js