um number of action to include in claim.
	 * @param \DateTime $before_date Jobs must be scheduled before this date. Defaults to now.
	 * @param array     $hooks       Hooks to filter for.
	 * @param string    $group       Group to filter for.
	 *
	 * @return ActionScheduler_ActionClaim
	 * @throws \InvalidArgumentException When a requested group does not exist.
	 * @throws \RuntimeException When the claim row cannot be created or the actions cannot be claimed.
	 */
	public function stake_claim( $max_actions = 10, \DateTime $before_date = null, $hooks = array(), $group = '' ) {
		$claim_id = $this->generate_claim_id();

		// find_actions_by_claim_id() reads this property to decide its cut-off date.
		$this->claim_before_date = $before_date;

		try {
			$this->claim_actions( $claim_id, $max_actions, $before_date, $hooks, $group );
			$action_ids = $this->find_actions_by_claim_id( $claim_id );
		} finally {
			// Always clear the temporary cut-off, even when claiming throws, so it
			// cannot leak into later find_actions_by_claim_id() calls.
			$this->claim_before_date = null;
		}

		return new ActionScheduler_ActionClaim( $claim_id, $action_ids );
	}

	/**
	 * Generate a new action claim.
	 *
	 * Inserts a row into the claims table stamped with the current date and
	 * returns the ID generated for it.
	 *
	 * @return int Claim ID.
	 * @throws \RuntimeException When the claim row could not be inserted.
	 */
	protected function generate_claim_id() {
		/** @var \wpdb $wpdb */
		global $wpdb;

		$now      = as_get_datetime_object();
		$inserted = $wpdb->insert( $wpdb->actionscheduler_claims, array( 'date_created_gmt' => $now->format( 'Y-m-d H:i:s' ) ) );

		// Without this check a failed insert would hand back whatever insert_id
		// currently holds, and actions would be claimed under an ID that does
		// not belong to this claim.
		if ( false === $inserted ) {
			$error = empty( $wpdb->last_error ) ? _x( 'unknown', 'database error', 'action-scheduler' ) : $wpdb->last_error;
			throw new \RuntimeException(
				sprintf(
					/* translators: %s database error. */
					__( 'Unable to claim actions. Database error: %s.', 'action-scheduler' ),
					$error
				)
			);
		}

		return $wpdb->insert_id;
	}

	/**
	 * Set a claim filter.
	 *
	 * Only filter names that are already set in $this->claim_filters are
	 * updated; any other name is silently ignored.
	 *
	 * @param string $filter_name   Claim filter name.
	 * @param mixed  $filter_values Values to filter.
	 * @return void
	 */
	public function set_claim_filter( $filter_name, $filter_values ) {
		if ( isset( $this->claim_filters[ $filter_name ] ) ) {
			$this->claim_filters[ $filter_name ] = $filter_values;
		}
	}

	/**
	 * Get the claim filter value.
	 *
	 * @param string $filter_name Claim filter name.
	 * @return mixed The stored filter value, or an empty string when the filter is not set.
	 */
	public function get_claim_filter( $filter_name ) {
		if ( isset( $this->claim_filters[ $filter_name ] ) ) {
			return $this->claim_filters[ $filter_name ];
		}

		return '';
	}

	/**
	 * Mark actions claimed.
	 *
	 * Runs a single UPDATE that assigns $claim_id to up to $limit unclaimed,
	 * pending actions scheduled on or before the cut-off date. Non-empty
	 * $hooks / $group arguments are also stored as claim filters; when they
	 * are empty, the stored claim filters are used instead. If no group is
	 * in effect, the 'exclude-groups' filter is applied as a NOT IN condition.
	 *
	 * @param int       $claim_id    Claim Id.
	 * @param int       $limit       Number of action to include in claim.
	 * @param \DateTime $before_date Should use UTC timezone.
	 * @param array     $hooks       Hooks to filter for.
	 * @param string    $group       Group to filter for.
	 *
	 * @return int The number of actions that were claimed.
	 * @throws \InvalidArgumentException Throws InvalidArgumentException if group doesn't exist.
	 * @throws \RuntimeException Throws RuntimeException if unable to claim action.
	 */
	protected function claim_actions( $claim_id, $limit, \DateTime $before_date = null, $hooks = array(), $group = '' ) {
		/** @var \wpdb $wpdb */
		global $wpdb;

		$now  = as_get_datetime_object();
		$date = is_null( $before_date ) ? $now : clone $before_date;

		// can't use $wpdb->update() because of the <= condition.
		$update = "UPDATE {$wpdb->actionscheduler_actions} SET claim_id=%d, last_attempt_gmt=%s, last_attempt_local=%s";
		$params = array(
			$claim_id,
			$now->format( 'Y-m-d H:i:s' ),
			current_time( 'mysql' ),
		);

		// Set claim filters.
		if ( ! empty( $hooks ) ) {
			$this->set_claim_filter( 'hooks', $hooks );
		} else {
			$hooks = $this->get_claim_filter( 'hooks' );
		}
		if ( ! empty( $group ) ) {
			$this->set_claim_filter( 'group', $group );
		} else {
			$group = $this->get_claim_filter( 'group' );
		}

		$where    = 'WHERE claim_id = 0 AND scheduled_date_gmt <= %s AND status=%s';
		$params[] = $date->format( 'Y-m-d H:i:s' );
		$params[] = self::STATUS_PENDING;

		if ( ! empty( $hooks ) ) {
			// One %s placeholder per hook name, bound through prepare() below.
			$placeholders = array_fill( 0, count( $hooks ), '%s' );
			$where       .= ' AND hook IN (' . implode( ', ', $placeholders ) . ')';
			$params       = array_merge( $params, array_values( $hooks ) );
		}

		$group_operator = 'IN';
		if ( empty( $group ) ) {
			$group          = $this->get_claim_filter( 'exclude-groups' );
			$group_operator = 'NOT IN';
		}

		if ( ! empty( $group ) ) {
			$group_ids = $this->get_group_ids( $group, false );

			// throw exception if no matching group(s) found, this matches ActionScheduler_wpPostStore's behaviour.
			if ( empty( $group_ids ) ) {
				// $group may be an array of names; join them so the message shows
				// the names instead of the literal "Array".
				$group_names = is_array( $group ) ? implode( ', ', $group ) : $group;

				throw new InvalidArgumentException(
					sprintf(
						/* translators: %s: group name(s) */
						_n(
							'The group "%s" does not exist.',
							'The groups "%s" do not exist.',
							is_array( $group ) ? count( $group ) : 1,
							'action-scheduler'
						),
						$group_names
					)
				);
			}

			// IDs are forced to integers before being interpolated into the query.
			$id_list = implode( ',', array_map( 'intval', $group_ids ) );
			$where  .= " AND group_id {$group_operator} ( $id_list )";
		}

		/**
		 * Sets the order-by clause used in the action claim query.
		 *
		 * @since 3.4.0
		 *
		 * @param string $order_by_sql
		 */
		$order    = apply_filters( 'action_scheduler_claim_actions_order_by', 'ORDER BY priority ASC, attempts ASC, scheduled_date_gmt ASC, action_id ASC' );
		$params[] = $limit;

		$sql           = $wpdb->prepare( "{$update} {$where} {$order} LIMIT %d", $params ); // phpcs:ignore WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.PreparedSQLPlaceholders
		$rows_affected = $wpdb->query( $sql ); // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared, WordPress.DB.DirectDatabaseQuery.DirectQuery, WordPress.DB.DirectDatabaseQuery.NoCaching

		if ( false === $rows_affected ) {
			$error = empty( $wpdb->last_error ) ? _x( 'unknown', 'database error', 'action-scheduler' ) : $wpdb->last_error;
			throw new \RuntimeException(
				sprintf(
					/* translators: %s database error. */
					__( 'Unable to claim actions. Database error: %s.', 'action-scheduler' ),
					$error
				)
			);
		}

		return (int) $rows_affected;
	}

	/**
	 * Get the number of active claims.
	 *
	 * Counts the distinct non-zero claim IDs attached to actions whose status
	 * is pending or running.
	 *
	 * @return int
	 */
	public function get_claim_count() {
		global $wpdb;

		$sql = "SELECT COUNT(DISTINCT claim_id) FROM {$wpdb->actionscheduler_actions} WHERE claim_id != 0 AND status IN ( %s, %s)";
		$sql = $wpdb->prepare( $sql, array( self::STATUS_PENDING, self::STATUS_RUNNING ) ); // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared

		return (int) $wpdb->get_var( $sql ); // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared
	}

	/**
	 * Return an action's claim ID, as stored in the claim_id column.
	 *
	 * @param string $action_id Action ID.
* @return int The claim_id value cast to an integer (0 when the query yields no value).
	 */
	public function get_claim_id( $action_id ) {
		/** @var \wpdb $wpdb */
		global $wpdb;

		$sql = "SELECT claim_id FROM {$wpdb->actionscheduler_actions} WHERE action_id=%d";
		$sql = $wpdb->prepare( $sql, $action_id ); // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared

		return (int) $wpdb->get_var( $sql ); // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared
	}

	/**
	 * Retrieve the action IDs of action in a claim.
	 *
	 * Actions are returned ordered by priority, attempts, scheduled date and
	 * action ID. Any action scheduled after the cut-off date is left out; the
	 * cut-off is $this->claim_before_date when set, otherwise the current time.
	 *
	 * @param  int $claim_id Claim ID.
	 * @return int[]
	 */
	public function find_actions_by_claim_id( $claim_id ) {
		/** @var \wpdb $wpdb */
		global $wpdb;

		$action_ids  = array();
		$before_date = isset( $this->claim_before_date ) ? $this->claim_before_date : as_get_datetime_object();
		$cut_off     = $before_date->format( 'Y-m-d H:i:s' );

		$sql = $wpdb->prepare(
			"SELECT action_id, scheduled_date_gmt FROM {$wpdb->actionscheduler_actions} WHERE claim_id = %d ORDER BY priority ASC, attempts ASC, scheduled_date_gmt ASC, action_id ASC",
			$claim_id
		);

		// Verify that the scheduled date for each action is within the expected bounds (in some unusual
		// cases, we cannot depend on MySQL to honor all of the WHERE conditions we specify).
		foreach ( $wpdb->get_results( $sql ) as $claimed_action ) { // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared
			// Both values are 'Y-m-d H:i:s' strings, so a string comparison orders them chronologically.
			if ( $claimed_action->scheduled_date_gmt <= $cut_off ) {
				$action_ids[] = absint( $claimed_action->action_id );
			}
		}

		return $action_ids;
	}

	/**
	 * Release actions from a claim and delete the claim.
	 *
	 * @param ActionScheduler_ActionClaim $claim Claim object.
	 *
	 * @throws \RuntimeException When fewer actions were released than were found for the claim.
	 */
	public function release_claim( ActionScheduler_ActionClaim $claim ) {
		/** @var \wpdb $wpdb */
		global $wpdb;

		/**
		 * Deadlock warning: This function modifies actions to release them from claims that have been processed.
		 * Earlier, we used to do it in a single atomic query, i.e. we would update all actions belonging to a
		 * particular claim_id with claim_id = 0.
		 * While this was functionally correct, it could cause a deadlock, since that update query holds a lock on
		 * the claim_id_.. index on the action table.
		 * This allowed a race condition: if the claimer query is running at the same time, it also tries to
		 * acquire a lock on the claim_id_.. index, and if the claim release query has already acquired the lock
		 * but has not updated yet, the two would deadlock.
		 *
		 * We resolve this by getting all the action IDs that we want to release the claim from in a separate
		 * query, and then releasing the claim on each of them. This way, our lock is acquired on the action_id
		 * index instead of the claim_id index. Note that the lock on claim_id will still be acquired, but only
		 * when we actually make the update, rather than when we select the actions.
		 */
		$action_ids = $wpdb->get_col( $wpdb->prepare( "SELECT action_id FROM {$wpdb->actionscheduler_actions} WHERE claim_id = %d", $claim->get_id() ) );

		$row_updates = 0;
		if ( count( $action_ids ) > 0 ) {
			// IDs are forced to non-negative integers before being interpolated into the query.
			$action_id_string = implode( ',', array_map( 'absint', $action_ids ) );
			$row_updates      = $wpdb->query( "UPDATE {$wpdb->actionscheduler_actions} SET claim_id = 0 WHERE action_id IN ({$action_id_string})" ); // phpcs:ignore WordPress.DB.PreparedSQL.InterpolatedNotPrepared
		}

		// The claim row is removed regardless of whether every action was released.
		$wpdb->delete( $wpdb->actionscheduler_claims, array( 'claim_id' => $claim->get_id() ), array( '%d' ) );

		// A false result from the update (query error) also compares as less than the count.
		if ( $row_updates < count( $action_ids ) ) {
			throw new RuntimeException(
				sprintf(
					// translators: %d is an id.
					__( 'Unable to release actions from claim id %d.', 'action-scheduler' ),
					$claim->get_id()
				)
			);
		}
	}

	/**
	 * Remove the claim from an action.
	 *
	 * Resets the action's claim_id column to 0.
	 *
	 * @param int $action_id Action ID.
	 *
	 * @return void
	 */
	public function unclaim_action( $action_id ) {
		/** @var \wpdb $wpdb */
		global $wpdb;

		$wpdb->update(
			$wpdb->actionscheduler_actions,
			array( 'claim_id' => 0 ),
			array( 'action_id' => $action_id ),
			array( '%d' ), // claim_id is an integer, so bind it as one.
			array( '%d' )
		);
	}

	/**
	 * Mark an action as failed.
	 *
	 * @param int $action_id Action ID.
* @throws \InvalidArgumentException Throw an exception if action was not updated.
	 */
	public function mark_failure( $action_id ) {
		/** @var \wpdb $wpdb */
		global $wpdb;

		$rows_changed = $wpdb->update(
			$wpdb->actionscheduler_actions,
			array( 'status' => self::STATUS_FAILED ),
			array( 'action_id' => $action_id ),
			array( '%s' ),
			array( '%d' )
		);

		// Nothing more to do once the status change has been written.
		if ( ! empty( $rows_changed ) ) {
			return;
		}

		/* translators: %s is the action ID */
		$message = __( 'Unidentified action %s: we were unable to mark this action as having failed. It may may have been deleted by another process.', 'action-scheduler' );

		throw new \InvalidArgumentException( sprintf( $message, $action_id ) );
	}

	/**
	 * Record that an action is being executed.
	 *
	 * Increments the action's attempt counter, switches its status to
	 * self::STATUS_RUNNING and stamps both last-attempt columns.
	 *
	 * @param int $action_id Action ID.
	 *
	 * @return void
	 * @throws Exception If the action status cannot be updated to self::STATUS_RUNNING ('in-progress').
	 */
	public function log_execution( $action_id ) {
		/** @var \wpdb $wpdb */
		global $wpdb;

		$query = $wpdb->prepare(
			"UPDATE {$wpdb->actionscheduler_actions} SET attempts = attempts+1, status=%s, last_attempt_gmt = %s, last_attempt_local = %s WHERE action_id = %d", // phpcs:ignore WordPress.DB.PreparedSQL.InterpolatedNotPrepared
			self::STATUS_RUNNING,
			current_time( 'mysql', true ),
			current_time( 'mysql' ),
			$action_id
		);

		// A truthy result means at least one row was updated.
		// phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared
		if ( $wpdb->query( $query ) ) {
			return;
		}

		throw new Exception(
			sprintf(
				/* translators: 1: action ID. 2: status slug. */
				__( 'Unable to update the status of action %1$d to %2$s.', 'action-scheduler' ),
				$action_id,
				self::STATUS_RUNNING
			)
		);
	}

	/**
	 * Mark an action as complete.
	 *
	 * @param int $action_id Action ID.
	 *
	 * @return void
	 * @throws \InvalidArgumentException Throw an exception if action was not updated.
*/
	public function mark_complete( $action_id ) {
		/** @var \wpdb $wpdb */
		global $wpdb;

		// Columns written when an action finishes.
		$fields = array(
			'status'             => self::STATUS_COMPLETE,
			'last_attempt_gmt'   => current_time( 'mysql', true ),
			'last_attempt_local' => current_time( 'mysql' ),
		);

		$rows_changed = $wpdb->update(
			$wpdb->actionscheduler_actions,
			$fields,
			array( 'action_id' => $action_id ),
			array( '%s' ),
			array( '%d' )
		);

		if ( empty( $rows_changed ) ) {
			/* translators: %s is the action ID */
			$message = __( 'Unidentified action %s: we were unable to mark this action as having completed. It may may have been deleted by another process.', 'action-scheduler' );

			throw new \InvalidArgumentException( sprintf( $message, $action_id ) );
		}

		/**
		 * Fires after a scheduled action has been completed.
		 *
		 * @since 3.4.2
		 *
		 * @param int $action_id Action ID.
		 */
		do_action( 'action_scheduler_completed_action', $action_id );
	}

	/**
	 * Look up the stored status of an action.
	 *
	 * @param int $action_id Action ID.
	 *
	 * @return string
	 * @throws \InvalidArgumentException Throw an exception if not status was found for action_id.
	 * @throws \RuntimeException Throw an exception if action status could not be retrieved.
	 */
	public function get_status( $action_id ) {
		/** @var \wpdb $wpdb */
		global $wpdb;

		$query  = $wpdb->prepare( "SELECT status FROM {$wpdb->actionscheduler_actions} WHERE action_id=%d", $action_id ); // phpcs:ignore WordPress.DB.PreparedSQL.InterpolatedNotPrepared
		$status = $wpdb->get_var( $query ); // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared

		// The lookup produced no value at all.
		if ( null === $status ) {
			throw new \InvalidArgumentException( __( 'Invalid action ID. No status found.', 'action-scheduler' ) );
		}

		// A value came back, but it is empty.
		if ( empty( $status ) ) {
			throw new \RuntimeException( __( 'Unknown status found for action.', 'action-scheduler' ) );
		}

		return $status;
	}
}