UnityJobSystem
UnityJobSystem是Unity官方推出的一个多线程计算系统结构。可以将一些多线程计算任务分发的子线程,即Unity中的Job中取运算。提高运算效率。这一部分实际上跟很多变成语言中的多线程库,或者并行计算模型不谋而合。可以通过JobSystem来看并行计算的一些东西。
而在Catlike大佬的基础教程中(https://catlikecoding.com/unity/tutorials/basics/jobs/)也提及了JobSystem的使用。用来做更新分形(Fractal)计算中批量更新物体位置。
这里单独提出JobSystem并给予一些简单的并行计算任务来研究JobSystem。实际上JobSystem的背后是并行计算理论。
JobSystem接口概述
UnityJobSystem官方文档[https://docs.unity3d.com/Manual/job-system.html]。官方文档对JobSystem有个比较详细的叙述。
要使用Unity的JobSystem非常简单。JobSystem提供了一组接口,只要自定义一个Struct并且实现对应接口便可以使用JobSystem将计算任务分发到Job线程中去进行。大部分接口都只需要实现一个Execute函数即可。该函数即JobSystem中子线程所调用运行的接口函数。
这里需要注意几点:
- JobSystem分配线程方式由JobSystem决定。它有可能分配到主线程,也可能分配到系统内的子线程上面。所以说子线程并不准确,简单称之为Job线程。
- JobSystem在运行的时候,会将实现的Struct实例数据复制一份,放到各自Job线程中去运行。
JobSystem提供的接口主要为以下四类:
- IJob:用于在一个Job线程上跑一个单独计算任务。
- IJobParallelFor:用于并行的跑一个计算任务。每个线程并行的运行,并且为了保证共享数据,在接口中提供了一个int参数起到索引作用。
- IJobFor:跟IJobParallelFor接口一样。但是允许非并行运行。即可以前后有一定任务依赖的方式运行,这种方式通过
Schedule返回的JobHandle来实现。
- IJobParallelForTransform:跟IJobParallelFor接口一样。但是接口中会多一个Transform参数,用来计算Transform数据。
运行一个Job大致可以描述为以下流程(以IJob为例):
- 创建Job:实现一个实现了IJob的Struct结构。
- 规划Job:在对应Struct实例上面调用Schedule函数来分配计算任务。
- 等待Job完成:Job会由JobSystem安排运行。其可能立即完成,也可能持续一段时间。JobSystem提供Complete函数,来确保对应的Job已经完成,可以由主线程来读取数据。
前面也提到过用于跑Job的Struct会被复制到各个子线程上去。对于各线程读取数据来说,还无大碍。但是对于最后数据汇总的部分,其一定会有写入共享内存的问题。Unity为此提供了一套线程安全的数据类型,即NativeContainer类型。使用该类型,相当于让子线程访问跟主线程共享的一块内存区域,而不是一份copy。这样才能方便实现最后数据汇总流程。
对于使用NativeContainer来说由两点需要注意的。
对于一个默认实现的NativeContainer容器来说,子线程同时拥有读写权限。这样会稍微降低效率,可以通过Attribute标记字段来约定只读或只写来提高性能。Attribute对应如下
1
2
3
4
|
[ReadOnly] // 表示这个字段容器数据 子线程只用来读取
public NativeArray<int> input;
[WriteOnly] // 表示这个字段容器数据 子线程只用来写入
public NativeArray<int> output;
|
NativeContainer容器类型的内存都分配在非托管堆上面。所以这部分内存需要程序员手动控制释放,否则会造成内存泄漏。为了方便内存控制,unity对其实例化的参数中添加了生命周期控制参数:
- Allocator.Temp:最快分配内存方式。这个内存的生命周期为一帧或更短。不可以将该方式分配的内存实例赋值给实现Job的Struct字段。
- Allocator.TempJob:略慢于Temp但快于Persistent的方式。该内存的生命周期为四帧。重要提示:必须在四帧以内用Dispose接口释放该内存,否则会输出一个Warning。
- Allocator.Persistent:最慢的分配方式。但是可以按照你的需要存活任意时长,在不需要的时候需要用Dispose来释放该内存区域,否则会泄露。该方式实际上是直接调用
malloc分配的一个包装。。
这些接口的使用会在下面的例子中详细说明:
简单的求和拆分运算
下面简单看一个例子来看JobSystem怎么运行的。这里以一个简单的求和任务为例子。在主线程写一个简单的大整数求和可以如下进行:
1
2
3
4
5
6
7
8
9
10
|
public int total_count = 100000000
// 给定一个数字,从0开时到该数字所有数字求和
public void SimpleSumNumber()
{
for (int i = 0; i < total_coun; i++)
{
result += i;
}
}
|
当数字很大的时候,这个计算会消耗很多时间。可以通过Unity的Profiler或者C#的Stopwatch来查看这段允运行的耗时。
在我的电脑上这一段耗时如下图
可以看到耗时大概237.64ms。
先用单个的Job来将该段运行放到Job线程中去。这个按照前面可以通过如下方式实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
// 实现IJob接口的Struct 承载计算要用到的数据和最后计算结果
// 因为计算主要用到一个只读的目标数字所以需要一个total_count字段
// 而最后的结果则需要返回到主线程所以使用一个NativeArray类型来传递
public struct SumNumberJob : IJob
{
public long total_count;
// 因为最终结果只用来写入 标记WriteOnly
// NativeArray则是一个数组容器结构,所以最后写入的时候默认第一位为结果
[WriteOnly]
public NativeArray<long> result;
public void Execute()
{
long temp = 0;
for (long i = 0; i < total_count; i++)
{
temp += i;
}
result[0] = temp;
}
}
// 调度启用JobSystem函数
public void ScheduleSumNumerJob()
{
// 实例化用于计算的JobStruct,result则实例化一个长度为1的NativeArray
var job_struct = new SumNumberJob
{
total_count = total_count,
result = new NativeArray<long>(1, Allocator.Persistent)
};
// 调用Schedule函数来让JobSystem规划该Job的计算,该函数会返回一个JobHandle对象。
// JobHandle对象是一个Struct,用来记录Job的运行状态
var job_handle = job_struct.Schedule();
// JobHandle有一个Complete函数。调用该函数,会阻塞当前线程。确保目标Job计算完成。相当于强制当前线程去等待目标线程。如果目标已经计算完毕,则会立即返回
job_handle.Complete();
// 最后从Job共享内存部分取出计算数据
result = job_struct.result[0];
// 因为NativeArray用的Persistent分配方式,需要手动Dispose来释放该内存。
job_struct.result.Dispose();
}
|
该实现实际上会在当帧内完成计算。因为在主线程函数ScheduleSumNumerJob中直接调用了Complete接口。这个会迫使等待Job计算完成。运行该函数,打开Profiler可以看到如下图。
我们可以看到这个Job是直接在主线程上运行了。Complete函数里面直接调用了JobStruct上的Execute函数来执行。
现在我们考虑将这个任务分解成并行任务。这个任务实际上属于易并行任务一类(这个可以看并行计算相关书籍)。我们将任务划分成$n$个区间,每个区间$[S_i,S_{i+1}]$内的计算交给一个Job线程来运行。可以看到多个线程之间的计算互不影响,可并发执行。
只要在上面的Job流程略加改造即可。在原有的JobStruct上添加区间范围,每个Job只是计数对应区间。在规划的时候创建多个Job来执行。最后汇总所有结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
public struct SumNumberJob : IJob
{
public long from_num;
public long to_num;
[WriteOnly]
public NativeArray<long> result;
// 计数改为区间范围
public void Execute()
{
long temp = 0;
for (long i = from_num; i < to_num; i++)
{
temp += i;
}
result[0] = temp;
}
}
// Job多线程规划的时候 则for循环一次启动多个Job 最后等到所有Job完成之后把每个结果累加起来即可
public void ScheduleSumNumerJob()
{
var job_count = 8;
SumNumberJob[] job_struct_array = new SumNumberJob[job_count];
NativeArray<JobHandle> job_handle_array = new NativeArray<JobHandle>(job_count, Allocator.Persistent);
for (int i = 0; i < job_count; i++)
{
job_struct_array[i] = new SumNumberJob
{
from_num = total_count * i / job_count,
to_num = total_count * (i + 1) / job_count,
result = new NativeArray<long>(1, Allocator.Persistent)
};
job_handle_array[i] = job_struct_array[i].Schedule();
}
// 该函数相当于将所有的JobHandle合并为一个。调用all_handle的Complete相当于要求数组中所有JobHandle都完成。
var all_handle = JobHandle.CombineDependencies(job_handle_array);
all_handle.Complete();
for (int i = 0; i < job_count; i++)
{
result += job_struct_array[i].result[0];
job_struct_array[i].result.Dispose();
}
job_handle_array.Dispose();
}
|
运行之后Profiler效果如下
可以明显看到,整个计算的时长缩短了。只用了约48ms左右。同时可以看到以下几个情况。
- Job栏中的Worker不再是Idle状态,而是在运行对应的SumNumberJob函数。
- 并且Worker的个数与我们规划的Job个数一致。
- 主线程中有个明显的WaitForJobGroup流程,在等待Job完成其工作。
下面重新用并行接口版的IJobFor来实现该效果。其代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
// 改为实现IJobfor接口 这次只会实例化一个Job实例
// 所以传入数据部分改为提前传入划分好的区间结构 根据Execute传入的index来调整使用的计数区间
public struct SumNumberParallelJob : IJobFor
{
[Unity.Collections.ReadOnly]
public NativeArray<long> input;
[Unity.Collections.WriteOnly]
public NativeArray<long> result;
// IJobFor接口的Execute带有一个参数index 该index相当于一个标识符虽然数据一致,但每个线程计算有所不同区分
// index跟Schedule中参数有关,规划了多少个会传入几
public void Execute(int index)
{
long temp = 0;
var from_num = input[index];
var to_num = input[index + 1];
for (long i = from_num; i < to_num; i++)
{
temp += i;
}
result[index] = temp;
}
}
// 使用IJobFor来并行计算任务
public void ScheduleSumNumerParallelJob()
{
var job_count = 8;
// 实例化一个数据结构 该实例只有一份,会copy到各个运作Job线程上去
var job_struct = new SumNumberParallelJob
{
input = new NativeArray<long>(job_count + 1, Allocator.Persistent),
result = new NativeArray<long>(job_count, Allocator.Persistent)
};
for (int i = 0; i < job_count + 1; i++)
{
job_struct.input[i] = total_count * i / job_count;
}
// 并行规划接口调用ScheduleParallel
// 第一个参数相当于调用Execute次数,会在同一个或不同线程上调用Execute共job_count次数
// 第二个参数官方称之为workstealing,即工作偷取数量。即每个线程相当于来拿取任务个数。
// 例如为1,表示每个线程空闲时会尝试获取任务,每次只会拿去一个任务,其只会跑一次Execute函数,传入一个index。
// 等到结束之后,其会尝试再来偷取对应数量的任务个数。
var job_handle = job_struct.ScheduleParallel(job_count, 1, default);
job_handle.Complete();
for (int i = 0; i < job_count; i++)
{
result += job_struct.result[i];
}
job_struct.input.Dispose();
job_struct.result.Dispose();
}
|
关于SchedulleParallel运行官网有个详细的图来描述,可见
https://docs.unity3d.com/Manual/job-system-parallel-for-jobs.html
运行之后,Profiler效果如下
可以看到计算效果跟前面相差无几。实际上并行变成有着效率提升的上限。也可以明显看到,一个计算划分为原来的$1/8$之后计算时间并没有缩减为$1/8$。这是因为调度线程本身就有性能消耗。所以即便再多的线程也有一个提升的上限。当我把我的job个数拉满到16之后,占满所有核,计算大概再30ms左右。
补充信息
- Job依赖
前面有提到Job可以建立前后计算的依赖关系。这种主要为计算依赖服务。这个方式通过JobHandle来实现。在每个Schedule方法中实际都可以传入一个JobHandle对象。传入表示,当前规划Job依赖目标Job执行完成。其大致框架如下
1
2
3
4
5
6
7
8
|
public void JobDependency()
{
//...
JobHandle first_job_handle = job_data.Schedule();
// 这个表示第二个Job以来第一个完成。 后续只用关系第二个Job的Complete即可
JobHandle second_job_handle = second_job_data.Schedule(first_job_handle);
//...
}
|
- Burst编译
实际上JobSystem还有着配套的Burst编译系统,可以通过Attribute标记Burst编译来优化job的运行效果。Burst根据官方来看,试了LLVM编译来优化运行,可以提高并行运行效率。要使用其按如下标记即可
1
2
3
4
5
|
[BurstCompile]
public struct SumNumberParallelJob : IJobFor
{
//...
}
|
使用了Burst编译优化后,可以明显发现上面的计算任务提升很多。这可能跟把这个求和运算直接优化了有关。也有说,测试来看Burst编译优化不如il2cpp的直接转C++运行方式快。
在我的机器上编译后运行耗时大概只有7ms。profiler如下
Job的规划接口,即Schedule接口。都是放在扩展方法里面的。其处理基础的Schedule接口外,还有如下几类接口
- Run:直接在当前线程上运行。
- xxByRef:当传递JobStruct不是值拷贝,而是引用方式。这样对于较大的数据快来说会比Schedule略快一些。
这些接口在做某些类型并行计算时会非常有用。例如我在开启的Job线程中再开启一些Job计算。
总结
在我看来,JobSystem其实有点类似于MPI那种
还有其余一些并行计算可以看Catlike分形运动那一部分,即上面那篇文章。对此我也记录了一篇。