Next.js에 Worker 설정

2020년 12월 17일
Guidance Simulator

지난 글에 이어 이번에는 Next.js에서 Worker를 설정하는 방법에 대해서 글을 작성하고자 합니다. 이전 글을 작성할 때에는 Simulator를 먼저 작성하고 Worker를 진행할 생각이었지만 개발 편의상 Worker를 먼저 진행하는 쪽이 낫다고 생각하여 순서를 조정하였습니다.

with-web-worker

우선 Next.js에서 Worker를 같이 빌드하게 하려면 어떻게할까 찾아보던 중에, Examples에서 with-web-worker라는 예제를 찾을 수 있었습니다. 예제를 살펴보면 worker-plugin라는 webpack plugin을 다음과 같이 next.config.js에 설정하는 것을 확인할 수 있었습니다.

const WorkerPlugin = require('worker-plugin')

module.exports = {
  webpack: (config, { buildId, dev, isServer, defaultLoaders, webpack }) => {
    if (!isServer) {
      config.plugins.push(
        new WorkerPlugin({
          // use "self" as the global object when receiving hot updates.
          globalObject: 'self',
        })
      )
    }
    return config
  },
}

threads.js

예전에 Task Manager를 작성하면서 Worker를 사용하는 부분을 threadpool.js을 참조해서 사용했었는데, Labs로 옮기면서 다시 확인해보니 threadpool.js는 deprecated되고 threads.js라는 좀 더 개선된 라이브러리를 찾을 수 있었습니다. 당시 threads.js를 사용해보고 싶다는 마음이 생겼지만, Task Manager에서 필요했던 것 이상의 기능으로 인해 포기하고 이번 기회에 Worker를 사용하는 부분에서 사용해보고자 했습니다.

// master.js
import { spawn, Thread, Worker } from "threads"

const auth = await spawn(new Worker("./workers/auth"))
const hashed = await auth.hashPassword("Super secret password", "1234")

console.log("Hashed password:", hashed)

await Thread.terminate(auth)
// workers/auth.js
import sha256 from "js-sha256"
import { expose } from "threads/worker"

expose({
  hashPassword(password, salt) {
    return sha256(password + salt)
  }
})

threads.js 공식 문서에서 webpack에서 사용하는 경우에는 threads-plugin을 설치하고 아래와 같이 설정하도록 가이드하고 있습니다.

const ThreadsPlugin = require('threads-plugin');

module.exports = {
  // ...
  plugins: [
    new ThreadsPlugin()
  ],
  externals: {
    "tiny-worker": "tiny-worker"
  }
  // ...
}

threads-plugin를 살펴보니 worker-plugin를 포크해서 수정한 라이브러리여서 앞선 with-web-worker 예제에서 확인한 Worker 설정을 참고해 Next.js에서 threads.js를 사용할 수 있도록 설정을 시도해 보았습니다.

const WorkerPlugin = require('worker-plugin');

module.exports = {
  webpack: (config, { isServer }) => {
    if (!isServer) {
      config.plugins.push(
        new ThreadsPlugin()
      );
      config.externals.push({"tiny-worker": "tiny-worker"});
    }
    return config;
  },
}

설정하고 동작을 확인해보니 의도한 바와 다르게 동작하는 것을 확인하였습니다. 찾아보니 Next.js support라는 이슈가 이미 올라와 있었고 이에 대한 마땅한 해결책이 제시되어 있지 않아서 threads-plugin를 설정하는 것은 보류하게 되었습니다.

worker-plugin

threads.js를 사용하지 못한다는 부분은 아쉬웠지만 당초 목적인 Worker를 사용하는 부분 자체를 포기한 것이 아니었기 때문에 with-web-worker 예제를 참고하여 worker-plugin를 설정하였습니다. worker-plugin를 설정하고 이런저런 사용을 해보았는데 내부 모듈들을 잘 포함하고, TypeScript에서도 동작함은 물론이고 paths 설정까지 아주 잘 동작하는 것을 확인하였습니다.

const withPlugins = require('next-compose-plugins');
const WorkerPlugin = require('worker-plugin');

const plugins = withPlugins(
  [
    //...
  ],
  {
    //...
    webpack: (config, {isServer}) => {
            if (!isServer) {
        config.plugins.push(
          new WorkerPlugin({
            // use "self" as the global object when receiving hot updates.
            globalObject: 'self',
          })
        );
      }

      return config;
    }
  },
);

module.exports = plugins;

이때 문득 threads.js에서 threads-plugin와 관련 있는 Worker 부분을 worker-plugin를 사용한 기본 Worker로 대치하면 나머지는 사용이 가능하지 않을까 하는 생각이 들었습니다.

// master.js
import { spawn, Thread } from "threads"

const auth = await spawn(new Worker("./workers/auth", { type: 'module' }))const hashed = await auth.hashPassword("Super secret password", "1234")

console.log("Hashed password:", hashed)

await Thread.terminate(auth)

