Flutter GUI 模式:异步任务

我最开始接触编程的时候,就对 GUI 非常着迷 —— 它就好像一扇观察计算机如何工作的窗户,让我能够将计算机的内部状态和外部世界联系在一起,感觉非常的酷。在研究生时,我接触的第一门语言是 Python,当时最流行的是 PyQt 的界面库绑定,我查了很多资料,使用它写了一些 GUI 应用。不过,在处理 IO/CPU 密集型任务的时候,我总是不得要领,总是让界面卡死好久,直到 IO/CPU 任务完成,中间界面状态不能被更新,用户体验非常不好。

这大约是因为我对并发编程的理解不深入,因而后来我又接触了 Java —— 它被认为原生支持并发编程,也使用 JavaFx 写了一些其他带有界面的应用,这时候我懂得使用 Worker,能够让后台运行的密集 IO/CPU 任务间隔性的更新状态到前端界面,增强了用户体验。不过,这个过程比较繁琐,并且大部分应用也没有这样细致的异步需求,尽管如此,异步任务的 GUI 程序在我看来还是很酷。

在我三年多的工作生涯中,我也写了很多 GUI 程序,技术栈五花八门,从 Swing 到 JavaFx,从 纯 HTML、JavaScript 到 React.js,然后是 SwiftUI 和 Flutter。这些 GUI 程序中,大部分都是 CRUD 增删改查,偶尔调用系统命令、或者执行一些简单计算,我大部分都没有使用异步任务,因为处理日常任务所需的应用太简单了,没有那么多复杂、耗时的任务需要处理。

不过,我最近还是遇到过一些比较复杂的任务,比如一个需要并行解析数个十兆左右的 Excel 文件并解析和合并的应用,以及一个需要并行在多个远程主机执行 SSH 命令并收集结果的应用。这些应用都需要处理大量的数据,并且需要并行执行,因此有了使用异步任务的契机。

下面展示了 Flutter 基于 Stream 流的异步任务响应式通知例子,这个例子展示了一种场景:编译本地项目(CPU 密集),然后向多台远程主机发送 SFTP 命令 —— 上传一些文件(IO 密集)。由于编译成功与否不确定,不同远程主机的连接速度、可达性可能不同,目标文件发送成功也不确定,因此迫切的需要通过 GUI 来展示各个步骤的状态,以便使用者能够清楚的知晓当前任务的执行情况并作出对应反应 —— 这可能是 GUI 相比较 CLI 所拥有绝对优势的领域了。

先展示一下界面效果:

stream

很容易想到,对于 Spawn 命令,可以通过对线程输出的监听来获取其内容,通过其 exitCode 判断是否成功执行。的确,上图的编译步骤就是用了这样的模式。需要注意的是,这个过程需要小心的处理监听订阅,在界面关闭时及时关闭线程,清理订阅,如果使用了异步,那么执行 GUI 相关代码时先判断是否挂载。

startCompile() async {
  setState(() {
    start = true;
    exitCode = null;
  });
  final cmds = widget.models.map((e) {
    var pan = e.substring(0, 2);
    var prefixCmd = pan.endsWith(":") ? "$pan && " : "";
    return "${prefixCmd}cd $e && mvn clean install -Dmaven.test.skip=true -Dfile.encoding=UTF-8";
  }).join(" && ");
  process = await Process.start("cmd", [ "/c", cmds], environment: {
    "PATH": widget.pathWithJavaHome,
    "JAVA_HOME": widget.javaHome
  });
  subscriptions.add(process!.stdout
      .transform(const Utf8Decoder())
      .listen(terminal.write, onError: (e) {}));
  subscriptions.add(process!.stderr
      .transform(const Utf8Decoder())
      .listen(terminal.write, onError: (e) {}));
  process!.exitCode.then((value) {
    if (context.mounted) {
      setState(() {
        exitCode = value;
        start = false;
      });
      if (_actionAfterCompileDone) handleAction();
      if (widget.exitAfterCompileSuccess > 0) {
        Future.delayed(Duration(seconds: widget.exitAfterCompileSuccess), () {
          if (context.mounted) exit();
        });
      }
    }
  });
}
exit() {
  for (var sub in subscriptions) sub.cancel();
  process?.kill();
  context.pop();
}

