15 minlesson

Observer Pattern in TypeScript

Observer Pattern in TypeScript

This lesson explores TypeScript-specific implementations of the Observer pattern, including EventEmitter, RxJS, and modern reactive patterns.

TypeScript EventEmitter Pattern

typescript
1type EventMap = Record<string, any>;
2type EventKey<T extends EventMap> = string & keyof T;
3type EventHandler<T> = (params: T) => void;
4
5class TypedEventEmitter<T extends EventMap> {
6 private listeners: {
7 [K in keyof T]?: Set<EventHandler<T[K]>>;
8 } = {};
9
10 on<K extends EventKey<T>>(eventName: K, handler: EventHandler<T[K]>): () => void {
11 if (!this.listeners[eventName]) {
12 this.listeners[eventName] = new Set();
13 }
14 this.listeners[eventName]!.add(handler);
15
16 // Return unsubscribe function
17 return () => this.off(eventName, handler);
18 }
19
20 off<K extends EventKey<T>>(eventName: K, handler: EventHandler<T[K]>): void {
21 this.listeners[eventName]?.delete(handler);
22 }
23
24 emit<K extends EventKey<T>>(eventName: K, params: T[K]): void {
25 this.listeners[eventName]?.forEach((handler) => {
26 try {
27 handler(params);
28 } catch (error) {
29 console.error(`Error in ${String(eventName)} handler:`, error);
30 }
31 });
32 }
33
34 once<K extends EventKey<T>>(eventName: K, handler: EventHandler<T[K]>): void {
35 const onceHandler: EventHandler<T[K]> = (params) => {
36 handler(params);
37 this.off(eventName, onceHandler);
38 };
39 this.on(eventName, onceHandler);
40 }
41
42 removeAllListeners<K extends EventKey<T>>(eventName?: K): void {
43 if (eventName) {
44 delete this.listeners[eventName];
45 } else {
46 this.listeners = {};
47 }
48 }
49
50 listenerCount<K extends EventKey<T>>(eventName: K): number {
51 return this.listeners[eventName]?.size ?? 0;
52 }
53}

Package Tracking with EventEmitter

typescript
1interface PackageEvents {
2 'status:updated': {
3 trackingNumber: string;
4 oldStatus: string;
5 newStatus: string;
6 timestamp: Date;
7 };
8 'location:changed': {
9 trackingNumber: string;
10 location: string;
11 coordinates: { lat: number; lng: number };
12 };
13 'delivery:estimated': {
14 trackingNumber: string;
15 estimatedDate: Date;
16 };
17 'delivery:completed': {
18 trackingNumber: string;
19 signedBy: string;
20 timestamp: Date;
21 };
22 'exception:occurred': {
23 trackingNumber: string;
24 type: 'delay' | 'damage' | 'lost' | 'incorrect-address';
25 message: string;
26 };
27}
28
29class PackageTrackingEmitter extends TypedEventEmitter<PackageEvents> {
30 private packages: Map<string, any> = new Map();
31
32 updateStatus(
33 trackingNumber: string,
34 newStatus: string,
35 oldStatus: string
36 ): void {
37 this.emit('status:updated', {
38 trackingNumber,
39 oldStatus,
40 newStatus,
41 timestamp: new Date(),
42 });
43 }
44
45 updateLocation(
46 trackingNumber: string,
47 location: string,
48 coordinates: { lat: number; lng: number }
49 ): void {
50 this.emit('location:changed', {
51 trackingNumber,
52 location,
53 coordinates,
54 });
55 }
56
57 recordDelivery(trackingNumber: string, signedBy: string): void {
58 this.emit('delivery:completed', {
59 trackingNumber,
60 signedBy,
61 timestamp: new Date(),
62 });
63 }
64
65 reportException(
66 trackingNumber: string,
67 type: PackageEvents['exception:occurred']['type'],
68 message: string
69 ): void {
70 this.emit('exception:occurred', {
71 trackingNumber,
72 type,
73 message,
74 });
75 }
76}
77
78// Usage with type safety
79const emitter = new PackageTrackingEmitter();
80
81// Type-safe event handlers
82const unsubscribe = emitter.on('status:updated', (event) => {
83 console.log(`Package ${event.trackingNumber}: ${event.oldStatus}${event.newStatus}`);
84});
85
86emitter.on('delivery:completed', (event) => {
87 console.log(`Delivered to ${event.signedBy} at ${event.timestamp}`);
88});
89
90emitter.on('exception:occurred', (event) => {
91 console.error(`Exception for ${event.trackingNumber}: ${event.message}`);
92});
93
94// Emit events
95emitter.updateStatus('TRK123', 'in-transit', 'pending');
96emitter.updateLocation('TRK123', 'Chicago Hub', { lat: 41.8781, lng: -87.6298 });

