```
#!/usr/bin/python

# target for each post:
# ... post number
# ... thread number
# ... timestamp
# ... icon name
# ... filename
# ... file dimensions
# ... media_hash (maybe okay when combined with dimensions)
# ... flags: spoiler, deleted, sticky, locked
# ... deleted (flag)
# ... email, name, trip, title
# ... comment
# ... poster hash??
# ... poster country
# ... flair

# step 0: get a list of all OPs
# step 1: collect all post data
# ... get it from the desuarchive dump
# ... get it from the moe dump
# step 2: collect all icons and images
# ... bunch of sites to check... just write a script and run it on a server
# step 3: organize the data (probably into threads & images)
# step 4: upload the data

# from desu /post:
# ... original filename = media_filename
# ... new filename = media_orig
# ... size: media_w, media_h, media_size (bytes)
# ... flair: troll_country_name

import json
import requests
from utils.ratelimit import RateLimit
from utils.data import loadData, convertTypes, getSortedPrefixPaths
import pandas
import datetime
import math
import os
import glob
import argparse
import csv

DESU_API = 'https://desuarchive.org/_/api/chan'
EXPORT_PATH = "/archives/ppp-clone/post-data/exports"
THREAD_PROPS_PREFIX = os.path.join(EXPORT_PATH, 'desuarchive.thread-properties.parquet.')
POST_PROPS_PREFIX = os.path.join(EXPORT_PATH, 'desuarchive.post-properties.parquet.')
IMAGE_HASH_PATH = os.path.join('/archives/ppp-clone/post-data/persistent-metadata/downloaded-images.csv')
THUMB_HASH_PATH = os.path.join('/archives/ppp-clone/post-data/persistent-metadata/downloaded-thumbnails.csv')
IMAGE_FOLDER_PATH = "/archives/ppp-clone/post-data/images-by-time"
THUMB_FOLDER_PATH = "/archives/ppp-clone/post-data/thumbs-by-time"

POST_TYPES = {
    'postId': pandas.Int64Dtype(),
    'subId': pandas.Int64Dtype(),
    'threadId': pandas.Int64Dtype(),
    'timestamp': pandas.Int64Dtype(),
    'origImageName': pandas.StringDtype(),
    'newImageName': pandas.StringDtype(),
    'imageWidth': pandas.Int32Dtype(),
    'imageHeight': pandas.Int32Dtype(),
    'imageSize': pandas.Int64Dtype(),
    'imageLink': pandas.StringDtype(),
    'imageMediaHash': pandas.StringDtype(),
    'thumbName': pandas.StringDtype(),
    'thumbLink': pandas.StringDtype(),
    'spoiler': pandas.BooleanDtype(),
    'deleted': pandas.BooleanDtype(),
    'banned': pandas.BooleanDtype(),
    'capcode': pandas.StringDtype(),
    'email': pandas.StringDtype(),
    'name': pandas.StringDtype(),
    'trip': pandas.StringDtype(),
    'title': pandas.StringDtype(),
    'comment': pandas.StringDtype(),
    'flair': pandas.StringDtype(),
}

THREAD_TYPES = {
    'postId': pandas.Int64Dtype(),
    'numUniqueIps': pandas.Int32Dtype(),
    'sticky': pandas.BooleanDtype(),
    'locked': pandas.BooleanDtype(),
    'expiration': pandas.Int64Dtype(),
}

searchLimiter = RateLimit(3)
imageLimiter = RateLimit(300)


def getThreads(startTime, page, order='asc'):
    call = f'{DESU_API}/search/?boards=mlp&type=op&start={startTime}&order={order}&page={page}'
    response = searchLimiter.get(call)
    if not response:
        return None
    return json.loads(response.text)


def searchPosts(threadId, startTime, page, order='asc'):
    call = f'{DESU_API}/search/?boards=mlp&tnum={threadId}&start={startTime}&order={order}&page={page}'
    response = searchLimiter.get(call)
    if not response:
        return None
    return json.loads(response.text)


def getPosts(threadNum):
    call = f'{DESU_API}/thread/?board=mlp&num={threadNum}'
    response = searchLimiter.get(call, fragile=True)
    if not response:
        return None
    return json.loads(response.text)
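
# Response shapes this script relies on (inferred from how the results are indexed
# below; treat this as an assumption about the FoolFuuka API, not a verified spec):
# ... /search/ -> {'0': {'posts': [post, ...]}} on success, {'error': '...'} otherwise;
#     search results flag the opening post with post['op'] == 1
# ... /thread/ -> {'<threadNum>': {'posts': {'<postNum>': post, ...}, ...}}; the
#     'posts' map may be missing for threads with no replies
# ... each post dict carries num, subnum, thread_num, timestamp, media, exif, etc.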
def toFormattedTime(timestamp):
    if math.isnan(timestamp):
        timestamp = 0
    timedelta = datetime.timedelta(hours=-5)
    timezone = datetime.timezone(timedelta)
    dt = datetime.datetime.fromtimestamp(timestamp, timezone)
    return dt.strftime('%Y-%m-%d')


def consolidateData():
    exportPath = EXPORT_PATH
    if os.path.exists(f'{exportPath}.bak'):
        raise Exception(f'check the data and delete {exportPath}.bak')
    fullThreadData = loadData(THREAD_PROPS_PREFIX, ['postId'], types=THREAD_TYPES)
    fullPostData = loadData(POST_PROPS_PREFIX, ['postId', 'subId'], types=POST_TYPES)
    os.rename(EXPORT_PATH, f'{EXPORT_PATH}.bak')
    os.makedirs(EXPORT_PATH)
    fullThreadData[0].to_parquet(f'{THREAD_PROPS_PREFIX}0')
    fullPostData[0].to_parquet(f'{POST_PROPS_PREFIX}0')


def loadLastArchivedTime(posts):
    subId = posts['subId']
    actualPosts = posts[subId == 0]
    candidateLastPostCount = 5000
    discoveredThreads = 0
    while discoveredThreads < 200:
        latestPosts = actualPosts.nlargest(candidateLastPostCount, 'postId')
        discoveredThreads = len(set(latestPosts['threadId']))
        candidateLastPostCount += 1000
    # idxmin() returns an index label, so look it up with .loc rather than .iloc
    earliest = latestPosts['postId'].idxmin()
    return posts['timestamp'].loc[earliest]


def getStartTime():
    print('loading post data', end='... ', flush=True)
    if not os.path.exists(f'{POST_PROPS_PREFIX}0'):
        # no exports yet: start from the epoch, matching scrapeThread()
        return toFormattedTime(0)
    posts, nextPostId = loadData(POST_PROPS_PREFIX, ['postId', 'subId', 'threadId', 'timestamp'])
    print('found', len(posts), 'posts')
    print('using next save point', nextPostId)
    newestThreadTime = toFormattedTime(loadLastArchivedTime(posts))
    print('using next thread time', newestThreadTime)
    return newestThreadTime


def getNextSaveId():
    sortedPaths = getSortedPrefixPaths(POST_PROPS_PREFIX)
    return sortedPaths[-1][1] + 1


def addPost(dataframe, post):
    if not post['media']:
        post['media'] = {}
    exif = post.get('exif', None)
    if exif:
        exif = json.loads(exif)
        flair = exif.get('troll_country_name', None)
    else:
        flair = None
    dataframe.append({
        'postId': post['num'],
        'subId': post['subnum'],
        'threadId': post['thread_num'],
        'timestamp': post['timestamp'],
        'origImageName': post['media'].get('media_filename', None),
        'newImageName': post['media'].get('media_orig', None),
        'imageWidth': post['media'].get('media_w', None),
        'imageHeight': post['media'].get('media_h', None),
        'imageSize': post['media'].get('media_size', None),
        'imageLink': post['media'].get('media_link', None),
        'imageMediaHash': post['media'].get('media_hash', None),
        'thumbName': post['media'].get('preview_orig', None),
        'thumbLink': post['media'].get('thumb_link', None),
        'spoiler': post['media'].get('spoiler', None),
        'deleted': post['deleted'],
        'banned': post.get('banned', None),
        'capcode': post['capcode'],
        'email': post['email'],
        'name': post['name'],
        'trip': post['trip'],
        'title': post['title'],
        'comment': post['comment'],
        'flair': flair,
    })


def addThread(dataframe, thread):
    exif = thread.get('exif', None)
    if exif:
        exif = json.loads(exif)
        numUniqueIps = exif.get('uniqueIps', None)
    else:
        numUniqueIps = None
    dataframe.append({
        'postId': thread['num'],
        'numUniqueIps': numUniqueIps,
        'sticky': thread['sticky'],
        'locked': thread['locked'],
        'expiration': str(thread['timestamp_expired']),
    })


def saveDataframe(df, path, types):
    df = pandas.DataFrame(df)
    convertTypes(df, types)
    backupPath = f'{path}.bak'
    if os.path.exists(path):
        os.rename(path, backupPath)
    df.to_parquet(path)
    if os.path.exists(backupPath):
        os.unlink(backupPath)
    print('saved progress to', path)
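
# Assumed behavior of the local utils helpers (inferred from how they are used in
# this file, not verified against utils itself):
# ... loadData(prefix, keys, ...) concatenates every '<prefix><N>' parquet file;
#     its first return value is the combined dataframe
# ... convertTypes(df, types) casts the dataframe columns in place
# ... getSortedPrefixPaths(prefix) returns (path, N) pairs sorted by numeric suffix
# ... RateLimit(...).get()/saveImage() wrap HTTP requests with rate limiting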
def scrapeNewPosts(startTime=None):
    # load data
    # next one: 2023-10-03
    nextSaveId = getNextSaveId()
    startTime = startTime or getStartTime()
    posts, threads = [], []
    # TODO: update the rest of the script to use arrays instead of dataframes
    currentPage = 1
    unsavedPosts = 0
    while True:
        print('getting thread', startTime, currentPage)
        threadResults = getThreads(startTime, currentPage)
        if not threadResults or 'error' in threadResults:
            print(threadResults)
            break
        for op in threadResults['0']['posts']:
            addThread(threads, op)
            addPost(posts, op)
            unsavedPosts += 1
            print('getting posts for', op['num'], end='... ')
            postCount = 0
            postResults = getPosts(op['num'])
            if postResults is None:
                # thread endpoint failed; page through the search API instead
                newPosts, newThreads = scrapeThread(op['num'])
                posts.extend(newPosts)
                unsavedPosts += len(newPosts)
                postCount = len(newPosts)
            else:
                postResults = postResults[op['num']]
                if 'posts' in postResults:
                    postResults = postResults['posts']
                for post in postResults.values():
                    addPost(posts, post)
                    unsavedPosts += 1
                    postCount += 1
            print(postCount, 'posts')
        if currentPage < 200:
            currentPage += 1
        else:
            startTime = toFormattedTime(op['timestamp'])
            currentPage = 1
        if unsavedPosts > 100000:
            saveDataframe(threads, f'{THREAD_PROPS_PREFIX}{nextSaveId}', THREAD_TYPES)
            saveDataframe(posts, f'{POST_PROPS_PREFIX}{nextSaveId}', POST_TYPES)
            posts, threads = [], []
            nextSaveId += 1
            unsavedPosts = 0
    if unsavedPosts > 0:
        saveDataframe(threads, f'{THREAD_PROPS_PREFIX}{nextSaveId}', THREAD_TYPES)
        saveDataframe(posts, f'{POST_PROPS_PREFIX}{nextSaveId}', POST_TYPES)
    # fetch all new data
    # periodically save to disk


def scrapeThread(threadId):
    threads = []
    posts = []
    currentPage = 1
    startTime = toFormattedTime(0)
    while True:
        postResults = searchPosts(threadId, startTime, currentPage)
        if not postResults or 'error' in postResults:
            print(postResults)
            break
        for post in postResults["0"]["posts"]:
            addPost(posts, post)
            if int(post["op"]) == 1:
                addThread(threads, post)
        print('added', postResults['0']['posts'][0]['num'], '...', postResults['0']['posts'][-1]['num'])
        if currentPage < 200:
            currentPage += 1
        else:
            startTime = toFormattedTime(posts[-1]['timestamp'])
            currentPage = 1
    return posts, threads


def tryDownloadingImage(candidates, downloadFolder):
    # first try the archived media_link recorded for any candidate post
    for candidate in candidates:
        if not pandas.isna(candidate['imageLink']):
            link = candidate['imageLink']
            name = candidate['newImageName']
            hash = imageLimiter.saveImage(link, os.path.join(downloadFolder, name), timeout=60)
            if hash:
                return link
    # then fall back to a URL derived from the renamed filename
    for candidate in candidates:
        name = candidate['newImageName']
        prefix = name[:4]
        infix = name[4:6]
        link = f'https://desu-usergeneratedcontent.xyz/mlp/image/{prefix}/{infix}/{name}'
        hash = imageLimiter.saveImage(link, os.path.join(downloadFolder, name), timeout=60)
        if hash:
            return link
    return ""


def tryDownloadingThumbnail(candidates, downloadFolder):
    # first try the archived thumb_link recorded for any candidate post
    for candidate in candidates:
        if not pandas.isna(candidate['thumbLink']):
            link = candidate['thumbLink']
            name = candidate['thumbName']
            hash = imageLimiter.saveImage(link, os.path.join(downloadFolder, name), timeout=60)
            if hash:
                return link
    # then fall back to a URL derived from the thumbnail filename
    for candidate in candidates:
        name = candidate['thumbName']
        prefix = name[:4]
        infix = name[4:6]
        link = f'https://desu-usergeneratedcontent.xyz/mlp/thumb/{prefix}/{infix}/{name}'
        hash = imageLimiter.saveImage(link, os.path.join(downloadFolder, name), timeout=60)
        if hash:
            return link
    return ""
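
# Worked example of the filename-derived fallback URL above, using a hypothetical
# media name '1490123456789.png': prefix = '1490', infix = '12', giving
# https://desu-usergeneratedcontent.xyz/mlp/image/1490/12/1490123456789.png
# (thumbnails use the same layout under /mlp/thumb/ instead of /mlp/image/)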
def scrapeImages():
    # get list of images to scrape
    print('loading posts')
    allPosts = loadData(POST_PROPS_PREFIX, columns=['imageMediaHash', 'newImageName', 'imageLink'], types=POST_TYPES)[0]
    allPosts.dropna(subset=['imageMediaHash'], inplace=True)
    allPosts.reset_index(drop=True, inplace=True)
    print('loading external images')
    externalImages = loadData('/archives/ppp-clone/image-data/image-id-exports/all-ids.parquet.', columns=['b64-md5'], types={'b64-md5': pandas.StringDtype()})[0]
    externalImages.dropna(inplace=True)
    externalImages = set(externalImages['b64-md5'])
    print('loading downloaded images')
    with open(IMAGE_HASH_PATH, 'r') as inp:
        data = csv.reader(inp)
        header = next(data)
        alreadyDownloaded = set([x[0] for x in data if len(x) == 2])
    previousImageFolders = sorted([int(os.path.basename(x)) for x in glob.glob(f'{IMAGE_FOLDER_PATH}/*')])
    if not previousImageFolders:
        currentImageFolderNum = 0
    else:
        currentImageFolderNum = previousImageFolders[-1] + 1
    currentImageFolder = f'{IMAGE_FOLDER_PATH}/{currentImageFolderNum}'
    imagesInFolder = 0
    with open(IMAGE_HASH_PATH, 'a') as outp:
        for nextImageHash, indexes in allPosts.groupby('imageMediaHash').groups.items():
            if nextImageHash in externalImages:
                continue
            if nextImageHash in alreadyDownloaded:
                continue
            print('downloading', nextImageHash, end='... ')
            candidates = allPosts.iloc[indexes].to_dict(orient='records')
            # candidates = allPosts[allPosts['imageMediaHash'] == nextImageHash].to_dict(orient='records')
            link = tryDownloadingImage(candidates, currentImageFolder)
            if link:
                print('retrieved from', link)
                imagesInFolder += 1
            else:
                print('failed')
            outp.write(f'\n{nextImageHash},{link}')
            outp.flush()
            if imagesInFolder > 2000:
                currentImageFolderNum += 1
                currentImageFolder = f'{IMAGE_FOLDER_PATH}/{currentImageFolderNum}'
                imagesInFolder = 0


def scrapeThumbnails():
    # get list of images to scrape
    print('loading posts')
    allPosts = loadData(POST_PROPS_PREFIX, columns=['imageMediaHash', 'thumbName', 'thumbLink'], types=POST_TYPES)[0]
    allPosts.dropna(subset=['imageMediaHash'], inplace=True)
    allPosts.reset_index(drop=True, inplace=True)
    print('loading downloaded images')
    with open(THUMB_HASH_PATH, 'r') as inp:
        data = csv.reader(inp)
        header = next(data)
        alreadyDownloaded = set([x[0] for x in data if len(x) == 2])
    previousImageFolders = sorted([int(os.path.basename(x)) for x in glob.glob(f'{THUMB_FOLDER_PATH}/*')])
    if not previousImageFolders:
        currentImageFolderNum = 0
    else:
        currentImageFolderNum = previousImageFolders[-1] + 1
    currentImageFolder = f'{THUMB_FOLDER_PATH}/{currentImageFolderNum}'
    imagesInFolder = 0
    with open(THUMB_HASH_PATH, 'a') as outp:
        for nextImageHash, indexes in allPosts.groupby('imageMediaHash').groups.items():
            if nextImageHash in alreadyDownloaded:
                continue
            print('downloading', nextImageHash, end='... ')
            candidates = allPosts.iloc[indexes].to_dict(orient='records')
            # candidates = allPosts[allPosts['imageMediaHash'] == nextImageHash].to_dict(orient='records')
            link = tryDownloadingThumbnail(candidates, currentImageFolder)
            if link:
                print('retrieved from', link)
                imagesInFolder += 1
            else:
                print('failed')
            outp.write(f'\n{nextImageHash},{link}')
            outp.flush()
            if imagesInFolder > 20000:
                currentImageFolderNum += 1
                currentImageFolder = f'{THUMB_FOLDER_PATH}/{currentImageFolderNum}'
                imagesInFolder = 0
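
# Typical invocations (script name assumed; pick exactly one mode per run):
# ... python desu_scrape.py --posts             scrape new posts since the last export
# ... python desu_scrape.py --start 2023-10-03  scrape posts starting from a given day
# ... python desu_scrape.py --thread <num>      re-scrape a single thread by number
# ... python desu_scrape.py --images            download full-size images
# ... python desu_scrape.py --thumbnails        download thumbnails
# ... python desu_scrape.py --consolidate       merge all exports into a single file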
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Tool to scrape /mlp/ post data, images, and thumbnails from desuarchive.'
    )
    group = parser.add_mutually_exclusive_group(required=False)
    group.add_argument('--thread', required=False, type=str)
    group.add_argument('--consolidate', required=False, action='store_true')
    group.add_argument('--sticky', required=False, action='store_true')
    group.add_argument('--start', required=False, type=str)
    group.add_argument('--posts', required=False, action='store_true')
    group.add_argument('--images', required=False, action='store_true')
    group.add_argument('--thumbnails', required=False, action='store_true')
    args = parser.parse_args()

    if args.thread:
        nextSaveId = getNextSaveId()
        print('saving to', nextSaveId)
        posts, threads = scrapeThread(args.thread)
        saveDataframe(threads, f'{THREAD_PROPS_PREFIX}{nextSaveId}', THREAD_TYPES)
        saveDataframe(posts, f'{POST_PROPS_PREFIX}{nextSaveId}', POST_TYPES)
    elif args.consolidate:
        consolidateData()
    elif args.sticky:
        raise Exception('not implemented... run thread() on all sticky threads since their posts might get missed when the thread gets old')
    elif args.start:
        scrapeNewPosts(startTime=args.start)
    elif args.posts:
        scrapeNewPosts()
    elif args.images:
        scrapeImages()
    elif args.thumbnails:
        scrapeThumbnails()
```