import os
import glob
import pandas
def convertTypes(dataframe, types):
    """Best-effort, in-place dtype conversion of *dataframe* columns.

    Args:
        dataframe: pandas DataFrame to mutate.
        types: mapping of column name -> target dtype.

    Numeric targets are first passed through ``pandas.to_numeric`` so that
    string-typed columns (e.g. freshly parsed data) convert cleanly.
    Columns that cannot be converted are left unchanged.
    """
    for column, dtype in types.items():
        try:
            # Bug fix: the original probed the private ``dtype._is_numeric``
            # attribute, which raises AttributeError for plain Python types
            # (str, float, ...) and so silently skipped their conversion.
            if dtype == int or pandas.api.types.is_numeric_dtype(dtype):
                dataframe[column] = pandas.to_numeric(dataframe[column])
            dataframe[column] = dataframe[column].astype(dtype)
        except (ValueError, TypeError, KeyError):
            # Best-effort by design: leave unconvertible columns as-is.
            pass
def getSortedPrefixPaths(prefix):
    """Return (path, numericSuffix) pairs for ``prefix*`` files, sorted by suffix.

    Any ``<path>.bak`` file found is treated as the leftover of an interrupted
    save: the (possibly partial) original is deleted and the backup restored
    in its place before the listing is built.

    Raises:
        ValueError: if a matching path's suffix is not an integer.
    """
    prefixLen = len(prefix)
    relevantPaths = glob.glob(f'{prefix}*')
    # Iterate over a copy since we mutate relevantPaths while scanning.
    for candidateBackup in relevantPaths[:]:
        if not candidateBackup.endswith('.bak'):
            continue
        restoredPath = candidateBackup[:-len('.bak')]
        if os.path.exists(restoredPath):
            # The non-backup file is assumed to be a failed/partial write.
            os.unlink(restoredPath)
        os.rename(candidateBackup, restoredPath)
        relevantPaths.remove(candidateBackup)
        # Bug fix: if the original file was missing at glob time, only the
        # '.bak' was listed — the restored path must be added back or the
        # recovered data would be silently omitted from the result.
        if restoredPath not in relevantPaths:
            relevantPaths.append(restoredPath)
    prioritizedPaths = [(p, int(p[prefixLen:])) for p in relevantPaths]
    return sorted(prioritizedPaths, key=lambda pair: pair[1])
def getNextExportPath(prefix):
    """Return the next unused export path for *prefix*.

    The suffix is one past the highest existing numeric suffix,
    or 0 when no matching files exist yet.
    """
    existing = getSortedPrefixPaths(prefix)
    if existing:
        _, lastId = existing[-1]
        return f'{prefix}{lastId + 1}'
    return f'{prefix}0'
def loadData(prefix, index=None, columns=None, types=None):
    """Load and concatenate every ``prefix*`` parquet shard.

    Args:
        prefix: path prefix of the numbered parquet shards.
        index: optional column subset to deduplicate on (keeping the last row).
        columns: optional column subset to read.
        types: optional column -> dtype mapping, applied best-effort.

    Returns:
        (dataframe, nextPriority) where nextPriority is one past the highest
        numeric suffix seen. Raises if no matching shards exist.
    """
    sortedPaths = getSortedPrefixPaths(prefix)
    frames = []
    for path, _priority in sortedPaths:
        frames.append(pandas.read_parquet(path, columns=columns))
    result = pandas.concat(frames)
    if types:
        convertTypes(result, types)
    if index:
        result.drop_duplicates(subset=index, keep='last', inplace=True, ignore_index=True)
    _, lastPriority = sortedPaths[-1]
    return result, lastPriority + 1
def loadParquet(filename, index=None, columns=None, types=None):
    """Read a single parquet file.

    Optionally converts column dtypes (best-effort) and deduplicates on the
    *index* columns, keeping the last occurrence of each key.
    """
    frame = pandas.read_parquet(filename, columns=columns)
    if types:
        convertTypes(frame, types)
    if index:
        frame.drop_duplicates(subset=index, keep='last', inplace=True, ignore_index=True)
    return frame
def consolidateData(exportPath, prefixes, indexes, dtypes):
    # Merge each prefix's numbered parquet shards into a single '<prefix>0'
    # file, keeping the previous exportPath directory as '<exportPath>.bak'
    # until the operator has verified the result and deleted it by hand.
    if os.path.exists(f'{exportPath}.bak'):
        # Refuse to run if a previous consolidation never got cleaned up.
        raise Exception(f'check the data and delete {exportPath}.bak')
    os.rename(exportPath, f'{exportPath}.bak')
    os.makedirs(exportPath)
    for prefix, index, dtype in zip(prefixes, indexes, dtypes):
        # NOTE(review): loadData globs '<prefix>*' AFTER exportPath was
        # renamed above — this only finds shards if each prefix points
        # outside exportPath (or into the '.bak' tree). TODO confirm how
        # callers construct `prefixes` relative to `exportPath`.
        fullData = loadData(prefix, index=index, types=dtype)
        # loadData returns (dataframe, nextPriority); only the frame is kept.
        fullData[0].to_parquet(f'{prefix}0')
def readAllCidFiles(ipfsDirectoryPath):
    """Recursively collect CID mappings from ``*.cid.txt`` files.

    Each non-blank line has the form ``<key> <cid>`` (whitespace-separated,
    first split only). Entries for the same key found across multiple files
    are merged.

    Args:
        ipfsDirectoryPath: directory tree to walk.

    Returns:
        dict mapping key -> set of CID strings.
    """
    result = {}
    for root, _dirs, files in os.walk(ipfsDirectoryPath):
        for basename in files:
            # Filter before building the path; non-matching files are common.
            if not basename.endswith('.cid.txt'):
                continue
            filePath = os.path.join(root, basename)
            with open(filePath, encoding='utf-8') as inp:
                # Stream line by line instead of materializing readlines().
                for line in inp:
                    line = line.strip()
                    if not line:
                        continue
                    key, cid = line.split(maxsplit=1)
                    result.setdefault(key, set()).add(cid.strip())
    return result
def saveDataframe(df, path, types=None):
    """Write *df* to parquet at *path*, keeping a '.bak' of any previous
    file until the new write has completed.

    Args:
        df: DataFrame (or anything ``pandas.DataFrame`` accepts).
        path: destination parquet path.
        types: optional column -> dtype mapping, applied best-effort.
    """
    frame = pandas.DataFrame(df)
    if types:
        convertTypes(frame, types)
    backupPath = f'{path}.bak'
    if not os.path.exists(path):
        # First write to this path: make sure the parent directory exists.
        os.makedirs(os.path.dirname(path), exist_ok=True)
    else:
        # Move the old file aside; it is removed only after a clean write.
        os.rename(path, backupPath)
    frame.to_parquet(path)
    if os.path.exists(backupPath):
        os.unlink(backupPath)
    print('saved progress to', path)
# by Synthbot