whattimeisitrightnow Action Provider

whattimeisitrightnow

From the makers of Philbert: Another exciting promotional tie-in for whattimeisitrightnow.com

This is a sample Flask application implementing the ActionProvider interface. It accepts requests with a “utc_offset” parameter indicating which timezone to return the current UTC time as. To demonstrate some degree of complexity, the application randomly assigns each request an estimated_completion_time so that an action’s results will not be available until the estimated_completion_time. To do this, we store and make use of “private” data fields which are never displayed to any requester. Additionally, some percentage of requests to the ActionProvider fail to demonstrate how to report errors back to the requesters.

Presteps

To run this example Action Provider, you will need to generate your own CLIENT_ID, CLIENT_SECRET, and SCOPE. It may be useful to follow the directions for generating each of these located at README.rst. Once you have those three values, place them into the example Action Provider’s config.py.

Starting the Action Provider

We recommend creating a virtualenvironment to install project dependencies and run the Action Provider. Once the virtualenvironment has been created and activated, run the following:

cd examples/whattimeisitrightnow
pip install -r requirements.txt
python app/app.py

Testing the Action Provider

We provide example tests to validate that your Action Provider is working and enable some form of continuous integration. To run the example test suite, once again activate the project’s virtualenvironment and run the following:

cd examples/whattimeisitrightnow
pytest

Within these tests, we provide examples of how to use a patch that is useful for testing your Action Provider without using a valid CLIENT_ID, CLIENT_SECRET or request Tokens. Only use this patch during testing.

Actually using the Action Provider

You’ll notice that just because its running doesn’t mean we can actually use the Action Provider. In particular, once the whattimeisitrightnow Action Provider is run, this will fail:

curl http://localhost:5000/

Why? It’s because the whattimeisitrightnow Provider has been set to be visible to only authenticated users (see the ActionProviderDescription initialization values). Therefore, requests need proper HTTP authorization headers (i.e. a token needs to be provided):

curl --request GET \
    --url http://localhost:5000/ \
    --header 'authorization: Bearer token'

But how to get the token? The recommended route to retrieve a token is to use the globus-automate-client CLI tool. Conveniently, the globus-automate-client CLI tool removes the need to create curl requests and the need to manually format Action request bodies. See the doc on downloading the CLI tool. Once downloaded, issue a command similar to to the one below. The first time you run the command, you will need to follow a flow to request the necessary grants for your Action Provider’s scopes. Later attempts to use the globus-automate-client tool will use locally cached tokens and transparently refresh expired tokens.

globus-automate action-provider-introspect \
    --action-url http://localhost:5000/ \
    --action-scope $YOUR_PROVIDERS_SCOPE

The globus-automate-client CLI tool can also make requests to endpoints besides the introspection endpoint, for example:

globus-automate action-run \
    --action-url http://localhost:5000/ \
    --action-scope $YOUR_PROVIDERS_SCOPE \
    --body '{"utc_offset": 1}'

Run the CLI tool with the –help option for more information.

Action Provider Implementation

import json
import logging
import os
from datetime import datetime, timedelta, timezone
from random import randint
from typing import Any, Dict, Tuple

from flask import Flask, Response, jsonify, request

from examples.whattimeisitrightnow.app import config
from examples.whattimeisitrightnow.app import error as err
from examples.whattimeisitrightnow.app.database import db
from globus_action_provider_tools.authentication import TokenChecker
from globus_action_provider_tools.authorization import (
    authorize_action_access_or_404,
    authorize_action_management_or_404,
)
from globus_action_provider_tools.data_types import (
    ActionProviderDescription,
    ActionStatus,
    ActionStatusValue,
)
from globus_action_provider_tools.flask import flask_validate_request
from globus_action_provider_tools.flask.helpers import assign_json_provider

app = Flask(__name__)
assign_json_provider(app)

token_checker = TokenChecker(
    config.client_id, config.client_secret, [config.our_scope], config.token_audience
)

COMPLETE_STATES = (ActionStatusValue.SUCCEEDED, ActionStatusValue.FAILED)
INCOMPLETE_STATES = (ActionStatusValue.ACTIVE, ActionStatusValue.INACTIVE)

with open(os.path.join(os.path.dirname(os.path.abspath(__file__)), "schema.json")) as f:
    schema = json.load(f)


@app.errorhandler(err.ApiError)
def handle_invalid_usage(error) -> Response:
    response = jsonify(error.to_dict())
    response.status_code = error.status
    return response


@app.before_request
def before_request() -> None:
    """
    Here we handle some authorization and request validation before the request
    ever makes it to our ActionProvider. We also attach authentication
    information to the request to make it easier to inspect.

    flask_validate_request ensures that we are receiving a valid request
    body from the user.

    token_checker.check_token ensure that the requester provided a valid,
    Globus recognized token for interacting with the Provider.
    """
    validation_result = flask_validate_request(request)
    if validation_result.errors:
        raise err.InvalidRequest(*validation_result.errors)

    token = request.headers.get("Authorization", "").replace("Bearer ", "")
    auth_state = token_checker.check_token(token)
    if not auth_state.identities:
        # Returning these authentication errors to the caller will make debugging
        # easier for this example. Consider whether this is appropriate
        # for your production use case or not.
        raise err.NoAuthentication(*auth_state.errors)
    request.auth = auth_state  # type: ignore


