Commit 69f0e12d authored by Bryan  BRANCOTTE's avatar Bryan BRANCOTTE

Cache JSON responses, write results, look up more fields

parent 1e9b8c42
genomes_proks815-genomesonnovember.csv
output.csv
.idea
.venv
\ No newline at end of file
.venv
.cached/
\ No newline at end of file
......@@ -2,6 +2,7 @@
import argparse
import csv
import json
import os
from Bio import Entrez
......@@ -10,45 +11,96 @@ Entrez.email = os.environ.get("NCBI_EMAIL", None)
Entrez.api_key = os.environ.get("NCBI_API_KEY", None)
def cache_json(function):
    """Decorate *function* so that its JSON-serialisable result is cached on disk.

    The cache file is ``.cached/<arg1>--<arg2>--....json`` relative to the
    current working directory.  On a hit the stored JSON is returned and
    *function* is not called; on a miss *function* is called with the same
    positional arguments and its result is stored before being returned.

    NOTE: the file name is built from the arguments only, not from the name of
    the decorated function, so two decorated functions called with identical
    arguments share one cache entry.

    :param function: callable taking positional arguments only.
    :return: the caching wrapper around *function*.
    """
    import functools  # local import keeps this block self-contained

    def _file_name_part(value):
        # A path separator inside an argument would point into a directory
        # that does not exist, so replace it; other text is kept as is to
        # stay compatible with cache files that were already written.
        text = str(value)
        for separator in filter(None, (os.sep, os.altsep, "/")):
            text = text.replace(separator, "_")
        return text

    @functools.wraps(function)
    def wrapper(*args):
        path = ".cached/%s.json" % "--".join(_file_name_part(arg) for arg in args)
        try:
            with open(path, 'r') as file:
                return json.loads(file.read())
        except (FileNotFoundError, ValueError):
            # Missing or unreadable (e.g. truncated) entry: treat as a miss.
            pass
        # Called outside the ``except`` block so that an error raised by
        # *function* is not reported as a consequence of the cache miss.
        rv = function(*args)
        # Serialise before opening the file: if ``rv`` cannot be encoded no
        # empty cache file is left behind to break later calls.
        payload = json.dumps(rv, indent=4)
        os.makedirs(".cached", exist_ok=True)
        with open(path, 'w') as file:
            file.write(payload)
        return rv

    return wrapper
@cache_json
def bio_sample_record(identifier):
    """Fetch the document summary of a BioSample through NCBI Entrez.

    The ``biosample`` database is searched for *identifier*; when the search
    reports more than one match it is repeated with the ``latest[filter]``
    restriction.  The ids of the final search are then fetched as a docsum.

    :param identifier: search term passed to ``esearch``.
    :return: the parsed ``efetch`` result.
    """

    def _search(term):
        # One esearch round trip against the biosample database.
        with Entrez.esearch(db="biosample", retmax=2, term=term) as search_handle:
            return Entrez.read(search_handle)

    hits = _search(identifier)
    if int(hits["Count"]) > 1:
        hits = _search("%s AND (latest[filter])" % identifier)

    matched_ids = hits["IdList"]
    with Entrez.efetch(db="biosample", id=matched_ids, retmode="xml", rettype="docsum") as summary_handle:
        return Entrez.read(summary_handle)
@cache_json
def assembly_record(identifier):
    """Fetch the document summary of an assembly through NCBI Entrez.

    :param identifier: search term passed to ``esearch`` on the ``assembly``
        database.
    :return: the parsed ``efetch`` docsum result.
    :raises NotImplementedError: when the search reports more than one match;
        choosing between several assemblies is not supported.
    """
    with Entrez.esearch(db="assembly", retmax=2, term=identifier) as handle:
        record = Entrez.read(handle)
    match_count = int(record["Count"])
    if match_count > 1:
        # Same exception type as before, now saying which input is ambiguous.
        raise NotImplementedError(
            "assembly search for %r matched %d records; "
            "choosing between several assemblies is not implemented" % (identifier, match_count)
        )
    record_id = record["IdList"]
    with Entrez.efetch(db="assembly", id=record_id, retmode="xml", rettype="docsum") as handle:
        return Entrez.read(handle)
