Flutter GUI 模式:异步任务
我最开始接触编程的时候,就对 GUI 非常着迷 —— 它就好像一扇观察计算机如何工作的窗户,让我能够将计算机的内部状态和外部世界联系在一起,感觉非常的酷。在研究生时,我接触的第一门语言是 Python,当时最流行的是 PyQt 的界面库绑定,我查了很多资料,使用它写了一些 GUI 应用。不过,在处理 IO/CPU 密集型任务的时候,我总是不得要领,总是让界面卡死好久,直到 IO/CPU 任务完成,中间界面状态不能被更新,用户体验非常不好。
这大约是因为我对并发编程的理解不深入,因而后来我又接触了 Java —— 它被认为原生支持并发编程,也使用 JavaFx 写了一些其他带有界面的应用,这时候我懂得使用 Worker,能够让后台运行的密集 IO/CPU 任务间隔性的更新状态到前端界面,增强了用户体验。不过,这个过程比较繁琐,并且大部分应用也没有这样细致的异步需求,尽管如此,异步任务的 GUI 程序在我看来还是很酷。
在我三年多的工作生涯中,我也写了很多 GUI 程序,技术栈五花八门,从 Swing 到 JavaFx,从 纯 HTML、JavaScript 到 React.js,然后是 SwiftUI 和 Flutter。这些 GUI 程序中,大部分都是 CRUD 增删改查,偶尔调用系统命令、或者执行一些简单计算,我大部分都没有使用异步任务,因为处理日常任务所需的应用太简单了,没有那么多复杂、耗时的任务需要处理。
不过,我最近还是遇到过一些比较复杂的任务,比如一个需要并行解析数个十兆左右的 Excel 文件并解析和合并的应用,以及一个需要并行在多个远程主机执行 SSH 命令并收集结果的应用。这些应用都需要处理大量的数据,并且需要并行执行,因此有了使用异步任务的契机。
下面展示了 Flutter 基于 Stream 流的异步任务响应式通知例子,这个例子展示了一种场景:编译本地项目(CPU 密集),然后向多台远程主机发送 SFTP 命令 —— 上传一些文件(IO 密集)。由于编译成功与否不确定,不同远程主机的连接速度、可达性可能不同,目标文件发送成功也不确定,因此迫切的需要通过 GUI 来展示各个步骤的状态,以便使用者能够清楚的知晓当前任务的执行情况并作出对应反应 —— 这可能是 GUI 相比较 CLI 所拥有绝对优势的领域了。
先展示一下界面效果:
很容易想到,对于 Spawn 命令,可以通过对线程输出的监听来获取其内容,通过其 exitCode 判断是否成功执行。的确,上图的编译步骤就是用了这样的模式。需要注意的是,这个过程需要小心的处理监听订阅,在界面关闭时及时关闭线程,清理订阅,如果使用了异步,那么执行 GUI 相关代码时先判断是否挂载。
startCompile() async {
setState(() {
start = true;
exitCode = null;
});
final cmds = widget.models.map((e) {
var pan = e.substring(0, 2);
var prefixCmd = pan.endsWith(":") ? "$pan && " : "";
return "${prefixCmd}cd $e && mvn clean install -Dmaven.test.skip=true -Dfile.encoding=UTF-8";
}).join(" && ");
process = await Process.start("cmd", [ "/c", cmds], environment: {
"PATH": widget.pathWithJavaHome,
"JAVA_HOME": widget.javaHome
});
subscriptions.add(process!.stdout
.transform(const Utf8Decoder())
.listen(terminal.write, onError: (e) {}));
subscriptions.add(process!.stderr
.transform(const Utf8Decoder())
.listen(terminal.write, onError: (e) {}));
process!.exitCode.then((value) {
if (context.mounted) {
setState(() {
exitCode = value;
start = false;
});
if (_actionAfterCompileDone) handleAction();
if (widget.exitAfterCompileSuccess > 0) {
Future.delayed(Duration(seconds: widget.exitAfterCompileSuccess), () {
if (context.mounted) exit();
});
}
}
});
}
exit() {
for (var sub in subscriptions) sub.cancel();
process?.kill();
context.pop();
}
但是,对于并行执行多个异步任务,不论是 IO 还是 CPU 密集,这种监听就很繁琐了:每个任务都需要小心的更新界面状态,处理异常,清理订阅,此外还可能有并发问题。解决这个问题的思路很简单,就是使用一个 Stream 流,异步业务通过向流中添加消息,界面通过 StreamBuilder,当收到新消息后根据消息内容渲染界面。这里的技巧在于,可以在内存中保留最近一条消息,业务可基于此消息追加内容或更新新的内容,实现类似于日志的追加输出或截断输出。
上图中多台远程主机执行的 CPU 密集任务(查找 jar 包)和 IO 密集任务(重命名文件、上传文件)就使用了这样的思想,通过 Stream 流来传递消息。
具体而言,下面的 handleSFTP
函数接受一个 func 函数,其定义了业务逻辑,此函数初始化 Stream 流并为多台设备(setting.currentCluster.nodes)初始化 SSH 和 SFTP 链接,然后将 SFTP 和往 Stream 中添加消息的处理器交给业务逻辑,并返回一个 StreamBuilder 对话框 Widget,所有 Stream 流的消息都会被 StreamBuilder 接收并响应式渲染。
handleSFTP(
Setting setting,
Future<void> Function(ClusterNode, SftpClient,
void Function(ScpTaskMsg, {bool append}) notify) func) async {
StreamController<ScpNodeMsgs> stream = StreamController();
final msg = ScpNodeMsgs(
tasks: Map.fromEntries(
(setting.currentCluster?.nodes ?? <ClusterNode>{}).map((node) {
return MapEntry(
node, ScpTaskMsg(message: "正在初始化", done: false, error: false));
})));
stream.add(msg);
setting.currentCluster?.nodes.forEach((node) async {
SSHClient? client;
SftpClient? sftp;
try {
client = SSHClient(await SSHSocket.connect(node.ip, 22),
username: node.user, onPasswordRequest: () => node.password);
msg.tasks[node] =
ScpTaskMsg(message: "已连接至 SSH", done: false, error: false);
stream.add(msg);
sftp = await client.sftp();
await func(node, sftp, (newMsg, {bool append = true}) {
msg.tasks[node] = newMsg.copyWith(
message: append
? "${msg.tasks[node]?.message}\n${newMsg.message}"
: newMsg.message);
stream.add(msg);
});
} catch (e) {
msg.tasks[node] =
ScpTaskMsg(done: true, error: true, message: "操作失败:$e");
stream.add(msg);
} finally {
debugPrint("closing sftp and client for ${node.ip} now...");
sftp?.close();
client?.close();
}
});
await showDialog(
context: context,
barrierDismissible: false,
builder: (context) {
return AlertDialog(
title: const Text("SFTP 操作"),
content: SingleChildScrollView(
child: StreamBuilder(
stream: stream.stream,
builder: (context, snapshot) {
if (snapshot.hasData) {
return Column(
crossAxisAlignment: CrossAxisAlignment.start,
mainAxisSize: MainAxisSize.min,
children: [
...snapshot.data!.tasks.entries.map((e) {
final node = e.key;
final msg = e.value;
return Padding(
padding: const EdgeInsets.only(bottom: 9),
child: Row(
children: [
msg.done
? msg.error
? const Icon(
Icons.error,
color: Colors.red)
: const Icon(
Icons.check,
color: Colors.green)
: const CircularProgressIndicator(),
const SizedBox(width: 10),
Expanded(
child: Column(
children: [
Text("节点 ${node.ip}"),
Text(msg.message)
]))]));
})]);
} else {
return const Text("正在加载");
}})),
actions: [TextButton(onPressed: context.pop, child: const Text("关闭"))]);
});
await stream.close();
}
消息的定义很简单,ScpTaskMsg
包含需要渲染在界面上的一句话,以及当前动作是否标记着失败,以及是否代表结束。ScpNodeMsgs
代表了所有节点的消息,在使用中,可直接通过 copyWith 来基于最后一条消息来生成新的消息,然后通过 add
添加到 stream 中。
(makeCollectionsUnmodifiable: false)
class ScpNodeMsgs with _$ScpNodeMsgs {
factory ScpNodeMsgs({({}) Map<ClusterNode, ScpTaskMsg> tasks}) =
_ScpNodeMsgs;
}
class ScpTaskMsg with _$ScpTaskMsg {
factory ScpTaskMsg(
{("") String message,
(false) bool done,
(false) bool error}) = _ScpTaskMsg;
factory ScpTaskMsg.fromJson(Map<String, dynamic> json) =>
_$ScpTaskMsgFromJson(json);
}
下面是 SFTP 的业务逻辑实现,这里先查找 jar 包,失败则发送一条失败消息,反之执行 PSCP 的 rename 备份,同样记录一条消息,之后执行替包操作,记录消息,这个过程如果失败,那么 try...catch 捕获并标记整个过程失败。
Future<void> sftpReplacePackage(ClusterNode node, SftpClient sftp,
void Function(ScpTaskMsg, {bool append}) notify) async {
String errMsg = "";
for (final model in modelSelected.entries) {
if (!model.value) continue;
final modelPath = model.key;
final modelName = p.basename(modelPath);
final modelDisplay =
"${p.dirname(modelPath).split(Platform.pathSeparator).lastOrNull ?? ""}${Platform.pathSeparator}$modelName";
try {
notify(ScpTaskMsg(message: "正在处理 $modelDisplay"));
final dir = Directory("${model.key}${Platform.pathSeparator}target");
notify(ScpTaskMsg(message: "$modelDisplay 正在查找 jar 包"));
final files = dir.listSync();
final jar = files.firstWhere((element) =>
element.path.endsWith(".jar") &&
!element.path.endsWith("sources.jar"));
final localJar = jar.path;
if (!await File(localJar).exists()) {
notify(
ScpTaskMsg(message: "$modelDisplay 未找到 jar 包,请先编译", error: true));
throw Exception("$modelDisplay 未找到 jar 包");
}
final jarName = localJar.split(Platform.pathSeparator).last;
var split = jarName.split("-");
final snap = split.removeLast().replaceAll(".jar", "");
final version = split.removeLast();
final modelName = split.join("-");
final remoteJar =
"${node.path}/system/com/xxx/$modelName/$version-$snap/$jarName";
final date = DateTime.now();
final time = "${date.month}_${date.day}_${date.hour}_${date.minute}";
final remoteBackupJarFullName = "$remoteJar.$time.bak";
notify(ScpTaskMsg(
message:
"$modelDisplay 备份到 ${p.basename(remoteBackupJarFullName)}"));
await sftp.rename(remoteJar, remoteBackupJarFullName);
notify(ScpTaskMsg(message: "$modelDisplay 正执行替包操作"));
final file = await sftp.open(remoteJar,
mode: SftpFileOpenMode.create |
SftpFileOpenMode.truncate |
SftpFileOpenMode.write);
await file.write(File(localJar).openRead().cast());
notify(ScpTaskMsg(message: "$modelDisplay 替包完成"));
} catch (e, st) {
notify(ScpTaskMsg(message: "$modelDisplay 替包失败:$e"));
debugPrintStack(stackTrace: st);
errMsg += "$modelDisplay 失败:$e\n";
}
}
notify(errMsg.isEmpty
? ScpTaskMsg(message: "已完成替包操作", done: true)
: ScpTaskMsg(message: "替包操作失败", error: true, done: true));
}
Flutter 的异步任务 GUI 展示大体如此,对于复杂多模块应用,可以分多个 Stream,然后在中间执行过滤、合并、映射,通过监听等方式最后合并到一个输出流通过 StreamBuilder 展示。当然,也可以不通过 StreamBuilder,通过监听 Stream 回调实时 setState 更新界面文本也可。