但是,对于并行执行多个异步任务,不论是 IO 还是 CPU 密集,这种监听就很繁琐了:每个任务都需要小心的更新界面状态,处理异常,清理订阅,此外还可能有并发问题。解决这个问题的思路很简单,就是使用一个 Stream 流,异步业务通过向流中添加消息,界面通过 StreamBuilder,当收到新消息后根据消息内容渲染界面。这里的技巧在于,可以在内存中保留最近一条消息,业务可基于此消息追加内容或更新新的内容,实现类似于日志的追加输出或截断输出。

上图中多台远程主机执行的 CPU 密集任务(查找 jar 包)和 IO 密集任务(重命名文件、上传文件)就使用了这样的思想,通过 Stream 流来传递消息。

具体而言,下面的 handleSFTP 函数接受一个 func 函数,其定义了业务逻辑,此函数初始化 Stream 流并为多台设备(setting.currentCluster.nodes)初始化 SSH 和 SFTP 链接,然后将 SFTP 和往 Stream 中添加消息的处理器交给业务逻辑,并返回一个 StreamBuilder 对话框 Widget,所有 Stream 流的消息都会被 StreamBuilder 接收并响应式渲染。

handleSFTP(
    Setting setting,
    Future<void> Function(ClusterNode, SftpClient,
            void Function(ScpTaskMsg, {bool append}) notify) func) async {
  StreamController<ScpNodeMsgs> stream = StreamController();
  final msg = ScpNodeMsgs(
      tasks: Map.fromEntries(
          (setting.currentCluster?.nodes ?? <ClusterNode>{}).map((node) {
    return MapEntry(
        node, ScpTaskMsg(message: "正在初始化", done: false, error: false));
  })));
  stream.add(msg);
  setting.currentCluster?.nodes.forEach((node) async {
    SSHClient? client;
    SftpClient? sftp;
    try {
      client = SSHClient(await SSHSocket.connect(node.ip, 22),
          username: node.user, onPasswordRequest: () => node.password);
      msg.tasks[node] =
          ScpTaskMsg(message: "已连接至 SSH", done: false, error: false);
      stream.add(msg);
      sftp = await client.sftp();
      await func(node, sftp, (newMsg, {bool append = true}) {
        msg.tasks[node] = newMsg.copyWith(
            message: append
                ? "${msg.tasks[node]?.message}\n${newMsg.message}"
                : newMsg.message);
        stream.add(msg);
      });
    } catch (e) {
      msg.tasks[node] =
          ScpTaskMsg(done: true, error: true, message: "操作失败:$e");
      stream.add(msg);
    } finally {
      debugPrint("closing sftp and client for ${node.ip} now...");
      sftp?.close();
      client?.close();
    }
  });
  await showDialog(
      context: context,
      barrierDismissible: false,
      builder: (context) {
        return AlertDialog(
            title: const Text("SFTP 操作"),
            content: SingleChildScrollView(
                child: StreamBuilder(
                    stream: stream.stream,
                    builder: (context, snapshot) {
                      if (snapshot.hasData) {
                        return Column(
                            crossAxisAlignment: CrossAxisAlignment.start,
                            mainAxisSize: MainAxisSize.min,
                            children: [
                              ...snapshot.data!.tasks.entries.map((e) {
                                final node = e.key;
                                final msg = e.value;
                                return Padding(
                                    padding: const EdgeInsets.only(bottom: 9),
                                    child: Row(
                                        children: [
                                          msg.done
                                            ? msg.error
                                                ? const Icon(
                                                    Icons.error,
                                                    color: Colors.red)
                                                : const Icon(
                                                    Icons.check,
                                                    color: Colors.green)
                                            : const CircularProgressIndicator(),
                                          const SizedBox(width: 10),
                                          Expanded(
                                              child: Column(
                                                  children: [
                                                Text("节点 ${node.ip}"),
                                                Text(msg.message)
                                              ]))]));
                              })]);
                      } else {
                        return const Text("正在加载");
                      }})),
            actions: [TextButton(onPressed: context.pop, child: const Text("关闭"))]);
      });
  await stream.close();
}

