Task Manager를 옮기며

2020년 10월 18일

예전에 Playground에 Task를 매니징할 수 있는 Task Runner을 구현하고 해당 구현체에 대해 예제 페이지를 만들어 두었습니다. 일전에 올렸던 글에서 언급했던 것 처럼 기존의 Playground에서 새로운 Playground로 필요하거나 재밌었던 것들을 옮기는 작업을 진행 중에 있습니다. 이에 Task Runner도 다시 리펙토링을 거쳐 옮기는 작업을 진행하였는데, 이 과정에 대해 글로 남기면 좋을 것 같아 포스팅을 작성하고 있습니다.

제작 동기

우선 Task Runner, 현재는 Task Manager로 이름을 바꾼 이 Manager를 제작하게 된 동기를 먼저 말씀드리고자 합니다. 이전 회사에서 클라이언트에서 끊김없이 지도를 렌더링하기 위한 많은 고민을 하였습니다. 이때 팀원들과 렌더링 성능 및 여러 참조할 수 있는 글을 읽으며 각 Task들에 대해 매니징을 할 수 있는 Manager에 대해 고민을 했습니다.

오늘날 사용하고 있는 대부분의 기기는 초당 60회의 빈도로 화면을 새로 고침을 합니다. 이때 각 프레임에는 16ms(1초/60)가량의 시간만 할당되고 브라우저에서는 실행 준비를 해야 하므로 10ms 내에 모든 작업을 완료한다고 합니다.

자바스크립트는 브라우저의 메인 스레드에서 스타일 계산, 레이아웃 및 페인트와 함께 실행되고, 장시간 실행되면 다른 작업을 차단하여 프레임이 누락될 가능성이 있어 이를 언제 얼마나 오래 실행할지 전략을 수립해야 합니다. 애니메이션의 경우에는 이상적으로 3-4ms의 영역에서 자바스크립트를 유지해야 Jank를 방지할 수 있다고 합니다.

var taskList = breakBigTaskIntoMicroTasks(monsterTaskList);
requestAnimationFrame(processTaskList);
function processTaskList(taskStartTime) {
  var taskFinishTime;

  do {
    // Assume the next task is pushed onto a stack.
    var nextTask = taskList.pop();
    // Process nextTask.
    processTask(nextTask);

    // Go again if there’s enough time to do the next task.
    taskFinishTime = window.performance.now();
  } while (taskFinishTime - taskStartTime < 3);
  if (taskList.length > 0)
    requestAnimationFrame(processTaskList);

}

해당 글에서 상단의 코드를 찾을 수 있는데 이를 간략이 설명하자면 다음과 같습니다. 이 코드의 핵심은 requestAnimationFrame에 의해 트리거되는 processTaskList에서는 Task의 목록에서 Task들을 꺼내 작업의 수행하다 전체 수행 시간이 3ms가 넘으면 다음 프레임으로 작업을 넘기게 됩니다.

팀원분께서 앞선 코드를 바탕으로 단순한 목록에서 Task를 뽑아 오는 방식에서 우선 순위를 정렬해서 뽑아오는 방식으로 Task에 우선 순위 개념을 가지고 있는 Task Manager 구현체를 만들어 주셨습니다.

이 Task Manager의 우선 순위를 처리하는 부분에서 자바스크립트의 Nativie List 대신 여러 개의 Queue를 쓰면 어떨까 생각하며 POC를 해보기 위해 Task Runner를 작성하게 되었습니다.

상세 구현

Queue

우선 Task들을 보관하는 역할을 하는 자료 구조에 대해 말씀드리겠습니다. Nativie List 대신 직접 구현한 Queue를 사용하는 방향으로 잡았습니다. 자바스크립트 Nativie ListQueue처럼 사용이 가능하겠지만 많은 수의 Task들이 삽입되고 빠지는 작업에 shift로 빼고 정렬하는 과정이 부담스러웠기 때문입니다.

export default class Queue {
  get size() {
    return this.end - this.front;
  }

  constructor() {
    this.clear();
  }

  clear() {
    this.store = {};
    this.front = 0;
    this.end = 0;
  }

  enqueue(data) {
    this.store[this.end] = data;
    this.end++;
  }

