Multithreading in a Large, Open World Space Game

This blog post is by Stephanie Rancourt. After a successful career in AAA engine and game development, she left to work on her own game, Deep Space Settlement (D.S.S.). This one-woman engine team built a 250,000-line engine for a vast and impressive space game in a matter of a few years. She inspires me greatly. Please follow her on Twitter at @StephanieRct and buy her game at http://dss.stephanierct.com/ .

In D.S.S. I have to handle tons and tons of ships and projectiles in many star-systems. Consequently, if I want a decent framerate, I have to spread all this work across several threads to take advantage of modern multi-core CPUs.

So without further ado, here is how I did it. It's actually a pretty standard approach, just the simplest way I could think of to implement it. I'll be using common primitives like mutexes, events and threads, so I won't go into detail on those classes. I'm also using the FastDelegate library because I just like it a lot.

The model is basically: queue a bunch of tasks (or jobs) and process them with several threads. The magic happens in 3 classes: JobQueue, MPWorker and MultiProcess.
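When every job for a frame is known before processing starts, the model fits in a few lines. The sketch below is an illustration under stated assumptions, not the author's implementation: it uses only C++11 standard library types, std::function instead of FastDelegate, and a shared std::atomic index in place of a locked queue (a swapped-in technique), so threads claim jobs with a single fetch_add instead of a mutex. The name runJobsInParallel is hypothetical.

```cpp
#include <atomic>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical helper: runs a fixed batch of jobs on workerCount extra threads
// plus the calling thread. Assumes no jobs are added while the batch runs.
void runJobsInParallel(const std::vector<std::function<void()>>& jobs,
                       unsigned workerCount) {
    std::atomic<std::size_t> next(0); // index of the next unclaimed job

    // Every thread "fights" for jobs: fetch_add hands each index to exactly one thread.
    auto drain = [&jobs, &next]() {
        for (;;) {
            std::size_t i = next.fetch_add(1, std::memory_order_relaxed);
            if (i >= jobs.size()) return; // nothing left to claim
            jobs[i]();
        }
    };

    std::vector<std::thread> workers;
    for (unsigned w = 0; w < workerCount; ++w) workers.emplace_back(drain);
    drain();                                 // the calling thread works too
    for (std::thread& t : workers) t.join(); // join also makes the jobs' results visible here
}
```

Like a flush, the call returns only once every job has run. The trade-off is that the batch is fixed: unlike a real queue, nothing can be added once processing has begun.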

JobQueue:
This one synchronizes adding and retrieving jobs. It really just acts as a synchronization primitive.

MPWorker:
This guy processes jobs from the JobQueue. Several of them are usually spawned at initialization time. They literally fight each other to get and execute jobs. They are pretty good workers indeed.

MultiProcess just ties everything together.

So here goes a wall of code:

JobQueue:

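#include <list>
#include <intrin.h>       // _InterlockedIncrement / _InterlockedDecrement (MSVC intrinsics)
#include "FastDelegate.h" // FastDelegate library
// Mutex, Event, Thread and uint32 are the engine's own types and are not shown here.

// This job queue is implemented with a mutex for simplicity. A lock-free queue would yield better performance.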
class JobQueue{
public:
    JobQueue()
    : muiNbWorkingJob(0)
    , muiMaxSimultaneous(0){}
    void addJob(const fastdelegate::FastDelegate0<>& a){
        Mutex::Lock lock(mJobMutex);
        mAllDone.reset();
        mJobs.push_back(a);
    }
    fastdelegate::FastDelegate0<> retrieveNextJob(){
        Mutex::Lock lock(mJobMutex);
        if(!mJobs.empty()){
            fastdelegate::FastDelegate0<> job = mJobs.front();
            mJobs.pop_front();
            _InterlockedIncrement(&muiNbWorkingJob);
            updateMaxSimultanious();
            return job;
        } else{
            return fastdelegate::FastDelegate0<>(); // empty delegate: no job available
        }
    }
    // called when a job is done
    void markJobDone(fastdelegate::FastDelegate0<>& a){
        Mutex::Lock lock(mJobMutex);
        // _InterlockedDecrement is used to force updating local variable
        // with up-to-date variable changed by other thread
        // a volatile pointer would probably work as well.
        uint32 uiNbWorking = _InterlockedDecrement(&muiNbWorkingJob);
        if(uiNbWorking == 0 && mJobs.size()==0 ){
            mAllDone.raise();
        }
    }
    // waits until all job are done
    void waitForAllDone(){
        mAllDone.wait();
    }
    uint32 getMaxSimultanious()const {return muiMaxSimultaneous;}
    void resetMaxSimultanious(){muiMaxSimultaneous=0;}
private:
    // prevent copying this class
    JobQueue(const JobQueue&);
    JobQueue& operator=(const JobQueue&);
    void updateMaxSimultanious(){
        if(muiNbWorkingJob>muiMaxSimultaneous) muiMaxSimultaneous=muiNbWorkingJob;
    }
    std::list< fastdelegate::FastDelegate0<> > mJobs;
    Mutex mJobMutex;
    long muiNbWorkingJob; // number of jobs executing at the moment
    Event mAllDone; // event raised when all jobs have been processed
    uint32 muiMaxSimultaneous; // peak number of jobs executing at the same time; used for statistics
};
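The "all done" logic above depends on the engine's own Event class, and an idle worker keeps polling retrieveNextJob(). One portable alternative, sketched here assuming C++11 and std::function in place of FastDelegate, lets idle workers sleep on a std::condition_variable and replaces the Event with a count of pending (queued plus running) jobs. CvJobQueue and its method names are hypothetical, not part of the author's engine.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>

// Hypothetical portable counterpart of JobQueue (not the author's code).
class CvJobQueue {
public:
    using Job = std::function<void()>;

    void addJob(Job job) {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mJobs.push_back(std::move(job));
            ++mPending; // counts queued + running jobs
        }
        mJobAvailable.notify_one(); // wake one sleeping worker
    }

    // Sleeps until a job is queued or stop() is called. Returns an empty Job
    // only when stopping and the queue is empty.
    Job waitForJob() {
        std::unique_lock<std::mutex> lock(mMutex);
        mJobAvailable.wait(lock, [this] { return mStopping || !mJobs.empty(); });
        return popLocked();
    }

    // Non-blocking variant, e.g. for the thread that flushes the queue.
    Job tryRetrieveJob() {
        std::lock_guard<std::mutex> lock(mMutex);
        return popLocked();
    }

    void markJobDone() {
        std::lock_guard<std::mutex> lock(mMutex);
        if (--mPending == 0) mAllDone.notify_all();
    }

    // Returns immediately if nothing is pending, including when nothing was ever queued.
    void waitForAllDone() {
        std::unique_lock<std::mutex> lock(mMutex);
        mAllDone.wait(lock, [this] { return mPending == 0; });
    }

    void stop() {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mStopping = true;
        }
        mJobAvailable.notify_all(); // release every sleeping worker
    }

private:
    Job popLocked() { // caller must hold mMutex
        if (mJobs.empty()) return Job();
        Job job = std::move(mJobs.front());
        mJobs.pop_front();
        return job;
    }

    std::deque<Job> mJobs;
    std::size_t mPending = 0;
    bool mStopping = false;
    std::mutex mMutex;
    std::condition_variable mJobAvailable;
    std::condition_variable mAllDone;
};
```

A worker thread would loop on waitForJob() until it returns an empty Job, calling markJobDone() after each job. A flush equivalent would drain the queue with tryRetrieveJob() on the calling thread, then call waitForAllDone(); stop() is called before joining the workers at shutdown.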

MPWorker:

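#include <windows.h> // SwitchToThread
#include <intrin.h>  // _InterlockedExchange

class MPWorker{
public:
    MPWorker(uint32 auiId, JobQueue& aJq)
        :muiId(auiId)
        ,mlStop(0)
        ,mJobQueue(aJq) {
        mpThread = new Thread(fastdelegate::MakeDelegate(this,& MPWorker::workerFunc));
        mpThread->start();
    }
    ~MPWorker(){
        // Tell workerFunc() to return so the thread stops using mJobQueue.
        // Assumes Thread's destructor waits for the thread function to finish.
        _InterlockedExchange(&mlStop, 1);
        delete mpThread;
    }
    void workerFunc(){
        while(mlStop == 0){
            fastdelegate::FastDelegate0<> job = mJobQueue.retrieveNextJob();
            if(job){
                job();
                mJobQueue.markJobDone(job);
            } else {
                // queue is empty: yield the time slice instead of spinning at full speed
                SwitchToThread();
            }
        }
    }
    uint32 muiId; // not really needed, just for debugging purposes
    volatile long mlStop; // set to 1 to make workerFunc() return
    Thread * mpThread;
    JobQueue& mJobQueue; // the queue from which to get jobs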
private:
    MPWorker(const MPWorker& a);
    MPWorker& operator=(const MPWorker& a);
};
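A worker needs a way to leave its loop before the queue it reads from is destroyed, and it should not spin at full speed while the queue is empty. Here is a portable sketch of one common way to do both with std::thread and std::atomic<bool>. StoppableWorker and the tryRunJob callback are hypothetical names; the callback stands in for "retrieve a job, run it, mark it done" and returns false when no job was available.

```cpp
#include <atomic>
#include <functional>
#include <thread>
#include <utility>

// Hypothetical worker (not the author's MPWorker).
class StoppableWorker {
public:
    explicit StoppableWorker(std::function<bool()> tryRunJob)
        : mTryRunJob(std::move(tryRunJob)),
          mStop(false),
          mThread(&StoppableWorker::loop, this) {} // starts after the members above exist

    ~StoppableWorker() {
        mStop.store(true, std::memory_order_release); // ask loop() to exit
        mThread.join(); // wait, so loop() never runs after this object is gone
    }

    StoppableWorker(const StoppableWorker&) = delete;
    StoppableWorker& operator=(const StoppableWorker&) = delete;

private:
    void loop() {
        while (!mStop.load(std::memory_order_acquire)) {
            if (!mTryRunJob()) {
                std::this_thread::yield(); // nothing to do: give up the time slice
            }
        }
    }

    std::function<bool()> mTryRunJob;
    std::atomic<bool> mStop;
    std::thread mThread; // declared last, so it is constructed last
};
```

Declaring the std::thread member last matters: members are constructed in declaration order, so the thread cannot start calling loop() before mTryRunJob and mStop are initialized.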

MultiProcess:

class MultiProcess{
public:
    MultiProcess()
        :mbWorkInSingleProcess(true){}
    // initialize system with number of desired workers
    void init(uint32 auiNbWorker){
        if(auiNbWorker==0){
            mbWorkInSingleProcess=true;
        } else {
            mbWorkInSingleProcess=false;
            for(uint32 i = 0; i < auiNbWorker; ++i){
                mWorkers.push_back( new MPWorker(i, mJobQueue));
            }
        }
    }
    ~MultiProcess(){
        for(uint32 i = 0; i < mWorkers.size(); ++i){
            delete mWorkers[i];
        }
    }
    void addJob(const fastdelegate::FastDelegate0<>& aJob){
        if(mbWorkInSingleProcess){
            aJob();
        } else {
            mJobQueue.addJob(aJob);
        }
    }

    // call when all queued jobs must be finished; the calling thread helps process them
    void flush(){
        fastdelegate::FastDelegate0<> job = mJobQueue.retrieveNextJob();
        while(job){
            job();
            mJobQueue.markJobDone(job);
            job = mJobQueue.retrieveNextJob();
        }
        if(!mbWorkInSingleProcess)
            mJobQueue.waitForAllDone();
    }
    // some stats
    uint32 getMaxSimultaniousWorker()const {return mJobQueue.getMaxSimultanious();}
    void resetMaxSimultaniousWorker(){mJobQueue.resetMaxSimultanious();}
private:
    JobQueue mJobQueue;
    std::vector< MPWorker* > mWorkers;
    bool mbWorkInSingleProcess; // if true, jobs run immediately in addJob(); otherwise they are queued for the workers
    //prevent copying this class
    MultiProcess(const MultiProcess&a);
    MultiProcess& operator=(const MultiProcess&a);
};

Now to use it, do something like:

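#include <cstdio>  // printf
#include <cstdlib> // rand, RAND_MAX

class MPUser{
public:
    void expensiveJob(){
        float fValue=0;
        // rand() keeps per-thread state in the MSVC CRT; other C libraries
        // do not guarantee that rand() is thread-safe.
        for(uint32 i = 0; i < 50000; ++i){
            fValue+=rand()/(float) RAND_MAX;
        }
        printf("", fValue); // prints nothing; only consumes fValue
    }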
    void use(){
        MultiProcess mp;
        mp.init(3);
        for(uint32 i = 0; i < 100; ++i){
            mp.addJob(fastdelegate::MakeDelegate(this, &MPUser::expensiveJob));
        }
        mp.flush();
    }
};

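If you don't know how many workers to spawn, I suggest the number of logical processors minus one, since the thread that calls flush() also processes jobs:

    #include <windows.h>

    SYSTEM_INFO sysinfo;
    GetSystemInfo( &sysinfo );
    // on a single-processor machine this yields 0, and MultiProcess runs jobs inline
    uint32 nbWorker = sysinfo.dwNumberOfProcessors-1;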
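A portable equivalent of the GetSystemInfo() approach can use std::thread::hardware_concurrency(), which is only a hint and may return 0 when the count is unknown. This is a sketch under that assumption; defaultWorkerCount is a hypothetical name.

```cpp
#include <cstdint>
#include <thread>

// Logical processors minus one, because the thread that calls flush() also runs jobs.
std::uint32_t defaultWorkerCount() {
    unsigned logical = std::thread::hardware_concurrency(); // 0 if unknown
    if (logical <= 1) return 0; // 0 workers: jobs run inline when added
    return static_cast<std::uint32_t>(logical - 1);
}
```

On a 6-core CPU with Hyper-Threading, hardware_concurrency() typically reports 12 logical processors, which gives 11 workers.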

Be aware that if your jobs are too small, the overhead of adding and retrieving them will be quite high. For instance, in D.S.S., I update a whole star-system in each job; giving each ship its own job would be terrible. Also, since ships usually interact only with other ships in the same star-system, I don't have much synchronization to handle.
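To make the granularity advice concrete, here is a hypothetical sketch in which one job updates a whole star-system. The Ship and StarSystem types, updateStarSystem and makeFrameJobs are invented for illustration and are not D.S.S. code. The jobs are std::function objects, so they would need adapting before going through a FastDelegate-based addJob().

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical types for illustration only.
struct Ship {
    float x = 0.0f;     // 1-D position keeps the sketch short
    float vx = 0.0f;    // velocity
    int neighbours = 0; // ships within 1 unit, recomputed each update
};

struct StarSystem {
    std::vector<Ship> ships;
};

// Everything one job does. Only data inside `system` is touched, so jobs for
// different star-systems can run on different threads without any locking.
void updateStarSystem(StarSystem& system, float dt) {
    for (Ship& s : system.ships) s.x += s.vx * dt;
    for (Ship& s : system.ships) {
        s.neighbours = 0;
        for (const Ship& other : system.ships) {
            float d = other.x - s.x;
            if (&other != &s && d * d < 1.0f) ++s.neighbours;
        }
    }
}

// Coarse granularity: one job per star-system, however many ships it holds.
std::vector<std::function<void()>> makeFrameJobs(std::vector<StarSystem>& systems, float dt) {
    std::vector<std::function<void()>> jobs;
    jobs.reserve(systems.size());
    for (StarSystem& system : systems) {
        StarSystem* sys = &system; // capture a pointer, not the loop reference
        jobs.push_back([sys, dt]() { updateStarSystem(*sys, dt); });
    }
    return jobs;
}
```

With 100 star-systems of 50 ships each, this produces 100 jobs rather than 5,000, and each job is large enough to amortize the cost of queuing it. The jobs point into `systems`, so that vector must not be resized until they have run.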

Here's a screenshot of the multiprocessing at work. It's running on a 6-core Intel CPU with Hyper-Threading (12 logical processors), so there are 11 worker threads updating 100 star-systems in this galaxy. Each "CLJob" box is a star-system being updated.
