在起始的那篇《金融 Python 即服务:业务自助的数据服务模式》,我们介绍了:使用 Python 如何使用作为数据系统的 wrapper 层?在这一篇文章里,我们将继续之前的话题,介绍如何使用 Python 作为计算引擎核心的胶水层,即:如何使用 Python 构建 DAG(有向无环图,Directed Acyclic Graph) 任务?
除此,还可以了解一下,如何设计增量 DAG 计算?先看一下增量计算的概念:
增量计算(Incremental computing),是一种软件功能,每当一条数据发生更改时,它都会尝试通过仅重新计算依赖于更改数据的输出来节省时间。
常见的领域有:
- GUI 应用, 诸如于 React 的 Dom Diff
- 不断变化的大型计算,诸如于金融计算、电子表格、大数据系统
- 构建系统,诸如于 Gradle、Bazel、Rustc 等
所以,在开始之前,让我们先看一个简单的例子,Excel 如何实现增量计算。
引子 1:Excel 的增量计算
众所周知,Excel 是使用最广泛的数据分析工具。当我们使用了 Excel 中的公式之后,当我们修改了 A 单元格的值,对应的结果会自动发生变化。而如果在这时,还有其它依赖于此单元格的值时,对应的结果也会发生变化。如下图所示:
出自 《How to Recalculate a Spreadsheet》
在 Microsoft 官方的文档里(Excel 重新计算),可以看到对应的触发重新计算场景:输入新数据、删除或插入行或列等等。在 Excel 中,工作表的计算可视为包含三个阶段的过程:
- 构造依赖关系树
- 构造计算链
- 重新计算单元格
一旦触发了重新计算,Excel 会重新构造依赖关系树和计算链,并依赖于此的所有单元格标记为 ”脏单元格“。随后,根据计算链指定的顺序重新计算。通常来说,在我们设计依赖分析时,假定的是函数是不可变的。但是呢,还存在一些特殊的函数类型,诸如于文档中提到的:
- 异步函数 (UDF)。
- 可变函数。即哪怕参数没有变化时,值也可能修改。诸如于 Now、Today 等。
这意味着,我们在设计增量计算时,需要考虑到这个场景的问题。从原理和实现来说,它一点并不算太复杂,有诸如于
从注解 DAG 到增量 DAG 设计
DAG (有向无环图,Directed Acyclic Graph)是一种常用数据结构,仅就 DAG 而言,它已经在我们日常的各种工具中存在:
- 依赖系统。诸如如 NPM、Yarn、Gradle、Cargo 等
- 人工智能。如机器学习等
- 数据流系统。如编译器、Apache Spark、Apache Airflow 等。
- 数据可视化。常用的 Graphviz,又或者是各个语言里的 Network 相关的库,诸如于 Python 的 NetworkX。
当我们从任务编排和数据等的角度来看,DAG 的面向普通人术语是叫工作流(Workflow)。
常规 DAG 到函数式 DAG
通常情况下,实现一个 DAG 非常的简单 —— 只是数据结构。在使用时,也比较简单,如下是 Cytoscape 的 API 示例:
cy.add([
{ group: 'nodes', data: { id: 'n0' }, position: { x: 100, y: 100 } },
{ group: 'nodes', data: { id: 'n1' }, position: { x: 200, y: 200 } },
{ group: 'edges', data: { id: 'e0', source: 'n0', target: 'n1' } }
]);
而这一类 DAG 是静态的,当我们需要结合些任务时,就会需要添加函数。由此便会稍微复杂一些,再现看个示例:
comp = Computation()
comp.add_node('a')
comp.add_node('b', lambda a: a+1)
comp.add_node('c', lambda a, b: 2*a)
comp.add_node('d', lambda b, c: b + c)
comp.add_node('e', lambda c: c + 1)
comp.compute('d')
comp.get_value_dict()
上述的代码中,是 Loman 框架的示例,其中的 lambda a: a+1
是 Python 的 Lambda 表达式。Loman 会在运行时,分析这个 Lambda,获得 Lambda 中的参数,随后添加对应的计算依赖。
Loman 示例
而在多数场景之下,往往是采用注解的形式,诸如于 Airflow、Gradle 等。
基于注解与条件的 DAG 函数
回到研究的开始,如美银证券的 Quartz 的 DSL 扩展(Little languages),便是在 Loman 的形式上进行了一步扩展。使用注解代替了 Lambda:
class C:
@dag
def f1(self, x, y):
return self.f2(x) + y
@dag
def f2(self, x):
return x * x
围绕于这个注解,Quartz 在这一层的实现上,包含了四个特性:DAG、记忆化(memoization)、持久化、时间旅行调试(time travel)。考虑到 Quartz 并不是一个开源的实现,社区上的材料不一定靠谱,所以我们还是再看看 Apache Ariflow 的实现。引用官网的示例:
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperatorA DAG represents a workflow, a collection of tasks
with DAG(dag_id="demo", start_date=datetime(2022, 1, 1), schedule="0 0 * * *") as dag:
Tasks are represented as operators
hello = BashOperator(task_id="hello", bash_command="echo hello")
@task()
def airflow():
print("airflow")Set dependencies between tasks
hello >> airflow()
从实现上来说,Apache Airflow 的 DAG 实现本着 “工作流即代码” 的思想设计的。上面代码中,比较有意思的是 >>
语法,其是在任务之间定义了一个依赖关系并控制任务的执行顺序。
增量 DAG 注解:Gradle —— 监听输入与输出
在编译上,Gradle 也是支持增量编译(也是一种增量计算)的,我们可以先看个简单的示例:
abstract class IncrementalReverseTask extends DefaultTask {
@Incremental
@InputDirectory
abstract DirectoryProperty getInputDir()@OutputDirectory
abstract DirectoryProperty getOutputDir()@TaskAction
void execute(InputChanges inputChanges) {
inputChanges.getFileChanges(inputDir).each { change ->
if (change.fileType == FileType.DIRECTORY) returndef targetFile = outputDir.file(change.normalizedPath).get().asFile if (change.changeType == ChangeType.REMOVED) { targetFile.delete() } else { targetFile.text = change.file.text.reverse() }
}
}
}
对于 Gradle 的增量任务来说,通常只需要关注输入和输出,只要 InputDirectory
和 OutputDirectory
不变,那么就认为 Task 不需要再执行。因为在实现处理逻辑时,只关注于这两个值是否发生变化。
Rust 语言:Salsa 框架的增量 DAG 设计
Rust 编译器的文档上也包含了相关的介绍:Incremental compilation,而这里我们是一个相关的实现 —— 增量编译的设计者之一(Niko Matsakis)编写的库 Salsa。Salsa 是一个用于编写增量 (incremental) 、按需 (on-demand) 程序的 Rust 框架,其采用的是 “红-绿”算法。与 Gradle 相似的,Salsa 结构体(Structs)是使用一种 Salsa 属性宏进行了标注的结构体:
#[salsa::input]
:用于指定计算的“基本输入”#[salsa::tracked]
:用于指定在计算过程中创建的中间值#[salsa::interned]
:用于指定易于进行相等比较的小型值
由于 Salsa 相比于 Gradle 是位于更底层的基础设施,所以需要手动构建存储层,即 Jar 和数据库)。数据库是一个结构体,它最终存储 Salsa 的所有中间状态,例如来自跟踪函数的被记忆的 (memoized) 返回值。数据库本身是以一些中间结构 (intermediate structure) 的形式定义的,这些中间结构被称为 jars,并包含每个函数的数据。
缓存计算与存储计算
既然,我们已经通过注解将输入、输出、函数等内容标注出来,下一步就是缓存结果。如此一来,我们就可以通过缓存来提升计算性能。对于计算的缓存来说,至少需要包含这三个部分:
- 函数表达式(Fn 类型)。
- 零个或多个参数。
- 一个可选名称。
由此,我们才能获得缓存后的结果。在一些框架的设计里,诸如于 Python 语言
内存:Memoization —— 函数式编程的记忆
Memoization(记忆化)是函数式语言的一种特性,使用一组参数初次调用函数时,缓存参数和计算结果,当再次使用相同的参数调用该函数时,直接返回相应的缓存结果。在一些不支持 memoization 的语言里,需要手动引入这种设计,如 Java:
Map<Integer, Integer> cache = new ConcurrentHashMap<>();
Integer addOne(Integer x) {
return cache.computeIfAbsent(x -> x + 1);
}
上述只是一个加法的示例,万能的 StackOverflow 上有更多的示例:Java memoization method。
当然了,缓存是有负作用的 —— 第一次计算时存储结果会花费一定的时间,不过大部分情况下可以忽略不计。
数据库存储
对于耗时更长的 AI 或者是金融计算场景时,需要采用分布式的任务调度器,才能更快的得到计算结果。于是乎,采用分布式键值存储来对结果进行缓存就是更好的选择。在 Salsa 框架里,由于考虑到不同的类型(input、output、tracked 等),对于数据结构函数等来说,其对应的 Index 由三部分组成:
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct DatabaseKeyIndex {
group_index: u16,
query_index: u16,
key_index: u32,
}
大抵是
增量计算框架与算法
由于时间与精力限制(主要是我看不懂一些用英语写的公式,还有暂时没打算学 OCaml),这里就没有展开对于各类计算框架论文的讨论。诸如于 Incremental 和 Adapton 就是相关的论文与实现,就包含了非常不错的资料。
除此:https://lord.io/spreadsheets/ 一文也给了非常好的介绍。
这里,我就不展开了。
有了增量计算,然后呢?
后续的计算部分,可以参考 Apache Airflow 来实现。它是一个支持开源分布式任务调度框架,其架构
- 调度程序,它处理触发计划的工作流,并将任务提交给执行程序以运行。
- 执行器,它处理正在运行的任务。在默认的 Airflow 安装中,这会在调度程序中运行所有内容,但大多数适合生产的执行程序实际上会将任务执行推送给工作人员。
- Web 服务器,它提供了一个方便的用户界面来检查、触发和调试 DAG 和任务的行为。
- DAG 文件的文件夹,由调度程序和执行程序(以及执行程序拥有的任何工作人员)读取
- 元数据数据库,由调度程序、执行程序和网络服务器用来存储状态。
其架构图如下:
Apache Airflow 架构
不过、过了、还是不过,考虑到 Airflow 的 DAG 实现是 Python,在分布式任务调度并不是那么流行。但是,作为一个参考还是非常不错的。
其他
相关参考资料:
- 《How to Recalculate a Spreadsheet》一篇非常不错的文章,介绍了不同的算法是如何重新计算电子表格的。当然了,也包含作者自己写的新方案 Anchors。对于写库来说,是一个非常不错的参考。
- 《Excel 重新计算》介绍了 Excel 重新计算的逻辑。
- Salsa 文档:https://salsa-rs.netlify.app/ (中文版翻译:https://rust-chinese-translation.github.io/salsa-book/ )
- Adapton 提供了一个增量计算的编程语言抽象,官网:http://adapton.org/ 提供了非常不错的参考资料
除此,在构建工具方面,在这一方面微软研究院的《Build Systems à la Carte》提供了一个非常不错的介绍,如果你可以参考这一篇《【工业聚看论文】第一期:《Build Systems à la Carte: Theory and Practice》
(PS:因微信限制,链接请https://www.phodal.com/blog/incremental-computing-for-financial-python/使用)