  dequeue() {
    if (this.front === this.end) {
      return null;
    }

    const data = this.store[this.front];
    delete this.store[this.front];
    this.front++;
    return data;
  }

  peek() {
    if (this.size === 0) {
      return null;
    }

    return this.store[this.front];
  }
}

초기에 Task Runner를 구현할 때 사용한 Queue입니다. 객체를 생성해서 frontend를 증가시키는 방식으로 구현하였습니다.

POC를 작업할 때 제대로 고려하지 못한 부분이 있었는데 그것은 대량 Task의 취소하는 경우에 대한 Queue의 처리였습니다. 당시 저는 정지된 Task는 내부 로직만 수행하지 않고 반환되면 문제가 없을 것이라고 생각하고 구현했습니다.

export interface NodeInterface<T = any> {
  prev: NodeInterface<T> | null;
  next: NodeInterface<T> | null;
}

export default class Node<T> implements NodeInterface<T> {
  constructor(
    public prev: Node<T> | null = null,
    public next: Node<T> | null = null
  ) {
  }

  public destroy(): void {
    this.prev = null;
    this.next = null;
  }
}
import Node, { NodeInterface } from './Node';

export default class Queue<T> {
  public get size(): number {
    return this.count;
  }

  public get isEmpty(): boolean {
    return this.count === 0;
  }

  public get first(): NodeInterface<T> | null {
    if (this.isEmpty) {
      return null;
    }
    return this.head.next;
  }

  private head: NodeInterface<T> = new Node<T>();
  private tail: NodeInterface<T> = new Node<T>();
  private count: number = 0;

  constructor() {
    this.clear();
  }

  public clear(): void {
    this.head.next = this.tail;
    this.tail.prev = this.head;
    this.count = 0;
  }

  public enqueue(node: NodeInterface<T>): void {
    const tail = this.tail;
    const prev = tail.prev;

    prev.next = node;
    node.prev = prev;
    node.next = tail;
    tail.prev = node;
    this.count += 1;
  }

  public dequeue(): NodeInterface<T> | null {
    if (this.isEmpty) {
      return null;
    }
    const head = this.head;
    const node = head.next;
    const next = node.next;

    next.prev = head;
    head.next = next;
    node.prev = null;
    node.next = null;
    this.count -= 1;

    return node;
  }

  public remove(node: NodeInterface<T>): void {
    if (!node.prev || !node.next) {
      return;
    }

    const prev = node.prev;
    const next = node.next;

    prev.next = next;
    next.prev = prev;
    node.prev = null;
    node.next = null;
    this.count -= 1;
  }
}

하지만 이후 직접 사용해보며 엄청나게 많이 삽입한 Task들을 정지하면 아주 짧은 시간의 수행 시간이지만 많은 수의 정지된 Task들를 Queue에서 해소하기 위해 예상보다 많은 시간이 드는 것을 확인할 수 있었습니다. 이를 해결하기 위해서 Queue의 중간의 요소들을 삭제하기 위한 방법에 대해 고민을 하였고 이를 Double Linked List를 사용해서 해결하고자 하였습니다.

이렇다면 일반적인 삽입, 삭제도 Big O가 상수가 되고, 중간의 노드만 알 수 있다면 중간의 노드 삭제도 Big O가 상수가 될거라고 판단해서 입니다.

Task

수행될 Taskaction이라는 작은 작업을 수행하는 함수를 받도록 하였습니다. 이를 바탕으로 Task Runner에 의해 process가 해당 함수를 실행하게 됩니다.

export class Task extends EventEmitter {

  get uuid() {
    return this._uuid;
  }

  constructor(action, ...args) {
    super();
    this._deferred = defer();
    this._uuid = this._uuidv4();
    this._initAction(this._deferred, action, args);
    this.status = TASK_STATUS.NORMAL;
    this.promise = this._deferred.promise;
  }

  process() {
    if (this.status !== TASK_STATUS.NORMAL) {
      if (this.status === TASK_STATUS.ABORT) {
        this._abort();
      }
      return;
    }

    this.action();
  }

  _initAction(deferred, action, args) {
    this.action = () => {
      let r;
      try {
        r = action.apply(null, args);
      } catch (e) {
        this._error(e);
        return;
      }
      this._done(r);
    };
  }

