[Flutter] 여러개의 비동기 처리에 대한 경쟁 상태의 처리

2021. 7. 27. 17:21Programming/Flutter

반응형

여러개의 비동기 처리에 대한 경쟁 상태의 처리

Dart에서는 Future<T> 클래스를 사용해서 비동기에 대한 처리를 할 수 있습니다. async/await에 대한 내용은 비동기(Asynchronous)와 async/await, 그리고 여러개의 await에 대한 비동기 처리에서 한 차례 언급했으므로, 이번에는 가볍게 패스할께요. 이번에 살펴볼 내용은 여러개의 비동기 처리가 경쟁 상태(Race condition)로 주어졌을 때, 어떻게 처리하는지에 대한 얘기를 해보려고 합니다.

엥? 비동기에 경쟁 상태? 그거 Future<T>.any<T>() 호출하면 끝 아니냐?

Dart의 dart:async 패키지에서 제공하는 Future패키지에는 Future<T>.any<T>라는 메소드를 제공합니다.

Future<T>.any<T>(Iterable<Future<T>> futures)

Returns the result of the first future in futures to complete.

The returned future is completed with the result of the first future in futures to report that it is complete, whether it's with a value or an error. The results of all the other futures are discarded.

If futures is empty, or if none of its futures complete, the returned future never completes.

네, 대충 살펴보면 Iterable로 넘겨준 futures 중에, 성공을 하건 에러가 나건 제일 빨리 끝난 녀석의 결과값 T를 반환해주는 메소드입니다. 이 녀석 하나만 있으면, Future<T>.wait<T>와는 달리 인자로 넘겨준 Iterable<Future<T>> 중 가장 빨리 처리가 끝난 Future만 얻을 수 있으므로, 경쟁 상태에 대한 고민이 끝나게 되죠!

...나는 가장 처리가 빨리 끝난 녀석도 중요하지만, 에러난 녀석은 필요없고 성공한 녀석만 필요한데...?

Future<T>.any<T>를 사용할때 로직이 생각과는 다르게 동작하는 경우가 있습니다. 바로 이 글을 쓰게 된 이유기도 하죠. Future<T>.any<T>는 인자로 전달받은 Iterable<Future<T>> 중 가장 빨리 처리가 된 Future의 실행결과를 반환하므로, 하나의 Future가 빠르게 에러를 토해내면 다른 Future는 성공을 하건 에러를 뱉건 버려지게됩니다.

예를 들어서 A API, B API, C API 세 개의 HTTP API가 주어졌다고 가정해봅시다. 세 개의 API를 동시에 호출한 뒤, 가장 빨리 실행된 API의 결과값에 따라 처리를 해줘야하는 상황인거죠. 좀 더 이해하기 쉽게끔 조건을 추가해봅시다. A API는 빠르지만 네트워크 에러가 발생할 확률이 높고, C API는 느리지만 네트워크 에러가 발생할 확률이 낮다고 해볼께요. 이런 상황에서는 단순히 Future<T>.any<T>를 사용하면, API B나 API C의 실행결과와는 상관없이 에러가 발생하는 경우가 생길거에요. 이유는 앞에서 설명했듯, Future<T>.any<T>는 에러의 발생과 상관없이 Iterable<Future<T>> 중 가장 빨리 처리된 녀석 외에는 전부 내다버리기 때문이에요.

// API는 A, B, C 순으로 실행속도가 빠르고,
// A, B, C 순으로 네트워크 에러 발생 확률이 높다고 가정한다.
HttpResponse response = await Future.any([callApiA, callApiB, callApiC]);

Future<T>.any<T>를 뜯어보자

Future<T>.any<T>는 다음과 같이 생겼습니다. (Flutter 2.2.3 기준)

static Future<T> any<T>(Iterable<Future<T>> futures) {
  var completer = new Completer<T>.sync();
  void onValue(T value) {
    if (!completer.isCompleted) completer.complete(value);
  }

  void onError(Object error, StackTrace stack) {
    if (!completer.isCompleted) completer.completeError(error, stack);
  }

  for (var future in futures) {
    future.then(onValue, onError: onError);
  }
  return completer.future;
}

Completer<T>라는 녀석이 등장하긴 했지만, 코드 자체는 그렇게 어렵지 않습니다. 인자로 넘어온 Iterable<Future<T>>의 모든 요소에 then으로 onValue, onError 콜백을 등록해줍니다. 그리고 onError 혹은 onValue가 실행될 때 completer.isCompleted값을 확인한 후 아직 완료되지 않은 상태이면, completer.complete 혹은 completer.completeError를 실행시켜주죠. 이후에 완료되는 콜백들은, completer.isComplete의 값이 true이므로 전부 버려지게 될 겁니다. 와, 간단해!