消息的定义很简单,ScpTaskMsg 包含需要渲染在界面上的一句话,以及当前动作是否标记着失败,以及是否代表结束。ScpNodeMsgs 代表了所有节点的消息,在使用中,可直接通过 copyWith 来基于最后一条消息来生成新的消息,然后通过 add 添加到 stream 中。

(makeCollectionsUnmodifiable: false)
class ScpNodeMsgs with _$ScpNodeMsgs {
  factory ScpNodeMsgs({({}) Map<ClusterNode, ScpTaskMsg> tasks}) =
      _ScpNodeMsgs;
}


class ScpTaskMsg with _$ScpTaskMsg {
  factory ScpTaskMsg(
      {("") String message,
      (false) bool done,
      (false) bool error}) = _ScpTaskMsg;

  factory ScpTaskMsg.fromJson(Map<String, dynamic> json) =>
      _$ScpTaskMsgFromJson(json);
}

下面是 SFTP 的业务逻辑实现,这里先查找 jar 包,失败则发送一条失败消息,反之执行 PSCP 的 rename 备份,同样记录一条消息,之后执行替包操作,记录消息,这个过程如果失败,那么 try...catch 捕获并标记整个过程失败。

Future<void> sftpReplacePackage(ClusterNode node, SftpClient sftp,
    void Function(ScpTaskMsg, {bool append}) notify) async {
  String errMsg = "";
  for (final model in modelSelected.entries) {
    if (!model.value) continue;
    final modelPath = model.key;
    final modelName = p.basename(modelPath);
    final modelDisplay =
        "${p.dirname(modelPath).split(Platform.pathSeparator).lastOrNull ?? ""}${Platform.pathSeparator}$modelName";
    try {
      notify(ScpTaskMsg(message: "正在处理 $modelDisplay"));
      final dir = Directory("${model.key}${Platform.pathSeparator}target");
      notify(ScpTaskMsg(message: "$modelDisplay 正在查找 jar 包"));
      final files = dir.listSync();
      final jar = files.firstWhere((element) =>
          element.path.endsWith(".jar") &&
          !element.path.endsWith("sources.jar"));
      final localJar = jar.path;
      if (!await File(localJar).exists()) {
        notify(
            ScpTaskMsg(message: "$modelDisplay 未找到 jar 包,请先编译", error: true));
        throw Exception("$modelDisplay 未找到 jar 包");
      }
      final jarName = localJar.split(Platform.pathSeparator).last;
      var split = jarName.split("-");
      final snap = split.removeLast().replaceAll(".jar", "");
      final version = split.removeLast();
      final modelName = split.join("-");
      final remoteJar =
          "${node.path}/system/com/xxx/$modelName/$version-$snap/$jarName";
      final date = DateTime.now();
      final time = "${date.month}_${date.day}_${date.hour}_${date.minute}";
      final remoteBackupJarFullName = "$remoteJar.$time.bak";
      notify(ScpTaskMsg(
          message:
              "$modelDisplay 备份到 ${p.basename(remoteBackupJarFullName)}"));
      await sftp.rename(remoteJar, remoteBackupJarFullName);
      notify(ScpTaskMsg(message: "$modelDisplay 正执行替包操作"));
      final file = await sftp.open(remoteJar,
          mode: SftpFileOpenMode.create |
              SftpFileOpenMode.truncate |
              SftpFileOpenMode.write);
      await file.write(File(localJar).openRead().cast());
      notify(ScpTaskMsg(message: "$modelDisplay 替包完成"));
    } catch (e, st) {
      notify(ScpTaskMsg(message: "$modelDisplay 替包失败:$e"));
      debugPrintStack(stackTrace: st);
      errMsg += "$modelDisplay 失败:$e\n";
    }
  }
  notify(errMsg.isEmpty
      ? ScpTaskMsg(message: "已完成替包操作", done: true)
      : ScpTaskMsg(message: "替包操作失败", error: true, done: true));
}

Flutter 的异步任务 GUI 展示大体如此,对于复杂多模块应用,可以分多个 Stream,然后在中间执行过滤、合并、映射,通过监听等方式最后合并到一个输出流通过 StreamBuilder 展示。当然,也可以不通过 StreamBuilder,通过监听 Stream 回调实时 setState 更新界面文本也可。