Diffstat (limited to 'python/fatcat_tools/importers')
-rw-r--r--  python/fatcat_tools/importers/__init__.py    2
-rw-r--r--  python/fatcat_tools/importers/arabesque.py    3
-rw-r--r--  python/fatcat_tools/importers/common.py      45
-rw-r--r--  python/fatcat_tools/importers/ingest.py      85
-rw-r--r--  python/fatcat_tools/importers/matched.py      3
5 files changed, 118 insertions, 20 deletions
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py
index 025a111c..bb9c5b17 100644
--- a/python/fatcat_tools/importers/__init__.py
+++ b/python/fatcat_tools/importers/__init__.py
@@ -26,4 +26,4 @@ from .orcid import OrcidImporter
from .arabesque import ArabesqueMatchImporter, ARABESQUE_MATCH_WHERE_CLAUSE
from .wayback_static import auto_wayback_static
from .cdl_dash_dat import auto_cdl_dash_dat
-from .ingest import IngestFileResultImporter
+from .ingest import IngestFileResultImporter, SavePaperNowFileImporter
diff --git a/python/fatcat_tools/importers/arabesque.py b/python/fatcat_tools/importers/arabesque.py
index 7017c56c..acfc2b87 100644
--- a/python/fatcat_tools/importers/arabesque.py
+++ b/python/fatcat_tools/importers/arabesque.py
@@ -42,8 +42,7 @@ class ArabesqueMatchImporter(EntityImporter):
def __init__(self, api, extid_type, require_grobid=True, **kwargs):
- eg_desc = kwargs.get('editgroup_description',
- "Match web crawl files to releases based on identifier/URL seedlist")
+ eg_desc = kwargs.get('editgroup_description', None) or "Match web crawl files to releases based on identifier/URL seedlist"
eg_extra = kwargs.get('editgroup_extra', dict())
eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.ArabesqueMatchImporter')
if kwargs.get('crawl_id'):
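Side note on the default-description pattern used in this and the other importers below: `kwargs.get('editgroup_description', None) or "..."` differs from passing the default string as the second argument to .get(), because it also covers callers that pass an explicit None. A minimal illustrative snippet (not part of this change):

    kwargs = {'editgroup_description': None}  # eg, a CLI wrapper forwarding an unset flag

    plain_default = kwargs.get('editgroup_description', "Match web crawl files to releases")
    or_default = kwargs.get('editgroup_description', None) or "Match web crawl files to releases"

    assert plain_default is None  # an explicit None wins over the .get() default
    assert or_default == "Match web crawl files to releases"  # falls through to the fallback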
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 073725ad..d51a5ff4 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -6,6 +6,7 @@ import json
import ftfy
import base64
import sqlite3
+import datetime
import subprocess
import unicodedata
from collections import Counter
@@ -112,7 +113,7 @@ def clean(thing, force_xml=False):
This function is appropriate to be called on any random, non-markup string,
such as author names, titles, etc.
- It will try to clean up commong unicode mangles, HTML characters, etc.
+ It will try to clean up common unicode mangles, HTML characters, etc.
This will detect XML/HTML and "do the right thing" (aka, not remove
entities like '&amp' if there are tags in the string), unless you pass the
@@ -271,6 +272,11 @@ class EntityImporter:
if didn't update or insert because of existing)
self.counts['update'] += 1
if updated an entity
+
+ Parameters:
+
+ submit_mode: instead of accepting editgroups, only submits them.
+ implementors must write insert_batch appropriately
"""
def __init__(self, api, **kwargs):
@@ -282,6 +288,7 @@ class EntityImporter:
self.api = api
self.bezerk_mode = kwargs.get('bezerk_mode', False)
+ self.submit_mode = kwargs.get('submit_mode', False)
self.edit_batch_size = kwargs.get('edit_batch_size', 100)
self.editgroup_description = kwargs.get('editgroup_description')
self.editgroup_extra = eg_extra
@@ -330,8 +337,20 @@ class EntityImporter:
raise NotImplementedError
def finish(self):
+ """
+ Gets called as cleanup at the end of imports, but can also be called at
+ any time to "snip off" current editgroup progress. In other words, it is safe
+ to call this and then continue pushing records.
+
+ For example, a persistent worker could call this if there have been
+ no new entities fed in for more than some time period, to ensure that
+ entities actually get created within a reasonable time frame.
+ """
if self._edit_count > 0:
- self.api.accept_editgroup(self._editgroup_id)
+ if self.submit_mode:
+ self.api.submit_editgroup(self._editgroup_id)
+ else:
+ self.api.accept_editgroup(self._editgroup_id)
self._editgroup_id = None
self._edit_count = 0
self._edits_inflight = []
@@ -345,7 +364,10 @@ class EntityImporter:
def get_editgroup_id(self, edits=1):
if self._edit_count >= self.edit_batch_size:
- self.api.accept_editgroup(self._editgroup_id)
+ if self.submit_mode:
+ self.api.submit_editgroup(self._editgroup_id)
+ else:
+ self.api.accept_editgroup(self._editgroup_id)
self._editgroup_id = None
self._edit_count = 0
self._edits_inflight = []
@@ -715,22 +737,28 @@ class KafkaJsonPusher(RecordPusher):
def run(self):
count = 0
+ last_push = datetime.datetime.now()
while True:
- # TODO: this is batch-oriented, because underlying importer is
+ # Note: this is batch-oriented, because underlying importer is
# often batch-oriented, but this doesn't confirm that entire batch
has been pushed to fatcat before committing offset. Eg, consider
case where there is one update and thousands of creates;
# update would be lingering in importer, and if importer crashed
- # never created. Not great.
+ # never created.
+ # This is partially mitigated for the worker case by flushing any
+ # outstanding editgroups every 5 minutes, but there is still that
+ # window when editgroups might be hanging (unsubmitted).
batch = self.consumer.consume(
num_messages=self.consume_batch_size,
timeout=self.poll_interval)
print("... got {} kafka messages ({}sec poll interval)".format(
len(batch), self.poll_interval))
if not batch:
- # TODO: could have some larger timeout here and
- # self.importer.finish() if it's been more than, eg, a couple
- # minutes
+ if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5):
+ # it has been some time, so flush any current editgroup
+ self.importer.finish()
+ last_push = datetime.datetime.now()
+ #print("Flushed any partial import batch: {}".format(self.importer.counts))
continue
# first check errors on entire batch...
for msg in batch:
@@ -743,6 +771,7 @@ class KafkaJsonPusher(RecordPusher):
count += 1
if count % 500 == 0:
print("Import counts: {}".format(self.importer.counts))
+ last_push = datetime.datetime.now()
for msg in batch:
# locally store offsets of processed messages; will be
# auto-committed by librdkafka from this "stored" value
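Taken together, the common.py changes let a long-running worker flush partial editgroups when the feed goes idle. A standalone sketch of that idle-flush loop, using only names that appear above (the message-handling body is elided):

    import datetime

    FLUSH_WINDOW = datetime.timedelta(minutes=5)

    def consume_loop(consumer, importer, consume_batch_size, poll_interval):
        last_push = datetime.datetime.now()
        while True:
            batch = consumer.consume(num_messages=consume_batch_size,
                                     timeout=poll_interval)
            if not batch:
                # idle: "snip off" any partial editgroup via finish(), which
                # submits (in submit_mode) or accepts it rather than letting it hang
                if datetime.datetime.now() - last_push > FLUSH_WINDOW:
                    importer.finish()
                    last_push = datetime.datetime.now()
                continue
            # ... push each message in `batch` to the importer here ...
            last_push = datetime.datetime.now()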
diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py
index deb4ef51..c47f0aa7 100644
--- a/python/fatcat_tools/importers/ingest.py
+++ b/python/fatcat_tools/importers/ingest.py
@@ -11,8 +11,7 @@ class IngestFileResultImporter(EntityImporter):
def __init__(self, api, require_grobid=True, **kwargs):
- eg_desc = kwargs.pop('editgroup_description',
- "Files crawled from web using sandcrawler ingest tool")
+ eg_desc = kwargs.pop('editgroup_description', None) or "Files crawled from web using sandcrawler ingest tool"
eg_extra = kwargs.pop('editgroup_extra', dict())
eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestFileResultImporter')
super().__init__(api,
@@ -21,7 +20,6 @@ class IngestFileResultImporter(EntityImporter):
**kwargs)
self.default_link_rel = kwargs.get("default_link_rel", "web")
assert self.default_link_rel
- self.default_mimetype = kwargs.get("default_mimetype", None)
self.do_updates = kwargs.get("do_updates", False)
self.require_grobid = require_grobid
if self.require_grobid:
@@ -53,9 +51,14 @@ class IngestFileResultImporter(EntityImporter):
if row.get('hit') != True:
self.counts['skip-hit'] += 1
return False
- if self.ingest_request_source_whitelist and row['request'].get('ingest_request_source') not in self.ingest_request_source_whitelist:
+ source = row['request'].get('ingest_request_source')
+ if self.ingest_request_source_whitelist and source not in self.ingest_request_source_whitelist:
self.counts['skip-ingest_request_source'] += 1
return False
+ if source and source.startswith('savepapernow'):
+ # never process async savepapernow requests
+ self.counts['skip-savepapernow'] += 1
+ return False
if not row.get('file_meta'):
self.counts['skip-file-meta'] += 1
return False
@@ -123,16 +126,21 @@ class IngestFileResultImporter(EntityImporter):
sha1=file_meta['sha1hex'],
sha256=file_meta['sha256hex'],
size=file_meta['size_bytes'],
- mimetype=file_meta['mimetype'] or self.default_mimetype,
+ mimetype=file_meta['mimetype'],
release_ids=[release_ident],
urls=urls,
)
if fatcat and fatcat.get('edit_extra'):
fe.edit_extra = fatcat['edit_extra']
+ else:
+ fe.edit_extra = dict()
if request.get('ingest_request_source'):
- if not fe.edit_extra:
- fe.edit_extra = dict()
fe.edit_extra['ingest_request_source'] = request['ingest_request_source']
+ if request.get('link_source') and request.get('link_source_id'):
+ fe.edit_extra['link_source'] = request['link_source']
+ fe.edit_extra['link_source_id'] = request['link_source_id']
+ if not fe.edit_extra:
+ fe.edit_extra = None
return fe
def try_update(self, fe):
@@ -152,6 +160,12 @@ class IngestFileResultImporter(EntityImporter):
self.counts['exists'] += 1
return False
+ # check for existing edits-in-progress with same file hash
+ for other in self._entity_queue:
+ if other.sha1 == fe.sha1:
+ self.counts['skip-in-queue'] += 1
+ return False
+
if not self.do_updates:
self.counts['skip-update-disabled'] += 1
return False
@@ -167,3 +181,60 @@ class IngestFileResultImporter(EntityImporter):
extra=self.editgroup_extra),
entity_list=batch))
+
+class SavePaperNowFileImporter(IngestFileResultImporter):
+ """
+ This worker ingests from the same feed as IngestFileResultImporter, but
+ only imports files from anonymous save-paper-now requests, and "submits"
+ them for further human review (as opposed to accepting by default).
+ """
+
+ def __init__(self, api, submit_mode=True, **kwargs):
+
+ eg_desc = kwargs.pop('editgroup_description', None) or "Files crawled after a public 'Save Paper Now' request"
+ eg_extra = kwargs.pop('editgroup_extra', dict())
+ eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestFileSavePaperNow')
+ kwargs['submit_mode'] = submit_mode
+ kwargs['require_grobid'] = True
+ kwargs['do_updates'] = False
+ super().__init__(api,
+ editgroup_description=eg_desc,
+ editgroup_extra=eg_extra,
+ **kwargs)
+
+ def want(self, row):
+
+ source = row['request'].get('ingest_request_source')
+ if not source or not source.startswith('savepapernow'):
+ self.counts['skip-not-savepapernow'] += 1
+ return False
+ if row.get('hit') != True:
+ self.counts['skip-hit'] += 1
+ return False
+ if not row.get('file_meta'):
+ self.counts['skip-file-meta'] += 1
+ return False
+ if self.require_grobid and row.get('grobid', {}).get('status_code') != 200:
+ self.counts['skip-grobid'] += 1
+ return False
+
+ return True
+
+ def insert_batch(self, batch):
+ """
+ Usually running in submit_mode, so we can't use auto_batch method
+ """
+ if self.submit_mode:
+ eg = self.api.create_editgroup(fatcat_openapi_client.Editgroup(
+ description=self.editgroup_description,
+ extra=self.editgroup_extra))
+ for fe in batch:
+ self.api.create_file(eg.editgroup_id, fe)
+ self.api.update_editgroup(eg.editgroup_id, eg, submit=True)
+ else:
+ self.api.create_file_auto_batch(fatcat_openapi_client.FileAutoBatch(
+ editgroup=fatcat_openapi_client.Editgroup(
+ description=self.editgroup_description,
+ extra=self.editgroup_extra),
+ entity_list=batch))
+
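A hypothetical wiring sketch for the new importer; `api` is assumed to be a fatcat_openapi_client API handle, and `push_record()` is assumed from the base EntityImporter interface (only finish() and counts appear in this diff):

    import json
    from fatcat_tools.importers import SavePaperNowFileImporter

    def import_spn_results(api, json_lines):
        # submit_mode defaults to True, so editgroups go up for human review
        # instead of being accepted outright
        importer = SavePaperNowFileImporter(api)
        for line in json_lines:
            importer.push_record(json.loads(line))
        # submits any remaining partial editgroup
        importer.finish()
        return importer.counts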
diff --git a/python/fatcat_tools/importers/matched.py b/python/fatcat_tools/importers/matched.py
index dbb78ec9..180d7ba3 100644
--- a/python/fatcat_tools/importers/matched.py
+++ b/python/fatcat_tools/importers/matched.py
@@ -35,8 +35,7 @@ class MatchedImporter(EntityImporter):
def __init__(self, api, **kwargs):
- eg_desc = kwargs.pop('editgroup_description',
- "Import of large-scale file-to-release match results. Source of metadata varies.")
+ eg_desc = kwargs.pop('editgroup_description', None) or "Import of large-scale file-to-release match results. Source of metadata varies."
eg_extra = kwargs.pop('editgroup_extra', dict())
eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.MatchedImporter')
super().__init__(api,