WeakRef for Memory Management

typescript
1class WeakObserver<T> {
2 private observerRef: WeakRef<Observer<T>>;
3 private cleanupRegistry: FinalizationRegistry<string>;
4
5 constructor(observer: Observer<T>, onCleanup?: (id: string) => void) {
6 this.observerRef = new WeakRef(observer);
7 this.cleanupRegistry = new FinalizationRegistry((id) => {
8 console.log(`Observer ${id} was garbage collected`);
9 onCleanup?.(id);
10 });
11
12 const id = Math.random().toString(36);
13 this.cleanupRegistry.register(observer, id, observer);
14 }
15
16 deref(): Observer<T> | undefined {
17 return this.observerRef.deref();
18 }
19}
20
21class WeakSubject<T> implements Subject<T> {
22 private observers: WeakObserver<Observer<T>>[] = [];
23
24 attach(observer: Observer<T>): void {
25 const weakObserver = new WeakObserver(observer, (id) => {
26 // Remove from array when garbage collected
27 this.observers = this.observers.filter((obs) => obs.deref() !== undefined);
28 });
29 this.observers.push(weakObserver);
30 }
31
32 detach(observer: Observer<T>): void {
33 this.observers = this.observers.filter((obs) => obs.deref() !== observer);
34 }
35
36 notify(data: T): void {
37 // Clean up dead references
38 this.observers = this.observers.filter((weakObs) => {
39 const observer = weakObs.deref();
40 if (observer) {
41 observer.update(data);
42 return true;
43 }
44 return false; // Remove garbage collected observers
45 });
46 }
47}

RxJS-Style Observables

typescript
1type Observer<T> = {
2 next: (value: T) => void;
3 error?: (error: Error) => void;
4 complete?: () => void;
5};
6
7type Subscription = {
8 unsubscribe: () => void;
9};
10
11class Observable<T> {
12 constructor(
13 private subscribeHandler: (observer: Observer<T>) => () => void
14 ) {}
15
16 subscribe(observer: Partial<Observer<T>>): Subscription {
17 const fullObserver: Observer<T> = {
18 next: observer.next ?? (() => {}),
19 error: observer.error ?? ((error) => console.error(error)),
20 complete: observer.complete ?? (() => {}),
21 };
22
23 const unsubscribe = this.subscribeHandler(fullObserver);
24
25 return {
26 unsubscribe,
27 };
28 }
29
30 // Operators
31 map<R>(transform: (value: T) => R): Observable<R> {
32 return new Observable((observer) => {
33 return this.subscribe({
34 next: (value) => observer.next(transform(value)),
35 error: (error) => observer.error?.(error),
36 complete: () => observer.complete?.(),
37 }).unsubscribe;
38 });
39 }
40
41 filter(predicate: (value: T) => boolean): Observable<T> {
42 return new Observable((observer) => {
43 return this.subscribe({
44 next: (value) => {
45 if (predicate(value)) {
46 observer.next(value);
47 }
48 },
49 error: (error) => observer.error?.(error),
50 complete: () => observer.complete?.(),
51 }).unsubscribe;
52 });
53 }
54
55 static fromEvent<T extends EventMap, K extends EventKey<T>>(
56 emitter: TypedEventEmitter<T>,
57 eventName: K
58 ): Observable<T[K]> {
59 return new Observable((observer) => {
60 const handler = (value: T[K]) => observer.next(value);
61 return emitter.on(eventName, handler);
62 });
63 }
64}
65
66// Package tracking as Observable
67class ObservablePackageTracker {
68 private emitter = new PackageTrackingEmitter();
69
70 statusUpdates$: Observable<PackageEvents['status:updated']>;
71 deliveries$: Observable<PackageEvents['delivery:completed']>;
72 exceptions$: Observable<PackageEvents['exception:occurred']>;
73
74 constructor() {
75 this.statusUpdates$ = Observable.fromEvent(this.emitter, 'status:updated');
76 this.deliveries$ = Observable.fromEvent(this.emitter, 'delivery:completed');
77 this.exceptions$ = Observable.fromEvent(this.emitter, 'exception:occurred');
78 }
79
80 updateStatus(trackingNumber: string, newStatus: string, oldStatus: string): void {
81 this.emitter.updateStatus(trackingNumber, newStatus, oldStatus);
82 }
83
84 recordDelivery(trackingNumber: string, signedBy: string): void {
85 this.emitter.recordDelivery(trackingNumber, signedBy);
86 }
87}
88
89// Usage with reactive operators
90const tracker = new ObservablePackageTracker();
91
92// Filter and transform
93const deliveredPackages$ = tracker.statusUpdates$
94 .filter((event) => event.newStatus === 'delivered')
95 .map((event) => event.trackingNumber);
96
97deliveredPackages$.subscribe({
98 next: (trackingNumber) => console.log(`Delivered: ${trackingNumber}`),
99 error: (error) => console.error('Error:', error),
100 complete: () => console.log('Stream completed'),
101});
102
103// Multiple subscriptions
104tracker.exceptions$.subscribe({
105 next: (event) => {
106 // Log to error tracking service
107 console.error(`Exception: ${event.type} - ${event.message}`);
108 },
109});
110
111tracker.deliveries$.subscribe({
112 next: (event) => {
113 // Update analytics
114 console.log(`Delivered to ${event.signedBy}`);
115 },
116});

