-rw-r--r--  TODO                                                               |  23
-rw-r--r--  hbase/schema_design.md                                             |   8
-rw-r--r--  notes/crawl_cdx_merge.md                                           |   2
-rw-r--r--  pig/filter-cdx-paper-pdfs.pig                                      |   2
-rwxr-xr-x  please                                                             |  28
-rwxr-xr-x  python/deliver_dumpgrobid_to_s3.py                                 | 124
-rwxr-xr-x  python/grobid2json.py                                              |  10
-rw-r--r--  python/tests/files/small.json                                      |   6
-rw-r--r--  scalding/src/main/scala/sandcrawler/DumpGrobidStatusCodeJob.scala  |  34
9 files changed, 227 insertions, 10 deletions
diff --git a/TODO b/TODO
index 1f1c2b9..77b48c9 100644
--- a/TODO
+++ b/TODO
@@ -1,4 +1,25 @@
+## Kafka Pipelines
+
+- after a network split, mass-restarting import/harvest stuff seemed to
+  completely reset consumergroups (!); saw a bunch of LeaderNotFoundError
+ => change/update consumer group config
+ => ensure we are recording timestamps to allow timestamp-based resets
+- refactor python kafka clients (slack convo with kenji+dvd)
+ => try librdkafka?
+ => switch to python-kafka?
+- monitoring/alerting of consumergroup offsets
+  => start with crude python script? (see sketch after this diff)
+- document: need to restart all consumers after brokers restart
+- operate on batches, using threads/async, and reduce worker (process) counts
+ dramatically
+
+source of kafka-manager weirdness?
+ Dec 02 01:05:40 wbgrp-svc263.us.archive.org kafka-manager[7032]: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'user_data': java.nio.BufferUnderflowException
+ Dec 02 01:05:40 wbgrp-svc263.us.archive.org kafka-manager[7032]: [error] k.m.a.c.KafkaManagedOffsetCache - Failed to get member metadata from group summary and member summary : grobid-hbase-insert : MemberSummary(pykafka-8128e0be-4952-4e79-8644-a52987421259,pykafka,/207.241.225.228,[B@6c368f37,[B@2b007e01)
+
+## Other
+
- paper match heuristic: include 10.1007%2F978-3-319-49304-6_18 (URL-escaped slash)
- catch EOFFail fetching from wayback
- "author counts match" in scoring
@@ -8,7 +29,7 @@
=> python; talks directly to HBase
- author counts should match (+/- one?)
-match strategies (hbase columns)
+match strategies (hbase columns):
- legacy_doi
- url_doi
- grobid_crossref (doi)
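
One aside on the "crude python script" item in the Kafka section above: a minimal consumergroup-lag report could look something like the sketch below. This assumes the kafka-python client (not what's in use today; pykafka is); the broker address is a placeholder and the group name is the one from the kafka-manager log above.

    # Sketch only: report per-partition consumer lag for one group.
    # Assumes kafka-python; broker address is a placeholder.
    from kafka import KafkaAdminClient, KafkaConsumer

    BROKERS = "localhost:9092"       # placeholder
    GROUP = "grobid-hbase-insert"    # group seen in the kafka-manager log above

    admin = KafkaAdminClient(bootstrap_servers=BROKERS)
    committed = admin.list_consumer_group_offsets(GROUP)

    consumer = KafkaConsumer(bootstrap_servers=BROKERS)
    end_offsets = consumer.end_offsets(list(committed.keys()))

    for tp, meta in sorted(committed.items()):
        lag = end_offsets[tp] - meta.offset
        print("{}\t{}\t{}".format(tp.topic, tp.partition, lag))

Running that from cron with a threshold check on the printed lag would cover the alerting half of the TODO item.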
diff --git a/hbase/schema_design.md b/hbase/schema_design.md
index 67a940f..2db8998 100644
--- a/hbase/schema_design.md
+++ b/hbase/schema_design.md
@@ -40,6 +40,14 @@ Column families:
- `warc` (item and file name)
- `offset`
- `c_size` (compressed size)
+ - `meta` (json string)
+ - `size` (int)
+ - `mime` (str)
+ - `magic` (str)
+ - `magic_mime` (str)
+ - `sha1` (hex str)
+ - `md5` (hex str)
+ - `sha256` (hex str)
- `grobid0`: processing status, version, XML and JSON fulltext, JSON metadata, timestamp. Should be compressed! `COMPRESSION => SNAPPY`
- `status_code` (signed int; HTTP status from grobid)
- `quality` (int or string; we define the meaning ("good"/"marginal"/"bad"))
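
Aside: the new `file:meta` fields line up with what can be derived from the raw file bytes. A rough sketch follows (the helper name is made up, python-magic is assumed, and in practice `mime` would more likely come from the CDX/WARC record than from libmagic):

    import hashlib
    import json

    import magic  # python-magic assumed

    def file_meta_json(blob):
        # Sketch only: build the file:meta JSON string from raw bytes
        return json.dumps({
            "size": len(blob),
            "mime": magic.from_buffer(blob, mime=True),  # likely the CDX/WARC mimetype in practice
            "magic": magic.from_buffer(blob),
            "magic_mime": magic.from_buffer(blob, mime=True),
            "sha1": hashlib.sha1(blob).hexdigest(),
            "md5": hashlib.md5(blob).hexdigest(),
            "sha256": hashlib.sha256(blob).hexdigest(),
        }, sort_keys=True)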
diff --git a/notes/crawl_cdx_merge.md b/notes/crawl_cdx_merge.md
index d2cffee..d330e9b 100644
--- a/notes/crawl_cdx_merge.md
+++ b/notes/crawl_cdx_merge.md
@@ -11,7 +11,7 @@ Run script from scratch repo:
Assuming we're just looking at PDFs:
- zcat CRAWL-2000.cdx.gz | rg -i pdf | sort -S 4G -u | gzip > CRAWL-2000.sorted.cdx.gz
+ zcat CRAWL-2000.cdx.gz | rg -i pdf | sort -S 4G -u > CRAWL-2000.sorted.cdx
## Old Way
diff --git a/pig/filter-cdx-paper-pdfs.pig b/pig/filter-cdx-paper-pdfs.pig
index 7e10720..402d340 100644
--- a/pig/filter-cdx-paper-pdfs.pig
+++ b/pig/filter-cdx-paper-pdfs.pig
@@ -30,7 +30,7 @@ cdx = FILTER cdx
OR surt matches '(?i).+\\).*/(pubs|research|publications?|articles?|proceedings?|papers?|fulltext)/.*'
-- words in domains
- OR surt matches '.*(,hal|,eprint|scielo|redalyc|revues|revistas|research|journal).*\\).*'
+ OR surt matches '.*(,hal|,eprint|,ojs|,dspace|scielo|redalyc|revues|revistas|research|journal).*\\).*'
-- DOI-like pattern in URL
OR surt matches '.*\\).*/10\\.\\d{3,5}/.*';
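
Note on the new `,ojs` and `,dspace` tokens: the leading commas are deliberate, because these patterns match against SURT-form keys, where the hostname is reversed and comma-separated ahead of the `)`. A quick illustration with the same pattern re-escaped for Python (the hostname is made up):

    import re

    # SURT form of e.g. http://ojs.example.edu/index.php/journal/article/view/123
    surt = "edu,example,ojs)/index.php/journal/article/view/123"
    pattern = r'.*(,hal|,eprint|,ojs|,dspace|scielo|redalyc|revues|revistas|research|journal).*\).*'
    print(bool(re.match(pattern, surt)))  # True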
diff --git a/please b/please
index 10c591d..8fb7f19 100755
--- a/please
+++ b/please
@@ -157,7 +157,9 @@ def run_statuscount(args):
datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
cmd = """hadoop jar \
scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
- com.twitter.scalding.Tool sandcrawler.HBaseStatusCountJob \
+ com.twitter.scalding.Tool \
+ -Dmapred.task.timeout=3600000 \
+ sandcrawler.HBaseStatusCountJob \
--hdfs \
--app.conf.path scalding/ia_cluster.conf \
--hbase-table wbgrp-journal-extract-0-{env} \
@@ -249,6 +251,27 @@ def run_dumpfilemeta(args):
env=args.env)
subprocess.call(cmd, shell=True)
+def run_dumpgrobidstatuscode(args):
+ if args.rebuild:
+ rebuild_scalding()
+ print("Starting dumpgrobidstatuscode job...")
+ output = "{}/output-{}/{}-dumpgrobidstatuscode".format(
+ HDFS_DIR,
+ args.env,
+ datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+ cmd = """hadoop jar \
+ scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+ com.twitter.scalding.Tool sandcrawler.DumpGrobidStatusCodeJob \
+ --hdfs \
+ --app.conf.path scalding/ia_cluster.conf \
+ --hbase-table wbgrp-journal-extract-0-{env} \
+ --zookeeper-hosts {zookeeper_hosts} \
+ --output {output}""".format(
+ output=output,
+ zookeeper_hosts=ZOOKEEPER_HOSTS,
+ env=args.env)
+ subprocess.call(cmd, shell=True)
+
def run_dumpgrobidmetainsertable(args):
if args.rebuild:
rebuild_scalding()
@@ -433,6 +456,9 @@ def main():
sub_dumpfilemeta = subparsers.add_parser('dump-file-meta')
sub_dumpfilemeta.set_defaults(func=run_dumpfilemeta)
+ sub_dumpgrobidstatuscode = subparsers.add_parser('dump-grobid-status-code')
+ sub_dumpgrobidstatuscode.set_defaults(func=run_dumpgrobidstatuscode)
+
sub_dumpgrobidmetainsertable = subparsers.add_parser('dump-grobid-meta-insertable')
sub_dumpgrobidmetainsertable.set_defaults(func=run_dumpgrobidmetainsertable)
diff --git a/python/deliver_dumpgrobid_to_s3.py b/python/deliver_dumpgrobid_to_s3.py
new file mode 100755
index 0000000..ac0949c
--- /dev/null
+++ b/python/deliver_dumpgrobid_to_s3.py
@@ -0,0 +1,124 @@
+#!/usr/bin/env python3
+"""
+Tool for bulk uploading GROBID TEI-XML output from a local filesystem dump
+(from HBase) to AWS S3.
+
+See unpaywall delivery README (in bnewbold's scratch repo) for notes on running
+this script for that specific use-case.
+
+Script takes:
+- input TSV: `sha1_hex, json (including grobid0:tei_xml)`
+ => usually from dumpgrobid, with SHA-1 key transformed to hex, and filtered
+ down (eg, by join by SHA-1) to a specific manifest
+- AWS S3 bucket and prefix
+
+AWS S3 credentials are passed via environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
+
+Output:
+- errors/stats to stderr
+- log to stdout (redirect to file), prefixed by sha1
+
+Requires:
+- raven (sentry)
+- boto3 (AWS S3 client library)
+"""
+
+import os
+import sys
+import json
+import base64
+import hashlib
+import argparse
+from collections import Counter
+
+import boto3
+import raven
+
+# Yep, a global. Gets DSN from `SENTRY_DSN` environment variable
+sentry_client = raven.Client()
+
+
+def b32_hex(s):
+    """Convert a base32 SHA-1 (optional "sha1:" prefix) to hex; values that aren't 32 chars pass through unchanged (copy/pasta from elsewhere)."""
+ s = s.strip().split()[0].lower()
+ if s.startswith("sha1:"):
+ s = s[5:]
+ if len(s) != 32:
+ return s
+ return base64.b16encode(base64.b32decode(s.upper())).lower().decode('utf-8')
+
+
+class DeliverDumpGrobidS3():
+
+ def __init__(self, s3_bucket, **kwargs):
+ self.rstore = None
+ self.count = Counter()
+ self.s3_bucket = s3_bucket
+ self.s3_prefix = kwargs.get('s3_prefix', 'grobid/')
+ self.s3_suffix = kwargs.get('s3_suffix', '.tei.xml')
+ self.s3_storage_class = kwargs.get('s3_storage_class', 'STANDARD')
+ self.s3 = boto3.resource('s3')
+ self.bucket = self.s3.Bucket(self.s3_bucket)
+
+ def run(self, dump_file):
+ sys.stderr.write("Starting...\n")
+ for line in dump_file:
+ line = line.strip().split('\t')
+ if len(line) != 2:
+ self.count['skip-line'] += 1
+ continue
+ sha1_hex, grobid_json = line[0], line[1]
+ if len(sha1_hex) != 40:
+ sha1_hex = b32_hex(sha1_hex)
+ assert len(sha1_hex) == 40
+ grobid = json.loads(grobid_json)
+ tei_xml = grobid.get('tei_xml')
+ if not tei_xml:
+ print("{}\tskip empty".format(sha1_hex))
+ self.count['skip-empty'] += 1
+ continue
+ tei_xml = tei_xml.encode('utf-8')
+ # upload to AWS S3
+        obj = self.bucket.put_object(
+            Key="{}{}/{}{}".format(
+                self.s3_prefix,
+                sha1_hex[0:4],
+                sha1_hex,
+                self.s3_suffix),
+            StorageClass=self.s3_storage_class,
+            Body=tei_xml)
+ print("{}\tsuccess\t{}\t{}".format(sha1_hex, obj.key, len(tei_xml)))
+ self.count['success-s3'] += 1
+ sys.stderr.write("{}\n".format(self.count))
+
+@sentry_client.capture_exceptions
+def main():
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--s3-bucket',
+ required=True,
+ type=str,
+ help='AWS S3 bucket to upload into')
+ parser.add_argument('--s3-prefix',
+ type=str,
+ default="grobid/",
+ help='key prefix for items created in bucket')
+ parser.add_argument('--s3-suffix',
+ type=str,
+ default=".tei.xml",
+ help='file suffix for created objects')
+ parser.add_argument('--s3-storage-class',
+ type=str,
+ default="STANDARD",
+ help='AWS S3 storage class (redundancy) to use')
+ parser.add_argument('dump_file',
+ help="TSV/JSON dump file",
+ default=sys.stdin,
+ type=argparse.FileType('r'))
+ args = parser.parse_args()
+
+ worker = DeliverDumpGrobidS3(**args.__dict__)
+ worker.run(args.dump_file)
+
+if __name__ == '__main__': # pragma: no cover
+ main()
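
For reference, with the default prefix and suffix the uploader above writes keys sharded on the first four hex characters of the SHA-1, presumably to spread objects across key prefixes (the SHA-1 below is made up):

    sha1_hex = "0123456789abcdef0123456789abcdef01234567"  # made up
    key = "{}{}/{}{}".format("grobid/", sha1_hex[0:4], sha1_hex, ".tei.xml")
    print(key)  # grobid/0123/0123456789abcdef0123456789abcdef01234567.tei.xml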
diff --git a/python/grobid2json.py b/python/grobid2json.py
index ca460f8..d438d48 100755
--- a/python/grobid2json.py
+++ b/python/grobid2json.py
@@ -31,9 +31,13 @@ import xml.etree.ElementTree as ET
ns = "http://www.tei-c.org/ns/1.0"
def all_authors(elem):
- names = [' '.join([e.findtext('./{%s}forename' % ns) or '', e.findtext('./{%s}surname' % ns) or '']).strip()
- for e in elem.findall('.//{%s}author/{%s}persName' % (ns, ns))]
- return [dict(name=n) for n in names]
+ names = []
+ for e in elem.findall('.//{%s}author/{%s}persName' % (ns, ns)):
+ given_name = e.findtext('./{%s}forename' % ns) or None
+ surname = e.findtext('./{%s}surname' % ns) or None
+ full_name = '{} {}'.format(given_name or '', surname or '').strip()
+ names.append(dict(name=full_name, given_name=given_name, surname=surname))
+ return names
def journal_info(elem):
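
A tiny example of what the reworked all_authors() now returns for a single persName element (the TEI fragment is made up, trimmed to just what the function reads):

    import xml.etree.ElementTree as ET

    from grobid2json import all_authors

    tei = """<TEI xmlns="http://www.tei-c.org/ns/1.0">
      <author><persName><forename>Jane</forename><surname>Doe</surname></persName></author>
    </TEI>"""
    print(all_authors(ET.fromstring(tei)))
    # [{'name': 'Jane Doe', 'given_name': 'Jane', 'surname': 'Doe'}]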
diff --git a/python/tests/files/small.json b/python/tests/files/small.json
index 208fb49..49a5671 100644
--- a/python/tests/files/small.json
+++ b/python/tests/files/small.json
@@ -1,8 +1,8 @@
{
"title": "Dummy Example File",
"authors": [
- {"name": "Brewster Kahle"},
- {"name": "J Doe"}
+ {"name": "Brewster Kahle", "given_name": "Brewster", "surname": "Kahle"},
+ {"name": "J Doe", "given_name": "J", "surname": "Doe"}
],
"journal": {
"name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678",
@@ -15,7 +15,7 @@
"date": "2000",
"doi": null,
"citations": [
- { "authors": [{"name": "A Seaperson"}],
+ { "authors": [{"name": "A Seaperson", "given_name": "A", "surname": "Seaperson"}],
"date": "2001",
"id": "b0",
"index": 0,
diff --git a/scalding/src/main/scala/sandcrawler/DumpGrobidStatusCodeJob.scala b/scalding/src/main/scala/sandcrawler/DumpGrobidStatusCodeJob.scala
new file mode 100644
index 0000000..42b3464
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/DumpGrobidStatusCodeJob.scala
@@ -0,0 +1,34 @@
+package sandcrawler
+
+import java.util.Properties
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+// Dumps status code for each GROBID-processed file. Good for crawl/corpus
+// analytics, if we consider GROBID status a rough "is this a paper" metric.
+class DumpGrobidStatusCodeJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ val metaPipe : TypedPipe[(String, Long)] = HBaseBuilder.build(args("hbase-table"),
+ args("zookeeper-hosts"),
+ List("grobid0:status_code"),
+ SourceMode.SCAN_ALL)
+ .read
+ .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "status_code"))
+ .filter { case (_, status_code) => status_code != null }
+ .map { case (key, status_code) =>
+ (Bytes.toString(key.copyBytes()),
+ Bytes.toLong(status_code.copyBytes()))
+ };
+
+ metaPipe.write(TypedTsv[(String,Long)](args("output")))
+
+}
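
The TSV this job writes is just (row key, status code) pairs, so the crawl/corpus analytics mentioned in the comment can start as a simple tally per status code. A throwaway sketch (the dump path is a placeholder):

    from collections import Counter

    counts = Counter()
    with open("dump-grobid-status-code.tsv") as f:  # placeholder path
        for line in f:
            _key, status_code = line.rstrip("\n").split("\t")
            counts[status_code] += 1

    for status_code, n in counts.most_common():
        print("{}\t{}".format(status_code, n))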