Completer<T>에 대한 내용은 API 문서를 참조하도록 합시다. 단순히 Future에 대한 상태값이라고 이해하고 넘겨도 괜찮을거에요, 아마도...?

error가 발생한 인자는 버리고, 성공한 인자의 실행값만 반환하는 anySuccess를 만들어보자

뭔가 길게 늘어놓고보니까 좀 있어보이지만, 실제로는 굉장히 간단한 얘기입니다. Future<T>.any<T>onError 내에서, completer.completeError만 호출하지 않으면 되거든요.

Future<T> anySuccess<T>(Iterable<Future<T>> futures) {
  var completer = new Completer<T>.sync();
  int errorCount = 0;
  void onValue(T value) {
    if (!completer.isCompleted) completer.complete(value);
  }

  void onError(Object error, StackTrace stack) {
    if (!completer.isCompleted) {
      errorCount++;

      if (errorCount == futures.length) {
        completer.completeError(error, stack);
      }
    }
  }

  for (var future in futures) {
    future.then(onValue, onError: onError);
  }
  return completer.future;
}

위의 코드는 Future<T>.any<T>를 기반으로 작성한 anySuccess입니다. 변경된 사항은 onError가 Iterable<Future<T>>의 길이만큼 발생했을때만 completer.completeError(error, stack);를 실행한다는 점이죠. 자, 이걸로 간단하게 에러 없이 가장 빨리 처리된 Future의 결과값만 추려내고, 전부 실패했을 경우에는 마지막에 발생한 error에 대해서 반환해주는 anySuccess가 완성됐습니다.

* Extension methods 확인 결과 static 메서드는 extension이 불가능하다고 나와있어서, 일반 메서드로 작성했습니다.

난 에러가 아니라 특정 값을 먼저 뱉은 녀석이 필요한데...?

특정 값이 필요하면 Iterable<Future<T>> 요소 내부에서 특정 값이 아닐 때 Exception을 던지면 되잖아, 이 자식아...! 라는 생각이 들 수도 있습니다. 뭐, 어렵지는 않으니까 대충 만들어보죠. Iterable<Future<T>> 외에 특정 값 K targetValue를 추가해준 뒤, 해당 값이 아니면 error로 간주하면 간단하게 추려낼 수 있습니다.


Future<T> anyValue<T, K>(Iterable<Future<T>> futures, K targetValue) {
  var completer = new Completer<T>.sync();
  int errorCount = 0;

  void onValue(T value) {
    if (!completer.isCompleted) {
      if (value == targetValue)
        completer.complete(value);
      else {
        errorCount++;

        if (errorCount == futures.length) {
          completer.completeError(
            Exception("Not found target value."),
          );
        }
      }
    }
  }

  void onError(Object error, StackTrace stack) {
    if (!completer.isCompleted) {
      errorCount++;

      if (errorCount == futures.length) {
        completer.completeError(error, stack);
      }
    }
  }

  for (var future in futures) {
    future.then(onValue, onError: onError);
  }

  return completer.future;
}

위의 함수는 Future를 처리하는 중 Exception이 발생하지 않더라도, 특정 값이 발생하지 않는 경우에는 Exception으로 간주해서 errorCount값을 증가시키고 있습니다. 대충 어떻게 활용할 수 있냐면, 위의 예시에서 A API, B API, C API의 상태 코드가 200이 아닌 경우 내부에서 예외로 던지게끔 작성되어있지 않더라도 처리해줄 수 있죠. 에? 뭔가 와닿지 않는다구요?

맞아요, 저도 사실 만들어놓고 어따 쓰나 하고 있었어요.

마무리

대충 이번에는 Flutter/Dart를 사용해서 여러개의 비동기 처리 중, 경쟁 상태(Race condition)에 놓였을 시 어떻게 처리할건지에 대해 정리해봤습니다. 아마 이런 특수한 경우는 일반적이지 않은데다가, Isolate를 사용하지 않는다면 Javascript와 마찬가지로 Single thread로 처리되기 때문에 공유 자원에 대한 경쟁 상태에 대한 고려는 필요하지 않을거에요.

하지만 Future<T>.any<T>Future<T>.wait<T> 등, Future<T>의 내부 구현을 살펴보면 도움되는 내용이 많을거에요.

긴 글 읽어주셔서 감사합니다. :)

반응형