#!/usr/bin/env python3
# target for each post:
# ... post number
# ... thread number
# ... timestamp
# ... icon name
# ... filename
# ... file dimensions
# ... media_hash (maybe okay when combined with dimensions)
# ... flags: spoiler, deleted, sticky, locked
# ... deleted (flag)
# ... email, name, trip, title
# ... comment
# ... poster hash??
# ... poster country
# ... flair
# step 0: get a list of all OPs
# step 1: collect all post data
# ... get it from the desuarchive dump
# ... get it from the moe dump
# step 2: collect all icons and images
# ... bunch of sites to check... just write a script and run it on a server
# step 3: organize the data (probably into threads & images)
# step 4: upload the data
# from desu /post:
# ... original filename = media_filename
# ... new filename = media_orig
# ... size: media_w, media_h, media_size (bytes)
# ... flair: troll_country_name
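# for reference, the rough shape of one raw post record as consumed by addPost()
# and addThread() below (field names taken from this script's own accesses; values
# are illustrative placeholders, not an official schema):
# {
#   'num': ..., 'subnum': ..., 'thread_num': ..., 'timestamp': ..., 'op': ...,
#   'deleted': ..., 'capcode': ..., 'email': ..., 'name': ..., 'trip': ...,
#   'title': ..., 'comment': ..., 'sticky': ..., 'locked': ..., 'timestamp_expired': ...,
#   'exif': '{"uniqueIps": ..., "troll_country_name": ...}',
#   'media': {'media_filename': ..., 'media_orig': ..., 'media_w': ..., 'media_h': ...,
#             'media_size': ..., 'media_link': ..., 'media_hash': ...,
#             'preview_orig': ..., 'thumb_link': ..., 'spoiler': ...}
# }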
import json
import requests
from utils.ratelimit import RateLimit
from utils.data import loadData, convertTypes, getSortedPrefixPaths
import pandas
import datetime
import math
import os
import glob
import argparse
import csv
DESU_API = 'https://desuarchive.org/_/api/chan'
EXPORT_PATH = "/archives/ppp-clone/post-data/exports"
THREAD_PROPS_PREFIX = os.path.join(EXPORT_PATH, 'desuarchive.thread-properties.parquet.')
POST_PROPS_PREFIX = os.path.join(EXPORT_PATH, 'desuarchive.post-properties.parquet.')
IMAGE_HASH_PATH = os.path.join('/archives/ppp-clone/post-data/persistent-metadata/downloaded-images.csv')
THUMB_HASH_PATH = os.path.join('/archives/ppp-clone/post-data/persistent-metadata/downloaded-thumbnails.csv')
IMAGE_FOLDER_PATH = "/archives/ppp-clone/post-data/images-by-time"
THUMB_FOLDER_PATH = "/archives/ppp-clone/post-data/thumbs-by-time"
POST_TYPES = {
'postId': pandas.Int64Dtype(),
'subId': pandas.Int64Dtype(),
'threadId': pandas.Int64Dtype(),
'timestamp': pandas.Int64Dtype(),
'origImageName': pandas.StringDtype(),
'newImageName': pandas.StringDtype(),
'imageWidth': pandas.Int32Dtype(),
'imageHeight': pandas.Int32Dtype(),
'imageSize': pandas.Int64Dtype(),
'imageLink': pandas.StringDtype(),
'imageMediaHash': pandas.StringDtype(),
'thumbName': pandas.StringDtype(),
'thumbLink': pandas.StringDtype(),
'spoiler': pandas.BooleanDtype(),
'deleted': pandas.BooleanDtype(),
'banned': pandas.BooleanDtype(),
'capcode': pandas.StringDtype(),
'email': pandas.StringDtype(),
'name': pandas.StringDtype(),
'trip': pandas.StringDtype(),
'title': pandas.StringDtype(),
'comment': pandas.StringDtype(),
'flair': pandas.StringDtype(),
}
THREAD_TYPES = {
'postId': pandas.Int64Dtype(),
'numUniqueIps': pandas.Int32Dtype(),
'sticky': pandas.BooleanDtype(),
'locked': pandas.BooleanDtype(),
'expiration': pandas.Int64Dtype()
}
searchLimiter = RateLimit(3)
imageLimiter = RateLimit(300)
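# search for thread OPs on /mlp/ starting at the given date; returns the parsed
# JSON response, or None if the rate-limited request failed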
def getThreads(startTime, page, order='asc'):
call = f'{DESU_API}/search/?boards=mlp&type=op&start={startTime}&order={order}&page={page}'
response = searchLimiter.get(call)
if not response:
return None
return json.loads(response.text)
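# search for posts within a single thread starting at the given date; used as a
# fallback when the /thread endpoint fails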
def searchPosts(threadId, startTime, page, order='asc'):
call = f'{DESU_API}/search/?boards=mlp&tnum={threadId}&start={startTime}&order={order}&page={page}'
response = searchLimiter.get(call)
if not response:
return None
return json.loads(response.text)
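# fetch every post in a thread in one request via the /thread endpoint; the
# fragile flag is interpreted by RateLimit (see utils.ratelimit)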
def getPosts(threadNum):
call = f'{DESU_API}/thread/?board=mlp&num={threadNum}'
response = searchLimiter.get(call, fragile=True)
if not response:
return None
return json.loads(response.text)
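# format a unix timestamp as YYYY-MM-DD in a fixed UTC-5 offset, presumably to
# match the timezone desuarchive uses for its date-based search parameters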
def toFormattedTime(timestamp):
    if pandas.isna(timestamp):
timestamp = 0
timedelta = datetime.timedelta(hours=-5)
timezone = datetime.timezone(timedelta)
dt = datetime.datetime.fromtimestamp(timestamp, timezone)
return dt.strftime('%Y-%m-%d')
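# merge all numbered parquet shards into a single shard 0, moving the old export
# directory to <EXPORT_PATH>.bak so it can be checked before deletion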
def consolidateData():
exportPath = EXPORT_PATH
if os.path.exists(f'{exportPath}.bak'):
raise Exception(f'check the data and delete {exportPath}.bak')
fullThreadData = loadData(THREAD_PROPS_PREFIX, ['postId'], types=THREAD_TYPES)
fullPostData = loadData(POST_PROPS_PREFIX, ['postId', 'subId'], types=POST_TYPES)
os.rename(EXPORT_PATH, f'{EXPORT_PATH}.bak')
os.makedirs(EXPORT_PATH)
fullThreadData[0].to_parquet(f'{THREAD_PROPS_PREFIX}0')
fullPostData[0].to_parquet(f'{POST_PROPS_PREFIX}0')
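# estimate a safe restart point: take the most recent posts (widening the window
# until they span at least 200 distinct threads) and return the timestamp of the
# oldest post among them, so re-scraping from that time catches any stragglers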
def loadLastArchivedTime(posts):
subId = posts['subId']
actualPosts = posts[subId == 0]
    candidateLastPostCount = 5000
    latestPosts = actualPosts.nlargest(candidateLastPostCount, 'postId')
    discoveredThreads = len(set(latestPosts['threadId']))
    # widen the window until it covers at least 200 distinct threads (or every post)
    while discoveredThreads < 200 and candidateLastPostCount < len(actualPosts):
        candidateLastPostCount += 1000
        latestPosts = actualPosts.nlargest(candidateLastPostCount, 'postId')
        discoveredThreads = len(set(latestPosts['threadId']))
    # idxmin returns an index label, so look the timestamp up by label, not position
    earliest = latestPosts['postId'].idxmin()
    return latestPosts.loc[earliest, 'timestamp']
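# work out where the previous scrape left off by loading the existing post
# exports; returns a formatted date to pass back into the search API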
def getStartTime():
print('loading post data', end='... ', flush=True)
if not os.path.exists(f'{POST_PROPS_PREFIX}0'):
        # no existing exports yet: start scraping from the epoch
        return toFormattedTime(0)
posts, nextPostId = loadData(POST_PROPS_PREFIX, ['postId', 'subId', 'threadId', 'timestamp'])
print('found', len(posts), 'posts')
print('using next save point', nextPostId)
newestThreadTime = toFormattedTime(loadLastArchivedTime(posts))
print('using next thread time', newestThreadTime)
return newestThreadTime
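# the exports are numbered shards (e.g. desuarchive.post-properties.parquet.3);
# continue numbering after the highest existing shard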
def getNextSaveId():
sortedPaths = getSortedPrefixPaths(POST_PROPS_PREFIX)
return sortedPaths[-1][1] + 1
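# flatten one raw post record into the schema described by POST_TYPES; the flair
# column comes from the troll_country_name field inside the exif JSON blob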
def addPost(dataframe, post):
    if not post.get('media'):
        post['media'] = {}
exif = post.get('exif', None)
if exif:
exif = json.loads(exif)
flair = exif.get('troll_country_name', None)
else:
flair = None
dataframe.append({
'postId': post['num'],
'subId': post['subnum'],
'threadId': post['thread_num'],
'timestamp': post['timestamp'],
'origImageName': post['media'].get('media_filename', None),
'newImageName': post['media'].get('media_orig', None),
'imageWidth': post['media'].get('media_w', None),
'imageHeight': post['media'].get('media_h', None),
'imageSize': post['media'].get('media_size', None),
'imageLink': post['media'].get('media_link', None),
'imageMediaHash': post['media'].get('media_hash', None),
'thumbName': post['media'].get('preview_orig', None),
'thumbLink': post['media'].get('thumb_link', None),
'spoiler': post['media'].get('spoiler', None),
'deleted': post['deleted'],
'banned': post.get('banned', None),
'capcode': post['capcode'],
'email': post['email'],
'name': post['name'],
'trip': post['trip'],
'title': post['title'],
'comment': post['comment'],
'flair': flair,
})
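# record the thread-level properties of an OP post (unique IP count from exif,
# sticky/locked flags, and expiration timestamp)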
def addThread(dataframe, thread):
exif = thread.get('exif', None)
if exif:
exif = json.loads(exif)
numUniqueIps = exif.get('uniqueIps', None)
else:
numUniqueIps = None
dataframe.append({
'postId': thread['num'],
'numUniqueIps': numUniqueIps,
'sticky': thread['sticky'],
'locked': thread['locked'],
'expiration': str(thread['timestamp_expired']),
})
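# convert the accumulated list of row dicts to a typed DataFrame and write it to
# parquet, keeping a .bak of any file already at that path until the write succeeds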
def saveDataframe(df, path, types):
df = pandas.DataFrame(df)
convertTypes(df, types)
backupPath = f'{path}.bak'
if os.path.exists(path):
os.rename(path, backupPath)
df.to_parquet(path)
if os.path.exists(backupPath):
os.unlink(backupPath)
print('saved progress to', path)
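# main post scrape: page through OPs by date, pull each thread's posts via the
# /thread endpoint (falling back to the search API), and checkpoint to a new
# parquet shard roughly every 100k posts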
def scrapeNewPosts(startTime=None):
# load data
# next one: 2023-10-03
nextSaveId = getNextSaveId()
startTime = startTime or getStartTime()
posts, threads = [], [] # TODO: update the rest of the script to use arrays instead of dataframes
currentPage = 1
unsavedPosts = 0
while True:
print('getting thread', startTime, currentPage)
threadResults = getThreads(startTime, currentPage)
        if not threadResults or 'error' in threadResults:
print(threadResults)
break
for op in threadResults['0']['posts']:
addThread(threads, op)
addPost(posts, op)
unsavedPosts += 1
print('getting posts for', op['num'], end='... ')
postCount = 0
postResults = getPosts(op['num'])
            if postResults is None:
                # thread endpoint failed; fall back to paging the search API
                newPosts, _ = scrapeThread(op['num'])
                posts.extend(newPosts)
else:
postResults = postResults[op['num']]
if 'posts' in postResults:
postResults = postResults['posts']
for post in postResults.values():
addPost(posts, post)
unsavedPosts += 1
postCount += 1
print(postCount, 'posts')
if currentPage < 200:
currentPage += 1
else:
startTime = toFormattedTime(op['timestamp'])
currentPage = 1
if unsavedPosts > 100000:
saveDataframe(threads, f'{THREAD_PROPS_PREFIX}{nextSaveId}', THREAD_TYPES)
saveDataframe(posts, f'{POST_PROPS_PREFIX}{nextSaveId}', POST_TYPES)
posts, threads = [], []
nextSaveId += 1
unsavedPosts = 0
if unsavedPosts > 0:
saveDataframe(threads, f'{THREAD_PROPS_PREFIX}{nextSaveId}', THREAD_TYPES)
saveDataframe(posts, f'{POST_PROPS_PREFIX}{nextSaveId}', POST_TYPES)
# fetch all new data
# periodically save to disk
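# scrape a single thread by paging the search API, advancing the start date once
# page 200 is reached; returns the collected post and thread row dicts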
def scrapeThread(threadId):
threads = []
posts = []
currentPage = 1
startTime = toFormattedTime(0)
while True:
postResults = searchPosts(threadId, startTime, currentPage)
        if not postResults or 'error' in postResults:
print(postResults)
break
for post in postResults["0"]["posts"]:
addPost(posts, post)
if int(post["op"]) == 1:
addThread(threads, post)
print('added', postResults['0']['posts'][0]['num'], '...', postResults['0']['posts'][-1]['num'])
if currentPage < 200:
currentPage += 1
else:
startTime = toFormattedTime(posts[-1]['timestamp'])
currentPage = 1
return posts, threads
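# try each candidate post's recorded imageLink first, then fall back to the
# desu-usergeneratedcontent.xyz path convention derived from the stored filename;
# returns the link that worked, or "" if every attempt failed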
def tryDownloadingImage(candidates, downloadFolder):
for candidate in candidates:
if not pandas.isna(candidate['imageLink']):
link = candidate['imageLink']
name = candidate['newImageName']
hash = imageLimiter.saveImage(link, os.path.join(downloadFolder, name), timeout=60)
if hash:
return link
for candidate in candidates:
name = candidate['newImageName']
prefix = name[:4]
infix = name[4:6]
link = f'https://desu-usergeneratedcontent.xyz/mlp/image/{prefix}/{infix}/{name}'
hash = imageLimiter.saveImage(link, os.path.join(downloadFolder, name), timeout=60)
if hash:
return link
return ""
def tryDownloadingThumbnail(candidates, downloadFolder):
for candidate in candidates:
if not pandas.isna(candidate['thumbLink']):
link = candidate['thumbLink']
name = candidate['thumbName']
hash = imageLimiter.saveImage(link, os.path.join(downloadFolder, name), timeout=60)
if hash:
return link
for candidate in candidates:
name = candidate['thumbName']
prefix = name[:4]
infix = name[4:6]
link = f'https://desu-usergeneratedcontent.xyz/mlp/thumb/{prefix}/{infix}/{name}'
hash = imageLimiter.saveImage(link, os.path.join(downloadFolder, name), timeout=60)
if hash:
return link
return ""
def scrapeImages():
# get list of images to scrape
print('loading posts')
allPosts = loadData(POST_PROPS_PREFIX, columns=['imageMediaHash', 'newImageName', 'imageLink'], types=POST_TYPES)[0]
allPosts.dropna(subset=['imageMediaHash'], inplace=True)
allPosts.reset_index(drop=True, inplace=True)
print('loading external images')
    externalImages = loadData('/archives/ppp-clone/image-data/image-id-exports/all-ids.parquet.', columns=['b64-md5'], types={'b64-md5': pandas.StringDtype()})[0]
externalImages.dropna(inplace=True)
externalImages = set(externalImages['b64-md5'])
print('loading downloaded images')
with open(IMAGE_HASH_PATH, 'r') as inp:
data = csv.reader(inp)
header = next(data)
alreadyDownloaded = set([x[0] for x in data if len(x) == 2])
previousImageFolders = sorted([int(os.path.basename(x)) for x in glob.glob(f'{IMAGE_FOLDER_PATH}/*')])
if not previousImageFolders:
currentImageFolderNum = 0
else:
currentImageFolderNum = previousImageFolders[-1] + 1
currentImageFolder = f'{IMAGE_FOLDER_PATH}/{currentImageFolderNum}'
imagesInFolder = 0
with open(IMAGE_HASH_PATH, 'a') as outp:
for nextImageHash, indexes in allPosts.groupby('imageMediaHash').groups.items():
if nextImageHash in externalImages:
continue
if nextImageHash in alreadyDownloaded:
continue
print('downloading', nextImageHash, end='... ')
candidates = allPosts.iloc[indexes].to_dict(orient='records')
# candidates = allPosts[allPosts['imageMediaHash'] == nextImageHash].to_dict(orient='records')
link = tryDownloadingImage(candidates, currentImageFolder)
if link:
print('retrieved from', link)
imagesInFolder += 1
else:
print('failed')
outp.write(f'\n{nextImageHash},{link}')
outp.flush()
if imagesInFolder > 2000:
currentImageFolderNum += 1
currentImageFolder = f'{IMAGE_FOLDER_PATH}/{currentImageFolderNum}'
imagesInFolder = 0
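# same as scrapeImages but for thumbnails: no external-image skip, and folders
# roll over at ~20000 files instead of ~2000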
def scrapeThumbnails():
# get list of images to scrape
print('loading posts')
allPosts = loadData(POST_PROPS_PREFIX, columns=['imageMediaHash', 'thumbName', 'thumbLink'], types=POST_TYPES)[0]
allPosts.dropna(subset=['imageMediaHash'], inplace=True)
allPosts.reset_index(drop=True, inplace=True)
print('loading downloaded images')
with open(THUMB_HASH_PATH, 'r') as inp:
data = csv.reader(inp)
header = next(data)
alreadyDownloaded = set([x[0] for x in data if len(x) == 2])
previousImageFolders = sorted([int(os.path.basename(x)) for x in glob.glob(f'{THUMB_FOLDER_PATH}/*')])
if not previousImageFolders:
currentImageFolderNum = 0
else:
currentImageFolderNum = previousImageFolders[-1] + 1
currentImageFolder = f'{THUMB_FOLDER_PATH}/{currentImageFolderNum}'
imagesInFolder = 0
with open(THUMB_HASH_PATH, 'a') as outp:
for nextImageHash, indexes in allPosts.groupby('imageMediaHash').groups.items():
if nextImageHash in alreadyDownloaded:
continue
print('downloading', nextImageHash, end='... ')
candidates = allPosts.iloc[indexes].to_dict(orient='records')
# candidates = allPosts[allPosts['imageMediaHash'] == nextImageHash].to_dict(orient='records')
link = tryDownloadingThumbnail(candidates, currentImageFolder)
if link:
print('retrieved from', link)
imagesInFolder += 1
else:
print('failed')
outp.write(f'\n{nextImageHash},{link}')
outp.flush()
if imagesInFolder > 20000:
currentImageFolderNum += 1
currentImageFolder = f'{THUMB_FOLDER_PATH}/{currentImageFolderNum}'
imagesInFolder = 0
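# example invocations (script name illustrative):
#   python scrape.py --posts              # resume scraping new posts
#   python scrape.py --start 2023-10-03   # scrape posts starting from a date
#   python scrape.py --thread 12345678    # re-scrape a single thread
#   python scrape.py --consolidate        # merge export shards into one file
#   python scrape.py --images             # download full-size images
#   python scrape.py --thumbnails         # download thumbnails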
if __name__ == '__main__':
parser = argparse.ArgumentParser(
        description='Tool to scrape desuarchive /mlp/ post data, images, and thumbnails.'
)
group = parser.add_mutually_exclusive_group(required=False)
group.add_argument('--thread', required=False, type=str)
group.add_argument('--consolidate', required=False, action='store_true')
group.add_argument('--sticky', required=False, action='store_true')
group.add_argument('--start', required=False, type=str)
group.add_argument('--posts', required=False, action='store_true')
group.add_argument('--images', required=False, action='store_true')
group.add_argument('--thumbnails', required=False, action='store_true')
args = parser.parse_args()
if args.thread:
nextSaveId = getNextSaveId()
print('saving to', nextSaveId)
posts, threads = scrapeThread(args.thread)
saveDataframe(threads, f'{THREAD_PROPS_PREFIX}{nextSaveId}', THREAD_TYPES)
saveDataframe(posts, f'{POST_PROPS_PREFIX}{nextSaveId}', POST_TYPES)
elif args.consolidate:
consolidateData()
elif args.sticky:
        raise Exception('not implemented... run --thread on all sticky threads since their posts might get missed when the thread gets old')
elif args.start:
scrapeNewPosts(startTime=args.start)
elif args.posts:
scrapeNewPosts()
elif args.images:
scrapeImages()
elif args.thumbnails:
scrapeThumbnails()
# by Synthbot