class StrainWrapped:
    """A strain with its BioSample and assembly identifiers plus looked-up metadata.

    The three identifiers are stored as given.  Every other property is read
    on access from ``bio_sample_record`` / ``assembly_record``; the parsed
    ``SampleData`` XML is kept on the instance after its first use.
    """

    def __init__(self, strain, bio_sample, assembly):
        """
        :param strain: strain name, stored unchanged.
        :param bio_sample: BioSample identifier used for ``bio_sample_record``.
        :param assembly: assembly identifier used for ``assembly_record``.
        """
        self.strain = strain
        self.bio_sample = bio_sample
        self.assembly = assembly
        # Parsed SampleData XML element; filled by ``_sample_data`` on first use.
        self._parsed_sample_data = None

    def _bio_sample_summary(self):
        """Return the first document summary of this strain's BioSample record."""
        return bio_sample_record(self.bio_sample)["DocumentSummarySet"]["DocumentSummary"][0]

    @property
    def _sample_data(self):
        """Root element of the BioSample ``SampleData`` XML, parsed once per instance."""
        parsed = getattr(self, "_parsed_sample_data", None)
        # Compare with None explicitly: an Element without children is falsy.
        if parsed is None:
            from xml.etree.ElementTree import fromstring
            parsed = fromstring(self._bio_sample_summary()["SampleData"])
            self._parsed_sample_data = parsed
        return parsed

    @property
    def sample_data(self):
        """Public name for the parsed ``SampleData`` XML element.

        Reads through ``bio_sample_record`` instead of issuing its own Entrez
        requests, so it shares the same lookup as the other properties.
        """
        return self._sample_data

    @property
    def host(self):
        """Text of the ``host`` sample attribute, or ``None`` when absent."""
        return self._bio_sample_sample_data_attr("host")

    @property
    def location(self):
        """Text of the ``geo_loc_name`` sample attribute, or ``None`` when absent."""
        return self._bio_sample_sample_data_attr("geo_loc_name")

    @property
    def isolation_date(self):
        """The ``Date`` field of the BioSample document summary.

        NOTE(review): whether this field is the isolation date or a record
        date is not visible here - confirm against the NCBI docsum.
        """
        return self._bio_sample_summary()["Date"]

    @property
    def source(self):
        """The ``SourceSample`` field of the BioSample document summary."""
        return self._bio_sample_summary()["SourceSample"]

    @property
    def coverage(self):
        """The ``Coverage`` field of the assembly document summary."""
        return assembly_record(self.assembly)["DocumentSummarySet"]["DocumentSummary"][0]["Coverage"]

    @property
    def sequencing(self):
        """Placeholder; not looked up yet."""
        return "TODO"

    @property
    def assembling(self):
        """Placeholder; not looked up yet."""
        return "TODO"

    @property
    def origin(self):
        """Placeholder; not looked up yet."""
        return "TODO"

    def _bio_sample_sample_data_attr(self, attr_name):
        """Return the text of the sample attribute named *attr_name*, or ``None``.

        :param attr_name: value of the ``attribute_name`` XML attribute to match.
        """
        attr = self._sample_data.find("Attributes/Attribute[@attribute_name=\"" + attr_name + "\"]")
        return attr.text if attr is not None else None
if __name__ == '__main__':
......@@ -58,11 +110,20 @@ if __name__ == '__main__':
# Part of the ``if __name__ == '__main__':`` script body.
parser.add_argument('-d', '--delimiter', default='\t', help="delimiter to use in both input and output file")
parser.add_argument('-v', dest='verbose', action='store_true')
args = parser.parse_args()

# Column layout of the output file.  Each ``*_output_pos`` below is the index
# of one column, so reordering this list is enough to reorder the output.
output_header = ["Strain", "BioSample", "Assembly", "Date", "Location", "Source", "Origin", "Host",
                 "Sequencing", "Assembling", "Coverage"]
strain_output_pos = output_header.index('Strain')
bio_sample_output_pos = output_header.index('BioSample')
assembly_output_pos = output_header.index('Assembly')
host_output_pos = output_header.index('Host')
location_output_pos = output_header.index('Location')
date_output_pos = output_header.index('Date')
source_output_pos = output_header.index('Source')
origin_output_pos = output_header.index('Origin')
sequencing_output_pos = output_header.index('Sequencing')
coverage_output_pos = output_header.index('Coverage')
assembling_output_pos = output_header.index('Assembling')

# newline='' lets the csv module do its own line-ending handling, as its
# documentation asks for files passed to csv.reader / csv.writer.
with open(args.input, 'r', newline='') as csv_input_file, open(args.output, 'w', newline='') as csv_output_file:
    csv_reader = csv.reader(csv_input_file, delimiter=args.delimiter, quotechar='|')
    csv_writer = csv.writer(csv_output_file, delimiter=args.delimiter)
......@@ -70,12 +131,23 @@ if __name__ == '__main__':
# Runs inside the ``with open(...)`` block that owns csv_reader / csv_writer.
# Column positions in the input file, looked up by name in its header row.
strain_pos = header.index('Strain')
bio_sample_pos = header.index('BioSample')
assembly_pos = header.index('Assembly')

csv_writer.writerow(output_header)
if args.verbose:
    print(output_header)

for row in csv_reader:
    if not row:
        # csv.reader yields an empty list for a blank line; nothing to look up.
        continue
    strain = StrainWrapped(row[strain_pos], row[bio_sample_pos], row[assembly_pos])
    # Fill every column through its named position so the row always matches
    # ``output_header``, whatever order the columns are declared in.
    results = [""] * len(output_header)
    results[strain_output_pos] = strain.strain
    results[bio_sample_output_pos] = strain.bio_sample
    results[assembly_output_pos] = strain.assembly
    results[host_output_pos] = strain.host
    results[location_output_pos] = strain.location
    results[date_output_pos] = strain.isolation_date
    results[source_output_pos] = strain.source
    results[origin_output_pos] = strain.origin
    results[sequencing_output_pos] = strain.sequencing
    results[coverage_output_pos] = strain.coverage
    results[assembling_output_pos] = strain.assembling
    # Console output is opt-in through -v.
    if args.verbose:
        print(results)
    csv_writer.writerow(results)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment