Event Sourcing
Learn event sourcing: storing state changes as events, rebuilding state, event replay, and when event sourcing is the right choice.
Event Sourcing is an architectural pattern where state changes are stored as a sequence of events. Instead of storing current state, you store events and reconstruct state by replaying them.
What is Event Sourcing?
Event Sourcing has three core parts:
- Events: Every state change is recorded as an event
- Event Store: An immutable, append-only log of those events
- State Reconstruction: Current state is rebuilt by replaying events
Traditional (State-based):
Current State: { balance: 100 }
Update: balance = 150
New State: { balance: 150 }
(Old state lost)
Event Sourcing:
Events:
- AccountCreated (balance: 0)
- Deposit (amount: 100)
- Deposit (amount: 50)
Current State: Reconstruct from events
balance = 0 + 100 + 50 = 150
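The reconstruction above is a fold over the event history. A minimal sketch, assuming hypothetical event shapes (AccountEvent, AccountState, applyEvent, and replay are illustrative names, not part of any library):

```typescript
// Hypothetical event shapes for illustration; the names mirror the example above.
type AccountEvent =
  | { type: "AccountCreated" }
  | { type: "Deposit"; amount: number }
  | { type: "Withdrawal"; amount: number };

interface AccountState {
  balance: number;
}

// Pure function: given the previous state and one event, return the next state.
function applyEvent(state: AccountState, event: AccountEvent): AccountState {
  switch (event.type) {
    case "AccountCreated":
      return { balance: 0 };
    case "Deposit":
      return { balance: state.balance + event.amount };
    case "Withdrawal":
      return { balance: state.balance - event.amount };
  }
}

// Current state is a left fold over the ordered event history.
function replay(events: AccountEvent[]): AccountState {
  return events.reduce(applyEvent, { balance: 0 });
}

const accountHistory: AccountEvent[] = [
  { type: "AccountCreated" },
  { type: "Deposit", amount: 100 },
  { type: "Deposit", amount: 50 },
];

const currentState = replay(accountHistory); // balance is 150
```

Keeping applyEvent free of side effects means the same history always produces the same state, which is what makes replay trustworthy.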
Benefits:
- Complete history: All changes preserved
- Audit trail: Know what happened and when
- Time travel: Reconstruct state at any point
- Debugging: See exact sequence of events
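The "time travel" benefit can be sketched as replaying only the events recorded up to a chosen moment. This assumes each event carries a timestamp and that events are stored in time order; TimedEvent and balanceAt are hypothetical names used only for illustration:

```typescript
interface TimedEvent {
  type: "Deposit" | "Withdrawal";
  amount: number;
  timestamp: number; // milliseconds since epoch
}

// Replay only the events recorded at or before `asOf`.
function balanceAt(events: TimedEvent[], asOf: number): number {
  let balance = 0;
  for (const event of events) {
    if (event.timestamp > asOf) break; // events are assumed ordered by time
    balance += event.type === "Deposit" ? event.amount : -event.amount;
  }
  return balance;
}

const ledger: TimedEvent[] = [
  { type: "Deposit", amount: 100, timestamp: 1000 },
  { type: "Withdrawal", amount: 30, timestamp: 2000 },
  { type: "Deposit", amount: 50, timestamp: 3000 },
];

const balanceAfterSecondEvent = balanceAt(ledger, 2500); // 70
const balanceAfterAllEvents = balanceAt(ledger, 3000); // 120
```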
How Event Sourcing Works
1. Store Events
Command: Deposit $100
Event: DepositEvent { accountId, amount: 100, timestamp }
Store: Append to event store
2. Reconstruct State
Load all events for aggregate
Replay events in order
Build current state
3. Query State
Option 1: Replay events on every query (slow for long event streams)
Option 2: Query projections (read models) that are kept up to date from events
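The three steps can be sketched end to end with in-memory stand-ins. This is an illustration under stated assumptions, not a production design: eventLog, balanceView, project, handleDeposit, and balanceByReplay are hypothetical names, and a real system would persist both the log and the read model.

```typescript
interface DepositRecorded {
  type: "Deposit";
  accountId: string;
  amount: number;
}

// In-memory stand-ins: an append-only log and a read model keyed by account.
const eventLog: DepositRecorded[] = [];
const balanceView = new Map<string, number>();

// Projection: fold one event into the read model.
function project(event: DepositRecorded): void {
  const previous = balanceView.get(event.accountId) ?? 0;
  balanceView.set(event.accountId, previous + event.amount);
}

// Command handler: validate, record the event, then update the projection.
function handleDeposit(accountId: string, amount: number): void {
  if (amount <= 0) {
    throw new Error("Deposit amount must be positive");
  }
  const event: DepositRecorded = { type: "Deposit", accountId, amount };
  eventLog.push(event); // step 1: append to the log
  project(event); // step 3, option 2: keep the read model current
}

// Step 3, option 1, for comparison: answer the query by replaying the log.
function balanceByReplay(accountId: string): number {
  return eventLog
    .filter((e) => e.accountId === accountId)
    .reduce((sum, e) => sum + e.amount, 0);
}

handleDeposit("acc-1", 100);
handleDeposit("acc-1", 50);
handleDeposit("acc-2", 20);
```

Both query paths return the same answer; the projection does a single map lookup, while the replay path scans the whole log.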
Event Store
Event Store is an append-only log of events.
Characteristics:
- Immutable: Events are never updated or deleted; new events are only appended
- Ordered: Events in sequence
- Versioned: Each event has version number
- Queryable: Can query by aggregate ID, time range
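One common way to keep per-aggregate versions ordered is an optimistic concurrency check on append. The following is a minimal in-memory sketch of that idea; InMemoryEventStore, StoredEvent, and the expectedVersion parameter are assumptions made for illustration, not the API of a specific product:

```typescript
interface StoredEvent {
  aggregateId: string;
  version: number; // 1-based position within the aggregate's stream
  type: string;
  data: unknown;
}

class InMemoryEventStore {
  private readonly streams = new Map<string, StoredEvent[]>();

  // Append succeeds only if the caller has seen the latest version of the stream.
  append(aggregateId: string, expectedVersion: number, type: string, data: unknown): StoredEvent {
    const stream = this.streams.get(aggregateId) ?? [];
    if (stream.length !== expectedVersion) {
      throw new Error(`Version conflict: expected ${expectedVersion}, found ${stream.length}`);
    }
    const event: StoredEvent = { aggregateId, version: stream.length + 1, type, data };
    this.streams.set(aggregateId, [...stream, event]); // existing entries are never modified
    return event;
  }

  // Returns a new array so callers cannot reorder or truncate the stored stream.
  getEvents(aggregateId: string, afterVersion = 0): StoredEvent[] {
    const stream = this.streams.get(aggregateId) ?? [];
    return stream.filter((e) => e.version > afterVersion);
  }
}

const store = new InMemoryEventStore();
store.append("acc-1", 0, "AccountCreated", {});
store.append("acc-1", 1, "Deposit", { amount: 100 });

let conflictDetected = false;
try {
  store.append("acc-1", 1, "Deposit", { amount: 50 }); // stale expectedVersion
} catch {
  conflictDetected = true;
}
```

A writer that loses the race reloads the stream, re-validates its command against the new state, and retries.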
Examples
Basic Event Sourcing
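// Event Store
class EventStore {
  // `Database` is a placeholder for your data-access layer;
  // generateId and getNextVersion are helpers omitted for brevity
  constructor(private database: Database) {}

  async append(aggregateId: string, event: Event): Promise<void> {
    await this.database.insert('events', {
      id: this.generateId(),
      aggregate_id: aggregateId, // column name matches the query in getEvents
      type: event.type,
      data: event.data,
      // Reading the next version and inserting are separate steps, so add a
      // unique constraint on (aggregate_id, version) to reject concurrent writers
      version: await this.getNextVersion(aggregateId),
      timestamp: Date.now()
    });
  }

  async getEvents(aggregateId: string): Promise<Event[]> {
    // Ordering by version guarantees events replay in the order they were appended
    return await this.database.query(
      'SELECT * FROM events WHERE aggregate_id = ? ORDER BY version',
      [aggregateId]
    );
  }
}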
// Aggregate
class BankAccount {
  private balance: number = 0;
  private events: Event[] = [];

  constructor(private accountId: string, private eventStore: EventStore) {}

  async deposit(amount: number): Promise<void> {
    // Create event (type and data match what EventStore.append persists)
    const event: Event = {
      type: 'Deposit',
      data: { accountId: this.accountId, amount, timestamp: Date.now() }
    };
    // Store event first so in-memory state never runs ahead of the log
    await this.eventStore.append(this.accountId, event);
    // Apply event
    this.apply(event);
  }

  apply(event: Event): void {
    this.events.push(event);
    switch (event.type) {
      case 'AccountCreated':
        this.balance = 0;
        break;
      case 'Deposit':
        this.balance += event.data.amount;
        break;
      case 'Withdrawal':
        this.balance -= event.data.amount;
        break;
    }
  }

  async load(): Promise<void> {
    // Load all events
    const events = await this.eventStore.getEvents(this.accountId);
    // Reset in-memory state, then replay events in version order
    this.balance = 0;
    this.events = [];
    for (const event of events) {
      this.apply(event);
    }
  }

  getBalance(): number {
    return this.balance;
  }
}
Event Sourcing with Snapshots
class SnapshotManager {
  // database, eventStore, apply, getLatestSnapshot, and loadFromEvents are
  // assumed to be provided elsewhere; they are omitted here for brevity
  async createSnapshot(aggregateId: string, state: State, version: number): Promise<void> {
    // Save snapshot. `version` is the version of the last event applied to `state`;
    // looking it up at write time could record a newer version than the state reflects
    await this.database.insert('snapshots', {
      aggregateId,
      state,
      version,
      timestamp: Date.now()
    });
  }

  async loadWithSnapshot(aggregateId: string): Promise<State> {
    // Load latest snapshot
    const snapshot = await this.getLatestSnapshot(aggregateId);
    if (snapshot) {
      // Load only the events recorded after the snapshot's version
      const events = await this.eventStore.getEventsAfter(
        aggregateId,
        snapshot.version
      );
      // Replay events on top of the snapshot state
      let state = snapshot.state;
      for (const event of events) {
        state = this.apply(state, event);
      }
      return state;
    } else {
      // No snapshot: Load all events
      return await this.loadFromEvents(aggregateId);
    }
  }
}
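To show why snapshots help, here is a small self-contained sketch that counts how many events are replayed with and without a snapshot. The names (Balance, Snapshot, VersionedDeposit, rebuild, shouldSnapshot) and the SNAPSHOT_INTERVAL threshold are hypothetical; real systems choose the interval from measured replay cost.

```typescript
interface Balance {
  balance: number;
}
interface Snapshot {
  version: number; // version of the last event folded into `state`
  state: Balance;
}
interface VersionedDeposit {
  version: number;
  amount: number;
}

const SNAPSHOT_INTERVAL = 3; // hypothetical threshold, tune per workload

// Rebuild state from an optional snapshot plus the events that follow it.
function rebuild(snapshot: Snapshot | undefined, events: VersionedDeposit[]) {
  let state: Balance = snapshot ? { ...snapshot.state } : { balance: 0 };
  let version = snapshot ? snapshot.version : 0;
  let replayed = 0;
  for (const event of events) {
    if (event.version <= version) continue; // already covered by the snapshot
    state = { balance: state.balance + event.amount };
    version = event.version;
    replayed += 1;
  }
  return { state, version, replayed };
}

// Take a new snapshot once enough events have accumulated since the last one.
function shouldSnapshot(currentVersion: number, lastSnapshotVersion: number): boolean {
  return currentVersion - lastSnapshotVersion >= SNAPSHOT_INTERVAL;
}

const deposits: VersionedDeposit[] = [1, 2, 3, 4, 5].map((version) => ({ version, amount: 10 }));
const fromScratch = rebuild(undefined, deposits); // replays 5 events
const fromSnapshot = rebuild({ version: 3, state: { balance: 30 } }, deposits); // replays 2 events
```

Both paths arrive at the same balance; the snapshot only changes how much history has to be read.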
Common Pitfalls
- Event versioning: Event definitions change and old events break replay. Fix: Store a version with each event and migrate old versions when they are read
- Performance: Replaying all events for a long-lived aggregate is slow. Fix: Use snapshots and projections
- Event schema evolution: Event schemas drift over time. Fix: Version events and keep changes backward compatible
- Not using projections: Answering queries by replaying events. Fix: Build read models (projections)
- Event store as database: Running general-purpose queries against the event store. Fix: Serve queries from projections
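One way to handle the versioning pitfalls is to convert old event versions to the current shape when they are read (often called upcasting), leaving stored events untouched. A minimal sketch, assuming a hypothetical schemaVersion field and a made-up v2 schema that adds a currency; the default of "USD" for old events is an assumption for illustration:

```typescript
// v1 stored a bare amount; v2 (hypothetical) adds an explicit currency.
interface DepositV1 {
  type: "Deposit";
  schemaVersion: 1;
  amount: number;
}
interface DepositV2 {
  type: "Deposit";
  schemaVersion: 2;
  amount: number;
  currency: string;
}
type StoredDeposit = DepositV1 | DepositV2;

// Upcast on read: stored events are never rewritten.
function upcast(event: StoredDeposit): DepositV2 {
  if (event.schemaVersion === 1) {
    // Assumed default for events recorded before the currency field existed.
    return { type: "Deposit", schemaVersion: 2, amount: event.amount, currency: "USD" };
  }
  return event;
}

const storedDeposits: StoredDeposit[] = [
  { type: "Deposit", schemaVersion: 1, amount: 100 },
  { type: "Deposit", schemaVersion: 2, amount: 50, currency: "EUR" },
];

// Replay logic only ever sees the latest schema.
const currentShape = storedDeposits.map(upcast);
```

Because replay code handles a single event shape, older versions can accumulate in the store without adding branches to the aggregate.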
Interview Questions
Beginner
Q: What is event sourcing and how does it differ from traditional state storage?
A:
Event Sourcing stores all state changes as events instead of storing current state.
Traditional (State-based):
Current State: { balance: 100 }
Update: balance = 150
New State: { balance: 150 }
(Old state lost)
Event Sourcing:
Events:
- AccountCreated (balance: 0)
- Deposit (amount: 100)
- Deposit (amount: 50)
Current State: Reconstruct from events
balance = 0 + 100 + 50 = 150
Benefits:
- Complete history: All changes preserved
- Audit trail: Know what happened and when
- Time travel: Reconstruct state at any point
- Debugging: See exact sequence of events
How it works:
- Store events: All state changes as events
- Reconstruct state: Replay events to build current state
- Query: Use projections (read models) for queries
Intermediate
Q: Explain how event sourcing works. How do you reconstruct state from events?
A:
Event Sourcing Process:
1. Store Events
Command: Deposit $100
Event: DepositEvent { accountId, amount: 100, timestamp }
Store: Append to event store (immutable log)
2. Reconstruct State
async load(aggregateId: string): Promise<State> {
  // Load all events
  const events = await this.eventStore.getEvents(aggregateId);
  // Replay events
  let state = this.initialState();
  for (const event of events) {
    state = this.apply(state, event);
  }
  return state;
}
3. Apply Events
// Parameter order matches the call in load(): state first, then event
apply(state: State, event: Event): State {
  switch (event.type) {
    case 'AccountCreated':
      return { balance: 0 };
    case 'Deposit':
      return { balance: state.balance + event.amount };
    case 'Withdrawal':
      return { balance: state.balance - event.amount };
    default:
      return state; // unknown event types leave state unchanged
  }
}
Event Store:
- Immutable: Events are never updated or deleted; new events are only appended
- Ordered: Events in sequence
- Versioned: Each event has version number
Performance:
- Snapshots: Save state periodically, replay from snapshot
- Projections: Build read models for queries (don't replay for every query)
Senior
Q: Design an event sourcing system for a financial application handling millions of transactions. How do you handle performance, snapshots, projections, and event versioning?
A:
class FinancialEventSourcingSystem {
private eventStore: EventStore;
private snapshotManager: SnapshotManager;
private projectionManager: ProjectionManager;
private eventVersioner: EventVersioner;
constructor() {
this.eventStore = new EventStore();
this.snapshotManager = new SnapshotManager();
this.projectionManager = new ProjectionManager();
this.eventVersioner = new EventVersioner();
}
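// 1. Event Store (Optimized)
// The components below are shown inline for readability. TypeScript does not allow
// class declarations inside a class body, so declare each one at module level.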
class EventStore {
async append(aggregateId: string, event: Event): Promise<void> {
// Version event
const versionedEvent = await this.eventVersioner.version(event);
// Append to event store (partitioned by aggregate)
await this.database.insert('events', {
id: this.generateId(),
aggregateId,
type: event.type,
data: versionedEvent.data,
version: await this.getNextVersion(aggregateId),
timestamp: Date.now(),
eventVersion: versionedEvent.version
});
// Publish to projections
await this.publishToProjections(versionedEvent);
}
async getEvents(aggregateId: string, fromVersion?: number): Promise<Event[]> {
if (fromVersion !== undefined) { // explicit check so that fromVersion = 0 is not treated as "not provided"
return await this.database.query(
'SELECT * FROM events WHERE aggregate_id = ? AND version > ? ORDER BY version',
[aggregateId, fromVersion]
);
}
return await this.database.query(
'SELECT * FROM events WHERE aggregate_id = ? ORDER BY version',
[aggregateId]
);
}
}
// 2. Snapshots (Performance)
class SnapshotManager {
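async createSnapshot(aggregateId: string, state: State, version: number): Promise<void> {
// `version` is the version of the last event applied to `state`, supplied by the caller;
// reading the store's current version here could be newer than the state being saved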
// Save snapshot
await this.database.insert('snapshots', {
aggregateId,
state,
version,
timestamp: Date.now()
});
}
async loadWithSnapshot(aggregateId: string): Promise<State> {
// Load latest snapshot
const snapshot = await this.getLatestSnapshot(aggregateId);
if (snapshot && this.shouldUseSnapshot(snapshot)) {
// Load events after snapshot
const events = await this.eventStore.getEvents(aggregateId, snapshot.version);
// Replay from snapshot
let state = snapshot.state;
for (const event of events) {
state = await this.applyEvent(state, event);
}
return state;
} else {
// No snapshot: Load all events
return await this.loadFromEvents(aggregateId);
}
}
shouldUseSnapshot(snapshot: Snapshot): boolean {
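// Use snapshot if not too old. This age limit is a policy choice for this example:
// a snapshot stays correct at any age as long as its state format is still readable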
const age = Date.now() - snapshot.timestamp;
return age < 86400000; // 24 hours
}
}
// 3. Projections (Read Models)
class ProjectionManager {
async updateProjection(event: Event): Promise<void> {
// Update read model from event
switch (event.type) {
case 'TransactionCreated':
await this.updateTransactionView(event);
break;
case 'AccountBalanceUpdated':
await this.updateAccountBalanceView(event);
break;
}
}
async updateTransactionView(event: TransactionCreatedEvent): Promise<void> {
await this.readDatabase.upsert('transaction_views', {
id: event.transactionId,
accountId: event.accountId,
amount: event.amount,
type: event.type,
timestamp: event.timestamp
});
}
}
// 4. Event Versioning
class EventVersioner {
async version(event: Event): Promise<VersionedEvent> {
// Add version to event
return {
...event,
version: this.getEventVersion(event.type),
schema: this.getEventSchema(event.type)
};
}
async migrate(event: Event, fromVersion: number, toVersion: number): Promise<Event> {
// Migrate event between versions
const migrations = this.getMigrations(event.type, fromVersion, toVersion);
let migratedEvent = event;
for (const migration of migrations) {
migratedEvent = await migration.apply(migratedEvent);
}
return migratedEvent;
}
}
// 5. Performance Optimization
optimizePerformance(): void {
// Batch event writes
this.batchEvents();
// Use snapshots for aggregates with many events
this.createSnapshotsForLargeAggregates();
// Cache frequently accessed aggregates
this.cacheAggregates();
}
}
Features:
- Event store: Partitioned, versioned events
- Snapshots: Periodic snapshots for performance
- Projections: Read models for queries
- Event versioning: Handle schema evolution
- Performance: Batching, caching, snapshots
Key Takeaways
- Event sourcing: Store state changes as events, not current state
- Event store: Immutable, append-only log of events
- State reconstruction: Replay events to build current state
- Snapshots: Save state periodically to avoid replaying all events
- Projections: Build read models for queries (don't replay for every query)
- Event versioning: Handle schema evolution, backward compatibility
- Benefits: Complete history, audit trail, time travel, debugging
- Best practices: Use snapshots, build projections, version events, optimize performance