watchasay Action Provider

This is a sample Flask application that implements the simple Unix “echo” command-line utility as an ActionProvider. It uses the Flask API helpers in globus_action_provider_tools. Rather than requiring you to define each of the Action Provider Interface routes in the Flask application, the helpers register the necessary routes with Flask, perform serialization, validation, and authentication on each request, and pass only the requests that satisfy these conditions on to a user-defined implementation of the routes.

To run a watchasay Action, provide an “input_string” parameter containing the text to be “echoed” back. Since this ActionProvider is synchronous, each Action has its status set to “SUCCEEDED” immediately. Note that this Action Provider is configured to run at the /skeleton endpoint.

Presteps

To run this example Action Provider, you will need to generate your own CLIENT_ID, CLIENT_SECRET, and SCOPE. The directions in README.rst explain how to generate each of these. Once you have those three values, place them into the example Action Provider’s config.py.
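
As a rough sketch of what the resulting config.py might look like: the attribute names client_id, client_secret, and our_scope are the ones the provider code in this example reads from the config module. The environment variable names and the placeholder fallbacks are assumptions made for illustration only, not part of the example project.

```python
import os

# Attribute names match what the provider reads from the config module
# (config.client_id, config.client_secret, config.our_scope).
# The environment variable names and fallback placeholders below are
# assumptions for illustration; substitute your own values.
client_id = os.environ.get("WATCHASAY_CLIENT_ID", "YOUR_CLIENT_ID")
client_secret = os.environ.get("WATCHASAY_CLIENT_SECRET", "YOUR_CLIENT_SECRET")
our_scope = os.environ.get("WATCHASAY_SCOPE", "YOUR_SCOPE")
```

Reading the values from the environment keeps the secret out of version control; hard-coding the three strings works as well for a local experiment.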

Starting the Action Provider

We recommend creating a virtual environment in which to install the project dependencies and run the Action Provider. Once the virtual environment has been created and activated, run the following:

cd examples/watchasay
pip install -r requirements.txt
python app/provider.py

Testing the Action Provider

We provide example tests to validate that your Action Provider is working and to enable some form of continuous integration. To run the example test suite, once again activate the project’s virtual environment and run the following:

cd examples/watchasay
pytest

These tests also show how to use a patch that lets you test your Action Provider without a valid CLIENT_ID, CLIENT_SECRET, or request tokens. Only use this patch during testing.

Actually using the Action Provider

You’ll notice that the only endpoint we can reach without a valid token is the introspection endpoint (/skeleton). The command below reports the expected request schema and the required scope for using the Provider:

curl http://localhost:5000/skeleton/

Why? Because the watchasay Provider has been set to be publicly visible. Making the introspection endpoint publicly visible is a useful way of documenting how to interact with the ActionProvider. All other operations on the Action Provider require a valid token:

curl --request POST \
    --url http://localhost:5000/skeleton/run \
    --header 'authorization: Bearer token' \
    --data '{"request_id": "some-id","body": {"input_string": "hey"}}'
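
For readers who prefer Python to curl, the same request can be sketched with the standard library alone. The helper name build_run_request is hypothetical, and the Content-Type header is added on the assumption that the provider expects a JSON body; the URL, token placeholder, and body mirror the curl command above. The request is only built here, not sent.

```python
import json
import urllib.request


def build_run_request(
    base_url: str, token: str, input_string: str, request_id: str = "some-id"
) -> urllib.request.Request:
    """Build (but do not send) a POST request for the provider's run endpoint."""
    payload = {"request_id": request_id, "body": {"input_string": input_string}}
    return urllib.request.Request(
        url=f"{base_url}/run",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            # Assumption: the provider expects the body to be sent as JSON
            "Content-Type": "application/json",
        },
        method="POST",
    )


req = build_run_request("http://localhost:5000/skeleton", "token", "hey")

# To send it against a running provider with a real token:
#     with urllib.request.urlopen(req) as resp:
#         print(json.load(resp))
```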

But how do you get the token? The recommended way to retrieve a token is to use the globus-automate-client CLI tool. Conveniently, the tool also removes the need to create curl requests and to manually format Action request bodies. See the doc on downloading the CLI tool. Once downloaded, issue a command similar to the one below. The first time you run the command, you will need to follow a flow to request the necessary grants for your Action Provider’s scopes. Later attempts to use the globus-automate-client tool will use locally cached tokens and transparently refresh expired tokens.

globus-automate action-run \
    --action-url http://localhost:5000/skeleton/run \
    --action-scope $YOUR_PROVIDERS_SCOPE \
    --body '{"input_string":"hi"}'

Run the CLI tool with the --help option for more information.

Action Provider Implementation

import datetime
import json
import os
from typing import Dict, Optional, Set, Tuple

from flask import Blueprint, Flask

from examples.watchasay.app import config
from globus_action_provider_tools import (
    ActionProviderDescription,
    ActionRequest,
    ActionStatus,
    ActionStatusValue,
    AuthState,
)
from globus_action_provider_tools.authorization import (
    authorize_action_access_or_404,
    authorize_action_management_or_404,
)
from globus_action_provider_tools.flask import add_action_routes_to_blueprint
from globus_action_provider_tools.flask.exceptions import ActionConflict, ActionNotFound
from globus_action_provider_tools.flask.helpers import assign_json_provider
from globus_action_provider_tools.flask.types import ActionCallbackReturn

# A simulated database mapping a caller-qualified request id
# ("<caller_id>:<request_id>") to the previously seen ActionRequest and the id
# of the Action it created
_fake_request_db: Dict[str, Tuple[ActionRequest, str]] = {}

_fake_action_db: Dict[str, ActionStatus] = {}


def _retrieve_action_status(action_id: str) -> ActionStatus:
    status = _fake_action_db.get(action_id)
    if status is None:
        raise ActionNotFound(f"No Action with id {action_id}")
    return status


def load_schema():
    with open(
        os.path.join(os.path.dirname(os.path.abspath(__file__)), "schema.json")
    ) as f:
        schema = json.load(f)
    return schema


def action_enumerate(auth: AuthState, params: Dict[str, Set]):
    """
    This is an optional endpoint, useful for allowing requesters to enumerate
    actions filtered by ActionStatus and role.

    The params argument will always be a dict containing the incoming request's
    validated query arguments. There will be two keys, 'statuses' and 'roles',
    where each maps to a set containing the filter values for the key. A typical
    params object will look like:

        {
            "statuses": {<ActionStatusValue.ACTIVE: 3>},
            "roles": {"creator_id"}
        }

    Notice that the value for the "statuses" key is an Enum value.
    """
    statuses = params["statuses"]
    roles = params["roles"]
    matches = []

    for action in _fake_action_db.values():
        if action.status in statuses:
            # Create a set of identities that are allowed to access this action,
            # based on the roles being queried for
            allowed_set = set()
            for role in roles:
                identities = getattr(action, role)
                if isinstance(identities, str):
                    allowed_set.add(identities)
                else:
                    allowed_set.update(identities)

            # Determine if this request's auth allows access based on the
            # allowed_set
            authorized = auth.check_authorization(allowed_set)
            if authorized:
                matches.append(action)

    return matches


def action_run(request: ActionRequest, auth: AuthState) -> ActionCallbackReturn:
    """
    Asynchronous actions most likely need to implement retry logic here to
    prevent duplicate requests with matching request_ids from launching
    another job. In the event that a request with an existing request_id
    and creator_id arrives, this function should simply return the action's
    status via the action_status function.

    Synchronous actions or actions where it makes sense to execute repeated
    runs with the same parameters need not implement retry logic.
    """

    caller_id = auth.effective_identity
    request_id = request.request_id
    full_request_id = f"{caller_id}:{request_id}"
    prev_request = _fake_request_db.get(full_request_id)
    if prev_request is not None:
        # If a matching ActionRequest has been found, deduplicate the
        # requests and return the Action's status
        if prev_request[0] == request:
            return action_status(prev_request[1], auth)
        # If a pre-existing ActionRequest with different parameters has been
        # found, throw an error as we can't modify an already running Action
        else:
            raise ActionConflict(
                f"Request with id {request_id} already present with different parameters"
            )

    # Local processing happens here
    result_details = {
        # This is safe because the input has been validated against the input schema
        "you_said": request.body["input_string"]
    }

    # Create an ActionStatus that contains the computed results
    status = ActionStatus(
        status=ActionStatusValue.SUCCEEDED,
        creator_id=caller_id or "UNKNOWN",
        monitor_by=request.monitor_by,
        manage_by=request.manage_by,
        start_time=datetime.datetime.now().isoformat(),
        completion_time=datetime.datetime.now().isoformat(),
        release_after=request.release_after or "P30D",
        display_status=ActionStatusValue.SUCCEEDED,
        details=result_details,
    )

    # Store the request and action_status
    _fake_request_db[full_request_id] = (request, status.action_id)
    _fake_action_db[status.action_id] = status
    return status


def action_status(action_id: str, auth: AuthState) -> ActionCallbackReturn:
    """
    action_status retrieves the most recent state of the action. This endpoint
    requires the user authenticate with a principal value which is in the
    monitor_by list established when the Action was started.
    """
    status = _retrieve_action_status(action_id)
    authorize_action_access_or_404(status, auth)
    return status, 200


def action_cancel(action_id: str, auth: AuthState) -> ActionCallbackReturn:
    """
    Asynchronous actions need not ensure a running action is immediately
    completed or terminated. In this scenario, action_cancel should return
    an action in a non-completion state. If it has completed, return the action's
    status.

    Synchronous actions need not implement any logic in action_cancel. All
    processing happens in the action_run callback so that action_cancel
    simply returns the action_id's status.

    This endpoint requires the user authenticate with a principal value which is
    in the manage_by list established when the Action was started.
    """
    status = _retrieve_action_status(action_id)
    authorize_action_management_or_404(status, auth)

    # If action is already in complete state, return completion details
    if status.status in (ActionStatusValue.SUCCEEDED, ActionStatusValue.FAILED):
        return status

    # Process Action cancellation
    status.status = ActionStatusValue.FAILED
    status.display_status = "Canceled by user request"
    return status


def action_release(action_id: str, auth: AuthState) -> ActionCallbackReturn:
    """
    If the Action is not already in a completion state, action_release should
    return an error as this operation does not attempt to stop execution.
    Synchronous actions need not determine if the action_id is still in a
    processing state. All processing starts and completes in the action_run
    callback so that action_release simply removes the action_id and request_id
    from history and returns the action_id's completion status.

    This endpoint requires the user authenticate with a principal value which is
    in the manage_by list established when the Action was started.
    """
    status = _retrieve_action_status(action_id)
    authorize_action_management_or_404(status, auth)

    # Error if attempt to release an active Action
    if status.status not in (ActionStatusValue.SUCCEEDED, ActionStatusValue.FAILED):
        raise ActionConflict("Action is not complete")

    _fake_action_db.pop(action_id)
    # Both fake and badly inefficient
    remove_req_id: Optional[str] = None
    for req_id, req_and_action_id in _fake_request_db.items():
        if req_and_action_id[1] == action_id:
            remove_req_id = req_id
            break
    if remove_req_id is not None:
        _fake_request_db.pop(remove_req_id)
    return status, 200


def create_app():
    app = Flask(__name__)
    assign_json_provider(app)
    app.url_map.strict_slashes = False

    # Create and define a blueprint onto which the routes will be added
    skeleton_blueprint = Blueprint("skeleton", __name__, url_prefix="/skeleton")

    # Create the ActionProviderDescription with the correct scope and schema
    provider_description = ActionProviderDescription(
        globus_auth_scope=config.our_scope,
        title="skeleton_action_provider",
        admin_contact="support@globus.org",
        synchronous=True,
        input_schema=load_schema(),
        log_supported=False,  # This provider doesn't implement the log callback
        visible_to=["public"],
    )

    # Use the flask helper function to register the endpoint callbacks onto the
    # blueprint
    add_action_routes_to_blueprint(
        blueprint=skeleton_blueprint,
        client_id=config.client_id,
        client_secret=config.client_secret,
        provider_description=provider_description,
        action_run_callback=action_run,
        action_status_callback=action_status,
        action_cancel_callback=action_cancel,
        action_release_callback=action_release,
        action_enumeration_callback=action_enumerate,
        additional_scopes=[
            "https://auth.globus.org/scopes/d3a66776-759f-4316-ba55-21725fe37323/secondary_scope"
        ],
    )

    # Register the blueprint with your flask app before returning it
    app.register_blueprint(skeleton_blueprint)
    return app


def main():
    app = create_app()
    app.run(debug=True)


if __name__ == "__main__":
    main()
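
The request deduplication performed in action_run can be studied in isolation. The following is a minimal, standard-library-only sketch of the same idea: FakeRequest and Conflict are hypothetical stand-ins for ActionRequest and ActionConflict, and run returns just an action id instead of a full ActionStatus. It is meant to illustrate the lookup-by-caller-and-request-id pattern, not to replace the provider code above.

```python
import uuid
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass(frozen=True)
class FakeRequest:
    """Hypothetical stand-in for ActionRequest; instances compare by value."""

    request_id: str
    input_string: str


class Conflict(Exception):
    """Hypothetical stand-in for ActionConflict."""


# Maps "<caller_id>:<request_id>" to (request, action_id), like _fake_request_db
_requests: Dict[str, Tuple[FakeRequest, str]] = {}


def run(caller_id: str, request: FakeRequest) -> str:
    """Return an action id, reusing it when the same request is retried."""
    key = f"{caller_id}:{request.request_id}"
    previous = _requests.get(key)
    if previous is not None:
        prev_request, action_id = previous
        if prev_request == request:
            # Identical retry: hand back the existing action
            return action_id
        # Same request_id but different parameters: refuse
        raise Conflict(
            f"Request with id {request.request_id} already present "
            "with different parameters"
        )
    action_id = uuid.uuid4().hex
    _requests[key] = (request, action_id)
    return action_id
```

Because the key includes the caller's identity, two different callers may reuse the same request_id without colliding, while a retry from the same caller with an identical body maps back to the original action.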