Async Iterators for Observables

typescript
1class AsyncObservable<T> {
2 private handlers: Set<(value: T) => void> = new Set();
3 private queue: T[] = [];
4 private resolvers: ((value: T) => void)[] = [];
5
6 emit(value: T): void {
7 if (this.resolvers.length > 0) {
8 const resolve = this.resolvers.shift()!;
9 resolve(value);
10 } else {
11 this.queue.push(value);
12 }
13
14 // Also notify synchronous handlers
15 this.handlers.forEach((handler) => handler(value));
16 }
17
18 async *[Symbol.asyncIterator](): AsyncIterator<T> {
19 while (true) {
20 if (this.queue.length > 0) {
21 yield this.queue.shift()!;
22 } else {
23 yield await new Promise<T>((resolve) => {
24 this.resolvers.push(resolve);
25 });
26 }
27 }
28 }
29
30 subscribe(handler: (value: T) => void): () => void {
31 this.handlers.add(handler);
32 return () => this.handlers.delete(handler);
33 }
34}
35
36// Usage with async iteration
37const packageUpdates = new AsyncObservable<PackageEvents['status:updated']>();
38
39// Consume with for-await-of
40async function processUpdates() {
41 for await (const update of packageUpdates) {
42 console.log(`Processing: ${update.trackingNumber}`);
43
44 // Can break based on conditions
45 if (update.newStatus === 'delivered') {
46 break;
47 }
48 }
49}
50
51processUpdates();
52
53// Emit updates
54packageUpdates.emit({
55 trackingNumber: 'TRK123',
56 oldStatus: 'pending',
57 newStatus: 'in-transit',
58 timestamp: new Date(),
59});

Decorator for Observable Properties

typescript
1function Observable<T extends object, K extends keyof T>(
2 target: T,
3 propertyKey: K
4): void {
5 const privateKey = Symbol(`__${String(propertyKey)}__`);
6 const observers = new Set<(value: T[K]) => void>();
7
8 Object.defineProperty(target, propertyKey, {
9 get(): T[K] {
10 return (this as any)[privateKey];
11 },
12 set(newValue: T[K]): void {
13 const oldValue = (this as any)[privateKey];
14 (this as any)[privateKey] = newValue;
15
16 if (oldValue !== newValue) {
17 observers.forEach((observer) => observer(newValue));
18 }
19 },
20 enumerable: true,
21 configurable: true,
22 });
23
24 // Add method to subscribe
25 const subscriberKey = `on${String(propertyKey).charAt(0).toUpperCase()}${String(
26 propertyKey
27 ).slice(1)}Change`;
28
29 (target as any)[subscriberKey] = (observer: (value: T[K]) => void) => {
30 observers.add(observer);
31 return () => observers.delete(observer);
32 };
33}
34
35class Package {
36 @Observable
37 status: string = 'pending';
38
39 @Observable
40 location: string = '';
41
42 // TypeScript generates these methods
43 onStatusChange!: (observer: (value: string) => void) => () => void;
44 onLocationChange!: (observer: (value: string) => void) => () => void;
45}
46
47// Usage
48const pkg = new Package();
49
50pkg.onStatusChange((status) => {
51 console.log(`Status changed to: ${status}`);
52});
53
54pkg.status = 'in-transit'; // Automatically notifies observers

Best Practices for TypeScript

  1. Use typed event maps - Define all event types in a single interface
  2. Return unsubscribe functions - Easier cleanup than passing observer back
  3. Leverage WeakRef - Automatic cleanup for observers
  4. Consider RxJS for complex cases - Don't reinvent the wheel
  5. Use decorators for reactive properties - Clean syntax for observable properties
  6. Type-safe emitters - Prevent runtime errors with compile-time checks

Summary

TypeScript provides powerful tools for implementing the Observer pattern with type safety, automatic memory management with WeakRef, and reactive programming patterns. Choose the right abstraction level for your use case - from simple EventEmitter to full reactive streams.