fp-tsを使ったFail Firstせずに複数プロミスタスクをフロー化して実行するワークアラウンドの紹介
こんにちわ。nap5です。
fp-tsを使ったFail Firstせずに複数プロミスタスクをフロー化して実行するワークアラウンドの紹介です。
まずは、ミニデモからです。TaskモナドをつかうとFail Firstの振る舞いをせずに処理を継続することができます。ほかのモナド系はたいてい即時でエラーを返して処理を終了してくれます。
import { z } from "zod";
import { TaskEither } from "fp-ts/lib/TaskEither";
import { tryCatch } from "fp-ts/lib/TaskEither";
import { ApplicativePar, ApplicativeSeq } from "fp-ts/lib/Task";
import { sequence } from 'fp-ts/lib/Array'
const CustomErrorDataSchema = z.custom<CustomError>();
type CustomErrorData = z.infer<typeof CustomErrorDataSchema>;
type UserId = number;
class CustomError extends Error {
constructor(message: string, option?: { cause: unknown }) {
super(message, option);
}
}
const doN = (n: number): TaskEither<CustomErrorData, UserId> => {
return tryCatch(
async () => {
if (n % 2 === 0) {
return Promise.reject(
new CustomError(`Something went wrong... [${n}]`)
);
}
return n;
},
(e) => e as CustomErrorData
);
};
(async () => {
const numbers = [1, 2, 3, 4, 5];
const tasks = numbers.map(doN);
const resultsPar = await sequence(ApplicativePar)(tasks)() // @see https://github.com/gcanti/fp-ts/issues/1626#issuecomment-1000891972
const resultsSeq = await sequence(ApplicativeSeq)(tasks)()
console.log(resultsPar, resultsSeq)
})();
これをもとに少し応用してバッチ処理のようなユースケースでデモを作ってみました。
import { pipe } from "fp-ts/lib/function";
import seedrandom from "seedrandom";
import * as TE from "fp-ts/lib/TaskEither";
import * as T from "fp-ts/lib/Task";
import * as A from "fp-ts/lib/Array";
import * as E from "fp-ts/lib/Either";
const rng = seedrandom("fixed-seed");
interface ErrorFormat {
fn: string;
detail: Record<string, unknown>;
}
interface CustomErrorOptions extends ErrorOptions {
cause?: ErrorFormat;
}
class CustomError extends Error {
cause?: ErrorFormat;
constructor(message: string, options?: CustomErrorOptions) {
super(message, options);
this.cause = options?.cause;
}
}
export class ValidationError extends CustomError {
name = "ValidationError" as const;
constructor(message: string, options?: CustomErrorOptions) {
super(message, options);
}
}
export type User = {
id: number;
name: string;
isDeleted?: boolean;
};
const createUser = (formData: User): TE.TaskEither<ValidationError, User> =>
rng() < 0.1
? TE.left(
new ValidationError("Failed createUser.", {
cause: { fn: "createUser", detail: formData },
}),
)
: TE.right({ ...formData, isDeleted: rng() > 0.8 });
const deleteUser = (formData: User): TE.TaskEither<ValidationError, User> =>
!formData.isDeleted
? TE.right({ ...formData, isDeleted: !formData.isDeleted })
: TE.left(
new ValidationError("Already deleted.", {
cause: { fn: "deleteUser", detail: formData },
}),
);
const notifyToUser = (formData: User): TE.TaskEither<ValidationError, User> =>
rng() < 0.8
? TE.right({ ...formData })
: TE.left(
new ValidationError("Failed notification.", {
cause: { fn: "notifyToUser", detail: formData },
}),
);
const createUsers = (data: User[]) =>
A.sequence(T.ApplicativePar)(data.map(createUser));
const deleteUsers = (data: User[]) =>
A.sequence(T.ApplicativePar)(data.map(deleteUser));
const notifyToUsers = (data: User[]) =>
A.sequence(T.ApplicativePar)(data.map(notifyToUser));
export const demo = (data: User[]) =>
pipe(
data,
createUsers,
T.chain((results) => {
const { left: lefts, right: rights } = A.separate(results);
const deleteUsersTask = deleteUsers(rights);
return pipe(
deleteUsersTask,
T.map((d) => [...d, ...lefts.map(E.left)]),
); // combined previous errors
}),
T.chain((results: E.Either<ValidationError, User>[]) => {
const { left: lefts, right: rights } = A.separate(results);
const notifyToUsersTask = notifyToUsers(rights);
return pipe(
notifyToUsersTask,
T.map((d) => [...d, ...lefts.map(E.left)]),
); // combined previous errors
}),
);
コアのトランザクションプロセスは以下になります。
これらは引数に複数のユーザーデータを受け取り、プロミスタスク化して実行するプロセスになります。
const createUsers = (data: User[]) => A.sequence(T.ApplicativePar)(data.map(createUser));
const deleteUsers = (data: User[]) => A.sequence(T.ApplicativePar)(data.map(deleteUser));
const notifyToUsers = (data: User[]) => A.sequence(T.ApplicativePar)(data.map(notifyToUser));
このインターフェースを保ちながら、前回処理の結果(T型とE型)のエラーハンドリングを行っている部分は以下になります。
export const demo = (data: User[]) =>
pipe(
data,
createUsers,
T.chain((results) => {
const { left: lefts, right: rights } = A.separate(results);
const deleteUsersTask = deleteUsers(rights);
return pipe(
deleteUsersTask,
T.map((d) => [...d, ...lefts.map(E.left)]),
); // combined previous errors
}),
T.chain((results: E.Either<ValidationError, User>[]) => {
const { left: lefts, right: rights } = A.separate(results);
const notifyToUsersTask = notifyToUsers(rights);
return pipe(
notifyToUsersTask,
T.map((d) => [...d, ...lefts.map(E.left)]),
); // combined previous errors
}),
);
達成したいシナリオに応じてTaskモナドのchainで前回プロセス結果をseparateしながら、EitherモナドのエラーをCombineしているのがポイントです。前回プロセスのうち正常系データはそのまま、次のプロセスの引数に渡しております。
const { left: lefts, right: rights } = A.separate(results);
const notifyToUsersTask = notifyToUsers(rights);
最後にデモコードになります。
簡単ですが、以上です。
この記事が気に入ったらサポートをしてみませんか?