@app.route("/", methods=["GET"])
def introspect() -> Tuple[Response, int]:
    """
    The base endpoint of an ActionProvider should serve as documentation,
    enabling users to introspect the JSON schema required to launch an action.
    This endpoint can be publicly accessible or not. The
    ActionProviderDescription visible_to field public access by default.
    """
    description = ActionProviderDescription(
        globus_auth_scope=config.our_scope,
        title="What Time Is It Right Now?",
        admin_contact="support@whattimeisrightnow.example",
        synchronous=False,
        input_schema=schema,
        api_version="1.0",
        subtitle=(
            "From the makers of Philbert: "
            "Another exciting promotional tie-in for whattimeisitrightnow.com"
        ),
        description="",
        keywords=["time", "whattimeisitnow", "productivity"],
        visible_to=["all_authenticated_users"],
        runnable_by=["all_authenticated_users"],
        administered_by=["support@whattimeisrightnow.example"],
    )
    if not request.auth.check_authorization(  # type: ignore
        description.visible_to, allow_all_authenticated_users=True
    ):
        raise err.NotAuthorized("Not visible_to this access token.")
    return jsonify(description), 200


@app.route("/run", methods=["POST"])
def run() -> Tuple[Response, int]:
    """
    This function implements Action Provider interface for launching an
    action instance.

    This function parses the request_id from a request body and from it
    determines whether a new action should be launched or whether to
    return the status of a previously launched action. To accomplish this,
    it is necessary to store a mapping of request_ids to action_ids in
    addition to a history of actions.
    """
    req = request.get_json(force=True)

    # Deduplicate multiple requests based on request_id
    action_id = db.query(req["request_id"])
    if action_id is not None:
        return status(action_id)
    else:
        action_status = run_action(req)
        # Remove any private data from the ActionStatus before
        # returning it to the requester
        action_status = _filter_private_fields(action_status)
        return jsonify(action_status), 202


def run_action(req) -> ActionStatus:
    """
    A wrapper function to handle executing 'business logic', creating the
    ActionStatus and storing both the request_id:action_id mapping and the
    action_id:action_status mapping.
    """
    now = datetime.now(tz=timezone.utc)

    # Kickoff whatever the action is actually doing
    results = _magic_business_logic(now, req["body"])

    # Create an ActionStatus object with result information
    action_status = ActionStatus(
        status=ActionStatusValue.ACTIVE,
        creator_id=request.auth.effective_identity,  # type: ignore
        label=req.get("label", None),
        monitor_by=req.get("monitor_by", request.auth.identities),  # type: ignore
        manage_by=req.get("manage_by", request.auth.identities),  # type: ignore
        start_time=str(now),
        completion_time=None,
        release_after=req.get("release_after", "P30D"),
        display_status=ActionStatusValue.ACTIVE,
        details=results,
    )

    # Store the request_id for deduplication
    db.persist(req["request_id"], action_status.action_id)

    # Store the details on the running job
    db.persist(action_status.action_id, action_status)
    return action_status


def _filter_private_fields(action_status: ActionStatus) -> ActionStatus:
    """
    Helper function to demonstrate how an ActionStatus object can
    hold private data in its details field and how to filter this
    data before returning an ActionStatus to the requester
    """
    if action_status.details is not None:
        assert isinstance(action_status.details, dict)
        action_status.details.pop("private", None)
    return action_status


def _magic_business_logic(now, request_body) -> Dict[str, Any]:
    """
    This function computes the current time in a different timezone.
    It accepts "utc_offset" to determine the target timezone before
    converting the time and storing it into the results dictionary.

    This function demonstrates how private data can be computed and
    stored.
    """
    try:
        tz = timezone(timedelta(hours=request_body["utc_offset"]))
    except (KeyError, ValueError) as exc:
        raise err.InvalidRequest("Invalid or missing 'utc_offset'", exc)

    # Simulate some amount of processing time
    estimated_processing_time = randint(5, 900)
    estimated_completion_time = now + timedelta(seconds=estimated_processing_time)

    # 30% of our jobs fail because the universe is an imperfect place
    # and we want clients to understand some providers may fail
    success = randint(1, 100) >= 30
    if success:
        results = {
            "estimated_completion_time": estimated_completion_time,
            "private": {
                "success": True,
                "details": {"whattimeisit": now.astimezone(tz)},
            },
        }
    else:
        results = {
            "estimated_completion_time": estimated_completion_time,
            "private": {
                "success": False,
                "details": {
                    "message": "We didn't know what time it was.",
                    "error": "WATCHLESS",
                },
            },
        }
    return results