상단과 같이 작성하고 실행해보니 의도한대로 Worker는 잘 빌드해서 가져오며, 이를 바탕으로 threads.js의 다른 기능들도 잘 동작하여 사용하는 데 무리가 없음을 확인할 수 있었습니다.

Vehicle Worker

Worker 설정이 잘 동작하는 것을 확인하고는 해당 기능을 바탕으로 차량에 대해 기본적인 부분을 작성하여 잘 동작하는지 확인해보고자 하였습니다. 차량이 수행할 작업과 차량에 대한 코드를 다음과 같이 작성하였습니다.

export default class Task extends EventEmitter {
  private start: LngLat;
  private goal: LngLat;
  private route: FeatureCollection;
  private status: TaskStatus = TaskStatus.PREPARING;

  constructor(
    private readonly id: string = uuid(),
  ) {
    super();
  }

  public async initialize(start?: LngLat, goal?: LngLat): Promise<void> {
    this.start = start ?? Task.getRandomPoint(AREA);
    this.goal = goal ?? Task.getRandomPoint(AREA);

    const routes = await CONTEXT.contexts.map.repositories.map.routes(
      this.start,
      this.goal,
    )
      .toPromise()
      .catch(_ => this.emit('status', TaskStatus.UNKNOWN));
    this.route = routes?.[0]?.toGeoJson() ?? null;
    this.setStatus(TaskStatus.READY);
  }

  public getStart(): LngLat {
    return this.start;
  }

  public getGoal(): LngLat {
    return this.goal;
  }

  public getRoute(): FeatureCollection {
    return this.route;
  }

  public serialize(): SerializedTask {
    return {
      __type: '$$Task',
      id: this.id,
      start: this.start,
      goal: this.goal,
      route: this.route,
      status: this.status,
    }
  }

  public static deserialize(message: SerializedTask) {
    const task = new Task(message.id);
    task.start = message.start;
    task.goal = message.goal;
    task.route = message.route;
    task.status = message.status;
    return task;
  }

  private setStatus(status: TaskStatus): void {
    this.status = status;
    this.emit('status', status);
  }

  private static getRandomPoint(area: Feature<Polygon>): LngLat {
    const point = randomPoint(1, { bbox: area.bbox }).features[0];

    if (!booleanPointInPolygon(point, area)) {
      return Task.getRandomPoint(area);
    }

    return { lng: point.geometry.coordinates[0], lat: point.geometry.coordinates[1] };
  }
}

export enum TaskStatus {
  UNKNOWN = 'UNKNOWN',
  PREPARING = 'PREPARING',
  READY = 'READY',
}

export interface SerializedTask {
  __type: '$$Task';
  id: string;
  start: LngLat;
  goal: LngLat;
  route: FeatureCollection;
  status: TaskStatus;
}

export const TaskSerializer: SerializerImplementation<SerializedTask, Task> = {
  deserialize(message, defaultHandler) {
    if (message && message.__type === '$$Task') {
      return Task.deserialize(message);
    } else {
      return defaultHandler(message);
    }
  },
  serialize(thing, defaultHandler) {
    if (thing instanceof Task) {
      return thing.serialize();
    } else {
      return defaultHandler(thing);
    }
  }
}

Task는 차량이 수행할 작업에 대한 정보를 가지고 있으며 출발, 도착을 바탕으로 지난 글에서 소개한 경로 탐색 API를 호출하여 경로를 초기화합니다. 장성한 코드를 살펴보면 Taskserializedeserialize를 구현하고 TaskSerializer를 작성함으로써 Worker 간 통신에서 serializable 하도록 추가하였음을 볼 수 있습니다.

export default class Vehicle extends EventEmitter {
  private task: Task = null
  private position: LngLat = null;
  private route: FeatureCollection = null;
  private velocity: number = VELOCITY;
  private acceleration: number = ACCELERATION;
  private status: VehicleStatus = VehicleStatus.PREPARING;

  constructor(
    private readonly id: string = uuid(),
  ) {
    super();
  }

  public initialize(): void {
    this.task = new Task();
    void this.task.initialize(this.position);
    this.task.on('status', status => {
      if (status === TaskStatus.PREPARING) {
        return;
      }

      if (status === TaskStatus.UNKNOWN) {
        this.setStatus(VehicleStatus.UNKNOWN);
        return;
      }

      this.ready();
    });
  }

  public getPosition(): LngLat {
    return this.position;
  }

  public getRoute(): FeatureCollection {
    return this.route;
  }

  public getStart(): LngLat {
    return this.task.getStart();
  }

  public getGoal(): LngLat {
    return this.task.getGoal();
  }

  public getTask(): Task {
    return this.task;
  }

  public setAcceleration(acceleration: number): void {
    this.acceleration = acceleration;
  }

  public serialize(): SerializedVehicle {
    return {
      __type: '$$Vehicle',
      id: this.id,
      position: this.position,
      route: this.route,
      velocity: this.velocity,
      acceleration: this.acceleration,
      status: this.status,
      task: this.task?.serialize(),
    }
  }

