Observer Pattern in TypeScript
This lesson explores TypeScript-specific implementations of the Observer pattern, including EventEmitter, RxJS, and modern reactive patterns.
TypeScript EventEmitter Pattern
/**
 * Maps each event name to the payload type delivered to its handlers.
 *
 * `any` is deliberate: an interface-declared event map has no implicit index
 * signature and therefore is not assignable to `Record<string, unknown>`.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type EventMap = Record<string, any>;

/** The string-valued keys of an event map. */
type EventKey<T extends EventMap> = string & keyof T;

/** A listener that receives the payload of one event. */
type EventHandler<T> = (params: T) => void;

/**
 * Minimal event emitter whose event names and payloads are checked against
 * the event map `T` at compile time.
 */
class TypedEventEmitter<T extends EventMap> {
  private listeners: {
    [K in keyof T]?: Set<EventHandler<T[K]>>;
  } = {};

  /**
   * Registers `handler` for `eventName`.
   *
   * Handlers are kept in a Set, so registering the same function twice for
   * the same event has no additional effect.
   *
   * @returns A function that removes this registration again.
   */
  on<K extends EventKey<T>>(eventName: K, handler: EventHandler<T[K]>): () => void {
    let handlers = this.listeners[eventName];
    if (handlers === undefined) {
      handlers = new Set<EventHandler<T[K]>>();
      this.listeners[eventName] = handlers;
    }
    handlers.add(handler);

    return () => this.off(eventName, handler);
  }

  /** Removes `handler` from `eventName`; does nothing if it was not registered. */
  off<K extends EventKey<T>>(eventName: K, handler: EventHandler<T[K]>): void {
    this.listeners[eventName]?.delete(handler);
  }

  /**
   * Calls every handler registered for `eventName` with `params`.
   *
   * An exception thrown by one handler is logged with `console.error` and
   * does not prevent the remaining handlers from running.
   */
  emit<K extends EventKey<T>>(eventName: K, params: T[K]): void {
    this.listeners[eventName]?.forEach((handler) => {
      try {
        handler(params);
      } catch (error) {
        console.error(`Error in ${String(eventName)} handler:`, error);
      }
    });
  }

  /**
   * Registers `handler` so that it runs for the next `eventName` emission only.
   *
   * @returns A function that cancels the registration before it has fired.
   *   `off(eventName, handler)` cannot do this because the emitter stores an
   *   internal wrapper rather than `handler` itself.
   */
  once<K extends EventKey<T>>(eventName: K, handler: EventHandler<T[K]>): () => void {
    const onceHandler: EventHandler<T[K]> = (params) => {
      // Unregister first: if the handler throws it must still not fire again.
      this.off(eventName, onceHandler);
      handler(params);
    };
    return this.on(eventName, onceHandler);
  }

  /**
   * Removes the listeners of one event, or of every event when `eventName`
   * is omitted.
   */
  removeAllListeners<K extends EventKey<T>>(eventName?: K): void {
    // Compare against undefined so that an empty-string event name is not
    // mistaken for "no argument".
    if (eventName !== undefined) {
      delete this.listeners[eventName];
    } else {
      this.listeners = {};
    }
  }

  /** Number of handlers currently registered for `eventName`. */
  listenerCount<K extends EventKey<T>>(eventName: K): number {
    return this.listeners[eventName]?.size ?? 0;
  }
}
Package Tracking with EventEmitter
/** Payload types of every package-tracking event, keyed by event name. */
interface PackageEvents {
  'status:updated': {
    trackingNumber: string;
    oldStatus: string;
    newStatus: string;
    timestamp: Date;
  };
  'location:changed': {
    trackingNumber: string;
    location: string;
    coordinates: { lat: number; lng: number };
  };
  'delivery:estimated': {
    trackingNumber: string;
    estimatedDate: Date;
  };
  'delivery:completed': {
    trackingNumber: string;
    signedBy: string;
    timestamp: Date;
  };
  'exception:occurred': {
    trackingNumber: string;
    type: 'delay' | 'damage' | 'lost' | 'incorrect-address';
    message: string;
  };
}

/**
 * Event emitter specialised for package tracking. Each method builds the
 * payload for one `PackageEvents` entry and emits it.
 */
class PackageTrackingEmitter extends TypedEventEmitter<PackageEvents> {
  /**
   * Emits `status:updated`, stamped with the current time.
   *
   * Note the parameter order: the new status comes before the old one.
   */
  updateStatus(
    trackingNumber: string,
    newStatus: string,
    oldStatus: string
  ): void {
    this.emit('status:updated', {
      trackingNumber,
      oldStatus,
      newStatus,
      timestamp: new Date(),
    });
  }

  /** Emits `location:changed` with the given location name and coordinates. */
  updateLocation(
    trackingNumber: string,
    location: string,
    coordinates: { lat: number; lng: number }
  ): void {
    this.emit('location:changed', {
      trackingNumber,
      location,
      coordinates,
    });
  }

  /** Emits `delivery:estimated` with the expected delivery date. */
  estimateDelivery(trackingNumber: string, estimatedDate: Date): void {
    this.emit('delivery:estimated', {
      trackingNumber,
      estimatedDate,
    });
  }

  /** Emits `delivery:completed`, stamped with the current time. */
  recordDelivery(trackingNumber: string, signedBy: string): void {
    this.emit('delivery:completed', {
      trackingNumber,
      signedBy,
      timestamp: new Date(),
    });
  }

  /** Emits `exception:occurred` with one of the declared exception types. */
  reportException(
    trackingNumber: string,
    type: PackageEvents['exception:occurred']['type'],
    message: string
  ): void {
    this.emit('exception:occurred', {
      trackingNumber,
      type,
      message,
    });
  }
}

// Usage with type safety
const emitter = new PackageTrackingEmitter();

// Type-safe event handlers: `event` is inferred from the event name.
const unsubscribe = emitter.on('status:updated', (event) => {
  console.log(`Package ${event.trackingNumber}: ${event.oldStatus} → ${event.newStatus}`);
});

emitter.on('delivery:completed', (event) => {
  console.log(`Delivered to ${event.signedBy} at ${event.timestamp}`);
});

emitter.on('exception:occurred', (event) => {
  console.error(`Exception for ${event.trackingNumber}: ${event.message}`);
});

// Emit events
emitter.updateStatus('TRK123', 'in-transit', 'pending');
emitter.updateLocation('TRK123', 'Chicago Hub', { lat: 41.8781, lng: -87.6298 });

// Stop receiving status updates once they are no longer needed.
unsubscribe();
WeakRef for Memory Management
/**
 * Holds an observer through a `WeakRef`, so this wrapper alone does not keep
 * the observer alive, and reports when the observer has been collected.
 *
 * `Observer<T>` is declared outside this snippet; the code below only relies
 * on it being an object with an `update(data)` method.
 */
class WeakObserver<T> {
  private readonly observerRef: WeakRef<Observer<T>>;
  private readonly cleanupRegistry: FinalizationRegistry<string>;

  /**
   * @param observer The observer to reference weakly.
   * @param onCleanup Optional callback invoked with this wrapper's random id
   *   when the finalization callback for `observer` runs.
   */
  constructor(observer: Observer<T>, onCleanup?: (id: string) => void) {
    this.observerRef = new WeakRef(observer);
    this.cleanupRegistry = new FinalizationRegistry<string>((id) => {
      console.log(`Observer ${id} was garbage collected`);
      onCleanup?.(id);
    });

    // The id only labels the log line and the cleanup callback.
    const id = Math.random().toString(36);
    this.cleanupRegistry.register(observer, id, observer);
  }

  /** Returns the observer, or `undefined` once it has been collected. */
  deref(): Observer<T> | undefined {
    return this.observerRef.deref();
  }
}

/**
 * Subject that references its observers weakly: an observer that is no
 * longer reachable elsewhere is dropped from the list instead of being kept
 * alive by the subject.
 */
class WeakSubject<T> implements Subject<T> {
  private observers: WeakObserver<T>[] = [];

  /** Starts notifying `observer`; the subject keeps only a weak reference. */
  attach(observer: Observer<T>): void {
    const weakObserver = new WeakObserver(observer, () => {
      // Prune every wrapper whose observer has been collected.
      this.observers = this.observers.filter((obs) => obs.deref() !== undefined);
    });
    this.observers.push(weakObserver);
  }

  /** Stops notifying `observer`. */
  detach(observer: Observer<T>): void {
    this.observers = this.observers.filter((obs) => obs.deref() !== observer);
  }

  /**
   * Calls `update(data)` on every observer that is still alive and prunes
   * the ones that have been collected.
   *
   * The list is pruned and snapshotted before any observer runs, so an
   * observer may call `attach` or `detach` from inside `update` without that
   * change being overwritten. Observers in the snapshot are still notified
   * for the current call.
   */
  notify(data: T): void {
    const live: Observer<T>[] = [];
    this.observers = this.observers.filter((weakObs) => {
      const observer = weakObs.deref();
      if (observer === undefined) {
        return false; // Remove garbage collected observers
      }
      live.push(observer);
      return true;
    });

    for (const observer of live) {
      observer.update(data);
    }
  }
}
RxJS-Style Observables
/** Callbacks through which an observable delivers values, an error, or completion. */
type Observer<T> = {
  next: (value: T) => void;
  error?: (error: Error) => void;
  complete?: () => void;
};

/** Handle returned by `subscribe`; `unsubscribe` stops further delivery. */
type Subscription = {
  unsubscribe: () => void;
};

/** Normalises an arbitrary thrown value into an `Error`. */
function toError(thrown: unknown): Error {
  return thrown instanceof Error ? thrown : new Error(String(thrown));
}

/**
 * Minimal RxJS-style observable. Nothing runs until `subscribe` is called;
 * each subscription invokes the producer function again.
 */
class Observable<T> {
  /**
   * @param subscribeHandler Producer invoked once per subscription. It pushes
   *   values into the observer it receives and returns a teardown function.
   */
  constructor(
    private subscribeHandler: (observer: Observer<T>) => () => void
  ) {}

  /**
   * Starts the producer and forwards its notifications to `observer`.
   *
   * Missing callbacks are defaulted: `next` and `complete` to no-ops, `error`
   * to `console.error`. After `error`, `complete` or `unsubscribe` the
   * subscription is closed: later notifications are dropped and the
   * producer's teardown runs exactly once.
   */
  subscribe(observer: Partial<Observer<T>>): Subscription {
    let closed = false;
    let teardown: (() => void) | undefined;

    const close = (): void => {
      closed = true;
      const pending = teardown;
      teardown = undefined;
      pending?.();
    };

    const guarded: Observer<T> = {
      next: (value) => {
        if (!closed) {
          observer.next?.(value);
        }
      },
      error: (error) => {
        if (closed) {
          return;
        }
        close();
        if (observer.error) {
          observer.error(error);
        } else {
          console.error(error);
        }
      },
      complete: () => {
        if (closed) {
          return;
        }
        close();
        observer.complete?.();
      },
    };

    const producerTeardown = this.subscribeHandler(guarded);
    if (closed) {
      // The producer finished synchronously, before its teardown was known.
      producerTeardown();
    } else {
      teardown = producerTeardown;
    }

    return {
      unsubscribe: () => {
        if (!closed) {
          close();
        }
      },
    };
  }

  // Operators

  /**
   * Returns an observable that emits `transform(value)` for each source value.
   * If `transform` throws, the error is delivered through `error`.
   */
  map<R>(transform: (value: T) => R): Observable<R> {
    return new Observable<R>((observer) => {
      const upstream = this.subscribe({
        next: (value) => {
          let mapped: R;
          try {
            mapped = transform(value);
          } catch (thrown) {
            observer.error?.(toError(thrown));
            return;
          }
          observer.next(mapped);
        },
        error: (error) => observer.error?.(error),
        complete: () => observer.complete?.(),
      });
      return upstream.unsubscribe;
    });
  }

  /**
   * Returns an observable that emits only the source values for which
   * `predicate` returns true. If `predicate` throws, the error is delivered
   * through `error`.
   */
  filter(predicate: (value: T) => boolean): Observable<T> {
    return new Observable<T>((observer) => {
      const upstream = this.subscribe({
        next: (value) => {
          let keep: boolean;
          try {
            keep = predicate(value);
          } catch (thrown) {
            observer.error?.(toError(thrown));
            return;
          }
          if (keep) {
            observer.next(value);
          }
        },
        error: (error) => observer.error?.(error),
        complete: () => observer.complete?.(),
      });
      return upstream.unsubscribe;
    });
  }

  /**
   * Wraps one event of a `TypedEventEmitter` as an observable. Each
   * subscription registers its own listener; the function returned by
   * `emitter.on` is used as the teardown.
   */
  static fromEvent<T extends EventMap, K extends EventKey<T>>(
    emitter: TypedEventEmitter<T>,
    eventName: K
  ): Observable<T[K]> {
    return new Observable<T[K]>((observer) => {
      const handler = (value: T[K]): void => observer.next(value);
      return emitter.on(eventName, handler);
    });
  }
}

// Package tracking as Observable

/** Exposes selected `PackageTrackingEmitter` events as observables. */
class ObservablePackageTracker {
  private emitter = new PackageTrackingEmitter();

  statusUpdates$: Observable<PackageEvents['status:updated']>;
  deliveries$: Observable<PackageEvents['delivery:completed']>;
  exceptions$: Observable<PackageEvents['exception:occurred']>;

  constructor() {
    this.statusUpdates$ = Observable.fromEvent(this.emitter, 'status:updated');
    this.deliveries$ = Observable.fromEvent(this.emitter, 'delivery:completed');
    this.exceptions$ = Observable.fromEvent(this.emitter, 'exception:occurred');
  }

  /** Forwards to the emitter; note that the new status precedes the old one. */
  updateStatus(trackingNumber: string, newStatus: string, oldStatus: string): void {
    this.emitter.updateStatus(trackingNumber, newStatus, oldStatus);
  }

  /** Forwards to the emitter's `recordDelivery`. */
  recordDelivery(trackingNumber: string, signedBy: string): void {
    this.emitter.recordDelivery(trackingNumber, signedBy);
  }
}

// Usage with reactive operators
const tracker = new ObservablePackageTracker();

// Filter and transform
const deliveredPackages$ = tracker.statusUpdates$
  .filter((event) => event.newStatus === 'delivered')
  .map((event) => event.trackingNumber);

deliveredPackages$.subscribe({
  next: (trackingNumber) => console.log(`Delivered: ${trackingNumber}`),
  error: (error) => console.error('Error:', error),
  complete: () => console.log('Stream completed'),
});

// Multiple subscriptions
tracker.exceptions$.subscribe({
  next: (event) => {
    // Log to error tracking service
    console.error(`Exception: ${event.type} - ${event.message}`);
  },
});

tracker.deliveries$.subscribe({
  next: (event) => {
    // Update analytics
    console.log(`Delivered to ${event.signedBy}`);
  },
});
Async Iterators for Observables
/**
 * Event source that can be consumed either with callbacks (`subscribe`) or
 * with `for await ... of`.
 *
 * Caveats visible in the implementation:
 * - A value emitted while no iterator is waiting is appended to an unbounded
 *   queue, whether or not anything ever iterates.
 * - All iterators share one queue and one waiter list, so each value reaches
 *   only one iterator.
 */
class AsyncObservable<T> {
  private handlers: Set<(value: T) => void> = new Set();
  private queue: T[] = [];
  private resolvers: ((value: T) => void)[] = [];

  /**
   * Delivers `value` to the oldest waiting iterator, or buffers it when none
   * is waiting, and then calls every subscribed handler.
   *
   * An exception thrown by one handler is logged with `console.error` and
   * does not prevent the remaining handlers from running.
   */
  emit(value: T): void {
    const resolve = this.resolvers.shift();
    if (resolve !== undefined) {
      resolve(value);
    } else {
      this.queue.push(value);
    }

    // Also notify synchronous handlers
    this.handlers.forEach((handler) => {
      try {
        handler(value);
      } catch (error) {
        console.error('Error in AsyncObservable handler:', error);
      }
    });
  }

  /**
   * Yields buffered values first and then waits for future emissions. The
   * sequence never ends by itself; the consumer stops it with `break`.
   */
  async *[Symbol.asyncIterator](): AsyncIterator<T> {
    while (true) {
      if (this.queue.length > 0) {
        // The length check guarantees that shift() returns an element.
        yield this.queue.shift() as T;
      } else {
        const next = await new Promise<T>((resolve) => {
          this.resolvers.push(resolve);
        });
        yield next;
      }
    }
  }

  /**
   * Registers a synchronous handler.
   *
   * @returns A function that removes the handler again.
   */
  subscribe(handler: (value: T) => void): () => void {
    this.handlers.add(handler);
    return () => {
      this.handlers.delete(handler);
    };
  }
}

// Usage with async iteration
const packageUpdates = new AsyncObservable<PackageEvents['status:updated']>();

// Consume with for-await-of
async function processUpdates(): Promise<void> {
  for await (const update of packageUpdates) {
    console.log(`Processing: ${update.trackingNumber}`);

    // Can break based on conditions
    if (update.newStatus === 'delivered') {
      break;
    }
  }
}

// Fire-and-forget: report a failure instead of leaving the promise unhandled.
void processUpdates().catch((error: unknown) => {
  console.error('Update processing failed:', error);
});

// Emit updates
packageUpdates.emit({
  trackingNumber: 'TRK123',
  oldStatus: 'pending',
  newStatus: 'in-transit',
  timestamp: new Date(),
});
Decorator for Observable Properties
/**
 * Legacy (`experimentalDecorators`) property decorator that makes a property
 * observable.
 *
 * It replaces the property with an accessor on the prototype and adds an
 * `on<Property>Change(observer)` method that returns an unsubscribe function.
 * Observers are tracked per instance, so subscribing on one object never
 * reports changes made to another. Observers run only when the assigned
 * value differs (`!==`) from the previous one.
 *
 * NOTE: the accessor lives on the prototype, so this relies on class fields
 * being initialised by assignment (`useDefineForClassFields: false`). With
 * define semantics the field initialiser would shadow the accessor.
 */
function Observable<T extends object, K extends keyof T>(
  target: T,
  propertyKey: K
): void {
  const propertyName = String(propertyKey);
  const privateKey = Symbol(`__${propertyName}__`);
  // Keyed by instance; a WeakMap does not keep instances alive.
  const observersByInstance = new WeakMap<object, Set<(value: T[K]) => void>>();

  Object.defineProperty(target, propertyKey, {
    get(): T[K] {
      return this[privateKey];
    },
    set(newValue: T[K]): void {
      const oldValue = this[privateKey];
      this[privateKey] = newValue;

      if (oldValue !== newValue) {
        observersByInstance.get(this)?.forEach((observer) => observer(newValue));
      }
    },
    enumerable: true,
    configurable: true,
  });

  // Add method to subscribe, e.g. `status` -> `onStatusChange`.
  const subscriberKey = `on${propertyName.charAt(0).toUpperCase()}${propertyName.slice(1)}Change`;

  Object.defineProperty(target, subscriberKey, {
    // Must be called as a method: `this` selects the instance being observed.
    value(this: object, observer: (value: T[K]) => void): () => void {
      let observers = observersByInstance.get(this);
      if (observers === undefined) {
        observers = new Set();
        observersByInstance.set(this, observers);
      }
      const registered = observers;
      registered.add(observer);
      return () => {
        registered.delete(observer);
      };
    },
    writable: true,
    enumerable: true,
    configurable: true,
  });
}

class Package {
  @Observable
  status: string = 'pending';

  @Observable
  location: string = '';

  // Attached to the prototype by @Observable at runtime. `declare` provides
  // the types without emitting an instance field that would shadow them.
  declare onStatusChange: (observer: (value: string) => void) => () => void;
  declare onLocationChange: (observer: (value: string) => void) => () => void;
}

// Usage
const pkg = new Package();

pkg.onStatusChange((status) => {
  console.log(`Status changed to: ${status}`);
});

pkg.status = 'in-transit'; // Automatically notifies observers
Best Practices for TypeScript
- Use typed event maps - Define all event types in a single interface
- Return unsubscribe functions - Easier cleanup than passing observer back
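- Leverage WeakRef - Lets abandoned observers be garbage collected instead of leaking, but collection timing is non-deterministic, so still detach observers explicitly when you can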
- Consider RxJS for complex cases - Don't reinvent the wheel
- Use decorators for reactive properties - Clean syntax for observable properties
- Type-safe emitters - Prevent runtime errors with compile-time checks
Summary
TypeScript provides powerful tools for implementing the Observer pattern with type safety, automatic memory management with WeakRef, and reactive programming patterns. Choose the right abstraction level for your use case - from simple EventEmitter to full reactive streams.