Monday, April 16, 2012

Interrupt a Quartz job that is doing IO

If you want to be able to interrupt a Quartz job that is doing IO, you can do so by using an InterruptibleChannel.

This is the relevant quote from Oracle's documentation for that interface:

A channel that implements this interface is also interruptible: If a thread is blocked in an I/O operation on an interruptible channel then another thread may invoke the blocked thread's interrupt method. This will cause the channel to be closed, the blocked thread to receive a ClosedByInterruptException, and the blocked thread's interrupt status to be set.
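
To see the quoted behavior in isolation, here is a minimal sketch that does not involve Quartz at all. It assumes a java.nio.channels.Pipe as the interruptible channel; the class and method names (InterruptibleReadDemo, interruptBlockedRead) are made up for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.Pipe;
import java.util.concurrent.atomic.AtomicReference;

class InterruptibleReadDemo {

  // Blocks a reader thread on an empty pipe, interrupts it, and reports what the reader saw.
  static String interruptBlockedRead() {
    final AtomicReference<String> outcome = new AtomicReference<>("no result");
    try {
      final Pipe pipe = Pipe.open();
      Thread reader = new Thread(() -> {
        try {
          // Nothing is ever written to the pipe, so this read blocks.
          pipe.source().read(ByteBuffer.allocate(16));
          outcome.set("read returned normally");
        } catch (ClosedByInterruptException e) {
          outcome.set("ClosedByInterruptException, channel open=" + pipe.source().isOpen()
              + ", interrupt status=" + Thread.currentThread().isInterrupted());
        } catch (IOException e) {
          outcome.set("other IOException: " + e);
        }
      });
      reader.start();
      Thread.sleep(500); // give the reader time to block (good enough for a demo)
      reader.interrupt(); // "another thread invokes the blocked thread's interrupt method"
      reader.join();
      pipe.sink().close();
      return outcome.get();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(interruptBlockedRead());
  }
}
```

Running it should report that the reader received a ClosedByInterruptException, that the channel is no longer open, and that the reader's interrupt status is set, which are the three effects the quote lists.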

As it turns out, you can easily create a channel from a stream, a socket, or a pipe, and such channels already implement the InterruptibleChannel interface.
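
For the stream case, the standard entry point is Channels.newChannel(InputStream). The sketch below wraps an in-memory stream purely for illustration; the class and method names (StreamToChannelDemo, readAll) are hypothetical. Whether the returned object is an instance of InterruptibleChannel is an implementation detail of the JDK rather than something the method signature promises, so the example prints it instead of relying on it.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.InterruptibleChannel;
import java.nio.channels.ReadableByteChannel;

class StreamToChannelDemo {

  // Wraps any InputStream in a channel, reads it to the end, and returns the byte count.
  static int readAll(InputStream in) {
    try (ReadableByteChannel channel = Channels.newChannel(in)) {
      ByteBuffer buffer = ByteBuffer.allocate(8);
      int total = 0;
      int count;
      while ((count = channel.read(buffer)) != -1) {
        total += count;
        buffer.clear(); // discard the bytes; a real job would write them somewhere
      }
      return total;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) throws IOException {
    try (ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(new byte[20]))) {
      // Check on your own JDK whether the wrapper is interruptible.
      System.out.println("interruptible: " + (channel instanceof InterruptibleChannel));
    }
    System.out.println("bytes read: " + readAll(new ByteArrayInputStream(new byte[20])));
  }
}
```

The download job in this post uses the same call, only with the stream returned by URL.openStream().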

So the plan is to get a reference to the thread executing your job upon entry and save it for later use. When the Quartz scheduler interrupts the job, you can then call the interrupt() method on that thread to stop the read/write operation.

Here's a simple example of such a job: downloading a big file.

package demo;

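import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Date;

import org.quartz.InterruptableJob;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.quartz.UnableToInterruptJobException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;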

public class MyJob implements InterruptableJob {

  private static final Logger LOG           = LoggerFactory.getLogger(MyJob.class);

  private volatile boolean isJobInterrupted = false;

  private JobKey           jobKey           = null;

  private volatile Thread  thisThread;

  public MyJob() {
  }

  public void execute(JobExecutionContext context) throws JobExecutionException {
    thisThread = Thread.currentThread();
    LOG.info("Thread name of the current job: " + thisThread.getName());

    jobKey = context.getJobDetail().getKey();
    LOG.info("Job " + jobKey + " executing at " + new Date());

    try {
      String fileUrl = "http://d2zwv9pap9ylyd.cloudfront.net/terracotta-3.6.1.tar.gz"; // 59 MB
      String localFile = "terracotta-3.6.1.tar.gz";
      download(fileUrl, localFile);
    } catch (ClosedByInterruptException e) {
      LOG.info("Caught ClosedByInterruptException... exiting job.");
    } catch (IOException e) {
      LOG.info("Caught IOException... exiting job.", e);
    } finally {
      if (isJobInterrupted) {
        LOG.info("Job " + jobKey + " did not complete");
      } else {
        LOG.info("Job " + jobKey + " completed at " + new Date());
      }
    }
  }
  
  // this method is called by the scheduler
  public void interrupt() throws UnableToInterruptJobException {
    LOG.info("Job " + jobKey + "  -- INTERRUPTING --");
    isJobInterrupted = true;
    if (thisThread != null) {
      // this call causes the ClosedByInterruptException to happen
      thisThread.interrupt(); 
    }
  }

  private void download(String address, String localFileName) throws IOException {
    URL url = new URL(address);
    // wrapping the stream in a channel is what makes the blocking read interruptible
    ReadableByteChannel src = Channels.newChannel(url.openStream());
    try {
      WritableByteChannel dest = new FileOutputStream(new File(localFileName)).getChannel();
      try {
        System.out.println("Downloading " + address + " to " + new File(localFileName).getCanonicalPath());
        int size = fastChannelCopy(src, dest);
        System.out.println("Download completed! " + (size / 1024 / 1024) + " MB");
      } finally {
        dest.close();
      }
    } finally {
      // closed in its own finally so that a failure above cannot leak the source channel
      src.close();
    }
  }
  
  // Code copied from http://thomaswabner.wordpress.com/2007/10/09/fast-stream-copy-using-javanio-channels/
  private static int fastChannelCopy(final ReadableByteChannel src, final WritableByteChannel dest) throws IOException {
    final ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024);
    int count = 0;
    int total = 0;
    while ((count = src.read(buffer)) != -1) {
      total += count;
      // prepare the buffer to be drained
      buffer.flip();
      // write to the channel, may block
      dest.write(buffer);
      // If partial transfer, shift remainder down
      // If buffer is empty, same as doing clear()
      buffer.compact();
    }
    // EOF will leave buffer in fill state
    buffer.flip();
    // make sure the buffer is fully drained.
    while (buffer.hasRemaining()) {
      dest.write(buffer);
    }
    return total;
  }
}

And this is my main class, which creates the Quartz scheduler and simulates the intended interruption. The download takes about 40 seconds to finish (a 59 MB file). To see that our job is indeed interrupted during the download, we start the scheduler, sleep for 5 seconds, and then interrupt the job.
NOTE: If you want to see the job finish, sleep for about 40 seconds instead.

package demo;

import static org.quartz.DateBuilder.nextGivenSecondDate;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;

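import java.util.Date;

import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleTrigger;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;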

public class InterruptExample {

  public void run() throws Exception {
    final Logger log = LoggerFactory.getLogger(InterruptExample.class);

    log.info("------- Initializing ----------------------");

    // First we must get a reference to a scheduler
    SchedulerFactory sf = new StdSchedulerFactory();
    Scheduler sched = sf.getScheduler();

    log.info("------- Initialization Complete -----------");

    log.info("------- Scheduling Jobs -------------------");

    // start at the next whole second (a null date means "now")
    Date startTime = nextGivenSecondDate(null, 1);

    JobDetail job = newJob(MyJob.class).withIdentity("myJob", "group1").build();

    SimpleTrigger trigger = newTrigger().withIdentity("trigger1", "group1").startAt(startTime)
        .withSchedule(simpleSchedule()).build();

    sched.scheduleJob(job, trigger);

    // start up the scheduler (jobs do not start to fire until
    // the scheduler has been started)
    sched.start();
    log.info("Scheduler thread's name: " + Thread.currentThread().getName());
    log.info("------- Started Scheduler -----------------");

    try {
      // if you want to see the job finish successfully, sleep for about 40 seconds
      Thread.sleep(5 * 1000L);
      // tell the scheduler to interrupt our job
      sched.interrupt(job.getKey());
      Thread.sleep(3 * 1000L);
    } catch (Exception e) {
      e.printStackTrace();
    }

    log.info("------- Shutting Down ---------------------");

    sched.shutdown(true);

    log.info("------- Shutdown Complete -----------------");
  }

  public static void main(String[] args) throws Exception {
    InterruptExample example = new InterruptExample();
    example.run();
  }
}

And this is an excerpt from the log, showing that our job is interrupted and exits prematurely upon ClosedByInterruptException:

INFO [main] ------- Scheduling Jobs -------------------
INFO [main] Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
INFO [main] Scheduler thread's name: main
INFO [main] ------- Started Scheduler -----------------
INFO [DefaultQuartzScheduler_Worker-1] Thread name of the current job: DefaultQuartzScheduler_Worker-1
INFO [DefaultQuartzScheduler_Worker-1] Job group1.myJob executing at Mon Apr 16 16:24:40 PDT 2012
 Downloading http://d2zwv9pap9ylyd.cloudfront.net/terracotta-3.6.1.tar.gz to S:\quartz-interrupt-demo\terracotta-3.6.1.tar.gz
INFO [main] Job group1.myJob  -- INTERRUPTING --
INFO [DefaultQuartzScheduler_Worker-1] Caught ClosedByInterruptException... exiting job.
INFO [DefaultQuartzScheduler_Worker-1] Job group1.myJob did not complete
ERROR [DefaultQuartzScheduler_Worker-1] Worker thread was interrupt()'ed.
 java.lang.InterruptedException
 at java.lang.Object.wait(Native Method)
 at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:552)
INFO [main] ------- Shutting Down ---------------------
INFO [main] Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED shutting down.
INFO [main] Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED paused.
INFO [main] Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED shutdown complete.
INFO [main] ------- Shutdown Complete -----------------
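
The ERROR entry near the end of the log ("Worker thread was interrupt()'ed") is most likely a side effect of what the quoted Javadoc describes: the thread's interrupt status stays set after the ClosedByInterruptException, so the worker's next wait() throws InterruptedException. The sketch below reproduces that effect without Quartz and shows one possible remedy, clearing the status with Thread.interrupted() once the exception has been handled. The class and method names (ClearInterruptFlagDemo, runJobBody, runJobThenWait) are hypothetical stand-ins, not Quartz APIs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.Pipe;

class ClearInterruptFlagDemo {

  // Hypothetical stand-in for MyJob.execute(): an interrupted channel read,
  // optionally followed by clearing the thread's interrupt status.
  static void runJobBody(boolean clearFlag) {
    try {
      Pipe pipe = Pipe.open();
      try {
        // Simulate the interrupt from MyJob.interrupt(): with the status already
        // set, the channel read fails immediately instead of blocking.
        Thread.currentThread().interrupt();
        pipe.source().read(ByteBuffer.allocate(16));
      } catch (ClosedByInterruptException e) {
        if (clearFlag) {
          Thread.interrupted(); // returns the interrupt status and clears it
        }
      } finally {
        pipe.sink().close();
        pipe.source().close();
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Hypothetical stand-in for a pool worker that runs a job and then waits for more work.
  static String runJobThenWait(boolean clearFlag) {
    runJobBody(clearFlag);
    Object lock = new Object();
    synchronized (lock) {
      try {
        lock.wait(50);
        return "worker idle";
      } catch (InterruptedException e) {
        // wait() throws at once when the interrupt status is still set
        return "worker interrupted";
      }
    }
  }

  public static void main(String[] args) {
    System.out.println("flag left set: " + runJobThenWait(false)); // worker interrupted
    System.out.println("flag cleared:  " + runJobThenWait(true));  // worker idle
  }
}
```

In MyJob, the equivalent change would be a call to Thread.interrupted() inside the catch block for ClosedByInterruptException. Whether swallowing the interrupt there is appropriate depends on what else runs on that thread, so treat it as an option rather than a required fix.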

The whole example can be downloaded here.