2024. 7. 31. 11:32ㆍProgramming/Flutter
Isolate에 대해 알아보자에서 Isolate를 사용할 시 유의해야 할 점과, 포트를 사용해서 Isolate 사이에 여러개의 메시지를 주고받기에 기재되어있는 예제를 살펴봤습니다. 유의할 점 중에는 Isolate 갯수가 CPU 코어 갯수±2개 정도를 유지하는게 좋다는 내용이 있었는데요. 이번에는 포트를 사용해서 Isolate 사이에 여러개의 메시지를 주고받기에서 살펴본 내용을 기반으로, 하나의 Isolate를 사용해서 여러개의 작업을 할 수 있도록 수정해보겠습니다.
먼저 예제를 살펴봅시다. 포트를 사용해서 Isolate 사이에 여러개의 메시지를 주고받기에 기재되어있는 예제는 아래와 같습니다.
import 'dart:async';
import 'dart:convert';
import 'dart:isolate';
void main() async {
final worker = await Worker.spawn();
print(await worker.parseJson('{"key":"value"}'));
print(await worker.parseJson('"banana"'));
print(await worker.parseJson('[true, false, null, 1, "string"]'));
print(
await Future.wait(
[
worker.parseJson('"yes"'),
worker.parseJson('"no"'),
],
),
);
worker.close();
}
class Worker {
final SendPort _commands;
final ReceivePort _responses;
final Map<int, Completer<Object?>> _activeRequests = {};
int _idCounter = 0;
bool _closed = false;
Future<Object?> parseJson(String message) async {
if (_closed) throw StateError('Closed');
final Completer<Object?> completer = Completer<Object?>.sync();
final int id = _idCounter++;
_activeRequests[id] = completer;
_commands.send((id, message));
return await completer.future;
}
static Future<Worker> spawn() async {
// Create a receive port and add its initial message handler
final RawReceivePort initPort = RawReceivePort();
final Completer<(ReceivePort, SendPort)> connection =
Completer<(ReceivePort, SendPort)>.sync();
initPort.handler = (initialMessage) {
SendPort commandPort = initialMessage as SendPort;
connection.complete((
ReceivePort.fromRawReceivePort(initPort),
commandPort,
));
};
// Spawn the isolate.
try {
await Isolate.spawn<SendPort>(_startRemoteIsolate, (initPort.sendPort));
} on Object {
initPort.close();
rethrow;
}
final (ReceivePort receivePort, SendPort sendPort) =
await connection.future;
return Worker._(receivePort, sendPort);
}
Worker._(this._responses, this._commands) {
_responses.listen(_handleResponsesFromIsolate);
}
void _handleResponsesFromIsolate(dynamic message) {
final (int id, Object? response) = message as (int, Object?);
Completer<Object?> completer = _activeRequests.remove(id)!;
if (response is RemoteError) {
completer.completeError(response);
} else {
completer.complete(response);
}
if (_closed && _activeRequests.isEmpty) _responses.close();
}
static void _handleCommandsToIsolate(
ReceivePort receivePort,
SendPort sendPort,
) {
receivePort.listen((dynamic message) {
if (message == 'shutdown') {
receivePort.close();
return;
}
final (int id, String jsonText) = message as (int, String);
try {
final jsonData = jsonDecode(jsonText);
sendPort.send((id, jsonData));
} catch (e) {
sendPort.send((id, RemoteError(e.toString(), '')));
}
});
}
static void _startRemoteIsolate(SendPort sendPort) {
final ReceivePort receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
_handleCommandsToIsolate(receivePort, sendPort);
}
void close() {
if (!_closed) {
_closed = true;
_commands.send('shutdown');
if (_activeRequests.isEmpty) _responses.close();
print('--- port closed --- ');
}
}
}
1. IsolateMessage로 주고받는 메시지를 규격화
위의 코드에서 먼저 살펴볼 것은 Worker Isolate에서 실행되는 _handleCommandsToIsolate()
입니다. 이 함수에서는 아래와같이 receivePort.listen()
을 실행해서 핸들러를 등록하며, Main Isolate에서 요청한 메시지를 처리해서 응답하는 구조로 되어있습니다.
static void _handleCommandsToIsolate(
ReceivePort receivePort,
SendPort sendPort,
) {
receivePort.listen((dynamic message) {
if (message == 'shutdown') {
receivePort.close();
return;
}
final (int id, String jsonText) = message as (int, String);
try {
final jsonData = jsonDecode(jsonText);
sendPort.send((id, jsonData));
} catch (e) {
sendPort.send((id, RemoteError(e.toString(), '')));
}
});
}
우선적으로 message는 dynamic 타입이므로 타입에 대한 불확실성이 있습니다. 형변환시에는 에러가 발생하지만, 메시지에 내용이 복잡해지면 복잡해질수록 어디서 문제가 발생하는지 파악하기 어려울 수 있습니다. 이러한 단점을 줄이고자, 다음과 같이 Isolate간 통신에 사용할 메시지 데이터 클래스를 작성해봅시다.
class IsolateMessage {
final int id;
final String command;
final dynamic message;
IsolateMessage(
this.command,
this.message,
this.id,
);
}
이 데이터 클래스는 예제를 참조해서, Worker Isolate에서 실행할 작업을 구분하기 위한 문자열 command, 인자값을 넘기기 위한 message, 그리고 요청을 응답 포트에서 수행할 Completer를 구분하기 위한 id값으로 구성되어있습니다. receivePort.listen()
에 등록한 클로저의 인자값이 dynamic
인 것은 ReceivePort
가 Stream<dynamic>
을 구현한 클래스이므로 변경할 수 없지만, Main Isolate에서 호출하는 parseJson()
함수에서 전송하는 메시지와 _handleCommandsToIsolate()
함수에서 전달받는 메시지를 IsolateMessage
통일하면, dynamic
타입으로 주고받는 메시지가 항상 IsolateMessage
타입이라고 간주할 수 있으므로 안심하고 형변환이 가능합니다. (Stream<T>
를 상속받는 ReceivePort
를 새로 구현하는게 나을까했지만, 너무 과한 것 같았습니다. 별도로 패키지를 만들 것도 아니고.)
Main Isolate와 Worker Isolate가 IsolateMessage
를 주고받도록, 아래와 같이 parseJson()
, close()
, _handleCommandsToIsolate()
세 개의 함수를 수정했습니다.
Future<Object?> parseJson(String command, {dynamic params}) async {
if (_closed) throw StateError('Closed');
final Completer<Object?> completer = Completer<Object?>.sync();
final int id = _idCounter++;
_activeRequests[id] = completer;
_commands.send(IsolateMessage('parseJson', params, id));
return await completer.future;
}
void close() {
if (!_closed) {
_closed = true;
_commands.send(IsolateMessage('shutdown', null, -1));
if (_activeRequests.isEmpty) _responses.close();
print('--- port closed --- ');
}
}
static void _handleCommandsToIsolate(
ReceivePort receivePort,
SendPort sendPort,
) {
receivePort.listen((dynamic message) {
IsolateMessage taskMessage = message as IsolateMessage;
if (taskMessage.command == 'shutdown') {
receivePort.close();
return;
} else if (taskMessage.command == 'parseJson') {
try {
String jsonText = taskMessage.message as String;
final jsonData = jsonDecode(jsonText);
sendPort.send((id, jsonData));
} catch (e) {
sendPort.send((id, RemoteError(e.toString(), 'failed to run ${taskMessage.command} with ${taskMessage.message}')));
}
} else {
sendPort.send((id, RemoteError(e.toString(), 'Wrong command! ${taskMessage.command} is not support!)));
}
});
}
이것으로 Main Isolate에서 Worker Isolate로 전달하는 메시지를 IsolateMessage
객체로 변경하여, Main Isolate가 Worker Isolate에서 수행하려고 하는 작업에 따라서 message
를 다른 타입으로 형변환 할 일이 없어졌습니다. 또한 Worker Isolate에서 어떤 작업을 하는 중 에러가 발생했는지, 조금은 알기 쉬워졌습니다.
IsolateMessage.command
을 기준으로 Worker Isolate에서 실행할 동작을 결정하므로, 하나의 Worker Isolate에서 조금 더 다양한 동작을 할 수 있게 됐습니다. 그렇지만 매번 Isolate를 사용할 때마다 Worker Isolate에 동작할 코드들을 추가하자면 코드가 불필요하게 길어지므로, Worker.spawn()
함수를 실행할 시 전달할 명령어와 실행할 핸들러를 정의할 수 있도록 코드를 수정해봅시다.
2. Worker Isolate에서 실행할 핸들러와 타입 가드를 위한 Task 클래스
기본적으로 위에서 작성한 코드는 Main Isolate에서 Worker Isolate로 dynamic message
를 전달하고, 이 값을 각각 (int, String)
Record 타입으로 형변환한 뒤, 문자열을 처리한 다음 int
값을 키값으로 저장해놓은 Completer
를 사용해서 응답을 처리하는 것이었습니다. 즉 dynamic Function(dynamic)
형태의 핸들러를 전달하면 IsolateMessage.message
로 전달하는 인자값을 받아서 Record 타입으로 형변환한 뒤, Worker Isolate에서 처리한 결과값을 Main Isolate로 전달할 수 있습니다. 이 과정에서 dynamic message
가 어떤 타입인지 미리 작성해둠으로써, 타입 가드를 구현하는 것도 가능합니다.
명령어와 핸들러, 그리고 핸들러에서 사용할 타입을 매핑하기 위해 아래와 같은 Task
데이터 클래스를 작성하겠습니다.
class Task {
final dynamic Function(dynamic) handler;
final Type requestType;
final Type responseType;
Task({
required this.handler,
required this.requestType,
required this.responseType,
});
}
여기서는 Worker Isolate에서 실행할 함수 dynamic Function(dynamic) handler
와, requestType
및 responseType
을 선언합니다. 이를 기반으로 Main Isolate가 Worker Isolate에 IsolateMessage
를 전달했을 시, IsolateMessage.message
가 올바른 타입으로 전달됐는지 확인할 수 있습니다.
3. Map<String, Task>
를 Worker.spawn()
에 전달하기
이제 위에서 선언한 Task
데이터 클래스를 통해서, Main Isolate가 Worker Isolate에서 실행할 동작들을 추상화해봅시다. 우선 Worker.spawn()
에 인자값으로 Map<String, Task>
을 전달해서, Worker
클래스를 생성할 시 실행할 동작들을 지정하도록 변경하도록 하겠습니다.
우선 spawn()
메서드에 인자값을 추가하고, Isolate.spawn()
을 실행할 때 Map<String, Task>
를 인자값으로 전달하도록 수정합니다.
static Future<Worker> spawn(Map<String, Task> tasks) async {
final RawReceivePort initPort = RawReceivePort();
final Completer<(ReceivePort, SendPort)> connection =
Completer<(ReceivePort, SendPort)>.sync();
initPort.handler = (initialMessage) {
SendPort commandPort = initialMessage as SendPort;
connection.complete((
ReceivePort.fromRawReceivePort(initPort),
commandPort,
));
};
try {
await Isolate.spawn<(SendPort, Map<String, Task>)>(
_startRemoteIsolate,
(initPort.sendPort, tasks),
);
} on Object {
initPort.close();
rethrow;
}
final (ReceivePort receivePort, SendPort sendPort) =
await connection.future;
return Worker._(receivePort, sendPort);
}
여기서는 Isolate.spawn
을 실행할 때 (SendPort, Map<String, Task>)
Record 타입을 인자로 전달하여, _startRemoteIsolate
함수가 Worker Isolate에서 실행되도록 합니다.
static void _startRemoteIsolate(
(SendPort, Map<String, Task>) args) {
final (SendPort sendPort, Map<String, Task> tasks) = args;
final ReceivePort receivePort = ReceivePort();
sendPort.send(receivePort.sendPort);
_handleTaskToIsolate(receivePort, sendPort, tasks);
}
그 뒤 _startRemoteIsolate()
에서는 인자로 받은 Record 타입 args
를 받아서, Map<String, Task> tasks
를 _handleTaskToIsolate()
로 전달합니다. _handleTaskToIsolate()
함수는 Worker Isolate에서 실행되며, ReceivePort
에 메시지가 수신됐을 때 인자로 전달받은 Map<String, Task> tasks
에서 IsolateMessage.command
에 해당하는 Task.handler
를 찾아서 실행해야 합니다. 또한 Task
에 정의되어있는 RequestType
을 사용해서 타입 가드가 가능합니다. 아래는 이러한 내용을 반영한 _handleTaskToIsolate()
입니다.
static void _handleTaskToIsolate(
ReceivePort receivePort,
SendPort sendPort,
Map<String, Task> tasks,
) {
receivePort.listen((dynamic message) {
IsolateMessage taskMessage = message as IsolateMessage;
if (taskMessage.command == 'shutdown') {
receivePort.close();
} else if (tasks.containsKey(taskMessage.command)) {
if (tasks[taskMessage.command]?.requestType ==
taskMessage.message.runtimeType) {
final result =
tasks[taskMessage.command]?.handler(taskMessage.message);
sendPort.send(
IsolateMessage(
taskMessage.command,
result,
taskMessage.id,
),
);
} else {
sendPort.send(
IsolateMessage(
taskMessage.command,
RemoteError('param type is error',
'${tasks[taskMessage.command]?.requestType} != ${taskMessage.message.runtimeType}'),
taskMessage.id,
),
);
}
} else {
sendPort.send(
IsolateMessage(
taskMessage.command,
RemoteError(
'wrong command!', '${taskMessage.command} is not defined!'),
taskMessage.id,
),
);
}
});
}
여기서 ReceivePort.listen()
에 등록한 클로저는 dynamic message
를 수신해서 IsolateMessage
로 형변환한 뒤, 다음과 같이 세 가지 분기로 나뉘어서 동작합니다.
taskMessage.command
가shutdown
일 경우taskMessage.command
가Map<String, Task> tasks
의 키 값일 경우taskMessage.command
가Map<String, Task> tasks
의 키 값이 아닌 경우
1의 경우에는 ReceivePost.close()
를 호출해야하는데, Worker Isolate를 종료할 필요가 없는 특수한 경우가 아니고서야 이 동작이 바뀔 일은 없습니다. 또한 포트를 닫고 Worker Isolate를 종료하는데는 별다른 인자값이 필요 없으므로, IsolateMessage.message
만 확인한 뒤 바로 실행합니다.
2의 경우에는 Map<String, Task> tasks
에 IsolateMessage.command
가 포함되어있는 경우, 즉 Main Isolate에서 Worker.spawn()
을 실행했을 때 넘겨준 명령어 목록에 정의해놓은 핸들러가 존재할 경우입니다. 이 경우에는 IsolateMessage.command
에 해당하는 Task
객체를 사용해서, IsolateMessage.message.runtimeType
이 Task.requestType
과 일치하는지 비교하고, 일치하는 경우에는 Task.handler()
를 실행합니다. 만약 일치하지 않는 경우에는 잘못된 타입의 인자값이 넘어온 것이므로, RemoteError
에 인자값이 일치하지 않는다는 오류 문구를 설정한 뒤, Main Isolate로 전달합니다.
3의 경우에는 Map<String, Task> tasks
에 IsolateMessage.command
가 포함되어있지 않은 경우, 즉 Main Isolate에서 Worker.spawn()
을 실행했을 때 넘겨준 명령어 목록에 정의해놓은 핸들러가 없는 경우입니다. 이 경우에는 RemoteError
에 전달한 명령어가 존재하지 않는다는 오류 문구를 설정한 뒤, Main Isolate로 전달합니다.
이 뒤에는 아래와 같이 Main Isolate에서 Worker Isolate가 송신한 메시지를 수신한 뒤 실행되는, _handleResponsesFromIsolate()
를 수정합니다. dynamic
타입의 message
를 IsolateMessage
로 변환한 뒤, taskMessage.message
가 RemoteError
타입이면 taskMessage.id
에 해당하는 Completer.completeError()
를 사용해서 에러를 전달합니다. 그 외의 경우에는 Completer.complete()
를 사용해서 결과값인 taskMessage.message
를 반환합니다.
void _handleResponsesFromIsolate(dynamic message) {
IsolateMessage taskMessage = message as IsolateMessage;
Completer<Object?> completer = _activeRequests.remove(taskMessage.id)!;
if (taskMessage.message is RemoteError) {
completer.completeError(taskMessage.message);
} else {
completer.complete(taskMessage.message);
}
if (_closed && _activeRequests.isEmpty) _responses.close();
}
마지막으로 parseJson()
함수명을 task()
로 변경한 뒤, dynamic params
를 옵셔널하게 전달하게끔 수정합시다.
Future<Object?> task(String command, {dynamic params}) async {
if (_closed) throw StateError('Closed');
final Completer<Object?> completer = Completer<Object?>.sync();
final int id = _idCounter++;
_activeRequests[id] = completer;
_commands.send(IsolateMessage(command, params, id));
return await completer.future;
}
이제 Worker
객체의 task()
함수를 호출하면 인자값으로 전달한 command
와 params
를 기반으로 IsolateMessage
객체를 생성한 뒤 전달하여, 원하는 동작을 실행하게 됩니다.
4. 완성! Map<String, Task>
의 작성과 사용.
이제 Map<String, Task>
를 선언해서 각 명령어가 Worker Isolate로 전달됐을 때 어떤 동작을 수행할지 정의하고, Worker Isolate에서 처리한 내용을 수신할 수 있게 됐습니다. 아래는 위에서 작성한 코드를 실행하는 main()
함수입니다.
void main() async {
Map<String, Task> tasks = {
'parseJson': Task(
requestType: String,
responseType: String,
handler: (dynamic params) => jsonDecode(params),
),
'jsonToString': Task(
requestType: List<int>,
responseType: String,
handler: (dynamic params) => params.join(', '),
),
'hello': Task(
requestType: Null,
responseType: Null,
handler: (dynamic params) => print('hello!'),
),
};
final worker = await Worker.spawn(tasks);
print(
await worker.task(
'parseJson',
params: '{"key":"value"}',
),
);
print(
await worker.task(
'jsonToString',
params: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
),
);
print(
await worker.task(
'parseJson',
params: '"banana"',
),
);
print(
await worker.task(
'parseJson',
params: '[true, false, null, 1, "string"]',
),
);
await worker.task('hello');
try {
await worker.task(
'jsonToString',
params: ['a', 'b', 'c'],
);
} on RemoteError catch (e) {
print('${e.toString()}: ${e.stackTrace}');
}
print(
await Future.wait(
[
worker.task('parseJson', params: '"yes"'),
worker.task('parseJson', params: '"no"'),
],
),
);
worker.close();
}
이 main()
함수의 출력 내용은 아래와 같습니다.
Connecting to VM Service at ws://127.0.0.1:49922/LItF6YV4M9o=/ws
{key: value}
1, 2, 3, 4, 5, 6, 7, 8, 9, 10
banana
[true, false, null, 1, string]
hello!
param type is error: List<int> != List<String>
[yes, no]
--- port closed ---
Exited.
출력 결과를 살펴보면 jsonToString
을 실행할 시 인자값으로 ['a', 'b', 'c']
을 전달했을 시 발생한 타입 관련 예외를, try-catch문에 의해 검출되어 출력되는 것을 볼 수 있습니다. 이것으로 최소한의 Isolate를 생성한 뒤 여러가지 처리를 하는 코드를 추상화하여, 좀 더 간단히 사용할 수 있게 됐습니다. :)
'Programming > Flutter' 카테고리의 다른 글
[Dart] Isolate에 대해 알아보자 (0) | 2024.07.29 |
---|---|
Pigeon을 사용해서 여러개의 인터페이스를 생성할 때 발생할 수 있는 에러 정리 (0) | 2024.06.24 |
Pigeon을 사용하여 Type-safety한 네이티브 코드 작성하기 (0) | 2024.06.21 |
Provider의 ChangeNotifier와 Dispose와 비동기 함수 (0) | 2024.06.20 |
최상위 Navigator와 MaterialApp, 그리고 Navigator.push()와 GetX.to() (0) | 2022.07.05 |