[Dart] 메시지를 통해 동작하는 Isolate를 추상화하기

2024. 7. 31. 11:32Programming/Flutter

반응형

Isolate에 대해 알아보자에서 Isolate를 사용할 시 유의해야 할 점과, 포트를 사용해서 Isolate 사이에 여러개의 메시지를 주고받기에 기재되어있는 예제를 살펴봤습니다. 유의할 점 중에는 Isolate 갯수가 CPU 코어 갯수±2개 정도를 유지하는게 좋다는 내용이 있었는데요. 이번에는 포트를 사용해서 Isolate 사이에 여러개의 메시지를 주고받기에서 살펴본 내용을 기반으로, 하나의 Isolate를 사용해서 여러개의 작업을 할 수 있도록 수정해보겠습니다.

먼저 예제를 살펴봅시다. 포트를 사용해서 Isolate 사이에 여러개의 메시지를 주고받기에 기재되어있는 예제는 아래와 같습니다.

import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
  final worker = await Worker.spawn();
  print(await worker.parseJson('{"key":"value"}'));
  print(await worker.parseJson('"banana"'));
  print(await worker.parseJson('[true, false, null, 1, "string"]'));
  print(
    await Future.wait(
      [
        worker.parseJson('"yes"'),
        worker.parseJson('"no"'),
      ],
    ),
  );
  worker.close();
}

class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;
  bool _closed = false;

  Future<Object?> parseJson(String message) async {
    if (_closed) throw StateError('Closed');
    final Completer<Object?> completer = Completer<Object?>.sync();
    final int id = _idCounter++;
    _activeRequests[id] = completer;
    _commands.send((id, message));
    return await completer.future;
  }

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler
    final RawReceivePort initPort = RawReceivePort();
    final Completer<(ReceivePort, SendPort)> connection =
        Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      SendPort commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };

    // Spawn the isolate.
    try {
      await Isolate.spawn<SendPort>(_startRemoteIsolate, (initPort.sendPort));
    } on Object {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) =
        await connection.future;

    return Worker._(receivePort, sendPort);
  }

  Worker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }

  void _handleResponsesFromIsolate(dynamic message) {
    final (int id, Object? response) = message as (int, Object?);
    Completer<Object?> completer = _activeRequests.remove(id)!;

    if (response is RemoteError) {
      completer.completeError(response);
    } else {
      completer.complete(response);
    }

    if (_closed && _activeRequests.isEmpty) _responses.close();
  }

  static void _handleCommandsToIsolate(
    ReceivePort receivePort,
    SendPort sendPort,
  ) {
    receivePort.listen((dynamic message) {
      if (message == 'shutdown') {
        receivePort.close();
        return;
      }
      final (int id, String jsonText) = message as (int, String);
      try {
        final jsonData = jsonDecode(jsonText);
        sendPort.send((id, jsonData));
      } catch (e) {
        sendPort.send((id, RemoteError(e.toString(), '')));
      }
    });
  }

  static void _startRemoteIsolate(SendPort sendPort) {
    final ReceivePort receivePort = ReceivePort();
    sendPort.send(receivePort.sendPort);
    _handleCommandsToIsolate(receivePort, sendPort);
  }

  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) _responses.close();
      print('--- port closed --- ');
    }
  }
}

1. IsolateMessage로 주고받는 메시지를 규격화

위의 코드에서 먼저 살펴볼 것은 Worker Isolate에서 실행되는 _handleCommandsToIsolate()입니다. 이 함수에서는 아래와같이 receivePort.listen()을 실행해서 핸들러를 등록하며, Main Isolate에서 요청한 메시지를 처리해서 응답하는 구조로 되어있습니다.

static void _handleCommandsToIsolate(
  ReceivePort receivePort,
  SendPort sendPort,
) {
  receivePort.listen((dynamic message) {
    if (message == 'shutdown') {
      receivePort.close();
      return;
    }
    final (int id, String jsonText) = message as (int, String);
    try {
      final jsonData = jsonDecode(jsonText);
      sendPort.send((id, jsonData));
    } catch (e) {
      sendPort.send((id, RemoteError(e.toString(), '')));
    }
  });
}

