Sunday, August 30, 2015

Creating scheduled job in Alfresco


Sometimes we need to perform tasks on a regular basis, or asynchronously after a certain time interval. Examples include archiving or transforming content, or processing queued jobs in the Alfresco repository.

Another use case: a third-party system is integrated with Alfresco and produces output or transforms content for it. In this case you can’t rely on a synchronous call to the services of the third-party system. If that system hangs or takes a long time to process the request, your Alfresco request will time out or fail. Instead, you can create a job object for each request inside a folder (job node) in Alfresco and let the job scheduler process them asynchronously.

The job scheduler fetches all jobs queued in the job folder (job node) and processes them. It checks the status of each request placed with the third-party system every minute or every 5 minutes, depending on the scheduler configuration, and performs some operation once the third-party system returns the desired response.
The simplest way to do this is to use the Quartz framework with a cron trigger. We will see how to create a simple job scheduler using Quartz and cron.

For background on Quartz and cron, see Chapter 22 of the Spring documentation and the Quartz Enterprise documentation.

I have used ‘org.quartz.StatefulJob’ to create a simple scheduled job in Alfresco. Let’s see how to implement it.

Before that, you need to implement the action or web script that actually creates the job objects at some location (job folder) in the Alfresco repository. The scheduler will fetch these jobs and process them.

Let’s say there is an action in Alfresco that a user triggers to transform a big XML document on a third-party system.

Assume the following is in place in order to implement the scheduled job:

-- The Alfresco action invokes the transformation on the third-party system.
-- The third-party system responds immediately with the current transformation status, say ‘Created’, and a unique id (e.g. a transaction id).
-- The Alfresco action reads the response and creates a content-less job object in the Alfresco repository with the help of a content model. The content model defines an aspect and some properties to hold the transformation status, unique id, requesting user name/email, date, time, etc., and these properties are applied to the job object.
-- The scheduled job in Alfresco fetches all job objects and keeps checking the status of each transformation using the unique id (saved as part of the job object) provided by the third-party system.


We will not go into much detail about how to create the job object. The statements above should be enough for a developer to understand how to create it.
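The workflow above can be sketched in plain Java, without any Alfresco API. This is only an illustration: TransformationJob, JobPoller and the status names are hypothetical stand-ins for the job node, the scheduled job and the values your content model would define, and the remote status lookup is passed in as a function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical in-memory stand-in for a content-less job object in the job folder. */
final class TransformationJob {
    enum Status { CREATED, IN_PROGRESS, COMPLETED, FAILED }

    final String transactionId; // unique id returned by the third-party system
    final String requester;     // user who triggered the action
    Status status = Status.CREATED;

    TransformationJob(String transactionId, String requester) {
        this.transactionId = transactionId;
        this.requester = requester;
    }

    boolean isFinished() {
        return status == Status.COMPLETED || status == Status.FAILED;
    }
}

final class JobPoller {
    /**
     * One scheduler run: refresh the status of every unfinished job and
     * return the jobs that reached a final state during this run.
     */
    static List<TransformationJob> pollOnce(List<TransformationJob> queue,
            Function<String, TransformationJob.Status> remoteStatus) {
        List<TransformationJob> finishedNow = new ArrayList<>();
        for (TransformationJob job : queue) {
            if (job.isFinished()) {
                continue; // already handled in an earlier run
            }
            job.status = remoteStatus.apply(job.transactionId);
            if (job.isFinished()) {
                finishedNow.add(job);
            }
        }
        return finishedNow;
    }

    public static void main(String[] args) {
        List<TransformationJob> queue = new ArrayList<>();
        queue.add(new TransformationJob("tx-1", "admin"));
        // Fake remote system that reports every transformation as completed
        List<TransformationJob> done =
                pollOnce(queue, id -> TransformationJob.Status.COMPLETED);
        System.out.println("Finished in this run: " + done.size());
    }
}
```

In a real job the queue would be the children of the job folder and the status would live in node properties; the sketch only shows the shape of one polling run.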

Now it’s time to look at the actual topic of this post.

StatefulJob

Follow the steps below to implement a job scheduler using org.quartz.StatefulJob:

1- Create custom-scheduled-job-context.xml, where we define the job detail and job trigger beans

<bean id="customJobDetail" class="org.springframework.scheduling.quartz.JobDetailBean">
  <property name="jobClass">
    <value>com.abhinav.jobs.CustomJobProcessor</value>
  </property>
  <property name="jobDataAsMap">
    <map>
      <entry key="serviceRegistry">
        <ref bean="ServiceRegistry"/>
      </entry>
      <entry key="globalProperties">
        <ref bean="global-properties"/>
      </entry>
    </map>
  </property>
</bean>


Here, 

-- jobClass has the value ‘com.abhinav.jobs.CustomJobProcessor’. This class holds the actual job processing logic.

-- jobDataAsMap can hold any number of (serializable) objects that you want to make available to the job instance when it executes. It is an implementation of the Java Map interface, and through it you can pass the Alfresco services your job needs to access the repository.


JobDetailBean is a subclass of Quartz's JobDetail class that eases bean-style usage. JobDetail itself is already a JavaBean but lacks sensible defaults.

See more details here: JobDetailBean 
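Inside a Quartz job, the entries of jobDataAsMap are reachable through jobCtx.getJobDetail().getJobDataMap(). Whether the setters on a plain StatefulJob also get populated depends on the job factory in use, so reading the map directly is the safer option. Below is a small sketch of a lookup helper for that; JobDataReader and its require method are hypothetical names, and a plain java.util.Map stands in for the Quartz job data map so the snippet runs without Quartz on the classpath.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper: typed, fail-fast lookup of entries passed via jobDataAsMap. */
final class JobDataReader {

    static <T> T require(Map<?, ?> jobData, String key, Class<T> type) {
        Object value = jobData.get(key);
        if (value == null) {
            throw new IllegalStateException("Missing job data entry: " + key);
        }
        if (!type.isInstance(value)) {
            throw new IllegalStateException("Job data entry '" + key
                    + "' is not a " + type.getName());
        }
        return type.cast(value);
    }

    public static void main(String[] args) {
        // Stand-in for jobCtx.getJobDetail().getJobDataMap()
        Map<String, Object> jobData = new HashMap<>();
        jobData.put("globalProperties", new Properties());

        Properties props = require(jobData, "globalProperties", Properties.class);
        System.out.println("Resolved entry of type " + props.getClass().getSimpleName());
    }
}
```

Failing fast on a missing entry makes a misconfigured bean definition visible on the first run instead of surfacing later as a NullPointerException.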

<bean id="customJobTrigger" class="org.alfresco.util.CronTriggerBean">
        <property name="jobDetail">
            <ref bean="customJobDetail"/>
        </property>
        <property name="scheduler">
            <ref bean="schedulerFactory"/>
        </property>
        <property name="cronExpression">
          <!-- Run the job every minute -->
          <value>0 0/1 * 1/1 * ? *</value>
        </property>
 </bean>


Here, 

-- jobDetail references the customJobDetail bean defined above; it injects that job configuration into org.alfresco.util.CronTriggerBean.

-- scheduler references the ‘schedulerFactory’ bean; it injects the schedulerFactory (an instance of org.quartz.Scheduler) into org.alfresco.util.CronTriggerBean.

-- cronExpression in customJobTrigger defines the schedule on which this job is executed.

-- org.alfresco.util.CronTriggerBean is defined in Alfresco's utility classes. It extends ‘org.alfresco.util.AbstractTriggerBean’, which implements ‘org.springframework.scheduling.quartz.JobDetailAwareTrigger’.


See more details here:  CronTriggerBean

2- Import “custom-scheduled-job-context.xml” into module-context.xml if you are using an AMP; otherwise place it inside <tomcat>/shared/classes/alfresco/extension

3- We have now created a custom scheduled job called customJobDetail and associated it with a job trigger called customJobTrigger.

4- Now create the job class CustomJobProcessor under the package com.abhinav.jobs and have it implement org.quartz.StatefulJob

5- We need to implement the execute method (signature given below); all the business logic will reside in it.

public void execute(final JobExecutionContext jobCtx) throws JobExecutionException;

/*
* Author: Abhinav Kumar Mishra
* Copyright © 2015 Abhinav Kumar Mishra. All rights reserved.
*/
package com.abhinav.jobs;

import java.util.Properties;

import org.alfresco.repo.security.authentication.AuthenticationUtil;
import org.alfresco.service.ServiceRegistry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

/**
 * The Class CustomJobProcessor.<br/>
 */
public class CustomJobProcessor implements org.quartz.StatefulJob{
 
 /** The Constant LOG. */
 private static final Log LOG = LogFactory.getLog(CustomJobProcessor.class);
    
 /** The global properties. */
 private Properties globalProperties;
 
 /** The service registry. */
 private ServiceRegistry serviceRegistry;

 @Override
 public void execute(final JobExecutionContext jobCtx) throws JobExecutionException {
    LOG.info("CustomJobProcessor Started..");
    try {
      // Run as the system user: this job is not tied to any user,
      // so it needs system-level permissions on the repository
      AuthenticationUtil.setRunAsUserSystem();
      // TODO:: Put job processing logic here..
      // Get the job space where all jobs are stored
      // using serviceRegistry.getFileFolderService()
      // Read the jobs into a list as given below:
      // List<FileInfo> fileInfoList = fileFolderService.listFiles(jobSpaceNode);
      // Read the unique id from the job property and process
    } catch (RuntimeException excp) {
      LOG.error("Exception occurred while processing job", excp);
    }
    LOG.info("CustomJobProcessor End!");

 }// execute end

 /**
  * Sets the global properties.
  *
  * @param globalProperties the global properties
  */
 public void setGlobalProperties(final Properties globalProperties) {
  this.globalProperties = globalProperties;
 }
 
 /**
  * Sets the service registry.
  *
  * @param serviceRegistry the service registry
  */
 public void setServiceRegistry(final ServiceRegistry serviceRegistry) {
  this.serviceRegistry = serviceRegistry;
 }
}


6- Prepare the AMP and apply it to alfresco.war, or prepare the jar file and place it under <tomcat>/webapps/alfresco/WEB-INF/lib

7- Restart the server and your scheduled job will be executed at the defined time intervals. You can define the cron expression based on your requirements. Refer to this guide to learn more about cron expressions and Cron Maker.


A cron expression is a string comprised of 6 or 7 fields separated by white space. Fields can contain any of the allowed values, along with various combinations of the allowed special characters for that field. The fields are as follows:

Field Name     Mandatory   Allowed Values      Allowed Special Characters
Seconds        YES         0-59                , - * /
Minutes        YES         0-59                , - * /
Hours          YES         0-23                , - * /
Day of month   YES         1-31                , - * ? / L W
Month          YES         1-12 or JAN-DEC     , - * /
Day of week    YES         1-7 or SUN-SAT      , - * ? / L #
Year           NO          empty, 1970-2099    , - * /


So cron expressions can be as simple as this: * * * ? * * (fire every second). Note that ‘?’ is only allowed in the day-of-month and day-of-week fields.
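To make the field layout concrete, here is a small plain-Java sketch that splits an expression into the named fields from the table. CronFields is a hypothetical helper, not part of Quartz, and it only checks the field count and the placement of ‘?’; it is not a full validator.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that labels the fields of a Quartz-style cron expression. */
final class CronFields {
    private static final String[] NAMES = {
        "seconds", "minutes", "hours", "dayOfMonth", "month", "dayOfWeek", "year"
    };

    static Map<String, String> split(String expression) {
        String[] parts = expression.trim().split("\\s+");
        if (parts.length < 6 || parts.length > 7) {
            throw new IllegalArgumentException(
                    "Expected 6 or 7 fields but found " + parts.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            String name = NAMES[i];
            boolean allowsQuestionMark =
                    name.equals("dayOfMonth") || name.equals("dayOfWeek");
            if (parts[i].contains("?") && !allowsQuestionMark) {
                throw new IllegalArgumentException("'?' is not allowed in " + name);
            }
            fields.put(name, parts[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        // The expression used by customJobTrigger earlier in this post
        for (Map.Entry<String, String> e : split("0 0/1 * 1/1 * ? *").entrySet()) {
            System.out.println(e.getKey() + " = " + e.getValue());
        }
    }
}
```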


So, we have seen how to implement a simple scheduled job. But suppose your Alfresco server is running in a clustered environment (which is likely in production). In that case every node in the cluster runs its own instance of the same scheduler, so the same job would be triggered on several nodes at once. To handle this case Alfresco provides ‘org.alfresco.schedule.AbstractScheduledLockedJob’.

It makes the cluster-aware locking of the job transparent to the implementation.
The JobLockService must still always be passed to the job as a parameter in its job data (the jobDataAsMap of the job detail bean).

The name used for locking the job is optional. If none is passed, a name is composed from the simple name of the implementation class.
In general, if it makes sense to have more than one job set up using the same class, you should use a different name in each job's data to differentiate the jobs, unless you want the lock to be shared between the different instances.
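The naming rule and the effect of a shared lock can be illustrated with a small in-memory sketch. This is not how Alfresco implements it: LockNameSketch is a hypothetical class, and a ConcurrentHashMap stands in for the JobLockService, which coordinates across cluster nodes rather than within one JVM.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical single-JVM illustration of lock naming and mutual exclusion. */
final class LockNameSketch {
    // lock name -> current owner (a node id in this illustration)
    private static final ConcurrentMap<String, String> LOCKS = new ConcurrentHashMap<>();

    /** Falls back to the simple class name when no explicit name is configured. */
    static String lockNameFor(Class<?> jobClass, String configuredName) {
        if (configuredName == null || configuredName.isEmpty()) {
            return jobClass.getSimpleName();
        }
        return configuredName;
    }

    /** Returns true only for the first caller; others must skip this run. */
    static boolean tryAcquire(String lockName, String owner) {
        return LOCKS.putIfAbsent(lockName, owner) == null;
    }

    /** Releases the lock, but only if it is still held by the given owner. */
    static void release(String lockName, String owner) {
        LOCKS.remove(lockName, owner);
    }

    public static void main(String[] args) {
        String name = lockNameFor(LockNameSketch.class, null);
        System.out.println("node1 acquired: " + tryAcquire(name, "node1"));
        System.out.println("node2 acquired: " + tryAcquire(name, "node2"));
        release(name, "node1");
    }
}
```

Two job definitions that use the same class and no explicit name end up with the same lock name here, which is why distinct names are recommended when the jobs should run independently.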

AbstractScheduledLockedJob (Cluster Aware Job Scheduler Implementation)

Follow the steps below to implement a job scheduler using org.alfresco.schedule.AbstractScheduledLockedJob:

1- Create custom-cluster-aware-scheduled-job-context.xml, where we define the job detail and job trigger beans

<bean id="customClusterAwareJobDetail" class="org.springframework.scheduling.quartz.JobDetailBean">

 <property name="jobClass">
  <value>com.abhinav.jobs.CustomClusterAwareJobProcessor</value>
 </property>
 <property name="jobDataAsMap">
  <map>
   <entry key="fileFolderService">
     <ref bean="fileFolderService" />
   </entry>
   <entry key="nodeService">
      <ref bean="nodeService" />
   </entry>
   <entry key="transactionService">
    <ref bean="transactionService"/>
   </entry>
   <!-- JobLockService is used to acquire the lock on jobs while they are
        being processed, to prevent other threads from modifying the jobs' state -->
   <entry key="jobLockService">
    <ref bean="jobLockService"/>
   </entry>
  </map>
 </property>
</bean>
                       
<bean id="customClusterAwareJobTrigger" class="org.alfresco.util.CronTriggerBean">

 <property name="jobDetail">
  <ref bean="customClusterAwareJobDetail" />
 </property>
 <property name="scheduler">
  <ref bean="schedulerFactory" />
 </property>
 <property name="cronExpression">
     <!-- The cron expression is provided in the alfresco-global.properties file -->
  <value>${customjob.cron.expression}</value>
 </property>
</bean>
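The ${customjob.cron.expression} placeholder expects a matching entry in alfresco-global.properties. The sketch below parses such an entry with java.util.Properties to show the expected key and value format. The value "0 0/5 * * * ?" (every 5 minutes) is only an example, and CronPropertySketch is a hypothetical class; in Alfresco the placeholder is resolved by Spring, not by code like this.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical illustration of the property entry behind the placeholder. */
final class CronPropertySketch {
    static final String KEY = "customjob.cron.expression";

    static String cronFrom(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String cron = props.getProperty(KEY);
        if (cron == null || cron.trim().isEmpty()) {
            throw new IllegalStateException(KEY + " is not set");
        }
        return cron.trim();
    }

    public static void main(String[] args) {
        // Example line as it could appear in alfresco-global.properties
        String text = "customjob.cron.expression=0 0/5 * * * ?\n";
        System.out.println("Cron expression: " + cronFrom(text));
    }
}
```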



2- Import “custom-cluster-aware-scheduled-job-context.xml” into module-context.xml if you are using an AMP; otherwise place it inside <tomcat>/shared/classes/alfresco/extension

3- We have now created a custom scheduled job called customClusterAwareJobDetail and associated it with a job trigger called customClusterAwareJobTrigger.

4- Now create the job class CustomClusterAwareJobProcessor under the package com.abhinav.jobs and have it extend the org.alfresco.schedule.AbstractScheduledLockedJob class.

5- We need to implement the executeJob method (signature given below); all the business logic will reside in it.

public void executeJob(final JobExecutionContext jobCtx) throws JobExecutionException;


/*
* Author: Abhinav Kumar Mishra
* Copyright © 2015 Abhinav Kumar Mishra. All rights reserved.
*/
package com.abhinav.jobs;

import org.alfresco.repo.security.authentication.AuthenticationUtil;
import org.alfresco.repo.transaction.RetryingTransactionHelper.RetryingTransactionCallback;
import org.alfresco.schedule.AbstractScheduledLockedJob;
import org.alfresco.service.cmr.model.FileFolderService;
import org.alfresco.service.cmr.repository.NodeService;
import org.alfresco.service.transaction.TransactionService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

/**
 * The Class CustomClusterAwareJobProcessor.<br/>
 * 
 * This class extends AbstractScheduledLockedJob to execute jobs using JobLockService. <br/>
 * It makes the cluster-aware locking of the job transparent to the implementation. The
 * jobLockService must still always be passed as a parameter in the job's
 * data map.<br/> The name to be used for locking of the job is optional; if
 * none is passed, a name will be composed using the simple name of the
 * implementation class. <br/>In general, if it makes sense to have more than one
 * job set up using the same class, you should always use a different name in each
 * job's data map to differentiate the jobs, unless you want the lock to be
 * shared between the different instances.<br/>
 * 
 * The only method to be implemented when extending this class is
 * executeJob(JobExecutionContext).
 * 
 * @see org.alfresco.schedule.AbstractScheduledLockedJob
 * @see org.alfresco.repo.lock.JobLockService
 */
public class CustomClusterAwareJobProcessor extends AbstractScheduledLockedJob {

 /** The Constant LOG. */
 private static final Log LOG = LogFactory.getLog(CustomClusterAwareJobProcessor.class);

 /** The file folder service. */
 private FileFolderService fileFolderService;

 /** The node service. */
 private NodeService nodeService;

 /** The transaction service. */
 private TransactionService transactionService;

 /**
  * This method will process the job by taking the lock on jobs. Since it is an
  * extension of {@link org.alfresco.schedule.AbstractScheduledLockedJob} it
  * should also receive a reference to the service
  * {@link org.alfresco.repo.lock.JobLockService}.
  *
  * @param jobCtx the job execution context
  * @throws JobExecutionException if there is an exception while executing the job.
  */
 @Override
 public void executeJob(final JobExecutionContext jobCtx)
   throws JobExecutionException {
  LOG.info("CustomClusterAwareJobProcessor Started..");
  try {
   // Run as the system user: this job is not tied to any user,
   // so it needs system-level permissions on the repository
   AuthenticationUtil.setRunAsUserSystem();

   final RetryingTransactionCallback<Object> txnWork =
     new RetryingTransactionCallback<Object>() {

      public Object execute() throws Exception {
       // TODO:: Put job processing logic here..
       // Get the job space where all jobs are stored
       // using the injected fileFolderService
       // Read the jobs into a list as given below:
       // List<FileInfo> fileInfoList = fileFolderService.listFiles(jobSpaceNode);
       // Read the unique id from the job property and process
       return null;
      }
     };
   transactionService.getRetryingTransactionHelper().doInTransaction(txnWork);

  } catch (RuntimeException excp) {
   LOG.error("Exception occurred while processing job", excp);
  }

  LOG.info("CustomClusterAwareJobProcessor End!");
 }

 /**
  * Gets the file folder service.
  *
  * @return the file folder service
  */
 public FileFolderService getFileFolderService() {
  return fileFolderService;
 }

 /**
  * Sets the file folder service.
  *
  * @param fileFolderService the file folder service
  */
 public void setFileFolderService(final FileFolderService fileFolderService) {
  this.fileFolderService = fileFolderService;
 }

 /**
  * Gets the node service.
  *
  * @return the node service
  */
 public NodeService getNodeService() {
  return nodeService;
 }

 /**
  * Sets the node service.
  *
  * @param nodeService the node service
  */
 public void setNodeService(final NodeService nodeService) {
  this.nodeService = nodeService;
 }


 /**
  * Gets the transaction service.
  *
  * @return the transaction service
  */
 public TransactionService getTransactionService() {
  return transactionService;
 }


 /**
  * Sets the transaction service.
  *
  * @param transactionService the transaction service
  */
 public void setTransactionService(final TransactionService transactionService) {
  this.transactionService = transactionService;
 }
}
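The job above wraps its work in a RetryingTransactionCallback so that the helper can run it in a transaction and re-run it after certain transient failures. The idea of "run the callback again on a transient failure, up to a limit" can be sketched in plain Java as below. RetrySketch is a hypothetical, heavily simplified stand-in: it has no transactions, and IllegalStateException is an arbitrary placeholder for whatever the real helper treats as retryable.

```java
import java.util.function.Supplier;

/** Hypothetical, simplified illustration of retrying a unit of work. */
final class RetrySketch {

    static <T> T doWithRetry(Supplier<T> work, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return work.get(); // success: stop retrying
            } catch (IllegalStateException transientFailure) {
                lastFailure = transientFailure; // remember and try again
            }
        }
        throw lastFailure; // every attempt failed
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        String result = doWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated transient failure");
            }
            return "processed";
        }, 5);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Because the work may run more than once, the job processing logic placed inside the callback should be safe to repeat.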


6- Prepare the AMP and apply it to alfresco.war, or prepare the jar file and copy it under <tomcat>/webapps/alfresco/WEB-INF/lib

7- Restart the server and your scheduled job will be executed at the defined time intervals.



