'use strict'
const NodeResque = require('node-resque')
const glob = require('glob')
const path = require('path')
const ActionHero = require('./../index.js')
const api = ActionHero.api
/**
* This callback is displayed as part of the Requester class.
* @callback ActionHero~TaskCallback
* @param {Object} this.worker - The task worker, if this is a pre or post process step.
* @param {Object} this.args - If this is a queue step, the arguemnts to the task
* @param {Object} this.queue - The queue to be used / is being used.
* @see ActionHero~TaskMiddleware
*/
/**
* Middleware definition for Actions
*
* @async
* @typedef {Object} ActionHero~TaskMiddleware
* @property {string} name - Unique name for the middleware.
* @property {Boolean} global - Is this middleware applied to all tasks?
* @property {Number} priority - Module load order. Defaults to `api.config.general.defaultMiddlewarePriority`.
* @property {ActionHero~TaskCallback} preProcessor - Called berore the action runs. Has access to all params, before sanitizartion. Can modify the data object for use in actions.
* @property {ActionHero~TaskCallback} postProcessor - Called after the action runs.
* @property {ActionHero~TaskCallback} preEnqueue - Called before a task using this middleware is enqueud.
* @property {ActionHero~TaskCallback} postEnqueue - Called after a task using this middleware is enqueud.
* @see api.actions.addMiddleware
* @example
const middleware = {
name: 'timer',
global: true,
priority: 90,
preProcessor: async function () {
const worker = this.worker
worker.startTime = process.hrtime()
},
postProcessor: async function () {
const worker = this.worker
const elapsed = process.hrtime(worker.startTime)
const seconds = elapsed[0]
const millis = elapsed[1] / 1000000
api.log(worker.job.class + ' done in ' + seconds + ' s and ' + millis + ' ms.', 'info')
},
preEnqueue: async function () {
const arg = this.args[0]
return (arg === 'ok') // returing `false` will prevent the task from enqueing
},
postEnqueue: async function () {
api.log("Task successfully enqueued!")
}
}
api.tasks.addMiddleware(middleware)
*/
/**
* Tools for enquing and inspecting the task sytem (delayed jobs).
*
* @namespace api.tasks
* @property {Object} tasks - The tasks defined on this server.
* @property {Object} jobs - The tasks defined on this server, converted into Node Resque jobs.
* @property {Object} middleware - Available Task Middleware.
* @property {Array} globalMiddleware - Array of global middleware modules.
* @extends ActionHero.Initializer
*/
module.exports = class Tasks extends ActionHero.Initializer {
constructor () {
super()
this.name = 'tasks'
this.loadPriority = 699
this.startPriority = 900
}
initialize () {
api.tasks = {
tasks: {},
jobs: {},
middleware: {},
globalMiddleware: []
}
/**
* @private
*/
api.tasks.loadFile = (fullFilePath, reload) => {
if (!reload) { reload = false }
api.watchFileAndAct(fullFilePath, () => {
api.tasks.loadFile(fullFilePath, true)
})
let task
let collection = require(fullFilePath)
if (typeof collection === 'function') { collection = [collection] }
for (let i in collection) {
const TaskClass = collection[i]
task = new TaskClass()
task.validate()
if (api.tasks.tasks[task.name] && !reload) {
api.log(`an existing task with the same name \`${task.name}\` will be overridden by the file ${fullFilePath}`, 'warning')
}
api.tasks.tasks[task.name] = task
api.tasks.jobs[task.name] = api.tasks.jobWrapper(task.name)
api.log(`task ${(reload ? '(re)' : '')} loaded: ${task.name}, ${fullFilePath}`, 'debug')
}
}
/**
* @private
*/
api.tasks.jobWrapper = (taskName) => {
const task = api.tasks.tasks[taskName]
let middleware = task.middleware || []
let plugins = task.plugins || []
let pluginOptions = task.pluginOptions || []
if (task.frequency > 0) {
if (plugins.indexOf('JobLock') < 0) { plugins.push('JobLock') }
if (plugins.indexOf('QueueLock') < 0) { plugins.push('QueueLock') }
if (plugins.indexOf('DelayQueueLock') < 0) { plugins.push('DelayQueueLock') }
}
// load middleware into plugins
const processMiddleware = (m) => {
if (api.tasks.middleware[m]) {
class Plugin extends NodeResque.Plugin {}
if (api.tasks.middleware[m].preProcessor) { Plugin.prototype.beforePerform = api.tasks.middleware[m].preProcessor }
if (api.tasks.middleware[m].postProcessor) { Plugin.prototype.afterPerform = api.tasks.middleware[m].postProcessor }
if (api.tasks.middleware[m].preEnqueue) { Plugin.prototype.beforeEnqueue = api.tasks.middleware[m].preEnqueue }
if (api.tasks.middleware[m].postEnqueue) { Plugin.prototype.afterEnqueue = api.tasks.middleware[m].postEnqueue }
plugins.push(Plugin)
}
}
api.tasks.globalMiddleware.forEach(processMiddleware)
middleware.forEach(processMiddleware)
return {
plugins: plugins,
pluginOptions: pluginOptions,
perform: async function () {
let combinedArgs = [].concat(Array.prototype.slice.call(arguments))
combinedArgs.push(this)
let response = await task.run.apply(task, combinedArgs)
await api.tasks.enqueueRecurrentTask(taskName)
return response
}
}
}
/**
* Enqueue a task to be performed in the background.
* Will throw an error if redis cannot be reached.
*
* @async
* @param {String} taskName The name of the task.
* @param {Object} params Params to pass to the task.
* @param {string} queue (Optional) Which queue/priority to run this instance of the task on.
* @return {Promise<Boolean>} Did the task enqueue?
*/
api.tasks.enqueue = async (taskName, params, queue) => {
if (!params) { params = {} }
if (!queue) { queue = api.tasks.tasks[taskName].queue }
return api.resque.queue.enqueue(queue, taskName, params)
}
/**
* Enqueue a task to be performed in the background, at a certain time in the future.
* Will throw an error if redis cannot be reached.
*
* @async
* @param {Number} timestamp At what time the task is able to be run. Does not gaurentee that the task will be run at this time. (in ms)
* @param {String} taskName The name of the task.
* @param {Object} params Params to pass to the task.
* @param {string} queue (Optional) Which queue/priority to run this instance of the task on.
* @return {Promise}
*/
api.tasks.enqueueAt = async (timestamp, taskName, params, queue) => {
if (!params) { params = {} }
if (!queue) { queue = api.tasks.tasks[taskName].queue }
return api.resque.queue.enqueueAt(timestamp, queue, taskName, params)
}
/**
* Enqueue a task to be performed in the background, at a certain number of ms from now.
* Will throw an error if redis cannot be reached.
*
* @async
* @param {Number} time How long from now should we wait until it is OK to run this task? (in ms)
* @param {String} taskName The name of the task.
* @param {Object} params Params to pass to the task.
* @param {string} queue (Optional) Which queue/priority to run this instance of the task on.
* @return {Promise}
*/
api.tasks.enqueueIn = async (time, taskName, params, queue) => {
if (!params) { params = {} }
if (!queue) { queue = api.tasks.tasks[taskName].queue }
return api.resque.queue.enqueueIn(time, queue, taskName, params)
}
/**
* Delete a previously enqueued task, which hasn't been run yet, from a queue.
* Will throw an error if redis cannot be reached.
*
* @async
* @param {string} q Which queue/priority is the task stored on?
* @param {string} taskName The name of the job, likley to be the same name as a tak.
* @param {Object|Array} args The arguments of the job. Note, arguments passed to a Task initially may be modified when enqueuing.
* It is best to read job properties first via `api.tasks.queued` or similar method.
* @param {Number} count Of the jobs that match q, taskName, and args, up to what position should we delete? (Default 0; this command is 0-indexed)
* @return {Promise}
*/
api.tasks.del = async (q, taskName, args, count) => {
return api.resque.queue.del(q, taskName, args, count)
}
/**
* Delete all previously enqueued tasks, which haven't been run yet, from all possible delayed timestamps.
* Will throw an error if redis cannot be reached.
*
* @async
* @param {string} q Which queue/priority is to run on?
* @param {string} taskName The name of the job, likley to be the same name as a tak.
* @param {Object|Array} args The arguments of the job. Note, arguments passed to a Task initially may be modified when enqueuing.
* It is best to read job properties first via `api.tasks.delayedAt` or similar method.
* @return {Promise}
*/
api.tasks.delDelayed = async (q, taskName, args) => {
return api.resque.queue.delDelayed(q, taskName, args)
}
/**
* Return the timestamps a task is scheduled for.
* Will throw an error if redis cannot be reached.
*
* @async
* @param {string} q Which queue/priority is to run on?
* @param {string} taskName The name of the job, likley to be the same name as a tak.
* @param {Object|Array} args The arguments of the job. Note, arguments passed to a Task initially may be modified when enqueuing.
* It is best to read job properties first via `api.tasks.delayedAt` or similar method.
* @return {Promise<Array>} Returns an array of timestamps.
*/
api.tasks.scheduledAt = async (q, taskName, args) => {
return api.resque.queue.scheduledAt(q, taskName, args)
}
/**
* Return all resque stats for this namespace (how jobs failed, jobs succeded, etc)
* Will throw an error if redis cannot be reached.
*
* @async
* @return {Promise<Object>} (varies on your redis instance)
*/
api.tasks.stats = async () => {
return api.resque.queue.stats()
}
/**
* Retrieve the details of jobs enqueued on a certain queue between start and stop (0-indexed)
* Will throw an error if redis cannot be reached.
*
* @async
* @param {string} q The name of the queue.
* @param {Number} start The index of the first job to return.
* @param {Number} stop The index of the last job to return.
* @return {Promise<Array>} An array of the jobs enqueued.
*/
api.tasks.queued = async (q, start, stop) => {
return api.resque.queue.queued(q, start, stop)
}
/**
* Delete a queue in redis, and all jobs stored on it.
* Will throw an error if redis cannot be reached.
*
* @async
* @param {string} q The name of the queue.
* @return {Promise}
*/
api.tasks.delQueue = async (q) => {
return api.resque.queue.delQueue(q)
}
/**
* Return any locks, as created by resque plugins or task middleware, in this redis namespace.
* Will contain locks with keys like `resque:lock:{job}` and `resque:workerslock:{workerId}`
* Will throw an error if redis cannot be reached.
*
* @async
* @return {Promise<Object>} Locks, orginzed by type.
*/
api.tasks.locks = async () => {
return api.resque.queue.locks()
}
/**
* Delete a lock on a job or worker. Locks can be found via `api.tasks.locks`
* Will throw an error if redis cannot be reached.
*
* @async
* @param {string} lock The name of the lock.
* @return {Promise}
* @see api.tasks.locks
*/
api.tasks.delLock = async (lock) => {
return api.resque.queue.delLock(lock)
}
/**
* List all timestamps for which tasks are enqueued in the future, via `api.tasks.enqueueIn` or `api.tasks.enqueueAt`
* Will throw an error if redis cannot be reached.
*
* @async
* @return {Promise<Array>} An array of timetamps. Note: These timestamps will be in unix timestamps, not javascript MS timestamps.
* @see api.tasks.enqueueIn
* @see api.tasks.enqueueAt
*/
api.tasks.timestamps = async () => {
return api.resque.queue.timestamps()
}
/**
* Return all jobs which have been enqueued to run at a certain timestamp.
* Will throw an error if redis cannot be reached.
*
* @async
* @param {Number} timestamp The timestamp to return jobs from. Note: timestamp will be a unix timestamp, not javascript MS timestamp.
* @return {Promise<Array>} An array of jobs.
*/
api.tasks.delayedAt = async (timestamp) => {
return api.resque.queue.delayedAt(timestamp)
}
/**
* Retrun all delayed jobs, orginized by the timetsamp at where they are to run at.
* Note: This is a very slow command.
* Will throw an error if redis cannot be reached.
*
* @async
* @return {Promise<Object>}
*/
api.tasks.allDelayed = async () => {
return api.resque.queue.allDelayed()
}
/**
* Retrun all workers registered by all members of this cluster.
* Note: MultiWorker processors each register as a unique worker.
* Will throw an error if redis cannot be reached.
*
* @async
* @return {Promise<Object>}
*/
api.tasks.workers = async () => {
return api.resque.queue.workers()
}
/**
* What is a given worker working on? If the worker is idle, 'started' will be returned.
* Will throw an error if redis cannot be reached.
*
* @async
* @param {string} workerName The worker base name, usually a function of the PID.
* @param {string} queues The queues the worker is assigned to work.
* @return {Promise<Object>}
*/
api.tasks.workingOn = async (workerName, queues) => {
return api.resque.queue.workingOn(workerName, queues)
}
/**
* Return all workers and what job they might be working on.
* Will throw an error if redis cannot be reached.
*
* @async
* @return {Promise<Object>} An Object, with worker names as keys, containing the job they are working on.
* If the worker is idle, 'started' will be returned.
*/
api.tasks.allWorkingOn = async () => {
return api.resque.queue.allWorkingOn()
}
/**
* How many jobs are in the failed queue.
* Will throw an error if redis cannot be reached.
*
* @async
* @return {Promise<Number>} The number of failed jobs at this moment.
*/
api.tasks.failedCount = async () => {
return api.resque.queue.failedCount()
}
/**
* Retrieve the details of failed jobs between start and stop (0-indexed).
* Will throw an error if redis cannot be reached.
*
* @async
* @param {Number} start The index of the first job to return.
* @param {Number} stop The index of the last job to return.
* @return {Promise<Array>} An array of the failed jobs.
*/
api.tasks.failed = async (start, stop) => {
return api.resque.queue.failed(start, stop)
}
/**
* Remove a specific job from the failed queue.
* Will throw an error if redis cannot be reached.
*
* @async
* @param {Object} failedJob The failed job, as defined by `api.tasks.failed`
* @return {Promise}
* @see api.tasks.failed
*/
api.tasks.removeFailed = async (failedJob) => {
return api.resque.queue.removeFailed(failedJob)
}
/**
* Remove a specific job from the failed queue, and retry it by placing it back into its original queue.
* Will throw an error if redis cannot be reached.
*
* @async
* @param {Object} failedJob The failed job, as defined by `api.tasks.failed`
* @return {Promise}
* @see api.tasks.failed
*/
api.tasks.retryAndRemoveFailed = async (failedJob) => {
return api.resque.queue.retryAndRemoveFailed(failedJob)
}
/**
* If a worker process crashes, it will leave its state in redis as "working".
* You can remove workers from redis you know to be over, by specificing an age which would make them too old to exist.
* This method will remove the data created by a 'stuck' worker and move the payload to the error queue.
* However, it will not actually remove any processes which may be running. A job *may* be running that you have removed.
* Will throw an error if redis cannot be reached.
*
* @async
* @param {Number} age The age of workers you know to be over, in seconds.
* @return {Promise<Object>} Details about workers which were removed.
*/
api.tasks.cleanOldWorkers = async (age) => {
return api.resque.queue.cleanOldWorkers(age)
}
/**
* Ensures that a task which has a frequency is either running, or already enqueued.
* This is run automatically at boot for all tasks which have a frequency, via `api.tasks.enqueueAllRecurrentTasks`.
* Will throw an error if redis cannot be reached.
*
* @async
* @param {string} taskName The name of the task.
* @return {Promise}
* @see api.tasks.enqueueAllRecurrentTasks
*/
api.tasks.enqueueRecurrentTask = async (taskName) => {
const task = api.tasks.tasks[taskName]
if (task.frequency > 0) {
await api.tasks.del(task.queue, taskName)
await api.tasks.delDelayed(task.queue, taskName)
await api.tasks.enqueueIn(task.frequency, taskName)
api.log(`re-enqueued recurrent job ${taskName}`, api.config.tasks.schedulerLogging.reEnqueue)
}
}
/**
* This is run automatically at boot for all tasks which have a frequency, calling `api.tasks.enqueueRecurrentTask`
* Will throw an error if redis cannot be reached.
*
* @async
* @return {Promise}
* @see api.tasks.enqueueRecurrentTask
*/
api.tasks.enqueueAllRecurrentTasks = async () => {
let jobs = []
let loadedTasks = []
Object.keys(api.tasks.tasks).forEach((taskName) => {
const task = api.tasks.tasks[taskName]
if (task.frequency > 0) {
jobs.push(async () => {
let toRun = await api.tasks.enqueue(taskName)
if (toRun === true) {
api.log(`enqueuing periodic task: ${taskName}`, api.config.tasks.schedulerLogging.enqueue)
loadedTasks.push(taskName)
}
})
}
})
await api.utils.asyncWaterfall(jobs)
return loadedTasks
}
/**
* Stop a task with a frequency by removing it from all possible queues.
* Will throw an error if redis cannot be reached.
*
* @async
* @param {string} taskName The name of the task.
* @return {Promise<Number>} How many tasks were removed.
*/
api.tasks.stopRecurrentTask = async (taskName) => {
// find the jobs in either the normal queue or delayed queues
const task = api.tasks.tasks[taskName]
if (task.frequency > 0) {
let removedCount = 0
let count = await api.tasks.del(task.queue, task.name, {}, 1)
removedCount = removedCount + count
let timestamps = await api.tasks.delDelayed(task.queue, task.name, {})
removedCount = removedCount + timestamps.length
return removedCount
}
}
/**
* Return wholistic details about the task system, including failures, queues, and workers.
* Will throw an error if redis cannot be reached.
*
* @async
* @return {Promise<Object>} Details about the task system.
*/
api.tasks.details = async () => {
let details = { 'queues': {}, 'workers': {} }
details.workers = await api.tasks.allWorkingOn()
details.stats = await api.tasks.stats()
let queues = await api.resque.queue.queues()
for (let i in queues) {
let queue = queues[i]
let length = await api.resque.queue.length(queue)
details.queues[queue] = { length: length }
}
return details
}
api.tasks.loadTasks = (reload) => {
api.config.general.paths.task.forEach((p) => {
glob.sync(path.join(p, '**', '*.js')).forEach((f) => {
api.tasks.loadFile(f, reload)
})
})
for (let pluginName in api.config.plugins) {
if (api.config.plugins[pluginName].tasks !== false) {
let pluginPath = api.config.plugins[pluginName].path
glob.sync(path.join(pluginPath, 'tasks', '**', '*.js')).forEach((f) => {
api.tasks.loadFile(f, reload)
})
}
}
}
api.tasks.addMiddleware = (middleware) => {
if (!middleware.name) { throw new Error('middleware.name is required') }
if (!middleware.priority) { middleware.priority = api.config.general.defaultMiddlewarePriority }
middleware.priority = Number(middleware.priority)
api.tasks.middleware[middleware.name] = middleware
if (middleware.global === true) {
api.tasks.globalMiddleware.push(middleware.name)
api.utils.sortGlobalMiddleware(api.tasks.globalMiddleware, api.tasks.middleware)
}
api.tasks.loadTasks(true)
}
api.tasks.loadTasks(false)
}
async start () {
if (api.config.redis.enabled === false) { return }
if (api.config.tasks.scheduler === true) {
await api.tasks.enqueueAllRecurrentTasks()
}
}
}