diff --git a/one/api.py b/one/api.py
index 779f3655a9450469fc7c3eafc96c0b2e2498a897..dab910fef97f69411c8a6c25e17ab54d3e855241 100644
--- a/one/api.py
+++ b/one/api.py
@@ -72,8 +72,10 @@ class One(ConversionMixin):
         # init the cache file
         self._cache = Bunch({'_meta': {
             'expired': False,
-            'created_time': None,
-            'loaded_time': None,
+            'created_time': None,  # Timestamp of when cache tables were first generated
+            'loaded_time': None,  # Timestamp of when cache tables were last loaded
+            # 'updated_time': None,  # Timestamp of when cache tables were last changed
+            # 'modified_time': None,  # Timestamp of when cache tables were last saved
             'raw': {}  # map of original table metadata
         }})
         self._load_cache()
@@ -93,6 +95,7 @@ class One(ConversionMixin):
 
     def _load_cache(self, cache_dir=None, **kwargs):
         meta = self._cache['_meta']
+        meta['created_time'] = None  # Reset created time
         INDEX_KEY = 'id'
         for cache_file in Path(cache_dir or self.cache_dir).glob('*.pqt'):
             table = cache_file.stem
@@ -159,6 +162,32 @@ class One(ConversionMixin):
                 raise ValueError(f'Unknown refresh type "{mode}"')
         return self._cache['_meta']['loaded_time']
 
+    def _save_cache(self, date_modified=None):
+        """
+        Save the current cache tables into parquet files.
+
+        Parameters
+        ----------
+        date_modified : datetime.datetime
+            The recorded timestamp of when the cache was last modified.  If None, the
+            modified_time field of the cache metadata is used, or the current time if that
+            field doesn't exist.
+        """
+        # Get the modified date from the kwarg, then the metadata, then the current time
+        mod_date = date_modified or self._cache['_meta'].get('modified_time', datetime.now())
+        self._cache['_meta']['modified_time'] = mod_date
+        # Format date, e.g. '2020-01-01 00:00'
+        mod_date = mod_date.isoformat(sep=' ', timespec='minutes')
+        # Save each table in the _cache Bunch
+        for table in filter(lambda x: not x.startswith('_'), self._cache.keys()):
+            # Attempt to get the raw metadata for this table
+            curr_meta = self._cache['_meta'].get('raw', {}).get(table, {})
+            # Add the modified date to this metadata
+            meta = {**curr_meta, 'date_modified': mod_date}
+            self._cache['_meta']['raw'][table] = meta
+            filename = self.cache_dir.joinpath(f'{table}.pqt')
+            parquet.save(filename, self._cache[table], meta)
+
     def _download_datasets(self, dsets, **kwargs) -> List[Path]:
         """
         Download several datasets given a set of datasets
@@ -1226,31 +1255,34 @@ class OneAlyx(One):
             return
 
         # Warn user if expired
+        age = datetime.now() - (cache_meta.get('created_time') or datetime.now())
         if (
             cache_meta['expired'] and
             cache_meta.get('created_time', False) and
             not self.alyx.silent
         ):
-            age = datetime.now() - cache_meta['created_time']
             t_str = (f'{age.days} day(s)'
                      if age.days >= 1
                      else f'{np.floor(age.seconds / (60 * 60))} hour(s)')
             _logger.info(f'cache over {t_str} old')
 
         try:
-            # Determine whether a newer cache is available
-            cache_info = self.alyx.get('cache/info', expires=True)
-            remote_created = datetime.fromisoformat(cache_info['date_created'])
-            local_created = cache_meta.get('created_time', None)
-            if local_created and (remote_created - local_created) < timedelta(minutes=1):
-                _logger.info('No newer cache available')
-                return
-
-            # Download the remote cache files
-            _logger.info('Downloading remote caches...')
-            files = self.alyx.download_cache_tables()
-            assert any(files)
-            super(OneAlyx, self)._load_cache(self.cache_dir)  # Reload cache after download
+            if age.days >= 1 or clobber or len(self._cache.keys()) <= 1:
+                # Determine whether a newer cache is available
+                cache_info = self.alyx.get('cache/info', expires=True)
+                remote_created = datetime.fromisoformat(cache_info['date_created'])
+                local_created = cache_meta.get('created_time', None)
+                if local_created and (remote_created - local_created) < timedelta(minutes=1):
+                    _logger.info('No newer cache available')
+                    return
+
+                # Download the remote cache files
+                _logger.info('Downloading remote caches...')
+                files = self.alyx.download_cache_tables()
+                assert any(files)
+                super(OneAlyx, self)._load_cache(self.cache_dir)  # Reload cache after download
+            else:
+                self._update_cache()
         except requests.exceptions.HTTPError:
             _logger.error('Failed to load the remote cache file')
             self.mode = 'remote'
@@ -1258,6 +1290,35 @@ class OneAlyx(One):
             _logger.error('Failed to connect to Alyx')
             self.mode = 'local'
 
+    def _update_cache(self):
+        """
+        Update the cache tables with any records created or modified since the last update.
+
+        Queries Alyx for datasets and sessions whose auto_datetime is more recent than the
+        cache tables' updated_time (or created_time if the tables were never updated), then
+        merges the results into the current cache tables and saves them to disk.
+
+        TODO Test behaviour when the queries return no records
+        """
+        meta = self._cache['_meta']
+        last_update = meta.get('updated_time', meta['created_time'])
+        query = f'auto_datetime__gt,{last_update}'
+        dsets = self.alyx.rest('datasets', 'list', django=query, no_cache=True)
+
+        _logger.debug(f'{len(dsets)} updated dataset(s)')
+        assert all(datetime.fromisoformat(x['auto_datetime']) >= last_update for x in dsets)
+
+        datasets = util.datasets2records(dsets)
+
+        ses = self.alyx.rest('sessions', 'list', django=query, no_cache=True)
+        sessions = util.ses2df(ses)
+
+        # Merge with the current tables and save
+        meta['updated_time'] = datetime.now()
+        self._cache['sessions'].update(sessions)
+        self._cache['datasets'].update(datasets)
+        self._save_cache(meta['updated_time'])
+
     @property
     def alyx(self):
         """one.webclient.AlyxClient: The Alyx Web client"""
diff --git a/one/util.py b/one/util.py
index 052de359f23b22327710dbb370a5780d29e03f63..61e88513033aa9288b6dc14f8a5820ad2b8710bc 100644
--- a/one/util.py
+++ b/one/util.py
@@ -7,6 +7,7 @@
 from collections.abc import Mapping
 import fnmatch
 
 import pandas as pd
+from iblutil.io import parquet
 import numpy as np
 
@@ -23,6 +24,29 @@ def Listable(t):
     return Union[t, Sequence[t]]
 
 
+def ses2df(ses: list) -> pd.DataFrame:
+    """Extract session cache records from a remote sessions list.
+
+    Unlike ses2records, which takes the output of a full session read, ses2df takes the
+    output of a session list query.
+
+    Parameters
+    ----------
+    ses : list of dict
+        List of session dictionaries from the Alyx sessions REST endpoint
+
+    Returns
+    -------
+    pd.DataFrame
+        Sessions frame, indexed by the eid int64 pair
+    """
+    eid = parquet.str2np([x['url'][-36:] for x in ses])
+    session = pd.DataFrame(data=ses).rename({'start_time': 'date'}, axis=1).drop('url', axis=1)
+    session['date'] = session['date'].apply(lambda x: x[:10])
+    session[['eid_0', 'eid_1']] = eid
+    return session.set_index(['eid_0', 'eid_1'])
+
+
 def ses2records(ses: dict) -> [pd.Series, pd.DataFrame]:
     """Extract session cache record and datasets cache from a remote session data record.
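
---

For reviewers, a quick sketch of what the new `ses2df` helper produces. The payload below is hypothetical (all field values invented for illustration); per the function body, only the `url` and `start_time` keys are actually required, while any other keys pass through as columns:

```python
from one.util import ses2df

# Hypothetical record from the Alyx sessions list endpoint (fields abridged);
# the URL must end with the 36-character session UUID
ses = [{
    'url': 'https://alyx.example.org/sessions/cf064a4e-1f2e-4b2d-b1a0-9e0c4e5b7d3a',
    'start_time': '2021-03-02T12:00:00',
    'subject': 'KS023',
    'lab': 'cortexlab',
    'number': 1,
}]

df = ses2df(ses)
# start_time is renamed to date and truncated to 'YYYY-MM-DD';
# the UUID is converted to an int64 pair and used as a multi-index
assert df.index.names == ['eid_0', 'eid_1']
assert df.loc[df.index[0], 'date'] == '2021-03-02'
```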
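Note also that the `date_modified` stamp written by `_save_cache` travels inside the parquet file itself rather than in a sidecar file. Below is a minimal round-trip sketch, assuming `iblutil.io.parquet.load` returns the table together with its embedded metadata (as `_load_cache`'s handling of the raw table metadata suggests); the table contents are made up:

```python
from datetime import datetime
from pathlib import Path
import pandas as pd
from iblutil.io import parquet

# A stand-in cache table with invented contents
table = pd.DataFrame({'id': [0, 1], 'rel_path': ['alf/a.npy', 'alf/b.npy']})
# The same timestamp format _save_cache uses, e.g. '2020-01-01 00:00'
meta = {'date_modified': datetime.now().isoformat(sep=' ', timespec='minutes')}

filename = Path('datasets.pqt')
parquet.save(filename, table, meta)

# On the next load the metadata comes back with the frame, so the cache
# can report when its tables were last changed
df, info = parquet.load(filename)
print(info['date_modified'])
```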