UE5 Shader Parallel Compilation

Background

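Our project's shader compile times have become too long. Back on UE 4.24 I verified that shaders can be compiled with FASTBuild, so the idea is to put idle machines to work and speed up compilation.
The code discussed here is from UE5 EA (Early Access); the corresponding code in 4.24 and 4.26 is different.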

Shader Compilation Flow

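FShaderCompilingManager is the object that manages shader compilation. The Unreal Editor constructs it after startup (during PreInit). The constructor checks platform capabilities and, if a distributed compilation tool is available, starts the corresponding compile thread.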

Shader Compiler Thread

Compiler threads derive from FShaderCompileThreadRunnableBase and implement the CompilingLoop method, which carries out the compilation logic.

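CompilingLoop runs continuously on the compiler thread. Each iteration consumes the jobs returned by Manager->AllJobs.GetPendingJobs(), dispatches them in batches to the concrete (distributed) compiler, waits for them to finish, and then calls Manager->ProcessFinishedJob. If the distributed compile fails, the jobs are compiled with the local compiler via FShaderCompileUtilities::ExecuteShaderCompileJob.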
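A simplified, single-threaded model of one CompilingLoop iteration is sketched below. `Job`, `RemoteCompileFn`, `CompileLocally`, and `CompileOneIteration` are hypothetical names, not engine API; the distributed compiler is reduced to a callback that may fail, which is enough to show the consume, dispatch, fall back locally, mark finished sequence.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for a shader compile job.
struct Job {
    std::string name;
    bool compiled = false;
    bool compiledLocally = false;
};

// Hypothetical remote compile of a whole batch; returns false if the distributed tool failed.
using RemoteCompileFn = std::function<bool(std::vector<Job*>&)>;

// Local fallback for a single job (plays the role of ExecuteShaderCompileJob).
inline void CompileLocally(Job& job) {
    job.compiled = true;
    job.compiledLocally = true;
}

// One loop iteration: take everything pending, try the remote compiler,
// fall back to local compilation on failure, then report every job as
// finished (plays the role of ProcessFinishedJob). Returns the batch size.
inline int CompileOneIteration(std::deque<Job*>& pending,
                               const RemoteCompileFn& remote,
                               std::vector<Job*>& finished) {
    std::vector<Job*> batch(pending.begin(), pending.end());
    pending.clear();
    if (batch.empty()) return 0;

    if (!remote(batch)) {
        for (Job* job : batch) CompileLocally(*job);
    }
    for (Job* job : batch) finished.push_back(job);
    return static_cast<int>(batch.size());
}
```

In the engine the remote call is asynchronous and the wait happens across loop iterations; the sketch collapses that into a synchronous call to keep the control flow visible.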

Distributed Shader Compilation Framework: FShaderCompileDistributedThreadRunnable_Interface

UE5 (I'm not sure in which version it was introduced) implements an FShaderCompileDistributedThreadRunnable_Interface class that encapsulates the distributed compilation framework.

The build control logic is abstracted behind the IDistributedBuildController interface.

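This decouples third-party distributed tools such as XGE (IncrediBuild) and FASTBuild from the engine core source (where they used to live, in Runtime/Engine/ShaderCompiler); they can now be implemented by third-party plugins that register themselves as a ModularFeature.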

As a result, XGE shader compilation in UE5 has also been split out into its own plugin, XGEController.
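The registration idea can be illustrated with plain C++. This is not UE's IModularFeatures API: `IBuildController`, `ControllerRegistry`, and `FakeController` are hypothetical names, used only to show a plugin registering an implementation under a feature name and the manager picking the first implementation that reports itself usable.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for IDistributedBuildController.
struct IBuildController {
    virtual ~IBuildController() = default;
    virtual std::string GetName() const = 0;
    virtual bool IsSupported() const = 0;  // e.g. "is the external tool installed?"
};

// Hypothetical stand-in for a modular-feature registry: feature name -> implementations.
class ControllerRegistry {
public:
    void Register(const std::string& feature, IBuildController* impl) {
        features_[feature].push_back(impl);
    }
    void Unregister(const std::string& feature, IBuildController* impl) {
        auto& impls = features_[feature];
        for (auto it = impls.begin(); it != impls.end(); ++it) {
            if (*it == impl) { impls.erase(it); break; }
        }
    }
    // First registered controller that is usable, or nullptr so the caller
    // can fall back to local compilation.
    IBuildController* FindAvailable(const std::string& feature) const {
        auto found = features_.find(feature);
        if (found == features_.end()) return nullptr;
        for (IBuildController* impl : found->second) {
            if (impl->IsSupported()) return impl;
        }
        return nullptr;
    }
private:
    std::map<std::string, std::vector<IBuildController*>> features_;
};

// Hypothetical plugin implementation used for demonstration.
struct FakeController : IBuildController {
    std::string name;
    bool supported;
    FakeController(std::string n, bool s) : name(std::move(n)), supported(s) {}
    std::string GetName() const override { return name; }
    bool IsSupported() const override { return supported; }
};
```

A plugin would call `Register` from its module startup and `Unregister` on shutdown; the manager only ever sees the interface, which is what lets XGE and FASTBuild live outside the engine core.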

Consuming compile jobs

The implementation lives in ShaderCompilerDistributed.cpp:

TArray<FShaderCommonCompileJobPtr> PendingJobs;
//if (LIKELY(!bIsHung))	// stop accepting jobs if we're hung - TODO: re-enable this after lockup detection logic is proved reliable and/or we have job resubmission in place
{
    for (int32 PriorityIndex = MaxPriorityIndex; PriorityIndex >= MinPriorityIndex; --PriorityIndex)
    {
        // Grab as many jobs from the job queue as we can
        const EShaderCompileJobPriority Priority = (EShaderCompileJobPriority)PriorityIndex;
        const int32 MinBatchSize = (Priority == EShaderCompileJobPriority::Low) ? 1 : DistributedShaderCompilerVariables::MinBatchSize;
        const int32 NumJobs = Manager->AllJobs.GetPendingJobs(EShaderCompilerWorkerType::XGE, Priority, MinBatchSize, INT32_MAX, PendingJobs);
        if (NumJobs > 0)
        {
            UE_LOG(LogShaderCompilers, Display, TEXT("Started %d 'XGE' shader compile jobs with '%s' priority"),
                NumJobs,
                ShaderCompileJobPriorityToString((EShaderCompileJobPriority)PriorityIndex));
        }
        if (PendingJobs.Num() >= DistributedShaderCompilerVariables::MinBatchSize)
        {
            break;
        }
    }
}

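This code pulls FShaderCommonCompileJobPtr jobs in priority order, from highest to lowest; after each priority level it stops if at least MinBatchSize jobs have been collected (Low priority jobs are accepted with a minimum batch of 1). The jobs are then batched by ShaderType,
and each batch is handed to DispatchShaderCompileJobsBatch, which passes it to the concrete Controller for compilation.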
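The priority walk can be reduced to a small standalone function. `Priority`, `PendingJob`, `TakeJobs`, `GatherJobs`, and `GroupByType` are hypothetical; the rule that a queue only hands out jobs once it holds at least the minimum batch size is an assumption about GetPendingJobs, not something shown in the excerpt.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>
#include <string>
#include <vector>

enum class Priority { Low = 0, Normal = 1, High = 2 };  // hypothetical subset

struct PendingJob { std::string shaderType; int id; };

// Assumption: a queue only releases jobs if it holds at least minBatch of them.
inline int TakeJobs(std::deque<PendingJob>& queue, std::size_t minBatch,
                    std::vector<PendingJob>& out) {
    if (queue.size() < minBatch) return 0;
    const int taken = static_cast<int>(queue.size());
    out.insert(out.end(), queue.begin(), queue.end());
    queue.clear();
    return taken;
}

// Walk priorities from High down to Low; Low uses a minimum batch of 1.
// Stop after any level once enough jobs for one batch have been gathered.
inline std::vector<PendingJob> GatherJobs(std::array<std::deque<PendingJob>, 3>& queues,
                                          std::size_t minBatchSize) {
    std::vector<PendingJob> pending;
    for (int p = static_cast<int>(Priority::High); p >= static_cast<int>(Priority::Low); --p) {
        const std::size_t minBatch =
            (static_cast<Priority>(p) == Priority::Low) ? 1 : minBatchSize;
        TakeJobs(queues[p], minBatch, pending);
        if (pending.size() >= minBatchSize) break;
    }
    return pending;
}

// Group the gathered jobs by shader type so each group can be dispatched as one batch.
inline std::map<std::string, std::vector<int>> GroupByType(const std::vector<PendingJob>& jobs) {
    std::map<std::string, std::vector<int>> groups;
    for (const PendingJob& job : jobs) groups[job.shaderType].push_back(job.id);
    return groups;
}
```

With a minimum batch size of 3, a single High job stays queued while three Normal jobs are taken, which is the "prefer full batches over tiny remote tasks" trade-off the excerpt encodes.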

Dispatching jobs: DispatchShaderCompileJobsBatch

DispatchShaderCompileJobsBatch packages a batch of jobs into an FDistributedShaderCompilerTask and submits it
for the Controller to process.

FTaskCommandData TaskCommandData;
TaskCommandData.Command = Manager->ShaderCompileWorkerName;
TaskCommandData.CommandArgs = WorkerParameters;
TaskCommandData.InputFileName = InputFilePath;
TaskCommandData.Dependencies = GetDependencyFilesForJobs(JobsToSerialize);

DispatchedTasks.Add(
    new FDistributedShaderCompilerTask(
        CachedController.EnqueueTask(TaskCommandData),
        MoveTemp(JobsToSerialize),
        MoveTemp(InputFilePath),
        MoveTemp(OutputFilePath)
    )
);

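It uses FShaderCompileUtilities::DoWriteTasks to serialize the batch of jobs into an input file; the results are expected in an output file.
The input and output file paths come from methods provided by the Controller; each call only needs to return a unique path.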

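The task data type is FTaskCommandData. The Controller must implement EnqueueTask (called here as CachedController.EnqueueTask), which pushes the task into the Controller's own queue for consumption
and returns a Future used to signal completion. Later in the flow, the compiler thread iterates over all the Futures to check whether they have completed.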
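The Controller side of that contract can be modelled with `std::promise`/`std::future`. `TaskCommandData`, `TaskResult`, `SimpleController`, `CreateUniqueFilePath`, and `CompleteNext` are hypothetical simplifications of FTaskCommandData, FDistributedBuildTaskResult, and IDistributedBuildController; a real controller consumes its queue on its own thread, whereas here `CompleteNext` is called explicitly.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <future>
#include <mutex>
#include <queue>
#include <string>
#include <utility>

struct TaskCommandData { std::string command, commandArgs, inputFileName; };
struct TaskResult { int returnCode = 0; bool completed = false; };

class SimpleController {
public:
    // Each call yields a distinct path; a counter is the only uniqueness source here.
    std::string CreateUniqueFilePath(const std::string& dir, const std::string& ext) {
        std::lock_guard<std::mutex> lock(mutex_);
        return dir + "/Task_" + std::to_string(nextFileId_++) + ext;
    }

    // Queue the task and hand back a future the compiler thread can poll.
    std::future<TaskResult> EnqueueTask(const TaskCommandData& data) {
        std::lock_guard<std::mutex> lock(mutex_);
        Pending task{nextTaskId_++, data, std::promise<TaskResult>()};
        std::future<TaskResult> future = task.promise.get_future();
        pending_.push(std::move(task));
        return future;
    }

    // Stand-in for the worker side: finish the oldest task with the given return code.
    bool CompleteNext(int returnCode) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (pending_.empty()) return false;
        pending_.front().promise.set_value(TaskResult{returnCode, true});
        pending_.pop();
        return true;
    }

private:
    struct Pending {
        std::uint32_t id;
        TaskCommandData data;
        std::promise<TaskResult> promise;
    };
    std::mutex mutex_;
    std::queue<Pending> pending_;
    std::uint32_t nextTaskId_ = 0;
    std::uint64_t nextFileId_ = 0;
};
```

Returning a future keeps the compiler thread decoupled from however the controller actually runs the work (xgConsole, FBuild.exe, or anything else).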

Collecting task results

for (auto Iter = DispatchedTasks.CreateIterator(); Iter; ++Iter)
{
    bool bOutputFileReadFailed = true;

    FDistributedShaderCompilerTask* Task = *Iter;
    if (!Task->Future.IsReady())
    {
        continue;
    }

    FDistributedBuildTaskResult Result = Task->Future.Get();
    NumDispatchedJobs -= Task->ShaderJobs.Num();
    LastTimeTaskCompleted = FPlatformTime::Seconds();

    if (Result.ReturnCode != 0)
    {
        UE_LOG(LogShaderCompilers, Error, TEXT("Shader compiler returned a non-zero error code (%d)."), Result.ReturnCode);
    }

    if (Result.bCompleted)
    {
        // Check the output file exists. If it does, attempt to open it and serialize in the completed jobs.
        if (IFileManager::Get().FileExists(*Task->OutputFilePath))
        {
            FArchive* OutputFileAr = IFileManager::Get().CreateFileReader(*Task->OutputFilePath, FILEREAD_Silent);
            if (OutputFileAr)
            {
                bOutputFileReadFailed = false;
                FShaderCompileUtilities::DoReadTaskResults(Task->ShaderJobs, *OutputFileAr);
                delete OutputFileAr;
            }
        }

        if (bOutputFileReadFailed)
        {
            // Reading result from XGE job failed, so recompile shaders in current job batch locally
            UE_LOG(LogShaderCompilers, Log, TEXT("Rescheduling shader compilation to run locally after XGE job failed: %s"), *Task->OutputFilePath);

            for (FShaderCommonCompileJobPtr Job : Task->ShaderJobs)
            {
                FShaderCompileUtilities::ExecuteShaderCompileJob(*Job);
            }
        }

        // Enter the critical section so we can access the input and output queues
        {
            FScopeLock Lock(&Manager->CompileQueueSection);
            for (const auto& Job : Task->ShaderJobs)
            {
                Manager->ProcessFinishedJob(Job);
            }
        }
    }
}

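When a task's Future is ready, FShaderCompileUtilities::DoReadTaskResults reads the results from the OutputFile. If the OutputFile does not exist or cannot be opened, the remote task is treated as failed and every job in the batch is recompiled locally with FShaderCompileUtilities::ExecuteShaderCompileJob. In both cases Manager->ProcessFinishedJob(Job) is then called for every job in the batch to mark its shader compilation as finished.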
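The non-blocking poll in that loop can be sketched with `std::future::wait_for` and a zero timeout. `DispatchedBatch` and `PollDispatched` are hypothetical; the future's `bool` stands for "the output file could be read back" (false triggers the local fallback), and the excerpt's bCompleted flag is folded away for brevity.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <future>
#include <list>
#include <string>
#include <vector>

// Hypothetical dispatched batch: the future yields true if the output file was readable.
struct DispatchedBatch {
    std::future<bool> outputRead;
    std::vector<std::string> jobs;
};

// Checks every dispatched batch without blocking; returns how many batches completed.
inline int PollDispatched(std::list<DispatchedBatch>& dispatched,
                          const std::function<void(const std::string&)>& compileLocally,
                          const std::function<void(const std::string&)>& processFinished) {
    int completed = 0;
    for (auto it = dispatched.begin(); it != dispatched.end();) {
        if (it->outputRead.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
            ++it;  // still running remotely; look again on the next loop iteration
            continue;
        }
        if (!it->outputRead.get()) {
            for (const std::string& job : it->jobs) compileLocally(job);  // remote failed
        }
        for (const std::string& job : it->jobs) processFinished(job);
        it = dispatched.erase(it);
        ++completed;
    }
    return completed;
}
```

Batches still in flight are simply skipped, so one slow remote machine never blocks results that have already come back.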

FXGEControllerModule: XGE distributed compilation

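FXGEControllerModule implements the IDistributedBuildController interface. As a standalone plugin, it registers itself at engine startup with
IModularFeatures::Get().RegisterModularFeature(GetModularFeatureType(), this);, so the ShaderCompilingManager can look up available Controllers through ModularFeatures.
Once a Controller is judged usable, InitializeController is called; after it starts, the Controller launches an asynchronous task (a thread) that runs WriteOutThreadProc.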

WriteOutThreadProc

FString XGConsoleArgs = FString::Printf(TEXT("/VIRTUALIZEDIRECTX /allowremote=\"%s\" %s /allowintercept=\"%s\" /title=\"Unreal Engine FASTBuild Tasks\" /monitordirs=\"%s\" /command=\"%s -FASTBuildController %s\""),
    XGE_INTERCEPT_EXE_NAMES,
    FASTBuildController::AvoidUsingLocalMachine() ? TEXT("/avoidlocal=ON") : TEXT(""),
    XGE_CONTROL_WORKER_NAME,
    *WorkingDirectory,
    XGE_CONTROL_WORKER_FILENAME,
    *PipeName);

// Create the output pipe as a server...
if (!OutputNamedPipe.Create(FString::Printf(TEXT("\\\\.\\pipe\\%s-A"), *PipeName), true, false))
{
    UE_LOG(LogFASTBuildController, Fatal, TEXT("Failed to create the output FASTBuild named pipe."));
}

const int32 PriorityModifier = 0;	// normal by default. Interactive use case shouldn't be affected as the jobs will avoid local machine
// Start the controller process
uint32 XGConsoleProcID = 0;
UE_LOG(LogFASTBuildController, Verbose, TEXT("Launching xgConsole"));
BuildProcessHandle = FPlatformProcess::CreateProc(*XGConsolePath, *XGConsoleArgs, false, false, true, &XGConsoleProcID, PriorityModifier, *ControlWorkerDirectory, nullptr);
if (!BuildProcessHandle.IsValid())
{
    UE_LOG(LogFASTBuildController, Fatal, TEXT("Failed to launch the FASTBuild control worker process."));
}

// If the engine crashes, we don't get a chance to kill the build process.
// Start up the build monitor process to monitor for engine crashes.
uint32 BuildMonitorProcessID;
UE_LOG(LogFASTBuildController, Verbose, TEXT("Launching FASTBuildController to fan out tasks"));
FString XGMonitorArgs = FString::Printf(TEXT("-xgemonitor %d %d"), FPlatformProcess::GetCurrentProcessId(), XGConsoleProcID);
FProcHandle BuildMonitorHandle = FPlatformProcess::CreateProc(*GetControlWorkerExePath(), *XGMonitorArgs, true, false, false, &BuildMonitorProcessID, PriorityModifier, nullptr, nullptr);
FPlatformProcess::CloseProc(BuildMonitorHandle);

// Wait for the controller to connect to the output pipe
if (!OutputNamedPipe.OpenConnection())
{
    UE_LOG(LogFASTBuildController, Fatal, TEXT("Failed to open a connection on the output FASTBuild named pipe."));
}

// Connect the input pipe (controller is the server)...
if (!InputNamedPipe.Create(FString::Printf(TEXT("\\\\.\\pipe\\%s-B"), *PipeName), false, false))
{
    UE_LOG(LogFASTBuildController, Fatal, TEXT("Failed to connect the input FASTBuild named pipe."));
}

// Pass the xgConsole process ID to the FASTBuild control worker, so it can terminate the build on exit
if (!OutputNamedPipe.WriteBytes(sizeof(XGConsoleProcID), &XGConsoleProcID))
{
    UE_LOG(LogFASTBuildController, Fatal, TEXT("Failed to pass xgConsole process ID to FASTBuild control worker."));
}

LastEventTime = FPlatformTime::Cycles();

// Launch the output thread
ReadBackThreadFuture = Async(EAsyncExecution::Thread, [this]() { ReadBackThreadProc(); });

...
while (true)
{
    ...
    // Take one task from the pending queue.
    FTask* Task = nullptr;
    {
        FScopeLock Lock(TasksCS);
        PendingTasks.Dequeue(Task);
    }

    if (Task)
    {
        WriteBuffer.Reset();
        WriteBuffer.AddUninitialized(sizeof(uint32));

        FMemoryWriter Writer(WriteBuffer, false, true);

        Writer << Task->ID;
        Writer << Task->CommandData.Command;
        Writer << Task->CommandData.CommandArgs;
        *reinterpret_cast<uint32*>(WriteBuffer.GetData()) = WriteBuffer.Num() - sizeof(uint32);

        // Move the tasks to the dispatched tasks map before launching it
        {
            FScopeLock Lock(TasksCS);
            DispatchedTasks.Add(Task->ID, Task);
        }

        if (!OutputNamedPipe.WriteBytes(WriteBuffer.Num(), WriteBuffer.GetData()))
        {
            // Error occurred whilst writing task args to the named pipe.
            // It's likely the controller process was terminated.
            bRestartWorker = true;
        }

        // Update the last event time.
        FPlatformAtomics::InterlockedExchange(reinterpret_cast<volatile int32*>(&LastEventTime), FPlatformTime::Cycles());
    }
}

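This function launches XGConsole (xgConsole.exe, provided by IncrediBuild), passing it the pipe name and path-related arguments; xgConsole in turn runs the control worker given in /command. It also launches a second control worker process in -xgemonitor mode, which watches for engine crashes and shuts down the XGConsole build if the engine dies.
The thread creates the write-out pipe (XXXX-A) as the server and connects to the read-in pipe (XXXX-B). Right after connecting it sends the xgConsole process ID, then starts another asynchronous task that runs ReadBackThreadProc. From then on it dequeues tasks from PendingTasks and writes each task's data (a uint32 length prefix followed by the ID, command, and command arguments) directly to the pipe, where the control worker running under xgConsole reads it.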
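The framing trick in WriteOutThreadProc (reserve a uint32 slot, serialize the fields, then patch the slot with the payload size) can be shown with a byte vector. The field encoding here, little-endian uint32 values and strings written as a uint32 byte count followed by raw bytes, is a hypothetical choice for illustration; UE's FMemoryWriter serializes FString in its own format. `AppendU32`, `AppendString`, and `EncodeTask` are illustrative names.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Appends a uint32 in little-endian byte order.
inline void AppendU32(std::vector<std::uint8_t>& buf, std::uint32_t value) {
    for (int i = 0; i < 4; ++i) buf.push_back(static_cast<std::uint8_t>(value >> (8 * i)));
}

// Hypothetical string encoding: byte count, then raw bytes (no terminator).
inline void AppendString(std::vector<std::uint8_t>& buf, const std::string& s) {
    AppendU32(buf, static_cast<std::uint32_t>(s.size()));
    buf.insert(buf.end(), s.begin(), s.end());
}

// Builds one task message: [payload length][id][command][args].
inline std::vector<std::uint8_t> EncodeTask(std::uint32_t id, const std::string& command,
                                            const std::string& args) {
    std::vector<std::uint8_t> buf(sizeof(std::uint32_t), 0);  // reserve the length slot
    AppendU32(buf, id);
    AppendString(buf, command);
    AppendString(buf, args);
    // Patch the slot with the payload size, excluding the slot itself.
    const std::uint32_t payload = static_cast<std::uint32_t>(buf.size() - sizeof(std::uint32_t));
    for (int i = 0; i < 4; ++i) buf[i] = static_cast<std::uint8_t>(payload >> (8 * i));
    return buf;
}
```

The length prefix lets the reader on the other end of the pipe pull exactly one message with two reads (length, then payload), even though a pipe is just a byte stream.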

ReadBackThreadProc logic

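As you might expect, this function's main job is to receive task results from the control worker over the read pipe and then notify the compiler thread that the task has completed (by setting its Promise).
The code is as follows: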

FTaskResponse CompletedTaskResponse;
if (!InputNamedPipe.ReadBytes(sizeof(CompletedTaskResponse), &CompletedTaskResponse))
{
    // The named pipe was closed or had an error.
    // Instruct the write-out thread to restart the worker, then exit.
    bRestartWorker = true;
}
else
{
    // Update the last event time.
    FPlatformAtomics::InterlockedExchange(reinterpret_cast<volatile int32*>(&LastEventTime), FPlatformTime::Cycles());

    // We've read a completed task response from the controller.
    // Find the task in the map and complete the promise.
    FTask* Task;
    {
        FScopeLock Lock(TasksCS);
        Task = DispatchedTasks.FindAndRemoveChecked(CompletedTaskResponse.ID);
    }

    FDistributedBuildTaskResult Result;
    Result.ReturnCode = CompletedTaskResponse.ReturnCode;
    Result.bCompleted = true;

    Task->Promise.SetValue(Result);
    delete Task;
}

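After reading a task response, it calls DispatchedTasks.FindAndRemoveChecked to remove the task from the dispatched-task map, then sets the Result on the task's Promise so the compiler thread can pick it up.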
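The ID-to-promise bookkeeping shared by the write-out and read-back threads can be sketched as follows. `TaskResponse`, `BuildResult`, `TaskTable`, `Dispatch`, and `Complete` are hypothetical names; the map plays the role of DispatchedTasks, and `Complete` mirrors FindAndRemoveChecked followed by Promise.SetValue, except that it reports an unknown ID instead of asserting.

```cpp
#include <cassert>
#include <cstdint>
#include <future>
#include <mutex>
#include <unordered_map>
#include <utility>

struct TaskResponse { std::uint32_t id; std::int32_t returnCode; };
struct BuildResult { std::int32_t returnCode = 0; bool completed = false; };

class TaskTable {
public:
    // Write-out side: register the task just before sending it down the pipe.
    // Assumes each ID is dispatched once (a second get_future on the same promise throws).
    std::future<BuildResult> Dispatch(std::uint32_t id) {
        std::lock_guard<std::mutex> lock(mutex_);
        return dispatched_[id].get_future();
    }

    // Read-back side: called for every response read from the pipe.
    bool Complete(const TaskResponse& response) {
        std::promise<BuildResult> promise;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            auto it = dispatched_.find(response.id);
            if (it == dispatched_.end()) return false;  // unknown or duplicate ID
            promise = std::move(it->second);
            dispatched_.erase(it);
        }
        // Set the value after releasing the lock to keep the critical section short.
        promise.set_value(BuildResult{response.returnCode, true});
        return true;
    }

private:
    std::mutex mutex_;
    std::unordered_map<std::uint32_t, std::promise<BuildResult>> dispatched_;
};
```

Keying by task ID is what allows responses to arrive in any order, which is expected when batches finish on different remote machines.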

XGEControlWorker process

Source: Programs/ShaderCompileWorker

Building the program

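ShaderCompileWorker's build rules show that when the XGE controller is enabled, ShaderCompileWorker.exe is copied to XGEControlWorker.exe in a post-build step, so the two share the same code. As the comment explains, XGE's interception interface requires the parent and child processes to have different filenames on disk.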

if (bUseXGEController && (Target.Platform == UnrealTargetPlatform.Win64) && Configuration == UnrealTargetConfiguration.Development)
{
    // The interception interface in XGE requires that the parent and child processes have different filenames on disk.
    // To avoid building an entire separate worker just for this, we duplicate the ShaderCompileWorker in a post build step.
    const string SrcPath  = "$(EngineDir)\\Binaries\\$(TargetPlatform)\\ShaderCompileWorker.exe";
    const string DestPath = "$(EngineDir)\\Binaries\\$(TargetPlatform)\\XGEControlWorker.exe";

    PostBuildSteps.Add(string.Format("echo Copying {0} to {1}", SrcPath, DestPath));
    PostBuildSteps.Add(string.Format("copy /Y /B \"{0}\" /B \"{1}\" >nul:", SrcPath, DestPath));

    AdditionalBuildProducts.Add(DestPath);
}

FXGEControlWorker

This class implements the main logic of the ControlWorker. The worker's Main essentially calls the following:

FTaskTagScope Scope(ETaskTag::EGameThread);
GEngineLoop.PreInit(ArgC, ArgV, TEXT("-NOPACKAGECACHE -Multiprocess"));

if (ArgC != 3)
{
    // Invalid command line arguments.
    return 1;
}

FXGEControlWorker Instance(ArgV[2]);
if (!Instance.Init())
{
    // Failed to initialize connection with engine.
    return 2;
}

Instance.WaitForExit();

The main logic lives in Init and WaitForExit; WaitForExit simply waits for the worker threads to finish.

FXGEControlWorker::Init

// Create the output pipe as a server...
if (!OutputNamedPipe.Create(FString::Printf(TEXT("\\\\.\\pipe\\%s-B"), *PipeName), true, false))
    return false;

// Connect the input pipe (engine is the server)...
if (!InputNamedPipe.Create(FString::Printf(TEXT("\\\\.\\pipe\\%s-A"), *PipeName), false, false))
    return false;

// Connect the output pipe (engine is the client)...
if (!OutputNamedPipe.OpenConnection())
    return false;

// Read the process ID of the parent xgConsole process
uint32 XGConsoleProcID = 0;
if (!InputNamedPipe.ReadBytes(sizeof(XGConsoleProcID), &XGConsoleProcID))
    return false;

// Attempt to open the parent process handle
XGConsoleProcHandle = FPlatformProcess::OpenProcess(XGConsoleProcID);
if (!XGConsoleProcHandle.IsValid() || !FPlatformProcess::IsProcRunning(XGConsoleProcHandle))
    return false;

// Connection successful, start the worker threads
InputThreadFuture = Async(EAsyncExecution::Thread, [this]() { InputThreadProc(); });
OutputThreadFuture = Async(EAsyncExecution::Thread, [this]() { OutputThreadProc(); });
return true;

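Init mainly sets up the named pipes to the engine (mirroring the engine side: the worker is the server for the -B pipe and the client for the -A pipe), reads the xgConsole process ID, and starts two threads, InputThreadProc and OutputThreadProc.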

FXGEControlWorker::InputThreadProc

ReadBuffer.Reset(TotalLength);
ReadBuffer.AddUninitialized(TotalLength);

if (!InputNamedPipe.ReadBytes(TotalLength, ReadBuffer.GetData()))
    break;

FMemoryReader Reader(ReadBuffer);

FTask* Task = new FTask();
Reader << Task->ID;
Reader << Task->Executable;
Reader << Task->Arguments;

// Launch the process with normal priority.
Task->Handle = FPlatformProcess::CreateProc(*Task->Executable, *Task->Arguments, true, false, false, nullptr, 0, nullptr, nullptr);

FScopeLock Lock(CS);
CurrentTasks.Add(Task);

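The Input thread is simple: it receives task data from the pipe. At this point the task data consists only of the executable and its arguments, so once received, the worker launches the process directly.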
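The reading side of such a length-prefixed message can be sketched against a `std::istream`. The encoding is a hypothetical one chosen for illustration (little-endian uint32 payload length, then a uint32 ID and two strings, each written as a uint32 byte count followed by raw bytes); `DecodedTask`, `ReadU32`, `ReadString`, and `ReadTask` are illustrative names, and launching the process is left out.

```cpp
#include <cassert>
#include <cstdint>
#include <istream>
#include <sstream>
#include <string>

struct DecodedTask { std::uint32_t id = 0; std::string executable, arguments; };

// Reads a little-endian uint32; returns false on a short read.
inline bool ReadU32(std::istream& in, std::uint32_t& value) {
    unsigned char bytes[4];
    if (!in.read(reinterpret_cast<char*>(bytes), 4)) return false;
    value = 0;
    for (int i = 0; i < 4; ++i) value |= static_cast<std::uint32_t>(bytes[i]) << (8 * i);
    return true;
}

// Reads a string stored as [byte count][raw bytes].
inline bool ReadString(std::istream& in, std::string& out) {
    std::uint32_t size = 0;
    if (!ReadU32(in, size)) return false;
    out.resize(size);
    return size == 0 || static_cast<bool>(in.read(&out[0], size));
}

// Reads one frame: [payload length][id][executable][arguments].
// Returns false when the stream ends or the frame is malformed, which is where
// the real input thread would break out of its loop.
inline bool ReadTask(std::istream& in, DecodedTask& task) {
    std::uint32_t totalLength = 0;
    if (!ReadU32(in, totalLength)) return false;
    std::string payload(totalLength, '\0');
    if (totalLength > 0 && !in.read(&payload[0], totalLength)) return false;
    std::istringstream body(payload);
    return ReadU32(body, task.id) && ReadString(body, task.executable) &&
           ReadString(body, task.arguments);
}
```

Reading the whole payload first and then parsing it from a separate buffer, as the worker does with ReadBuffer and FMemoryReader, means a malformed frame can never consume bytes belonging to the next message.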

FXGEControlWorker::OutputThreadProc

while (!bShutdown)
{
    FPlatformProcess::Sleep(0.1f);
    FScopeLock Lock(CS);

    for (auto TasksIter = CurrentTasks.CreateIterator(); TasksIter; ++TasksIter)
    {
        FTask* Task = *TasksIter;
        if (Task->Handle.IsValid() && !FPlatformProcess::IsProcRunning(Task->Handle))
        {
            // Process has completed. Remove the task from the map.
            TasksIter.RemoveCurrent();

            // Grab the process return code and close the handle
            int32 ReturnCode = 0;
            FPlatformProcess::GetProcReturnCode(Task->Handle, &ReturnCode);
            FPlatformProcess::CloseProc(Task->Handle);

            // Write the completion event to the output pipe
            WriteBuffer.Reset();
            FMemoryWriter Writer(WriteBuffer);
            Writer << Task->ID;
            Writer << ReturnCode;

            delete Task;

            if (!OutputNamedPipe.WriteBytes(WriteBuffer.Num(), WriteBuffer.GetData()))
            {
                // Writing to the pipe failed.
                bShutdown = true;
                break;
            }
        }
    }
}   

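The Output thread polls the running processes every 0.1 s; when one exits, it collects the return code and sends the task ID and return code back to the engine through the pipe.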

Extending with FASTBuild to speed up shader compilation

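The sections above describe the basic flow of distributed shader compilation in UE5.0 EA. Because IncrediBuild licenses are expensive, I plan to use the open-source tool FASTBuild instead, building on what I validated earlier with 4.24/4.26. There are two ways to add support:

  1. Extend it the way XGEController does (non-intrusive, as a plugin)
  2. Modify the engine source directly, as in the 4.26 approach (intrusive)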

Extending with a FASTBuildController plugin

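Like the XGEController plugin, the new plugin detects the environment and registers itself as a FASTBuild controller. In the consumption logic, the code that invokes xgConsole is replaced with FASTBuild's FBuild.exe; the main work is generating the FASTBuild script.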

This approach is highly extensible and can be shared with others as a plugin.
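A sketch of what generating the FASTBuild script could look like: one .bff text with a custom compiler entry for the worker executable and one ObjectList per task batch, tied together by an Alias. The .bff keywords used here (`Compiler`, `.Executable`, `.CompilerFamily = 'custom'`, `.ExtraFiles`, `ObjectList`, `.CompilerOptions` with `%1`/`%2`, `.CompilerInputFiles`, `.CompilerOutputPath`, `Alias`/`.Targets`) are assumptions to verify against the FASTBuild documentation; `FastBuildTask` and `GenerateBff` are hypothetical, and how ShaderCompileWorker's real arguments map onto `%1`/`%2` is not worked out.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical description of one batch to hand to FASTBuild.
struct FastBuildTask {
    std::string name;       // unique node name, e.g. "Shaders_0"
    std::string inputFile;  // batch input file
    std::string outputDir;  // where results should be written
};

// Produces .bff text; keyword names are assumptions, see the lead-in.
inline std::string GenerateBff(const std::string& workerExe,
                               const std::vector<std::string>& extraFiles,
                               const std::vector<FastBuildTask>& tasks) {
    std::ostringstream bff;
    bff << "Compiler('ShaderCompiler')\n{\n"
        << "    .Executable = '" << workerExe << "'\n"
        << "    .CompilerFamily = 'custom'\n"
        << "    .ExtraFiles = {";
    for (std::size_t i = 0; i < extraFiles.size(); ++i)
        bff << (i ? ", " : " ") << "'" << extraFiles[i] << "'";
    bff << " }\n}\n";

    for (const FastBuildTask& task : tasks) {
        bff << "ObjectList('" << task.name << "')\n{\n"
            << "    .Compiler = 'ShaderCompiler'\n"
            << "    .CompilerOptions = '\"%1\" \"%2\"'\n"  // hypothetical argument layout
            << "    .CompilerInputFiles = { '" << task.inputFile << "' }\n"
            << "    .CompilerOutputPath = '" << task.outputDir << "'\n}\n";
    }

    // One alias so a single FBuild.exe invocation builds every batch.
    bff << "Alias('all')\n{\n    .Targets = {";
    for (std::size_t i = 0; i < tasks.size(); ++i)
        bff << (i ? ", " : " ") << "'" << tasks[i].name << "'";
    bff << " }\n}\n";
    return bff.str();
}
```

The resulting file would then be handed to FBuild.exe with distribution enabled (something like `FBuild.exe -config <file> -dist all`; treat the flags as assumptions to check against FASTBuild's command-line docs), and the controller would complete each task's future once its output appears.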

Modifying the engine source directly

Add extension code directly to the Manager, as in 4.26: add a FASTBuildCompilerWorkerRunnableThread class modelled on the XGExxxx classes, and mainly change the script-generation part.

Implementing the FASTBuild plugin approach

TODO: will be open-sourced and submitted to the Epic store.