00001 package net.threebit.utils.sosc;
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 import java.io.*;
00023 import java.sql.*;
00024 import java.util.*;
00025 import javax.servlet.http.*;
00026 import java.lang.reflect.*;
00027
00113 public class JobTool {
00114
00116 private static JobTool jobTool = null;
00117
00119 private static boolean ready = false;
00120
00122 private DbTool db = null;
00123
00125 public final String JOB_TABLE = "jobToolJobs";
00126
00128 public final String JOB_ARGUMENTS = "jobToolArguments";
00129
00131 public final String JOB_LOG = "jobToolLog";
00132
00134 private ArrayList jobs = new ArrayList();
00135
00140 public JobTool (DbTool dbTool) {
00141 this.db = dbTool;
00142 }
00143
00148 public synchronized static JobTool initialize(DbTool dbTool) throws Exception {
00149 ready = true;
00150 jobTool = new JobTool(dbTool);
00151 return getInstance();
00152 }
00153
00158 public synchronized static JobTool getInstance() throws Exception {
00159 if (! ready) { throw new IllegalStateException("JobTool not ready. Call initialize() first."); }
00160 return jobTool;
00161 }
00162
00170 public void addJob (String queue, String description, String type, String className, Map args) throws Exception {
00171
00172
00173 if (queue == null || queue.length() == 0) { throw new Exception("Queue '" + queue + "' is not valid."); }
00174 if (! type.equals("java")) { throw new Exception("Type '" + type + "' is not supported."); }
00175
00176
00177 if (args == null) { args = new HashMap(); }
00178
00179 synchronized (this) {
00180
00181 db.update(
00182 " insert into " + JOB_TABLE +
00183 " (queue, description, type, class) \n" +
00184 " values (\n" +
00185 " " + db.quote(queue) + ", \n" +
00186 " " + db.quote(description) + ", \n" +
00187 " " + db.quote(type) + ", \n" +
00188 " " + db.quote(className) + " \n" +
00189 " ); "
00190 );
00191
00192
00193 int newId = db.getInt("select max(id) from " + JOB_TABLE + ";");
00194 System.out.println("Job " + newId + " created");
00195
00196 for (Iterator i = args.keySet().iterator(); i.hasNext(); ) {
00197 String key = (String) i.next();
00198 String val = (String) args.get(key);
00199 db.update(
00200 " insert into " + JOB_ARGUMENTS +
00201 " (job,k,v) \n" +
00202 " values (\n" +
00203 " " + newId + ", \n" +
00204 " " + db.quote(key) + ", \n" +
00205 " " + db.quote(val) + " \n" +
00206 " ); "
00207 );
00208 }
00209 db.commit();
00210
00211
00212 log(newId,"Job queued");
00213 }
00214 }
00215
00234 public Job runJob (String queue, Map args2) throws Exception {
00235
00236 int jobId;
00237 Map row = null;
00238 Map args1 = null;
00239 Job job = null;
00240
00241 synchronized (this) {
00242
00243
00244 String s = db.getString("select min(id) from " + JOB_TABLE + " where state = 0 and queue = " + db.quote(queue) + ";");
00245 if (s == null || s.equals("null")) {
00246 System.out.println("No jobs for queue: " + queue);
00247 return null;
00248 }
00249 else {
00250 jobId = Integer.valueOf(s).intValue();
00251 }
00252
00253
00254 ResultSet r = db.query(" select * from " + JOB_TABLE + " where id = " + jobId);
00255 row = db.toMap(r);
00256 r.close();
00257
00258
00259 r = db.query(" select * from " + JOB_ARGUMENTS + " where job = " + jobId);
00260 args1 = new HashMap();
00261 while (r.next()) {
00262 args1.put( r.getString("k"), r.getString("v"));
00263 }
00264 r.close();
00265
00266
00267 db.update(" update " + JOB_TABLE + " set updated = getdate(), state = 1 where id = " + jobId);
00268 db.commit();
00269
00270
00271 log(jobId,"initializing");
00272 String className = (String) row.get("class");
00273 Class classy = Class.forName(className);
00274 Constructor constructor = classy.getConstructor(new Class[] { int.class, String.class, Map.class, Map.class });
00275 job = (Job) constructor.newInstance(new Object[] { new Integer(jobId), row.get("description"), args1, args2 });
00276
00277
00278 jobs.add(job);
00279 }
00280
00281
00282 log(jobId,"executing");
00283 job.execute();
00284
00286
00288
00289 synchronized (this) {
00290 jobs.remove(job);
00291
00292 db.update(" update " + JOB_TABLE + " set updated = getdate(), state = 2 where id = " + jobId);
00293 log(jobId,"finished");
00294 db.commit();
00295 }
00296
00297
00298 return null;
00299
00300 }
00301
00306 public void log (int jobId, String text) throws Exception {
00307 synchronized (this) {
00308 db.update(" insert into " + JOB_LOG + " (job,text) values (" + jobId + "," + db.quote(text) + ");");
00309 }
00310 }
00311
00312 }