aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xplease81
-rw-r--r--scalding/build.sbt2
-rw-r--r--scalding/src/main/resources/slug-blacklist.txt427
-rw-r--r--scalding/src/main/scala/sandcrawler/CrossrefScorable.scala2
-rw-r--r--scalding/src/main/scala/sandcrawler/GrobidScorable.scala12
-rw-r--r--scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala62
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala37
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala32
-rw-r--r--scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala10
-rw-r--r--scalding/src/main/scala/sandcrawler/ScorableFeatures.scala51
-rw-r--r--scalding/src/main/scala/sandcrawler/ScoreJob.scala13
-rw-r--r--scalding/src/main/scala/sandcrawler/StringUtilities.scala2
-rw-r--r--scalding/src/test/scala/sandcrawler/GrobidScorableDumpJobTest.scala124
-rw-r--r--scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala71
-rw-r--r--scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala41
-rw-r--r--scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala13
-rw-r--r--scalding/src/test/scala/sandcrawler/ScoreJobTest.scala5
17 files changed, 913 insertions, 72 deletions
diff --git a/please b/please
index 1a992f2..b32dd79 100755
--- a/please
+++ b/please
@@ -95,6 +95,27 @@ def run_rowcount(args):
env=args.env)
subprocess.call(cmd, shell=True)
+def run_statuscodecount(args):
+ if args.rebuild:
+ rebuild_scalding()
+ print("Starting statuscodecount job...")
+ output = "{}/output-{}/{}-statuscodecount".format(
+ HDFS_DIR,
+ args.env,
+ datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+ cmd = """hadoop jar \
+ scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+ com.twitter.scalding.Tool sandcrawler.HBaseStatusCodeCountJob \
+ --hdfs \
+ --app.conf.path scalding/ia_cluster.conf \
+ --hbase-table wbgrp-journal-extract-0-{env} \
+ --zookeeper-hosts {zookeeper_hosts} \
+ --output {output}""".format(
+ output=output,
+ zookeeper_hosts=ZOOKEEPER_HOSTS,
+ env=args.env)
+ subprocess.call(cmd, shell=True)
+
def run_statuscount(args):
if args.rebuild:
rebuild_scalding()
@@ -126,10 +147,15 @@ def run_matchcrossref(args):
datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
# Notes: -D options must come after Tool but before class name
# https://github.com/twitter/scalding/wiki/Frequently-asked-questions#how-do-i-pass-parameters-to-my-hadoop-job-number-of-reducers--memory-options--etc-
+ # Compression: changed due to errors in production
+ # https://stackoverflow.com/a/11336820/4682349
cmd = """hadoop jar \
scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
com.twitter.scalding.Tool \
-Dmapred.reduce.tasks={reducers} \
+ -Dcascading.spill.list.threshold=500000 \
+ -D mapred.output.compress=false \
+ -Dmapred.compress.map.output=true\
sandcrawler.ScoreJob \
--hdfs \
--app.conf.path scalding/ia_cluster.conf \
@@ -144,6 +170,51 @@ def run_matchcrossref(args):
crossref_input=args.crossref_input)
subprocess.call(cmd, shell=True)
+def run_grobidscorabledump(args):
+ if args.rebuild:
+ rebuild_scalding()
+ print("Starting grobid-scorable-dump job...")
+ output = "{}/output-{}/{}-grobidscorabledump".format(
+ HDFS_DIR,
+ args.env,
+ datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"))
+ cmd = """hadoop jar \
+ scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+ com.twitter.scalding.Tool sandcrawler.GrobidScorableDumpJob \
+ --hdfs \
+ --app.conf.path scalding/ia_cluster.conf \
+ --hbase-table wbgrp-journal-extract-0-{env} \
+ --zookeeper-hosts {zookeeper_hosts} \
+ --output {output}""".format(
+ output=output,
+ zookeeper_hosts=ZOOKEEPER_HOSTS,
+ env=args.env)
+ subprocess.call(cmd, shell=True)
+
+def run_colcount(args):
+ if args.rebuild:
+ rebuild_scalding()
+ print("Starting colcount job...")
+ output = "{}/output-{}/{}-colcount-{}".format(
+ HDFS_DIR,
+ args.env,
+ datetime.strftime(datetime.now(), "%Y-%m-%d-%H%M.%S"),
+ args.column.replace(':', '_'))
+ cmd = """hadoop jar \
+ scalding/target/scala-2.11/sandcrawler-assembly-0.2.0-SNAPSHOT.jar \
+ com.twitter.scalding.Tool sandcrawler.HBaseColCountJob \
+ --hdfs \
+ --app.conf.path scalding/ia_cluster.conf \
+ --hbase-table wbgrp-journal-extract-0-{env} \
+ --zookeeper-hosts {zookeeper_hosts} \
+ --column {column} \
+ --output {output}""".format(
+ column=args.column,
+ output=output,
+ zookeeper_hosts=ZOOKEEPER_HOSTS,
+ env=args.env)
+ subprocess.call(cmd, shell=True)
+
def main():
parser = argparse.ArgumentParser()
@@ -174,6 +245,9 @@ def main():
sub_statuscount = subparsers.add_parser('status-count')
sub_statuscount.set_defaults(func=run_statuscount)
+ sub_statuscodecount = subparsers.add_parser('status-code-count')
+ sub_statuscodecount.set_defaults(func=run_statuscodecount)
+
sub_matchcrossref = subparsers.add_parser('match-crossref')
sub_matchcrossref.set_defaults(func=run_matchcrossref)
sub_matchcrossref.add_argument('crossref_input',
@@ -182,6 +256,13 @@ def main():
help="number of reducers to run",
type=int, default=30)
+ sub_grobidscorabledump = subparsers.add_parser('grobid-scorable-dump')
+ sub_grobidscorabledump.set_defaults(func=run_grobidscorabledump)
+
+ sub_colcount = subparsers.add_parser('col-count')
+ sub_colcount.set_defaults(func=run_colcount)
+ sub_colcount.add_argument('column',
+ help="column name to use in count")
args = parser.parse_args()
if not args.__dict__.get("func"):
diff --git a/scalding/build.sbt b/scalding/build.sbt
index d477399..01f55ca 100644
--- a/scalding/build.sbt
+++ b/scalding/build.sbt
@@ -42,7 +42,7 @@ lazy val root = (project in file(".")).
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % hadoopVersion,
libraryDependencies += "org.apache.hadoop" % "hadoop-mapreduce-client-jobclient" % hadoopVersion classifier "tests",
libraryDependencies += "org.apache.hbase" % "hbase-common" % hbaseVersion,
- libraryDependencies += "parallelai" % "parallelai.spyglass" % "2.11_0.17.2_cdh5.3.1",
+ libraryDependencies += "parallelai" % "parallelai.spyglass" % "2.11_0.17.2_cdh5.3.1-p1",
// cargo-culted from twitter/scalding's build.sbt
// hint via https://stackoverflow.com/questions/23280494/sbt-assembly-error-deduplicate-different-file-contents-found-in-the-following#23280952
diff --git a/scalding/src/main/resources/slug-blacklist.txt b/scalding/src/main/resources/slug-blacklist.txt
new file mode 100644
index 0000000..6bc947b
--- /dev/null
+++ b/scalding/src/main/resources/slug-blacklist.txt
@@ -0,0 +1,427 @@
+abbreviations
+abstract
+acknowledgements
+article
+authorreply
+authorsreply
+bookreview
+bookreviews
+casereport
+commentary
+commentaryon
+commenton
+commentto
+contents
+correspondence
+dedication
+editorialadvisoryboard
+focus
+hypothesis
+inbrief
+introduction
+introductiontotheissue
+lettertotheeditor
+listofabbreviations
+note
+overview
+preface
+references
+results
+review
+reviewarticle
+summary
+title
+name
+website
+technicalreport
+abstractsofaapaposterandpodiumpresentations
+affiliation
+authorindexforvolume81
+bookreviewssection
+links
+ll
+description
+indicegeneral
+genealogy
+summariesofkeyjournalarticles
+summer
+0
+jobdescription
+socialengineering
+approximation
+thetimes
+atribute
+components
+entscheidungsverzeichnis
+indexofauthorsandtitles
+inventions
+newlyelectedmembersofthecollege
+summaryofproceedings
+appointmentsandstaffchanges
+classes
+finalexam
+partone
+referenciasbibliograficas
+abstractsofthesesfromthescandinaviancountries
+acknowledgementsvii
+messagefromgeneralcochairs
+mm
+redaktorensforord
+109
+place
+computerscience
+medicinalchemistry
+homework
+sun
+dedicatoria
+outline
+affect
+section
+informationtoauthors
+theeditorsdesk
+entrevista
+thefirstauthorreplies
+councilminutes
+no
+furtherreadings
+printing
+glosario
+lucina
+mrsnews
+85
+bookofabstracts
+literaturrundschau
+thismonthin
+listedestableaux
+sommario
+specialsection
+revieweracknowledgement2013
+abstractnotsubmittedforonlinepublication
+essays
+tableofcontentsandprologue
+76
+callforarticles
+ataglance
+fichatecnica
+literacy
+theyearinreview
+print
+symptom
+66
+additionalresources
+unitednations
+meetingabstracts
+annualacknowledgementofmanuscriptreviewers
+specifications
+boardoftrustees
+timemanagement
+papers
+importantnotice
+annexa
+linearregression
+chaptertwo
+finalreport
+suggestedreadings
+essay
+copyrightform
+resumo
+bigdata
+chapter10
+publication
+changes
+presentacio
+ai
+materialsafetydatasheet
+home
+dearreaders
+acknowledgmentofreferees
+maintenance
+question
+listofpublications
+responsetothelettertotheeditor
+communiquedepresse
+58
+noii
+litteraturverzeichniss
+whatshappening
+chairmansopeningremarks
+editorscorrespondence
+award
+thebasics
+56
+conservation
+blood
+chapter7
+chaos
+citation
+memoranda
+figure3
+distribution
+regression
+synthese
+context
+editorialconsultants
+theoreticalbackground
+lecture
+electrophoresis
+editorspicks
+introductorycomments
+collaborateurs
+recensions
+personalandmiscellaneous
+issuesandevents
+theworldbank
+51
+reviewsanddescriptionsoftablesandbooks
+congratulations
+definition
+yourquestionsanswered
+chapterone
+missionstatement
+62
+newsandreviews
+decisionmaking
+41
+projectmanagement
+community
+metaanalysis
+magazin
+other
+linearalgebra
+online
+exercises
+languageteaching
+address
+radiology
+foreward
+heartfailure
+editoriale
+memorandum
+editorialinformation
+revieweracknowledgement
+safety
+21
+features
+aboutauthors
+messagefromtheprogramcochairs
+health
+security
+literaturecited
+meetingsandconferences
+messagefromthechairs
+feature
+meetingsofinterest
+introductiongenerale
+pressrelease
+lungcancer
+6
+7
+institutenews
+researchresearchers
+profile
+11
+5
+rehabilitation
+documents
+currentresearch
+4
+originalarticle
+committee
+endnotes
+chapter1
+originalarticles
+guidelinesforcontributors
+chapteroneintroduction
+aboutthecover
+membershipapplication
+awardsappointmentsannouncements
+materialsandmethods
+symposium
+abstractsofcommunications
+oralabstracts
+insidethisissue
+abreviations
+ear
+editorialstaff
+proceedings
+commentaries
+perspective
+agradecimientos
+iii
+glossaryofterms
+appendix1
+editorialeditorial
+interview
+fortherecord
+abstractwithdrawn
+highlightsfromthisissue
+materials
+keywords
+editorscomment
+editorscorner
+readersforum
+messagefromtheprogramchairs
+authorguidelines
+resumes
+reviewessay
+editorinchief
+8
+frequentlyaskedquestions
+aimsandscope
+abouttheeditors
+indice
+editorialstatement
+sommaire
+inthestudy
+listofreferees
+background
+bookreviewsandnotices
+articlesofsignificantinterestselectedfromthisissuebytheeditors
+fromthepresident
+panorama
+welcome
+history
+continuingeducation
+livresrecus
+acknowledgementtoreferees
+equipment
+coverimage
+1
+summaries
+positionsavailable
+y
+congresscalendar
+conferencereport
+upcomingevents
+posterpresentations
+editorsintroduction
+rezension
+appendixb
+2
+generalinformation
+theauthors
+notesandcomments
+subscriptions
+w
+listoffigures
+communication
+calendarofmeetings
+workscited
+n
+o
+d
+editorialcomments
+listofreviewers
+notitle
+editorschoice
+lettertotheeditors
+comments
+sumario
+editorialintroduction
+resenas
+introduccion
+classifieds
+tocorrespondents
+notesforcontributors
+forthcomingarticles
+bibliografia
+ii
+editorialsoftwaresurveysection
+listofparticipants
+advertisersindex
+generaldiscussion
+paperstoappearinforthcomingissues
+indexofauthors
+letterfromtheeditor
+chapter1introduction
+organizingcommittee
+noticeboard
+guideforauthors
+chapteri
+resources
+resume
+u
+p
+addendum
+impressum
+copyrightnotice
+r
+associationnews
+classified
+s
+l
+corrigenda
+presentacion
+instructionsforauthors
+resumen
+editorialnote
+comptesrendus
+response
+curriculumvitae
+obituaries
+meetings
+education
+e
+m
+a
+forthcomingevents
+reviewers
+abouttheauthors
+fromtheeditors
+i
+instructionstoauthors
+newproducts
+callforpapers
+blankpage
+apresentacao
+publishersnote
+newbooks
+corrections
+tabledesmatieres
+calendarofevents
+editorsnote
+shorternotices
+listofcontributors
+notesandnews
+editorialcomment
+newsandnotes
+comment
+inmemoriam
+presentation
+executivesummary
+guesteditorial
+avantpropos
+discussion
+glossary
+letters
+notes
+reply
+contributors
+acknowledgments
+fromtheeditor
+buchbesprechungen
+obituary
+inthisissue
+notesoncontributors
+conclusion
+einleitung
+inhalt
+bibliography
+titeleiinhaltsverzeichnis
+booksreceived
+calendar
+inhaltsverzeichnis
+corrigendum
+subjectindex
+errata
+frontmatter
+correction
+abstracts
+letterstotheeditor
+foreword
+tableofcontents
+authorindex
+index
+erratum
+editorialboard
+editorial
diff --git a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
index ff8201a..5d1eaf5 100644
--- a/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/CrossrefScorable.scala
@@ -35,7 +35,7 @@ object CrossrefScorable {
new MapFeatures(Scorable.NoSlug, json)
} else {
// bnewbold: not checking that titles(0) is non-null/non-empty; case would be, in JSON, "title": [ null ]
- val sf : ScorableFeatures = new ScorableFeatures(title=titles(0), doi=doi)
+ val sf : ScorableFeatures = ScorableFeatures.create(title=titles(0), doi=doi)
new MapFeatures(sf.toSlug, sf.toString)
}
} else {
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
index 9a09e05..e510f75 100644
--- a/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorable.scala
@@ -24,10 +24,10 @@ class GrobidScorable extends Scorable with HBasePipeConversions {
getSource(args)
.read
// Can't just "fromBytesWritable" because we have multiple types?
- .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "tei_json", "status_code"))
- .filter { case (_, tei_json, status_code) => tei_json != null && status_code != null }
- .map { case (key, tei_json, status_code) =>
- (Bytes.toString(key.copyBytes()), Bytes.toString(tei_json.copyBytes()), Bytes.toLong(status_code.copyBytes()))
+ .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "metadata", "status_code"))
+ .filter { case (_, metadata, status_code) => metadata != null && status_code != null }
+ .map { case (key, metadata, status_code) =>
+ (Bytes.toString(key.copyBytes()), Bytes.toString(metadata.copyBytes()), Bytes.toLong(status_code.copyBytes()))
}
// TODO: Should I combine next two stages for efficiency?
.collect { case (key, json, StatusOK) => (key, json) }
@@ -37,7 +37,7 @@ class GrobidScorable extends Scorable with HBasePipeConversions {
object GrobidScorable {
def getHBaseSource(table : String, host : String) : HBaseSource = {
- HBaseBuilder.build(table, host, List("grobid0:tei_json", "grobid0:status_code"), SourceMode.SCAN_ALL)
+ HBaseBuilder.build(table, host, List("grobid0:metadata", "grobid0:status_code"), SourceMode.SCAN_ALL)
}
def jsonToMapFeatures(key : String, json : String) : MapFeatures = {
@@ -45,7 +45,7 @@ object GrobidScorable {
case None => MapFeatures(Scorable.NoSlug, json)
case Some(map) => {
if (map contains "title") {
- new ScorableFeatures(Scorable.getString(map, "title"), sha1=key).toMapFeatures
+ ScorableFeatures.create(title=Scorable.getString(map, "title"), sha1=key).toMapFeatures
} else {
MapFeatures(Scorable.NoSlug, json)
}
diff --git a/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala
new file mode 100644
index 0000000..468b68e
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/GrobidScorableDumpJob.scala
@@ -0,0 +1,62 @@
+
+package sandcrawler
+
+import cascading.flow.FlowDef
+import cascading.pipe.Pipe
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import com.twitter.scalding.typed.TDsl._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+class GrobidScorableDumpJob(args: Args) extends JobBase(args) {
+
+ val grobidHbaseRows = Stat("hbase-rows-scanned", "hbase-grobid-dump")
+ val filteredGrobidRows = Stat("grobid-rows-filtered", "hbase-grobid-dump")
+ val parsedGrobidRows = Stat("grobid-rows-parsed", "hbase-grobid-dump")
+ val validGrobidRows = Stat("grobid-rows-valid-slug", "hbase-grobid-dump")
+
+ val pipe = GrobidScorable.getHBaseSource(args("hbase-table"), args("zookeeper-hosts"))
+ .read
+ // Can't just "fromBytesWritable" because we have multiple types?
+ .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable,ImmutableBytesWritable)](new Fields("key", "metadata", "status_code"))
+ .filter { case (_, metadata, status_code) =>
+ grobidHbaseRows.inc
+ metadata != null && status_code != null
+ }
+ .map { case (key, metadata, status_code) =>
+ (Bytes.toString(key.copyBytes()), Bytes.toString(metadata.copyBytes()), Bytes.toLong(status_code.copyBytes()))
+ }
+ // TODO: Should I combine next two stages for efficiency?
+ .collect { case (key, json, 200) =>
+ filteredGrobidRows.inc
+ (key, json)
+ }
+ .map { entry : (String, String) =>
+ parsedGrobidRows.inc
+ GrobidScorable.jsonToMapFeatures(entry._1, entry._2)
+ }
+ .filter { entry => Scorable.isValidSlug(entry.slug) }
+ .map { entry =>
+ validGrobidRows.inc
+ entry
+ }
+ // XXX: this groupBy after the map?
+ .groupBy { case MapFeatures(slug, json) => slug }
+ .map { tuple =>
+ val (slug : String, features : MapFeatures) = tuple
+ (slug, ReduceFeatures(features.json))
+ }
+
+ pipe
+ .map { case (slug, features) =>
+ (slug, features.json)
+ }
+ .write(TypedTsv[(String, String)](args("output")))
+}
diff --git a/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala
new file mode 100644
index 0000000..20cc7a1
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/HBaseColCountJob.scala
@@ -0,0 +1,37 @@
+package sandcrawler
+
+import java.util.Properties
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+class HBaseColCountJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ val output = args("output")
+
+ HBaseColCountJob.getHBaseSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"),
+ args("column"))
+ .read
+ .debug
+ .groupAll { _.size('count) }
+ .write(Tsv(output))
+}
+
+object HBaseColCountJob {
+
+ // eg, "wbgrp-journal-extract-0-qa",7 "mtrcs-zk1.us.archive.org:2181"
+ def getHBaseSource(hbaseTable: String, zookeeperHosts: String, col: String) : HBaseSource = {
+ HBaseBuilder.build(
+ hbaseTable,
+ zookeeperHosts,
+ List(col),
+ SourceMode.SCAN_ALL)
+ }
+}
diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala
new file mode 100644
index 0000000..4d9880f
--- /dev/null
+++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCodeCountJob.scala
@@ -0,0 +1,32 @@
+package sandcrawler
+
+import java.util.Properties
+
+import cascading.property.AppProps
+import cascading.tuple.Fields
+import com.twitter.scalding._
+import com.twitter.scalding.typed.TDsl._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import parallelai.spyglass.base.JobBase
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBasePipeConversions
+import parallelai.spyglass.hbase.HBaseSource
+
+class HBaseStatusCodeCountJob(args: Args) extends JobBase(args) with HBasePipeConversions {
+
+ val source = HBaseCountJob.getHBaseSource(
+ args("hbase-table"),
+ args("zookeeper-hosts"),
+ "grobid0:status_code")
+
+ val statusPipe : TypedPipe[Long] = source
+ .read
+ .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code)
+ .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) }
+
+ statusPipe.groupBy { identity }
+ .size
+ .debug
+ .write(TypedTsv[(Long,Long)](args("output")))
+}
diff --git a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
index fd0b4e2..f79d672 100644
--- a/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
+++ b/scalding/src/main/scala/sandcrawler/HBaseStatusCountJob.scala
@@ -18,15 +18,15 @@ class HBaseStatusCountJob(args: Args) extends JobBase(args) with HBasePipeConver
val source = HBaseCountJob.getHBaseSource(
args("hbase-table"),
args("zookeeper-hosts"),
- "grobid0:status_code")
+ "grobid0:status")
- val statusPipe : TypedPipe[Long] = source
+ val statusPipe : TypedPipe[String] = source
.read
- .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status_code)
- .map { case (key, raw_code) => Bytes.toLong(raw_code.copyBytes()) }
+ .toTypedPipe[(ImmutableBytesWritable,ImmutableBytesWritable)]('key, 'status)
+ .map { case (key, raw_status) => Bytes.toString(raw_status.copyBytes()) }
statusPipe.groupBy { identity }
.size
.debug
- .write(TypedTsv[(Long,Long)](args("output")))
+ .write(TypedTsv[(String,Long)](args("output")))
}
diff --git a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala
index 8ed3369..0b9868a 100644
--- a/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala
+++ b/scalding/src/main/scala/sandcrawler/ScorableFeatures.scala
@@ -1,31 +1,35 @@
package sandcrawler
+import java.io.InputStream
+
+import scala.io.Source
import scala.util.parsing.json.JSONObject
+object ScorableFeatures {
+ // TODO: Add exception handling.
+ val fileStream : InputStream = getClass.getResourceAsStream("/slug-blacklist.txt")
+ val SlugBlacklist : Set[String] = Source.fromInputStream(fileStream).getLines.toSet
+ fileStream.close
+
+ // Static factory method
+ def create(title : String, year : Int = 0, doi : String = "", sha1 : String = "") : ScorableFeatures = {
+ new ScorableFeatures(
+ title=if (title == null) "" else title,
+ year=year,
+ doi=if (doi == null) "" else doi,
+ sha1=if (sha1 == null) "" else sha1)
+ }
+}
// Contains features needed to make slug and to score (in combination
-// with a second ScorableFeatures).
-class ScorableFeatures(title : String, year: Int = 0, doi : String = "", sha1: String = "") {
-
- val slugBlacklist = Set( "abbreviations", "abstract", "acknowledgements",
- "article", "authorreply", "authorsreply", "bookreview", "bookreviews",
- "casereport", "commentary", "commentaryon", "commenton", "commentto",
- "contents", "correspondence", "dedication", "editorialadvisoryboard",
- "focus", "hypothesis", "inbrief", "introduction", "introductiontotheissue",
- "lettertotheeditor", "listofabbreviations", "note", "overview", "preface",
- "references", "results", "review", "reviewarticle", "summary", "title",
- "name")
-
- def toMap() : Map[String, Any] = {
- Map("title" -> (if (title == null) "" else title),
- "year" -> year,
- "doi" -> (if (doi == null) "" else doi),
- "sha1" -> (if (sha1 == null) "" else sha1))
- }
+// with a second ScorableFeatures). Create with above static factory method.
+class ScorableFeatures private(title : String, year: Int = 0, doi : String = "", sha1: String = "") {
- override def toString() : String = {
- JSONObject(toMap()).toString
- }
+ def toMap() : Map[String, Any] =
+ Map("title" -> title, "year" -> year, "doi" -> doi, "sha1" -> sha1)
+
+ override def toString() : String =
+ JSONObject(toMap).toString
def toSlug() : String = {
if (title == null) {
@@ -34,11 +38,10 @@ class ScorableFeatures(title : String, year: Int = 0, doi : String = "", sha1: S
val unaccented = StringUtilities.removeAccents(title)
// Remove punctuation
val slug = StringUtilities.removePunctuation((unaccented.toLowerCase())).replaceAll("\\s", "")
- if (slug.isEmpty || slug == null || (slugBlacklist contains slug)) Scorable.NoSlug else slug
+ if (slug.isEmpty || slug == null || (ScorableFeatures.SlugBlacklist contains slug)) Scorable.NoSlug else slug
}
}
- def toMapFeatures = {
+ def toMapFeatures : MapFeatures =
MapFeatures(toSlug, toString)
- }
}
diff --git a/scalding/src/main/scala/sandcrawler/ScoreJob.scala b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
index 75d45e9..28e9132 100644
--- a/scalding/src/main/scala/sandcrawler/ScoreJob.scala
+++ b/scalding/src/main/scala/sandcrawler/ScoreJob.scala
@@ -13,15 +13,18 @@ class ScoreJob(args: Args) extends JobBase(args) {
val pipe1 : TypedPipe[(String, ReduceFeatures)] = sc1.getInputPipe(args)
val pipe2 : TypedPipe[(String, ReduceFeatures)] = sc2.getInputPipe(args)
- pipe1.join(pipe2).map { entry =>
- val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
- new ReduceOutput(
+ pipe1
+ .addTrap(TypedTsv(args("output") + ".trapped"))
+ .join(pipe2)
+ .map { entry =>
+ val (slug : String, (features1 : ReduceFeatures, features2 : ReduceFeatures)) = entry
+ new ReduceOutput(
slug,
Scorable.computeSimilarity(features1, features2),
features1.json,
features2.json)
- }
- //TypedTsv doesn't work over case classes.
+ }
+ //TypedTsv doesn't work over case classes.
.map { entry => (entry.slug, entry.score, entry.json1, entry.json2) }
.write(TypedTsv[(String, Int, String, String)](args("output")))
}
diff --git a/scalding/src/main/scala/sandcrawler/StringUtilities.scala b/scalding/src/main/scala/sandcrawler/StringUtilities.scala
index 2745875..e03b60d 100644
--- a/scalding/src/main/scala/sandcrawler/StringUtilities.scala
+++ b/scalding/src/main/scala/sandcrawler/StringUtilities.scala
@@ -36,7 +36,7 @@ object StringUtilities {
// Source: https://stackoverflow.com/a/30076541/631051
def removePunctuation(s: String) : String = {
- s.replaceAll("""[\p{Punct}]""", "")
+ s.replaceAll("""[\p{Punct}’·“”‘’“”«»「」]""", "")
}
// Adapted from: https://stackoverflow.com/a/16018452/631051
diff --git a/scalding/src/test/scala/sandcrawler/GrobidScorableDumpJobTest.scala b/scalding/src/test/scala/sandcrawler/GrobidScorableDumpJobTest.scala
new file mode 100644
index 0000000..12e13dc
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/GrobidScorableDumpJobTest.scala
@@ -0,0 +1,124 @@
+
+package sandcrawler
+
+import cascading.tuple.Fields
+import cascading.tuple.Tuple
+import com.twitter.scalding.JobTest
+import com.twitter.scalding.TextLine
+import com.twitter.scalding.TupleConversions
+import com.twitter.scalding.TypedTsv
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.scalatest._
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+
+class GrobidScorableDumpJobTest extends FlatSpec with Matchers {
+ //scalastyle:off
+ val JsonString = """
+{
+ "title": "<<TITLE>>",
+ "authors": [
+ {"name": "Brewster Kahle"},
+ {"name": "J Doe"}
+ ],
+ "journal": {
+ "name": "Dummy Example File. Journal of Fake News. pp. 1-2. ISSN 1234-5678",
+ "eissn": null,
+ "issn": null,
+ "issue": null,
+ "publisher": null,
+ "volume": null
+ },
+ "date": "2000",
+ "doi": null,
+ "citations": [
+ { "authors": [{"name": "A Seaperson"}],
+ "date": "2001",
+ "id": "b0",
+ "index": 0,
+ "issue": null,
+ "journal": "Letters in the Alphabet",
+ "publisher": null,
+ "title": "Everything is Wonderful",
+ "url": null,
+ "volume": "20"},
+ { "authors": [],
+ "date": "2011-03-28",
+ "id": "b1",
+ "index": 1,
+ "issue": null,
+ "journal": "The Dictionary",
+ "publisher": null,
+ "title": "All about Facts",
+ "url": null,
+ "volume": "14"}
+ ],
+ "abstract": "Everything you ever wanted to know about nothing",
+ "body": "Introduction \nEverything starts somewhere, as somebody [1] once said. \n\n In Depth \n Meat \nYou know, for kids. \n Potatos \nQED.",
+ "acknowledgement": null,
+ "annex": null
+}
+"""
+ // scalastyle:on
+ val JsonStringWithTitle = JsonString.replace("<<TITLE>>", "Dummy Example File")
+ val JsonStringWithoutTitle = JsonString.replace("title", "nottitle")
+ val MalformedJsonString = JsonString.replace("}", "")
+
+ // Pipeline tests
+ val output = "/tmp/testOutput"
+ val input = "/tmp/testInput"
+ val (testTable, testHost) = ("test-table", "dummy-host:2181")
+
+ val Sha1Strings : List[String] = List(
+ "sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", // good
+ "sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", // good
+ "sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", // good
+ "sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", // bad status
+ "sha1:93187A85273589347598473894839443", // malformed
+ "sha1:024937534094897039547e9824382943") // bad status
+
+ val JsonStrings : List[String] = List(
+ JsonString.replace("<<TITLE>>", "Title 1"),
+ JsonString.replace("<<TITLE>>", "Title 2: TNG"),
+ JsonString.replace("<<TITLE>>", "Title 3: The Sequel"),
+ // This will have bad status.
+ JsonString.replace("<<TITLE>>", "Title 1"),
+ MalformedJsonString,
+ // This will have bad status.
+ JsonString.replace("<<TITLE>>", "Title 2")
+ )
+
+ // bnewbold: status codes aren't strings, they are uint64
+ val Ok : Long = 200
+ val Bad : Long = 400
+ val StatusCodes = List(Ok, Ok, Ok, Bad, Ok, Bad)
+
+ val SampleDataHead : List[Tuple] = (Sha1Strings, JsonStrings, StatusCodes)
+ .zipped
+ .toList
+ .map { case (sha, json, status) => List(Bytes.toBytes(sha), Bytes.toBytes(json), Bytes.toBytes(status)) }
+ .map { l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*) }
+
+ // scalastyle:off null
+ // Add example of lines without GROBID data
+ val SampleData = SampleDataHead :+ new Tuple(
+ new ImmutableBytesWritable(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAA88888888888")), null, null)
+ // scalastyle:on null
+
+ JobTest("sandcrawler.GrobidScorableDumpJob")
+ .arg("test", "")
+ .arg("app.conf.path", "app.conf")
+ .arg("output", output)
+ .arg("hbase-table", testTable)
+ .arg("zookeeper-hosts", testHost)
+ .arg("debug", "true")
+ .source[Tuple](GrobidScorable.getHBaseSource(testTable, testHost), SampleData)
+ .sink[(String, String)](TypedTsv[(String, String)](output)) {
+ outputBuffer =>
+ "The pipeline" should "return correct-length list" in {
+ outputBuffer should have length 3
+ }
+ }
+ .run
+ .finish
+}
diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala
new file mode 100644
index 0000000..d2cf9de
--- /dev/null
+++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCodeCountTest.scala
@@ -0,0 +1,71 @@
+package sandcrawler
+
+import cascading.tuple.Fields
+import cascading.tuple.Tuple
+import com.twitter.scalding.JobTest
+import com.twitter.scalding.Tsv
+import com.twitter.scalding.TupleConversions
+import com.twitter.scalding.TypedTsv
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.junit.runner.RunWith
+import org.scalatest.FunSpec
+import org.scalatest.junit.JUnitRunner
+import org.slf4j.LoggerFactory
+import parallelai.spyglass.hbase.HBaseConstants.SourceMode
+import parallelai.spyglass.hbase.HBaseSource
+import scala._
+
+@RunWith(classOf[JUnitRunner])
+class HBaseStatusCodeCountTest extends FunSpec with TupleConversions {
+
+ val output = "/tmp/testOutput"
+ val (testTable, testHost) = ("test-table", "dummy-host:2181")
+
+ val log = LoggerFactory.getLogger(this.getClass.getName)
+
+ val statusType1 : Long = 200
+ val statusType2 : Long = 404
+ val statusType1Bytes = Bytes.toBytes(statusType1)
+ val statusType2Bytes = Bytes.toBytes(statusType2)
+
+ // TODO(bnewbold): now to express a null (empty value) in this list?
+ val sampleData : List[List[Array[Byte]]] = List(
+ ("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", statusType1Bytes),
+ ("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", statusType1Bytes),
+ ("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", statusType2Bytes),
+ ("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", statusType2Bytes),
+ ("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ", statusType2Bytes),
+ ("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6", statusType2Bytes),
+ ("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ", statusType1Bytes),
+ ("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT", statusType2Bytes))
+ .map(pair => List(Bytes.toBytes(pair._1), pair._2))
+
+ val statusType1Count = sampleData.count(lst => lst(1) == statusType1Bytes)
+ val statusType2Count = sampleData.count(lst => lst(1) == statusType2Bytes)
+
+ JobTest("sandcrawler.HBaseStatusCodeCountJob")
+ .arg("test", "")
+ .arg("app.conf.path", "app.conf")
+ .arg("output", output)
+ .arg("hbase-table", testTable)
+ .arg("zookeeper-hosts", testHost)
+ .arg("debug", "true")
+ .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status_code"),
+ sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
+ .sink[Tuple](TypedTsv[(Long, Long)](output)) {
+ outputBuffer =>
+ it("should return a correct number of elements.") {
+ assert(outputBuffer.size === 2)
+ }
+
+ // Convert List[Tuple] to Map[Long, Long].
+ val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap
+ it("should have the appropriate number of each status type") {
+ assert(counts(statusType1) == statusType1Count)
+ assert(counts(statusType2) == statusType2Count)
+ }
+ }
+ .run
+ .finish
+}
diff --git a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
index 0da0b9c..7e91af3 100644
--- a/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
+++ b/scalding/src/test/scala/sandcrawler/HBaseStatusCountTest.scala
@@ -24,22 +24,20 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions {
val log = LoggerFactory.getLogger(this.getClass.getName)
- val statusType1 : Long = 200
- val statusType2 : Long = 404
- val statusType1Bytes = Bytes.toBytes(statusType1)
- val statusType2Bytes = Bytes.toBytes(statusType2)
-
- val sampleData : List[List[Array[Byte]]] = List(
- // TODO(bnewbold): now to express a null (empty value) in this list?
- List(Bytes.toBytes("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q"), statusType1Bytes),
- List(Bytes.toBytes("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU"), statusType1Bytes),
- List(Bytes.toBytes("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT"), statusType2Bytes),
- List(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56"), statusType2Bytes),
- List(Bytes.toBytes("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ"), statusType2Bytes),
- List(Bytes.toBytes("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6"), statusType2Bytes),
- List(Bytes.toBytes("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ"), statusType1Bytes),
- List(Bytes.toBytes("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT"), statusType2Bytes)
- )
+ val statusType1Bytes = Bytes.toBytes("""{"status": "success"}""")
+ val statusType2Bytes = Bytes.toBytes("""{"status": "partial"}""")
+
+ // TODO(bnewbold): now to express a null (empty value) in this list?
+ val sampleData : List[List[Array[Byte]]] = List(
+ ("sha1:K2DKSSVTXWPRMFDTWSTCQW3RVWRIOV3Q", statusType1Bytes),
+ ("sha1:C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ3WU", statusType1Bytes),
+ ("sha1:SDKUVHC3YNNEGH5WAG5ZAAXWAEBNX4WT", statusType2Bytes),
+ ("sha1:35985C3YNNEGH5WAG5ZAAXWAEBNXJW56", statusType2Bytes),
+ ("sha1:885C3YNNEGH5WAG5ZAAXWA8BNXJWT6CZ", statusType2Bytes),
+ ("sha1:00904C3YNNEGH5WAG5ZA9XWAEBNXJWT6", statusType2Bytes),
+ ("sha1:249C3YNNEGH5WAG5ZAAXWAEBNXJWT6CZ", statusType1Bytes),
+ ("sha1:095893C3YNNEGH5WAG5ZAAXWAEBNXJWT", statusType2Bytes))
+ .map(pair => List(Bytes.toBytes(pair._1), pair._2))
val statusType1Count = sampleData.count(lst => lst(1) == statusType1Bytes)
val statusType2Count = sampleData.count(lst => lst(1) == statusType2Bytes)
@@ -51,20 +49,13 @@ class HBaseStatusCountTest extends FunSpec with TupleConversions {
.arg("hbase-table", testTable)
.arg("zookeeper-hosts", testHost)
.arg("debug", "true")
- .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status_code"),
+ .source[Tuple](HBaseCountJob.getHBaseSource(testTable, testHost, "grobid0:status"),
sampleData.map(l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*)))
- .sink[Tuple](TypedTsv[(Long, Long)](output)) {
+ .sink[Tuple](TypedTsv[(String, Long)](output)) {
outputBuffer =>
it("should return a 2-element list.") {
assert(outputBuffer.size === 2)
}
-
- // Convert List[Tuple] to Map[Long, Long].
- val counts = outputBuffer.map(t => (t.getLong(0), t.getLong(1))).toMap
- it("should have the appropriate number of each status type") {
- assert(counts(statusType1) == statusType1Count)
- assert(counts(statusType2) == statusType2Count)
- }
}
.run
.finish
diff --git a/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala b/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala
index 80d92aa..d742384 100644
--- a/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala
+++ b/scalding/src/test/scala/sandcrawler/ScorableFeaturesTest.scala
@@ -1,16 +1,21 @@
package sandcrawler
+import java.io.InputStream
+
+import scala.io.Source
+
import org.scalatest._
// scalastyle:off null
class ScorableFeaturesTest extends FlatSpec with Matchers {
+
private def titleToSlug(s : String) : String = {
- new ScorableFeatures(title = s).toSlug
+ ScorableFeatures.create(title = s).toSlug
}
"toMapFeatures()" should "work with gnarly inputs" in {
- new ScorableFeatures(title = null).toMapFeatures
- new ScorableFeatures(title = "something", doi = null, sha1 = null, year = 123).toMapFeatures
+ ScorableFeatures.create(title = null).toMapFeatures
+ ScorableFeatures.create(title = "something", doi = null, sha1 = null, year = 123).toMapFeatures
}
"mapToSlug()" should "extract the parts of titles before a colon" in {
@@ -44,7 +49,7 @@ class ScorableFeaturesTest extends FlatSpec with Matchers {
}
it should "strip special characters" in {
- titleToSlug(":;!',|\"\'`.#?!-@*/\\=+~%$^{}()[]<>-_") shouldBe Scorable.NoSlug
+ titleToSlug(":;!',|\"\'`.#?!-@*/\\=+~%$^{}()[]<>-_’·“”‘’“”«»「」") shouldBe Scorable.NoSlug
// TODO: titleToSlug("©™₨№…") shouldBe Scorable.NoSlug
// TODO: titleToSlug("πµΣσ") shouldBe Scorable.NoSlug
}
diff --git a/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala
index f92ba31..35c31e5 100644
--- a/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala
+++ b/scalding/src/test/scala/sandcrawler/ScoreJobTest.scala
@@ -161,9 +161,12 @@ class ScoreJobTest extends FlatSpec with Matchers {
.map { case (sha, json, status) => List(Bytes.toBytes(sha), Bytes.toBytes(json), Bytes.toBytes(status)) }
.map { l => new Tuple(l.map(s => {new ImmutableBytesWritable(s)}):_*) }
+ // scalastyle:off null
// Add example of lines without GROBID data
+ // scalastyle:off null
val SampleData = SampleDataHead :+ new Tuple(
new ImmutableBytesWritable(Bytes.toBytes("sha1:35985C3YNNEGH5WAG5ZAA88888888888")), null, null)
+ // scalastyle:on null
JobTest("sandcrawler.ScoreJob")
.arg("test", "")
@@ -179,6 +182,8 @@ class ScoreJobTest extends FlatSpec with Matchers {
1 -> CrossrefStrings(1),
2 -> CrossrefStrings(2),
3 -> CrossrefStrings(3)))
+ .sink[(String, ReduceFeatures)](TypedTsv[(String, ReduceFeatures)](output + ".trapped")) {
+ _ => () }
.sink[(String, Int, String, String)](TypedTsv[(String, Int, String, String)](output)) {
// Grobid titles and slugs (in parentheses):
// Title 1 (title1)