import os
import glob

import pandas


def convertTypes(dataframe, types):
    """Best-effort, in-place conversion of *dataframe* columns.

    ``types`` maps column name -> target dtype. Numeric targets (``int``
    itself, or a pandas extension dtype whose ``_is_numeric`` is true) are
    first run through ``pandas.to_numeric`` so string-typed numbers convert
    cleanly before the ``astype``. A column that cannot be converted is
    left unchanged (best effort); nothing is returned.
    """
    for column, dtype in types.items():
        try:
            # getattr guard: plain targets such as ``str`` or dtype strings
            # like 'int64' have no ``_is_numeric`` attribute; the previous
            # bare attribute access raised inside the try and silently
            # skipped the astype below as well.
            if dtype == int or getattr(dtype, '_is_numeric', False):
                dataframe[column] = pandas.to_numeric(dataframe[column])
            dataframe[column] = dataframe[column].astype(dtype)
        except Exception:
            # Deliberate best-effort: keep the column's current dtype
            # rather than aborting the caller on one bad column.
            pass


def getSortedPrefixPaths(prefix):
    """Return ``[(path, numericSuffix), ...]`` for files named ``{prefix}N``,
    sorted by N ascending.

    Any ``{prefix}N.bak`` left behind by an interrupted save is recovered
    first: the (possibly partial) ``{prefix}N`` is deleted and the backup
    renamed into its place, so the listing reflects the recovered state.
    """
    prefixLen = len(prefix)
    relevantPaths = glob.glob(f'{prefix}*')
    # Iterate over a copy: the list is mutated during recovery below.
    for candidateBackup in relevantPaths[:]:
        if not candidateBackup.endswith('.bak'):
            continue
        likelyFailure = candidateBackup[:-4]
        if os.path.exists(likelyFailure):
            # A save died between writing the new file and removing the
            # backup; the un-suffixed file may be partial, so drop it.
            os.unlink(likelyFailure)
        os.rename(candidateBackup, likelyFailure)
        relevantPaths.remove(candidateBackup)
        # Bug fix: if the un-suffixed path did not exist when glob ran,
        # the recovered file was missing from the listing entirely and
        # its data silently dropped. Add it back.
        if likelyFailure not in relevantPaths:
            relevantPaths.append(likelyFailure)
    prioritizedPaths = [(x, int(x[prefixLen:])) for x in relevantPaths]
    return sorted(prioritizedPaths, key=lambda pair: pair[1])


def getNextExportPath(prefix):
    """Return the next unused ``{prefix}N`` path (``{prefix}0`` if none exist)."""
    sortedPaths = getSortedPrefixPaths(prefix)
    if not sortedPaths:
        return f'{prefix}0'
    nextId = sortedPaths[-1][1] + 1
    return f'{prefix}{nextId}'


def loadData(prefix, index=None, columns=None, types=None):
    """Load and concatenate every ``{prefix}N`` parquet file.

    :param index: optional subset of columns used to drop duplicate rows,
        keeping the last occurrence (later files win).
    :param columns: optional column subset passed to ``read_parquet``.
    :param types: optional mapping for :func:`convertTypes`.
    :returns: ``(dataframe, nextPriority)`` where nextPriority is one past
        the highest numeric suffix found.
    :raises: propagates from pandas if no ``{prefix}N`` files exist
        (``concat`` of an empty list) — callers are expected to have
        written at least one chunk.
    """
    sortedPaths = getSortedPrefixPaths(prefix)
    frames = [pandas.read_parquet(x[0], columns=columns) for x in sortedPaths]
    result = pandas.concat(frames)
    if types:
        convertTypes(result, types)
    if index:
        result.drop_duplicates(subset=index, keep='last', inplace=True,
                               ignore_index=True)
    nextPriority = sortedPaths[-1][1] + 1
    return result, nextPriority


def loadParquet(filename, index=None, columns=None, types=None):
    """Load a single parquet file, optionally converting dtypes and
    dropping duplicate rows on *index* (keeping the last occurrence)."""
    result = pandas.read_parquet(filename, columns=columns)
    if types:
        convertTypes(result, types)
    if index:
        result.drop_duplicates(subset=index, keep='last', inplace=True,
                               ignore_index=True)
    return result


def consolidateData(exportPath, prefixes, indexes, dtypes):
    """Collapse each prefix's numbered parquet chunks into one ``{prefix}0``.

    The existing *exportPath* directory is moved aside to
    ``{exportPath}.bak`` and a fresh empty directory created before the
    rewrite; an already-present ``.bak`` aborts with an exception so a
    previous failed run can be inspected manually.

    NOTE(review): ``loadData`` globs ``{prefix}*`` *after* the rename, so
    the prefixes presumably point outside *exportPath* (or are otherwise
    still reachable after the rename) — confirm against callers.
    """
    if os.path.exists(f'{exportPath}.bak'):
        raise Exception(f'check the data and delete {exportPath}.bak')
    os.rename(exportPath, f'{exportPath}.bak')
    os.makedirs(exportPath)
    for prefix, index, dtype in zip(prefixes, indexes, dtypes):
        fullData = loadData(prefix, index=index, types=dtype)
        # loadData returns (dataframe, nextPriority); only the frame is kept.
        fullData[0].to_parquet(f'{prefix}0')


def readAllCidFiles(ipfsDirectoryPath):
    """Walk *ipfsDirectoryPath* and parse every ``*.cid.txt`` file.

    Each non-blank line is ``<key> <cid>`` (split on first whitespace).
    :returns: dict mapping key -> set of CID strings seen for that key.
    """
    result = {}
    for root, dirs, files in os.walk(ipfsDirectoryPath):
        for basename in files:
            filePath = os.path.join(root, basename)
            if not basename.endswith('.cid.txt'):
                continue
            with open(filePath) as inp:
                for line in inp:
                    line = line.strip()
                    if not line:
                        continue
                    key, cid = line.split(maxsplit=1)
                    result.setdefault(key.strip(), set()).add(cid.strip())
    return result


def saveDataframe(df, path, types=None):
    """Write *df* to *path* as parquet with a crash-safe backup.

    Any existing file is renamed to ``{path}.bak`` first and only removed
    after the new write succeeds; :func:`getSortedPrefixPaths` uses the
    leftover ``.bak`` to recover from an interrupted save.
    """
    df = pandas.DataFrame(df)
    if types:
        convertTypes(df, types)
    backupPath = f'{path}.bak'
    if os.path.exists(path):
        os.rename(path, backupPath)
    else:
        dirPath = os.path.dirname(path)
        if dirPath:  # bare filenames have no directory to create
            os.makedirs(dirPath, exist_ok=True)
    df.to_parquet(path)
    if os.path.exists(backupPath):
        os.unlink(backupPath)
    print('saved progress to', path)