JavaScript/TypeScriptメモ

TypeScript fp-tsのTaskEither+sequenceでPromiseの順次処理、並列処理をシンプルに書く

fp-tsのTaskEitherでPromiseの並列・直列処理をシンプルに

fp-tsのTaskEitherを使うことで、複数のPromiseについての処理をシンプルに書くことができます。

TaskEitherがわからない、というときには、下記の記事をご覧ください。

TypeScript Promiseをfp-tsのTask、TaskEitherに変換する

下準備:TaskEitherオブジェクトを準備する

まず、下記のようにTaskEither型のオブジェクトを4つ準備します。

import * as TE from 'fp-ts/TaskEither';

const sleep = (ms: number) =>
  new Promise<void>((resolve) => {
    setTimeout(() => {
      resolve();
    }, ms);
  });

const createTasks = () => {
  const te1 = TE.tryCatch(
    () =>
      sleep(1000).then(() => {
        console.log(`te1`);
        return 1;
      }),
    (e) => e as string[]
  );

  const te2 = TE.tryCatch(
    () =>
      sleep(2000).then(() => {
        console.log(`te2`);
        return `a`;
      }),
    (e) => e as string[]
  );

  const te3 = TE.tryCatch(
    () =>
      sleep(1500).then(() => {
        console.log(`te3`);
        return Promise.reject([`error te3-1`, `error te3-2`]);
      }),
    (e) => e as string[]
  );

  const te4 = TE.tryCatch(
    () =>
      sleep(2500).then(() => {
        console.log(`te4`);
        return Promise.reject([`error te4`]);
      }),
    (e) => e as string[]
  );
  return [te1, te2, te3, te4] as const;
};

「te1」~「te4」まで、次のような挙動の4つのTaskEitherオブジェクトを準備しています。

変数名 待ち時間 resolve reject
te1 1000ms 1 -
te2 2000ms 'a' -
te3 1500ms - ['error te3-1', 'error te3-2']
te4 2500ms - ['error te4']

たとえば、

  • 「te1」は、1000ms後に「1」をresolveする
  • 「te4」は、2500ms後に「['error te4']」をrejectする

ということです。

今回の4つのオブジェクトを見ると、

  1. resolveされる値の型が違う(「te1」はnumber型、「te2」はstring型)
  2. rejectされる値の型はstring[]型

ことに注意してください。

この4つのTaskEitherオブジェクトを様々な方法で実行してみます。

1. 並列実行-1つでもrejectされたら全体をrejectにする(Promise.all相当)

まず、この4つのTaskEitherオブジェクトを並列実行し、1つでもrejectされたら全体をreject扱いにします。

reject時には、一番最初にrejectされたときのエラーメッセージを表示させます。

要するに、Promise.allとまったく同じ挙動になるようにしてみます。

import * as TE from 'fp-ts/TaskEither';
import { sequenceT } from 'fp-ts/Apply';

const sleep = (ms: number) => { 
  // (略)
} 

const createTasks = () => {
  // (略)
  return [te1, te2, te3, te4] as const;
};

const test1 = async () => {
  const [te1, te2, te3, te4] = createTasks();

  const ret = await sequenceT(TE.ApplyPar)(te1, te2, te3, te4)();  return ret;
};

test1()
  .then((v) => {
    console.log(v);
  })
  .catch(() => {
    console.log(`error`);
  });

//↓実行結果
//te1
//te3
//te2
//te4
//{ _tag: 'Left', left: [ 'error te3-1', 'error te3-2' ] }

sequenceT(TE.ApplyPar)で、複数のTaskEitherオブジェクトから、並列実行をするTaskEitherオブジェクトを作る関数が得られます。

そこで、sequenceT(TE.ApplyPar)(te1, te2, te3, te4)と、並列実行したいTaskEitherを引数で渡すと、並列実行用のTaskEitherオブジェクトができあがります。

あとは、末尾に「()」を付けることで、指定したTaskEitherが並列に実行されます。

その結果、待ち時間の少ない順(「te1」→「te3」→「te2」→「te4」)に実行が完了します。

今回は、「te3」と「te4」がrejectされますので、そのうち最初にrejectされた「te3」のエラーメッセージがLeft<string[]>型の値として得られます。

(参考)全TaskEitherがresolveされた場合

全TaskEitherがresolveされた場合には、Promise.allと同様に、実行結果が配列で得られます。

たとえば、次のように「te1」「te2」だけを並列実行してみます。

すると、待ち時間の少ない順(「te1」→「te2」)の順に実行が完了し、「te1」「te2」の実行結果がRight<[number, string]>型の値として得られます。

import * as TE from 'fp-ts/TaskEither';
import { sequenceT } from 'fp-ts/Apply';

