Skip to content
Snippets Groups Projects
Select Git revision
  • main
1 result

README.md

Blame
  • representation.py 22.46 KiB
    """ This module is responsible for converting between Galaxy's tool
    input description and the CWL description for a job json. """
    
    import collections
    import json
    import logging
    import os
    
    from six import string_types
    
    from galaxy.exceptions import RequestParameterInvalidException
    from galaxy.util import safe_makedirs, string_as_bool
    from galaxy.util.bunch import Bunch
    from .util import set_basename_and_derived_properties
    
    from galaxy.util.none_like import NoneDataset
    from galaxy.util.object_wrapper import SafeStringWrapper
    
    # Module-level logger used by the conversion functions below.
    log = logging.getLogger(__name__)

    # Sentinel that lets to_galaxy_parameters tell "key absent from the input
    # dict" apart from an explicit None value.
    NOT_PRESENT = object()

    # Sentinel used as the galaxy_param_type of a type representation that has
    # no Galaxy input at all (the "null" representation).
    NO_GALAXY_INPUT = object()

    # Names of the Galaxy parameter types that type representations map onto.
    # NOTE(review): DATA_COLLECTON is misspelled (missing "I"); the name is
    # referenced elsewhere in this module, so it is documented rather than renamed.
    INPUT_TYPE = Bunch(
        DATA="data",
        INTEGER="integer",
        FLOAT="float",
        TEXT="text",
        BOOLEAN="boolean",
        SELECT="select",
        FIELD="field",
        CONDITIONAL="conditional",
        DATA_COLLECTON="data_collection",
    )
    
    # There are two approaches to mapping CWL tool state to Galaxy tool state:
    # one is to map CWL types to compound Galaxy tool parameter combinations
    # with conditionals, and the other is to use a new Galaxy parameter type that
    # allows unions, optional specifications, etc. The problem with the former
    # is that it doesn't work with workflow parameters, for instance, and is
    # very complex on the backend. The problem with the latter is that the GUI
    # for this parameter type is currently undefined.
    USE_FIELD_TYPES = True

    # There are two approaches to mapping CWL workflow inputs to Galaxy workflow
    # steps. The first is to simply map everything to expressions, stick them into
    # files and use data inputs; the second is to use parameter_input steps with
    # field types. We are dispatching on USE_FIELD_TYPES for now to choose, but
    # the two settings may diverge later.
    # There are open issues with each approach:
    #  - Mapping everything to files makes the GUI harder to imagine but the backend
    #     easier to manage in some ways.
    USE_STEP_PARAMETERS = USE_FIELD_TYPES
    
    # A TypeRepresentation describes one way a value can be represented in Galaxy:
    #   name              - identifier used to look the representation up
    #   galaxy_param_type - an INPUT_TYPE value, or NO_GALAXY_INPUT when there is none
    #   label             - human-readable description
    #   collection_type   - collection type string for collection-backed entries, else None
    TypeRepresentation = collections.namedtuple("TypeRepresentation", ["name", "galaxy_param_type", "label", "collection_type"])
    # Ordered table of all known representations; type_descriptions_for_field_types
    # returns matches in this order.
    TYPE_REPRESENTATIONS = [
        TypeRepresentation("null", NO_GALAXY_INPUT, "no input", None),
        TypeRepresentation("integer", INPUT_TYPE.INTEGER, "an integer", None),
        TypeRepresentation("float", INPUT_TYPE.FLOAT, "a decimal number", None),
        TypeRepresentation("double", INPUT_TYPE.FLOAT, "a decimal number", None),
        TypeRepresentation("file", INPUT_TYPE.DATA, "a dataset", None),
        TypeRepresentation("directory", INPUT_TYPE.DATA, "a directory", None),
        TypeRepresentation("boolean", INPUT_TYPE.BOOLEAN, "a boolean", None),
        TypeRepresentation("text", INPUT_TYPE.TEXT, "a simple text field", None),
        TypeRepresentation("record", INPUT_TYPE.DATA_COLLECTON, "record as a dataset collection", "record"),
        TypeRepresentation("json", INPUT_TYPE.TEXT, "arbitrary JSON structure", None),
        TypeRepresentation("array", INPUT_TYPE.DATA_COLLECTON, "as a dataset list", "list"),
        TypeRepresentation("enum", INPUT_TYPE.TEXT, "enum value", None),  # TODO: make this a select...
        TypeRepresentation("field", INPUT_TYPE.FIELD, "arbitrary JSON structure", None),
    ]
    # The "field" representation must stay last in the list: it is picked by position.
    FIELD_TYPE_REPRESENTATION = TYPE_REPRESENTATIONS[-1]
    # Attach a helper method to the namedtuple class: True unless the
    # representation has no Galaxy input.
    TypeRepresentation.uses_param = lambda self: self.galaxy_param_type is not NO_GALAXY_INPUT
    
    # Maps a CWL type name to the names of the TypeRepresentation entries that
    # can represent it. Which table is used depends on USE_FIELD_TYPES.
    if not USE_FIELD_TYPES:
        # Without field types, "Any" fans out to every simple representation.
        # This table has no "enum" or "double" entries.
        CWL_TYPE_TO_REPRESENTATIONS = {
            "Any": ["integer", "float", "file", "boolean", "text", "record", "json"],
            "array": ["array"],
            "string": ["text"],
            "boolean": ["boolean"],
            "int": ["integer"],
            "float": ["float"],
            "File": ["file"],
            "Directory": ["directory"],
            "null": ["null"],
            "record": ["record"],
        }
    else:
        # With field types, "Any" maps to the single "field" representation.
        CWL_TYPE_TO_REPRESENTATIONS = {
            "Any": ["field"],
            "array": ["array"],
            "string": ["text"],
            "boolean": ["boolean"],
            "int": ["integer"],
            "float": ["float"],
            "File": ["file"],
            "Directory": ["directory"],
            "null": ["null"],
            "record": ["record"],
            "enum": ["enum"],
            "double": ["double"],
        }
    
    
    def type_representation_from_name(type_representation_name):
        """Return the ``TypeRepresentation`` whose ``name`` matches the argument.

        :param type_representation_name: name to look up in ``TYPE_REPRESENTATIONS``.
        :returns: the first entry of ``TYPE_REPRESENTATIONS`` with that name.
        :raises AssertionError: if no entry has that name (including ``None``).
        """
        for type_representation in TYPE_REPRESENTATIONS:
            if type_representation.name == type_representation_name:
                return type_representation

        # Raise explicitly instead of ``assert False``: assert statements are
        # removed under ``python -O``, which would make an unknown name silently
        # return None. The exception type is unchanged for existing callers.
        raise AssertionError("Unknown type representation name [%s]" % (type_representation_name,))
    
    
    def type_descriptions_for_field_types(field_types):
        """Collect the type representations that can describe the given CWL types.

        Each element of ``field_types`` is a CWL type name, or a dict whose truthy
        ``"type"`` entry is used in its place. The names are resolved through
        ``CWL_TYPE_TO_REPRESENTATIONS``.

        :param field_types: iterable of CWL type names and/or type dicts.
        :returns: list of matching ``TypeRepresentation`` entries, in the order
            they appear in ``TYPE_REPRESENTATIONS`` (no duplicates).
        :raises Exception: if a type is unhashable or has no known representation.
        """
        wanted_names = set()
        for candidate in field_types:
            if isinstance(candidate, dict):
                nested_type = candidate.get("type")
                if nested_type:
                    candidate = nested_type

            try:
                names_for_candidate = CWL_TYPE_TO_REPRESENTATIONS.get(candidate)
            except TypeError:
                # Unhashable values (lists, dicts) cannot be used as lookup keys.
                raise Exception("Failed to convert field_type %s" % candidate)
            if names_for_candidate is None:
                raise Exception("Failed to convert type %s" % candidate)
            wanted_names.update(names_for_candidate)

        # Filter the master list so the result keeps its canonical ordering.
        return [
            representation
            for representation in TYPE_REPRESENTATIONS
            if representation.name in wanted_names
        ]
    
    
    def dataset_wrapper_to_file_json(inputs_dir, dataset_wrapper):
        """Convert a dataset wrapper into its CWL job JSON value.

        * ``expression.json`` datasets are read from ``file_name`` and returned as
          parsed JSON.
        * ``directory`` datasets are delegated to ``dataset_wrapper_to_directory_json``.
        * Anything else becomes a ``{"class": "File", ...}`` dict. When the
          dataset's extra files path contains a ``__secondary_files__`` directory,
          the primary file and every secondary file are symlinked into
          ``inputs_dir`` and listed under ``"secondaryFiles"``.

        :param inputs_dir: directory that receives the symlinks (created on demand).
        :param dataset_wrapper: wrapped dataset; ``str()`` of it is used as its path.
        :returns: parsed JSON, a directory dict or a file dict, as described above.
        """
        if dataset_wrapper.ext == "expression.json":
            with open(dataset_wrapper.file_name, "r") as handle:
                return json.load(handle)

        if dataset_wrapper.ext == "directory":
            return dataset_wrapper_to_directory_json(inputs_dir, dataset_wrapper)

        secondary_root = os.path.join(dataset_wrapper.extra_files_path, "__secondary_files__")
        location = str(dataset_wrapper)
        file_json = {"class": "File"}

        if os.path.exists(secondary_root):
            # Stage the primary file next to its secondary files so they are
            # all found in the same directory.
            safe_makedirs(inputs_dir)
            staged_primary = os.path.join(inputs_dir, os.path.basename(location))
            os.symlink(location, staged_primary)

            secondary_entries = []
            for entry_name in os.listdir(secondary_root):
                source = os.path.join(secondary_root, entry_name)
                target = os.path.join(inputs_dir, entry_name)
                log.info("linking [%s] to [%s]" % (source, target))
                os.symlink(source, target)
                if os.path.isdir(os.path.realpath(source)):
                    entry_class = "Directory"
                else:
                    entry_class = "File"
                secondary_entries.append({"class": entry_class, "location": target})

            file_json["secondaryFiles"] = secondary_entries
            location = staged_primary

        file_json["location"] = location

        # No size is recorded when the underlying dataset is a NoneDataset.
        if not isinstance(dataset_wrapper.unsanitized, NoneDataset):
            file_json["size"] = int(dataset_wrapper.get_size())

        basename = str(dataset_wrapper.created_from_basename or dataset_wrapper.name)
        set_basename_and_derived_properties(file_json, basename)
        return file_json
    
    
    def dataset_wrapper_to_directory_json(inputs_dir, dataset_wrapper):
        """Describe a ``directory`` dataset as a CWL-style ``Directory`` dict.

        The directory name is taken from the dataset's basename
        (``created_from_basename`` or, failing that, ``name``) with its last
        extension removed, on the assumption that the archive file name contains
        the directory name.

        :param inputs_dir: unused; kept so the signature matches
            ``dataset_wrapper_to_file_json``.
        :param dataset_wrapper: wrapped dataset whose ``ext`` must be ``"directory"``.
        :returns: dict with ``location``, ``class``, ``name``, ``archive_location``,
            ``archive_nameext`` and ``archive_nameroot`` keys.
        :raises AssertionError: if the dataset extension is not ``"directory"``.
        """
        assert dataset_wrapper.ext == "directory"

        archive_name = str(dataset_wrapper.created_from_basename or dataset_wrapper.name)
        nameroot, nameext = os.path.splitext(archive_name)

        # When a user uploads a tar file with the 'directory' type, the tar file
        # location ends up in dataset_wrapper.unsanitized.file_name. Looking it up
        # is best-effort, but only ordinary errors are swallowed: a bare
        # ``except:`` would also hide KeyboardInterrupt and SystemExit.
        try:
            archive_location = dataset_wrapper.unsanitized.file_name
        except Exception:
            archive_location = None

        return {
            "location": dataset_wrapper.extra_files_path,
            "class": "Directory",
            "name": nameroot,
            "archive_location": archive_location,
            "archive_nameext": nameext,
            "archive_nameroot": nameroot,
        }
    
    
    def collection_wrapper_to_array(inputs_dir, wrapped_value):
        """Convert each element of a collection wrapper to its file JSON.

        :param inputs_dir: passed through to ``dataset_wrapper_to_file_json``.
        :param wrapped_value: iterable of dataset wrappers.
        :returns: list of converted values, in iteration order.
        """
        return [dataset_wrapper_to_file_json(inputs_dir, element) for element in wrapped_value]
    
    
    def collection_wrapper_to_record(inputs_dir, wrapped_value):
        """Convert a keyed collection wrapper to a dict of file JSON values.

        :param inputs_dir: passed through to ``dataset_wrapper_to_file_json``.
        :param wrapped_value: mapping-like wrapper exposing ``items()``.
        :returns: plain dict mapping each key to its converted dataset.
        """
        # TODO: THIS NEEDS TO BE ORDERED BUT odict not json serializable!
        return {
            field_name: dataset_wrapper_to_file_json(inputs_dir, element)
            for field_name, element in wrapped_value.items()
        }
    
    
    def galactic_flavored_to_cwl_job(tool, param_dict, local_working_directory):
        """Build a CWL job input dict from ``tool.inputs`` and ``param_dict``.

        Inputs whose ``type`` is ``"data"`` are converted with
        ``dataset_wrapper_to_file_json`` and stored under ``input_param.map_to``
        (a ``/``-separated path expanded into nested dicts) or, when ``map_to``
        is falsy, under ``input_param.name``. Every other input is matched by
        name against ``tool._cwl_tool_proxy.input_fields()`` and converted with
        the nested ``simple_value`` helper.

        :param tool: object exposing ``inputs`` (a mapping) and ``_cwl_tool_proxy``.
        :param param_dict: mapping from input name to its (wrapped) value.
        :param local_working_directory: directory under which the ``_inputs``
            path handed to the dataset converters is derived.
        :returns: dict of job inputs.
        """

        def simple_value(input, param_dict_value, type_representation_name=None):
            """Convert one parameter value according to the named representation.

            ``input`` is only consulted for the ``"enum"`` representation.
            Unrecognized representation names fall back to ``str(value)``.
            """
            type_representation = type_representation_from_name(type_representation_name)
            # Hmm... cwl_type isn't really the cwl type in every case,
            # like in the case of json for instance.

            if type_representation.galaxy_param_type == NO_GALAXY_INPUT:
                assert param_dict_value is None
                return None

            if type_representation.name == "file":
                dataset_wrapper = param_dict_value
                return dataset_wrapper_to_file_json(inputs_dir, dataset_wrapper)
            elif type_representation.name == "directory":
                dataset_wrapper = param_dict_value
                return dataset_wrapper_to_directory_json(inputs_dir, dataset_wrapper)
            elif type_representation.name == "integer":
                return int(str(param_dict_value))
            elif type_representation.name == "long":
                # NOTE(review): TYPE_REPRESENTATIONS has no "long" entry, so the
                # lookup above would fail before reaching this branch - confirm.
                return int(str(param_dict_value))
            elif type_representation.name in ["float", "double"]:
                return float(str(param_dict_value))
            elif type_representation.name == "boolean":
                return string_as_bool(param_dict_value)
            elif type_representation.name == "text":
                return str(param_dict_value)
            elif type_representation.name == "enum":
                return input.from_json(param_dict_value, trans=None, other_values={})
            elif type_representation.name == "json":
                raw_value = param_dict_value.value
                return json.loads(raw_value)
            elif type_representation.name == "field":
                if param_dict_value is None:
                    return None
                if hasattr(param_dict_value, "value"):
                    # Is InputValueWrapper
                    return param_dict_value.value
                elif not param_dict_value.is_collection:
                    # Is DatasetFilenameWrapper
                    return dataset_wrapper_to_file_json(inputs_dir, param_dict_value)
                else:
                    # Is DatasetCollectionWrapper
                    hdca_wrapper = param_dict_value
                    # NOTE(review): the two checks below read collection_type at
                    # different depths; any other collection type falls through
                    # and this helper returns None implicitly.
                    if hdca_wrapper.collection_type == "list":
                        # TODO: generalize to lists of lists and lists of non-files...
                        return collection_wrapper_to_array(inputs_dir, hdca_wrapper)
                    elif hdca_wrapper.collection_type.collection_type == "record":
                        return collection_wrapper_to_record(inputs_dir, hdca_wrapper)

            elif type_representation.name == "array":
                # TODO: generalize to lists of lists and lists of non-files...
                return collection_wrapper_to_array(inputs_dir, param_dict_value)
            elif type_representation.name == "record":
                return collection_wrapper_to_record(inputs_dir, param_dict_value)
            else:
                return str(param_dict_value)


        # simple_value reads inputs_dir as a closure variable at call time.
        inputs_dir = os.path.join(local_working_directory, "_inputs")

        inputs = {}

        # TODO: walk tree
        for input_name, input_param in tool.inputs.items():
            if input_param.type == "data":
                # Probably need to be passing in the wrappers and using them - this seems to be
                # an HDA.
                map_to = input_param.map_to
                inputs_at_depth = inputs
                if map_to:

                    # Descend through (creating as needed) one nested dict per
                    # path segment; the last segment becomes the key.
                    while "/" in map_to:
                        first, map_to = map_to.split("/", 1)
                        if first not in inputs_at_depth:
                            inputs_at_depth[first] = {}
                        inputs_at_depth = inputs_at_depth[first]
                else:
                    map_to = input_param.name
                inputs_at_depth[map_to] = dataset_wrapper_to_file_json(inputs_dir, param_dict[input_name])
            else:
                # Find the CWL field with this input's name; the loop does not
                # break, so the last matching field wins.
                matched_field = None
                for field in tool._cwl_tool_proxy.input_fields():
                    if field["name"] == input_name:  # CWL field name equals Galaxy input name
                        matched_field = field
                # NOTE(review): if nothing matched, matched_field is still None
                # and field_to_field_type will fail when indexing it.
                field_type = field_to_field_type(matched_field)
                if isinstance(field_type, list):
                    # A multi-entry type list is handled by the generic field representation.
                    assert USE_FIELD_TYPES
                    type_descriptions = [FIELD_TYPE_REPRESENTATION]
                else:
                    type_descriptions = type_descriptions_for_field_types([field_type])
                assert len(type_descriptions) == 1
                type_description_name = type_descriptions[0].name

                inputs[input_name] = simple_value(input_param, param_dict[input_name], type_description_name)

        log.info("job inputs is %s" % inputs)
        return inputs
    
    
    def to_cwl_job(tool, param_dict, local_working_directory):
        """Convert Galaxy tool state into a CWL job input dict.

        ``tool`` is Galaxy's representation of the tool and ``param_dict`` is the
        parameter dictionary with wrapped values.

        Inputs are handled according to their ``type``:

        * ``"repeat"`` - each instance is converted and the resulting list is
          stored under the input name with its ``_repeat`` suffix removed.
        * ``"conditional"`` - the selected case is read from ``_cwl__type_`` and,
          unless it is ``"null"``, the ``_cwl__value_`` entry is converted.
        * anything else - the type is resolved from the CWL field with the same
          name in ``tool._cwl_tool_proxy.input_fields()``.

        :param tool: object exposing ``inputs`` (a mapping) and ``_cwl_tool_proxy``.
        :param param_dict: mapping from input name to its (wrapped) value.
        :param local_working_directory: directory under which the ``_inputs``
            path handed to the dataset converters is derived.
        :returns: dict of CWL job inputs.
        """
        tool_proxy = tool._cwl_tool_proxy
        input_fields = tool_proxy.input_fields()
        inputs = tool.inputs
        input_json = {}

        inputs_dir = os.path.join(local_working_directory, "_inputs")

        def simple_value(input, param_dict_value, type_representation_name=None):
            """Convert one parameter value according to the named representation.

            ``input`` is not used by any branch here. Unrecognized representation
            names fall back to ``str(value)``.
            """
            type_representation = type_representation_from_name(type_representation_name)
            # Hmm... cwl_type isn't really the cwl type in every case,
            # like in the case of json for instance.

            if type_representation.galaxy_param_type == NO_GALAXY_INPUT:
                assert param_dict_value is None
                return None

            if type_representation.name == "file":
                dataset_wrapper = param_dict_value
                return dataset_wrapper_to_file_json(inputs_dir, dataset_wrapper)
            elif type_representation.name == "directory":
                dataset_wrapper = param_dict_value
                return dataset_wrapper_to_directory_json(inputs_dir, dataset_wrapper)
            elif type_representation.name == "integer":
                return int(str(param_dict_value))
            elif type_representation.name == "long":
                # NOTE(review): TYPE_REPRESENTATIONS has no "long" entry, so the
                # lookup above would fail before reaching this branch - confirm.
                return int(str(param_dict_value))
            elif type_representation.name in ["float", "double"]:
                return float(str(param_dict_value))
            elif type_representation.name == "boolean":
                return string_as_bool(param_dict_value)
            elif type_representation.name == "text":
                return str(param_dict_value)
            elif type_representation.name == "enum":
                return str(param_dict_value)
            elif type_representation.name == "json":
                raw_value = param_dict_value.value
                return json.loads(raw_value)
            elif type_representation.name == "field":
                if param_dict_value is None:
                    return None
                if hasattr(param_dict_value, "value"):
                    # Is InputValueWrapper
                    rval = param_dict_value.value
                    # Unwrap {"src": "json", "value": ...} envelopes to the bare value.
                    if isinstance(rval, dict) and "src" in rval and rval["src"] == "json":
                        # needed for wf_step_connect_undeclared_param, so non-file defaults?
                        return rval["value"]
                    return rval
                elif not param_dict_value.is_collection:
                    # Is DatasetFilenameWrapper
                    return dataset_wrapper_to_file_json(inputs_dir, param_dict_value)
                else:
                    # Is DatasetCollectionWrapper
                    hdca_wrapper = param_dict_value
                    # NOTE(review): the two checks below read collection_type at
                    # different depths; any other collection type falls through
                    # and this helper returns None implicitly.
                    if hdca_wrapper.collection_type == "list":
                        # TODO: generalize to lists of lists and lists of non-files...
                        return collection_wrapper_to_array(inputs_dir, hdca_wrapper)
                    elif hdca_wrapper.collection_type.collection_type == "record":
                        return collection_wrapper_to_record(inputs_dir, hdca_wrapper)

            elif type_representation.name == "array":
                # TODO: generalize to lists of lists and lists of non-files...
                return collection_wrapper_to_array(inputs_dir, param_dict_value)
            elif type_representation.name == "record":
                return collection_wrapper_to_record(inputs_dir, param_dict_value)
            else:
                return str(param_dict_value)

        for input_name, input in inputs.items():
            if input.type == "repeat":
                # Only the first child input of the repeat is considered.
                only_input = next(iter(input.inputs.values()))
                array_value = []
                for instance in param_dict[input_name]:
                    # NOTE(review): no type_representation_name is passed, so
                    # simple_value looks up None, which matches no entry in
                    # TYPE_REPRESENTATIONS - this branch appears to always fail.
                    array_value.append(simple_value(only_input, instance[input_name[:-len("_repeat")]]))
                input_json[input_name[:-len("_repeat")]] = array_value
            elif input.type == "conditional":
                assert input_name in param_dict, "No value for %s in %s" % (input_name, param_dict)
                current_case = param_dict[input_name]["_cwl__type_"]
                # A "null" case writes nothing to input_json for this input.
                if str(current_case) != "null":  # str because it is a wrapped...
                    case_index = input.get_current_case(current_case)
                    case_input = input.cases[case_index].inputs["_cwl__value_"]
                    case_value = param_dict[input_name]["_cwl__value_"]
                    input_json[input_name] = simple_value(case_input, case_value, current_case)
            else:
                # Find the CWL field with this input's name; the loop does not
                # break, so the last matching field wins.
                matched_field = None
                for field in input_fields:
                    if field["name"] == input_name:
                        matched_field = field
                # NOTE(review): if nothing matched, matched_field is still None
                # and field_to_field_type will fail when indexing it.
                field_type = field_to_field_type(matched_field)
                if isinstance(field_type, list):
                    # A multi-entry type list is handled by the generic field representation.
                    assert USE_FIELD_TYPES
                    type_descriptions = [FIELD_TYPE_REPRESENTATION]
                else:
                    type_descriptions = type_descriptions_for_field_types([field_type])
                assert len(type_descriptions) == 1
                type_description_name = type_descriptions[0].name
                input_json[input_name] = simple_value(input, param_dict[input_name], type_description_name)

        log.debug("Galaxy Tool State is CWL State is %s" % input_json)
        return input_json
    
    
    def to_galaxy_parameters(tool, as_dict):
        """Convert a Galaxified CWL job dict into a flat Galaxy request dict.

        Tool is Galaxy's representation of the tool and as_dict is a Galaxified
        representation of the input json (no paths, HDA references for instance).

        Inputs are handled according to their ``type``:

        * ``"repeat"`` - every value becomes its own
          ``<input_name>_repeat_<index>|<child name>`` key; skipped when the
          input is absent from ``as_dict``.
        * ``"conditional"`` - the case is inferred from the Python type of the
          value and the available ``case_strings``; the request gets a
          ``<input_name>|_cwl__type_`` key and, unless the case is ``"null"``,
          a ``<input_name>|_cwl__value_`` key.
        * anything else - copied as-is; skipped when absent from ``as_dict``.

        :param tool: object exposing ``inputs`` (a mapping of Galaxy inputs).
        :param as_dict: mapping from input name to its Galaxified value.
        :returns: dict of Galaxy request parameters.
        :raises RequestParameterInvalidException: if a conditional value is
            missing with no ``"null"`` case, or matches no available case.
        """
        inputs = tool.inputs
        galaxy_request = {}

        def from_simple_value(input, param_dict_value, type_representation_name=None):
            # Only the "json" representation is serialized; every other value
            # is passed through unchanged.
            if type_representation_name == "json":
                return json.dumps(param_dict_value)
            else:
                return param_dict_value

        for input_name, input in inputs.items():
            as_dict_value = as_dict.get(input_name, NOT_PRESENT)
            galaxy_input_type = input.type

            if galaxy_input_type == "repeat":
                if input_name not in as_dict:
                    continue

                # Only the first child input of the repeat is considered.
                only_input = next(iter(input.inputs.values()))
                for index, value in enumerate(as_dict_value):
                    # Use the instance index in the key. A hard-coded 0 made
                    # every value overwrite the previous one, so only the last
                    # element of the list survived.
                    key = "%s_repeat_%d|%s" % (input_name, index, only_input.name)
                    galaxy_value = from_simple_value(only_input, value)
                    galaxy_request[key] = galaxy_value
            elif galaxy_input_type == "conditional":
                case_strings = input.case_strings
                # TODO: less crazy handling of defaults...
                # Order matters below: bool is tested before int because bool
                # is a subclass of int, and integer cases win over float ones.
                if (as_dict_value is NOT_PRESENT or as_dict_value is None) and "null" in case_strings:
                    type_representation_name = "null"
                elif (as_dict_value is NOT_PRESENT or as_dict_value is None):
                    raise RequestParameterInvalidException(
                        "Cannot translate CWL datatype - value [%s] of type [%s] with case_strings [%s]. Non-null property must be set." % (
                            as_dict_value, type(as_dict_value), case_strings
                        )
                    )
                elif isinstance(as_dict_value, bool) and "boolean" in case_strings:
                    type_representation_name = "boolean"
                elif isinstance(as_dict_value, int) and "integer" in case_strings:
                    type_representation_name = "integer"
                elif isinstance(as_dict_value, int) and "long" in case_strings:
                    type_representation_name = "long"
                elif isinstance(as_dict_value, (int, float)) and "float" in case_strings:
                    type_representation_name = "float"
                elif isinstance(as_dict_value, (int, float)) and "double" in case_strings:
                    type_representation_name = "double"
                elif isinstance(as_dict_value, string_types) and "string" in case_strings:
                    type_representation_name = "string"
                elif isinstance(as_dict_value, dict) and "src" in as_dict_value and "id" in as_dict_value and "file" in case_strings:
                    type_representation_name = "file"
                elif isinstance(as_dict_value, dict) and "src" in as_dict_value and "id" in as_dict_value and "directory" in case_strings:
                    # TODO: can't disambiguate with above if both are available...
                    type_representation_name = "directory"
                elif "field" in case_strings:
                    type_representation_name = "field"
                elif "json" in case_strings and as_dict_value is not None:
                    type_representation_name = "json"
                else:
                    raise RequestParameterInvalidException(
                        "Cannot translate CWL datatype - value [%s] of type [%s] with case_strings [%s]." % (
                            as_dict_value, type(as_dict_value), case_strings
                        )
                    )
                galaxy_request["%s|_cwl__type_" % input_name] = type_representation_name
                if type_representation_name != "null":
                    current_case_index = input.get_current_case(type_representation_name)
                    current_case_inputs = input.cases[current_case_index].inputs
                    current_case_input = current_case_inputs["_cwl__value_"]
                    galaxy_value = from_simple_value(current_case_input, as_dict_value, type_representation_name)
                    galaxy_request["%s|_cwl__value_" % input_name] = galaxy_value
            elif as_dict_value is NOT_PRESENT:
                continue
            else:
                galaxy_value = from_simple_value(input, as_dict_value)
                galaxy_request[input_name] = galaxy_value

        log.info("Converted galaxy_request is %s" % galaxy_request)
        return galaxy_request
    
    
    def field_to_field_type(field):
        """Extract the effective type from a CWL input field description.

        A dict-valued ``"type"`` is replaced by its own ``"type"`` entry, and a
        single-element type list is unwrapped to that element. Longer lists are
        returned unchanged.

        :param field: mapping with a ``"type"`` key.
        :returns: the type name, or a list of two or more types.
        :raises Exception: if the type list is empty.
        """
        declared = field["type"]
        if isinstance(declared, dict):
            declared = declared["type"]

        if not isinstance(declared, list):
            return declared

        if not declared:
            raise Exception("Zero-length type list encountered, invalid CWL?")
        return declared[0] if len(declared) == 1 else declared