Unverified Commit 95b1f8cf authored by Miles Wells

Added update and save methods

parent a02102e4
Branches updateCache
@@ -72,8 +72,10 @@ class One(ConversionMixin):
         # init the cache file
         self._cache = Bunch({'_meta': {
             'expired': False,
-            'created_time': None,
-            'loaded_time': None,
+            'created_time': None,  # Timestamp of when cache tables were first generated
+            'loaded_time': None,  # Timestamp of when cache tables were last loaded
+            # 'updated_time': None,  # Timestamp of when cache tables were last changed
+            # 'modified_time': None,  # Timestamp of when cache tables were last saved
             'raw': {}  # map of original table metadata
         }})
         self._load_cache()
@@ -93,6 +95,7 @@ class One(ConversionMixin):
     def _load_cache(self, cache_dir=None, **kwargs):
         meta = self._cache['_meta']
+        meta['created_time'] = None  # Reset created time
         INDEX_KEY = 'id'
         for cache_file in Path(cache_dir or self.cache_dir).glob('*.pqt'):
             table = cache_file.stem
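
Taken together, the _meta map now tracks four timestamps with distinct roles: when the tables were generated (created_time), last read from disk (loaded_time), last changed in memory (updated_time) and last written back (modified_time), with _load_cache resetting created_time before re-reading. A minimal sketch of how a consumer might inspect them, assuming a One instance named `one` (the one-day threshold is illustrative):

    from datetime import datetime, timedelta

    meta = one._cache['_meta']
    if meta['created_time'] and datetime.now() - meta['created_time'] > timedelta(days=1):
        print('cache tables over a day old; consider a full re-download')
    elif meta.get('updated_time'):
        print(f"cache last incrementally updated at {meta['updated_time']}")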
@@ -159,6 +162,32 @@ class One(ConversionMixin):
             raise ValueError(f'Unknown refresh type "{mode}"')
         return self._cache['_meta']['loaded_time']
 
+    def _save_cache(self, date_modified=None):
+        """
+        Save the current cache tables into parquet files.
+
+        Parameters
+        ----------
+        date_modified : datetime.datetime
+            The recorded timestamp of when the cache was last modified. If None, the
+            modified_time field of the cache metadata is used, or the current time if
+            this field doesn't exist.
+        """
+        # Get modified date from kwarg, otherwise from metadata, otherwise the current time
+        mod_date = date_modified or self._cache['_meta'].get('modified_time', datetime.now())
+        self._cache['_meta']['modified_time'] = mod_date
+        # Format date, e.g. '2020-01-01 00:00'
+        mod_date = mod_date.isoformat(sep=' ', timespec='minutes')
+        # Save each table in the _cache Bunch
+        for table in filter(lambda x: not x[0] == '_', self._cache.keys()):
+            # Attempt to get the raw metadata for this table
+            curr_meta = self._cache['_meta'].get('raw', {}).get(table, {})
+            # Add the modified date to this metadata
+            meta = {**curr_meta, 'date_modified': mod_date}
+            self._cache['_meta']['raw'][table] = meta
+            filename = self.cache_dir.joinpath(f'{table}.pqt')
+            parquet.save(filename, self._cache[table], meta)
+
     def _download_datasets(self, dsets, **kwargs) -> List[Path]:
         """
         Download several datasets given a set of datasets
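
For context, _save_cache persists each table through iblutil.io.parquet, which embeds the metadata dict in the parquet file so it round-trips on load. A rough sketch under that assumption (the file name and frame are placeholders):

    import pandas as pd
    from iblutil.io import parquet

    df = pd.DataFrame({'id': ['abc'], 'exists': [True]}).set_index('id')
    meta = {'date_modified': '2020-01-01 00:00'}
    parquet.save('datasets.pqt', df, meta)  # metadata stored alongside the table
    table, table_meta = parquet.load('datasets.pqt')  # both recovered on load
    assert table_meta['date_modified'] == meta['date_modified']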
@@ -1226,31 +1255,34 @@ class OneAlyx(One):
             return
         # Warn user if expired
+        age = datetime.now() - cache_meta['created_time']
         if (
             cache_meta['expired'] and
             cache_meta.get('created_time', False) and
             not self.alyx.silent
         ):
-            age = datetime.now() - cache_meta['created_time']
             t_str = (f'{age.days} day(s)'
                      if age.days >= 1
                      else f'{np.floor(age.seconds / (60 ** 2))} hour(s)')
             _logger.info(f'cache over {t_str} old')
         try:
-            # Determine whether a newer cache is available
-            cache_info = self.alyx.get('cache/info', expires=True)
-            remote_created = datetime.fromisoformat(cache_info['date_created'])
-            local_created = cache_meta.get('created_time', None)
-            if local_created and (remote_created - local_created) < timedelta(minutes=1):
-                _logger.info('No newer cache available')
-                return
-
-            # Download the remote cache files
-            _logger.info('Downloading remote caches...')
-            files = self.alyx.download_cache_tables()
-            assert any(files)
-            super(OneAlyx, self)._load_cache(self.cache_dir)  # Reload cache after download
+            if age.days >= 1 or clobber or len(self._cache.keys()) <= 1:
+                # Determine whether a newer cache is available
+                cache_info = self.alyx.get('cache/info', expires=True)
+                remote_created = datetime.fromisoformat(cache_info['date_created'])
+                local_created = cache_meta.get('created_time', None)
+                if local_created and (remote_created - local_created) < timedelta(minutes=1):
+                    _logger.info('No newer cache available')
+                    return
+
+                # Download the remote cache files
+                _logger.info('Downloading remote caches...')
+                files = self.alyx.download_cache_tables()
+                assert any(files)
+                super(OneAlyx, self)._load_cache(self.cache_dir)  # Reload cache after download
+            else:
+                self._update_cache()
         except requests.exceptions.HTTPError:
             _logger.error('Failed to load the remote cache file')
             self.mode = 'remote'
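
The reworked branch makes the download conditional: the full remote tables are only fetched when the local ones are at least a day old, a refresh was forced, or the cache holds nothing beyond the _meta entry; otherwise the cheaper incremental _update_cache runs. Note that age is now computed before the created_time guard, so it appears a cache whose created_time is None would raise a TypeError on the subtraction. A sketch of the new predicate with its inputs made explicit (names mirror the method's local state):

    def full_download_needed(age, clobber, n_keys):
        # Stale tables, a forced refresh, or an effectively empty cache
        # (only the '_meta' entry) all warrant a full download
        return age.days >= 1 or clobber or n_keys <= 1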
@@ -1258,6 +1290,33 @@ class OneAlyx(One):
             _logger.error('Failed to connect to Alyx')
             self.mode = 'local'
 
+    def _update_cache(self):
+        """Update the cache tables with any records modified since the last update.
+
+        TODO Test when an empty list is returned
+        """
+        # nuo = datetime.datetime.now() - datetime.timedelta(hours=24)
+        meta = self._cache['_meta']
+        nuo = meta.get('updated_time', meta['created_time'])
+        query = f'auto_datetime__gt,{nuo}'
+        dsets = self.alyx.rest('datasets', 'list', django=query, no_cache=True)
+        _logger.debug(f'{len(dsets)} updated')
+        assert all(datetime.fromisoformat(x['auto_datetime']) >= nuo for x in dsets)
+        datasets = util.datasets2records(dsets)
+        ses = self.alyx.rest('sessions', 'list', django=query, no_cache=True)
+        sessions = util.ses2df(ses)
+        # Merge with current data
+        meta['updated_time'] = datetime.now()
+        self._cache['sessions'].update(sessions)
+        self._cache['datasets'].update(datasets)
+        self._save_cache(meta['updated_time'])
+
     @property
     def alyx(self):
         """one.webclient.AlyxClient: The Alyx Web client"""
The remaining hunks modify a second file (presumably one/util.py, where ses2records is defined):

@@ -7,6 +7,7 @@ from collections.abc import Mapping
 import fnmatch
 import pandas as pd
+from iblutil.io import parquet
 import numpy as np
@@ -23,6 +24,29 @@ def Listable(t):
     return Union[t, Sequence[t]]
 
 
+def ses2df(ses: list) -> pd.DataFrame:
+    """Extract session cache records from a remote sessions list.
+
+    Unlike ses2records, which takes the output of a full session read, ses2df takes the
+    output of a session list.
+
+    Parameters
+    ----------
+    ses : list of dict
+        A list of session dictionaries from the Alyx REST endpoint
+
+    Returns
+    -------
+    pd.DataFrame
+        Sessions frame
+    """
+    eid = parquet.str2np([x['url'][-36:] for x in ses])
+    session = pd.DataFrame(data=ses).rename({'start_time': 'date'}, axis=1).drop('url', axis=1)
+    session['date'] = session['date'].apply(lambda x: x[:10])
+    session[['eid_0', 'eid_1']] = eid
+    return session.set_index(['eid_0', 'eid_1'])
+
+
 def ses2records(ses: dict) -> [pd.Series, pd.DataFrame]:
     """Extract session cache record and datasets cache from a remote session data record.