From b362abd38ad4a6624bc056c58eb90ae235c63f00 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 23 Sep 2019 23:00:23 -0700 Subject: rename postgrest directory sql --- sql/README.md | 126 ++++++++++++++++++++++++++++ sql/backfill/backfill.md | 135 ++++++++++++++++++++++++++++++ sql/backfill/backfill_cdx.py | 131 +++++++++++++++++++++++++++++ sql/backfill/backfill_file_meta.py | 55 ++++++++++++ sql/backfill/backfill_grobid.py | 91 ++++++++++++++++++++ sql/backfill/backfill_grobid_unpaywall.py | 59 +++++++++++++ sql/backfill/filter_transform_cdx.py | 88 +++++++++++++++++++ sql/backfill/petabox_transform.py | 24 ++++++ sql/sandcrawler_schema.sql | 59 +++++++++++++ 9 files changed, 768 insertions(+) create mode 100644 sql/README.md create mode 100644 sql/backfill/backfill.md create mode 100755 sql/backfill/backfill_cdx.py create mode 100755 sql/backfill/backfill_file_meta.py create mode 100755 sql/backfill/backfill_grobid.py create mode 100755 sql/backfill/backfill_grobid_unpaywall.py create mode 100755 sql/backfill/filter_transform_cdx.py create mode 100755 sql/backfill/petabox_transform.py create mode 100644 sql/sandcrawler_schema.sql (limited to 'sql') diff --git a/sql/README.md b/sql/README.md new file mode 100644 index 0000000..b171614 --- /dev/null +++ b/sql/README.md @@ -0,0 +1,126 @@ + +TL;DR: replace hbase with postgresql tables with REST API (http://postgrest.org) + +No primary storage of anything in this table. Everything should be rapidly +re-creatable from dumps, kafka topics (compressed), CDX, petabox metadata, etc. +This is a secondary view on all of that. + +## Schema + + schema/database name is 'sandcrawler' + + cdx: include revisits or not? + id: int64, PK + sha1hex: string, not null, index + cdx_sha1hex: string + url: string, not null + datetime: ISO 8601:1988 (string?), not null + mimetype: string + warc_path: string (item and filename) + warc_offset: i64 + created: datetime, index (?) + ?crawl: string + ?domain: string + + file_meta + sha1hex, string, PK + md5hex: string + sha256hex: string + size_bytes: i64 + mime: string (verifying file status; optional for now?) + + fatcat_file + sha1hex: string, PK + file_ident: string, index? + release_ident: ? + + petabox + id: int64, PK + sha1hex: string, notnull, index + item: string, notnull + path: string, notnull (TODO: URL encoded? separate sub-archive path?) + + grobid + sha1hex: string, PK + updated: datetime + grobid_version (string) + status_code: i32 + status: string (JSONB?), only if status != 200 + metadata: JSONB, title, first author, year (not for now?) + glutton_fatcat_release: string, index + + shadow + sha1hex: string, PK + shadow_corpus: string, PK + shadow_id: string + doi: string + pmid: string + isbn13: string + +Alternatively, to be more like existing system could have "one big table" or +multiple tables all with same key (sha1b32) and UNIQ. As is, every sha1 pk +column is 40 bytes of both index and data, or 8+ GByte (combined) for each +table with 100 million rows. using raw bytes could help, but makes all +code/queries much trickier. + +Should we have "created" or "updated" timestamps on all these columns to enable +kafka tailing? + +TODO: +- how to indicate CDX sha1 vs. true sha1 mis-match? pretty rare. recrawl and delete row from `gwb_cdx`? +- only most recent GROBID? or keep multiple versions? here and minio + +## Existing Stuff Sizes (estimates) + + 78.5G /user/bnewbold/journal_crawl_cdx + 19.7G /user/bnewbold/sandcrawler/output-prod/2018-12-14-1737.00-dumpfilemeta + 2.7G file_hashes.tsv + 228.5G /user/bnewbold/sandcrawler/output-prod/2018-09-23-0405.30-dumpgrobidmetainsertable + +## Use Cases + +Core goal here is to mostly kill hbase/hadoop. What jobs are actually used there? + +- backfill: load in-scope (fulltext) crawl results from CDX + => bulk (many line) inserts +- rowcount: "how many unique PDFs crawled?" + => trivial SQL query +- status code count: "how much GROBID progress?" + => trivial SQL query +- dumpungrobided: "what files still need to be processed" + => SQL join with a "first" on CDX side +- dumpgrobidxml: "merge CDX/file info with extracted XML, for those that were successful" + => SQL dump or rowscan, then minio fetches + +This table is generally "single file raw fulltext metadata". + +"Enrichment" jobs: + +- GROBID +- glutton (if not GROBID) +- extra file metadata +- match newly enriched files to fatcat + +What else? + +- track additional raw file metadata +- dump all basic GROBID metadata (title, authors, year) to attempt merge/match + +Questions we might want to answer + +- total size of PDF corpus (terabytes) +- unqiue files hit per domain + +## Prototype Plan + +- backfill all CDX crawl files (TSV transform?) +- load full GROBID XML (both into minio and into SQL) +- load full fatcat file dump (TSV transform) +- load dumpfilemeta + +## Example Useful Lookups + + + http get :3030/cdx?url=eq.https://coleccionables.mercadolibre.com.ar/arduino-pdf_Installments_NoInterest_BestSellers_YES + http get :3030/file_meta?sha1hex=eq.120582c855a7cc3c70a8527c560d7f27e6027278 + diff --git a/sql/backfill/backfill.md b/sql/backfill/backfill.md new file mode 100644 index 0000000..f1a5f86 --- /dev/null +++ b/sql/backfill/backfill.md @@ -0,0 +1,135 @@ + +SQL Backfill Notes +----------------------- + +GROBID is going to be somewhat complex. + +TODO: +x CDX backfill script (CDX to postgresql direct, bulk inserts, python) +x `file_meta` bulk insert (TSV to postgresql direct, bulk upserts, python) +x GROBID insert (python, dump TSV to minio then postgresql) + +## `cdx` + + #cat example.cdx | rg ' 200 ' | cut -d' ' -f2,3,4,6,9,10,11 + #cat example.cdx | rg ' 200 ' | awk '{print $6 "\t" $3 "\t" $2 "\t" $4 "\t\t" $6 "\t" $11 "\t" $9 "\t" $10}' | b32_hex.py | awk '{print $2 "\t" $3 "\t" $4 "\t" $1 "\t" $6 "\t" $7 "\t" $8}' > cdx.example.tsv + cat example.cdx | ./filter_transform_cdx.py > cdx.example.tsv + + COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.example.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); + +Big HDFS import: + + # but actually didn't import those; don't want to need to re-import + hdfs dfs -get journal_crawl_cdx/* + + cat citeseerx_crawl_2017.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.citeseerx_crawl_2017.tsv + cat gwb-pdf-20171227034923-surt-filter/* | rg ' 200 ' | ./filter_transform_cdx.py > gwb-pdf-20171227034923-surt-filter.tsv + cat UNPAYWALL-PDF-CRAWL-2018-07.filtered.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.UNPAYWALL-PDF-CRAWL-2018-07.filtered.tsv + cat MSAG-PDF-CRAWL-2017.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.MSAG-PDF-CRAWL-2017.tsv + + cat CORE-UPSTREAM-CRAWL-2018-11.sorted.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.CORE-UPSTREAM-CRAWL-2018-11.sorted.tsv + cat DIRECT-OA-CRAWL-2019.pdfs.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.DIRECT-OA-CRAWL-2019.pdfs.tsv + cat DOI-LANDING-CRAWL-2018-06.200_pdf.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.DOI-LANDING-CRAWL-2018-06.200_pdf.tsv + cat OA-JOURNAL-TESTCRAWL-TWO-2018.pdf.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.OA-JOURNAL-TESTCRAWL-TWO-2018.pdf.tsv + cat SEMSCHOLAR-PDF-CRAWL-2017.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.SEMSCHOLAR-PDF-CRAWL-2017.tsv + cat TARGETED-PDF-CRAWL-2017.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.TARGETED-PDF-CRAWL-2017.tsv + cat UNPAYWALL-PDF-CRAWL-2019-04.pdfs_sorted.cdx | rg ' 200 ' | ./filter_transform_cdx.py > cdx.UNPAYWALL-PDF-CRAWL-2019-04.pdfs_sorted.tsv + +TODO: nasty escaping? + +In psql: + + COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.UNPAYWALL-PDF-CRAWL-2018-07.filtered.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); + # ERROR + COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.MSAG-PDF-CRAWL-2017.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); + # ERROR + COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.citeseerx_crawl_2017.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); + # COPY 1653840 + COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.CORE-UPSTREAM-CRAWL-2018-11.sorted.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); + # COPY 2827563 + COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.DIRECT-OA-CRAWL-2019.pdfs.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); + # COPY 10651736 + COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.DOI-LANDING-CRAWL-2018-06.200_pdf.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); + # COPY 768565 + COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.OA-JOURNAL-TESTCRAWL-TWO-2018.pdf.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); + # COPY 5310017 + COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.SEMSCHOLAR-PDF-CRAWL-2017.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); + # COPY 2219839 + COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.TARGETED-PDF-CRAWL-2017.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); + # ERROR + COPY cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) FROM '/sandcrawler-db/backfill/cdx/cdx.UNPAYWALL-PDF-CRAWL-2019-04.pdfs_sorted.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); + # ERROR + +NOTE: these largely didn't work; will need to write a batch importer. + +Batch import process: + + cat UNPAYWALL-PDF-CRAWL-2018-07.filtered.cdx MSAG-PDF-CRAWL-2017.cdx TARGETED-PDF-CRAWL-2017.cdx UNPAYWALL-PDF-CRAWL-2019-04.pdfs_sorted.cdx | ./backfill_cdx.py + # Done: Counter({'raw_lines': 123254127, 'total': 51365599, 'batches': 51365}) + +## `fatcat_file` + + zcat file_export.2019-07-07.json.gz | pv -l | jq -r 'select(.sha1 != null) | [.sha1, .ident, .release_ids[0]] | @tsv' | sort -S 8G | uniq -w 40 > /sandcrawler-db/backfill/fatcat_file.2019-07-07.tsv + +In psql: + + COPY fatcat_file FROM '/sandcrawler-db/backfill/fatcat_file.2019-07-07.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); + # => COPY 24727350 + +## `file_meta` + + zcat /fast/download/file_export.2019-07-07.json.gz | pv -l | jq -r 'select(.md5 != null) | [.sha1, .sha256, .md5, .size, .mimetype] | @tsv' | sort -S 8G | uniq -w 40 > /sandcrawler-db/backfill/file_meta.2019-07-07.tsv + +In psql: + + COPY file_meta FROM '/sandcrawler-db/backfill/file_meta.2019-07-07.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); + # -> COPY 5860092 + +## `petabox` + + zcat /fast/download/file_export.2019-07-07.json.gz | rg '//archive.org/' | pigz > /fast/download/file_export.2019-07-07.petabox.json.gz + zcat /fast/download/file_export.2019-07-07.petabox.json.gz | ./petabox_transform.py | sort -u -S 8G | awk '{print $3 "\t" $1 "\t" $2}' | uniq -s40 | awk '{print $2 "\t" $3 "\t" $1}' > petabox.fatcat_2019-07-07.tsv + +In psql: + + COPY petabox FROM '/sandcrawler-db/backfill/petabox.fatcat_2019-07-07.tsv' WITH (FORMAT TEXT, DELIMITER E'\t', NULL ''); + # -> COPY 2887834 + +## `grobid` + +Quick test: + + zcat /bigger/unpaywall-transfer/2019-07-17-1741.30-dumpgrobidxml/part-00000.gz | cut -f2 | head | ./backfill_grobid.py + +Run big batch: + + ls /bigger/unpaywall-transfer/2019-07-17-1741.30-dumpgrobidxml/part*gz | parallel --progress -j8 'zcat {} | cut -f2 | ./backfill_grobid.py' + # [...] + # Done: Counter({'minio-success': 161605, 'total': 161605, 'raw_lines': 161605, 'batches': 161}) + # [...] + +Was running slow with lots of iowait and 99% jdb2. This seems to be disk I/O. Going to try: + + sudo mount /dev/sdc1 /sandcrawler-minio/ -o data=writeback,noatime,nobarrier + + # -j8: 20+ M/s write, little jdb2 + # -j16: 30+ M/s write, little jdb2 + # -j12: 30+ M/s write, going with this + +For general use should go back to: + + sudo mount /dev/sdc1 /sandcrawler-minio/ -o data=noatime + + # -j4: Still pretty slow, only ~3-5 M/s disk write. jbd2 consistently at 99%, 360 K/s write + +## rough table sizes + + table_name | table_size | indexes_size | total_size + --------------------------------------------------------------+------------+--------------+------------ + "public"."cdx" | 11 GB | 8940 MB | 20 GB + "public"."shadow" | 8303 MB | 7205 MB | 15 GB + "public"."fatcat_file" | 5206 MB | 2094 MB | 7300 MB + "public"."file_meta" | 814 MB | 382 MB | 1196 MB + "public"."petabox" | 403 MB | 594 MB | 997 MB + [...] + diff --git a/sql/backfill/backfill_cdx.py b/sql/backfill/backfill_cdx.py new file mode 100755 index 0000000..1c452ca --- /dev/null +++ b/sql/backfill/backfill_cdx.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python3 +""" +This is a "one-time" tranform helper script for CDX backfill into sandcrawler +postgresql. + +Most of this file was copied from '../python/common.py'. +""" + +import json, os, sys, collections +import base64 +import psycopg2 +import psycopg2.extras + +NORMAL_MIME = ( + 'application/pdf', + 'application/postscript', + 'text/html', + 'text/xml', +) + +def normalize_mime(raw): + raw = raw.lower() + for norm in NORMAL_MIME: + if raw.startswith(norm): + return norm + + # Special cases + if raw.startswith('application/xml'): + return 'text/xml' + if raw.startswith('application/x-pdf'): + return 'application/pdf' + return None + + +def test_normalize_mime(): + assert normalize_mime("asdf") is None + assert normalize_mime("application/pdf") == "application/pdf" + assert normalize_mime("application/pdf+journal") == "application/pdf" + assert normalize_mime("Application/PDF") == "application/pdf" + assert normalize_mime("application/p") is None + assert normalize_mime("application/xml+stuff") == "text/xml" + assert normalize_mime("application/x-pdf") == "application/pdf" + assert normalize_mime("application/x-html") is None + +def b32_hex(s): + s = s.strip().split()[0].lower() + if s.startswith("sha1:"): + s = s[5:] + if len(s) != 32: + return s + return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8') + + +def parse_cdx_line(raw_cdx): + + cdx = raw_cdx.split() + if len(cdx) < 11: + return None + + surt = cdx[0] + dt = cdx[1] + url = cdx[2] + mime = normalize_mime(cdx[3]) + http_status = cdx[4] + key = cdx[5] + c_size = cdx[8] + offset = cdx[9] + warc = cdx[10] + + if not (key.isalnum() and c_size.isdigit() and offset.isdigit() + and http_status == "200" and len(key) == 32 and dt.isdigit() + and mime != None): + return None + + if '-' in (surt, dt, url, mime, http_status, key, c_size, offset, warc): + return None + + # these are the new/specific bits + sha1 = b32_hex(key) + return dict(url=url, datetime=dt, sha1hex=sha1, cdx_sha1hex=None, mimetype=mime, warc_path=warc, warc_csize=int(c_size), warc_offset=int(offset)) + +def insert(cur, batch): + sql = """ + INSERT INTO + cdx (url, datetime, sha1hex, mimetype, warc_path, warc_csize, warc_offset) + VALUES %s + ON CONFLICT ON CONSTRAINT cdx_pkey DO NOTHING + RETURNING 1; + """ + batch = [(d['url'], d['datetime'], d['sha1hex'], d['mimetype'], + d['warc_path'], d['warc_csize'], d['warc_offset']) + for d in batch] + res = psycopg2.extras.execute_values(cur, sql, batch) # fetch=True + #return len(res) + +def stdin_to_pg(): + # no host means it will use local domain socket by default + conn = psycopg2.connect(database="sandcrawler", user="postgres") + cur = conn.cursor() + counts = collections.Counter({'total': 0}) + batch = [] + for l in sys.stdin: + l = l.strip() + if counts['raw_lines'] > 0 and counts['raw_lines'] % 10000 == 0: + print("Progress: {}...".format(counts)) + counts['raw_lines'] += 1 + if not l: + continue + info = parse_cdx_line(l) + if not info: + continue + batch.append(info) + counts['total'] += 1 + if len(batch) >= 1000: + insert(cur, batch) + conn.commit() + #counts['inserted'] += i + #counts['existing'] += len(batch) - i + batch = [] + counts['batches'] += 1 + if batch: + insert(cur, batch) + #counts['inserted'] += i + #counts['existing'] += len(batch) - i + batch = [] + conn.commit() + cur.close() + print("Done: {}".format(counts)) + +if __name__=='__main__': + stdin_to_pg() diff --git a/sql/backfill/backfill_file_meta.py b/sql/backfill/backfill_file_meta.py new file mode 100755 index 0000000..e3b40a0 --- /dev/null +++ b/sql/backfill/backfill_file_meta.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python3 +""" +This is a "one-time" tranform helper script for file_meta backfill into +sandcrawler postgresql. + +Most of this file was copied from '../python/common.py'. +""" + +import json, os, sys, collections +import psycopg2 +import psycopg2.extras + + +def insert(cur, batch): + sql = """ + INSERT INTO + file_meta + VALUES %s + ON CONFLICT DO NOTHING; + """ + res = psycopg2.extras.execute_values(cur, sql, batch) + +def stdin_to_pg(): + # no host means it will use local domain socket by default + conn = psycopg2.connect(database="sandcrawler", user="postgres") + cur = conn.cursor() + counts = collections.Counter({'total': 0}) + batch = [] + for l in sys.stdin: + if counts['raw_lines'] > 0 and counts['raw_lines'] % 10000 == 0: + print("Progress: {}...".format(counts)) + counts['raw_lines'] += 1 + if not l.strip(): + continue + info = l.split("\t") + if not info: + continue + assert len(info) == 5 + info[-1] = info[-1].strip() or None + batch.append(info) + counts['total'] += 1 + if len(batch) >= 1000: + insert(cur, batch) + conn.commit() + batch = [] + counts['batches'] += 1 + if batch: + insert(cur, batch) + batch = [] + conn.commit() + cur.close() + print("Done: {}".format(counts)) + +if __name__=='__main__': + stdin_to_pg() diff --git a/sql/backfill/backfill_grobid.py b/sql/backfill/backfill_grobid.py new file mode 100755 index 0000000..08fad7f --- /dev/null +++ b/sql/backfill/backfill_grobid.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 +""" +This is a "one-time" tranform helper script for GROBID backfill into +sandcrawler minio and postgresql. +""" + +import json, os, sys, collections, io +import base64 +import requests +from minio import Minio +import psycopg2 +import psycopg2.extras + + +def b32_hex(s): + s = s.strip().split()[0].lower() + if s.startswith("sha1:"): + s = s[5:] + if len(s) != 32: + return s + return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8') + +def insert(cur, batch): + sql = """ + INSERT INTO + grobid (sha1hex, grobid_version, status_code, status, fatcat_release, metadata) + VALUES %s + ON CONFLICT DO NOTHING; + """ + batch = [(d['sha1hex'], d['grobid_version'], d['status_code'], d['status'], d['fatcat_release'], d['metadata']) + for d in batch] + res = psycopg2.extras.execute_values(cur, sql, batch) + +def stdin_to_pg(): + mc = Minio('localhost:9000', + access_key=os.environ['MINIO_ACCESS_KEY'], + secret_key=os.environ['MINIO_SECRET_KEY'], + secure=False) + # no host means it will use local domain socket by default + conn = psycopg2.connect(database="sandcrawler", user="postgres") + cur = conn.cursor() + counts = collections.Counter({'total': 0}) + batch = [] + for l in sys.stdin: + if counts['raw_lines'] > 0 and counts['raw_lines'] % 10000 == 0: + print("Progress: {}...".format(counts)) + counts['raw_lines'] += 1 + l = l.strip() + if not l: + continue + row = json.loads(l) + if not row: + continue + sha1hex = b32_hex(row['pdf_hash']) + grobid_xml = row['tei_xml'].encode('utf-8') + grobid_xml_len = len(grobid_xml) + grobid_xml = io.BytesIO(grobid_xml) + + key = "{}/{}/{}.tei.xml".format( + sha1hex[0:2], + sha1hex[2:4], + sha1hex) + mc.put_object("grobid", key, grobid_xml, grobid_xml_len, + content_type="application/tei+xml", + metadata=None) + counts['minio-success'] += 1 + + info = dict( + sha1hex=sha1hex, + grobid_version=None, # TODO + status_code=200, + status=None, + fatcat_release=None, + metadata=None, + ) + batch.append(info) + counts['total'] += 1 + if len(batch) >= 1000: + insert(cur, batch) + conn.commit() + batch = [] + counts['batches'] += 1 + if batch: + insert(cur, batch) + batch = [] + conn.commit() + cur.close() + print("Done: {}".format(counts)) + +if __name__=='__main__': + stdin_to_pg() diff --git a/sql/backfill/backfill_grobid_unpaywall.py b/sql/backfill/backfill_grobid_unpaywall.py new file mode 100755 index 0000000..58e9e3c --- /dev/null +++ b/sql/backfill/backfill_grobid_unpaywall.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python3 +""" +This is a "one-time" tranform helper script for GROBID backfill into +sandcrawler minio and postgresql. + +This variant of backfill_grobid.py pushes into the unpaywall bucket of +sandcrawler-minio and doesn't push anything to sandcrawler table in general. +""" + +import json, os, sys, collections, io +import base64 +import requests +from minio import Minio +import psycopg2 +import psycopg2.extras + + +def b32_hex(s): + s = s.strip().split()[0].lower() + if s.startswith("sha1:"): + s = s[5:] + if len(s) != 32: + return s + return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8') + +def stdin_to_minio(): + mc = Minio('localhost:9000', + access_key=os.environ['MINIO_ACCESS_KEY'], + secret_key=os.environ['MINIO_SECRET_KEY'], + secure=False) + counts = collections.Counter({'total': 0}) + for l in sys.stdin: + if counts['raw_lines'] > 0 and counts['raw_lines'] % 10000 == 0: + print("Progress: {}...".format(counts)) + counts['raw_lines'] += 1 + l = l.strip() + if not l: + continue + row = json.loads(l) + if not row: + continue + sha1hex = b32_hex(row['pdf_hash']) + grobid_xml = row['tei_xml'].encode('utf-8') + grobid_xml_len = len(grobid_xml) + grobid_xml = io.BytesIO(grobid_xml) + + key = "grobid/{}/{}/{}.tei.xml".format( + sha1hex[0:2], + sha1hex[2:4], + sha1hex) + mc.put_object("unpaywall", key, grobid_xml, grobid_xml_len, + content_type="application/tei+xml", + metadata=None) + counts['minio-success'] += 1 + + print("Done: {}".format(counts)) + +if __name__=='__main__': + stdin_to_minio() diff --git a/sql/backfill/filter_transform_cdx.py b/sql/backfill/filter_transform_cdx.py new file mode 100755 index 0000000..3507dfc --- /dev/null +++ b/sql/backfill/filter_transform_cdx.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 +""" +This is a "one-time" tranform helper script for CDX backfill into sandcrawler +postgresql. + +Most of this file was copied from '../python/common.py'. +""" + +import json, os, sys +import base64 + +NORMAL_MIME = ( + 'application/pdf', + 'application/postscript', + 'text/html', + 'text/xml', +) + +def normalize_mime(raw): + raw = raw.lower() + for norm in NORMAL_MIME: + if raw.startswith(norm): + return norm + + # Special cases + if raw.startswith('application/xml'): + return 'text/xml' + if raw.startswith('application/x-pdf'): + return 'application/pdf' + return None + + +def test_normalize_mime(): + assert normalize_mime("asdf") is None + assert normalize_mime("application/pdf") == "application/pdf" + assert normalize_mime("application/pdf+journal") == "application/pdf" + assert normalize_mime("Application/PDF") == "application/pdf" + assert normalize_mime("application/p") is None + assert normalize_mime("application/xml+stuff") == "text/xml" + assert normalize_mime("application/x-pdf") == "application/pdf" + assert normalize_mime("application/x-html") is None + +def b32_hex(s): + s = s.strip().split()[0].lower() + if s.startswith("sha1:"): + s = s[5:] + if len(s) != 32: + return s + return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8') + + +def parse_cdx_line(raw_cdx): + + cdx = raw_cdx.split() + if len(cdx) < 11: + return None + + surt = cdx[0] + dt = cdx[1] + url = cdx[2] + mime = normalize_mime(cdx[3]) + http_status = cdx[4] + key = cdx[5] + c_size = cdx[8] + offset = cdx[9] + warc = cdx[10] + + if not (key.isalnum() and c_size.isdigit() and offset.isdigit() + and http_status == "200" and len(key) == 32 and dt.isdigit() + and mime != None): + return None + + if '-' in (surt, dt, url, mime, http_status, key, c_size, offset, warc): + return None + + # these are the new/specific bits + sha1 = b32_hex(key) + return dict(url=url, datetime=dt, sha1hex=sha1, cdx_sha1hex=None, mimetype=mime, warc_path=warc, warc_csize=int(c_size), warc_offset=int(offset)) + +for l in sys.stdin: + l = l.strip() + if not l: + continue + info = parse_cdx_line(l) + if not info: + continue + print("\t".join([info['url'], info['datetime'], info['sha1hex'], info['mimetype'], info['warc_path'], str(info['warc_csize']), str(info['warc_offset'])])) + diff --git a/sql/backfill/petabox_transform.py b/sql/backfill/petabox_transform.py new file mode 100755 index 0000000..b638911 --- /dev/null +++ b/sql/backfill/petabox_transform.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python3 + +import json, sys, os + +for l in sys.stdin.readlines(): + l = l.strip() + if not l: + continue + r = json.loads(l) + if not r['sha1']: + continue + sha1hex = r['sha1'] + for url in r['urls']: + u = url['url'] + if not '//archive.org/' in u: + continue + u = u.split('/') + if u[2] == 'web.archive.org': + continue + #print(u) + assert u[2] == 'archive.org' and u[3] in ('download', 'serve') + item = u[4] + path = '/'.join(u[5:]) + print("\t".join([item, path, sha1hex])) diff --git a/sql/sandcrawler_schema.sql b/sql/sandcrawler_schema.sql new file mode 100644 index 0000000..fd921ed --- /dev/null +++ b/sql/sandcrawler_schema.sql @@ -0,0 +1,59 @@ + +CREATE TABLE IF NOT EXISTS cdx ( + url TEXT NOT NULL CHECK (octet_length(url) >= 1), + datetime TEXT NOT NULL CHECK (octet_length(datetime) = 14), + sha1hex TEXT NOT NULL CHECK (octet_length(sha1hex) = 40), + cdx_sha1hex TEXT CHECK (octet_length(cdx_sha1hex) = 40), + mimetype TEXT CHECK (octet_length(mimetype) >= 1), + warc_path TEXT CHECK (octet_length(warc_path) >= 1), + warc_csize BIGINT, + warc_offset BIGINT, + row_created TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, + PRIMARY KEY(url, datetime) +); +CREATE INDEX IF NOT EXISTS cdx_sha1hex_idx ON cdx(sha1hex); +CREATE INDEX IF NOT EXISTS cdx_row_created_idx ON cdx(row_created); + +CREATE TABLE IF NOT EXISTS file_meta ( + sha1hex TEXT PRIMARY KEY CHECK (octet_length(sha1hex) = 40), + sha256hex TEXT CHECK (octet_length(sha256hex) = 64), + md5hex TEXT CHECK (octet_length(md5hex) = 32), + size_bytes BIGINT, + mimetype TEXT CHECK (octet_length(mimetype) >= 1) +); + +CREATE TABLE IF NOT EXISTS fatcat_file ( + sha1hex TEXT PRIMARY KEY CHECK (octet_length(sha1hex) = 40), + file_ident TEXT CHECK (octet_length(file_ident) = 26), + first_release_ident TEXT CHECK (octet_length(first_release_ident) = 26) +); + +CREATE TABLE IF NOT EXISTS petabox ( + item TEXT NOT NULL CHECK (octet_length(item) >= 1), + path TEXT NOT NULL CHECK (octet_length(path) >= 1), + sha1hex TEXT NOT NULL CHECK (octet_length(sha1hex) = 40), + PRIMARY KEY(item, path) +); +CREATE INDEX petabox_sha1hex_idx ON petabox(sha1hex); + +CREATE TABLE IF NOT EXISTS grobid ( + sha1hex TEXT PRIMARY KEY CHECK (octet_length(sha1hex) = 40), + updated TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, + grobid_version TEXT CHECK (octet_length(grobid_version) >= 1), + status_code INT NOT NULL, + status TEXT CHECK (octet_length(status) >= 1), + fatcat_release TEXT CHECK (octet_length(fatcat_release) = 26), + metadata JSONB +); +-- CREATE INDEX grobid_fatcat_release_idx ON grobid(fatcat_release); + +CREATE TABLE IF NOT EXISTS shadow ( + shadow_corpus TEXT NOT NULL CHECK (octet_length(shadow_corpus) >= 1), + shadow_id TEXT NOT NULL CHECK (octet_length(shadow_id) >= 1), + sha1hex TEXT NOT NULL CHECK (octet_length(sha1hex) = 40), + doi TEXT CHECK (octet_length(doi) >= 1), + pmid TEXT CHECK (octet_length(pmid) >= 1), + isbn13 TEXT CHECK (octet_length(isbn13) >= 1), + PRIMARY KEY(shadow_corpus, shadow_id) +); +CREATE INDEX shadow_sha1hex_idx ON shadow(sha1hex); -- cgit v1.2.3