우선적으로 message는 dynamic 타입이므로 타입에 대한 불확실성이 있습니다. 형변환시에는 에러가 발생하지만, 메시지에 내용이 복잡해지면 복잡해질수록 어디서 문제가 발생하는지 파악하기 어려울 수 있습니다. 이러한 단점을 줄이고자, 다음과 같이 Isolate간 통신에 사용할 메시지 데이터 클래스를 작성해봅시다.

class IsolateMessage {
  final int id;
  final String command;
  final dynamic message;

  IsolateMessage(
    this.command,
    this.message,
    this.id,
  );
}

이 데이터 클래스는 예제를 참조해서, Worker Isolate에서 실행할 작업을 구분하기 위한 문자열 command, 인자값을 넘기기 위한 message, 그리고 요청을 응답 포트에서 수행할 Completer를 구분하기 위한 id값으로 구성되어있습니다. receivePort.listen()에 등록한 클로저의 인자값이 dynamic인 것은 ReceivePortStream<dynamic>을 구현한 클래스이므로 변경할 수 없지만, Main Isolate에서 호출하는 parseJson()함수에서 전송하는 메시지와 _handleCommandsToIsolate()함수에서 전달받는 메시지를 IsolateMessage 통일하면, dynamic 타입으로 주고받는 메시지가 항상 IsolateMessage 타입이라고 간주할 수 있으므로 안심하고 형변환이 가능합니다. (Stream<T>를 상속받는 ReceivePort를 새로 구현하는게 나을까했지만, 너무 과한 것 같았습니다. 별도로 패키지를 만들 것도 아니고.)

Main Isolate와 Worker Isolate가 IsolateMessage를 주고받도록, 아래와 같이 parseJson(), close(), _handleCommandsToIsolate() 세 개의 함수를 수정했습니다.

Future<Object?> parseJson(String command, {dynamic params}) async {
  if (_closed) throw StateError('Closed');
  final Completer<Object?> completer = Completer<Object?>.sync();
  final int id = _idCounter++;

  _activeRequests[id] = completer;
  _commands.send(IsolateMessage('parseJson', params, id));

  return await completer.future;
}

void close() {
  if (!_closed) {
    _closed = true;
    _commands.send(IsolateMessage('shutdown', null, -1));
    if (_activeRequests.isEmpty) _responses.close();
    print('--- port closed --- ');
  }
}

