Changeset 959:31aef7f8fff9
- Timestamp:
- 02/14/08 15:00:39 (1 year ago)
- Files:
-
- lib/galaxy/config.py (modified) (2 diffs)
- lib/galaxy/jobs/__init__.py (modified) (6 diffs)
- lib/galaxy/jobs/runners/pbs.py (modified) (11 diffs)
- lib/galaxy/tools/__init__.py (modified) (3 diffs)
- universe_wsgi.ini.sample (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
lib/galaxy/config.py
r883 r959 48 48 self.error_email_to = kwargs.get( 'error_email_to', None ) 49 49 self.smtp_server = kwargs.get( 'smtp_server', None ) 50 self.use_pbs = kwargs.get('use_pbs', False ) 51 self.pbs_server = kwargs.get('pbs_server', "" ) 52 self.pbs_instance_path = kwargs.get('pbs_instance_path', os.getcwd() ) 50 self.start_job_runners = kwargs.get( 'start_job_runners', None ) 51 self.default_cluster_job_runner = kwargs.get( 'default_cluster_job_runner', 'local:///' ) 53 52 self.pbs_application_server = kwargs.get('pbs_application_server', "" ) 54 53 self.pbs_dataset_server = kwargs.get('pbs_dataset_server', "" ) … … 68 67 if global_conf and "__file__" in global_conf: 69 68 global_conf_parser.read(global_conf['__file__']) 69 #Store per-tool runner config 70 try: 71 self.tool_runners = global_conf_parser.items("galaxy:tool_runners") 72 except ConfigParser.NoSectionError: 73 self.tool_runners = [] 70 74 #Store datatypes config 71 75 try: lib/galaxy/jobs/__init__.py
r958 r959 118 118 pbs_job_state.job_file = "%s/database/pbs/%s.sh" % (os.getcwd(), j.id) 119 119 pbs_job_state.job_id = str( j.job_runner_external_id ) 120 pbs_job_state.runner_url = job_wrapper.tool.job_runner 120 121 job_wrapper.command_line = j.command_line 121 122 pbs_job_state.job_wrapper = job_wrapper … … 125 126 pbs_job_state.old_state = 'R' 126 127 pbs_job_state.running = True 127 self.dispatcher. pbs_job_runner.queue.put( pbs_job_state )128 self.dispatcher.job_runners["pbs"].queue.put( pbs_job_state ) 128 129 elif j.state == model.Job.states.QUEUED: 129 130 # The job was in PBS but not yet running … … 132 133 pbs_job_state.old_state = 'Q' 133 134 pbs_job_state.running = False 134 self.dispatcher. pbs_job_runner.queue.put( pbs_job_state )135 self.dispatcher.job_runners["pbs"].queue.put( pbs_job_state ) 135 136 # The job had reached the pbs queue_job method, but not the PBS queue itself. 136 137 else: … … 363 364 job.flush() 364 365 365 def set_runner( self, runner_ name, external_id ):366 def set_runner( self, runner_url, external_id ): 366 367 job = model.Job.get( self.job_id ) 367 368 job.refresh() 368 job.job_runner_name = runner_ name369 job.job_runner_name = runner_url 369 370 job.job_runner_external_id = external_id 370 371 job.flush() … … 456 457 def cleanup( self ): 457 458 # remove temporary files 458 for fname in self.extra_filenames: 459 os.remove( fname ) 460 if self.working_directory is not None: 461 os.rmdir( self.working_directory ) 459 try: 460 for fname in self.extra_filenames: 461 os.remove( fname ) 462 if self.working_directory is not None: 463 os.rmdir( self.working_directory ) 464 except: 465 log.exception( "Unable to cleanup job %s" % self.job_id ) 462 466 463 467 def get_command_line( self ): … … 488 492 def __init__( self, app ): 489 493 self.app = app 490 self.use_pbs = asbool( app.config.use_pbs ) 491 import runners.local 492 self.local_job_runner = runners.local.LocalJobRunner( app ) 493 if self.use_pbs: 494 import runners.pbs 495 self.pbs_job_runner = runners.pbs.PBSJobRunner( app ) 496 self.put = self.dispatch_pbs 497 else: 498 self.put = self.dispatch_default 494 self.job_runners = {} 495 start_job_runners = ["local"] 496 if app.config.start_job_runners is not None: 497 start_job_runners.extend( app.config.start_job_runners.split(",") ) 498 for runner_name in start_job_runners: 499 if runner_name == "local": 500 import runners.local 501 self.job_runners[runner_name] = runners.local.LocalJobRunner( app ) 502 elif runner_name == "pbs": 503 import runners.pbs 504 self.job_runners[runner_name] = runners.pbs.PBSJobRunner( app ) 505 else: 506 log.error( "Unable to start unknown job runner: %s" %runner_name ) 499 507 500 def dispatch_default( self, job_wrapper ): 501 self.local_job_runner.put( job_wrapper ) 502 log.debug( "dispatch_default(): dispatching job %d to local runner" %job_wrapper.job_id ) 503 504 def dispatch_pbs( self, job_wrapper ): 505 # command_line = job_wrapper.get_command_line() 506 # HACK: Need a more robust way for tools to assert whether they should 507 # be run on the cluster. 508 command_line = job_wrapper.tool.command 509 if ( not command_line ) or ( "/tools/data_source" in command_line ): 510 log.debug( "dispatching job %d to local runner" %job_wrapper.job_id ) 511 self.local_job_runner.put( job_wrapper ) 512 else: 513 self.pbs_job_runner.put( job_wrapper ) 514 log.debug( "dispatch_pbs(): dispatching job %d to pbs runner" %job_wrapper.job_id ) 515 508 def put( self, job_wrapper ): 509 runner_name = ( job_wrapper.tool.job_runner.split(":", 1) )[0] 510 log.debug( "dispatching job %d to %s runner" %( job_wrapper.job_id, runner_name ) ) 511 self.job_runners[runner_name].put( job_wrapper ) 512 516 513 def shutdown( self ): 517 self.local_job_runner.shutdown() 518 if self.use_pbs: 519 self.pbs_job_runner.shutdown() 514 for runner in self.job_runners.itervalues(): 515 runner.shutdown() lib/galaxy/jobs/runners/pbs.py
r958 r959 53 53 self.ofile = None 54 54 self.efile = None 55 self.runner_url = None 55 56 56 57 class PBSJobRunner( object ): … … 72 73 self.watched = [] 73 74 self.queue = Queue() 74 self.determine_pbs_server() 75 self.default_pbs_server = None 76 self.determine_pbs_server( "pbs:///" ) 75 77 self.monitor_thread = threading.Thread( target=self.monitor ) 76 78 self.monitor_thread.start() 77 79 log.debug( "ready" ) 78 80 79 def determine_pbs_server( self ):81 def determine_pbs_server( self, url ): 80 82 """Determine what PBS server we are connecting to""" 81 if self.app.config.pbs_server: 82 self.pbs_server = self.app.config.pbs_server 83 else: 84 self.pbs_server = pbs.pbs_default() 85 if self.pbs_server is None: 83 url_split = url.split("/") 84 server = url_split[2] 85 if server == "": 86 if not self.default_pbs_server: 87 server = pbs.pbs_default() 88 else: 89 server = self.default_pbs_server 90 if server is None: 86 91 raise Exception( "Could not find torque server" ) 92 return server 93 94 def determine_pbs_queue( self, url ): 95 """Determine what PBS queue we are submitting to""" 96 url_split = url.split("/") 97 queue = url_split[3] 98 if queue == "": 99 # None = server's default queue 100 queue = None 101 return queue 87 102 88 103 def queue_job( self, job_wrapper ): … … 90 105 job_wrapper.prepare() 91 106 command_line = job_wrapper.get_command_line() 107 runner_url = job_wrapper.tool.job_runner 92 108 93 109 # This is silly, why would we queue a job with no command line? … … 98 114 job_wrapper.change_state( 'queued' ) 99 115 100 conn = pbs.pbs_connect( self.pbs_server ) 116 pbs_server = self.determine_pbs_server( runner_url ) 117 pbs_queue = self.determine_pbs_queue( runner_url ) 118 conn = pbs.pbs_connect( pbs_server ) 101 119 if conn <= 0: 102 120 raise Exception( "Connection to PBS server for submit failed" ) … … 106 124 efile = "%s/database/pbs/%s.e" % (os.getcwd(), job_wrapper.job_id) 107 125 108 # to get returned via scp/rcp we need this126 # If an application server is set, we're staging 109 127 if self.app.config.pbs_application_server: 110 # separate vars because we need un-munged ofile/efile for collection/cleanup111 128 pbs_ofile = self.app.config.pbs_application_server + ':' + ofile 112 129 pbs_efile = self.app.config.pbs_application_server + ':' + efile … … 124 141 job_attrs[4].name = pbs.ATTR_N 125 142 job_attrs[4].value = "%s" % job_wrapper.job_id 126 exec_dir = self.app.config.pbs_instance_path127 # FIXME: pbs_instance_path is broken until we munge PATH and PYTHONPATH here143 exec_dir = os.path.abspath( os.getcwd() ) 144 # If not, we're using NFS 128 145 else: 129 146 job_attrs = pbs.new_attropl(2) … … 150 167 151 168 galaxy_job_id = job_wrapper.job_id 152 log.debug("submitting file %s" % job_file ) 153 log.debug("command is: %s" % command_line) 154 job_id = pbs.pbs_submit(conn, job_attrs, job_file, None, None) 155 log.debug("queued %s: %s" % (galaxy_job_id, job_id) ) 169 log.debug("(%s) submitting file %s" % ( galaxy_job_id, job_file ) ) 170 log.debug("(%s) command is: %s" % ( galaxy_job_id, command_line ) ) 171 job_id = pbs.pbs_submit(conn, job_attrs, job_file, pbs_queue, None) 172 if pbs_queue is None: 173 log.debug("(%s) queued in default queue as %s" % (galaxy_job_id, job_id) ) 174 else: 175 log.debug("(%s) queued in %s queue as %s" % (galaxy_job_id, pbs_queue, job_id) ) 156 176 pbs.pbs_disconnect(conn) 157 177 158 178 if not job_id: 179 errno, text = pbs.error() 180 log.debug( "(%s) pbs_submit failed, PBS error %d: %s" % (galaxy_job_id, errno, text) ) 159 181 stdout = '' 160 stderr = "Job (%s) was not queued, PBS error %d: %s" % (galaxy_job_id, pbs.error() ) 161 log.debug(stderr) 162 163 # FIXME: queue needs to be configurable 164 job_wrapper.set_runner( 'pbs://%s/batch' % self.pbs_server, job_id ) 182 stderr = "Unable to run this job due to a cluster configuration error" 183 # Run failed, finish immediately 184 try: 185 job_wrapper.finish( stdout, stderr ) 186 except: 187 log.exception("Job wrapper finish method failed") 188 return 189 190 job_wrapper.set_runner( runner_url, job_id ) 165 191 166 192 # Get initial job state 167 193 stat_attrl = pbs.new_attrl(1) 168 194 stat_attrl[0].name = 'job_state' 169 conn = pbs.pbs_connect( self.pbs_server )195 conn = pbs.pbs_connect( pbs_server ) 170 196 if conn > 0: 171 197 jobs = pbs.pbs_statjob(conn, job_id, stat_attrl, 'NULL') … … 201 227 pbs_job_state.old_state = old_state 202 228 pbs_job_state.running = running 229 pbs_job_state.runner_url = runner_url 203 230 204 231 # Add to our 'queue' of jobs to monitor … … 237 264 old_state = pbs_job_state.old_state 238 265 running = pbs_job_state.running 239 conn = pbs.pbs_connect( self.pbs_server ) 266 pbs_server = self.determine_pbs_server( pbs_job_state.runner_url ) 267 conn = pbs.pbs_connect( pbs_server ) 240 268 if conn <= 0: 241 269 log.debug("(%s/%s) connection to PBS server for state check failed" % (galaxy_job_id, job_id) ) … … 262 290 if killed and state != "E": 263 291 log.debug( "(%s/%s) all output datasets deleted by user, dequeueing" % (galaxy_job_id, job_id) ) 264 self.delete_job( job_id )292 self.delete_job( job_id, pbs_server ) 265 293 old_state = state 266 294 pbs_job_state.old_state = old_state … … 314 342 log.info( "pbs job runner stopped" ) 315 343 316 def delete_job( self, job_id ):344 def delete_job( self, job_id, pbs_server ): 317 345 """Attempts to delete a job from the PBS queue""" 318 conn = pbs.pbs_connect( self.pbs_server )346 conn = pbs.pbs_connect( pbs_server ) 319 347 if conn <= 0: 320 348 log.debug("(%s) connection to PBS server for job delete failed" % job_id ) lib/galaxy/tools/__init__.py
r946 r959 201 201 interpreter = command.get("interpreter") 202 202 if interpreter: 203 #if not using pbs, change command to absolute path 204 if not self.app.config.use_pbs or "/tools/data_source" in self.tool_dir: 205 executable = self.command.split()[0] 206 abs_executable = os.path.abspath(os.path.join(self.tool_dir, executable)) 207 self.command = self.command.replace(executable, abs_executable, 1) 208 self.command = interpreter + " " + self.command 209 else: 210 self.command = interpreter + " " + os.path.join(self.tool_dir, self.command) 203 # TODO: path munging for cluster/dataset server relocatability 204 executable = self.command.split()[0] 205 abs_executable = os.path.abspath(os.path.join(self.tool_dir, executable)) 206 self.command = self.command.replace(executable, abs_executable, 1) 207 self.command = interpreter + " " + self.command 211 208 else: 212 209 self.command = '' 213 210 # Short description of the tool 214 211 self.description = util.xml_text(root, "description") 212 # Job runner 213 if self.app.config.start_job_runners is None: 214 # Jobs are always local regardless of tool config if no additional 215 # runners are started 216 self.job_runner = "local:///" 217 else: 218 # Set job runner to the cluster default 219 self.job_runner = self.app.config.default_cluster_job_runner 220 for tup in self.app.config.tool_runners: 221 if tup[0] == self.id: 222 self.job_runner = tup[1] 223 break 215 224 # Is this a 'hidden' tool (hidden in tool menu) 216 225 self.hidden = util.xml_text(root, "hidden") … … 900 909 param_dict[name] = DatasetFilenameWrapper( data ) 901 910 # Provide access to a path to store additional files 902 if self.app.config.use_pbs and "/tools/data_source" not in self.tool_dir: 903 param_dict[name].files_path = os.path.join(self.app.config.new_file_path, "dataset_%s_files" % (data.id) ) 904 else: 905 param_dict[name].files_path = os.path.abspath(os.path.join(self.app.config.new_file_path, "dataset_%s_files" % (data.id) )) 911 # TODO: path munging for cluster/dataset server relocatability 912 param_dict[name].files_path = os.path.abspath(os.path.join(self.app.config.new_file_path, "dataset_%s_files" % (data.id) )) 906 913 907 914 for child_association in data.children: … … 913 920 # More convienent access to app.config.new_file_path; we don't need to wrap a string 914 921 # But this method of generating additional datasets should be considered DEPRECATED 915 if self.app.config.use_pbs and "/tools/data_source" not in self.tool_dir: 916 param_dict['__new_file_path__'] = self.app.config.new_file_path 917 else: 918 param_dict['__new_file_path__'] = os.path.abspath(self.app.config.new_file_path) 922 # TODO: path munging for cluster/dataset server relocatability 923 param_dict['__new_file_path__'] = os.path.abspath(self.app.config.new_file_path) 919 924 # Return the dictionary of parameters 920 925 return param_dict universe_wsgi.ini.sample
r865 r959 113 113 #wiki_url=/path/to/my/local/wiki 114 114 #bugs_email=mailto:bugmaster@this.site.com 115 116 # ---- Job Runners ---------------------------------------------------------- 117 118 # Leave everything in this section commented unless you are running Galaxy on a 119 # cluster. Clustering Galaxy is not a straightforward process and requires a 120 # lot of pre-configuration. Please see the Galaxy Wiki before attempting to 121 # set any of these options. 122 123 # start_job_runners: Comma-separated list of job runners to start. local is 124 # always started. If left commented, no jobs will be run on the cluster, even 125 # if a cluster URL is explicitly defined in the [galaxy:tool_runners] section 126 # below. The only runner currently available is 'pbs'. 127 #start_job_runners = pbs 128 129 # default_cluster_job_runner: The URL for the default runner to use when a tool 130 # doesn't explicity define a runner below. For help on the cluster URL format, 131 # see the Galaxy Wiki. Leave commented if not using a cluster job runner. 132 #default_cluster_job_runner = pbs:/// 133 134 # The PBS options are described in detail on the Galaxy Wiki 135 #pbs_application_server = 136 #pbs_stage_path = 137 #pbs_dataset_server = 138 139 # Individual per-tool job runner overrides. If not listed here, a tool will 140 # run with the runner defined with default_cluster_job_runner. 141 [galaxy:tool_runners] 142 143 biomart = local:/// 144 encode_db1 = local:/// 145 encode_import_all_latest_datasets1 = local:/// 146 encode_import_chromatin_and_chromosomes1 = local:/// 147 encode_import_gencode1 = local:/// 148 encode_import_genes_and_transcripts1 = local:/// 149 encode_import_multi-species_sequence_analysis1 = local:/// 150 encode_import_transcription_regulation1 = local:/// 151 hbvar = local:/// 152 microbial_import1 = local:/// 153 ucsc_table_direct1 = local:/// 154 ucsc_table_direct_archaea1 = local:/// 155 ucsc_table_direct_test1 = local:/// 156 upload1 = local:/// 115 157 116 158 # ---- Datatypes ------------------------------------------------------------