Background
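Shader compile times in our recent project have become too long. On 4.24 I had already verified that shaders can be compiled with FASTBuild, so the idea is to use idle machines to speed up compilation.
The code examined here is from UE5 Early Access (UE5 EA). The corresponding code in 4.24 and 4.26 differs from it.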
Shader compilation flow
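FShaderCompilingManager is the object that manages shader compilation. The Unreal Editor constructs it during startup (in PreInit). Its constructor checks platform capabilities and, if a distributed compilation tool is available, starts a compile thread that uses it.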
Shader compiler thread
A compiler thread derives from FShaderCompileThreadRunnableBase and implements the CompilingLoop method, which carries out the compile logic.
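CompilingLoop runs repeatedly on the compiler thread. Each iteration consumes the jobs obtained from Manager->AllJobs.GetPendingJobs(), dispatches them in batches to the concrete (distributed) compiler, waits for them to finish, and then calls Manager->ProcessFinishedJob. If the distributed compile fails, the jobs are compiled locally with FShaderCompileUtilities::ExecuteShaderCompileJob.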
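Below is a rough, engine-independent sketch of one CompilingLoop iteration as described above, written in standard C++. The Job struct, the CompilerHooks callbacks and CompilingLoopOnce are hypothetical stand-ins for the engine types and functions named in the text. The fallback is applied per batch, on the assumption that the distributed side reports failure for the whole batch:

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for FShaderCommonCompileJob: only what the sketch needs.
struct Job {
    std::string Name;
    bool bSucceeded = false;
};

// Hypothetical hooks mirroring the roles described in the text.
struct CompilerHooks {
    std::function<std::vector<Job*>()> GetPendingJobs;     // like Manager->AllJobs.GetPendingJobs()
    std::function<bool(std::vector<Job*>&)> CompileRemote;  // distributed compile; false = batch failed
    std::function<void(Job&)> CompileLocal;                 // like ExecuteShaderCompileJob
    std::function<void(Job&)> ProcessFinishedJob;           // like Manager->ProcessFinishedJob
};

// One iteration of a CompilingLoop-style function. Returns the number of jobs handled.
int CompilingLoopOnce(const CompilerHooks& Hooks) {
    std::vector<Job*> Batch = Hooks.GetPendingJobs();
    if (Batch.empty()) {
        return 0;
    }
    if (!Hooks.CompileRemote(Batch)) {
        // Distributed compile failed: fall back to compiling every job locally.
        for (Job* J : Batch) {
            Hooks.CompileLocal(*J);
        }
    }
    // Every job is reported as finished, whether it was compiled remotely or locally.
    for (Job* J : Batch) {
        Hooks.ProcessFinishedJob(*J);
    }
    return static_cast<int>(Batch.size());
}
```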
Distributed shader compilation framework: FShaderCompileDistributedThreadRunnable_Interface
UE5 (I am not sure in which version it was introduced) implements a class named FShaderCompileDistributedThreadRunnable_Interface, which encapsulates the distributed compilation framework.
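The compile control logic is abstracted behind the IDistributedBuildController interface.
This separates third-party distributed tools such as XGE (IncrediBuild) and FASTBuild from the engine core source (previously Runtime/Engine/ShaderCompiler). Third-party plugins can implement them and register themselves as a ModularFeature.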
Accordingly, XGE shader compilation in UE5 has also been moved into a separate plugin called XGEController.
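To make the split concrete, here is a hypothetical, heavily simplified analogue of the controller interface in standard C++. DistributedBuildController, TaskCommandData, BuildTaskResult and ImmediateController are illustrative names and do not reproduce the engine's IDistributedBuildController API. The sketch only shows the shape of the contract: the engine hands a task to an interface and gets a future back.

```cpp
#include <cassert>
#include <future>
#include <string>

// Hypothetical, simplified analogues of FTaskCommandData / FDistributedBuildTaskResult.
struct TaskCommandData {
    std::string Command;
    std::string CommandArgs;
    std::string InputFileName;
};

struct BuildTaskResult {
    int ReturnCode = 0;
    bool bCompleted = false;
};

// Simplified analogue of IDistributedBuildController: the engine only talks to this interface.
class DistributedBuildController {
public:
    virtual ~DistributedBuildController() = default;
    virtual bool IsSupported() const = 0;
    virtual std::string GetName() const = 0;
    virtual std::future<BuildTaskResult> EnqueueTask(const TaskCommandData& Data) = 0;
};

// A trivial controller that "completes" every task immediately, just to show the contract.
class ImmediateController : public DistributedBuildController {
public:
    bool IsSupported() const override { return true; }
    std::string GetName() const override { return "Immediate"; }
    std::future<BuildTaskResult> EnqueueTask(const TaskCommandData& Data) override {
        std::promise<BuildTaskResult> Promise;
        BuildTaskResult Result;
        Result.bCompleted = true;
        Result.ReturnCode = Data.Command.empty() ? 1 : 0;  // treat an empty command as a failure
        Promise.set_value(Result);
        return Promise.get_future();  // the shared state outlives the local promise
    }
};
```

A real controller would push the task into its own queue and set the value later from another thread. The caller's code stays the same either way.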
Consuming compile jobs
The implementation is in ShaderCompilerDistributed.cpp:
TArray<FShaderCommonCompileJobPtr> PendingJobs;
//if (LIKELY(!bIsHung)) // stop accepting jobs if we're hung - TODO: re-enable this after lockup detection logic is proved reliable and/or we have job resubmission in place
{
    for (int32 PriorityIndex = MaxPriorityIndex; PriorityIndex >= MinPriorityIndex; --PriorityIndex)
    {
        // Grab as many jobs from the job queue as we can
        const EShaderCompileJobPriority Priority = (EShaderCompileJobPriority)PriorityIndex;
        const int32 MinBatchSize = (Priority == EShaderCompileJobPriority::Low) ? 1 : DistributedShaderCompilerVariables::MinBatchSize;
        const int32 NumJobs = Manager->AllJobs.GetPendingJobs(EShaderCompilerWorkerType::XGE, Priority, MinBatchSize, INT32_MAX, PendingJobs);
        if (NumJobs > 0)
        {
            UE_LOG(LogShaderCompilers, Display, TEXT("Started %d 'XGE' shader compile jobs with '%s' priority"),
                NumJobs,
                ShaderCompileJobPriorityToString((EShaderCompileJobPriority)PriorityIndex));
        }
        if (PendingJobs.Num() >= DistributedShaderCompilerVariables::MinBatchSize)
        {
            break;
        }
    }
}
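This code takes FShaderCommonCompileJobPtr jobs from the queue in priority order, from highest to lowest. It stops as soon as at least DistributedShaderCompilerVariables::MinBatchSize jobs have been collected. The low-priority level accepts batches as small as one job. The jobs are then grouped into batches by ShaderType, and each batch is handed through DispatchShaderCompileJobsBatch to the Controller that does the actual compiling.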
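The priority draining and grouping can be approximated as follows. This is a simplified sketch, not the engine code. It assumes the lowest priority index corresponds to EShaderCompileJobPriority::Low, and that a priority level only yields jobs when it holds at least the requested minimum. PendingJob, DrainByPriority and GroupByShaderType are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical job record: priority index (higher = more urgent) and shader type name.
struct PendingJob {
    int Priority;
    std::string ShaderType;
};

// Take jobs from the highest priority down. A level contributes only if it has at least
// MinBatchSize jobs (the lowest level accepts any count). Draining stops once the collected
// batch reaches MinBatchSize.
std::vector<PendingJob> DrainByPriority(std::map<int, std::vector<PendingJob>>& Queues,
                                        int MinPriority, int MaxPriority, std::size_t MinBatchSize) {
    std::vector<PendingJob> Out;
    for (int P = MaxPriority; P >= MinPriority; --P) {
        std::vector<PendingJob>& Q = Queues[P];
        const std::size_t Required = (P == MinPriority) ? 1 : MinBatchSize;
        if (Q.size() >= Required) {
            Out.insert(Out.end(), Q.begin(), Q.end());
            Q.clear();
        }
        if (Out.size() >= MinBatchSize) {
            break;
        }
    }
    return Out;
}

// Group the drained jobs by shader type so each group can be dispatched as one batch.
std::map<std::string, std::vector<PendingJob>> GroupByShaderType(const std::vector<PendingJob>& Jobs) {
    std::map<std::string, std::vector<PendingJob>> Groups;
    for (const PendingJob& J : Jobs) {
        Groups[J.ShaderType].push_back(J);
    }
    return Groups;
}
```

Because the loop stops once the batch reaches the minimum size, a busy high-priority level keeps lower levels waiting until a later iteration.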
Dispatching tasks: DispatchShaderCompileJobsBatch
DispatchShaderCompileJobsBatch packs a batch of jobs into an FDistributedShaderCompilerTask and submits it to the Controller, where it waits to be processed.
FTaskCommandData TaskCommandData;
TaskCommandData.Command = Manager->ShaderCompileWorkerName;
TaskCommandData.CommandArgs = WorkerParameters;
TaskCommandData.InputFileName = InputFilePath;
TaskCommandData.Dependencies = GetDependencyFilesForJobs(JobsToSerialize);
DispatchedTasks.Add(
    new FDistributedShaderCompilerTask(
        CachedController.EnqueueTask(TaskCommandData),
        MoveTemp(JobsToSerialize),
        MoveTemp(InputFilePath),
        MoveTemp(OutputFilePath)
    )
);
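FShaderCompileUtilities::DoWriteTasks serializes the batch of jobs into an input file, and the results are expected in an output file.
The Controller provides a method that generates the input and output file paths. Each call only has to return a unique path.
The task data type is FTaskCommandData. The Controller must implement EnqueueTask (called here as CachedController.EnqueueTask), which puts the task into the Controller's own queue for consumption and returns a Future that signals completion. The later stage iterates over all of these Futures to check whether they have completed.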
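Here is a minimal sketch of the Controller's side of this contract, assuming a simple in-process queue. CreateUniqueFilePath returns a distinct path per call, and EnqueueTask returns a std::future that is fulfilled when the task is consumed. TaskQueue, QueuedTask and CompleteNext are hypothetical names, and a plain integer return code replaces FDistributedBuildTaskResult.

```cpp
#include <atomic>
#include <cassert>
#include <deque>
#include <future>
#include <mutex>
#include <string>

// Hypothetical controller-side task: EnqueueTask stores it and hands back a future.
struct QueuedTask {
    unsigned Id;
    std::string Command;
    std::string CommandArgs;
    std::promise<int> Promise;  // fulfilled with the worker's return code
};

class TaskQueue {
public:
    // Each call returns a distinct path. An atomic counter is enough within one process.
    std::string CreateUniqueFilePath(const std::string& Dir) {
        return Dir + "/ShaderJob_" + std::to_string(NextFileId++) + ".bin";
    }

    std::future<int> EnqueueTask(const std::string& Command, const std::string& Args) {
        std::lock_guard<std::mutex> Lock(Mutex);
        Pending.push_back(QueuedTask{NextTaskId++, Command, Args, std::promise<int>()});
        return Pending.back().Promise.get_future();
    }

    // Consumer side: pops the oldest task and completes it with the given return code.
    bool CompleteNext(int ReturnCode) {
        std::lock_guard<std::mutex> Lock(Mutex);
        if (Pending.empty()) {
            return false;
        }
        Pending.front().Promise.set_value(ReturnCode);
        Pending.pop_front();
        return true;
    }

private:
    std::mutex Mutex;
    std::deque<QueuedTask> Pending;
    unsigned NextTaskId = 0;
    std::atomic<unsigned> NextFileId{0};
};
```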
Collecting task results
for (auto Iter = DispatchedTasks.CreateIterator(); Iter; ++Iter)
{
    bool bOutputFileReadFailed = true;
    FDistributedShaderCompilerTask* Task = *Iter;
    if (!Task->Future.IsReady())
    {
        continue;
    }
    FDistributedBuildTaskResult Result = Task->Future.Get();
    NumDispatchedJobs -= Task->ShaderJobs.Num();
    LastTimeTaskCompleted = FPlatformTime::Seconds();
    if (Result.ReturnCode != 0)
    {
        UE_LOG(LogShaderCompilers, Error, TEXT("Shader compiler returned a non-zero error code (%d)."), Result.ReturnCode);
    }
    if (Result.bCompleted)
    {
        // Check the output file exists. If it does, attempt to open it and serialize in the completed jobs.
        if (IFileManager::Get().FileExists(*Task->OutputFilePath))
        {
            FArchive* OutputFileAr = IFileManager::Get().CreateFileReader(*Task->OutputFilePath, FILEREAD_Silent);
            if (OutputFileAr)
            {
                bOutputFileReadFailed = false;
                FShaderCompileUtilities::DoReadTaskResults(Task->ShaderJobs, *OutputFileAr);
                delete OutputFileAr;
            }
        }
        if (bOutputFileReadFailed)
        {
            // Reading result from XGE job failed, so recompile shaders in current job batch locally
            UE_LOG(LogShaderCompilers, Log, TEXT("Rescheduling shader compilation to run locally after XGE job failed: %s"), *Task->OutputFilePath);
            for (FShaderCommonCompileJobPtr Job : Task->ShaderJobs)
            {
                FShaderCompileUtilities::ExecuteShaderCompileJob(*Job);
            }
        }
        // Enter the critical section so we can access the input and output queues
        {
            FScopeLock Lock(&Manager->CompileQueueSection);
            for (const auto& Job : Task->ShaderJobs)
            {
                Manager->ProcessFinishedJob(Job);
            }
        }
    }
}
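When a task is found to be complete, FShaderCompileUtilities::DoReadTaskResults reads the task results from the OutputFile. If the OutputFile does not exist or cannot be opened, the task is treated as failed and its jobs are compiled locally with FShaderCompileUtilities::ExecuteShaderCompileJob. In either case, Manager->ProcessFinishedJob(Job) is then called to mark every shader compile in the task as finished.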
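The non-blocking polling pattern above can be sketched with standard futures, where wait_for with a zero timeout plays the role of Future.IsReady(). InFlightTask and PollDispatchedTasks are hypothetical names. The ReadResults callback stands in for the file-exists check plus DoReadTaskResults, and tasks are identified by their output path to keep the sketch simple.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <future>
#include <list>
#include <string>

// Hypothetical in-flight task: a future for the remote result plus the output file it should produce.
struct InFlightTask {
    std::future<int> Future;  // remote return code
    std::string OutputFilePath;
};

// Poll every dispatched task without blocking. ReadResults returns false when the output file is
// missing or unreadable, and CompileLocally is the fallback. Returns how many tasks finished in this pass.
int PollDispatchedTasks(std::list<InFlightTask>& Tasks,
                        const std::function<bool(const std::string&)>& ReadResults,
                        const std::function<void(const std::string&)>& CompileLocally) {
    int Finished = 0;
    for (auto It = Tasks.begin(); It != Tasks.end();) {
        if (It->Future.wait_for(std::chrono::seconds(0)) != std::future_status::ready) {
            ++It;  // still running, check again on the next pass
            continue;
        }
        It->Future.get();  // consumed here; what decides success is whether the output file can be read
        if (!ReadResults(It->OutputFilePath)) {
            CompileLocally(It->OutputFilePath);
        }
        It = Tasks.erase(It);
        ++Finished;
    }
    return Finished;
}
```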
FXGEControllerModule: the XGE distributed compilation implementation
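FXGEControllerModule implements the IDistributedBuildController interface. As a standalone plugin, it registers itself at engine startup with IModularFeatures::Get().RegisterModularFeature(GetModularFeatureType(), this);. The shader compiling manager (FShaderCompilingManager) can therefore retrieve the available Controllers through ModularFeature. When a Controller is usable, the manager calls its InitializeController. Once started, the Controller launches an asynchronous task (a thread) that runs WriteOutThreadProc.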
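Registration and lookup through ModularFeature can be pictured as a registry keyed by a feature-type name. The sketch below is a hypothetical stand-in for IModularFeatures and does not reproduce its API. Picking the first implementation that reports itself as supported is an assumed selection policy.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical controller record: just a name and whether its environment check passed.
struct Controller {
    std::string Name;
    bool bSupported;
};

// Hypothetical, minimal analogue of the modular-feature registry: implementations register
// themselves under a feature-type name, and the engine later looks them up by that name.
class FeatureRegistry {
public:
    void Register(const std::string& FeatureType, Controller* Impl) {
        Features[FeatureType].push_back(Impl);
    }

    // Assumed policy: return the first registered implementation that reports itself usable.
    Controller* FindFirstSupported(const std::string& FeatureType) const {
        auto It = Features.find(FeatureType);
        if (It == Features.end()) {
            return nullptr;
        }
        for (Controller* C : It->second) {
            if (C->bSupported) {
                return C;
            }
        }
        return nullptr;
    }

private:
    std::map<std::string, std::vector<Controller*>> Features;
};
```

With this shape, a FASTBuild plugin only has to register under the same feature type. The engine side does not need to know it exists.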
WriteOutThreadProc
FString XGConsoleArgs = FString::Printf(TEXT("/VIRTUALIZEDIRECTX /allowremote=\"%s\" %s /allowintercept=\"%s\" /title=\"Unreal Engine FASTBuild Tasks\" /monitordirs=\"%s\" /command=\"%s -FASTBuildController %s\""),
    XGE_INTERCEPT_EXE_NAMES,
    FASTBuildController::AvoidUsingLocalMachine() ? TEXT("/avoidlocal=ON") : TEXT(""),
    XGE_CONTROL_WORKER_NAME,
    *WorkingDirectory,
    XGE_CONTROL_WORKER_FILENAME,
    *PipeName);
// Create the output pipe as a server...
if (!OutputNamedPipe.Create(FString::Printf(TEXT("\\\\.\\pipe\\%s-A"), *PipeName), true, false))
{
    UE_LOG(LogFASTBuildController, Fatal, TEXT("Failed to create the output FASTBuild named pipe."));
}
const int32 PriorityModifier = 0; // normal by default. Interactive use case shouldn't be affected as the jobs will avoid local machine
// Start the controller process
uint32 XGConsoleProcID = 0;
UE_LOG(LogFASTBuildController, Verbose, TEXT("Launching xgConsole"));
BuildProcessHandle = FPlatformProcess::CreateProc(*XGConsolePath, *XGConsoleArgs, false, false, true, &XGConsoleProcID, PriorityModifier, *ControlWorkerDirectory, nullptr);
if (!BuildProcessHandle.IsValid())
{
    UE_LOG(LogFASTBuildController, Fatal, TEXT("Failed to launch the FASTBuild control worker process."));
}
// If the engine crashes, we don't get a chance to kill the build process.
// Start up the build monitor process to monitor for engine crashes.
uint32 BuildMonitorProcessID;
UE_LOG(LogFASTBuildController, Verbose, TEXT("Launching FASTBuildController to fan out tasks"));
FString XGMonitorArgs = FString::Printf(TEXT("-xgemonitor %d %d"), FPlatformProcess::GetCurrentProcessId(), XGConsoleProcID);
FProcHandle BuildMonitorHandle = FPlatformProcess::CreateProc(*GetControlWorkerExePath(), *XGMonitorArgs, true, false, false, &BuildMonitorProcessID, PriorityModifier, nullptr, nullptr);
FPlatformProcess::CloseProc(BuildMonitorHandle);
// Wait for the controller to connect to the output pipe
if (!OutputNamedPipe.OpenConnection())
{
    UE_LOG(LogFASTBuildController, Fatal, TEXT("Failed to open a connection on the output FASTBuild named pipe."));
}
// Connect the input pipe (controller is the server)...
if (!InputNamedPipe.Create(FString::Printf(TEXT("\\\\.\\pipe\\%s-B"), *PipeName), false, false))
{
    UE_LOG(LogFASTBuildController, Fatal, TEXT("Failed to connect the input FASTBuild named pipe."));
}
// Pass the xgConsole process ID to the FASTBuild control worker, so it can terminate the build on exit
if (!OutputNamedPipe.WriteBytes(sizeof(XGConsoleProcID), &XGConsoleProcID))
{
    UE_LOG(LogFASTBuildController, Fatal, TEXT("Failed to pass xgConsole process ID to FASTBuild control worker."));
}
LastEventTime = FPlatformTime::Cycles();
// Launch the output thread
ReadBackThreadFuture = Async(EAsyncExecution::Thread, [this]() { ReadBackThreadProc(); });
...
while (true)
{
    ...
    // Take one task from the pending queue.
    FTask* Task = nullptr;
    {
        FScopeLock Lock(TasksCS);
        PendingTasks.Dequeue(Task);
    }
    if (Task)
    {
        WriteBuffer.Reset();
        WriteBuffer.AddUninitialized(sizeof(uint32));
        FMemoryWriter Writer(WriteBuffer, false, true);
        Writer << Task->ID;
        Writer << Task->CommandData.Command;
        Writer << Task->CommandData.CommandArgs;
        *reinterpret_cast<uint32*>(WriteBuffer.GetData()) = WriteBuffer.Num() - sizeof(uint32);
        // Move the tasks to the dispatched tasks map before launching it
        {
            FScopeLock Lock(TasksCS);
            DispatchedTasks.Add(Task->ID, Task);
        }
        if (!OutputNamedPipe.WriteBytes(WriteBuffer.Num(), WriteBuffer.GetData()))
        {
            // Error occurred whilst writing task args to the named pipe.
            // It's likely the controller process was terminated.
            bRestartWorker = true;
        }
        // Update the last event time.
        FPlatformAtomics::InterlockedExchange(reinterpret_cast<volatile int32*>(&LastEventTime), FPlatformTime::Cycles());
    }
}
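This function launches xgConsole (xgConsole.exe, provided by IncrediBuild) and passes it the pipe name and path-related arguments. It also launches a separate instance of the control worker in monitor mode (-xgemonitor), which shuts down the xgConsole build if the engine crashes.
The thread creates the write-out pipe (XXXX-A) as a server and connects to the read-in pipe (XXXX-B). As soon as the connection is established, it sends the xgConsole process ID, then starts another asynchronous task that runs ReadBackThreadProc. After that, it repeatedly takes tasks from PendingTasks and writes the task data directly into the pipe. The control worker that xgConsole launched reads the data from the other end.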
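The message framing in the loop above can be reproduced in standard C++. The loop reserves a uint32 length header, serializes the payload after it, and then patches the header with the payload size. In this sketch, strings are encoded as a uint32 byte count followed by raw bytes. That is a simplified assumption, not FArchive's FString layout. EncodeTaskMessage, PutU32 and PutString are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Append a little-endian uint32 to the buffer.
static void PutU32(std::vector<uint8_t>& Buf, uint32_t V) {
    for (int i = 0; i < 4; ++i) {
        Buf.push_back(static_cast<uint8_t>(V >> (8 * i)));
    }
}

// Append a string as <uint32 byte length><bytes> (simplified; not FArchive's FString encoding).
static void PutString(std::vector<uint8_t>& Buf, const std::string& S) {
    PutU32(Buf, static_cast<uint32_t>(S.size()));
    Buf.insert(Buf.end(), S.begin(), S.end());
}

// Build one pipe message: reserve a 4-byte length header, serialize the payload after it,
// then patch the header with the payload size (header excluded), as the WriteBuffer code does.
std::vector<uint8_t> EncodeTaskMessage(uint32_t Id, const std::string& Command, const std::string& Args) {
    std::vector<uint8_t> Buf(sizeof(uint32_t));  // placeholder for the length header
    PutU32(Buf, Id);
    PutString(Buf, Command);
    PutString(Buf, Args);
    const uint32_t PayloadSize = static_cast<uint32_t>(Buf.size() - sizeof(uint32_t));
    for (int i = 0; i < 4; ++i) {
        Buf[i] = static_cast<uint8_t>(PayloadSize >> (8 * i));
    }
    return Buf;
}
```

The length prefix lets the reader first read exactly four bytes and then exactly the payload. A byte-stream pipe gives no message boundaries of its own.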
ReadBackThreadProc logic
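As you might expect, the main purpose of this function is to receive task results from the xgConsole side over the read pipe and notify the compiler thread that a task has completed, by setting its Promise.
The code is as follows: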
FTaskResponse CompletedTaskResponse;
if (!InputNamedPipe.ReadBytes(sizeof(CompletedTaskResponse), &CompletedTaskResponse))
{
    // The named pipe was closed or had an error.
    // Instruct the write-out thread to restart the worker, then exit.
    bRestartWorker = true;
}
else
{
    // Update the last event time.
    FPlatformAtomics::InterlockedExchange(reinterpret_cast<volatile int32*>(&LastEventTime), FPlatformTime::Cycles());
    // We've read a completed task response from the controller.
    // Find the task in the map and complete the promise.
    FTask* Task;
    {
        FScopeLock Lock(TasksCS);
        Task = DispatchedTasks.FindAndRemoveChecked(CompletedTaskResponse.ID);
    }
    FDistributedBuildTaskResult Result;
    Result.ReturnCode = CompletedTaskResponse.ReturnCode;
    Result.bCompleted = true;
    Task->Promise.SetValue(Result);
    delete Task;
}
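After a response is received, DispatchedTasks.FindAndRemoveChecked takes the task out of the dispatched-task map. The Result is then set on the task's Promise, so the compiler thread can pick it up and process it.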
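The ID-to-promise bookkeeping on the read-back side might look like this in standard C++. TaskResponse, TaskResult and DispatchedTaskMap are hypothetical names. Unlike FindAndRemoveChecked, the sketch returns false for an unknown ID instead of asserting, and it sets the promise after releasing the lock.

```cpp
#include <cassert>
#include <cstdint>
#include <future>
#include <mutex>
#include <unordered_map>
#include <utility>

// Hypothetical fixed-size response read from the pipe: which task finished and its exit code.
struct TaskResponse {
    uint32_t Id;
    int32_t ReturnCode;
};

struct TaskResult {
    int32_t ReturnCode = 0;
    bool bCompleted = false;
};

// Tracks dispatched tasks by ID and completes the matching promise when a response arrives.
class DispatchedTaskMap {
public:
    std::future<TaskResult> Add(uint32_t Id) {
        std::lock_guard<std::mutex> Lock(Mutex);
        return Tasks[Id].get_future();
    }

    // Returns false for an unknown ID (the engine code uses a checked find and would assert instead).
    bool OnResponse(const TaskResponse& Response) {
        std::promise<TaskResult> Promise;
        {
            std::lock_guard<std::mutex> Lock(Mutex);
            auto It = Tasks.find(Response.Id);
            if (It == Tasks.end()) {
                return false;
            }
            Promise = std::move(It->second);
            Tasks.erase(It);
        }
        TaskResult Result;
        Result.ReturnCode = Response.ReturnCode;
        Result.bCompleted = true;
        Promise.set_value(Result);  // makes the result available to whoever holds the future
        return true;
    }

private:
    std::mutex Mutex;
    std::unordered_map<uint32_t, std::promise<TaskResult>> Tasks;
};
```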
The XGEControlWorker process
For the source, see Programs/ShaderCompileWorker.
Building the program
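The program's build rules show that when the XGE controller is enabled (Win64, Development), a post-build step copies ShaderCompileWorker.exe to XGEControlWorker.exe, so the two share the same code: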
if (bUseXGEController && (Target.Platform == UnrealTargetPlatform.Win64) && Configuration == UnrealTargetConfiguration.Development)
{
    // The interception interface in XGE requires that the parent and child processes have different filenames on disk.
    // To avoid building an entire separate worker just for this, we duplicate the ShaderCompileWorker in a post build step.
    const string SrcPath = "$(EngineDir)\\Binaries\\$(TargetPlatform)\\ShaderCompileWorker.exe";
    const string DestPath = "$(EngineDir)\\Binaries\\$(TargetPlatform)\\XGEControlWorker.exe";
    PostBuildSteps.Add(string.Format("echo Copying {0} to {1}", SrcPath, DestPath));
    PostBuildSteps.Add(string.Format("copy /Y /B \"{0}\" /B \"{1}\" >nul:", SrcPath, DestPath));
    AdditionalBuildProducts.Add(DestPath);
}
FXGEControlWorker
This class implements the main logic of the control worker. The worker's main function essentially calls the following:
FTaskTagScope Scope(ETaskTag::EGameThread);
GEngineLoop.PreInit(ArgC, ArgV, TEXT("-NOPACKAGECACHE -Multiprocess"));
if (ArgC != 3)
{
    // Invalid command line arguments.
    return 1;
}
FXGEControlWorker Instance(ArgV[2]);
if (!Instance.Init())
{
    // Failed to initialize connection with engine.
    return 2;
}
Instance.WaitForExit();
The main logic is in Init and WaitForExit. WaitForExit simply waits for the worker threads to finish.
FXGEControlWorker::Init
// Create the output pipe as a server...
if (!OutputNamedPipe.Create(FString::Printf(TEXT("\\\\.\\pipe\\%s-B"), *PipeName), true, false))
    return false;
// Connect the input pipe (engine is the server)...
if (!InputNamedPipe.Create(FString::Printf(TEXT("\\\\.\\pipe\\%s-A"), *PipeName), false, false))
    return false;
// Connect the output pipe (engine is the client)...
if (!OutputNamedPipe.OpenConnection())
    return false;
// Read the process ID of the parent xgConsole process
uint32 XGConsoleProcID = 0;
if (!InputNamedPipe.ReadBytes(sizeof(XGConsoleProcID), &XGConsoleProcID))
    return false;
// Attempt to open the parent process handle
XGConsoleProcHandle = FPlatformProcess::OpenProcess(XGConsoleProcID);
if (!XGConsoleProcHandle.IsValid() || !FPlatformProcess::IsProcRunning(XGConsoleProcHandle))
    return false;
// Connection successful, start the worker threads
InputThreadFuture = Async(EAsyncExecution::Thread, [this]() { InputThreadProc(); });
OutputThreadFuture = Async(EAsyncExecution::Thread, [this]() { OutputThreadProc(); });
return true;
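Init mainly sets up the named pipes to the engine, with the roles mirrored: the worker is the server of the -B pipe and the client of the -A pipe. It then starts two threads, InputThreadProc and OutputThreadProc.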
FXGEControlWorker::InputThreadProc
ReadBuffer.Reset(TotalLength);
ReadBuffer.AddUninitialized(TotalLength);
if (!InputNamedPipe.ReadBytes(TotalLength, ReadBuffer.GetData()))
    break;
FMemoryReader Reader(ReadBuffer);
FTask* Task = new FTask();
Reader << Task->ID;
Reader << Task->Executable;
Reader << Task->Arguments;
// Launch the process with normal priority.
Task->Handle = FPlatformProcess::CreateProc(*Task->Executable, *Task->Arguments, true, false, false, nullptr, 0, nullptr, nullptr);
FScopeLock Lock(CS);
CurrentTasks.Add(Task);
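The input thread is simple: it receives task data from the pipe. At this point the task data contains only the executable and its arguments, so the worker just launches the process once they arrive.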
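On the receiving end, decoding a payload mirrors writing it. The sketch assumes a simplified payload format: a little-endian uint32 ID followed by two strings, each a uint32 byte count plus raw bytes, with the outer length header already stripped. This is not FArchive's actual layout, and DecodedTask, PayloadReader and DecodeTask are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical decoded task: the ID plus the executable and arguments to launch.
struct DecodedTask {
    uint32_t Id = 0;
    std::string Executable;
    std::string Arguments;
};

// Bounds-checked little-endian reader over a payload whose length header was already stripped.
class PayloadReader {
public:
    explicit PayloadReader(const std::vector<uint8_t>& Data) : Bytes(Data) {}

    bool ReadU32(uint32_t& Out) {
        if (Bytes.size() - Pos < 4) return false;
        Out = 0;
        for (int i = 0; i < 4; ++i) {
            Out |= static_cast<uint32_t>(Bytes[Pos + i]) << (8 * i);
        }
        Pos += 4;
        return true;
    }

    bool ReadString(std::string& Out) {
        uint32_t Len = 0;
        if (!ReadU32(Len) || Bytes.size() - Pos < Len) return false;
        Out.assign(Bytes.begin() + Pos, Bytes.begin() + Pos + Len);
        Pos += Len;
        return true;
    }

private:
    const std::vector<uint8_t>& Bytes;
    std::size_t Pos = 0;  // invariant: Pos <= Bytes.size()
};

// Decode one task payload. Returns false if the payload is truncated.
bool DecodeTask(const std::vector<uint8_t>& Payload, DecodedTask& Out) {
    PayloadReader Reader(Payload);
    return Reader.ReadU32(Out.Id) && Reader.ReadString(Out.Executable) && Reader.ReadString(Out.Arguments);
}
```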
FXGEControlWorker::OutputThreadProc
while (!bShutdown)
{
    FPlatformProcess::Sleep(0.1f);
    FScopeLock Lock(CS);
    for (auto TasksIter = CurrentTasks.CreateIterator(); TasksIter; ++TasksIter)
    {
        FTask* Task = *TasksIter;
        if (Task->Handle.IsValid() && !FPlatformProcess::IsProcRunning(Task->Handle))
        {
            // Process has completed. Remove the task from the map.
            TasksIter.RemoveCurrent();
            // Grab the process return code and close the handle
            int32 ReturnCode = 0;
            FPlatformProcess::GetProcReturnCode(Task->Handle, &ReturnCode);
            FPlatformProcess::CloseProc(Task->Handle);
            // Write the completion event to the output pipe
            WriteBuffer.Reset();
            FMemoryWriter Writer(WriteBuffer);
            Writer << Task->ID;
            Writer << ReturnCode;
            delete Task;
            if (!OutputNamedPipe.WriteBytes(WriteBuffer.Num(), WriteBuffer.GetData()))
            {
                // Writing to the pipe failed.
                bShutdown = true;
                break;
            }
        }
    }
}
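The output thread collects each task's execution result (the process return code) and sends it back to the engine-side compiler through the pipe.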
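The output thread's polling can be reproduced on POSIX systems, where waitpid with WNOHANG combines the roles of IsProcRunning and GetProcReturnCode. This is an assumption for illustration only: the engine code uses Windows process handles. RunningTask, CollectFinished and SpawnExiting are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <list>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <utility>
#include <vector>

// POSIX stand-in (assumption) for the Windows process handle the worker stores per task.
struct RunningTask {
    uint32_t Id;
    pid_t Pid;
};

// One polling pass like OutputThreadProc. waitpid(WNOHANG) returns 0 while the child is still
// running. Finished tasks are removed and returned as (ID, exit code) pairs, which the worker
// would then write back to the engine over the output pipe. Abnormal exits are reported as -1.
std::vector<std::pair<uint32_t, int>> CollectFinished(std::list<RunningTask>& Tasks) {
    std::vector<std::pair<uint32_t, int>> Completed;
    for (auto It = Tasks.begin(); It != Tasks.end();) {
        int Status = 0;
        pid_t R = waitpid(It->Pid, &Status, WNOHANG);
        if (R == 0) {
            ++It;  // still running, check again on the next pass
            continue;
        }
        int Code = (R == It->Pid && WIFEXITED(Status)) ? WEXITSTATUS(Status) : -1;
        Completed.emplace_back(It->Id, Code);
        It = Tasks.erase(It);
    }
    return Completed;
}

// Helper for trying it out: fork a child that exits immediately with the given code.
pid_t SpawnExiting(int Code) {
    pid_t Pid = fork();
    if (Pid == 0) {
        _exit(Code);
    }
    return Pid;
}
```

Like the engine code, this polls on a timer instead of blocking on one process, so a single thread can watch any number of compile workers.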
Extending with FASTBuild to speed up shader compilation
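The sections above describe the basic flow of distributed shader compilation in UE5.0 EA. IncrediBuild licenses are expensive, so the plan is to use the open-source tool FASTBuild instead, building on the earlier verification on 4.24/4.26. There are two ways to add support:
- Extend it the way XGEController does (self-contained, no engine changes)
- Modify the engine source directly, as in the 4.26 version (intrusive)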
Extending with a FASTBuildController plugin
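Like the XGEController plugin, a FASTBuild plugin checks its environment and registers itself. In the task-consumption logic, the calls to xgConsole are replaced with FASTBuild's tool, FBuild.exe. Most of the work is generating the FASTBuild script.
This approach extends well and can be offered to others as a plugin.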
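As a starting point for the script generation, the sketch below emits a FASTBuild .bff file. It declares ShaderCompileWorker as a Compiler with CompilerFamily 'custom' and lists each batch input file in an ObjectList. This is an untested assumption about how the pieces might map onto FASTBuild. Two things need checking against the FASTBuild documentation and the engine's worker command line: which extra settings a custom tool needs for remote distribution, and the exact worker arguments. Here the arguments are a WorkerArgs string that must contain FASTBuild's %1 (input) and %2 (output) tokens. ShaderBatchScript and GenerateBff are hypothetical names, and paths are written without escaping, so they must not contain single quotes.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical inputs for one FASTBuild run over a set of shader batches.
struct ShaderBatchScript {
    std::string WorkerExe;                // path to ShaderCompileWorker.exe
    std::string WorkerArgs;               // assumption: worker arguments containing %1 (input) and %2 (output)
    std::vector<std::string> InputFiles;  // one input file per batch, as written by DoWriteTasks
    std::string OutputDir;
};

// Emit a .bff script that declares the worker as a FASTBuild "compiler" and each batch input
// file as one compile step. No escaping is done: paths must not contain single quotes.
std::string GenerateBff(const ShaderBatchScript& S) {
    std::ostringstream Out;
    Out << "Compiler( 'ShaderCompileWorker' )\n{\n";
    Out << "    .Executable = '" << S.WorkerExe << "'\n";
    Out << "    .CompilerFamily = 'custom'\n";  // assumption: custom family for a non-C++ tool
    Out << "}\n\n";
    Out << "ObjectList( 'ShaderBatches' )\n{\n";
    Out << "    .Compiler = 'ShaderCompileWorker'\n";
    Out << "    .CompilerOptions = '" << S.WorkerArgs << "'\n";
    Out << "    .CompilerOutputPath = '" << S.OutputDir << "'\n";
    Out << "    .CompilerOutputExtension = '.out'\n";
    Out << "    .CompilerInputFiles = {";
    for (std::size_t i = 0; i < S.InputFiles.size(); ++i) {
        Out << (i ? ", " : " ") << "'" << S.InputFiles[i] << "'";
    }
    Out << " }\n}\n";
    return Out.str();
}
```

The generated file could then be run with something like FBuild.exe -config <file>.bff -dist ShaderBatches, where -dist enables distributed compilation. The controller would then map each .out file back to its task.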
Modifying the engine source
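Add extension code directly to the manager code, as in 4.26. This means adding a FASTBuildCompilerWorkerRunnableThread class, analogous to the XGExxxx classes, and then mostly changing the script-generation part.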
Implementing the FASTBuild plugin approach
TODO: it will be open-sourced and submitted to the Epic store.