Changeset 959:31aef7f8fff9

Show
Ignore:
Timestamp:
02/14/08 15:00:39 (1 year ago)
Author:
Nate Coraor <nate@bx.psu.edu>
branch:
default
convert_revision:
svn:9bcadc22-80f8-0310-8a53-c8f022958886/galaxy/trunk@2315
Message:

Some job runner changes: tools may now be run with a defined job
runner. Tools can choose which PBS server/queue to run on. See
changes to the ini file for hints on how this is done.

Some of the PBS/cluster options have changed; be sure to change these
when updating test/main!

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • lib/galaxy/config.py

    r883 r959  
    4848        self.error_email_to = kwargs.get( 'error_email_to', None ) 
    4949        self.smtp_server = kwargs.get( 'smtp_server', None ) 
    50         self.use_pbs = kwargs.get('use_pbs', False ) 
    51         self.pbs_server = kwargs.get('pbs_server', "" ) 
    52         self.pbs_instance_path = kwargs.get('pbs_instance_path', os.getcwd() ) 
     50        self.start_job_runners = kwargs.get( 'start_job_runners', None ) 
     51        self.default_cluster_job_runner = kwargs.get( 'default_cluster_job_runner', 'local:///' ) 
    5352        self.pbs_application_server = kwargs.get('pbs_application_server', "" ) 
    5453        self.pbs_dataset_server = kwargs.get('pbs_dataset_server', "" ) 
     
    6867        if global_conf and "__file__" in global_conf: 
    6968            global_conf_parser.read(global_conf['__file__']) 
     69        #Store per-tool runner config 
     70        try: 
     71            self.tool_runners = global_conf_parser.items("galaxy:tool_runners") 
     72        except ConfigParser.NoSectionError: 
     73            self.tool_runners = [] 
    7074        #Store datatypes config 
    7175        try: 
  • lib/galaxy/jobs/__init__.py

    r958 r959  
    118118                pbs_job_state.job_file = "%s/database/pbs/%s.sh" % (os.getcwd(), j.id) 
    119119                pbs_job_state.job_id = str( j.job_runner_external_id ) 
     120                pbs_job_state.runner_url = job_wrapper.tool.job_runner 
    120121                job_wrapper.command_line = j.command_line 
    121122                pbs_job_state.job_wrapper = job_wrapper 
     
    125126                    pbs_job_state.old_state = 'R' 
    126127                    pbs_job_state.running = True 
    127                     self.dispatcher.pbs_job_runner.queue.put( pbs_job_state ) 
     128                    self.dispatcher.job_runners["pbs"].queue.put( pbs_job_state ) 
    128129                elif j.state == model.Job.states.QUEUED: 
    129130                    # The job was in PBS but not yet running 
     
    132133                        pbs_job_state.old_state = 'Q' 
    133134                        pbs_job_state.running = False 
    134                         self.dispatcher.pbs_job_runner.queue.put( pbs_job_state ) 
     135                        self.dispatcher.job_runners["pbs"].queue.put( pbs_job_state ) 
    135136                    # The job had reached the pbs queue_job method, but not the PBS queue itself. 
    136137                    else: 
     
    363364        job.flush() 
    364365 
    365     def set_runner( self, runner_name, external_id ): 
     366    def set_runner( self, runner_url, external_id ): 
    366367        job = model.Job.get( self.job_id ) 
    367368        job.refresh() 
    368         job.job_runner_name = runner_name 
     369        job.job_runner_name = runner_url 
    369370        job.job_runner_external_id = external_id 
    370371        job.flush() 
     
    456457    def cleanup( self ): 
    457458        # remove temporary files 
    458         for fname in self.extra_filenames:  
    459             os.remove( fname ) 
    460         if self.working_directory is not None: 
    461             os.rmdir( self.working_directory )  
     459        try: 
     460            for fname in self.extra_filenames:  
     461                os.remove( fname ) 
     462            if self.working_directory is not None: 
     463                os.rmdir( self.working_directory )  
     464        except: 
     465            log.exception( "Unable to cleanup job %s" % self.job_id ) 
    462466         
    463467    def get_command_line( self ): 
     
    488492    def __init__( self, app ): 
    489493        self.app = app 
    490         self.use_pbs = asbool( app.config.use_pbs ) 
    491         import runners.local 
    492         self.local_job_runner = runners.local.LocalJobRunner( app ) 
    493         if self.use_pbs: 
    494             import runners.pbs 
    495             self.pbs_job_runner = runners.pbs.PBSJobRunner( app ) 
    496             self.put = self.dispatch_pbs 
    497         else: 
    498             self.put = self.dispatch_default 
     494        self.job_runners = {} 
     495        start_job_runners = ["local"] 
     496        if app.config.start_job_runners is not None: 
     497            start_job_runners.extend( app.config.start_job_runners.split(",") ) 
     498        for runner_name in start_job_runners: 
     499            if runner_name == "local": 
     500                import runners.local 
     501                self.job_runners[runner_name] = runners.local.LocalJobRunner( app ) 
     502            elif runner_name == "pbs": 
     503                import runners.pbs 
     504                self.job_runners[runner_name] = runners.pbs.PBSJobRunner( app ) 
     505            else: 
     506                log.error( "Unable to start unknown job runner: %s" %runner_name ) 
    499507             
    500     def dispatch_default( self, job_wrapper ): 
    501         self.local_job_runner.put( job_wrapper ) 
    502         log.debug( "dispatch_default(): dispatching job %d to local runner" %job_wrapper.job_id ) 
    503              
    504     def dispatch_pbs( self, job_wrapper ): 
    505         # command_line = job_wrapper.get_command_line() 
    506         # HACK: Need a more robust way for tools to assert whether they should 
    507         #       be run on the cluster. 
    508         command_line = job_wrapper.tool.command 
    509         if ( not command_line ) or ( "/tools/data_source" in command_line ): 
    510             log.debug( "dispatching job %d to local runner" %job_wrapper.job_id ) 
    511             self.local_job_runner.put( job_wrapper ) 
    512         else: 
    513             self.pbs_job_runner.put( job_wrapper ) 
    514             log.debug( "dispatch_pbs(): dispatching job %d to pbs runner" %job_wrapper.job_id ) 
    515          
     508    def put( self, job_wrapper ): 
     509        runner_name = ( job_wrapper.tool.job_runner.split(":", 1) )[0] 
     510        log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) ) 
     511        self.job_runners[runner_name].put( job_wrapper ) 
     512 
    516513    def shutdown( self ): 
    517         self.local_job_runner.shutdown() 
    518         if self.use_pbs: 
    519             self.pbs_job_runner.shutdown()   
     514        for runner in self.job_runners.itervalues(): 
     515            runner.shutdown() 
  • lib/galaxy/jobs/runners/pbs.py

    r958 r959  
    5353        self.ofile = None 
    5454        self.efile = None 
     55        self.runner_url = None 
    5556 
    5657class PBSJobRunner( object ): 
     
    7273        self.watched = [] 
    7374        self.queue = Queue() 
    74         self.determine_pbs_server() 
     75        self.default_pbs_server = None 
     76        self.determine_pbs_server( "pbs:///" ) 
    7577        self.monitor_thread = threading.Thread( target=self.monitor ) 
    7678        self.monitor_thread.start() 
    7779        log.debug( "ready" ) 
    7880 
    79     def determine_pbs_server( self ): 
     81    def determine_pbs_server( self, url ): 
    8082        """Determine what PBS server we are connecting to""" 
    81         if self.app.config.pbs_server: 
    82             self.pbs_server = self.app.config.pbs_server 
    83         else: 
    84             self.pbs_server = pbs.pbs_default() 
    85         if self.pbs_server is None: 
     83        url_split = url.split("/") 
     84        server = url_split[2] 
     85        if server == "": 
     86            if not self.default_pbs_server: 
     87                server = pbs.pbs_default() 
     88            else: 
     89                server = self.default_pbs_server 
     90        if server is None: 
    8691            raise Exception( "Could not find torque server" ) 
     92        return server 
     93 
     94    def determine_pbs_queue( self, url ): 
     95        """Determine what PBS queue we are submitting to""" 
     96        url_split = url.split("/") 
     97        queue = url_split[3] 
     98        if queue == "": 
     99            # None = server's default queue 
     100            queue = None 
     101        return queue 
    87102 
    88103    def queue_job( self, job_wrapper ): 
     
    90105        job_wrapper.prepare() 
    91106        command_line = job_wrapper.get_command_line() 
     107        runner_url = job_wrapper.tool.job_runner 
    92108         
    93109        # This is silly, why would we queue a job with no command line? 
     
    98114        job_wrapper.change_state( 'queued' ) 
    99115         
    100         conn = pbs.pbs_connect( self.pbs_server ) 
     116        pbs_server = self.determine_pbs_server( runner_url ) 
     117        pbs_queue = self.determine_pbs_queue( runner_url ) 
     118        conn = pbs.pbs_connect( pbs_server ) 
    101119        if conn <= 0: 
    102120            raise Exception( "Connection to PBS server for submit failed" ) 
     
    106124        efile = "%s/database/pbs/%s.e" % (os.getcwd(), job_wrapper.job_id) 
    107125 
    108         # to get returned via scp/rcp we need this 
     126        # If an application server is set, we're staging 
    109127        if self.app.config.pbs_application_server: 
    110             # separate vars because we need un-munged ofile/efile for collection/cleanup 
    111128            pbs_ofile = self.app.config.pbs_application_server + ':' + ofile 
    112129            pbs_efile = self.app.config.pbs_application_server + ':' + efile 
     
    124141            job_attrs[4].name = pbs.ATTR_N 
    125142            job_attrs[4].value = "%s" % job_wrapper.job_id 
    126             exec_dir = self.app.config.pbs_instance_path 
    127             # FIXME: pbs_instance_path is broken until we munge PATH and PYTHONPATH here 
     143            exec_dir = os.path.abspath( os.getcwd() ) 
     144        # If not, we're using NFS 
    128145        else: 
    129146            job_attrs = pbs.new_attropl(2) 
     
    150167 
    151168        galaxy_job_id = job_wrapper.job_id 
    152         log.debug("submitting file %s" % job_file ) 
    153         log.debug("command is: %s" % command_line) 
    154         job_id = pbs.pbs_submit(conn, job_attrs, job_file, None, None) 
    155         log.debug("queued %s: %s" % (galaxy_job_id, job_id) ) 
     169        log.debug("(%s) submitting file %s" % ( galaxy_job_id, job_file ) ) 
     170        log.debug("(%s) command is: %s" % ( galaxy_job_id, command_line ) ) 
     171        job_id = pbs.pbs_submit(conn, job_attrs, job_file, pbs_queue, None) 
     172        if pbs_queue is None: 
     173            log.debug("(%s) queued in default queue as %s" % (galaxy_job_id, job_id) ) 
     174        else: 
     175            log.debug("(%s) queued in %s queue as %s" % (galaxy_job_id, pbs_queue, job_id) ) 
    156176        pbs.pbs_disconnect(conn) 
    157177 
    158178        if not job_id: 
     179            errno, text = pbs.error() 
     180            log.debug( "(%s) pbs_submit failed, PBS error %d: %s" % (galaxy_job_id, errno, text) ) 
    159181            stdout = '' 
    160             stderr = "Job (%s) was not queued, PBS error %d: %s" % (galaxy_job_id, pbs.error() ) 
    161             log.debug(stderr) 
    162  
    163         # FIXME: queue needs to be configurable 
    164         job_wrapper.set_runner( 'pbs://%s/batch' % self.pbs_server, job_id ) 
     182            stderr = "Unable to run this job due to a cluster configuration error" 
     183            # Run failed, finish immediately 
     184            try: 
     185                job_wrapper.finish( stdout, stderr ) 
     186            except: 
     187                log.exception("Job wrapper finish method failed") 
     188            return 
     189 
     190        job_wrapper.set_runner( runner_url, job_id ) 
    165191 
    166192        # Get initial job state 
    167193        stat_attrl = pbs.new_attrl(1) 
    168194        stat_attrl[0].name = 'job_state' 
    169         conn = pbs.pbs_connect( self.pbs_server ) 
     195        conn = pbs.pbs_connect( pbs_server ) 
    170196        if conn > 0: 
    171197            jobs = pbs.pbs_statjob(conn, job_id, stat_attrl, 'NULL') 
     
    201227        pbs_job_state.old_state = old_state 
    202228        pbs_job_state.running = running 
     229        pbs_job_state.runner_url = runner_url 
    203230         
    204231        # Add to our 'queue' of jobs to monitor 
     
    237264            old_state = pbs_job_state.old_state 
    238265            running = pbs_job_state.running 
    239             conn = pbs.pbs_connect( self.pbs_server ) 
     266            pbs_server = self.determine_pbs_server( pbs_job_state.runner_url ) 
     267            conn = pbs.pbs_connect( pbs_server ) 
    240268            if conn <= 0: 
    241269                log.debug("(%s/%s) connection to PBS server for state check failed" % (galaxy_job_id, job_id) ) 
     
    262290                        if killed and state != "E": 
    263291                            log.debug( "(%s/%s) all output datasets deleted by user, dequeueing" % (galaxy_job_id, job_id) ) 
    264                             self.delete_job( job_id ) 
     292                            self.delete_job( job_id, pbs_server ) 
    265293                        old_state = state 
    266294                    pbs_job_state.old_state = old_state 
     
    314342        log.info( "pbs job runner stopped" ) 
    315343 
    316     def delete_job( self, job_id ): 
     344    def delete_job( self, job_id, pbs_server ): 
    317345        """Attempts to delete a job from the PBS queue""" 
    318         conn = pbs.pbs_connect( self.pbs_server ) 
     346        conn = pbs.pbs_connect( pbs_server ) 
    319347        if conn <= 0: 
    320348            log.debug("(%s) connection to PBS server for job delete failed" % job_id ) 
  • lib/galaxy/tools/__init__.py

    r946 r959  
    201201            interpreter  = command.get("interpreter") 
    202202            if interpreter: 
    203                 #if not using pbs, change command to absolute path 
    204                 if not self.app.config.use_pbs or "/tools/data_source" in self.tool_dir: 
    205                     executable = self.command.split()[0] 
    206                     abs_executable = os.path.abspath(os.path.join(self.tool_dir, executable)) 
    207                     self.command = self.command.replace(executable, abs_executable, 1) 
    208                     self.command = interpreter + " " + self.command 
    209                 else: 
    210                     self.command = interpreter + " " + os.path.join(self.tool_dir, self.command) 
     203                # TODO: path munging for cluster/dataset server relocatability 
     204                executable = self.command.split()[0] 
     205                abs_executable = os.path.abspath(os.path.join(self.tool_dir, executable)) 
     206                self.command = self.command.replace(executable, abs_executable, 1) 
     207                self.command = interpreter + " " + self.command 
    211208        else: 
    212209            self.command = '' 
    213210        # Short description of the tool 
    214211        self.description = util.xml_text(root, "description") 
     212        # Job runner 
     213        if self.app.config.start_job_runners is None: 
     214            # Jobs are always local regardless of tool config if no additional 
     215            # runners are started 
     216            self.job_runner = "local:///" 
     217        else: 
     218            # Set job runner to the cluster default 
     219            self.job_runner = self.app.config.default_cluster_job_runner 
     220            for tup in self.app.config.tool_runners: 
     221                if tup[0] == self.id: 
     222                    self.job_runner = tup[1] 
     223                    break 
    215224        # Is this a 'hidden' tool (hidden in tool menu) 
    216225        self.hidden = util.xml_text(root, "hidden") 
     
    900909            param_dict[name] = DatasetFilenameWrapper( data ) 
    901910            # Provide access to a path to store additional files 
    902             if self.app.config.use_pbs and "/tools/data_source" not in self.tool_dir: 
    903                 param_dict[name].files_path = os.path.join(self.app.config.new_file_path, "dataset_%s_files" % (data.id) ) 
    904             else: 
    905                 param_dict[name].files_path = os.path.abspath(os.path.join(self.app.config.new_file_path, "dataset_%s_files" % (data.id) )) 
     911            # TODO: path munging for cluster/dataset server relocatability 
     912            param_dict[name].files_path = os.path.abspath(os.path.join(self.app.config.new_file_path, "dataset_%s_files" % (data.id) )) 
    906913             
    907914            for child_association in data.children: 
     
    913920        # More convenient access to app.config.new_file_path; we don't need to wrap a string 
    914921        # But this method of generating additional datasets should be considered DEPRECATED 
    915         if self.app.config.use_pbs and "/tools/data_source" not in self.tool_dir: 
    916             param_dict['__new_file_path__'] = self.app.config.new_file_path 
    917         else: 
    918             param_dict['__new_file_path__'] = os.path.abspath(self.app.config.new_file_path) 
     922        # TODO: path munging for cluster/dataset server relocatability 
     923        param_dict['__new_file_path__'] = os.path.abspath(self.app.config.new_file_path) 
    919924        # Return the dictionary of parameters 
    920925        return param_dict 
  • universe_wsgi.ini.sample

    r865 r959  
    113113#wiki_url=/path/to/my/local/wiki 
    114114#bugs_email=mailto:bugmaster@this.site.com 
     115 
     116# ---- Job Runners ---------------------------------------------------------- 
     117 
     118# Leave everything in this section commented unless you are running Galaxy on a 
     119# cluster.  Clustering Galaxy is not a straightforward process and requires a 
     120# lot of pre-configuration.  Please see the Galaxy Wiki before attempting to 
     121# set any of these options. 
     122 
     123# start_job_runners: Comma-separated list of job runners to start.  local is 
     124# always started.  If left commented, no jobs will be run on the cluster, even 
     125# if a cluster URL is explicitly defined in the [galaxy:tool_runners] section 
     126# below.  The only runner currently available is 'pbs'. 
     127#start_job_runners = pbs 
     128 
     129# default_cluster_job_runner: The URL for the default runner to use when a tool 
     130# doesn't explicity define a runner below.  For help on the cluster URL format, 
     131# see the Galaxy Wiki.  Leave commented if not using a cluster job runner. 
     132#default_cluster_job_runner = pbs:/// 
     133 
     134# The PBS options are described in detail on the Galaxy Wiki 
     135#pbs_application_server =  
     136#pbs_stage_path =  
     137#pbs_dataset_server =  
     138 
     139# Individual per-tool job runner overrides.  If not listed here, a tool will 
     140# run with the runner defined with default_cluster_job_runner. 
     141[galaxy:tool_runners] 
     142 
     143biomart = local:/// 
     144encode_db1 = local:/// 
     145encode_import_all_latest_datasets1 = local:/// 
     146encode_import_chromatin_and_chromosomes1 = local:/// 
     147encode_import_gencode1 = local:/// 
     148encode_import_genes_and_transcripts1 = local:/// 
     149encode_import_multi-species_sequence_analysis1 = local:/// 
     150encode_import_transcription_regulation1 = local:/// 
     151hbvar = local:/// 
     152microbial_import1 = local:/// 
     153ucsc_table_direct1 = local:/// 
     154ucsc_table_direct_archaea1 = local:/// 
     155ucsc_table_direct_test1 = local:/// 
     156upload1 = local:/// 
    115157 
    116158# ---- Datatypes ------------------------------------------------------------