  _uuidv4() {
    return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function (c) {
      const r = Math.random() * 16 | 0, v = c === 'x' ? r : (r & 0x3 | 0x8);
      return v.toString(16);
    });
  }

  _error(e) {
    this._deferred.reject(e);
    this.status = TASK_STATUS.ERROR;
    this.emit('error', e);
  }

  _done(r) {
    this._deferred.resolve(r);
    this.status = TASK_STATUS.DONE;
    this.emit('done', r);
  }

  _abort() {
    this._deferred.resolve();
    this.status = TASK_STATUS.ABORT;
    this.emit('abort');
  }
}

Task Runner를 작성 당시 구현했던 TaskEventEmitter를 상속하고 Task 참조를 통해서 Promise를 사용할 수 있도록 Defer를 사용하여 처리하였습니다.

import { EventEmitter } from 'events';
import { v4 as uuid } from 'uuid';
import {
  TaskPriority,
  TaskStatus
} from './enum';
import { NodeInterface } from '@libs/task-manager/Node';

export default class Task extends EventEmitter implements NodeInterface {
  public data: null = null;
  public prev: NodeInterface;
  public next: NodeInterface;

  public get status(): TaskStatus {
    return this._status;
  }

  public get uuid(): string {
    return this._uuid;
  }

  public get priority(): TaskPriority {
    return this._priority;
  }

  protected _action: (...args: any[]) => Promise<void>;
  private _status: TaskStatus;

  private readonly _uuid: string;
  private readonly _priority: TaskPriority;

  constructor(
    priority: TaskPriority,
    action: Function,
    ...args: any[]
  ) {
    super();
    this._uuid = uuid();
    this._status = TaskStatus.NORMAL;
    this._priority = priority;
    this.initAction(action, args);
  }

  public async process(...args: any[]): Promise<void> {
    if (this.status !== TaskStatus.NORMAL) {
      return;
    }

    await this._action.apply(this, args);
  }

  public abort(): void {
    this._status = TaskStatus.ABORT;
    this.emit('abort');
  }

  protected initAction(action: Function, args: any[]): void {
    this._action = async (): Promise<void> => {
      let r: any;
      try {
        r = await action.apply(null, args);
      } catch (e) {
        this.error(e);
        return;
      }
      this.done(r);
    };
  }

  protected error(e: any): void {
    this._status = TaskStatus.ERROR;
    this.emit('error', e);
  }

  protected done(r): void {
    this._status = TaskStatus.DONE;
    this.emit('done', r);
  }
}

이번에 리펙토링을 거치며 Promise를 사용하는 부분은 EventEmitter 만으로 충분하다고 생각하였고, 오히려 process를 수행하는 부분만 비동기로 처리되는 것이 좋겠다고 생각해서 변경하습니다.

WorkerTask도 같이 개편하였는데 기존에 사용하던 Deprecated된 threadpool.js대신 새롭게 threads.js를 사용해보고자 했지만 Next.js에서 처리해야할 것도 많고 해서 기존의 threadpool.js를 사용하게 되어 큰 차이가 없어 이 글에서는 생략하고자 합니다.

TaskManager

import { EventEmitter } from 'events';
import { TaskPriority } from '@libs/task-manager/enum';
import Queue from '@libs/task-manager/Queue';
import Task from '@libs/task-manager/Task';
import WorkerTask from '@libs/task-manager/WorkerTask';
import ThreadPool from '@libs/thread-pool/thread-pool';

export interface TaskManagerOptions {
  processLimitTime?: number;
  priorities?: TaskPriority[];
}

export interface TaskManagerStatus {
  priority: TaskPriority;
  size: number;
}

const DEFAULT_OPTIONS: TaskManagerOptions = {
  processLimitTime: 3,
  priorities: [
    TaskPriority.CRITICAL,
    TaskPriority.ESSENTIAL,
    TaskPriority.NORMAL,
    TaskPriority.IDLE,
    TaskPriority.WORKER,
  ]
};

export default class TaskManager extends EventEmitter {
  private readonly processLimitTime: number;
  private readonly priorities: TaskPriority[];
  private readonly queueMap: Map<TaskPriority, Queue<Task>> = new Map();
  private readonly threadPool: ThreadPool = new ThreadPool();
  private readonly threadMap: Map<string, WorkerTask> = new Map();

