diff --git a/backend/app/app/api/endpoints/catalogs.py b/backend/app/app/api/endpoints/catalogs.py index c3bfe9812e709e5a22571562eb4b4f3c25855fe1..089e73bc01af5fc0b4224e12eca025a45fea1ed4 100644 --- a/backend/app/app/api/endpoints/catalogs.py +++ b/backend/app/app/api/endpoints/catalogs.py @@ -45,9 +45,9 @@ async def create_catalog( return catalog -@router.put("/", response_model=CatalogUpdate) +@router.put("/", response_model=CatalogRead) async def update_catalog( - catalog: CatalogCreate, session: Session = Depends(get_session) + catalog: CatalogUpdate, session: Session = Depends(get_session) ): use_case = CrudCatalogUseCase() try: diff --git a/backend/app/app/api/endpoints/keggs.py b/backend/app/app/api/endpoints/keggs.py index eceda53cf75cad826c103e3bed266b0ea91e6bd7..b8de742e89d7829d70d5f7db8591f8f247488f52 100644 --- a/backend/app/app/api/endpoints/keggs.py +++ b/backend/app/app/api/endpoints/keggs.py @@ -41,13 +41,13 @@ async def create_kegg(kegg: KeggCreate, session: Session = Depends(get_session)) return kegg -@router.put("/", response_model=KeggUpdate) -async def update_kegg(kegg: KeggCreate, session: Session = Depends(get_session)): +@router.put("/", response_model=KeggRead) +async def update_kegg(kegg: KeggUpdate, session: Session = Depends(get_session)): use_case = CrudKeggUseCase() try: kegg = use_case.update_kegg(kegg, session=session) except NoResultFound: - raise HTTPException(status_code=404, detail=f"Kegg [{kegg.name}] not found.") + raise HTTPException(status_code=404, detail=f"Kegg [{kegg.kegg_id}] not found.") return kegg diff --git a/backend/app/app/api/endpoints/ncbi_taxonomy.py b/backend/app/app/api/endpoints/ncbi_taxonomy.py new file mode 100644 index 0000000000000000000000000000000000000000..a8c2c6481888ad30036dbfd41d084631156d7fc6 --- /dev/null +++ b/backend/app/app/api/endpoints/ncbi_taxonomy.py @@ -0,0 +1,82 @@ +from typing import List + +from fastapi import Depends, APIRouter, HTTPException +from sqlalchemy.exc import NoResultFound, IntegrityError +from sqlmodel import Session + +from app.db import get_session +from app.core.schemas.entities.ncbi_taxonomy import ( + NcbiTaxonomyRead, + NcbiTaxonomyUpdate, + NcbiTaxonomyCreate, +) +from app.core.use_cases.crud.ncbi_taxonomy import CrudNcbiTaxonomyUseCase + + +router = APIRouter() + + +@router.get("/", response_model=List[NcbiTaxonomyRead]) +async def get_ncbi_taxonomy_entries(session: Session = Depends(get_session)): + use_case = CrudNcbiTaxonomyUseCase() + ncbi_taxonomys = use_case.get_all(session=session) + return ncbi_taxonomys + + +@router.get("/{tax_id}", response_model=NcbiTaxonomyRead) +async def get_ncbi_taxonomy(tax_id: str, session: Session = Depends(get_session)): + use_case = CrudNcbiTaxonomyUseCase() + try: + ncbi_taxonomy = use_case.get_ncbi_taxonomy(tax_id, session=session) + except NoResultFound: + raise HTTPException( + status_code=404, detail=f"NcbiTaxonomy [{tax_id}] not found." + ) + return ncbi_taxonomy + + +@router.post("/", response_model=NcbiTaxonomyRead) +async def create_ncbi_taxonomy( + ncbi_taxonomy: NcbiTaxonomyCreate, session: Session = Depends(get_session) +): + use_case = CrudNcbiTaxonomyUseCase() + try: + ncbi_taxonomy = use_case.create_ncbi_taxonomy(ncbi_taxonomy, session=session) + except IntegrityError: + raise HTTPException( + status_code=403, + detail=f"NcbiTaxonomy [{ncbi_taxonomy.name}] already exists.", + ) + return ncbi_taxonomy + + +@router.put("/", response_model=NcbiTaxonomyRead) +async def update_ncbi_taxonomy( + ncbi_taxonomy: NcbiTaxonomyUpdate, + exclude_none: bool = True, + session: Session = Depends(get_session), +): + use_case = CrudNcbiTaxonomyUseCase() + try: + ncbi_taxonomy = use_case.update_ncbi_taxonomy( + ncbi_taxonomy, session=session, exclude_none=exclude_none + ) + except NoResultFound: + raise HTTPException( + status_code=404, detail=f"NcbiTaxonomy [{ncbi_taxonomy.tax_id}] not found." + ) + except IntegrityError: + raise HTTPException(status_code=422, detail="Missing data in input data.") + return ncbi_taxonomy + + +@router.delete("/{tax_id}") +async def delete_ncbi_taxonomy(tax_id: str, session: Session = Depends(get_session)): + use_case = CrudNcbiTaxonomyUseCase() + try: + use_case.delete_ncbi_taxonomy(tax_id, session=session) + except NoResultFound: + raise HTTPException( + status_code=404, detail=f"NcbiTaxonomy [{tax_id}] not found." + ) + return {"deleted": True} diff --git a/backend/app/app/api/router.py b/backend/app/app/api/router.py index 2f7a5a750dd414c10b8687beeec7be58941c165f..562b7ab4689181b488a0a5603ba86440a84c4437 100644 --- a/backend/app/app/api/router.py +++ b/backend/app/app/api/router.py @@ -1,6 +1,6 @@ from fastapi import APIRouter -from app.api.endpoints import catalogs, experiments, genes, keggs +from app.api.endpoints import catalogs, experiments, genes, keggs, ncbi_taxonomy api_router = APIRouter() api_router.include_router(genes.router, prefix="/genes", tags=["Genes"]) @@ -8,4 +8,7 @@ api_router.include_router(catalogs.router, prefix="/catalogs", tags=["Catalogs"] api_router.include_router( experiments.router, prefix="/experiments", tags=["Experiments"] ) -api_router.include_router(keggs.router, prefix="/keggs", tags=["Keggs"]) +api_router.include_router(keggs.router, prefix="/keggs", tags=["Kegg"]) +api_router.include_router( + ncbi_taxonomy.router, prefix="/ncbi_taxonomy", tags=["NCBI Taxonomy"] +) diff --git a/backend/app/app/core/models/ncbi_taxonomy.py b/backend/app/app/core/models/ncbi_taxonomy.py index 4729894071ff19c2820d72b1add8c64b4b1b89f5..e998d1908d3cfc7cade51eac8deffc2156e285c7 100644 --- a/backend/app/app/core/models/ncbi_taxonomy.py +++ b/backend/app/app/core/models/ncbi_taxonomy.py @@ -1,11 +1,11 @@ from typing import List, Optional import slugify -from sqlmodel import Field, Relationship, Column, JSON, SQLModel +from sqlmodel import Field, Relationship, Column, JSON, SQLModel, INTEGER class NcbiTaxonomyBase(SQLModel): - tax_id: int + tax_id: int = Field(sa_column=Column("tax_id", INTEGER, unique=True)) name: str = Field(index=False) rank: str diff --git a/backend/app/app/core/repositories/catalog.py b/backend/app/app/core/repositories/catalog.py index 4a1fbc908f47cfcee51b715d29b4c1a12004fc23..f0a01ea39377b8754abc1b8c34bfc226f28aad64 100644 --- a/backend/app/app/core/repositories/catalog.py +++ b/backend/app/app/core/repositories/catalog.py @@ -48,10 +48,12 @@ class SqlModelCatalogsRepo(CatalogsRepo): session.refresh(catalog) return catalog - def update(self, catalog_input: CatalogUpdate, session: Session) -> Catalog: + def update( + self, catalog_input: CatalogUpdate, session: Session, exclude_none: bool = True + ) -> Catalog: """Update a catalog entry.""" catalog = self.get(catalog_input.name, session) - for k, v in catalog_input.dict().items(): + for k, v in catalog_input.dict(exclude_none=exclude_none).items(): setattr(catalog, k, v) session.add(catalog) session.commit() diff --git a/backend/app/app/core/repositories/kegg.py b/backend/app/app/core/repositories/kegg.py index c480d167bde4b9682ed37fc6e05fcaae26ceb15e..e9168cd311b6fc4f2a4932830d4ef01a70f73b01 100644 --- a/backend/app/app/core/repositories/kegg.py +++ b/backend/app/app/core/repositories/kegg.py @@ -48,10 +48,12 @@ class SqlModelKeggsRepo(KeggsRepo): session.refresh(kegg) return kegg - def update(self, kegg_input: KeggUpdate, session: Session) -> Kegg: + def update( + self, kegg_input: KeggUpdate, session: Session, exclude_none: bool = True + ) -> Kegg: """Update a kegg entry.""" kegg = self.get(kegg_input.kegg_id, session) - for k, v in kegg_input.dict().items(): + for k, v in kegg_input.dict(exclude_none=exclude_none).items(): setattr(kegg, k, v) session.add(kegg) session.commit() diff --git a/backend/app/app/core/repositories/ncbi_taxonomy.py b/backend/app/app/core/repositories/ncbi_taxonomy.py new file mode 100644 index 0000000000000000000000000000000000000000..8c4afe7256041113b2df684d503ea111a8ddcdf1 --- /dev/null +++ b/backend/app/app/core/repositories/ncbi_taxonomy.py @@ -0,0 +1,78 @@ +import abc +from typing import List + +from sqlmodel import Session, select + +from app.core.models.ncbi_taxonomy import NcbiTaxonomy +from app.core.schemas.entities.ncbi_taxonomy import ( + NcbiTaxonomyCreate, + NcbiTaxonomyUpdate, +) + + +class NcbiTaxonomysRepo(abc.ABC): + @abc.abstractmethod + def get(self, ncbi_taxonomy_name: str) -> NcbiTaxonomy: + raise NotImplementedError + + @abc.abstractmethod + def get_all(self) -> List[NcbiTaxonomy]: + raise NotImplementedError + + @abc.abstractmethod + def create(self, ncbi_taxonomy_input: NcbiTaxonomyCreate) -> NcbiTaxonomy: + raise NotImplementedError + + @abc.abstractmethod + def update(self, ncbi_taxonomy_input: NcbiTaxonomyUpdate) -> NcbiTaxonomy: + raise NotImplementedError + + @abc.abstractmethod + def delete(self, ncbi_taxonomy_name: str): + raise NotImplementedError + + +class SqlModelNcbiTaxonomysRepo(NcbiTaxonomysRepo): + def get(self, tax_id, session: Session) -> NcbiTaxonomy: + """Retrieve one ncbi_taxonomy from name.""" + statement = select(NcbiTaxonomy).where(NcbiTaxonomy.tax_id == tax_id) + return session.exec(statement).one() + + def get_all(self, session: Session) -> List[NcbiTaxonomy]: + """Retrieve all ncbi_taxonomys.""" + ncbi_taxonomys = session.exec(select(NcbiTaxonomy)).all() + return ncbi_taxonomys + + def create( + self, ncbi_taxonomy_input: NcbiTaxonomyCreate, session: Session + ) -> NcbiTaxonomy: + """Create a ncbi_taxonomy entry.""" + ncbi_taxonomy = NcbiTaxonomy( + tax_id=ncbi_taxonomy_input.tax_id, + rank=ncbi_taxonomy_input.rank, + name=ncbi_taxonomy_input.name, + ) + session.add(ncbi_taxonomy) + session.commit() + session.refresh(ncbi_taxonomy) + return ncbi_taxonomy + + def update( + self, + ncbi_taxonomy_input: NcbiTaxonomyUpdate, + session: Session, + exclude_none: bool = True, + ) -> NcbiTaxonomy: + """Update a ncbi_taxonomy entry.""" + ncbi_taxonomy = self.get(ncbi_taxonomy_input.tax_id, session) + for k, v in ncbi_taxonomy_input.dict(exclude_none=exclude_none).items(): + setattr(ncbi_taxonomy, k, v) + session.add(ncbi_taxonomy) + session.commit() + session.refresh(ncbi_taxonomy) + return ncbi_taxonomy + + def delete(self, tax_id: str, session: Session): + ncbi_taxonomy = self.get(tax_id, session) + session.delete(ncbi_taxonomy) + session.commit() diff --git a/backend/app/app/core/schemas/entities/ncbi_taxonomy.py b/backend/app/app/core/schemas/entities/ncbi_taxonomy.py index ff8ac2f3e206f0bd8e3bf2f02f90b5ed28835779..8dc8e5a7e3c178ba4310fc49d59055720d73702f 100644 --- a/backend/app/app/core/schemas/entities/ncbi_taxonomy.py +++ b/backend/app/app/core/schemas/entities/ncbi_taxonomy.py @@ -1,3 +1,7 @@ +from typing import Optional + +from sqlmodel import SQLModel + from app.core.models.ncbi_taxonomy import NcbiTaxonomyBase @@ -7,3 +11,9 @@ class NcbiTaxonomyRead(NcbiTaxonomyBase): class NcbiTaxonomyCreate(NcbiTaxonomyBase): pass + + +class NcbiTaxonomyUpdate(SQLModel): + tax_id: int + name: Optional[str] + rank: Optional[str] diff --git a/backend/app/app/core/use_cases/crud/ncbi_taxonomy.py b/backend/app/app/core/use_cases/crud/ncbi_taxonomy.py new file mode 100644 index 0000000000000000000000000000000000000000..bf4d4fd21e922338228073530c4b23e246479864 --- /dev/null +++ b/backend/app/app/core/use_cases/crud/ncbi_taxonomy.py @@ -0,0 +1,35 @@ +from typing import List + +import inject + +from app.core.models.ncbi_taxonomy import NcbiTaxonomy +from app.core.schemas.entities.ncbi_taxonomy import ( + NcbiTaxonomyCreate, + NcbiTaxonomyUpdate, +) +from app.core.repositories.ncbi_taxonomy import NcbiTaxonomysRepo + + +class CrudNcbiTaxonomyUseCase: + @inject.autoparams("ncbi_taxonomys_repo") + def __init__(self, ncbi_taxonomys_repo: NcbiTaxonomysRepo): + self._ncbi_taxonomys_repo = ncbi_taxonomys_repo + + def create_ncbi_taxonomy( + self, ncbi_taxonomy_input: NcbiTaxonomyCreate, **kwargs + ) -> NcbiTaxonomy: + return self._ncbi_taxonomys_repo.create(ncbi_taxonomy_input, **kwargs) + + def update_ncbi_taxonomy( + self, ncbi_taxonomy_input: NcbiTaxonomyUpdate, **kwargs + ) -> NcbiTaxonomy: + return self._ncbi_taxonomys_repo.update(ncbi_taxonomy_input, **kwargs) + + def delete_ncbi_taxonomy(self, ncbi_taxonomy_name: str, **kwargs): + return self._ncbi_taxonomys_repo.delete(ncbi_taxonomy_name, **kwargs) + + def get_ncbi_taxonomy(self, ncbi_taxonomy_id: str, **kwargs) -> NcbiTaxonomy: + return self._ncbi_taxonomys_repo.get(ncbi_taxonomy_id, **kwargs) + + def get_all(self, **kwargs) -> List[NcbiTaxonomy]: + return self._ncbi_taxonomys_repo.get_all(**kwargs) diff --git a/backend/app/app/dependency_injections.py b/backend/app/app/dependency_injections.py index 10f4c827a80191219e29a1854c1555741765190f..5a67a034c8884feba5ebf51a9195c30d4ec5dfac 100644 --- a/backend/app/app/dependency_injections.py +++ b/backend/app/app/dependency_injections.py @@ -2,11 +2,16 @@ import inject from app.core.repositories.catalog import CatalogsRepo, SqlModelCatalogsRepo from app.core.repositories.kegg import KeggsRepo, SqlModelKeggsRepo +from app.core.repositories.ncbi_taxonomy import ( + NcbiTaxonomysRepo, + SqlModelNcbiTaxonomysRepo, +) def di_configuration(binder): binder.bind(CatalogsRepo, SqlModelCatalogsRepo()) binder.bind(KeggsRepo, SqlModelKeggsRepo()) + binder.bind(NcbiTaxonomysRepo, SqlModelNcbiTaxonomysRepo()) def run_di(): diff --git a/backend/app/tests/api/endpoints/test_catalogs.py b/backend/app/tests/api/endpoints/test_catalogs.py index 98840a5da2162920814e9e52a82d31b9da7f9f1c..a1aa79f7145e3e699378a74074d5406adc90ac4f 100644 --- a/backend/app/tests/api/endpoints/test_catalogs.py +++ b/backend/app/tests/api/endpoints/test_catalogs.py @@ -5,12 +5,6 @@ from app.core.models.catalog import Catalog from .base_api_test import BaseApiTests -def create_test_db(session: Session): - test_catalog = Catalog(name="test-catalog") - session.add(test_catalog) - session.commit() - - class TestCatalogs(BaseApiTests): @classmethod def _create_test_db(cls, session: Session): diff --git a/backend/app/tests/api/endpoints/test_keggs.py b/backend/app/tests/api/endpoints/test_keggs.py index 2b2ca84066e2f3f88f29126ff3e8e8faf5640a01..de41fafd57a82418d79e38e6e1a8ced61161599b 100644 --- a/backend/app/tests/api/endpoints/test_keggs.py +++ b/backend/app/tests/api/endpoints/test_keggs.py @@ -5,12 +5,6 @@ from app.core.models.kegg import Kegg from .base_api_test import BaseApiTests -def create_test_db(session: Session): - test_kegg = Kegg(name="test-kegg") - session.add(test_kegg) - session.commit() - - class TestKeggs(BaseApiTests): @classmethod def _create_test_db(cls, session: Session): @@ -55,8 +49,8 @@ class TestKeggs(BaseApiTests): self.assertEqual(response.status_code, 200) self.assertDictEqual(response.json(), expected_data) # Finally delete item - cat = self.session.exec(select(Kegg).where(Kegg.kegg_id == kegg_id)).one() - self.session.delete(cat) + kegg = self.session.exec(select(Kegg).where(Kegg.kegg_id == kegg_id)).one() + self.session.delete(kegg) self.session.commit() def test_create_existing_kegg(self): diff --git a/backend/app/tests/api/endpoints/test_ncbi_taxonomy.py b/backend/app/tests/api/endpoints/test_ncbi_taxonomy.py new file mode 100644 index 0000000000000000000000000000000000000000..8bc7356a2954b6ed02db8fb9f7db8a878fc41c37 --- /dev/null +++ b/backend/app/tests/api/endpoints/test_ncbi_taxonomy.py @@ -0,0 +1,161 @@ +from sqlmodel import Session, select + +from app.core.models.ncbi_taxonomy import NcbiTaxonomy + +from .base_api_test import BaseApiTests + + +class TestNcbiTaxonomys(BaseApiTests): + @classmethod + def _create_test_db(cls, session: Session): + test_ncbi_taxonomy = NcbiTaxonomy( + tax_id=1, name="test-ncbi_taxonomy", rank="species" + ) + session.add(test_ncbi_taxonomy) + session.commit() + + def test_get_ncbi_taxonomy_non_existing(self): + # When + response = self.client.get("/api/ncbi_taxonomy/non-existing") + # Then + self.assertEqual(response.status_code, 404) + + def test_get_ncbi_taxonomy_existing(self): + # Given + expected_id = 1 + expected_data = { + "tax_id": expected_id, + "name": "test-ncbi_taxonomy", + "rank": "species", + } + # When + response = self.client.get(f"/api/ncbi_taxonomy/{expected_id}") + # Then + self.assertEqual(response.status_code, 200) + self.assertDictEqual(response.json(), expected_data) + + def test_get_all_ncbi_taxonomys(self): + # Given + expected_data = [{"tax_id": 1, "name": "test-ncbi_taxonomy", "rank": "species"}] + # When + response = self.client.get(f"/api/ncbi_taxonomy/") + # Then + self.assertEqual(response.status_code, 200) + self.assertListEqual(response.json(), expected_data) + + def test_create_ncbi_taxonomy(self): + # Given + tax_id = 2 + ncbi_taxonomy_name = "created_ncbi_taxonomy" + rank_created = "species" + json_input = { + "tax_id": tax_id, + "name": ncbi_taxonomy_name, + "rank": rank_created, + } + expected_data = { + "tax_id": tax_id, + "name": ncbi_taxonomy_name, + "rank": rank_created, + } + # When + response = self.client.post("/api/ncbi_taxonomy/", json=json_input) + # Then + self.assertEqual(response.status_code, 200) + self.assertDictEqual(response.json(), expected_data) + # Finally delete item + ncbi_tax = self.session.exec( + select(NcbiTaxonomy).where(NcbiTaxonomy.tax_id == tax_id) + ).one() + self.session.delete(ncbi_tax) + self.session.commit() + + def test_create_existing_ncbi_taxonomy(self): + # Given + tax_id = 1 + ncbi_taxonomy_name = "test-ncbi_taxonomy" + json_input = {"tax_id": tax_id, "name": ncbi_taxonomy_name, "rank": "species"} + # When + response = self.client.post("/api/ncbi_taxonomy/", json=json_input) + # Then + self.assertEqual(response.status_code, 403) + + def test_delete_ncbi_taxonomy_non_existing(self): + # When + response = self.client.delete(f"/api/ncbi_taxonomy/35") + # Then + self.assertEqual(response.status_code, 404) + + def test_delete_ncbi_taxonomy(self): + # Given + tax_id = 4 + name_to_delete = "delete-me" + # - create ncbi_taxonomy to delete + ncbi_taxonomy_to_delete = NcbiTaxonomy( + tax_id=tax_id, name=name_to_delete, rank="species" + ) + self.session.add(ncbi_taxonomy_to_delete) + self.session.commit() + # When + response = self.client.delete(f"/api/ncbi_taxonomy/{tax_id}") + # Then + self.assertEqual(response.status_code, 200) + + def test_update_ncbi_taxonomy_non_existing_no_rank_exclude_none_true(self): + # When + json_input = {"tax_id": 29, "name": "12345"} + response = self.client.put( + f"/api/ncbi_taxonomy/?exclude_none=true", json=json_input + ) + # Then + self.assertEqual(response.status_code, 404) + + def test_update_ncbi_taxonomy_no_rank_exclude_none_true(self): + # Given + tax_id = 56 + name_to_update = "update-me" + new_name = "new-name" + rank = "species" + # - create ncbi_taxonomy to update + ncbi_taxonomy_to_update = NcbiTaxonomy( + tax_id=tax_id, name=name_to_update, rank=rank + ) + self.session.add(ncbi_taxonomy_to_update) + self.session.commit() + json_input = {"tax_id": tax_id, "name": new_name} + expected_data = {"tax_id": tax_id, "name": new_name, "rank": rank} + # When + response = self.client.put( + f"/api/ncbi_taxonomy/?exclude_none=true", json=json_input + ) + # Then + self.assertEqual(response.status_code, 200) + self.assertDictEqual(response.json(), expected_data) + # Finally + self.session.delete(ncbi_taxonomy_to_update) + self.session.commit() + + def test_update_ncbi_taxonomy_no_rank_exclude_none_false(self): + # Given + tax_id = 56 + name_to_update = "update-me" + new_name = "new-name" + rank = "species" + # - create ncbi_taxonomy to update + ncbi_taxonomy_to_update = NcbiTaxonomy( + tax_id=tax_id, name=name_to_update, rank=rank + ) + self.session.add(ncbi_taxonomy_to_update) + self.session.commit() + json_input = {"tax_id": tax_id, "name": new_name} + try: + # When + response = self.client.put( + f"/api/ncbi_taxonomy/?exclude_none=false", json=json_input + ) + self.assertEqual(response.status_code, 422) + # Finally + finally: + self.session.rollback() + self.session.delete(ncbi_taxonomy_to_update) + self.session.commit() diff --git a/backend/app/tests/core/use_cases/crud/test_ncbi_taxonomy.py b/backend/app/tests/core/use_cases/crud/test_ncbi_taxonomy.py new file mode 100644 index 0000000000000000000000000000000000000000..a72c333b1dd0e9fced69eea2247250b4354efce0 --- /dev/null +++ b/backend/app/tests/core/use_cases/crud/test_ncbi_taxonomy.py @@ -0,0 +1,85 @@ +import unittest + +from app.core.use_cases.crud.ncbi_taxonomy import CrudNcbiTaxonomyUseCase +from app.core.schemas.entities.ncbi_taxonomy import ( + NcbiTaxonomyCreate, + NcbiTaxonomyUpdate, +) + +GET_ACTION = "Returning ncbi_taxonomy" +GET_ALL_ACTION = "Returning all ncbi_taxonomy entries" +CREATE_ACTION = "Creating ncbi_taxonomy" +UPDATE_ACTION = "Updating ncbi_taxonomy" +DELETE_ACTION = "Deleting ncbi_taxonomy" + + +class MockNcbiTaxonomysRepo: + def get(self, ncbi_taxonomy_name: str): + return GET_ACTION + + def get_all(self): + return GET_ALL_ACTION + + def create(self, ncbi_taxonomy_input: NcbiTaxonomyCreate): + return CREATE_ACTION + + def update(self, ncbi_taxonomy_input: NcbiTaxonomyUpdate): + return UPDATE_ACTION + + def delete(self, ncbi_taxonomy_name: str): + return DELETE_ACTION + + +class TestCrudNcbiTaxonomyUseCase(unittest.TestCase): + def setUp(self) -> None: + self.use_case_test = CrudNcbiTaxonomyUseCase( + ncbi_taxonomys_repo=MockNcbiTaxonomysRepo() + ) + + def test_create_ncbi_taxonomy(self): + # Given + ncbi_taxonomy_test = NcbiTaxonomyCreate( + tax_id=1, rank="species", name="test-ncbi_taxonomy" + ) + expected_output = CREATE_ACTION + # When + test_output = self.use_case_test.create_ncbi_taxonomy(ncbi_taxonomy_test) + # Then + self.assertEqual(test_output, expected_output) + + def test_update_ncbi_taxonomy(self): + # Given + ncbi_taxonomy_test = NcbiTaxonomyCreate( + tax_id=1, rank="species", name="test-ncbi_taxonomy" + ) + expected_output = UPDATE_ACTION + # When + test_output = self.use_case_test.update_ncbi_taxonomy(ncbi_taxonomy_test) + # Then + self.assertEqual(test_output, expected_output) + + def test_delete_ncbi_taxonomy(self): + # Given + ncbi_taxonomy_test_name = "test-ncbi_taxonomy" + expected_output = DELETE_ACTION + # When + test_output = self.use_case_test.delete_ncbi_taxonomy(ncbi_taxonomy_test_name) + # Then + self.assertEqual(test_output, expected_output) + + def test_get_ncbi_taxonomy(self): + # Given + ncbi_taxonomy_test_name = "test-ncbi_taxonomy" + expected_output = GET_ACTION + # When + test_output = self.use_case_test.get_ncbi_taxonomy(ncbi_taxonomy_test_name) + # Then + self.assertEqual(test_output, expected_output) + + def test_get_all(self): + # Given + expected_output = GET_ALL_ACTION + # When + output = self.use_case_test.get_all() + # Then + self.assertEqual(output, expected_output)