  public static deserialize(message: SerializedVehicle): Vehicle {
    const vehicle = new Vehicle(message.id);
    vehicle.position = message.position;
    vehicle.route = message.route;
    vehicle.velocity = message.velocity;
    vehicle.acceleration = message.acceleration;
    vehicle.status = message.status;
    vehicle.task = message.task ? Task.deserialize(message.task) : null;
    return vehicle;
  }

  private ready(): void {
    this.position = this.task.getStart();
    this.setStatus(VehicleStatus.PREPARING);
  }

  private setStatus(status: VehicleStatus): void {
    this.status = status;
    this.emit('status', status);
  }
}

export enum VehicleStatus {
  UNKNOWN = 'UNKNOWN',
  PREPARING = 'PREPARING',
  WORKING = 'WORKING',
  DONE = 'DONE',
}

export interface SerializedVehicle {
  __type: '$$Vehicle';
  id: string;
  position: LngLat;
  route: FeatureCollection;
  velocity: number;
  acceleration: number;
  status: VehicleStatus;
  task: SerializedTask;
}

export const VehicleSerializer: SerializerImplementation<SerializedVehicle, Vehicle> = {
  deserialize(message, defaultHandler) {
    if (message && message.__type === '$$Vehicle') {
      return Vehicle.deserialize(message);
    } else {
      return defaultHandler(message);
    }
  },
  serialize(thing, defaultHandler) {
    if (thing instanceof Vehicle) {
      return thing.serialize();
    } else {
      return defaultHandler(thing);
    }
  }
}

VehicleTask를 생성 및 초기화하도록 추가하고 Task와 마찬가지로 serializable 할 수 있도록 반영하였습니다. 추후 이 클래스를 바탕으로 차량의 행위를 구현할 생각입니다.

registerSerializer(TaskSerializer as any);
registerSerializer(VehicleSerializer as any);

const vehicle = new Vehicle();
const positionSubject = new Subject<LngLat>();
const statusSubject = new Subject<Vehicle>();

const controller = {
  initialize(): void {
    vehicle.initialize();
    vehicle.on('status', _ => {
      positionSubject.next(vehicle.getPosition());
      statusSubject.next(vehicle);
    });
  },
  positions() {
    return Observable.from(positionSubject);
  },
  statuses() {
    return Observable.from(statusSubject);
  },
  kill(): void {
    positionSubject.complete();
    statusSubject.complete();
  },
};

expose(controller);

Worker는 Serializer를 등록하고 상단의 코드와 같이 필요한 행위들을 Main에 노출할 수 있도록 작업하였습니다. 하나의 작업이 끝날 때 마다 새롭게 생성되고 제거되는 방향이 아닌 계산 다음 작업을 자동으로 시작하도록 생각하고 있습니다. 그래서 대부분의 노출되는 행위는 Observable로 구성하고 Main에서는 해당 이벤트를 받아서 처리하도록 할 생각입니다.

const worker = await spawn(new Worker('../modules/worker.ts', { type: 'module' }));
worker.initialize();
worker.positions().subscribe(position => console.log(position));
worker.statuses().subscribe(vehicle => console.log(vehicle));
await Thread.terminate(worker);

Main에서는 다음과 같이 worker를 생성하고 이벤트를 구독하고 있다가 구독된 이벤트에 맞춰서 동작을 할 수 있도록 구성해 두었습니다. 추후 이 부분을 Simulator로 빼고 View에서 Simulator를 구독하여 동작하도록 구현할 생각입니다.

마치며

이번 작업을 수행하면서 Next.js에서 Worker를 설정하는 방법과 threads.js를 사용하는 부분에서 큰 재미를 느낄 수 있었습니다. 예전에 rollup을 가지고 Worker를 설정을 무척 어렵게 했던 반면에 이번 작업에서 이 부분이 너무 쉽게 해결되었고 Worker를 사용하면서 이런저런 통신 구조를 잡느라 불편했던 부분을 threads.js을 통해 손쉽게 해결할 수 있어서 좋았습니다. 이후 이번 글에서 설정한 Worker를 사용하여 Vehicle의 전반적인 동작을 구현하여 찾아뵙도록 하겠습니다.

P.S.

이번 Worker 설정을 하면서 다음 오류로 인해 잠시 헤맸습니다. 타입스크립트에서 self.postMessage(result)를 사용하면 Expected 2-3 arguments, but got 1.라는 경고를 보게 되어 self.postMessage(result, '*')와 같이 사용하고 실행했더니 아래와 같은 오류를 보게 되었습니다.

Uncaught TypeError: Failed to execute 'postMessage' on 'DedicatedWorkerGlobalScope': Overload resolution failed.
    at runParser

이는 Window.postMessage의 두 번째 인자에 targetOrigin을 설정하는 것과 달리 Worker.postMessage의 두 번째 인자에 바로 transfer를 전달해주게 되어 오류가 발생합니다. 이를 해결하기 위해서는 아래와 같이 tsconfig.jsonlib"webworker"를 추가해 주시면 해결할 수 있습니다.

{
  "compilerOptions": {
    "lib": [
      "esnext",
      "webworker"    ]
  }
}
Recently posts
© 2016-2023 smilecat.dev