Cross-database reference integrity in a multi-store architecture.
The Problem
Polyglot persistence means no foreign keys across stores. A user ID stored on a Neo4j node doesn't cascade when that user is deleted from Postgres. A file URL in Postgres doesn't validate against the object in R2. This is the hidden cost of using the right database for each job.
Your responsibility: Ensure references between stores remain valid and current.
Reference Map
| From | To | Reference | Freshness Concern |
|---|---|---|---|
| Neo4j node | Postgres user | userId property | User deletion must invalidate/remove nodes |
| Postgres files | R2 object | storageKey | File deletion must remove object |
| Postgres items | Neo4j node | Entity ID | Item deletion must clean graph refs |
| Neo4j :RELATES_TO | Neo4j nodes | Both endpoints | Node deletion cascades automatically |
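For context, these references are created whenever a Postgres-backed entity is mirrored into another store. A minimal sketch of how a graph node ends up carrying a Postgres user ID (the `Page` label and property names are illustrative):

```typescript
// Illustrative: a graph write that embeds a Postgres user ID as a plain property.
// Nothing enforces that this userId still exists in Postgres later on.
import { getSession } from '$lib/server/graph';

export async function createPageNode(pagePath: string, userId: string) {
	const session = await getSession();
	try {
		await session.run(
			`MERGE (p:Page {path: $path})
			 SET p.userId = $userId`,
			{ path: pagePath, userId }
		);
	} finally {
		await session.close();
	}
}
```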
Strategy Overview
Choose based on consistency requirements and complexity tolerance:
| Strategy | Consistency | Complexity | When to Use |
|---|---|---|---|
| Read-time validation | Eventual | Low | Non-critical refs, can tolerate stale |
| Soft delete + propagation | Eventual | Low-Medium | Need audit trail, async cleanup OK |
| Transactional outbox | Strong (local) | Medium | Critical events, need reliability |
| Periodic reconciliation | Eventual | Low | Orphan cleanup, batch operations |
| CDC (Debezium) | Near-real-time | High | Enterprise scale, many consumers |
Recommendation: Start with soft deletes + periodic reconciliation. Add outbox for critical paths. CDC only at scale.
Pattern 1: Read-Time Validation
Check reference validity when reading. Simplest approach, handles stale refs gracefully.
// src/lib/server/graph/queries.ts
import { db } from '$lib/server/db';
import { user } from '$lib/server/db/schema';
import { eq } from 'drizzle-orm';
import { getSession } from '$lib/server/graph';
export async function getPageWithFeatures(pagePath: string) {
const session = await getSession();
try {
const result = await session.run(
`MATCH (p:Page {path: $path})-[:CREATED_BY]->(u:User)
RETURN p, u.userId as userId`,
{ path: pagePath }
);
if (result.records.length === 0) return null;
const page = result.records[0].get('p').properties;
const userId = result.records[0].get('userId');
// Validate user still exists in Postgres
const [dbUser] = await db
.select()
.from(user)
.where(eq(user.id, userId))
.limit(1);
if (!dbUser) {
// Reference is stale - handle gracefully
return { ...page, user: null, userDeleted: true };
}
return { ...page, user: dbUser };
} finally {
await session.close();
}
}
Pros: Zero infrastructure, immediate implementation. Cons: Extra queries, doesn't clean up stale data.
Pattern 2: Soft Delete with Propagation
Never hard-delete. Mark as deleted, then propagate asynchronously.
Schema Addition
// src/lib/server/db/schema/common.ts
import { timestamp, boolean } from 'drizzle-orm/pg-core';
// Add to tables that need soft delete
export const softDeleteColumns = {
deletedAt: timestamp('deleted_at', { withTimezone: true }),
isDeleted: boolean('is_deleted').notNull().default(false),
};
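A sketch of how these columns might be spread into a table definition. The `files` table shown here is illustrative; the actual Drizzle schema lives in relational.md:

```typescript
// Illustrative: spreading the soft-delete columns into a table definition
import { pgTable, text, timestamp } from 'drizzle-orm/pg-core';
import { softDeleteColumns } from './common';

export const files = pgTable('files', {
	id: text('id').primaryKey(),
	userId: text('user_id').notNull(),
	storageKey: text('storage_key').notNull(),
	createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(),
	...softDeleteColumns, // deleted_at + is_deleted
});
```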
Soft Delete Helper
// src/lib/server/db/soft-delete.ts
import { db } from './index';
import { eq, and, isNull } from 'drizzle-orm';
import type { PgTable } from 'drizzle-orm/pg-core';
export async function softDelete<T extends PgTable>(
table: T,
id: string,
idColumn: keyof T
) {
return db
.update(table)
.set({
deletedAt: new Date(),
isDeleted: true
})
.where(eq(table[idColumn], id));
}
// Query helper - exclude deleted by default
export function notDeleted<T extends PgTable>(table: T) {
return isNull(table.deletedAt);
}
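A usage sketch combining both helpers, assuming a `files` table that spreads in the soft-delete columns:

```typescript
// Illustrative usage of softDelete() and notDeleted()
import { db } from '$lib/server/db';
import { files } from '$lib/server/db/schema';
import { and, eq } from 'drizzle-orm';
import { softDelete, notDeleted } from '$lib/server/db/soft-delete';

export async function archiveFile(fileId: string, userId: string) {
	// Mark the row as deleted; cross-store cleanup happens asynchronously
	await softDelete(files, fileId, 'id');

	// Read paths exclude soft-deleted rows by default
	return db
		.select()
		.from(files)
		.where(and(eq(files.userId, userId), notDeleted(files)));
}
```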
Propagation Service
// src/lib/server/services/deletion-propagation.ts
import { db } from '$lib/server/db';
import { user, files } from '$lib/server/db/schema';
import { getSession } from '$lib/server/graph';
import { deleteR2Object } from '$lib/server/storage';
import { eq } from 'drizzle-orm';
export async function propagateUserDeletion(userId: string) {
// 1. Clean up Neo4j nodes owned by this user
const graphSession = await getSession();
try {
await graphSession.run(
`MATCH (n {userId: $userId}) DETACH DELETE n`,
{ userId }
);
} finally {
await graphSession.close();
}
// 2. Clean up R2 files owned by this user
const userFiles = await db
.select({ storageKey: files.storageKey })
.from(files)
.where(eq(files.userId, userId));
for (const file of userFiles) {
await deleteR2Object(file.storageKey);
}
// 3. Hard delete now that refs are cleaned
await db.delete(files).where(eq(files.userId, userId));
await db.delete(user).where(eq(user.id, userId));
}
Pros: Audit trail, controlled cleanup, recoverable. Cons: Queries must filter deleted records, storage overhead.
Pattern 3: Transactional Outbox
For critical operations where you need guaranteed event delivery.
Outbox Table
// src/lib/server/db/schema/outbox.ts
import { pgTable, text, timestamp, jsonb, pgEnum, integer } from 'drizzle-orm/pg-core';
export const outboxStatusEnum = pgEnum('outbox_status', [
'pending',
'processing',
'completed',
'failed'
]);
export const outbox = pgTable('outbox', {
id: text('id').primaryKey(),
aggregateType: text('aggregate_type').notNull(), // 'user', 'item', 'file'
aggregateId: text('aggregate_id').notNull(),
eventType: text('event_type').notNull(), // 'deleted', 'updated'
payload: jsonb('payload'),
status: outboxStatusEnum('status').notNull().default('pending'),
createdAt: timestamp('created_at', { withTimezone: true }).notNull().defaultNow(),
processedAt: timestamp('processed_at', { withTimezone: true }),
retryCount: integer('retry_count').notNull().default(0),
error: text('error'),
});
Write with Outbox (Same Transaction)
// src/lib/server/services/user.ts
import { db } from '$lib/server/db';
import { user, outbox } from '$lib/server/db/schema';
import { eq } from 'drizzle-orm';
import { nanoid } from 'nanoid';
export async function deleteUser(userId: string) {
// Single transaction: soft delete + outbox event
await db.transaction(async (tx) => {
// Soft delete the user
await tx
.update(user)
.set({ deletedAt: new Date(), isDeleted: true })
.where(eq(user.id, userId));
// Write event to outbox (same transaction = atomic)
await tx.insert(outbox).values({
id: `obx_${nanoid(12)}`,
aggregateType: 'user',
aggregateId: userId,
eventType: 'deleted',
payload: { userId },
});
});
}
Outbox Processor (Background Job)
// src/lib/server/jobs/outbox-processor.ts
import { db } from '$lib/server/db';
import { outbox } from '$lib/server/db/schema';
import { eq, and, lt } from 'drizzle-orm';
import { propagateUserDeletion } from '../services/deletion-propagation';
const MAX_RETRIES = 3;
export async function processOutbox() {
// Get pending events (oldest first)
const events = await db
.select()
.from(outbox)
.where(
and(
eq(outbox.status, 'pending'),
lt(outbox.retryCount, MAX_RETRIES)
)
)
.orderBy(outbox.createdAt)
.limit(10);
for (const event of events) {
try {
// Mark as processing
await db
.update(outbox)
.set({ status: 'processing' })
.where(eq(outbox.id, event.id));
// Process based on event type
if (event.aggregateType === 'user' && event.eventType === 'deleted') {
await propagateUserDeletion(event.aggregateId);
}
// Add more handlers...
// Mark completed
await db
.update(outbox)
.set({
status: 'completed',
processedAt: new Date()
})
.where(eq(outbox.id, event.id));
} catch (error) {
// Increment retry, mark failed if max reached
await db
.update(outbox)
.set({
status: event.retryCount + 1 >= MAX_RETRIES ? 'failed' : 'pending',
retryCount: event.retryCount + 1,
error: error instanceof Error ? error.message : 'Unknown error',
})
.where(eq(outbox.id, event.id));
}
}
}
Cron Trigger
// src/routes/api/cron/outbox/+server.ts
import { json } from '@sveltejs/kit';
import { processOutbox } from '$lib/server/jobs/outbox-processor';
// Secure with API key or Vercel Cron
export async function GET({ request }) {
const authHeader = request.headers.get('authorization');
if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) {
return json({ error: 'Unauthorized' }, { status: 401 });
}
await processOutbox();
return json({ success: true });
}
// vercel.json
{
"crons": [{
"path": "/api/cron/outbox",
"schedule": "* * * * *"
}]
}
Pros: Guaranteed delivery, retry logic, audit trail. Cons: Additional table, background job infrastructure.
Pattern 4: Periodic Reconciliation
Batch detection and cleanup of orphaned records. Run hourly or daily.
Orphan Detection Queries
// src/lib/server/jobs/reconciliation.ts
import { db } from '$lib/server/db';
import { files, user } from '$lib/server/db/schema';
import { getSession } from '$lib/server/graph';
import { listR2Objects, deleteR2Object } from '$lib/server/storage';
import { inArray } from 'drizzle-orm';
export async function reconcileOrphans() {
const results = {
graphNodesWithoutUsers: 0,
r2ObjectsWithoutRecords: 0,
fileRecordsWithoutObjects: 0,
};
// 1. Find Neo4j nodes referencing deleted users
const graphSession = await getSession();
try {
// Get all userIds from graph
const graphResult = await graphSession.run(
`MATCH (n) WHERE n.userId IS NOT NULL
RETURN DISTINCT n.userId as userId`
);
const graphUserIds = graphResult.records.map(r => r.get('userId'));
// Check which of those IDs still exist in Postgres (guard against an empty list)
const existingUsers = graphUserIds.length > 0
? await db.select({ id: user.id }).from(user).where(inArray(user.id, graphUserIds))
: [];
const existingIds = new Set(existingUsers.map(u => u.id));
const orphanedIds = graphUserIds.filter(id => !existingIds.has(id));
// Clean up orphaned nodes
if (orphanedIds.length > 0) {
await graphSession.run(
`MATCH (n) WHERE n.userId IN $ids DETACH DELETE n`,
{ ids: orphanedIds }
);
results.graphNodesWithoutUsers = orphanedIds.length;
}
} finally {
await graphSession.close();
}
// 2. Find R2 objects not referenced in database
const r2Objects = await listR2Objects();
const dbFiles = await db.select({ storageKey: files.storageKey }).from(files);
const dbKeys = new Set(dbFiles.map(f => f.storageKey));
for (const obj of r2Objects) {
if (!dbKeys.has(obj.key)) {
await deleteR2Object(obj.key);
results.r2ObjectsWithoutRecords++;
}
}
// 3. Find file records pointing to missing R2 objects
const r2Keys = new Set(r2Objects.map(o => o.key));
const orphanedRecords = dbFiles.filter(f => !r2Keys.has(f.storageKey));
if (orphanedRecords.length > 0) {
await db
.delete(files)
.where(inArray(files.storageKey, orphanedRecords.map((r) => r.storageKey)));
results.fileRecordsWithoutObjects = orphanedRecords.length;
}
return results;
}
Scheduled Execution
// src/routes/api/cron/reconcile/+server.ts
import { json } from '@sveltejs/kit';
import { reconcileOrphans } from '$lib/server/jobs/reconciliation';
export async function GET({ request }) {
const authHeader = request.headers.get('authorization');
if (authHeader !== `Bearer ${process.env.CRON_SECRET}`) {
return json({ error: 'Unauthorized' }, { status: 401 });
}
const results = await reconcileOrphans();
// Log for monitoring
console.log('[Reconciliation]', results);
return json({ success: true, ...results });
}
// vercel.json
{
"crons": [{
"path": "/api/cron/reconcile",
"schedule": "0 * * * *"
}]
}
Pros: Simple, catches everything eventually, low overhead. Cons: Delayed cleanup, stale data visible until next run.
Pattern 5: Change Data Capture (CDC)
For enterprise scale with real-time requirements. Uses Debezium + Kafka.
When to Consider CDC
- Multiple consumers need database change events
- Sub-second latency required
- 10K+ writes/second
- Need to replicate to data warehouse
Architecture
┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│ Postgres │───▶│ Debezium │───▶│  Kafka   │───▶│ Consumer │
│   WAL    │    │Connector │    │  Topic   │    │ Services │
└──────────┘    └──────────┘    └──────────┘    └──────────┘
                                                      │
                                        ┌──────────────────────┐
                                        │  Neo4j / R2 / etc.   │
                                        └──────────────────────┘
Not covered in detail — this requires Kafka infrastructure (Confluent Cloud, Upstash Kafka, or self-hosted). Only implement when simpler patterns hit limits.
Failure Scenarios
Understanding what happens when cross-database operations fail:
Scenario 1: Postgres Commits, Outbox Processor Fails
| Aspect | Detail |
|---|---|
| Result | User deleted in Postgres, Neo4j nodes remain temporarily |
| Detection | Outbox event stays in pending status |
| Resolution | Outbox retry mechanism (up to 3 attempts) |
| Fallback | Periodic reconciliation job detects orphans |
| User Impact | None — stale graph data is filtered at read time |
Scenario 2: Outbox Processor Succeeds, Neo4j Write Fails
| Aspect | Detail |
|---|---|
| Result | Outbox event marked as failed with error message |
| Detection | Monitoring alert on failed events |
| Resolution | Manual review, fix underlying issue, reprocess |
| Prevention | Use MERGE instead of CREATE for idempotent operations |
| User Impact | Stale graph data until resolved |
Scenario 3: Neo4j Down During Outbox Processing
| Aspect | Detail |
|---|---|
| Result | Event retried, eventually marked failed after max retries |
| Detection | verifyConnection() health check fails |
| Resolution | Events automatically retry when Neo4j recovers |
| Prevention | Circuit breaker pattern (future enhancement) |
| User Impact | Graph features degraded, core CRUD unaffected |
Scenario 4: R2 Storage Unavailable
| Aspect | Detail |
|---|---|
| Result | File upload fails, database record not created |
| Detection | S3 client throws error |
| Resolution | User retries upload |
| Prevention | Upload to R2 first, then create DB record |
| User Impact | Upload fails with error message |
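A sketch of the ordering described above: write the object to R2 first, and only insert the Postgres row once the object exists, so a storage failure never produces a dangling record. The `putR2Object` helper is an assumption, not an existing function in `$lib/server/storage`:

```typescript
// Illustrative upload ordering; putR2Object is an assumed helper
import { db } from '$lib/server/db';
import { files } from '$lib/server/db/schema';
import { putR2Object } from '$lib/server/storage'; // assumed to exist
import { nanoid } from 'nanoid';

export async function uploadFile(userId: string, body: ArrayBuffer, contentType: string) {
	const storageKey = `uploads/${userId}/${nanoid(12)}`;

	// 1. Write the object first; if this throws, no DB record is created
	await putR2Object(storageKey, body, contentType);

	// 2. Only then record the reference in Postgres
	const [record] = await db
		.insert(files)
		.values({ id: `file_${nanoid(12)}`, userId, storageKey })
		.returning();

	return record;
}
```

If the insert in step 2 fails, the result is an orphaned R2 object rather than a dangling database record, and the reconciliation job (Pattern 4) eventually removes it.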
Mitigation Checklist
- Outbox processor runs on cron (every minute)
- Reconciliation runs on cron (hourly or daily)
- Monitoring alerts on `failed` outbox events
- Use MERGE in Neo4j for idempotent writes
- Read-time validation filters stale graph references
- Health endpoint checks both Postgres and Neo4j
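A sketch of the health endpoint item above, assuming the `verifyConnection()` helper mentioned in Scenario 3 returns a boolean for Neo4j and that Postgres is probed with a trivial query:

```typescript
// src/routes/api/health/+server.ts (illustrative)
import { json } from '@sveltejs/kit';
import { sql } from 'drizzle-orm';
import { db } from '$lib/server/db';
import { verifyConnection } from '$lib/server/graph'; // assumed helper

export async function GET() {
	const checks = { postgres: false, neo4j: false };

	try {
		await db.execute(sql`SELECT 1`);
		checks.postgres = true;
	} catch {
		// postgres stays false
	}

	try {
		checks.neo4j = await verifyConnection();
	} catch {
		// neo4j stays false
	}

	const healthy = checks.postgres && checks.neo4j;
	return json({ healthy, checks }, { status: healthy ? 200 : 503 });
}
```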
Decision Tree
Need cross-store consistency?
│
├── Can tolerate stale reads?
│ └── Yes → Read-time validation (Pattern 1)
│
├── Need audit trail?
│ └── Yes → Soft delete + propagation (Pattern 2)
│
├── Critical operation, must not lose events?
│ └── Yes → Transactional outbox (Pattern 3)
│
├── Batch cleanup is acceptable?
│ └── Yes → Periodic reconciliation (Pattern 4)
│
└── Need real-time at scale?
└── Yes → CDC with Debezium (Pattern 5)
Implementation Priority
Phase 1: Foundation (Start Here)
- Add soft delete columns to relevant tables
- Implement `notDeleted()` query helper
- Create basic reconciliation job
- Run reconciliation on cron (daily)
Phase 2: Reliability
- Add outbox table
- Wrap critical operations with outbox writes
- Create outbox processor job
- Run outbox processor on cron (every minute)
Phase 3: Monitoring
- Add metrics for orphan counts
- Alert on failed outbox events
- Dashboard for reconciliation results
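A minimal sketch of the metrics Phase 3 implies: counting failed events and pending events that have sat longer than the processing window should allow (the ten-minute threshold is an arbitrary assumption):

```typescript
// src/lib/server/jobs/outbox-metrics.ts (illustrative)
import { db } from '$lib/server/db';
import { outbox } from '$lib/server/db/schema';
import { and, eq, lt, sql } from 'drizzle-orm';

export async function getOutboxMetrics() {
	const tenMinutesAgo = new Date(Date.now() - 10 * 60 * 1000);

	// Events that exhausted their retries and need manual attention
	const [failed] = await db
		.select({ count: sql<number>`count(*)::int` })
		.from(outbox)
		.where(eq(outbox.status, 'failed'));

	// Pending events older than the expected window suggest a stuck processor
	const [stale] = await db
		.select({ count: sql<number>`count(*)::int` })
		.from(outbox)
		.where(and(eq(outbox.status, 'pending'), lt(outbox.createdAt, tenMinutesAgo)));

	return { failedEvents: failed.count, stalePendingEvents: stale.count };
}
```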
Phase 4: Scale (If Needed)
- Evaluate CDC requirements
- Set up Kafka infrastructure
- Deploy Debezium connectors
Testing Freshness
Integration Test: User Deletion Cascade
// tests/integration/user-deletion.test.ts
import { describe, it, expect } from 'vitest';
import { db, schema } from '$lib/server/db';
import { getSession } from '$lib/server/graph';
import { deleteUser } from '$lib/server/services/user';
import { processOutbox } from '$lib/server/jobs/outbox-processor';
describe('User deletion cascade', () => {
it('should clean up Neo4j nodes when user is deleted', async () => {
// Setup: Create user and graph node
const userId = 'test_user_123';
await db.insert(schema.user).values({
id: userId,
email: 'test@example.com'
});
const graphSession = await getSession();
await graphSession.run(
`CREATE (n:Page {id: 'page_1', userId: $userId})`,
{ userId }
);
await graphSession.close();
// Act: Delete user
await deleteUser(userId);
await processOutbox(); // Process async events
// Assert: Graph node should be gone
const verifySession = await getSession();
const result = await verifySession.run(
`MATCH (n {userId: $userId}) RETURN count(n) as count`,
{ userId }
);
await verifySession.close();
expect(result.records[0].get('count').toNumber()).toBe(0);
});
});
File Structure
src/lib/server/
├── db/
│ └── schema/
│ ├── outbox.ts # Outbox table
│ └── common.ts # Soft delete columns
├── jobs/
│ ├── outbox-processor.ts # Process outbox events
│ └── reconciliation.ts # Orphan detection
└── services/
└── deletion-propagation.ts # Cross-store cleanup
src/routes/api/cron/
├── outbox/+server.ts # Outbox cron endpoint
└── reconcile/+server.ts # Reconciliation cron endpoint
Related
- README.md - Polyglot persistence overview
- relational.md - Drizzle schema
- graph.md - Neo4j queries
- ../api.md - API endpoints