
desuarchive scraper

By Synthbot
Created: 2023-10-08 04:26:02
Expiry: Never

#!/usr/bin/env python3

# target for each post:
# ... post number
# ... thread number
# ... timestamp
# ... icon name
# ... filename
# ... file dimensions
# ... media_hash (maybe okay when combined with dimensions)
# ... flags: spoiler, deleted, sticky, locked
# ... deleted (flag)
# ... email, name, trip, title
# ... comment
# ... poster hash??
# ... poster country
# ... flair
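# Each of these becomes a column in the parquet exports: per-post fields go into
# the post table (POST_TYPES, filled by addPost below), and thread-level fields
# (sticky, locked, unique IPs, expiration) go into the thread table
# (THREAD_TYPES, filled by addThread below).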

# step 0: get a list of all OPs
# step 1: collect all post data
# ... get it from the desuarchive dump
# ... get it from the moe dump
# step 2: collect all icons and images
# ... bunch of sites to check... just write a script and run it on a server
# step 3: organize the data (probably into threads & images)
# step 4: upload the data
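# How the steps map to the CLI flags at the bottom of this script:
# ... steps 0-1: --posts (resume from the last export), --start YYYY-MM-DD
#                (scrape from a given date), --thread <id> (single thread)
# ... step 2: --images and --thumbnails
# ... step 3: --consolidate (merge all export shards into one parquet file)
# ... step 4: handled outside this script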

# from desu /post:
# ... original filename = media_filename
# ... new filename = media_orig
# ... size: media_w, media_h, media_size (bytes)
# ... flair: troll_country_name
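# Rough shape of a post object as consumed by addPost/addThread below (field
# names inferred from how they are read here; this is not an official schema,
# and values are elided):
# {
#   "num": ..., "subnum": ..., "thread_num": ..., "timestamp": ..., "op": ...,
#   "deleted": ..., "banned": ..., "capcode": ..., "email": ..., "name": ...,
#   "trip": ..., "title": ..., "comment": ...,
#   "sticky": ..., "locked": ..., "timestamp_expired": ...,   # thread OPs
#   "exif": "<JSON-encoded string carrying uniqueIps / troll_country_name>",
#   "media": {
#     "media_filename": ..., "media_orig": ..., "media_w": ..., "media_h": ...,
#     "media_size": ..., "media_hash": ..., "media_link": ...,
#     "preview_orig": ..., "thumb_link": ..., "spoiler": ...
#   }
# }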

import json
import requests
from utils.ratelimit import RateLimit
from utils.data import loadData, convertTypes, getSortedPrefixPaths
import pandas
import datetime
import math
import os
import glob
import argparse
import csv

DESU_API = 'https://desuarchive.org/_/api/chan'
EXPORT_PATH = "/archives/ppp-clone/post-data/exports"

THREAD_PROPS_PREFIX = os.path.join(EXPORT_PATH, 'desuarchive.thread-properties.parquet.')
POST_PROPS_PREFIX = os.path.join(EXPORT_PATH, 'desuarchive.post-properties.parquet.')
IMAGE_HASH_PATH = os.path.join('/archives/ppp-clone/post-data/persistent-metadata/downloaded-images.csv')
THUMB_HASH_PATH = os.path.join('/archives/ppp-clone/post-data/persistent-metadata/downloaded-thumbnails.csv')
IMAGE_FOLDER_PATH = "/archives/ppp-clone/post-data/images-by-time"
THUMB_FOLDER_PATH = "/archives/ppp-clone/post-data/thumbs-by-time"

POST_TYPES = {
    'postId': pandas.Int64Dtype(),
    'subId': pandas.Int64Dtype(),
    'threadId': pandas.Int64Dtype(),
    'timestamp': pandas.Int64Dtype(),
    'origImageName': pandas.StringDtype(),
    'newImageName': pandas.StringDtype(),
    'imageWidth': pandas.Int32Dtype(),
    'imageHeight': pandas.Int32Dtype(),
    'imageSize': pandas.Int64Dtype(),
    'imageLink': pandas.StringDtype(),
    'imageMediaHash': pandas.StringDtype(),
    'thumbName': pandas.StringDtype(),
    'thumbLink': pandas.StringDtype(),
    'spoiler': pandas.BooleanDtype(),
    'deleted': pandas.BooleanDtype(),
    'banned': pandas.BooleanDtype(),
    'capcode': pandas.StringDtype(),
    'email': pandas.StringDtype(),
    'name': pandas.StringDtype(),
    'trip': pandas.StringDtype(),
    'title': pandas.StringDtype(),
    'comment': pandas.StringDtype(),
    'flair': pandas.StringDtype(),
}

THREAD_TYPES = {
    'postId': pandas.Int64Dtype(),
    'numUniqueIps': pandas.Int32Dtype(),
    'sticky': pandas.BooleanDtype(),
    'locked': pandas.BooleanDtype(),
    'expiration': pandas.Int64Dtype()
}

searchLimiter = RateLimit(3)
imageLimiter = RateLimit(300)
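# RateLimit comes from the local utils package. Based on how it is used below,
# it is assumed to expose get(url, fragile=...) returning a requests-style
# response (falsy on failure) and saveImage(url, path, timeout=...) returning a
# hash of the saved file, or a falsy value when the download fails.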

def getThreads(startTime, page, order='asc'):
    call = f'{DESU_API}/search/?boards=mlp&type=op&start={startTime}&order={order}&page={page}'
    response = searchLimiter.get(call)

    if not response:
        return None

    return json.loads(response.text)

def searchPosts(threadId, startTime, page, order='asc'):
    call = f'{DESU_API}/search/?boards=mlp&tnum={threadId}&start={startTime}&order={order}&page={page}'
    response = searchLimiter.get(call)

    if not response:
        return None

    return json.loads(response.text)

def getPosts(threadNum):
    call = f'{DESU_API}/thread/?board=mlp&num={threadNum}'
    response = searchLimiter.get(call, fragile=True)
    if not response:
        return None
    return json.loads(response.text)

def toFormattedTime(timestamp):
    # render a unix timestamp as YYYY-MM-DD in a fixed UTC-5 offset; this is the
    # date format passed as the 'start' parameter to the search API
    if timestamp is None or math.isnan(timestamp):
        timestamp = 0
    timedelta = datetime.timedelta(hours=-5)
    timezone = datetime.timezone(timedelta)
    dt = datetime.datetime.fromtimestamp(timestamp, timezone)
    return dt.strftime('%Y-%m-%d')

def consolidateData():
    exportPath = EXPORT_PATH
    if os.path.exists(f'{exportPath}.bak'):
        raise Exception(f'check the data and delete {exportPath}.bak')

    fullThreadData = loadData(THREAD_PROPS_PREFIX, ['postId'], types=THREAD_TYPES)
    fullPostData = loadData(POST_PROPS_PREFIX, ['postId', 'subId'], types=POST_TYPES)

    os.rename(EXPORT_PATH, f'{EXPORT_PATH}.bak')
    os.makedirs(EXPORT_PATH)

    fullThreadData[0].to_parquet(f'{THREAD_PROPS_PREFIX}0')
    fullPostData[0].to_parquet(f'{POST_PROPS_PREFIX}0')

def loadLastArchivedTime(posts):
    subId = posts['subId']
    actualPosts = posts[subId == 0]
    candidateLastPostCount = 5000
    discoveredThreads = 0
    while discoveredThreads < 200:
        latestPosts = actualPosts.nlargest(candidateLastPostCount, 'postId')
        discoveredThreads = len(set(latestPosts['threadId']))
        candidateLastPostCount += 1000

    # idxmin() returns an index label, so use .loc rather than .iloc to look it up
    earliest = latestPosts['postId'].idxmin()
    return posts['timestamp'].loc[earliest]

def getStartTime():
    print('loading post data', end='... ', flush=True)
    if not os.path.exists(f'{POST_PROPS_PREFIX}0'):
        # no exports yet, so start scraping from the beginning of time
        return toFormattedTime(0)

    posts, nextPostId = loadData(POST_PROPS_PREFIX, ['postId', 'subId', 'threadId', 'timestamp'])
    print('found', len(posts), 'posts')
    print('using next save point', nextPostId)

    newestThreadTime = toFormattedTime(loadLastArchivedTime(posts))
    print('using next thread time', newestThreadTime)
    return newestThreadTime

def getNextSaveId():
    sortedPaths = getSortedPrefixPaths(POST_PROPS_PREFIX)
    return sortedPaths[-1][1] + 1
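# Exports are sharded: each save appends a numeric suffix to the parquet prefix
# (desuarchive.post-properties.parquet.0, .1, ...). getSortedPrefixPaths is
# assumed to return (path, shardNumber) pairs in shard order, so the next save
# id is one past the last shard already on disk.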

def addPost(dataframe, post):
    if not post['media']:
        post['media'] = {}

    exif = post.get('exif', None)
    if exif:
        exif = json.loads(exif)
        flair = exif.get('troll_country_name', None)
    else:
        flair = None

    dataframe.append({
        'postId': post['num'],
        'subId': post['subnum'],
        'threadId': post['thread_num'],
        'timestamp': post['timestamp'],
        'origImageName': post['media'].get('media_filename', None),
        'newImageName': post['media'].get('media_orig', None),
        'imageWidth': post['media'].get('media_w', None),
        'imageHeight': post['media'].get('media_h', None),
        'imageSize': post['media'].get('media_size', None),
        'imageLink': post['media'].get('media_link', None),
        'imageMediaHash': post['media'].get('media_hash', None),
        'thumbName': post['media'].get('preview_orig', None),
        'thumbLink': post['media'].get('thumb_link', None),
        'spoiler': post['media'].get('spoiler', None),
        'deleted': post['deleted'],
        'banned': post.get('banned', None),
        'capcode': post['capcode'],
        'email': post['email'],
        'name': post['name'],
        'trip': post['trip'],
        'title': post['title'],
        'comment': post['comment'],
        'flair': flair,
    })

def addThread(dataframe, thread):
    exif = thread.get('exif', None)
    if exif:
        exif = json.loads(exif)
        numUniqueIps = exif.get('uniqueIps', None)
    else:
        numUniqueIps = None

    dataframe.append({
        'postId': thread['num'],
        'numUniqueIps': numUniqueIps,
        'sticky': thread['sticky'],
        'locked': thread['locked'],
        'expiration': str(thread['timestamp_expired']),
    })

def saveDataframe(df, path, types):
    df = pandas.DataFrame(df)
    convertTypes(df, types)

    backupPath = f'{path}.bak'
    if os.path.exists(path):
        os.rename(path, backupPath)
    df.to_parquet(path)
    if os.path.exists(backupPath):
        os.unlink(backupPath)
    print('saved progress to', path)

def scrapeNewPosts(startTime=None):
    # load data
    # next one: 2023-10-03
    nextSaveId = getNextSaveId()
    startTime = startTime or getStartTime()

    posts, threads = [], [] # TODO: update the rest of the script to use arrays instead of dataframes
    currentPage = 1
    unsavedPosts = 0

    while True:
        print('getting thread', startTime, currentPage)
        threadResults = getThreads(startTime, currentPage)
        if not threadResults or 'error' in threadResults:
            print(threadResults)
            break

        for op in threadResults['0']['posts']:
            addThread(threads, op)
            addPost(posts, op)
            unsavedPosts += 1

            print('getting posts for', op['num'], end='... ')
            postCount = 0
            postResults = getPosts(op['num'])
            if postResults is None:
                # the /thread lookup failed; fall back to paging through the search API
                newPosts, _ = scrapeThread(op['num'])
                posts.extend(newPosts)
                unsavedPosts += len(newPosts)
                print(len(newPosts), 'posts (via search)')
            else:
                postResults = postResults[op['num']]
                if 'posts' in postResults:
                    postResults = postResults['posts']
                    for post in postResults.values():
                        addPost(posts, post)
                        unsavedPosts += 1
                        postCount += 1
                print(postCount, 'posts')

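        # only 200 pages are requested per search query; past that, restart from
        # page 1 with the start date advanced to the last OP's date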
        if currentPage < 200:
            currentPage += 1
        else:
            startTime = toFormattedTime(op['timestamp'])
            currentPage = 1

        if unsavedPosts > 100000:
            saveDataframe(threads, f'{THREAD_PROPS_PREFIX}{nextSaveId}', THREAD_TYPES)
            saveDataframe(posts, f'{POST_PROPS_PREFIX}{nextSaveId}', POST_TYPES)
            posts, threads = [], []
            nextSaveId += 1
            unsavedPosts = 0

    if unsavedPosts > 0:
        saveDataframe(threads, f'{THREAD_PROPS_PREFIX}{nextSaveId}', THREAD_TYPES)
        saveDataframe(posts, f'{POST_PROPS_PREFIX}{nextSaveId}', POST_TYPES)

    # fetch all new data
    # periodically save to disk

def scrapeThread(threadId):
    threads = []
    posts = []
    currentPage = 1
    startTime = toFormattedTime(0)

    while True:
        postResults = searchPosts(threadId, startTime, currentPage)
        if not postResults or 'error' in postResults:
            print(postResults)
            break

        for post in postResults["0"]["posts"]:
            addPost(posts, post)
            if int(post["op"]) == 1:
                addThread(threads, post)

        print('added', postResults['0']['posts'][0]['num'], '...', postResults['0']['posts'][-1]['num'])

        if currentPage < 200:
            currentPage += 1
        else:
            startTime = toFormattedTime(posts[-1]['timestamp'])
            currentPage = 1

    return posts, threads

def tryDownloadingImage(candidates, downloadFolder):
    for candidate in candidates:
        if not pandas.isna(candidate['imageLink']):
            link = candidate['imageLink']
            name = candidate['newImageName']
            hash = imageLimiter.saveImage(link, os.path.join(downloadFolder, name), timeout=60)
            if hash:
                return link

    for candidate in candidates:
        name = candidate['newImageName']
        prefix = name[:4]
        infix = name[4:6]
        link = f'https://desu-usergeneratedcontent.xyz/mlp/image/{prefix}/{infix}/{name}'
        hash = imageLimiter.saveImage(link, os.path.join(downloadFolder, name), timeout=60)
        if hash:
            return link

    return ""

def tryDownloadingThumbnail(candidates, downloadFolder):
    for candidate in candidates:
        if not pandas.isna(candidate['thumbLink']):
            link = candidate['thumbLink']
            name = candidate['thumbName']
            hash = imageLimiter.saveImage(link, os.path.join(downloadFolder, name), timeout=60)
            if hash:
                return link

    for candidate in candidates:
        name = candidate['thumbName']
        prefix = name[:4]
        infix = name[4:6]
        link = f'https://desu-usergeneratedcontent.xyz/mlp/thumb/{prefix}/{infix}/{name}'
        hash = imageLimiter.saveImage(link, os.path.join(downloadFolder, name), timeout=60)
        if hash:
            return link

    return ""

def scrapeImages():
    # get list of images to scrape
    print('loading posts')
    allPosts = loadData(POST_PROPS_PREFIX, columns=['imageMediaHash', 'newImageName', 'imageLink'], types=POST_TYPES)[0]
    allPosts.dropna(subset=['imageMediaHash'], inplace=True)
    allPosts.reset_index(drop=True, inplace=True)

    print('loading external images')
    externalImages = loadData('/archives/ppp-clone/image-data/image-id-exports/all-ids.parquet.', columns=['b64-md5'], types={'b64-md5': pandas.StringDtype()})[0]
    externalImages.dropna(inplace=True)
    externalImages = set(externalImages['b64-md5'])

    print('loading downloaded images')
    with open(IMAGE_HASH_PATH, 'r') as inp:
        data = csv.reader(inp)
        header = next(data)
        alreadyDownloaded = set([x[0] for x in data if len(x) == 2])

    previousImageFolders = sorted([int(os.path.basename(x)) for x in glob.glob(f'{IMAGE_FOLDER_PATH}/*')])
    if not previousImageFolders:
        currentImageFolderNum = 0
    else:
        currentImageFolderNum = previousImageFolders[-1] + 1
    currentImageFolder = f'{IMAGE_FOLDER_PATH}/{currentImageFolderNum}'
    imagesInFolder = 0

    with open(IMAGE_HASH_PATH, 'a') as outp:
        for nextImageHash, indexes in allPosts.groupby('imageMediaHash').groups.items():
            if nextImageHash in externalImages:
                continue
            if nextImageHash in alreadyDownloaded:
                continue

            print('downloading', nextImageHash, end='... ')
            candidates = allPosts.iloc[indexes].to_dict(orient='records')
            # candidates = allPosts[allPosts['imageMediaHash'] == nextImageHash].to_dict(orient='records')
            link = tryDownloadingImage(candidates, currentImageFolder)
            if link:
                print('retrieved from', link)
                imagesInFolder += 1
            else:
                print('failed')
            outp.write(f'\n{nextImageHash},{link}')

            outp.flush()

            if imagesInFolder > 2000:
                currentImageFolderNum += 1
                currentImageFolder = f'{IMAGE_FOLDER_PATH}/{currentImageFolderNum}'
                imagesInFolder = 0

def scrapeThumbnails():
    # get list of images to scrape
    print('loading posts')
    allPosts = loadData(POST_PROPS_PREFIX, columns=['imageMediaHash', 'thumbName', 'thumbLink'], types=POST_TYPES)[0]
    allPosts.dropna(subset=['imageMediaHash'], inplace=True)
    allPosts.reset_index(drop=True, inplace=True)

    print('loading downloaded images')
    with open(THUMB_HASH_PATH, 'r') as inp:
        data = csv.reader(inp)
        header = next(data)
        alreadyDownloaded = set([x[0] for x in data if len(x) == 2])

    previousImageFolders = sorted([int(os.path.basename(x)) for x in glob.glob(f'{THUMB_FOLDER_PATH}/*')])
    if not previousImageFolders:
        currentImageFolderNum = 0
    else:
        currentImageFolderNum = previousImageFolders[-1] + 1
    currentImageFolder = f'{THUMB_FOLDER_PATH}/{currentImageFolderNum}'
    imagesInFolder = 0

    with open(THUMB_HASH_PATH, 'a') as outp:
        for nextImageHash, indexes in allPosts.groupby('imageMediaHash').groups.items():
            if nextImageHash in alreadyDownloaded:
                continue

            print('downloading', nextImageHash, end='... ')
            candidates = allPosts.iloc[indexes].to_dict(orient='records')
            # candidates = allPosts[allPosts['imageMediaHash'] == nextImageHash].to_dict(orient='records')
            link = tryDownloadingThumbnail(candidates, currentImageFolder)
            if link:
                print('retrieved from', link)
                imagesInFolder += 1
            else:
                print('failed')
            outp.write(f'\n{nextImageHash},{link}')

            outp.flush()

            if imagesInFolder > 20000:
                currentImageFolderNum += 1
                currentImageFolder = f'{THUMB_FOLDER_PATH}/{currentImageFolderNum}'
                imagesInFolder = 0

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Tool to scrape desuarchive /mlp/ threads, posts, and images.'
    )
    group = parser.add_mutually_exclusive_group(required=False)
    group.add_argument('--thread', required=False, type=str)
    group.add_argument('--consolidate', required=False, action='store_true')
    group.add_argument('--sticky', required=False, action='store_true')
    group.add_argument('--start', required=False, type=str)
    group.add_argument('--posts', required=False, action='store_true')
    group.add_argument('--images', required=False, action='store_true')
    group.add_argument('--thumbnails', required=False, action='store_true')

    args = parser.parse_args()
    if args.thread:
        nextSaveId = getNextSaveId()
        print('saving to', nextSaveId)
        posts, threads = scrapeThread(args.thread)
        saveDataframe(threads, f'{THREAD_PROPS_PREFIX}{nextSaveId}', THREAD_TYPES)
        saveDataframe(posts, f'{POST_PROPS_PREFIX}{nextSaveId}', POST_TYPES)
    elif args.consolidate:
        consolidateData()
    elif args.sticky:
        raise Exception('not implemented... run scrapeThread() on all sticky threads since their posts might get missed when the thread gets old')
    elif args.start:
        scrapeNewPosts(startTime=args.start)
    elif args.posts:
        scrapeNewPosts()
    elif args.images:
        scrapeImages()
    elif args.thumbnails:
        scrapeThumbnails()
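# Typical invocations (the script name is whatever this file is saved as):
#   python scrape.py --posts             # resume scraping new posts from the last export
#   python scrape.py --start 2023-10-03  # scrape posts starting from a given date
#   python scrape.py --thread 12345678   # re-scrape a single thread via the search API
#   python scrape.py --consolidate       # merge all export shards into one parquet file
#   python scrape.py --images            # download full-size images for archived posts
#   python scrape.py --thumbnails        # download thumbnails for archived posts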
