Distributed computing

From 22112
Revision as of 11:52, 6 March 2024 by WikiSysop
Previous: Queueing System Next: What affects performance

Material for the lesson

Video: Distributed computing
Video: Profiling and subprocess
Powerpoint: Distributed computing
Video: Exercises

Do the midterm evaluation <-- Important: do this this week, Friday at the latest

Exercises

Using the Queueing System to do distributed computing. Warning: The most difficult part of these exercises is actually using the Queueing System (QS). The Python code itself is fairly easy, but getting the QS to work requires patience and experience. Check the QS examples in the PowerPoint from the last lecture.

1)
Return to last time’s exercise: read a fasta file, find the reverse complement of each entry, count the bases in the entry, put the counts in the header line, and save everything in one file. Now solve this using the method on slide 5, i.e. distributed programming in an embarrassingly parallel fashion.
Test your programs on the small-scale humantest.fsa file. When ready, try the real human.fsa.
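As a reminder of what has to be computed for each entry, here is a minimal sketch of the core of a worker. The function names and the layout of the counts in the header (`A:2 C:1 ...`) are assumptions made for illustration, not a prescribed format; reading and writing the files is left out.

```python
# Sketch only: function names and header layout are assumptions, not the course's format.
COMPLEMENT = str.maketrans('ACGTacgt', 'TGCAtgca')

def reverse_complement(sequence):
    """Return the reverse complement of a DNA string; other symbols (e.g. N) are kept as-is."""
    return sequence.translate(COMPLEMENT)[::-1]

def count_bases(sequence):
    """Count A, C, G and T (case-insensitive); everything else is counted as 'Other'."""
    upper = sequence.upper()
    counts = {base: upper.count(base) for base in 'ACGT'}
    counts['Other'] = len(upper) - sum(counts.values())
    return counts

def annotate_entry(header, sequence):
    """Return (new_header, reverse_complement) for one fasta entry."""
    counts = count_bases(sequence)
    stats = ' '.join('{}:{}'.format(base, number) for base, number in counts.items())
    return header.rstrip() + ' ' + stats, reverse_complement(sequence)
```

Here the counts are taken from the original strand; whether you count before or after complementing is your own choice, as long as you are consistent.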

You have to make several programs: the administrator, the worker and the collector.

The administrator splits the original input fasta file into several pieces (one fasta sequence per piece) and submits one job per piece (the worker) with the relevant file piece as input. The worker reads a given file with one fasta sequence, computes the reverse complement strand and the base count, and writes the result to a given output file. The collector gathers all the result pieces and puts them together in the original order in one file. You run the collector yourself after the worker jobs have finished. The structure of the administrator is like

foreach fastasequence in inputfile
    save fastasequence in file.x
    submit job with file.x

By naming/numbering the files in some systematic way, it is easier to collect them afterwards. Note that you can test your code without using the QS by simply running the worker directly. Also understand that this is an exercise in using the Queueing System, not in simple programming.
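One possible way to number the pieces systematically is sketched below. The function names `split_fasta` and `collect`, the `piece.000001.fsa` naming scheme and the zero-padding width are assumptions chosen for illustration. The file is streamed line by line, so a large entry is never held in memory as a whole.

```python
import os

def split_fasta(fasta_path, out_dir, prefix='piece'):
    """Write each fasta entry to its own numbered file; return the paths in input order."""
    paths = []
    piece = None
    with open(fasta_path, 'rt') as infile:
        for line in infile:
            if line.startswith('>'):
                if piece is not None:
                    piece.close()
                # Zero-padded numbers keep alphabetical order equal to input order
                name = '{}.{:06d}.fsa'.format(prefix, len(paths) + 1)
                path = os.path.join(out_dir, name)
                paths.append(path)
                piece = open(path, 'wt')
            if piece is not None:      # lines before the first header are skipped
                piece.write(line)
    if piece is not None:
        piece.close()
    return paths

def collect(result_paths, final_path):
    """Concatenate the result files, in the given order, into one file."""
    with open(final_path, 'wt') as outfile:
        for path in result_paths:
            with open(path, 'rt') as infile:
                for line in infile:
                    outfile.write(line)
```

Because `split_fasta` returns the paths in the original order, the collector can derive the result file names from that list instead of guessing the order from a directory listing.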

To lighten your workload and lessen the confusion and grief the Queueing System can give you, I have made a fairly general submit-to-QS Python function that you can use if you wish. The command you submit is a string, which you can construct yourself. It must be a "simple" command, meaning a program with options and parameters as you would write it on the command line. It does not offer the convenience of the shell, meaning that piping, IO redirection and wildcards do not work. Be aware that a great source of errors is not using the full path to programs and files.

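import os
import subprocess
import sys

# runtime, cores and ram are required and must be given as keyword arguments
def submit(command, directory='', modules='', *, runtime, cores, ram, group='pr_course',
           jobscript='jobscript', output='/dev/null', error='/dev/null'):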
    """
    Function to submit a job to the Queueing System - with jobscript file
    Parameters are:
    command:   The command/program you want executed together with any parameters.
               Must use full path unless the directory is given and program is there. 
    directory: Working directory - where should your program run, place of your data.
               If not specified, uses current directory.
    modules:   String of space separated modules needed for the run.
    runtime:   Time in minutes set aside for execution of the job.
    cores:     How many cores are used for the job.
    ram:       How much memory in GB is used for the job.
    group:     Accounting - which group pays for the compute.
    jobscript: Standard name for the jobscript that needs to be made.
               You should number your jobscripts if you submit more than one.
    output:    Output file of your job.
    error:     Error file of your job.
    """
    runtime = int(runtime)
    cores = int(cores)
    ram = int(ram)
    if cores > 10:
        print("Can't use more than 10 cores on a node")
        sys.exit(1)
    if ram > 120:
        print("Can't use more than 120 GB on a node")
        sys.exit(1)
    if runtime < 1:
        print("Must allocate at least 1 minute runtime")
        sys.exit(1)
    hours, minutes = divmod(runtime, 60)
    walltime = "{:d}:{:02d}:00".format(hours, minutes)
    if directory == '':
        directory = os.getcwd()
    # Making a jobscript
    script = '#!/bin/sh\n'
    script += '#PBS -A ' + group + ' -W group_list=' + group + '\n'
    script += '#PBS -e ' + error + ' -o ' + output + '\n'
    script += '#PBS -d ' + directory + '\n'
    script += '#PBS -l nodes=1:ppn=' + str(cores) + ',mem=' + str(ram) + 'GB' + '\n'
    script += '#PBS -l walltime=' + walltime + '\n'
    if modules != '':
        script += 'module load ' + modules + '\n'
    script += command + '\n'
    if not jobscript.startswith('/'):
        jobscript = directory + '/' + jobscript
    with open(jobscript, 'wt') as jobfile:
        jobfile.write(script)
    # The submit
    job = subprocess.run(['qsub', jobscript], stdout=subprocess.PIPE, universal_newlines=True)
    jobid = job.stdout.split('.')[0]
    return jobid

# runtime, cores and ram are required and must be given as keyword arguments
def submit2(command, directory='', modules='', *, runtime, cores, ram, group='pr_course',
            output='/dev/null', error='/dev/null'):
    """
    Function to submit a job to the Queueing System - without jobscript file
    Parameters are:
    command:   The command/program you want executed together with any parameters.
               Must use full path unless the directory is given and program is there. 
    directory: Working directory - where should your program run, place of your data.
               If not specified, uses current directory.
    modules:   String of space separated modules needed for the run.
    runtime:   Time in minutes set aside for execution of the job.
    cores:     How many cores are used for the job.
    ram:       How much memory in GB is used for the job.
    group:     Accounting - which group pays for the compute.
    output:    Output file of your job.
    error:     Error file of your job.
    """
    runtime = int(runtime)
    cores = int(cores)
    ram = int(ram)
    if cores > 10:
        print("Can't use more than 10 cores on a node")
        sys.exit(1)
    if ram > 120:
        print("Can't use more than 120 GB on a node")
        sys.exit(1)
    if runtime < 1:
        print("Must allocate at least 1 minute runtime")
        sys.exit(1)
    hours, minutes = divmod(runtime, 60)
    walltime = "{:d}:{:02d}:00".format(hours, minutes)
    if directory == '':
        directory = os.getcwd()
    # Making the jobscript text (passed to qsub on standard input, no file is written)
    script = '#!/bin/sh\n'
    script += '#PBS -A ' + group + ' -W group_list=' + group + '\n'
    script += '#PBS -e ' + error + ' -o ' + output + '\n'
    script += '#PBS -d ' + directory + '\n'
    script += '#PBS -l nodes=1:ppn=' + str(cores) + ',mem=' + str(ram) + 'GB' + '\n'
    script += '#PBS -l walltime=' + walltime + '\n'
    if modules != '':
        script += 'module load ' + modules + '\n'
    script += command + '\n'
    # The submit
    job = subprocess.run(['qsub'], input=script, stdout=subprocess.PIPE, universal_newlines=True) 
    jobid = job.stdout.split('.')[0]
    return jobid


# Is used like
jobid = submit('myprogram myfile.fsa', directory='/home/projects/pr_course/people/pwsa',
               modules='tools anaconda3/4.0.0', jobscript='job.1',  runtime=10, cores=1, ram=2)
jobid = submit2('myprogram myfile.fsa', directory='/home/projects/pr_course/people/pwsa',
                modules='tools anaconda3/4.0.0', runtime=10, cores=1, ram=2)
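The administrator loop could use the function roughly as sketched below. Everything named here is hypothetical: the helper `submit_all`, the `.out` suffix for result files, the `job.N` jobscript names, and the assumption that the worker is an executable taking an input file and an output file as its two parameters. The submit function is passed in as a parameter, so the same loop can be tried with a stand-in before touching the QS.

```python
import os

def submit_all(pieces, worker, submit_function, **resources):
    """Submit one worker job per piece; return the job ids and the expected result files."""
    jobids, results = [], []
    for number, piece in enumerate(pieces, start=1):
        piece = os.path.abspath(piece)      # full paths avoid a common source of errors
        result = piece + '.out'             # assumed naming of the worker's output file
        command = '{} {} {}'.format(os.path.abspath(worker), piece, result)
        jobid = submit_function(command,
                                directory=os.path.dirname(piece),
                                jobscript='job.{}'.format(number),  # one jobscript per job
                                **resources)
        jobids.append(jobid)
        results.append(result)
    return jobids, results
```

With the `submit` function above it would be called as `submit_all(pieces, 'worker.py', submit, runtime=10, cores=1, ram=2)`. Note that `submit2` does not take a `jobscript` parameter, so the sketch would need that argument removed to work with it, and that paths containing spaces would break the command string.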

You can make almost the entire program on your own laptop if you substitute the function below for the submit function above. It just calls the worker sequentially without using the QS. You lose all the distribution the QS offers, but avoid using Computerome (and waiting for time). It is great in the development phase.

# Run the command directly on the local machine instead of submitting it to the Queueing System
def unix_call(command):
    subprocess.run(command.split())

# Is used like
unix_call('myprogram myfile.fsa')


2)
Make the administrator and collector into one program.

foreach fastasequence in inputfile
    save fastasequence in file.x
    submit job with file.x
wait for all jobs to finish
collect data

It is more difficult to solve this exercise, so do number 1 first. You need to find a way of waiting for your worker jobs to be done before you start collecting. Once it works, you can see if this distributed method is faster than last week’s sequential method.

Some ideas to wait for the jobs to be done.

  • Waiting and checking for all output files to appear. Cons: If a worker job breaks during execution, you wait forever since its output file never appears. If the output file is big, the worker might not have finished writing to it before collection starts. A trick to avoid this is for the worker to make an extra empty file at the end of the job, and for you to check for the presence of that file instead. An alternative is to write the output using a temporary filename and rename the file to the correct name when done writing.
  • Using showq/qstat to check that the jobs are gone from the queue. You need a way to recognize your jobs: name them, keep the jobid you get when you submit the job, or perhaps just show your own jobs (not everybody’s). When the list is empty, you are done. Cons: If you submit fast enough, the jobs might not have had time to show up in the queue, misleading you into thinking you are done. If you use the "your own jobs" method, you can only run one main job at a time, i.e. compute on one project only.
  • Using showq/qstat to show completed jobs. When your jobs are in the list, they are done. Cons: Jobs stay in the completed queue for only 2 hours, so you need to check at least hourly and remember what has finished.
  • Submit the collector as a job that depends on all worker jobs finishing OK. You probably need to look up the details of the qsub -W option before you can do this. This may not work if your jobs complete so quickly that they have already finished when the collector is queued.
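The first idea, combined with the rename trick, might look like the sketch below. The function names, the `.tmp` suffix, the polling interval and the timeout are all assumptions. The timeout is there so that a broken worker job does not make the administrator wait forever.

```python
import os
import time

def write_result_atomically(path, text):
    """Worker side: write under a temporary name, then rename to the final name."""
    temporary = path + '.tmp'
    with open(temporary, 'wt') as outfile:
        outfile.write(text)
    # The rename is atomic when both names are on the same filesystem,
    # so the final name only ever refers to a completely written file.
    os.replace(temporary, path)

def wait_for_files(paths, poll_seconds=10, timeout_seconds=3600):
    """Administrator side: return the files still missing (an empty list means all arrived)."""
    deadline = time.monotonic() + timeout_seconds
    missing = list(paths)
    while True:
        missing = [path for path in missing if not os.path.exists(path)]
        if not missing or time.monotonic() >= deadline:
            return missing
        time.sleep(poll_seconds)
```

If the returned list is not empty, the timeout was reached and the listed pieces probably need to be resubmitted or inspected. On a shared network file system a new file may take a moment to become visible on other nodes, so a generous polling interval is a reasonable choice.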