const sleep = (ms: number) => { 
  // (略)
} 

const createTasks = () => {
  // (略)
  return [te1, te2, te3, te4] as const;
};

const test1_2 = async () => {
  const [te1, te2] = createTasks();
  const ret = await sequenceT(TE.ApplyPar)(te1, te2)();  return ret;
};

test1_2()
  .then((v) => {
    console.log(v);
  })
  .catch(() => {
    console.log(`error`);
  });

//↓実行結果
//te1
//te2
//{ _tag: 'Right', right: [ 1, 'a' ] }

2. 直列実行-1つでもrejectされたら全体をrejectにする

今度は、「te1」~「te4」までの4つのTaskEitherオブジェクトを直列実行し、1つでもrejectされたら全体をreject扱いにします。

reject時には、一番最初にrejectされたときのエラーメッセージを表示させます。

import * as TE from 'fp-ts/TaskEither';
import { sequenceT } from 'fp-ts/Apply';

const sleep = (ms: number) => { 
  // (略)
} 

const createTasks = () => {
  // (略)
  return [te1, te2, te3, te4] as const;
};

const test2 = async () => {
  const [te1, te2, te3, te4] = createTasks();
  const ret = await sequenceT(TE.ApplySeq)(te1, te2, te3, te4)();  return ret;
};

test2()
  .then((v) => {
    console.log(v);
  })
  .catch(() => {
    console.log(`error`);
  });

//↓実行結果
//te1
//te2
//te3
//{ _tag: 'Left', left: [ 'error te3-1', 'error te3-2' ] }

sequenceT(TE.ApplySeq)で、複数のTaskEitherを直列実行する関数が得られます。

先ほどのパターンと比べると「TE.ApplyPar」が「TE.ApplySeq」に変わっただけで、そのほかはまったく同じです

あとは、直列実行したいTaskEitherオブジェクトを引数で渡して、新たなTaskEitherオブジェクトを作成・実行することで、指定した順(「te1」→「te2」→「te3」)に直列実行されます。

そして、「te3」がrejectされたところで、そのエラーメッセージがLeft<string[]>型の値として得られます。

直列実行ですので、「te3」がrejectされたら、それ以降のTaskEitherオブジェクトは実行されません。

このように、TaskEitherを使うと、直列実行も簡単に記述することができます。

3. 並列実行-rejectされた全メッセージを統合して取得する

指定したすべてのTaskEitherを並列に実行します。

すべてresolveされたときの挙動は「1.」と同じですが、rejectされたものがあるときには、すべてのTaskEitherのエラーメッセージを統合して取得します

import * as TE from 'fp-ts/TaskEither';
import { sequenceT } from 'fp-ts/Apply';
import * as T from 'fp-ts/Task';import { getSemigroup } from 'fp-ts/lib/Array';
const sleep = (ms: number) => { 
  // (略)
} 

const createTasks = () => {
  // (略)
  return [te1, te2, te3, te4] as const;
};

const test3 = async () => {
  const [te1, te2, te3, te4] = createTasks();

  const v = TE.getApplicativeTaskValidation(T.ApplyPar, getSemigroup<string>());  const ret = await sequenceT(v)(te1, te2, te3, te4)();
  return ret;
};

test3()
  .then((v) => {
    console.log(v);
  })
  .catch(() => {
    console.log(`error`);
  });

//↓実行結果
//te1
//te3
//te2
//te4
//{ _tag: 'Left', left: [ 'error te3-1', 'error te3-2', 'error te4' ] }

まず、下記の行で、sequenceTに渡す用のオブジェクトを作ります。

  const v = TE.getApplicativeTaskValidation(T.ApplyPar, getSemigroup<string>());

細かいところはわからないのですが、いわゆるValidationパターン(EitherのLeft側が複数ある場合にはArray.concatで結合していく)操作を定義しているのだろうと思います。

このオブジェクトが生成できたら、あとは、先ほどまでと手順は、ほとんど同じです。

sequenceT(v)で、複数のTaskEitherオブジェクトから、並列実行をしつつValidationパターンに従ってTaskEitherオブジェクトを作る関数が得られます。

そこで、sequenceT(v)(te1, te2, te3, te4)と、並列実行したいTaskEitherを引数で渡すと、並列実行(Validationパターン)用のTaskEitherオブジェクトができあがります。

あとは、末尾に「()」を付けることで、指定したTaskEitherが並列に実行されます。

その結果、待ち時間の少ない順(「te1」→「te3」→「te2」→「te4」)に実行が完了します。

今回は、「te3」と「te4」がrejectされますので、それらのエラーメッセージを統合したものが、Left<string[]>型の値として得られます。

