#!/usr/bin/env python3
import argparse
import csv
import json
import traceback

from tqdm import tqdm

from models import StrainWrapped
from utils import cache_json

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-o', '--output', default='output.csv', help="where to write the output file")
    parser.add_argument('-i', '--input', default='genomes_proks815-genomesonnovember.sample.csv', help="file to read")
    # parser.add_argument('--biosample-attributes', dest='bio_sample_attributes', default='bio-sample-attributes.txt',
    #                     help="A text file where each line is an attribute to fetch when parsing a BioSample entry")
    parser.add_argument(
        '--limit',
        default=-1,
        type=int,
        help="max number of rows to read from the input file; a value <= 0 means all rows",
    )
    parser.add_argument('-d', '--delimiter', default='\t', help="delimiter to use in both the input and output files")
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('--no-cache', dest='no_cache', action='store_true', help="neither use nor save the file cache")
    args = parser.parse_args()

    if args.no_cache:
        cache_json.disk_cache_enabled = False

    bio_samples_attributes = {}
    with open(args.input, 'r') as csv_input_file:
        csv_reader = csv.reader(csv_input_file, delimiter=args.delimiter, quotechar='|')
        header = next(csv_reader)
        strain_pos = header.index('Strain')
        bio_sample_pos = header.index('BioSample')
        assembly_pos = header.index('Assembly')
        cpt = args.limit
        for row in tqdm(csv_reader):
            if cpt == 0:  # checked before processing so --limit N reads exactly N rows
                break
            cpt -= 1
            try:
                strain = StrainWrapped(row[strain_pos], row[bio_sample_pos], row[assembly_pos])
                # Count how many BioSample entries expose each attribute name.
                for attr_name, _ in strain.get_bio_sample_attributes():
                    bio_samples_attributes.setdefault(attr_name, [0])[0] += 1
            except Exception:
                print(row)
                traceback.print_exc()
                raise

    # Sort the attributes by decreasing occurrence count.
    bio_samples_attributes = {k: v for k, v in
                              sorted(bio_samples_attributes.items(), key=lambda item: item[1], reverse=True)}
    with open("bio_samples_attributes.txt", 'w') as bio_samples_attributes_txt:
        for k, v in bio_samples_attributes.items():
            bio_samples_attributes_txt.write("%s\t%i\n" % (k, v[0]))
    with open("bio_samples_attributes.json", 'w') as bio_samples_attributes_json:
        bio_samples_attributes_json.write(json.dumps(bio_samples_attributes, indent=4))
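A quick way to eyeball the ranking this script produces is to read the JSON file back; a minimal sketch (the file name is the script's hard-coded output, the top-10 slice is arbitrary):

#!/usr/bin/env python3
# Minimal sketch: print the ten most frequent BioSample attribute names
# from the JSON file written by the script above.
import json

with open("bio_samples_attributes.json") as fh:
    counts = json.load(fh)  # {attribute_name: [occurrence_count]}, sorted by count

for attr_name, (count,) in list(counts.items())[:10]:
    print("%6d  %s" % (count, attr_name))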
#!/usr/bin/env python3
import argparse
import csv

from tqdm import tqdm

from models import StrainWrapped
from utils import cache_json

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-o', '--output', default='output.csv', help="where to write the output file")
    parser.add_argument('-i', '--input', default='genomes_proks815-genomesonnovember.sample.csv', help="file to read")
    parser.add_argument(
        '--limit',
        default=-1,
        type=int,
        help="max number of rows to read from the input file; a value <= 0 means all rows",
    )
    parser.add_argument('-d', '--delimiter', default='\t', help="delimiter to use in both the input and output files")
    parser.add_argument('-v', '--verbose', action='store_true')
    parser.add_argument('--no-cache', dest='no_cache', action='store_true', help="neither use nor save the file cache")
    args = parser.parse_args()

    if args.no_cache:
        cache_json.disk_cache_enabled = False

    # ... (elided in the diff: opens the input and output files, builds output_header,
    # csv_reader/csv_writer and the column indices) ...

    csv_writer.writerow(output_header)
    if args.verbose:
        print(output_header)
    cpt = -1
    if args.limit > 0:
        cpt = args.limit
    for row in tqdm(csv_reader):
        if cpt == 0:  # checked before processing so --limit N reads exactly N rows
            break
        cpt -= 1
        results = [""] * len(output_header)
        strain = StrainWrapped(row[strain_pos], row[bio_sample_pos], row[assembly_pos])
        results[strain_output_pos] = strain.strain

        # ... (elided in the diff: fills the remaining columns from the StrainWrapped properties) ...

        if args.verbose:
            print(results)
        csv_writer.writerow(results)
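For context, a minimal sketch of the input these scripts expect: a delimited file whose header row contains at least the Strain, BioSample and Assembly columns. The file name below is arbitrary, and the data row reuses the public E. coli K-12 MG1655 accessions purely as an illustration.

#!/usr/bin/env python3
# Write a one-row sample input file in the format the scripts parse.
import csv

with open("genomes.sample.csv", "w", newline="") as fh:
    writer = csv.writer(fh, delimiter="\t", quotechar="|")
    writer.writerow(["Strain", "BioSample", "Assembly"])
    writer.writerow(["K-12 MG1655", "SAMN02604091", "GCF_000005845.2"])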
#!/usr/bin/env python3
# models.py
from xml.etree.ElementTree import fromstring, fromstringlist

from record_extractor import bio_sample_record, assembly_record, sra_records


class StrainWrapped:
    def __init__(self, strain, bio_sample, assembly):
        self.strain = strain.strip()
        self.bio_sample = bio_sample.strip()
        self.assembly = assembly.strip()

    @property
    def _bio_sample_sample_data(self):
        # The BioSample docsum embeds the full sample description as an XML string.
        sample_data = bio_sample_record(self.bio_sample)["DocumentSummarySet"]["DocumentSummary"][0]["SampleData"]
        return fromstring(sample_data)

    def get_bio_sample_attributes(self):
        # Yield each attribute name together with its aliases
        # (e.g. harmonized_name / display_name variants).
        for attr in self._bio_sample_sample_data.iterfind(".//*[@attribute_name]"):
            aka = set(attr.attrib.values())
            attribute_name = attr.attrib['attribute_name']
            aka.remove(attribute_name)
            yield attribute_name, aka

    @property
    def host(self):
        return self._bio_sample_sample_data_attr("host")

    @property
    def location(self):
        # where the sample was obtained: city, hospital, ...
        return self._bio_sample_sample_data_attr("geo_loc_name")

    @property
    def isolation_date(self):
        # date the sample was collected
        # return bio_sample_record(self.bio_sample)["DocumentSummarySet"]["DocumentSummary"][0]["Date"]
        return self._bio_sample_sample_data_attr("collection_date")

    @property
    def source(self):
        # urban or rural setting
        # return bio_sample_record(self.bio_sample)["DocumentSummarySet"]["DocumentSummary"][0]["SourceSample"]
        return "TODO"

    @property
    def origin(self):
        # blood sample? oral swab? ...
        return self._bio_sample_sample_data_attr("isolation_source")

    @property
    def coverage(self):
        return assembly_record(self.assembly)["DocumentSummarySet"]["DocumentSummary"][0]["Coverage"]

    @property
    def sequencing(self):
        # Sequencing technology: PacBio, long reads, Illumina or other; gives the
        # instrument model, sequencing technique, read length and kit name.
        return ", ".join(self._sra_attrs("instrument_model"))

    @property
    def assembling(self):
        # The assembly docsum "Meta" field is an XML fragment containing, e.g.:
        #   <assembly-level>5</assembly-level>
        #   <assembly-status>Complete Genome</assembly-status>
        my_xml = fromstringlist([
            "<wrap>",
            assembly_record(self.assembly)["DocumentSummarySet"]["DocumentSummary"][0]["Meta"],
            "</wrap>",
        ])
        for node in my_xml.iterfind(".//assembly-status"):
            return node.text

    def _bio_sample_sample_data_attr(self, attr_name):
        attr = self._bio_sample_sample_data.find('.//*[@attribute_name="%s"]' % attr_name)
        return attr.text if attr is not None else None

    def _sra_attrs(self, attr_name):
        # SRA docsums carry their experiment description as an XML fragment ("ExpXml").
        for entry in sra_records(self.bio_sample):
            my_xml = fromstringlist(["<wrap>", entry["ExpXml"], "</wrap>"])
            for node in my_xml.iterfind(".//*[@%s]" % attr_name):
                yield node.attrib[attr_name]
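A minimal usage sketch for StrainWrapped, assuming network access to NCBI; the strain/accession triple is the public E. coli K-12 MG1655 record, used here only as an illustration.

#!/usr/bin/env python3
from models import StrainWrapped

strain = StrainWrapped("K-12 MG1655", "SAMN02604091", "GCF_000005845.2")
print(strain.host)            # "host" BioSample attribute, or None if absent
print(strain.isolation_date)  # "collection_date" attribute, or None
print(strain.coverage)        # "Coverage" field of the assembly docsum
for name, aliases in strain.get_bio_sample_attributes():
    print(name, sorted(aliases))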
#!/usr/bin/env python3
# record_extractor.py
import os

from Bio import Entrez

from utils import cache_json

Entrez.email = os.environ.get("NCBI_EMAIL", None)
Entrez.api_key = os.environ.get("NCBI_API_KEY", None)


@cache_json
def bio_sample_record(identifier):
    with Entrez.esearch(db="biosample", retmax=2, term="%s[accn]" % identifier) as handle:
        record = Entrez.read(handle)
    if int(record["Count"]) > 1:
        raise NotImplementedError("Too many hits on biosample for %s" % identifier)
    record_id = record["IdList"]
    with Entrez.efetch(db="biosample", id=record_id, retmode="xml", rettype="docsum") as handle:
        return Entrez.read(handle)


@cache_json
def assembly_record(identifier):
    with Entrez.esearch(db="assembly", retmax=2, term=identifier) as handle:
        record = Entrez.read(handle)
    count = int(record["Count"])
    if count > 1:
        # Several assemblies match: retry, restricted to the latest record.
        with Entrez.esearch(db="assembly", retmax=2, term="%s AND (latest[filter])" % identifier) as handle:
            record = Entrez.read(handle)
    if count == 0:
        return {}
    record_id = record["IdList"]
    with Entrez.efetch(db="assembly", id=record_id, retmode="xml", rettype="docsum") as handle:
        return Entrez.read(handle)


@cache_json
def sra_records(identifier):
    with Entrez.esearch(db="sra", retmax=2, term=identifier) as handle:
        record = Entrez.read(handle)
    count = int(record["Count"])
    # if count > 1:
    #     raise NotImplementedError("Too many hits on sra for %s" % identifier)
    if count == 0:
        return {}
    ret = []
    for record_id in record["IdList"]:
        with Entrez.efetch(db="sra", id=record_id, retmode="xml", rettype="docsum") as handle:
            for response in Entrez.read(handle):
                ret.append(response)
    return ret
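The extractors can also be called directly; a minimal sketch, again assuming network access and the NCBI_EMAIL / NCBI_API_KEY environment variables. The docsum keys used below ("Date", "SampleData", "Coverage") are the ones already referenced elsewhere in this code base.

#!/usr/bin/env python3
from record_extractor import bio_sample_record, assembly_record

doc = bio_sample_record("SAMN02604091")
summary = doc["DocumentSummarySet"]["DocumentSummary"][0]
print(summary["Date"])              # docsum date field of the BioSample
print(summary["SampleData"][:100])  # start of the raw XML sample description

assembly = assembly_record("GCF_000005845.2")
if assembly:  # an accession with no hit yields {}
    print(assembly["DocumentSummarySet"]["DocumentSummary"][0]["Coverage"])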
# requirements.txt
biopython
tqdm
#!/usr/bin/env python3
# utils.py
import json
import os


def cache_json(function):
    """Cache a function's JSON-serializable result, on disk or in memory."""
    mem_cache = {}

    def wrapper(*args):
        path = ".cached/%s.%s.json" % (function.__name__, "--".join(args))
        if not cache_json.disk_cache_enabled:
            # Disk cache disabled: fall back to an in-memory cache.
            try:
                return mem_cache[path]
            except KeyError:
                rv = function(*args)
                mem_cache[path] = rv
                return rv
        try:
            with open(path, 'r') as file:
                return json.loads(file.read())
        except FileNotFoundError:
            rv = function(*args)
            os.makedirs(".cached", exist_ok=True)
            with open(path, 'w') as file:
                file.write(json.dumps(rv, indent=4))
            return rv

    return wrapper


cache_json.disk_cache_enabled = True
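Finally, a minimal sketch of the decorator in action; slow_lookup is a made-up stand-in for an expensive Entrez call, and its result must be JSON-serializable since that is what gets written to disk.

#!/usr/bin/env python3
from utils import cache_json


@cache_json
def slow_lookup(identifier):
    # Stand-in for an expensive network call.
    return {"identifier": identifier, "status": "ok"}


print(slow_lookup("ABC123"))  # computed, then written to .cached/slow_lookup.ABC123.json
print(slow_lookup("ABC123"))  # served from the on-disk JSON cache

cache_json.disk_cache_enabled = False
print(slow_lookup("ABC123"))  # disk cache bypassed: recomputed once, then kept in memory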