Contents

01UnityJobSystem

UnityJobSystem

UnityJobSystem是Unity官方推出的一个多线程计算系统结构。可以将一些多线程计算任务分发的子线程,即Unity中的Job中取运算。提高运算效率。这一部分实际上跟很多变成语言中的多线程库,或者并行计算模型不谋而合。可以通过JobSystem来看并行计算的一些东西。

而在Catlike大佬的基础教程中(https://catlikecoding.com/unity/tutorials/basics/jobs/)也提及了JobSystem的使用。用来做更新分形(Fractal)计算中批量更新物体位置。

这里单独提出JobSystem并给予一些简单的并行计算任务来研究JobSystem。实际上JobSystem的背后是并行计算理论。

JobSystem接口概述

UnityJobSystem官方文档[https://docs.unity3d.com/Manual/job-system.html]。官方文档对JobSystem有个比较详细的叙述。

要使用Unity的JobSystem非常简单。JobSystem提供了一组接口,只要自定义一个Struct并且实现对应接口便可以使用JobSystem将计算任务分发到Job线程中去进行。大部分接口都只需要实现一个Execute函数即可。该函数即JobSystem中子线程所调用运行的接口函数。

这里需要注意几点:

  • JobSystem分配线程方式由JobSystem决定。它有可能分配到主线程,也可能分配到系统内的子线程上面。所以说子线程并不准确,简单称之为Job线程。
  • JobSystem在运行的时候,会将实现的Struct实例数据复制一份,放到各自Job线程中去运行。

JobSystem提供的接口主要为以下四类:

  • IJob:用于在一个Job线程上跑一个单独计算任务。
  • IJobParallelFor:用于并行的跑一个计算任务。每个线程并行的运行,并且为了保证共享数据,在接口中提供了一个int参数起到索引作用。
  • IJobFor:跟IJobParallelFor接口一样。但是允许非并行运行。即可以前后有一定任务依赖的方式运行,这种方式通过Schedule返回的JobHandle来实现。
  • IJobParallelForTransform:跟IJobParallelFor接口一样。但是接口中会多一个Transform参数,用来计算Transform数据。

运行一个Job大致可以描述为以下流程(以IJob为例):

  • 创建Job:实现一个实现了IJob的Struct结构。
  • 规划Job:在对应Struct实例上面调用Schedule函数来分配计算任务。
  • 等待Job完成:Job会由JobSystem安排运行。其可能立即完成,也可能持续一段时间。JobSystem提供Complete函数,来确保对应的Job已经完成,可以由主线程来读取数据。

前面也提到过用于跑Job的Struct会被复制到各个子线程上去。对于各线程读取数据来说,还无大碍。但是对于最后数据汇总的部分,其一定会有写入共享内存的问题。Unity为此提供了一套线程安全的数据类型,即NativeContainer类型。使用该类型,相当于让子线程访问跟主线程共享的一块内存区域,而不是一份copy。这样才能方便实现最后数据汇总流程。

对于使用NativeContainer来说由两点需要注意的。

  • 读写权限

对于一个默认实现的NativeContainer容器来说,子线程同时拥有读写权限。这样会稍微降低效率,可以通过Attribute标记字段来约定只读或只写来提高性能。Attribute对应如下

1
2
3
4
[ReadOnly] // 表示这个字段容器数据 子线程只用来读取
public NativeArray<int> input;
[WriteOnly] // 表示这个字段容器数据 子线程只用来写入
public NativeArray<int> output;
  • 内存分配

NativeContainer容器类型的内存都分配在非托管堆上面。所以这部分内存需要程序员手动控制释放,否则会造成内存泄漏。为了方便内存控制,unity对其实例化的参数中添加了生命周期控制参数:

  • Allocator.Temp:最快分配内存方式。这个内存的生命周期为一帧或更短。不可以将该方式分配的内存实例赋值给实现Job的Struct字段。
  • Allocator.TempJob:略慢于Temp但快于Persistent的方式。该内存的生命周期为四帧。重要提示:必须在四帧以内用Dispose接口释放该内存,否则会输出一个Warning。
  • Allocator.Persistent:最慢的分配方式。但是可以按照你的需要存活任意时长,在不需要的时候需要用Dispose来释放该内存区域,否则会泄露。该方式实际上是直接调用malloc分配的一个包装。。

这些接口的使用会在下面的例子中详细说明:

简单的求和拆分运算

下面简单看一个例子来看JobSystem怎么运行的。这里以一个简单的求和任务为例子。在主线程写一个简单的大整数求和可以如下进行:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public int total_count = 100000000

// 给定一个数字,从0开时到该数字所有数字求和
public void SimpleSumNumber()
{
    for (int i = 0; i < total_coun; i++)
    {
        result += i;
    }
}

当数字很大的时候,这个计算会消耗很多时间。可以通过Unity的Profiler或者C#的Stopwatch来查看这段允运行的耗时。

在我的电脑上这一段耗时如下图

可以看到耗时大概237.64ms。

先用单个的Job来将该段运行放到Job线程中去。这个按照前面可以通过如下方式实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 实现IJob接口的Struct 承载计算要用到的数据和最后计算结果
// 因为计算主要用到一个只读的目标数字所以需要一个total_count字段
// 而最后的结果则需要返回到主线程所以使用一个NativeArray类型来传递
public struct SumNumberJob : IJob
{
    public long total_count;
    // 因为最终结果只用来写入 标记WriteOnly
    // NativeArray则是一个数组容器结构,所以最后写入的时候默认第一位为结果
    [WriteOnly]
    public NativeArray<long> result;

    public void Execute()
    {
        long temp = 0;
        for (long i = 0; i < total_count; i++)
        {
            temp += i;
        }
        result[0] = temp;
    }
}

// 调度启用JobSystem函数
public void ScheduleSumNumerJob()
{
    // 实例化用于计算的JobStruct,result则实例化一个长度为1的NativeArray
    var job_struct = new SumNumberJob
    {
        total_count = total_count,
        result = new NativeArray<long>(1, Allocator.Persistent)
    };

    // 调用Schedule函数来让JobSystem规划该Job的计算,该函数会返回一个JobHandle对象。
    // JobHandle对象是一个Struct,用来记录Job的运行状态
    var job_handle = job_struct.Schedule();
    // JobHandle有一个Complete函数。调用该函数,会阻塞当前线程。确保目标Job计算完成。相当于强制当前线程去等待目标线程。如果目标已经计算完毕,则会立即返回
    job_handle.Complete();
    // 最后从Job共享内存部分取出计算数据
    result = job_struct.result[0];
    // 因为NativeArray用的Persistent分配方式,需要手动Dispose来释放该内存。
    job_struct.result.Dispose();
}

该实现实际上会在当帧内完成计算。因为在主线程函数ScheduleSumNumerJob中直接调用了Complete接口。这个会迫使等待Job计算完成。运行该函数,打开Profiler可以看到如下图。

我们可以看到这个Job是直接在主线程上运行了。Complete函数里面直接调用了JobStruct上的Execute函数来执行。

现在我们考虑将这个任务分解成并行任务。这个任务实际上属于易并行任务一类(这个可以看并行计算相关书籍)。我们将任务划分成$n$个区间,每个区间$[S_i,S_{i+1}]$内的计算交给一个Job线程来运行。可以看到多个线程之间的计算互不影响,可并发执行。

只要在上面的Job流程略加改造即可。在原有的JobStruct上添加区间范围,每个Job只是计数对应区间。在规划的时候创建多个Job来执行。最后汇总所有结果。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public struct SumNumberJob : IJob
{
    public long from_num;
    public long to_num;
    [WriteOnly]
    public NativeArray<long> result;
    // 计数改为区间范围
    public void Execute()
    {
        long temp = 0;
        for (long i = from_num; i < to_num; i++)
        {
            temp += i;
        }
        result[0] = temp;
    }
}

// Job多线程规划的时候 则for循环一次启动多个Job 最后等到所有Job完成之后把每个结果累加起来即可
public void ScheduleSumNumerJob()
{
    var job_count = 8;

    SumNumberJob[] job_struct_array = new SumNumberJob[job_count];
    NativeArray<JobHandle> job_handle_array = new NativeArray<JobHandle>(job_count, Allocator.Persistent);
    for (int i = 0; i < job_count; i++)
    {
        job_struct_array[i] = new SumNumberJob
        {
            from_num = total_count * i / job_count,
            to_num = total_count * (i + 1) / job_count,
            result = new NativeArray<long>(1, Allocator.Persistent)
        };
        job_handle_array[i] = job_struct_array[i].Schedule();
    }
    // 该函数相当于将所有的JobHandle合并为一个。调用all_handle的Complete相当于要求数组中所有JobHandle都完成。
    var all_handle = JobHandle.CombineDependencies(job_handle_array);

    all_handle.Complete();

    for (int i = 0; i < job_count; i++)
    {
        result += job_struct_array[i].result[0];
        job_struct_array[i].result.Dispose();
    }
    job_handle_array.Dispose();
}

运行之后Profiler效果如下

可以明显看到,整个计算的时长缩短了。只用了约48ms左右。同时可以看到以下几个情况。

  • Job栏中的Worker不再是Idle状态,而是在运行对应的SumNumberJob函数。
  • 并且Worker的个数与我们规划的Job个数一致。
  • 主线程中有个明显的WaitForJobGroup流程,在等待Job完成其工作。

下面重新用并行接口版的IJobFor来实现该效果。其代码如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// 改为实现IJobfor接口 这次只会实例化一个Job实例
// 所以传入数据部分改为提前传入划分好的区间结构 根据Execute传入的index来调整使用的计数区间
public struct SumNumberParallelJob : IJobFor
{
    [Unity.Collections.ReadOnly]
    public NativeArray<long> input;
    [Unity.Collections.WriteOnly]
    public NativeArray<long> result;

    // IJobFor接口的Execute带有一个参数index 该index相当于一个标识符虽然数据一致,但每个线程计算有所不同区分
    // index跟Schedule中参数有关,规划了多少个会传入几
    public void Execute(int index)
    {
        long temp = 0;
        var from_num = input[index];
        var to_num = input[index + 1];
        for (long i = from_num; i < to_num; i++)
        {
            temp += i;
        }
        result[index] = temp;
    }
}

// 使用IJobFor来并行计算任务
public void ScheduleSumNumerParallelJob()
{
    var job_count = 8;
    // 实例化一个数据结构 该实例只有一份,会copy到各个运作Job线程上去
    var job_struct = new SumNumberParallelJob
    {
        input = new NativeArray<long>(job_count + 1, Allocator.Persistent),
        result = new NativeArray<long>(job_count, Allocator.Persistent)
    };

    for (int i = 0; i < job_count + 1; i++)
    {
        job_struct.input[i] = total_count * i / job_count;
    }

    // 并行规划接口调用ScheduleParallel
    // 第一个参数相当于调用Execute次数,会在同一个或不同线程上调用Execute共job_count次数
    // 第二个参数官方称之为workstealing,即工作偷取数量。即每个线程相当于来拿取任务个数。
    // 例如为1,表示每个线程空闲时会尝试获取任务,每次只会拿去一个任务,其只会跑一次Execute函数,传入一个index。
    // 等到结束之后,其会尝试再来偷取对应数量的任务个数。
    var job_handle = job_struct.ScheduleParallel(job_count, 1, default);
    job_handle.Complete();

    for (int i = 0; i < job_count; i++)
    {
        result += job_struct.result[i];
    }
    job_struct.input.Dispose();
    job_struct.result.Dispose();
}

关于SchedulleParallel运行官网有个详细的图来描述,可见 https://docs.unity3d.com/Manual/job-system-parallel-for-jobs.html

运行之后,Profiler效果如下

可以看到计算效果跟前面相差无几。实际上并行变成有着效率提升的上限。也可以明显看到,一个计算划分为原来的$1/8$之后计算时间并没有缩减为$1/8$。这是因为调度线程本身就有性能消耗。所以即便再多的线程也有一个提升的上限。当我把我的job个数拉满到16之后,占满所有核,计算大概再30ms左右。

补充信息

  • Job依赖 前面有提到Job可以建立前后计算的依赖关系。这种主要为计算依赖服务。这个方式通过JobHandle来实现。在每个Schedule方法中实际都可以传入一个JobHandle对象。传入表示,当前规划Job依赖目标Job执行完成。其大致框架如下
1
2
3
4
5
6
7
8
public void JobDependency()
{
    //...
    JobHandle first_job_handle = job_data.Schedule();
    // 这个表示第二个Job以来第一个完成。 后续只用关系第二个Job的Complete即可
    JobHandle second_job_handle = second_job_data.Schedule(first_job_handle);
    //...
}
  • Burst编译 实际上JobSystem还有着配套的Burst编译系统,可以通过Attribute标记Burst编译来优化job的运行效果。Burst根据官方来看,试了LLVM编译来优化运行,可以提高并行运行效率。要使用其按如下标记即可
1
2
3
4
5
[BurstCompile]
public struct SumNumberParallelJob : IJobFor
{
    //...
}

使用了Burst编译优化后,可以明显发现上面的计算任务提升很多。这可能跟把这个求和运算直接优化了有关。也有说,测试来看Burst编译优化不如il2cpp的直接转C++运行方式快。

在我的机器上编译后运行耗时大概只有7ms。profiler如下

  • 运行在主线程

Job的规划接口,即Schedule接口。都是放在扩展方法里面的。其处理基础的Schedule接口外,还有如下几类接口

  • Run:直接在当前线程上运行。
  • xxByRef:当传递JobStruct不是值拷贝,而是引用方式。这样对于较大的数据快来说会比Schedule略快一些。

这些接口在做某些类型并行计算时会非常有用。例如我在开启的Job线程中再开启一些Job计算。

总结

在我看来,JobSystem其实有点类似于MPI那种

还有其余一些并行计算可以看Catlike分形运动那一部分,即上面那篇文章。对此我也记录了一篇。