static void _handleCommandsToIsolate(
  ReceivePort receivePort,
  SendPort sendPort,
) {
  receivePort.listen((dynamic message) {
    IsolateMessage taskMessage = message as IsolateMessage;

    if (taskMessage.command == 'shutdown') {
      receivePort.close();
      return;
    } else if (taskMessage.command == 'parseJson') {
      try {
        String jsonText = taskMessage.message as String;
        final jsonData = jsonDecode(jsonText);
        sendPort.send((id, jsonData));
      } catch (e) {
        sendPort.send((id, RemoteError(e.toString(), 'failed to run ${taskMessage.command} with ${taskMessage.message}')));
      }
    } else {
      sendPort.send((id, RemoteError(e.toString(), 'Wrong command! ${taskMessage.command} is not support!)));
    }
  });
}

이것으로 Main Isolate에서 Worker Isolate로 전달하는 메시지를 IsolateMessage 객체로 변경하여, Main Isolate가 Worker Isolate에서 수행하려고 하는 작업에 따라서 message를 다른 타입으로 형변환 할 일이 없어졌습니다. 또한 Worker Isolate에서 어떤 작업을 하는 중 에러가 발생했는지, 조금은 알기 쉬워졌습니다.

IsolateMessage.command을 기준으로 Worker Isolate에서 실행할 동작을 결정하므로, 하나의 Worker Isolate에서 조금 더 다양한 동작을 할 수 있게 됐습니다. 그렇지만 매번 Isolate를 사용할 때마다 Worker Isolate에 동작할 코드들을 추가하자면 코드가 불필요하게 길어지므로, Worker.spawn() 함수를 실행할 시 전달할 명령어와 실행할 핸들러를 정의할 수 있도록 코드를 수정해봅시다.

2. Worker Isolate에서 실행할 핸들러와 타입 가드를 위한 Task 클래스

기본적으로 위에서 작성한 코드는 Main Isolate에서 Worker Isolate로 dynamic message를 전달하고, 이 값을 각각 (int, String) Record 타입으로 형변환한 뒤, 문자열을 처리한 다음 int값을 키값으로 저장해놓은 Completer를 사용해서 응답을 처리하는 것이었습니다. 즉 dynamic Function(dynamic) 형태의 핸들러를 전달하면 IsolateMessage.message로 전달하는 인자값을 받아서 Record 타입으로 형변환한 뒤, Worker Isolate에서 처리한 결과값을 Main Isolate로 전달할 수 있습니다. 이 과정에서 dynamic message가 어떤 타입인지 미리 작성해둠으로써, 타입 가드를 구현하는 것도 가능합니다.

명령어와 핸들러, 그리고 핸들러에서 사용할 타입을 매핑하기 위해 아래와 같은 Task 데이터 클래스를 작성하겠습니다.

class Task {
  final dynamic Function(dynamic) handler;
  final Type requestType;
  final Type responseType;

  Task({
    required this.handler,
    required this.requestType,
    required this.responseType,
  });
}

여기서는 Worker Isolate에서 실행할 함수 dynamic Function(dynamic) handler와, requestTyperesponseType을 선언합니다. 이를 기반으로 Main Isolate가 Worker Isolate에 IsolateMessage를 전달했을 시, IsolateMessage.message가 올바른 타입으로 전달됐는지 확인할 수 있습니다.

3. Map<String, Task>Worker.spawn()에 전달하기

이제 위에서 선언한 Task 데이터 클래스를 통해서, Main Isolate가 Worker Isolate에서 실행할 동작들을 추상화해봅시다. 우선 Worker.spawn()에 인자값으로 Map<String, Task>을 전달해서, Worker 클래스를 생성할 시 실행할 동작들을 지정하도록 변경하도록 하겠습니다.

우선 spawn() 메서드에 인자값을 추가하고, Isolate.spawn()을 실행할 때 Map<String, Task>를 인자값으로 전달하도록 수정합니다.

static Future<Worker> spawn(Map<String, Task> tasks) async {
  final RawReceivePort initPort = RawReceivePort();
  final Completer<(ReceivePort, SendPort)> connection =
      Completer<(ReceivePort, SendPort)>.sync();
  initPort.handler = (initialMessage) {
    SendPort commandPort = initialMessage as SendPort;
    connection.complete((
      ReceivePort.fromRawReceivePort(initPort),
      commandPort,
    ));
  };

  try {
    await Isolate.spawn<(SendPort, Map<String, Task>)>(
      _startRemoteIsolate,
      (initPort.sendPort, tasks),
    );
  } on Object {
    initPort.close();
    rethrow;
  }

  final (ReceivePort receivePort, SendPort sendPort) =
      await connection.future;

  return Worker._(receivePort, sendPort);
}

여기서는 Isolate.spawn을 실행할 때 (SendPort, Map<String, Task>) Record 타입을 인자로 전달하여, _startRemoteIsolate 함수가 Worker Isolate에서 실행되도록 합니다.

static void _startRemoteIsolate(
      (SendPort, Map<String, Task>) args) {
  final (SendPort sendPort, Map<String, Task> tasks) = args;
  final ReceivePort receivePort = ReceivePort();
  sendPort.send(receivePort.sendPort);
  _handleTaskToIsolate(receivePort, sendPort, tasks);
}

그 뒤 _startRemoteIsolate()에서는 인자로 받은 Record 타입 args를 받아서, Map<String, Task> tasks_handleTaskToIsolate()로 전달합니다. _handleTaskToIsolate()함수는 Worker Isolate에서 실행되며, ReceivePort에 메시지가 수신됐을 때 인자로 전달받은 Map<String, Task> tasks에서 IsolateMessage.command에 해당하는 Task.handler를 찾아서 실행해야 합니다. 또한 Task에 정의되어있는 RequestType을 사용해서 타입 가드가 가능합니다. 아래는 이러한 내용을 반영한 _handleTaskToIsolate()입니다.

static void _handleTaskToIsolate(
  ReceivePort receivePort,
  SendPort sendPort,
  Map<String, Task> tasks,
) {
  receivePort.listen((dynamic message) {
    IsolateMessage taskMessage = message as IsolateMessage;

    if (taskMessage.command == 'shutdown') {
      receivePort.close();
    } else if (tasks.containsKey(taskMessage.command)) {
      if (tasks[taskMessage.command]?.requestType ==
          taskMessage.message.runtimeType) {
        final result =
            tasks[taskMessage.command]?.handler(taskMessage.message);

        sendPort.send(
          IsolateMessage(
            taskMessage.command,
            result,
            taskMessage.id,
          ),
        );
      } else {
        sendPort.send(
          IsolateMessage(
            taskMessage.command,
            RemoteError('param type is error',
                '${tasks[taskMessage.command]?.requestType} != ${taskMessage.message.runtimeType}'),
            taskMessage.id,
          ),
        );
      }
    } else {
      sendPort.send(
        IsolateMessage(
          taskMessage.command,
          RemoteError(
              'wrong command!', '${taskMessage.command} is not defined!'),
          taskMessage.id,
        ),
      );
    }
  });
}

여기서 ReceivePort.listen()에 등록한 클로저는 dynamic message를 수신해서 IsolateMessage로 형변환한 뒤, 다음과 같이 세 가지 분기로 나뉘어서 동작합니다.

  1. taskMessage.commandshutdown일 경우
  2. taskMessage.commandMap<String, Task> tasks의 키 값일 경우
  3. taskMessage.commandMap<String, Task> tasks의 키 값이 아닌 경우

1의 경우에는 ReceivePost.close()를 호출해야하는데, Worker Isolate를 종료할 필요가 없는 특수한 경우가 아니고서야 이 동작이 바뀔 일은 없습니다. 또한 포트를 닫고 Worker Isolate를 종료하는데는 별다른 인자값이 필요 없으므로, IsolateMessage.message만 확인한 뒤 바로 실행합니다.

2의 경우에는 Map<String, Task> tasksIsolateMessage.command가 포함되어있는 경우, 즉 Main Isolate에서 Worker.spawn()을 실행했을 때 넘겨준 명령어 목록에 정의해놓은 핸들러가 존재할 경우입니다. 이 경우에는 IsolateMessage.command에 해당하는 Task객체를 사용해서, IsolateMessage.message.runtimeTypeTask.requestType과 일치하는지 비교하고, 일치하는 경우에는 Task.handler()를 실행합니다. 만약 일치하지 않는 경우에는 잘못된 타입의 인자값이 넘어온 것이므로, RemoteError에 인자값이 일치하지 않는다는 오류 문구를 설정한 뒤, Main Isolate로 전달합니다.

3의 경우에는 Map<String, Task> tasksIsolateMessage.command가 포함되어있지 않은 경우, 즉 Main Isolate에서 Worker.spawn()을 실행했을 때 넘겨준 명령어 목록에 정의해놓은 핸들러가 없는 경우입니다. 이 경우에는 RemoteError에 전달한 명령어가 존재하지 않는다는 오류 문구를 설정한 뒤, Main Isolate로 전달합니다.

이 뒤에는 아래와 같이 Main Isolate에서 Worker Isolate가 송신한 메시지를 수신한 뒤 실행되는, _handleResponsesFromIsolate()를 수정합니다. dynamic 타입의 messageIsolateMessage로 변환한 뒤, taskMessage.messageRemoteError 타입이면 taskMessage.id에 해당하는 Completer.completeError()를 사용해서 에러를 전달합니다. 그 외의 경우에는 Completer.complete()를 사용해서 결과값인 taskMessage.message를 반환합니다.

void _handleResponsesFromIsolate(dynamic message) {
  IsolateMessage taskMessage = message as IsolateMessage;
  Completer<Object?> completer = _activeRequests.remove(taskMessage.id)!;

  if (taskMessage.message is RemoteError) {
    completer.completeError(taskMessage.message);
  } else {
    completer.complete(taskMessage.message);
  }

  if (_closed && _activeRequests.isEmpty) _responses.close();
}

마지막으로 parseJson()함수명을 task()로 변경한 뒤, dynamic params를 옵셔널하게 전달하게끔 수정합시다.

Future<Object?> task(String command, {dynamic params}) async {
  if (_closed) throw StateError('Closed');
  final Completer<Object?> completer = Completer<Object?>.sync();
  final int id = _idCounter++;

  _activeRequests[id] = completer;
  _commands.send(IsolateMessage(command, params, id));

  return await completer.future;
}

이제 Worker객체의 task() 함수를 호출하면 인자값으로 전달한 commandparams를 기반으로 IsolateMessage객체를 생성한 뒤 전달하여, 원하는 동작을 실행하게 됩니다.

4. 완성! Map<String, Task>의 작성과 사용.

이제 Map<String, Task>를 선언해서 각 명령어가 Worker Isolate로 전달됐을 때 어떤 동작을 수행할지 정의하고, Worker Isolate에서 처리한 내용을 수신할 수 있게 됐습니다. 아래는 위에서 작성한 코드를 실행하는 main() 함수입니다.

void main() async {
  Map<String, Task> tasks = {
    'parseJson': Task(
      requestType: String,
      responseType: String,
      handler: (dynamic params) => jsonDecode(params),
    ),
    'jsonToString': Task(
      requestType: List<int>,
      responseType: String,
      handler: (dynamic params) => params.join(', '),
    ),
    'hello': Task(
      requestType: Null,
      responseType: Null,
      handler: (dynamic params) => print('hello!'),
    ),
  };

  final worker = await Worker.spawn(tasks);
  print(
    await worker.task(
      'parseJson',
      params: '{"key":"value"}',
    ),
  );
  print(
    await worker.task(
      'jsonToString',
      params: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    ),
  );
  print(
    await worker.task(
      'parseJson',
      params: '"banana"',
    ),
  );
  print(
    await worker.task(
      'parseJson',
      params: '[true, false, null, 1, "string"]',
    ),
  );

  await worker.task('hello');

  try {
    await worker.task(
      'jsonToString',
      params: ['a', 'b', 'c'],
    );
  } on RemoteError catch (e) {
    print('${e.toString()}: ${e.stackTrace}');
  }

  print(
    await Future.wait(
      [
        worker.task('parseJson', params: '"yes"'),
        worker.task('parseJson', params: '"no"'),
      ],
    ),
  );
  worker.close();
}

main()함수의 출력 내용은 아래와 같습니다.

Connecting to VM Service at ws://127.0.0.1:49922/LItF6YV4M9o=/ws
{key: value}
1, 2, 3, 4, 5, 6, 7, 8, 9, 10
banana
[true, false, null, 1, string]
hello!
param type is error: List<int> != List<String>
[yes, no]
--- port closed --- 

Exited.

출력 결과를 살펴보면 jsonToString을 실행할 시 인자값으로 ['a', 'b', 'c']을 전달했을 시 발생한 타입 관련 예외를, try-catch문에 의해 검출되어 출력되는 것을 볼 수 있습니다. 이것으로 최소한의 Isolate를 생성한 뒤 여러가지 처리를 하는 코드를 추상화하여, 좀 더 간단히 사용할 수 있게 됐습니다. :)

반응형