예전에 Playground에 Task를 매니징할 수 있는 Task Runner을 구현하고 해당 구현체에 대해 예제 페이지를 만들어 두었습니다. 일전에 올렸던 글에서 언급했던 것 처럼 기존의 Playground에서 새로운 Playground로 필요하거나 재밌었던 것들을 옮기는 작업을 진행 중에 있습니다. 이에 Task Runner도 다시 리펙토링을 거쳐 옮기는 작업을 진행하였는데, 이 과정에 대해 글로 남기면 좋을 것 같아 포스팅을 작성하고 있습니다.
우선 Task Runner, 현재는 Task Manager로 이름을 바꾼 이 Manager를 제작하게 된 동기를 먼저 말씀드리고자 합니다. 이전 회사에서 클라이언트에서 끊김없이 지도를 렌더링하기 위한 많은 고민을 하였습니다. 이때 팀원들과 렌더링 성능 및 여러 참조할 수 있는 글을 읽으며 각 Task들에 대해 매니징을 할 수 있는 Manager에 대해 고민을 했습니다.
오늘날 사용하고 있는 대부분의 기기는 초당 60회의 빈도로 화면을 새로 고침을 합니다. 이때 각 프레임에는 16ms(1초/60)가량의 시간만 할당되고 브라우저에서는 실행 준비를 해야 하므로 10ms 내에 모든 작업을 완료한다고 합니다.
자바스크립트는 브라우저의 메인 스레드에서 스타일 계산, 레이아웃 및 페인트와 함께 실행되고, 장시간 실행되면 다른 작업을 차단하여 프레임이 누락될 가능성이 있어 이를 언제 얼마나 오래 실행할지 전략을 수립해야 합니다. 애니메이션의 경우에는 이상적으로 3-4ms의 영역에서 자바스크립트를 유지해야 Jank를 방지할 수 있다고 합니다.
var taskList = breakBigTaskIntoMicroTasks(monsterTaskList);
requestAnimationFrame(processTaskList);
function processTaskList(taskStartTime) {
var taskFinishTime;
do {
// Assume the next task is pushed onto a stack.
var nextTask = taskList.pop();
// Process nextTask.
processTask(nextTask);
// Go again if there’s enough time to do the next task.
taskFinishTime = window.performance.now();
} while (taskFinishTime - taskStartTime < 3);
if (taskList.length > 0)
requestAnimationFrame(processTaskList);
}
해당 글에서 상단의 코드를 찾을 수 있는데 이를 간략이 설명하자면 다음과 같습니다. 이 코드의 핵심은 requestAnimationFrame
에 의해 트리거되는 processTaskList
에서는 Task의 목록에서 Task들을 꺼내 작업의 수행하다 전체 수행 시간이 3ms가 넘으면 다음 프레임으로 작업을 넘기게 됩니다.
팀원분께서 앞선 코드를 바탕으로 단순한 목록에서 Task를 뽑아 오는 방식에서 우선 순위를 정렬해서 뽑아오는 방식으로 Task에 우선 순위 개념을 가지고 있는 Task Manager 구현체를 만들어 주셨습니다.
이 Task Manager의 우선 순위를 처리하는 부분에서 자바스크립트의 Nativie List 대신 여러 개의 Queue를 쓰면 어떨까 생각하며 POC를 해보기 위해 Task Runner를 작성하게 되었습니다.
우선 Task
들을 보관하는 역할을 하는 자료 구조에 대해 말씀드리겠습니다. Nativie List
대신 직접 구현한 Queue
를 사용하는 방향으로 잡았습니다. 자바스크립트 Nativie List
도 Queue
처럼 사용이 가능하겠지만 많은 수의 Task
들이 삽입되고 빠지는 작업에 shift
로 빼고 정렬하는 과정이 부담스러웠기 때문입니다.
export default class Queue {
get size() {
return this.end - this.front;
}
constructor() {
this.clear();
}
clear() {
this.store = {};
this.front = 0;
this.end = 0;
}
enqueue(data) {
this.store[this.end] = data;
this.end++;
}
dequeue() {
if (this.front === this.end) {
return null;
}
const data = this.store[this.front];
delete this.store[this.front];
this.front++;
return data;
}
peek() {
if (this.size === 0) {
return null;
}
return this.store[this.front];
}
}
초기에 Task Runner를 구현할 때 사용한 Queue
입니다. 객체를 생성해서 front
와 end
를 증가시키는 방식으로 구현하였습니다.
POC를 작업할 때 제대로 고려하지 못한 부분이 있었는데 그것은 대량 Task
의 취소하는 경우에 대한 Queue
의 처리였습니다. 당시 저는 정지된 Task
는 내부 로직만 수행하지 않고 반환되면 문제가 없을 것이라고 생각하고 구현했습니다.
export interface NodeInterface<T = any> {
prev: NodeInterface<T> | null;
next: NodeInterface<T> | null;
}
export default class Node<T> implements NodeInterface<T> {
constructor(
public prev: Node<T> | null = null,
public next: Node<T> | null = null
) {
}
public destroy(): void {
this.prev = null;
this.next = null;
}
}
import Node, { NodeInterface } from './Node';
export default class Queue<T> {
public get size(): number {
return this.count;
}
public get isEmpty(): boolean {
return this.count === 0;
}
public get first(): NodeInterface<T> | null {
if (this.isEmpty) {
return null;
}
return this.head.next;
}
private head: NodeInterface<T> = new Node<T>();
private tail: NodeInterface<T> = new Node<T>();
private count: number = 0;
constructor() {
this.clear();
}
public clear(): void {
this.head.next = this.tail;
this.tail.prev = this.head;
this.count = 0;
}
public enqueue(node: NodeInterface<T>): void {
const tail = this.tail;
const prev = tail.prev;
prev.next = node;
node.prev = prev;
node.next = tail;
tail.prev = node;
this.count += 1;
}
public dequeue(): NodeInterface<T> | null {
if (this.isEmpty) {
return null;
}
const head = this.head;
const node = head.next;
const next = node.next;
next.prev = head;
head.next = next;
node.prev = null;
node.next = null;
this.count -= 1;
return node;
}
public remove(node: NodeInterface<T>): void {
if (!node.prev || !node.next) {
return;
}
const prev = node.prev;
const next = node.next;
prev.next = next;
next.prev = prev;
node.prev = null;
node.next = null;
this.count -= 1;
}
}
하지만 이후 직접 사용해보며 엄청나게 많이 삽입한 Task
들을 정지하면 아주 짧은 시간의 수행 시간이지만 많은 수의 정지된 Task
들를 Queue
에서 해소하기 위해 예상보다 많은 시간이 드는 것을 확인할 수 있었습니다. 이를 해결하기 위해서 Queue
의 중간의 요소들을 삭제하기 위한 방법에 대해 고민을 하였고 이를 Double Linked List
를 사용해서 해결하고자 하였습니다.
이렇다면 일반적인 삽입, 삭제도 Big O가 상수가 되고, 중간의 노드만 알 수 있다면 중간의 노드 삭제도 Big O가 상수가 될거라고 판단해서 입니다.
수행될 Task
는 action
이라는 작은 작업을 수행하는 함수를 받도록 하였습니다. 이를 바탕으로 Task Runner에 의해 process
가 해당 함수를 실행하게 됩니다.
export class Task extends EventEmitter {
get uuid() {
return this._uuid;
}
constructor(action, ...args) {
super();
this._deferred = defer();
this._uuid = this._uuidv4();
this._initAction(this._deferred, action, args);
this.status = TASK_STATUS.NORMAL;
this.promise = this._deferred.promise;
}
process() {
if (this.status !== TASK_STATUS.NORMAL) {
if (this.status === TASK_STATUS.ABORT) {
this._abort();
}
return;
}
this.action();
}
_initAction(deferred, action, args) {
this.action = () => {
let r;
try {
r = action.apply(null, args);
} catch (e) {
this._error(e);
return;
}
this._done(r);
};
}
_uuidv4() {
return 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function (c) {
const r = Math.random() * 16 | 0, v = c === 'x' ? r : (r & 0x3 | 0x8);
return v.toString(16);
});
}
_error(e) {
this._deferred.reject(e);
this.status = TASK_STATUS.ERROR;
this.emit('error', e);
}
_done(r) {
this._deferred.resolve(r);
this.status = TASK_STATUS.DONE;
this.emit('done', r);
}
_abort() {
this._deferred.resolve();
this.status = TASK_STATUS.ABORT;
this.emit('abort');
}
}
Task Runner를 작성 당시 구현했던 Task
는 EventEmitter
를 상속하고 Task
참조를 통해서 Promise
를 사용할 수 있도록 Defer
를 사용하여 처리하였습니다.
import { EventEmitter } from 'events';
import { v4 as uuid } from 'uuid';
import {
TaskPriority,
TaskStatus
} from './enum';
import { NodeInterface } from '@libs/task-manager/Node';
export default class Task extends EventEmitter implements NodeInterface {
public data: null = null;
public prev: NodeInterface;
public next: NodeInterface;
public get status(): TaskStatus {
return this._status;
}
public get uuid(): string {
return this._uuid;
}
public get priority(): TaskPriority {
return this._priority;
}
protected _action: (...args: any[]) => Promise<void>;
private _status: TaskStatus;
private readonly _uuid: string;
private readonly _priority: TaskPriority;
constructor(
priority: TaskPriority,
action: Function,
...args: any[]
) {
super();
this._uuid = uuid();
this._status = TaskStatus.NORMAL;
this._priority = priority;
this.initAction(action, args);
}
public async process(...args: any[]): Promise<void> {
if (this.status !== TaskStatus.NORMAL) {
return;
}
await this._action.apply(this, args);
}
public abort(): void {
this._status = TaskStatus.ABORT;
this.emit('abort');
}
protected initAction(action: Function, args: any[]): void {
this._action = async (): Promise<void> => {
let r: any;
try {
r = await action.apply(null, args);
} catch (e) {
this.error(e);
return;
}
this.done(r);
};
}
protected error(e: any): void {
this._status = TaskStatus.ERROR;
this.emit('error', e);
}
protected done(r): void {
this._status = TaskStatus.DONE;
this.emit('done', r);
}
}
이번에 리펙토링을 거치며 Promise
를 사용하는 부분은 EventEmitter
만으로 충분하다고 생각하였고, 오히려 process
를 수행하는 부분만 비동기로 처리되는 것이 좋겠다고 생각해서 변경하습니다.
WorkerTask
도 같이 개편하였는데 기존에 사용하던 Deprecated된 threadpool.js대신 새롭게 threads.js를 사용해보고자 했지만 Next.js에서 처리해야할 것도 많고 해서 기존의 threadpool.js를 사용하게 되어 큰 차이가 없어 이 글에서는 생략하고자 합니다.
import { EventEmitter } from 'events';
import { TaskPriority } from '@libs/task-manager/enum';
import Queue from '@libs/task-manager/Queue';
import Task from '@libs/task-manager/Task';
import WorkerTask from '@libs/task-manager/WorkerTask';
import ThreadPool from '@libs/thread-pool/thread-pool';
export interface TaskManagerOptions {
processLimitTime?: number;
priorities?: TaskPriority[];
}
export interface TaskManagerStatus {
priority: TaskPriority;
size: number;
}
const DEFAULT_OPTIONS: TaskManagerOptions = {
processLimitTime: 3,
priorities: [
TaskPriority.CRITICAL,
TaskPriority.ESSENTIAL,
TaskPriority.NORMAL,
TaskPriority.IDLE,
TaskPriority.WORKER,
]
};
export default class TaskManager extends EventEmitter {
private readonly processLimitTime: number;
private readonly priorities: TaskPriority[];
private readonly queueMap: Map<TaskPriority, Queue<Task>> = new Map();
private readonly threadPool: ThreadPool = new ThreadPool();
private readonly threadMap: Map<string, WorkerTask> = new Map();
public get status(): TaskManagerStatus[] {
return this.priorities
.map(priority => ({
priority,
size: priority === TaskPriority.WORKER ? this.threadMap.size : this.queueMap.get(priority).size,
}));
}
constructor(options?: TaskManagerOptions) {
super();
options = {
...DEFAULT_OPTIONS,
...options,
};
this.processLimitTime = options.processLimitTime;
this.priorities = options.priorities;
this.priorities.forEach(p => this.queueMap.set(p, new Queue()));
requestAnimationFrame(this.run.bind(this));
}
public terminate(priority: TaskPriority): void {
if (!priority || !this.queueMap.has(priority)) {
throw new Error(`This priority is not exist: ${priority}`);
}
this.queueMap.get(priority).clear();
this.emit('terminate:priority', priority);
}
public terminateAll(): void {
this.priorities.forEach(priority => this.queueMap.get(priority).clear());
this.threadMap.clear();
this.threadPool.terminateAll();
this.emit('terminate:all');
}
public enroll(task: Task): Task {
if (!(task instanceof Task)) {
throw new Error('This task is not instance of Task');
}
if (task instanceof WorkerTask) {
this.threadMap.set(task.uuid, task);
this.processThreadTask(task);
} else {
if (!task.priority || !this.queueMap.has(task.priority)) {
throw new Error(`This priority is not exist: ${task.priority}`);
}
this.queueMap.get(task.priority).enqueue(task);
}
this.emit('task:enroll', task);
return task;
}
public cancel(task: Task): Task {
if (!(task instanceof Task)) {
throw new Error('This task is not instance of Task');
}
if (task instanceof WorkerTask) {
throw new Error(`Worker not support cancel`);
} else {
this.queueMap.get(task.priority).remove(task);
}
this.emit('task:cancel', task);
return task;
}
private async run(taskStartTime: number): Promise<void> {
let taskFinishTime: number;
do {
const nextTask = this.getTask();
if (!nextTask) {
break;
}
await this.processTask(nextTask);
taskFinishTime = window.performance.now();
} while (taskFinishTime - taskStartTime < this.processLimitTime);
requestAnimationFrame(this.run.bind(this));
}
private getTask(): Task {
let task = null;
this.priorities.some(priority => {
task = this.queueMap.get(priority).dequeue();
return !!task;
});
return task;
}
private async processTask(task: Task): Promise<void> {
await task.process();
this.emit('task:process');
}
private async processThreadTask(task: WorkerTask): Promise<void> {
await task.process(this.threadPool);
const update = () => {
this.threadMap.delete(task.uuid);
this.emit('worker-task:process');
};
task.once('done', update);
task.once('error', update);
}
}
TaskManager
의 경우 Task Runner을 작성했던 때와 크게 구조상 달라진 것이 없습니다. 우선 순위를 처리하기 위해 우선 순위를 처리하는 List
와 Map
, Queue
가 있고, 이를 바탕으로 Task
들을 enroll
하거나 cancel
, terminate
한다는 것 입니다. 등록된 Task
들은 run
에 의해 주어진 processLimitTime
시간 동안 Task
들을 꺼내 수행합니다.
기회가 된다면 Task Runner를 한번 정리하고 싶었습니다. 기존에 작성했던 Queue가 마음한 구석에 늘 불편함으로 남아있었기 때문입니다. 이번에 기회가 되서 이름도 바꾸고 코드도 정리할 수 있어서 즐거운 시간이 된 것 같습니다.
사실 FrontEnd의 웬만한 상황에서 이런 코드를 사용하게 되는 경우는 없다고 생각합니다. 웬만한 상황에서 이런 코드가 필요한 경구가 생긴다면 저는 필명 무엇인가 잘 못 됬다고 생각할 것 같습니다. 다만 당시의 상황에서 꼭 필요했던 도구로서 제 추억 한편에 크게 잡은 도구여서 정리하는데 즐거웠던 것 같습니다.
본 글에 대한 예제는 Task Manager에서 확인하실 수 있습니다.