Commit ae04dfc7 authored by Hervé  MENAGER's avatar Hervé MENAGER
Browse files

store quickfixed version of Galaxy's `workflows.py`

from PR #105 (https://github.com/common-workflow-language/galaxy/pull/105)
allow to upload CWL workflows to Galaxy using the API
parent d9547154
from __future__ import absolute_import
import json
import logging
import os
import uuid
from collections import namedtuple
from gxformat2 import (
from_galaxy_native,
ImporterGalaxyInterface,
ImportOptions,
python_to_workflow,
)
from gxformat2.converter import ordered_load
from six import string_types
from sqlalchemy import and_
from sqlalchemy.orm import joinedload, subqueryload
from galaxy import (
exceptions,
model,
util
)
from galaxy.jobs.actions.post import ActionBox
from galaxy.model.item_attrs import UsesAnnotations
from galaxy.tools.cwl import workflow_proxy
from galaxy.tools.parameters import (
params_to_incoming,
visit_input_values
)
from galaxy.tools.parameters.basic import (
DataCollectionToolParameter,
DataToolParameter,
RuntimeValue,
workflow_building_modes
)
from galaxy.util.json import safe_loads
from galaxy.util.sanitize_html import sanitize_html
from galaxy.web import url_for
from galaxy.workflow.modules import (
is_tool_module_type,
module_factory,
ToolModule,
WorkflowModuleInjector
)
from galaxy.workflow.resources import get_resource_mapper_function
from galaxy.workflow.steps import attach_ordered_steps
from .base import decode_id
log = logging.getLogger(__name__)
class WorkflowsManager(object):
""" Handle CRUD type operations related to workflows. More interesting
stuff regarding workflow execution, step sorting, etc... can be found in
the galaxy.workflow module.
"""
def __init__(self, app):
self.app = app
def get_stored_workflow(self, trans, workflow_id):
""" Use a supplied ID (UUID or encoded stored workflow ID) to find
a workflow.
"""
if util.is_uuid(workflow_id):
# see if they have passed in the UUID for a workflow that is attached to a stored workflow
workflow_uuid = uuid.UUID(workflow_id)
workflow_query = trans.sa_session.query(trans.app.model.StoredWorkflow).filter(and_(
trans.app.model.StoredWorkflow.latest_workflow_id == trans.app.model.Workflow.id,
trans.app.model.Workflow.uuid == workflow_uuid
))
else:
workflow_id = decode_id(self.app, workflow_id)
workflow_query = trans.sa_session.query(trans.app.model.StoredWorkflow).\
filter(trans.app.model.StoredWorkflow.id == workflow_id)
stored_workflow = workflow_query.options(joinedload('annotations'),
joinedload('tags'),
subqueryload('latest_workflow').joinedload('steps').joinedload('*')).first()
if stored_workflow is None:
raise exceptions.ObjectNotFound("No such workflow found.")
return stored_workflow
def get_stored_accessible_workflow(self, trans, workflow_id):
""" Get a stored workflow from a encoded stored workflow id and
make sure it accessible to the user.
"""
stored_workflow = self.get_stored_workflow(trans, workflow_id)
# check to see if user has permissions to selected workflow
if stored_workflow.user != trans.user and not trans.user_is_admin and not stored_workflow.published:
if trans.sa_session.query(trans.app.model.StoredWorkflowUserShareAssociation).filter_by(user=trans.user, stored_workflow=stored_workflow).count() == 0:
message = "Workflow is not owned by or shared with current user"
raise exceptions.ItemAccessibilityException(message)
return stored_workflow
def get_owned_workflow(self, trans, encoded_workflow_id):
""" Get a workflow (non-stored) from a encoded workflow id and
make sure it accessible to the user.
"""
workflow_id = decode_id(self.app, encoded_workflow_id)
workflow = trans.sa_session.query(model.Workflow).get(workflow_id)
self.check_security(trans, workflow, check_ownership=True)
return workflow
def check_security(self, trans, has_workflow, check_ownership=True, check_accessible=True):
""" check accessibility or ownership of workflows, storedworkflows, and
workflowinvocations. Throw an exception or returns True if user has
needed level of access.
"""
if not check_ownership and not check_accessible:
return True
# If given an invocation verify ownership of invocation
if isinstance(has_workflow, model.WorkflowInvocation):
# We use the the owner of the history that is associated to the invocation as a proxy
# for the owner of the invocation.
if trans.user != has_workflow.history.user and not trans.user_is_admin:
raise exceptions.ItemOwnershipException()
else:
return True
# stored workflow contains security stuff - follow that workflow to
# that unless given a stored workflow.
if isinstance(has_workflow, model.Workflow):
stored_workflow = has_workflow.top_level_stored_workflow
else:
stored_workflow = has_workflow
if stored_workflow.user != trans.user and not trans.user_is_admin:
if check_ownership:
raise exceptions.ItemOwnershipException()
# else check_accessible...
if trans.sa_session.query(model.StoredWorkflowUserShareAssociation).filter_by(user=trans.user, stored_workflow=stored_workflow).count() == 0:
raise exceptions.ItemAccessibilityException()
return True
def get_invocation(self, trans, decoded_invocation_id):
workflow_invocation = trans.sa_session.query(
self.app.model.WorkflowInvocation
).get(decoded_invocation_id)
if not workflow_invocation:
encoded_wfi_id = trans.security.encode_id(decoded_invocation_id)
message = "'%s' is not a valid workflow invocation id" % encoded_wfi_id
raise exceptions.ObjectNotFound(message)
self.check_security(trans, workflow_invocation, check_ownership=True, check_accessible=False)
return workflow_invocation
def cancel_invocation(self, trans, decoded_invocation_id):
workflow_invocation = self.get_invocation(trans, decoded_invocation_id)
cancelled = workflow_invocation.cancel()
if cancelled:
trans.sa_session.add(workflow_invocation)
trans.sa_session.flush()
else:
# TODO: More specific exception?
raise exceptions.MessageException("Cannot cancel an inactive workflow invocation.")
return workflow_invocation
def get_invocation_step(self, trans, decoded_workflow_invocation_step_id):
try:
workflow_invocation_step = trans.sa_session.query(
model.WorkflowInvocationStep
).get(decoded_workflow_invocation_step_id)
except Exception:
raise exceptions.ObjectNotFound()
self.check_security(trans, workflow_invocation_step.workflow_invocation, check_ownership=True, check_accessible=False)
return workflow_invocation_step
def update_invocation_step(self, trans, decoded_workflow_invocation_step_id, action):
if action is None:
raise exceptions.RequestParameterMissingException("Updating workflow invocation step requires an action parameter. ")
workflow_invocation_step = self.get_invocation_step(trans, decoded_workflow_invocation_step_id)
workflow_invocation = workflow_invocation_step.workflow_invocation
if not workflow_invocation.active:
raise exceptions.RequestParameterInvalidException("Attempting to modify the state of an completed workflow invocation.")
step = workflow_invocation_step.workflow_step
module = module_factory.from_workflow_step(trans, step)
performed_action = module.do_invocation_step_action(step, action)
workflow_invocation_step.action = performed_action
trans.sa_session.add(workflow_invocation_step)
trans.sa_session.flush()
return workflow_invocation_step
def build_invocations_query(self, trans, stored_workflow_id=None, history_id=None, user_id=None):
"""Get invocations owned by the current user."""
sa_session = trans.sa_session
invocations_query = sa_session.query(model.WorkflowInvocation)
if stored_workflow_id is not None:
stored_workflow = sa_session.query(model.StoredWorkflow).get(stored_workflow_id)
if not stored_workflow:
raise exceptions.ObjectNotFound()
invocations_query = invocations_query.join(
model.Workflow
).filter(
model.Workflow.table.c.stored_workflow_id == stored_workflow_id
)
if user_id is not None:
invocations_query = invocations_query.join(
model.History
).filter(
model.History.table.c.user_id == user_id
)
if history_id is not None:
invocations_query = invocations_query.filter(
model.WorkflowInvocation.table.c.history_id == history_id
)
return [inv for inv in invocations_query if self.check_security(trans,
inv,
check_ownership=True,
check_accessible=False)]
def serialize_workflow_invocation(self, invocation, **kwd):
app = self.app
view = kwd.get("view", "element")
step_details = util.string_as_bool(kwd.get('step_details', False))
legacy_job_state = util.string_as_bool(kwd.get('legacy_job_state', False))
as_dict = invocation.to_dict(view, step_details=step_details, legacy_job_state=legacy_job_state)
return app.security.encode_all_ids(as_dict, recursive=True)
def serialize_workflow_invocations(self, invocations, **kwd):
if "view" not in kwd:
kwd["view"] = "collection"
return list(map(lambda i: self.serialize_workflow_invocation(i, **kwd), invocations))
CreatedWorkflow = namedtuple("CreatedWorkflow", ["stored_workflow", "workflow", "missing_tools"])
def artifact_class(trans, as_dict):
object_id = as_dict.get("object_id", None)
if as_dict.get("src", None) == "from_path":
if trans and not trans.user_is_admin:
raise exceptions.AdminRequiredException()
workflow_path = as_dict.get("path")
with open(workflow_path, "r") as f:
as_dict = ordered_load(f)
artifact_class = as_dict.get("class", None)
if artifact_class is None and "$graph" in as_dict:
object_id = object_id or "main"
graph = as_dict["$graph"]
target_object = None
if isinstance(graph, dict):
target_object = graph.get(object_id)
else:
for item in graph:
found_id = item.get("id")
if found_id == object_id or found_id == "#" + object_id:
target_object = item
if target_object and target_object.get("class"):
artifact_class = target_object["class"]
return artifact_class, as_dict, object_id
class WorkflowContentsManager(UsesAnnotations):
def __init__(self, app):
self.app = app
self._resource_mapper_function = get_resource_mapper_function(app)
def ensure_raw_description(self, dict_or_raw_description):
if not isinstance(dict_or_raw_description, RawWorkflowDescription):
dict_or_raw_description = RawWorkflowDescription(dict_or_raw_description)
return dict_or_raw_description
def normalize_workflow_format(self, trans, as_dict):
"""Process incoming workflow descriptions for consumption by other methods.
Currently this mostly means converting format 2 workflows into standard Galaxy
workflow JSON for consumption for the rest of this module. In the future we will
want to be a lot more percise about this - preserve the original description along
side the data model and apply updates in a way that largely preserves YAML structure
so workflows can be extracted.
"""
workflow_directory = None
workflow_path = None
if as_dict.get("src", None) == "from_path":
if not trans.user_is_admin:
raise exceptions.AdminRequiredException()
workflow_path = as_dict.get("path")
workflow_directory = os.path.normpath(os.path.dirname(workflow_path))
workflow_class, as_dict, object_id = artifact_class(trans, as_dict)
if workflow_class == "GalaxyWorkflow" or "yaml_content" in as_dict:
# Format 2 Galaxy workflow.
galaxy_interface = Format2ConverterGalaxyInterface()
import_options = ImportOptions()
import_options.deduplicate_subworkflows = True
as_dict = python_to_workflow(as_dict, galaxy_interface, workflow_directory=workflow_directory, import_options=import_options)
elif workflow_class == "Workflow":
from galaxy.tools.cwl import workflow_proxy
# create a temporary file for the workflow if it is provided
# as JSON, to make it parseable by the WorkflowProxy
if workflow_path is None:
import tempfile, os
f = tempfile.NamedTemporaryFile(delete=False)
json.dump(as_dict, f)
workflow_path = f.name
f.close()
if object_id:
workflow_path += "#" + object_id
wf_proxy = workflow_proxy(workflow_path)
os.unlink(f.name)
else:
# TODO: consume and use object_id...
if object_id:
workflow_path += "#" + object_id
wf_proxy = workflow_proxy(workflow_path)
tool_reference_proxies = wf_proxy.tool_reference_proxies()
for tool_reference_proxy in tool_reference_proxies:
# TODO: Namespace IDS in workflows.
representation = tool_reference_proxy.to_persistent_representation()
self.app.dynamic_tool_manager.create_tool(trans, {
"representation": representation,
}, allow_load=True)
as_dict = wf_proxy.to_dict()
return RawWorkflowDescription(as_dict, workflow_path)
def build_workflow_from_raw_description(
self,
trans,
raw_workflow_description,
source=None,
add_to_menu=False,
publish=False,
create_stored_workflow=True,
exact_tools=True,
fill_defaults=False,
):
data = raw_workflow_description.as_dict
# Put parameters in workflow mode
trans.workflow_building_mode = workflow_building_modes.ENABLED
# If there's a source, put it in the workflow name.
if 'name' not in data:
raise Exception("Invalid workflow format detected [%s]" % data)
workflow_input_name = data['name']
if source:
name = "%s (imported from %s)" % (workflow_input_name, source)
else:
name = workflow_input_name
workflow, missing_tool_tups = self._workflow_from_raw_description(
trans,
raw_workflow_description,
name=name,
exact_tools=exact_tools,
fill_defaults=fill_defaults,
)
if 'uuid' in data:
workflow.uuid = data['uuid']
if create_stored_workflow:
# Connect up
stored = model.StoredWorkflow()
stored.from_path = raw_workflow_description.workflow_path
stored.name = workflow.name
workflow.stored_workflow = stored
stored.latest_workflow = workflow
stored.user = trans.user
stored.published = publish
if data['annotation']:
annotation = sanitize_html(data['annotation'])
self.add_item_annotation(trans.sa_session, stored.user, stored, annotation)
workflow_tags = data.get('tags', [])
trans.app.tag_handler.set_tags_from_list(user=trans.user, item=stored, new_tags_list=workflow_tags)
# Persist
trans.sa_session.add(stored)
if add_to_menu:
if trans.user.stored_workflow_menu_entries is None:
trans.user.stored_workflow_menu_entries = []
menuEntry = model.StoredWorkflowMenuEntry()
menuEntry.stored_workflow = stored
trans.user.stored_workflow_menu_entries.append(menuEntry)
else:
stored = None
# Persist
trans.sa_session.add(workflow)
trans.sa_session.flush()
return CreatedWorkflow(
stored_workflow=stored,
workflow=workflow,
missing_tools=missing_tool_tups
)
def update_workflow_from_raw_description(self, trans, stored_workflow, raw_workflow_description, **kwds):
raw_workflow_description = self.ensure_raw_description(raw_workflow_description)
# Put parameters in workflow mode
trans.workflow_building_mode = workflow_building_modes.ENABLED
workflow, missing_tool_tups = self._workflow_from_raw_description(
trans,
raw_workflow_description,
name=stored_workflow.name,
**kwds
)
if missing_tool_tups:
errors = []
for missing_tool_tup in missing_tool_tups:
errors.append("Step %i: Requires tool '%s'." % (int(missing_tool_tup[3]) + 1, missing_tool_tup[0]))
raise MissingToolsException(workflow, errors)
# Connect up
workflow.stored_workflow = stored_workflow
stored_workflow.latest_workflow = workflow
# Persist
trans.sa_session.flush()
if stored_workflow.from_path:
self._sync_stored_workflow(trans, stored_workflow)
# Return something informative
errors = []
if workflow.has_errors:
errors.append("Some steps in this workflow have validation errors")
if workflow.has_cycles:
errors.append("This workflow contains cycles")
return workflow, errors
def _workflow_from_raw_description(self, trans, raw_workflow_description, name, **kwds):
data = raw_workflow_description.as_dict
if isinstance(data, string_types):
data = json.loads(data)
if "src" in data:
assert data["src"] == "path"
wf_proxy = workflow_proxy(data["path"])
data = wf_proxy.to_dict()
# Create new workflow from source data
workflow = model.Workflow()
workflow.name = name
# Assume no errors until we find a step that has some
workflow.has_errors = False
# Create each step
steps = []
# The editor will provide ids for each step that we don't need to save,
# but do need to use to make connections
steps_by_external_id = {}
# Preload dependent workflows with locally defined content_ids.
subworkflows = data.get("subworkflows")
subworkflow_id_map = None
if subworkflows:
subworkflow_id_map = {}
for key, subworkflow_dict in subworkflows.items():
subworkflow = self.__build_embedded_subworkflow(trans, subworkflow_dict, **kwds)
subworkflow_id_map[key] = subworkflow
# Keep track of tools required by the workflow that are not available in
# the local Galaxy instance. Each tuple in the list of missing_tool_tups
# will be ( tool_id, tool_name, tool_version ).
missing_tool_tups = []
for step_dict in self.__walk_step_dicts(data):
self.__load_subworkflows(trans, step_dict, subworkflow_id_map, **kwds)
for step_dict in self.__walk_step_dicts(data):
module, step = self.__module_from_dict(trans, steps, steps_by_external_id, step_dict, **kwds)
is_tool = is_tool_module_type(module.type)
if is_tool and module.tool is None:
missing_tool_tup = (module.tool_id, module.get_name(), module.tool_version, step_dict['id'])
if missing_tool_tup not in missing_tool_tups:
missing_tool_tups.append(missing_tool_tup)
if module.get_errors():
workflow.has_errors = True
# Second pass to deal with connections between steps
self.__connect_workflow_steps(steps, steps_by_external_id)
# Order the steps if possible
attach_ordered_steps(workflow, steps)
return workflow, missing_tool_tups
def workflow_to_dict(self, trans, stored, style="export", version=None):
""" Export the workflow contents to a dictionary ready for JSON-ification and to be
sent out via API for instance. There are three styles of export allowed 'export', 'instance', and
'editor'. The Galaxy team will do its best to preserve the backward compatibility of the
'export' style - this is the export method meant to be portable across Galaxy instances and over
time. The 'editor' style is subject to rapid and unannounced changes. The 'instance' export
option describes the workflow in a context more tied to the current Galaxy instance and includes
fields like 'url' and 'url' and actual unencoded step ids instead of 'order_index'.
"""
def to_format_2(wf_dict, **kwds):
return from_galaxy_native(wf_dict, None, **kwds)
if version == '':
version = None
if version is not None:
version = int(version)
workflow = stored.get_internal_version(version)
if style == "export":
style = self.app.config.default_workflow_export_format
if style == "editor":
wf_dict = self._workflow_to_dict_editor(trans, stored, workflow)
elif style == "legacy":
wf_dict = self._workflow_to_dict_instance(stored, workflow=workflow, legacy=True)
elif style == "instance":
wf_dict = self._workflow_to_dict_instance(stored, workflow=workflow, legacy=False)
elif style == "run":
wf_dict = self._workflow_to_dict_run(trans, stored, workflow=workflow)
elif style == "format2":
wf_dict = self._workflow_to_dict_export(trans, stored, workflow=workflow)
wf_dict = to_format_2(wf_dict)
elif style == "format2_wrapped_yaml":
wf_dict = self._workflow_to_dict_export(trans, stored, workflow=workflow)
wf_dict = to_format_2(wf_dict, json_wrapper=True)
elif style == "ga":
wf_dict = self._workflow_to_dict_export(trans, stored, workflow=workflow)
else:
raise exceptions.RequestParameterInvalidException('Unknown workflow style [%s]' % style)
if version:
wf_dict['version'] = version
else:
wf_dict['version'] = len(stored.workflows) - 1
return wf_dict
def _sync_stored_workflow(self, trans, stored_workflow):
workflow_path = stored_workflow.from_path
workflow = stored_workflow.latest_workflow
with open(workflow_path, "w") as f:
if workflow_path.endswith(".ga"):
wf_dict = self._workflow_to_dict_export(trans, stored_workflow, workflow=workflow)
json.dump(wf_dict, f, indent=4)
else:
wf_dict = self._workflow_to_dict_export(trans, stored_workflow, workflow=workflow)
wf_dict = from_galaxy_native(wf_dict, None, json_wrapper=True)
f.write(wf_dict["yaml_content"])
def _workflow_to_dict_run(self, trans, stored, workflow):
"""
Builds workflow dictionary used by run workflow form
"""
if len(workflow.steps) == 0:
raise exceptions.MessageException('Workflow cannot be run because it does not have any steps.')
if attach_ordered_steps(workflow, workflow.steps):
raise exceptions.MessageException('Workflow cannot be run because it contains cycles.')
trans.workflow_building_mode = workflow_building_modes.USE_HISTORY
module_injector = WorkflowModuleInjector(trans)
has_upgrade_messages = False
step_version_changes = []
missing_tools = []
errors = {}
for step in workflow.steps:
try:
module_injector.inject(step, steps=workflow.steps, exact_tools=False)
except exceptions.ToolMissingException as e:
# FIXME: if a subworkflow lacks multiple tools we report only the first missing tool
if e.tool_id not in missing_tools:
missing_tools.append(e.tool_id)
continue
if step.upgrade_messages:
has_upgrade_messages = True
if step.type == 'tool' or step.type is None:
if step.module.version_changes:
step_version_changes.extend(step.module.version_changes)
step_errors = step.module.get_errors()
if step_errors:
errors[step.id] = step_errors
if missing_tools:
workflow.annotation = self.get_item_annotation_str(trans.sa_session, trans.user, workflow)
raise exceptions.MessageException('Following tools missing: %s' % ', '.join(missing_tools))
workflow.annotation = self.get_item_annotation_str(trans.sa_session, trans.user, workflow)
step_order_indices = {}
for step in workflow.steps:
step_order_indices[step.id] = step.order_index
step_models = []
for step in workflow.steps:
step_model = None
if step.type == 'tool':
incoming = {}
tool = trans.app.toolbox.get_tool(step.tool_id, tool_version=step.tool_version, tool_uuid=step.tool_uuid)
params_to_incoming(incoming, tool.inputs, step.state.inputs, trans.app)
step_model = tool.to_json(trans, incoming, workflow_building_mode=workflow_building_modes.USE_HISTORY)
step_model['post_job_actions'] = [{
'short_str' : ActionBox.get_short_str(pja),
'action_type' : pja.action_type,
'output_name' : pja.output_name,
'action_arguments' : pja.action_arguments
} for pja in step.post_job_actions]
else:
inputs = step.module.get_runtime_inputs(connections=step.output_connections)
step_model = {
'inputs' : [input.to_dict(trans) for input in inputs.values()]
}
step_model['replacement_parameters'] = step.module.get_replacement_parameters(step)
step_model['step_type'] = step.type
step_model['step_label'] = step.label