4. 直列実行-rejectされた全メッセージを統合して取得する

パターンが読めてきたのではないかと思うのですが、「3.」のT.ApplyParT.ApplySeqに変えることで、直列実行に切り替えることができます。

「3.」と、ほとんど同じですので、詳細は省略します。

5. 並列実行-すべてのTaskEitherの結果を個別に取得する(Promise.allSettled相当)

指定したすべてのTaskEitherを並列に実行します。

resolve、rejectに関わらず、すべてのTaskEitherの結果を個別に取得してみましょう。

import * as TE from 'fp-ts/TaskEither';
import { sequenceT } from 'fp-ts/Apply';
import * as T from 'fp-ts/Task';
const sleep = (ms: number) => { 
  // (略)
} 

const createTasks = () => {
  // (略)
  return [te1, te2, te3, te4] as const;
};

const test5 = async () => {
  const [te1, te2, te3, te4] = createTasks();

  const ret = await sequenceT(T.ApplyPar)(te1, te2, te3, te4)();  return ret;
};

test5()
  .then((v) => {
    console.log(v);
  })
  .catch(() => {
    console.log(`error`);
  });

//↓実行結果
//te1
//te3
//te2
//te4
//[
//  { _tag: 'Right', right: 1 },
//  { _tag: 'Right', right: 'a' },
//  { _tag: 'Left', left: [ 'error te3-1', 'error te3-2' ] },
//  { _tag: 'Left', left: [ 'error te4' ] }
//]

「1.」と、ほとんど同じですが、17行目のsequenceTの引数が、「1.」ではTE.ApplyParだったのが、今回はT.ApplyParに変わっています(TE→Tに変わっている)。

こうすることで、各TaskEitherの実行結果をEither型の配列の形で取得することができます。

6. 直列実行-すべてのTaskEitherの結果を個別に取得する

これも、今までとパターンは、まったく同じです。「5.」のT.ApplyParT.ApplySeqに変えれば、直列実行に切り替えることができます。

7. 並列実行-オブジェクトのプロパティに格納されたTaskEitherをまとめて実行する

今まで「1.」~「6.」では、実行したいTaskEitherオブジェクトを、引数として設定していました。

今回は、TaskEitherオブジェクトを、あるオブジェクトのプロパティとして設定してみます。

import * as TE from 'fp-ts/TaskEither';
import { sequenceS } from 'fp-ts/Apply';
const sleep = (ms: number) => { 
  // (略)
} 

const createTasks = () => {
  // (略)
  return [te1, te2, te3, te4] as const;
};

const test7 = async () => {
  const [te1, te2, te3, te4] = createTasks();

  const ret = await sequenceS(TE.ApplyPar)({ te1, te2, te3, te4 })();  return ret;
};

test7()
  .then((v) => {
    console.log(v);
  })
  .catch(() => {
    console.log(`error`);
  });

//↓実行結果
//te1
//te3
//te2
//te4
//{ _tag: 'Left', left: [ 'error te3-1', 'error te3-2' ] }

「1.」のプログラムと比較すると、下記の3点が違います。

  • 2行目でsequenceTではなくsequenceSをインポート
  • 16行目でsequenceTの代わりにsequenceSを指定
  • 同じく16行目で、引数を({te1, te2, te3, te4})とオブジェクト形式で指定

今回のように、一部のTaskEitherでrejectされた場合には、実行結果自体は「1.」と変わりません。

(参考)全TaskEitherがresolveされた場合

全TaskEitherがresolveされた場合には、実行結果は「1.」と変わります。

import * as TE from 'fp-ts/TaskEither';
import { sequenceS } from 'fp-ts/Apply';
const sleep = (ms: number) => { 
  // (略)
} 

const createTasks = () => {
  // (略)
  return [te1, te2, te3, te4] as const;
};

const test7_2 = async () => {
  const [te1, te2] = createTasks();
  const ret = await sequenceS(TE.ApplyPar)({ te1, te2 })();  return ret;
};

test7_2()
  .then((v) => {
    console.log(v);
  })
  .catch(() => {
    console.log(`error`);
  });

//↓実行結果
//te1
//te2
//{ _tag: 'Right', right: { te1: 1, te2: 'a' } }

このように、引数として渡したオブジェクトと同じ構成で、プロパティにresolveされた実行結果が格納されます

全パターン試したわけではありませんが、パターン「2.」~「6.」についても、sequenceSsequenceTに変えれば、同じように動くのではないかと思います。

まとめ

TaskEitherを使うと、Promiseについての処理を非常にシンプルに書くことができるようになります。

非常に便利ですので、ぜひ、使ってみてください。