-rw-r--r-- | TODO                                                               |  23
-rw-r--r-- | hbase/schema_design.md                                             |   8
-rw-r--r-- | notes/crawl_cdx_merge.md                                           |   2
-rw-r--r-- | pig/filter-cdx-paper-pdfs.pig                                      |   2
-rwxr-xr-x | please                                                             |  28
-rwxr-xr-x | python/deliver_dumpgrobid_to_s3.py                                 | 124
-rwxr-xr-x | python/grobid2json.py                                              |  10
-rw-r--r-- | python/tests/files/small.json                                      |   6
-rw-r--r-- | scalding/src/main/scala/sandcrawler/DumpGrobidStatusCodeJob.scala  |  34
9 files changed, 227 insertions, 10 deletions
diff --git a/TODO b/TODO
--- a/TODO
+++ b/TODO
@@ -1,4 +1,25 @@
+## Kafka Pipelines
+
+- after network split, mass restarting import/harvest stuff seemed to
+  completely reset consumergroups (!). bunch of LeaderNotFoundError
+  => change/update consumer group config
+  => ensure we are recording timestamps to allow timestamp-based resets
+- refactor python kafka clients (slack convo with kenji+dvd)
+  => try librdkafka?
+  => switch to python-kafka?
+- monitoring/alerting of consumergroup offsets
+  => start with crude python script?
+- document: need to restart all consumers after brokers restart
+- operate on batches, using threads/async, and reduce worker (process) counts
+  dramatically
+
+source of kafka-manager weirdness?
+    Dec 02 01:05:40 wbgrp-svc263.us.archive.org kafka-manager[7032]: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'user_data': java.nio.BufferUnderflowException
+    Dec 02 01:05:40 wbgrp-svc263.us.archive.org kafka-manager[7032]: [error] k.m.a.c.KafkaManagedOffsetCache - Failed to get member metadata from group summary and member summary : grobid-hbase-insert : MemberSummary(pykafka-8128e0be-4952-4e79-8644-a52987421259,pykafka,/207.241.225.228,[B@6c368f37,[B@2b007e01)
+
+## Other
+
 - paper match heuristic: include 10.1007%2F978-3-319-49304-6_18
   (URL-escaped slash)
 - catch EOFFail fetching from wayback
 - "author counts match" in scoring
@@ -8,7 +29,7 @@
   => python; talks directly to HBase
 - author counts should match (+/- one?)
 
-match strategies (hbase columns)
+match strategies (hbase columns):
 - legacy_doi
 - url_doi
 - grobid_crossref (doi)
diff --git a/hbase/schema_design.md b/hbase/schema_design.md
index 67a940f..2db8998 100644
--- a/hbase/schema_design.md
+++ b/hbase/schema_design.md
@@ -40,6 +40,14 @@ Column families:
     - `warc` (item and file name)
     - `offset`
     - `c_size` (compressed size)
+    - `meta` (json string)
+        - `size` (int)
+        - `mime` (str)
+        - `magic` (str)
+        - `magic_mime` (str)
+        - `sha1` (hex str)
+        - `md5` (hex str)
+        - `sha256` (hex str)
 - `grobid0`: processing status, version, XML and JSON fulltext, JSON metadata. timestamp. Should be compressed! `COMPRESSION => SNAPPY`
     - `status_code` (signed int; HTTP status from grobid)
     - `quality` (int or string; we define the meaning ("good"/"marginal"/"bad"))
diff --git a/notes/crawl_cdx_merge.md b/notes/crawl_cdx_merge.md
index d2cffee..d330e9b 100644
--- a/notes/crawl_cdx_merge.md
+++ b/notes/crawl_cdx_merge.md
@@ -11,7 +11,7 @@ Run script from scratch repo:
 
 Assuming we're just looking at PDFs:
 
-    zcat CRAWL-2000.cdx.gz | rg -i pdf | sort -S 4G -u | gzip > CRAWL-2000.sorted.cdx.gz
+    zcat CRAWL-2000.cdx.gz | rg -i pdf | sort -S 4G -u > CRAWL-2000.sorted.cdx
 
 ## Old Way
 
diff --git a/pig/filter-cdx-paper-pdfs.pig b/pig/filter-cdx-paper-pdfs.pig
index 7e10720..402d340 100644
--- a/pig/filter-cdx-paper-pdfs.pig
+++ b/pig/filter-cdx-paper-pdfs.pig
@@ -30,7 +30,7 @@ cdx = FILTER cdx
     OR surt matches '(?i).+\\).*/(pubs|research|publications?|articles?|proceedings?|papers?|fulltext)/.*'
 
     -- words in domains
-    OR surt matches '.*(,hal|,eprint|scielo|redalyc|revues|revistas|research|journal).*\\).*'
+    OR surt matches '.*(,hal|,eprint|,ojs|,dspace|scielo|redalyc|revues|revistas|research|journal).*\\).*'
 
     -- DOI-like pattern in URL
     OR surt matches '.*\\).*/10\\.\\d{3,5}/.*';
diff --git a/please b/please
--- a/please
+++ b/please
@@ -157,7 +157,9 @@ def run_statuscount(args):
         datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
     cmd = """hadoop jar \
         scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
-        com.twitter.scalding.Tool sandcrawler.HBaseStatusCountJob \
+        com.twitter.scalding.Tool \
+        -Dmapred.task.timeout=3600000 \
+        sandcrawler.HBaseStatusCountJob \
         --hdfs \
         --app.conf.path scalding/ia_cluster.conf \
         --hbase-table wbgrp-journal-extract-0-{env} \
@@ -249,6 +251,27 @@ def run_dumpfilemeta(args):
         env=args.env)
     subprocess.call(cmd, shell=True)
 
+def run_dumpgrobidstatuscode(args):
+    if args.rebuild:
+        rebuild_scalding()
+    print("Starting dumpgrobidstatuscode job...")
+    output = "{}/output-{}/{}-dumpgrobidstatuscode".format(
+        HDFS_DIR,
+        args.env,
+        datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+    cmd = """hadoop jar \
+        scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+        com.twitter.scalding.Tool sandcrawler.DumpGrobidStatusCodeJob \
+        --hdfs \
+        --app.conf.path scalding/ia_cluster.conf \
+        --hbase-table wbgrp-journal-extract-0-{env} \
+        --zookeeper-hosts {zookeeper_hosts} \
+        --output {output}""".format(
+        output=output,
+        zookeeper_hosts=ZOOKEEPER_HOSTS,
+        env=args.env)
+    subprocess.call(cmd, shell=True)
+
 def run_dumpgrobidmetainsertable(args):
     if args.rebuild:
         rebuild_scalding()
@@ -433,6 +456,9 @@ def main():
     sub_dumpfilemeta = subparsers.add_parser('dump-file-meta')
     sub_dumpfilemeta.set_defaults(func=run_dumpfilemeta)
 
+    sub_dumpgrobidstatuscode = subparsers.add_parser('dump-grobid-status-code')
+    sub_dumpgrobidstatuscode.set_defaults(func=run_dumpgrobidstatuscode)
+
     sub_dumpgrobidmetainsertable = subparsers.add_parser('dump-grobid-meta-insertable')
     sub_dumpgrobidmetainsertable.set_defaults(func=run_dumpgrobidmetainsertable)
 
diff --git a/python/deliver_dumpgrobid_to_s3.py b/python/deliver_dumpgrobid_to_s3.py
new file mode 100755
index 0000000..ac0949c
--- /dev/null
+++ b/python/deliver_dumpgrobid_to_s3.py
@@ -0,0 +1,124 @@
+#!/usr/bin/env python3
+"""
+Tool for bulk uploading GROBID TEI-XML output from a local filesystem dump
+(from HBase) to AWS S3.
+
+See unpaywall delivery README (in bnewbold's scratch repo) for notes on running
+this script for that specific use-case.
+
+Script takes:
+- input TSV: `sha1_hex, json (including grobid0:tei_xml)`
+  => usually from dumpgrobid, with SHA-1 key transformed to hex, and filtered
+     down (eg, by join by SHA-1) to a specific manifest
+- AWS S3 bucket and prefix
+
+AWS S3 credentials are passed via environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
+
+Output:
+- errors/stats to stderr
+- log to stdout (redirect to file), prefixed by sha1
+
+Requires:
+- raven (sentry)
+- boto3 (AWS S3 client library)
+"""
+
+import os
+import sys
+import json
+import base64
+import hashlib
+import argparse
+from collections import Counter
+
+import boto3
+import raven
+
+# Yep, a global. Gets DSN from `SENTRY_DSN` environment variable
+sentry_client = raven.Client()
+
+
+def b32_hex(s):
+    """copy/pasta from elsewhere"""
+    s = s.strip().split()[0].lower()
+    if s.startswith("sha1:"):
+        s = s[5:]
+    if len(s) != 32:
+        return s
+    return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8')
+
+
+class DeliverDumpGrobidS3():
+
+    def __init__(self, s3_bucket, **kwargs):
+        self.rstore = None
+        self.count = Counter()
+        self.s3_bucket = s3_bucket
+        self.s3_prefix = kwargs.get('s3_prefix', 'grobid/')
+        self.s3_suffix = kwargs.get('s3_suffix', '.tei.xml')
+        self.s3_storage_class = kwargs.get('s3_storage_class', 'STANDARD')
+        self.s3 = boto3.resource('s3')
+        self.bucket = self.s3.Bucket(self.s3_bucket)
+
+    def run(self, dump_file):
+        sys.stderr.write("Starting...\n")
+        for line in dump_file:
+            line = line.strip().split('\t')
+            if len(line) != 2:
+                self.count['skip-line'] += 1
+                continue
+            sha1_hex, grobid_json = line[0], line[1]
+            if len(sha1_hex) != 40:
+                sha1_hex = b32_hex(sha1_hex)
+            assert len(sha1_hex) == 40
+            grobid = json.loads(grobid_json)
+            tei_xml = grobid.get('tei_xml')
+            if not tei_xml:
+                print("{}\tskip empty".format(sha1_hex))
+                self.count['skip-empty'] += 1
+                continue
+            tei_xml = tei_xml.encode('utf-8')
+            # upload to AWS S3
+            obj = self.bucket.put_object(
+                Key="{}{}/{}{}".format(
+                    self.s3_prefix,
+                    sha1_hex[0:4],
+                    sha1_hex,
+                    self.s3_suffix),
+                StorageClass=self.s3_storage_class,
+                Body=tei_xml)
+            print("{}\tsuccess\t{}\t{}".format(sha1_hex, obj.key, len(tei_xml)))
+            self.count['success-s3'] += 1
+        sys.stderr.write("{}\n".format(self.count))
+
+@sentry_client.capture_exceptions
+def main():
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--s3-bucket',
+                        required=True,
+                        type=str,
+                        help='AWS S3 bucket to upload into')
+    parser.add_argument('--s3-prefix',
+                        type=str,
+                        default="grobid/",
+                        help='key prefix for items created in bucket')
+    parser.add_argument('--s3-suffix',
+                        type=str,
+                        default=".tei.xml",
+                        help='file suffix for created objects')
+    parser.add_argument('--s3-storage-class',
+                        type=str,
+                        default="STANDARD",
+                        help='AWS S3 storage class (redundancy) to use')
+    parser.add_argument('dump_file',
+                        help="TSV/JSON dump file",
+                        default=sys.stdin,
+                        type=argparse.FileType('r'))
+    args = parser.parse_args()
+
+    worker = DeliverDumpGrobidS3(**args.__dict__)
+    worker.run(args.dump_file)
+
+if __name__ == '__main__':             # pragma: no cover
+    main()
diff --git a/python/grobid2json.py b/python/grobid2json.py
index ca460f8..d438d48 100755
--- a/python/grobid2json.py
+++ b/python/grobid2json.py
@@ -31,9 +31,13 @@ import xml.etree.ElementTree as ET
 ns = "http://www.tei-c.org/ns/1.0"
 
 def all_authors(elem):
-    names = [' '.join([e.findtext('./{%s}forename' % ns) or '', e.findtext('./{%s}surname' % ns) or '']).strip()
-             for e in elem.findall('.//{%s}author/{%s}persName' % (ns, ns))]
-    return [dict(name=n) for n in names]
+    names = []
+    for e in elem.findall('.//{%s}author/{%s}persName' % (ns, ns)):
+        given_name = e.findtext('./{%s}forename' % ns) or None
+        surname = e.findtext('./{%s}surname' % ns) or None
+        full_name = '{} {}'.format(given_name or '', surname or '').strip()
+        names.append(dict(name=full_name, given_name=given_name, surname=surname))
+    return names
 
 
 def journal_info(elem):
diff --git a/python/tests/files/small.json b/python/tests/files/small.json
index 208fb49..49a5671 100644
--- a/python/tests/files/small.json
+++ b/python/tests/files/small.json
@@ -1,8 +1,8 @@
 {
     "title": "Dummy Example File",
     "authors": [
-        {"name": "Brewster Kahle"},
-        {"name": "J Doe"}
+        {"name": "Brewster Kahle", "given_name": "Brewster", "surname": "Kahle"},
+        {"name": "J Doe", "given_name": "J", "surname": "Doe"}
     ],
     "journal": {
         "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678",
@@ -15,7 +15,7 @@
     "date": "2000",
     "doi": null,
     "citations": [
-        { "authors": [{"name": "A Seaperson"}],
+        { "authors": [{"name": "A Seaperson", "given_name": "A", "surname": "Seaperson"}],
           "date": "2001",
           "id": "b0",
           "index": 0,
diff --git a/scalding/src/main/scala/sandcrawler/DumpGrobidStatusCodeJob.scala b/scalding/src/main/scala/sandcrawler/DumpGrobidStatusCodeJob.scala
new file mode 100644
index 0000000..42b3464
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/DumpGrobidStatusCodeJob.scala
@@ -0,0 +1,34 @@
+package sandcrawler
+
+import java.util.Properties
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+// Dumps status code for each GROBID-processed file. Good for crawl/corpus
+// analytics, if we consider GROBID status a rough "is this a paper" metric.
+class DumpGrobidStatusCodeJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+  val metaPipe : TypedPipe[(String, Long)] = HBaseBuilder.build(args("hbase-table"),
+                     args("zookeeper-hosts"),
+                     List("grobid0:status_code"),
+                     SourceMode.SCAN_ALL)
+    .read
+    .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "status_code"))
+    .filter { case (_, status_code) => status_code != null }
+    .map { case (key, status_code) =>
+      (Bytes.toString(key.copyBytes()),
+       Bytes.toLong(status_code.copyBytes()))
+    };
+
+  metaPipe.write(TypedTsv[(String,Long)](args("output")))
+
+}
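
For orientation, a usage sketch for the new delivery script follows. Only the flags, defaults, and environment variables named in `deliver_dumpgrobid_to_s3.py` above (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, SENTRY_DSN, --s3-bucket, --s3-prefix, --s3-suffix, --s3-storage-class) are taken from the diff; the bucket name, input TSV filename, and log path are hypothetical placeholders.

    # hypothetical invocation from the repo root; bucket and file names are placeholders
    export AWS_ACCESS_KEY_ID=...
    export AWS_SECRET_ACCESS_KEY=...
    export SENTRY_DSN=...    # picked up by the script's global raven.Client()

    ./python/deliver_dumpgrobid_to_s3.py \
        --s3-bucket example-bucket \
        --s3-prefix grobid/ \
        --s3-suffix .tei.xml \
        --s3-storage-class STANDARD \
        dumpgrobid-filtered.tsv \
        > deliver.log

Per the script's docstring, per-SHA1 success/skip lines go to stdout (hence the redirect to a log file) and the final Counter tally goes to stderr.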