summaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rwxr-xr-xpython/fatcat_import.py33
-rw-r--r--python/fatcat_tools/importers/__init__.py2
-rw-r--r--python/fatcat_tools/importers/common.py21
-rw-r--r--python/fatcat_tools/importers/ingest.py122
4 files changed, 139 insertions, 39 deletions
diff --git a/python/fatcat_import.py b/python/fatcat_import.py
index 1dcfec21..b82e81c7 100755
--- a/python/fatcat_import.py
+++ b/python/fatcat_import.py
@@ -159,6 +159,7 @@ def run_ingest_web(args):
"fatcat-{}-ingest-web-result".format(args.kafka_env),
kafka_namespace="sandcrawler",
consume_batch_size=args.batch_size,
+ force_flush=True,
).run()
else:
JsonLinePusher(iwri, args.json_file).run()
@@ -176,6 +177,25 @@ def run_savepapernow_file(args):
"fatcat-{}-savepapernow-file-result".format(args.kafka_env),
kafka_namespace="sandcrawler",
consume_batch_size=args.batch_size,
+ force_flush=True,
+ ).run()
+ else:
+ JsonLinePusher(ifri, args.json_file).run()
+
+def run_savepapernow_web(args):
+ ifri = SavePaperNowWebImporter(args.api,
+ editgroup_description=args.editgroup_description_override,
+ edit_batch_size=args.batch_size)
+ if args.kafka_mode:
+ KafkaJsonPusher(
+ ifri,
+ args.kafka_hosts,
+ args.kafka_env,
+ "ingest-file-results",
+ "fatcat-{}-savepapernow-web-result".format(args.kafka_env),
+ kafka_namespace="sandcrawler",
+ consume_batch_size=args.batch_size,
+ force_flush=True,
).run()
else:
JsonLinePusher(ifri, args.json_file).run()
@@ -554,6 +574,19 @@ def main():
action='store_true',
help="consume from kafka topic (not stdin)")
+ sub_savepapernow_web = subparsers.add_parser('savepapernow-web-results',
+ help="add webcapture entities crawled due to async Save Paper Now request")
+ sub_savepapernow_web.set_defaults(
+ func=run_savepapernow_web,
+ auth_var="FATCAT_AUTH_WORKER_SAVEPAPERNOW",
+ )
+ sub_savepapernow_web.add_argument('json_file',
+ help="ingest-file JSON file to import from",
+ default=sys.stdin, type=argparse.FileType('r'))
+ sub_savepapernow_web.add_argument('--kafka-mode',
+ action='store_true',
+ help="consume from kafka topic (not stdin)")
+
sub_grobid_metadata = subparsers.add_parser('grobid-metadata',
help="create release and file entities based on GROBID PDF metadata extraction")
sub_grobid_metadata.set_defaults(
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py
index 6a2edeac..9cb18506 100644
--- a/python/fatcat_tools/importers/__init__.py
+++ b/python/fatcat_tools/importers/__init__.py
@@ -27,7 +27,7 @@ from .orcid import OrcidImporter
from .arabesque import ArabesqueMatchImporter, ARABESQUE_MATCH_WHERE_CLAUSE
from .wayback_static import auto_wayback_static
from .cdl_dash_dat import auto_cdl_dash_dat
-from .ingest import IngestFileResultImporter, SavePaperNowFileImporter, IngestWebResultImporter
+from .ingest import IngestFileResultImporter, SavePaperNowFileImporter, IngestWebResultImporter, SavePaperNowWebImporter
from .shadow import ShadowLibraryImporter
from .file_meta import FileMetaImporter
from .doaj_article import DoajArticleImporter
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 6815a155..e936477c 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -732,8 +732,8 @@ class KafkaBs4XmlPusher(RecordPusher):
batch = self.consumer.consume(
num_messages=self.consume_batch_size,
timeout=self.poll_interval)
- print("... got {} kafka messages ({}sec poll interval)".format(
- len(batch), self.poll_interval))
+ print("... got {} kafka messages ({}sec poll interval) {}".format(
+ len(batch), self.poll_interval, self.importer.counts))
if not batch:
if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5):
# it has been some time, so flush any current editgroup
@@ -779,10 +779,12 @@ class KafkaJsonPusher(RecordPusher):
)
self.poll_interval = kwargs.get('poll_interval', 5.0)
self.consume_batch_size = kwargs.get('consume_batch_size', 100)
+ self.force_flush = kwargs.get('force_flush', False)
def run(self):
count = 0
last_push = datetime.datetime.now()
+ last_force_flush = datetime.datetime.now()
while True:
# Note: this is batch-oriented, because underlying importer is
# often batch-oriented, but this doesn't confirm that entire batch
@@ -796,13 +798,24 @@ class KafkaJsonPusher(RecordPusher):
batch = self.consumer.consume(
num_messages=self.consume_batch_size,
timeout=self.poll_interval)
- print("... got {} kafka messages ({}sec poll interval)".format(
- len(batch), self.poll_interval))
+ print("... got {} kafka messages ({}sec poll interval) {}".format(
+ len(batch), self.poll_interval, self.importer.counts))
+ if self.force_flush:
+ # this flushing happens even if there have been 'push' events
+ # more recently. it is intended for, eg, importers off the
+ # ingest file stream *other than* the usual file importer. the
+ # web/HTML and savepapernow importers get frequent messages,
+ # but rare 'want() == True', so need an extra flush
+ if datetime.datetime.now() - last_force_flush > datetime.timedelta(minutes=5):
+ self.importer.finish()
+ last_push = datetime.datetime.now()
+ last_force_flush = datetime.datetime.now()
if not batch:
if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5):
# it has been some time, so flush any current editgroup
self.importer.finish()
last_push = datetime.datetime.now()
+ last_force_flush = datetime.datetime.now()
#print("Flushed any partial import batch: {}".format(self.importer.counts))
continue
# first check errors on entire batch...
diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py
index ae3e147a..bc759219 100644
--- a/python/fatcat_tools/importers/ingest.py
+++ b/python/fatcat_tools/importers/ingest.py
@@ -222,6 +222,12 @@ class IngestFileResultImporter(EntityImporter):
if edit_extra['link_source'] == 'doi':
edit_extra['link_source_id'] = edit_extra['link_source_id'].lower()
+ # GROBID metadata, for SPN requests (when there might not be 'success')
+ if request.get('ingest_type') == 'pdf':
+ if row.get('grobid') and row['grobid'].get('status') != 'success':
+ edit_extra['grobid_status_code'] = row['grobid']['status_code']
+ edit_extra['grobid_version'] = row['grobid'].get('grobid_version')
+
return edit_extra
def parse_record(self, row):
@@ -304,11 +310,19 @@ class IngestFileResultImporter(EntityImporter):
return False
def insert_batch(self, batch):
- self.api.create_file_auto_batch(fatcat_openapi_client.FileAutoBatch(
- editgroup=fatcat_openapi_client.Editgroup(
+ if self.submit_mode:
+ eg = self.api.create_editgroup(fatcat_openapi_client.Editgroup(
description=self.editgroup_description,
- extra=self.editgroup_extra),
- entity_list=batch))
+ extra=self.editgroup_extra))
+ for fe in batch:
+ self.api.create_file(eg.editgroup_id, fe)
+ self.api.update_editgroup(eg.editgroup_id, eg, submit=True)
+ else:
+ self.api.create_file_auto_batch(fatcat_openapi_client.FileAutoBatch(
+ editgroup=fatcat_openapi_client.Editgroup(
+ description=self.editgroup_description,
+ extra=self.editgroup_extra),
+ entity_list=batch))
class SavePaperNowFileImporter(IngestFileResultImporter):
@@ -324,7 +338,7 @@ class SavePaperNowFileImporter(IngestFileResultImporter):
eg_extra = kwargs.pop('editgroup_extra', dict())
eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestFileSavePaperNow')
kwargs['submit_mode'] = submit_mode
- kwargs['require_grobid'] = True
+ kwargs['require_grobid'] = False
kwargs['do_updates'] = False
super().__init__(api,
editgroup_description=eg_desc,
@@ -333,9 +347,6 @@ class SavePaperNowFileImporter(IngestFileResultImporter):
def want(self, row):
- if not self.want_file(row):
- return False
-
source = row['request'].get('ingest_request_source')
if not source:
self.counts['skip-ingest_request_source'] += 1
@@ -343,29 +354,15 @@ class SavePaperNowFileImporter(IngestFileResultImporter):
if not source.startswith('savepapernow'):
self.counts['skip-not-savepapernow'] += 1
return False
+
if row.get('hit') != True:
self.counts['skip-hit'] += 1
return False
- return True
+ if not self.want_file(row):
+ return False
- def insert_batch(self, batch):
- """
- Usually running in submit_mode, so we can't use auto_batch method
- """
- if self.submit_mode:
- eg = self.api.create_editgroup(fatcat_openapi_client.Editgroup(
- description=self.editgroup_description,
- extra=self.editgroup_extra))
- for fe in batch:
- self.api.create_file(eg.editgroup_id, fe)
- self.api.update_editgroup(eg.editgroup_id, eg, submit=True)
- else:
- self.api.create_file_auto_batch(fatcat_openapi_client.FileAutoBatch(
- editgroup=fatcat_openapi_client.Editgroup(
- description=self.editgroup_description,
- extra=self.editgroup_extra),
- entity_list=batch))
+ return True
class IngestWebResultImporter(IngestFileResultImporter):
@@ -390,14 +387,13 @@ class IngestWebResultImporter(IngestFileResultImporter):
if not self.want_ingest(row):
return False
- if not row.get('file_meta'):
- self.counts['skip-file-meta'] += 1
- return False
-
# webcapture-specific filters
if row['request'].get('ingest_type') != 'html':
self.counts['skip-ingest-type'] += 1
return False
+ if not row.get('file_meta'):
+ self.counts['skip-file-meta'] += 1
+ return False
if row['file_meta'].get('mimetype') not in ("text/html", "application/xhtml+xml"):
self.counts['skip-mimetype'] += 1
return False
@@ -514,8 +510,66 @@ class IngestWebResultImporter(IngestFileResultImporter):
return True
def insert_batch(self, batch):
- self.api.create_webcapture_auto_batch(fatcat_openapi_client.WebcaptureAutoBatch(
- editgroup=fatcat_openapi_client.Editgroup(
+ if self.submit_mode:
+ eg = self.api.create_editgroup(fatcat_openapi_client.Editgroup(
description=self.editgroup_description,
- extra=self.editgroup_extra),
- entity_list=batch))
+ extra=self.editgroup_extra))
+ for fe in batch:
+ self.api.create_webcapture(eg.editgroup_id, fe)
+ self.api.update_editgroup(eg.editgroup_id, eg, submit=True)
+ else:
+ self.api.create_webcapture_auto_batch(fatcat_openapi_client.WebcaptureAutoBatch(
+ editgroup=fatcat_openapi_client.Editgroup(
+ description=self.editgroup_description,
+ extra=self.editgroup_extra),
+ entity_list=batch))
+
+class SavePaperNowWebImporter(IngestWebResultImporter):
+ """
+ Like SavePaperNowFileImporter, but for webcapture (HTML) ingest.
+ """
+
+ def __init__(self, api, submit_mode=True, **kwargs):
+
+ eg_desc = kwargs.pop('editgroup_description', None) or "Webcaptures crawled after a public 'Save Paper Now' request"
+ eg_extra = kwargs.pop('editgroup_extra', dict())
+ eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestWebSavePaperNow')
+ kwargs['submit_mode'] = submit_mode
+ kwargs['do_updates'] = False
+ super().__init__(api,
+ editgroup_description=eg_desc,
+ editgroup_extra=eg_extra,
+ **kwargs)
+
+ def want(self, row):
+ """
+ Relatively custom want() here, a synthesis of other filters.
+
+ We do currently allow unknown-scope through for this specific code
+ path, which means allowing hit=false.
+ """
+
+ source = row['request'].get('ingest_request_source')
+ if not source:
+ self.counts['skip-ingest_request_source'] += 1
+ return False
+ if not source.startswith('savepapernow'):
+ self.counts['skip-not-savepapernow'] += 1
+ return False
+
+ # webcapture-specific filters
+ if row['request'].get('ingest_type') != 'html':
+ self.counts['skip-ingest-type'] += 1
+ return False
+ if not row.get('file_meta'):
+ self.counts['skip-file-meta'] += 1
+ return False
+ if row['file_meta'].get('mimetype') not in ("text/html", "application/xhtml+xml"):
+ self.counts['skip-mimetype'] += 1
+ return False
+
+ if row.get('status') not in ['success', 'unknown-scope']:
+ self.counts['skip-hit'] += 1
+ return False
+
+ return True