@app.route("/<action_id>/status", methods=["GET"])
def status(action_id) -> Tuple[Response, int]:
    """
    This function implements Action Provider interface for looking up an
    action's status. This endpoint is used to query actions that may
    still be executing or may have completed.
    """
    # Ensure the requested action_id exists
    action_status = _get_action_status_or_404(action_id)

    # Ensure the user is authorized to view the action status
    authorize_action_access_or_404(action_status, request.auth)  # type: ignore

    action_status = _reconcile_action_status(action_status)

    # Remove any private data from the ActionStatus before
    # returning it to the requester
    action_status = _filter_private_fields(action_status)
    return jsonify(action_status), 200


def _get_action_status_or_404(action_id: str) -> ActionStatus:
    """
    Retrieves an action_status from the database
    """
    action_status = db.query(action_id)

    # Since we're using the same database to store action_ids and
    # request_ids, it's possible a user may make a request for an
    # action_id's status using the request_id. In that event, the
    # db lookup for a request_id will return a str.
    if action_status is None or isinstance(action_status, str):
        raise err.NotFound(f"No action instance found for {action_id}")
    return action_status


def _reconcile_action_status(action_status: ActionStatus) -> ActionStatus:
    """
    Helper function to determine if an Action should have completed, and to
    update its status if necessary. If the Action is already in a completed
    state, its record is returned. If the action is still not scheduled to
    complete, its record is returned unmodified. If the record was scheduled
    to complete, its status is updated, stored, and returned.
    """
    # If status is in a completion state, return
    if action_status.status in COMPLETE_STATES:
        return action_status

    # Make mypy happy...
    if action_status.details is None:
        raise err.DeveloperError(f"{action_status.action_id} has no details.")

    # If it is not yet time for the action to complete, return
    now = datetime.now(tz=timezone.utc)
    assert isinstance(action_status.details, dict)
    if action_status.details["estimated_completion_time"] > now:
        return action_status

    # If the action was scheduled to complete by now, update the ActionStatus
    # object with completion data
    assert isinstance(action_status.details, dict)
    private = action_status.details.pop("private", {})
    action_status.completion_time = action_status.details["estimated_completion_time"]
    action_status.details = private["details"]

    if private["success"]:
        action_status.status = ActionStatusValue.SUCCEEDED
        action_status.display_status = ActionStatusValue.SUCCEEDED
    else:
        action_status.status = ActionStatusValue.FAILED
        action_status.display_status = ActionStatusValue.FAILED

    # Persist updates to the ActionStatus
    db.persist(action_status.action_id, action_status)
    return action_status


@app.route("/<action_id>/cancel", methods=["POST"])
def cancel(action_id: str) -> Tuple[Response, int]:
    """
    This function implements the ActionProvider interface for cancelling an
    action. As noted in the documentation, this operation does not need to
    force the action to immediately cancel.
    """
    # Ensure the requested action_id exists
    action_status = _get_action_status_or_404(action_id)

    # Ensure the user is authorized to manage the action's state
    authorize_action_management_or_404(action_status, request.auth)  # type: ignore

    # Reconcile before cancelling to determine if action already completed
    action_status = _reconcile_action_status(action_status)

    if action_status.status in COMPLETE_STATES:
        raise err.InvalidState(f"Cannot cancel, {action_id} already completed.")

    # Interrupt / cancel the job if it's still running
    action_status = _cancel_job(action_status)
    return jsonify(action_status), 200


def _cancel_job(action_status) -> ActionStatus:
    """
    Helper function used to set an action's status fields to cancel.
    Once cancelled, updates are persisted to the database.
    """
    action_status.status = ActionStatusValue.FAILED
    action_status.display_status = ActionStatusValue.FAILED
    action_status.completion_time = datetime.now(tz=timezone.utc)
    action_status.details = {"message": "Job cancelled", "error": "CANCELLED"}

    db.persist(action_status.action_id, action_status)
    return action_status


@app.route("/<action_id>/release", methods=["POST"])
def release(action_id: str) -> Tuple[Response, int]:
    """
    Releasing an Action erases all records of its execution from the
    Provider's history. Subsequent lookups for the Action's execution
    will fail.
    """
    # Ensure the requested action_id exists
    action_status = _get_action_status_or_404(action_id)

    # Ensure the user is authorized to manage the action's state
    authorize_action_management_or_404(action_status, request.auth)  # type: ignore

    # Reconcile before cancelling to determine if action already completed
    action_status = _reconcile_action_status(action_status)

    if action_status.status in INCOMPLETE_STATES:
        raise err.InvalidState(f"Cannot release, {action_id} has not completed.")

    db.delete(action_id)
    return jsonify(action_status), 200


def main():
    logging.basicConfig(level=logging.INFO)
    app.run(debug=True)


if __name__ == "__main__":
    main()