about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
-rw-r--r-- python/sandcrawler/db.py | 2
-rw-r--r-- python/sandcrawler/persist.py | 27
2 files changed, 17 insertions, 12 deletions
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
index 1a47b0b..a2407b5 100644
--- a/python/sandcrawler/db.py
+++ b/python/sandcrawler/db.py
@@ -156,7 +156,7 @@ class SandcrawlerPostgresClient:
d.get('grobid_version') or None,
d['status_code'],
d['status'],
- d.get('fatcat_release') or None,
+ d.get('fatcat_release') or d.get('metadata', {}).get('fatcat_release') or None,
d.get('updated') or datetime.datetime.now(),
d.get('metadata') or None ,
)
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 7cb4f8d..2463afa 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -43,7 +43,11 @@ class PersistCdxWorker(SandcrawlerWorker):
def push_batch(self, batch):
self.counts['total'] += len(batch)
- resp = self.db.insert_cdx(self.cur, batch)
+ # filter to full CDX lines, no liveweb
+ cdx_batch = [r for r in batch if r.get('warc_path') and ("/" in r['warc_path'])]
+ resp = self.db.insert_cdx(self.cur, cdx_batch)
+ if len(cdx_batch) < len(batch):
+ self.counts['skip'] += len(batch) - len(cdx_batch)
self.counts['insert-cdx'] += resp[0]
self.counts['update-cdx'] += resp[1]
self.db.commit()
@@ -144,14 +148,12 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
}
terminal = raw.get('terminal')
if terminal:
- result['terminal_url'] = terminal['url']
- if terminal.get('status_code') == None and terminal.get('http_status'):
- terminal['status_code'] = terminal['http_status']
- result['terminal_status_code'] = int(terminal['status_code'])
- if raw.get('file_meta'):
- result['terminal_sha1hex'] = raw['file_meta']['sha1hex']
- if raw.get('cdx') and raw['cdx']['url'] == terminal['url']:
- result['terminal_dt'] = raw['cdx']['datetime']
+ result['terminal_url'] = terminal.get('terminal_url') or terminal.get('url')
+ result['terminal_dt'] = terminal.get('terminal_dt')
+ result['terminal_status_code'] = terminal.get('terminal_status_code') or terminal.get('status_code') or terminal.get('http_code')
+ if result['terminal_status_code']:
+ result['terminal_status_code'] = int(result['terminal_status_code'])
+ result['terminal_sha1hex'] = terminal.get('terminal_sha1hex')
return result
def push_batch(self, batch):
@@ -176,8 +178,11 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
self.counts['update-results'] += resp[1]
# these schemas match, so can just pass through
- # TODO: need to include warc_path etc in ingest worker, when possible
- cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx') and r['cdx'].get('warc_path')]
+ cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx')]
+ revisit_cdx_batch = [r['revisit_cdx'] for r in batch if r.get('hit') and r.get('revisit_cdx')]
+ cdx_batch.extend(revisit_cdx_batch)
+ # filter to full CDX lines, with full warc_paths (not liveweb)
+ cdx_batch = [r for r in cdx_batch if r.get('warc_path') and ("/" in r['warc_path'])]
if cdx_batch:
resp = self.db.insert_cdx(self.cur, cdx_batch)
self.counts['insert-cdx'] += resp[0]