实现观察者模式

手写实现观察者模式,支持订阅、取消订阅和通知功能

问题

观察者模式是一种行为设计模式,定义了对象之间的一对多依赖关系。当一个对象(被观察者/主题)的状态发生改变时,所有依赖于它的对象(观察者)都会得到通知并自动更新。

需要实现一个观察者模式,支持以下功能:

  • 添加观察者(订阅)
  • 移除观察者(取消订阅)
  • 通知所有观察者
  • 支持传递数据给观察者

解答

// 被观察者(主题)类
class Subject {
  constructor() {
    // 存储所有观察者
    this.observers = [];
  }

  // 添加观察者
  subscribe(observer) {
    if (typeof observer !== 'function') {
      throw new Error('观察者必须是一个函数');
    }
    // 避免重复添加
    if (!this.observers.includes(observer)) {
      this.observers.push(observer);
    }
    // 返回取消订阅的函数
    return () => this.unsubscribe(observer);
  }

  // 移除观察者
  unsubscribe(observer) {
    const index = this.observers.indexOf(observer);
    if (index > -1) {
      this.observers.splice(index, 1);
    }
  }

  // 通知所有观察者
  notify(data) {
    this.observers.forEach(observer => {
      try {
        observer(data);
      } catch (error) {
        console.error('观察者执行出错:', error);
      }
    });
  }

  // 获取观察者数量
  getObserverCount() {
    return this.observers.length;
  }

  // 清空所有观察者
  clear() {
    this.observers = [];
  }
}

// 支持命名事件的观察者模式(事件总线)
class EventEmitter {
  constructor() {
    // 使用对象存储不同事件的观察者
    this.events = {};
  }

  // 订阅事件
  on(eventName, callback) {
    if (!this.events[eventName]) {
      this.events[eventName] = [];
    }
    this.events[eventName].push(callback);
    
    // 返回取消订阅函数
    return () => this.off(eventName, callback);
  }

  // 订阅一次性事件
  once(eventName, callback) {
    const wrapper = (...args) => {
      callback(...args);
      this.off(eventName, wrapper);
    };
    this.on(eventName, wrapper);
  }

  // 取消订阅
  off(eventName, callback) {
    if (!this.events[eventName]) return;
    
    if (callback) {
      // 移除特定回调
      this.events[eventName] = this.events[eventName].filter(
        cb => cb !== callback
      );
    } else {
      // 移除该事件的所有回调
      delete this.events[eventName];
    }
  }

  // 触发事件
  emit(eventName, ...args) {
    if (!this.events[eventName]) return;
    
    this.events[eventName].forEach(callback => {
      try {
        callback(...args);
      } catch (error) {
        console.error(`事件 ${eventName} 执行出错:`, error);
      }
    });
  }

  // 清空所有事件
  clear() {
    this.events = {};
  }
}

使用示例

// ========== 示例1:基础观察者模式 ==========
console.log('=== 基础观察者模式 ===');
const subject = new Subject();

// 创建观察者
const observer1 = (data) => {
  console.log('观察者1收到通知:', data);
};

const observer2 = (data) => {
  console.log('观察者2收到通知:', data);
};

// 订阅
subject.subscribe(observer1);
subject.subscribe(observer2);

// 通知所有观察者
subject.notify('Hello Observers!');
// 输出:
// 观察者1收到通知: Hello Observers!
// 观察者2收到通知: Hello Observers!

// 取消订阅
subject.unsubscribe(observer1);
subject.notify('Second notification');
// 输出:
// 观察者2收到通知: Second notification

console.log('观察者数量:', subject.getObserverCount()); // 1


// ========== 示例2:使用返回的取消订阅函数 ==========
console.log('\n=== 使用取消订阅函数 ===');
const subject2 = new Subject();

const unsubscribe = subject2.subscribe((data) => {
  console.log('接收到:', data);
});

subject2.notify('消息1'); // 接收到: 消息1
unsubscribe(); // 取消订阅
subject2.notify('消息2'); // 无输出


// ========== 示例3:事件总线模式 ==========
console.log('\n=== 事件总线模式 ===');
const eventBus = new EventEmitter();

// 订阅用户登录事件
eventBus.on('user:login', (user) => {
  console.log('用户登录:', user.name);
});

eventBus.on('user:login', (user) => {
  console.log('记录登录日志:', user.name);
});

// 订阅用户登出事件
eventBus.on('user:logout', (user) => {
  console.log('用户登出:', user.name);
});

// 触发事件
eventBus.emit('user:login', { name: '张三', id: 1 });
// 输出:
// 用户登录: 张三
// 记录登录日志: 张三

eventBus.emit('user:logout', { name: '张三', id: 1 });
// 输出: 用户登出: 张三


// ========== 示例4:一次性事件 ==========
console.log('\n=== 一次性事件 ===');
const emitter = new EventEmitter();

emitter.once('init', () => {
  console.log('初始化完成(只执行一次)');
});

emitter.emit('init'); // 初始化完成(只执行一次)
emitter.emit('init'); // 无输出


// ========== 示例5:实际应用场景 - 状态管理 ==========
console.log('\n=== 状态管理示例 ===');
class Store {
  constructor(initialState = {}) {
    this.state = initialState;
    this.subject = new Subject();
  }

  // 获取状态
  getState() {
    return this.state;
  }

  // 更新状态
  setState(newState) {
    this.state = { ...this.state, ...newState };
    // 通知所有订阅者
    this.subject.notify(this.state);
  }

  // 订阅状态变化
  subscribe(observer) {
    return this.subject.subscribe(observer);
  }
}

const store = new Store({ count: 0, user: null });

// 组件1订阅状态
store.subscribe((state) => {
  console.log('组件1更新:', state);
});

// 组件2订阅状态
store.subscribe((state) => {
  console.log('组件2更新:', state);
});

// 更新状态
store.setState({ count: 1 });
// 输出:
// 组件1更新: { count: 1, user: null }
// 组件2更新: { count: 1, user: null }

store.setState({ user: '李四' });
// 输出:
// 组件1更新: { count: 1, user: '李四' }
// 组件2更新: { count: 1, user: '李四' }

关键点

  • 数据结构:使用数组存储观察者列表,使用对象存储命名事件的观察者映射
  • 订阅方法:添加观察者到列表中,避免重复添加,返回取消订阅函数方便使用
  • 取消订阅:通过 indexOfsplice 从数组中移除指定观察者
  • 通知机制:遍历观察者列表,依次调用每个观察者函数,传递数据参数
  • 错误处理:使用 try-catch 捕获观察者执行中的错误,避免影响其他观察者
  • 链式调用:返回取消订阅函数,支持更优雅的使用方式
  • 事件总线扩展:支持命名事件,实现更灵活的发布订阅模式
  • once 实现:通过包装函数在执行后自动取消订阅,实现一次性监听
  • 实际应用:可用于状态管理、事件系统、组件通信等场景
  • 性能优化:避免重复订阅,提供清空方法释放内存