  public get status(): TaskManagerStatus[] {
    return this.priorities
      .map(priority => ({
        priority,
        size: priority === TaskPriority.WORKER ? this.threadMap.size : this.queueMap.get(priority).size,
      }));
  }

  constructor(options?: TaskManagerOptions) {
    super();

    options = {
      ...DEFAULT_OPTIONS,
      ...options,
    };

    this.processLimitTime = options.processLimitTime;
    this.priorities = options.priorities;
    this.priorities.forEach(p => this.queueMap.set(p, new Queue()));
    requestAnimationFrame(this.run.bind(this));
  }

  public terminate(priority: TaskPriority): void {
    if (!priority || !this.queueMap.has(priority)) {
      throw new Error(`This priority is not exist: ${priority}`);
    }

    this.queueMap.get(priority).clear();
    this.emit('terminate:priority', priority);
  }

  public terminateAll(): void {
    this.priorities.forEach(priority => this.queueMap.get(priority).clear());
    this.threadMap.clear();
    this.threadPool.terminateAll();
    this.emit('terminate:all');
  }

  public enroll(task: Task): Task {
    if (!(task instanceof Task)) {
      throw new Error('This task is not instance of Task');
    }

    if (task instanceof WorkerTask) {
      this.threadMap.set(task.uuid, task);
      this.processThreadTask(task);
    } else {
      if (!task.priority || !this.queueMap.has(task.priority)) {
        throw new Error(`This priority is not exist: ${task.priority}`);
      }

      this.queueMap.get(task.priority).enqueue(task);
    }

    this.emit('task:enroll', task);

    return task;
  }

  public cancel(task: Task): Task {
    if (!(task instanceof Task)) {
      throw new Error('This task is not instance of Task');
    }

    if (task instanceof WorkerTask) {
      throw new Error(`Worker not support cancel`);
    } else {
      this.queueMap.get(task.priority).remove(task);
    }

    this.emit('task:cancel', task);

    return task;
  }

  private async run(taskStartTime: number): Promise<void> {
    let taskFinishTime: number;

    do {
      const nextTask = this.getTask();

      if (!nextTask) {
        break;
      }

      await this.processTask(nextTask);

      taskFinishTime = window.performance.now();
    } while (taskFinishTime - taskStartTime < this.processLimitTime);

    requestAnimationFrame(this.run.bind(this));
  }

  private getTask(): Task {
    let task = null;

    this.priorities.some(priority => {
      task = this.queueMap.get(priority).dequeue();
      return !!task;
    });

    return task;
  }

  private async processTask(task: Task): Promise<void> {
    await task.process();
    this.emit('task:process');
  }

  private async processThreadTask(task: WorkerTask): Promise<void> {
    await task.process(this.threadPool);

    const update = () => {
      this.threadMap.delete(task.uuid);
      this.emit('worker-task:process');
    };
    task.once('done', update);
    task.once('error', update);
  }
}

TaskManager의 경우 Task Runner을 작성했던 때와 크게 구조상 달라진 것이 없습니다. 우선 순위를 처리하기 위해 우선 순위를 처리하는 ListMap, Queue가 있고, 이를 바탕으로 Task들을 enroll하거나 cancel, terminate한다는 것 입니다. 등록된 Task들은 run에 의해 주어진 processLimitTime 시간 동안 Task들을 꺼내 수행합니다.

마치며

기회가 된다면 Task Runner를 한번 정리하고 싶었습니다. 기존에 작성했던 Queue가 마음한 구석에 늘 불편함으로 남아있었기 때문입니다. 이번에 기회가 되서 이름도 바꾸고 코드도 정리할 수 있어서 즐거운 시간이 된 것 같습니다.

사실 FrontEnd의 웬만한 상황에서 이런 코드를 사용하게 되는 경우는 없다고 생각합니다. 웬만한 상황에서 이런 코드가 필요한 경구가 생긴다면 저는 필명 무엇인가 잘 못 됬다고 생각할 것 같습니다. 다만 당시의 상황에서 꼭 필요했던 도구로서 제 추억 한편에 크게 잡은 도구여서 정리하는데 즐거웠던 것 같습니다.

본 글에 대한 예제는 Task Manager에서 확인하실 수 있습니다.

Recently posts
© 2016-2023 smilecat.dev