-rw-r--r--   python/sandcrawler/db.py       | 45
-rw-r--r--   python/sandcrawler/persist.py  | 24
2 files changed, 45 insertions(+), 24 deletions(-)
diff --git a/python/sandcrawler/db.py b/python/sandcrawler/db.py
index adf6967..0054e07 100644
--- a/python/sandcrawler/db.py
+++ b/python/sandcrawler/db.py
@@ -45,6 +45,14 @@ class SandcrawlerPostgresClient:
     def commit(self):
         return self.conn.commit()
 
+    def _inserts_and_updates(self, resp, on_conflict):
+        inserts = len([r for r in resp if r == 0])
+        if on_conflict == "update":
+            updates = len([r for r in resp if r != 0])
+        else:
+            updates = 0
+        return (inserts, updates)
+
     def insert_cdx(self, cur, batch, on_conflict="nothing"):
         sql = """
             INSERT INTO
@@ -56,7 +64,7 @@ class SandcrawlerPostgresClient:
             sql += " NOTHING"
         else:
             raise NotImplementedError("on_conflict: {}".format(on_conflict))
-        sql += " RETURNING 1;"
+        sql += " RETURNING xmax;"
 
         batch = [d for d in batch if d.get('warc_offset')]
         if not batch:
@@ -69,8 +77,8 @@ class SandcrawlerPostgresClient:
                   d['warc_csize'],
                   d['warc_offset'])
                  for d in batch]
-        res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
-        return len(res)
+        resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+        return self._inserts_and_updates(resp, on_conflict)
 
     def insert_file_meta(self, cur, batch, on_conflict="nothing"):
         sql = """
@@ -81,17 +89,24 @@ class SandcrawlerPostgresClient:
         """
         if on_conflict.lower() == "nothing":
             sql += " NOTHING"
+        elif on_conflict.lower() == "update":
+            sql += """ UPDATE SET
+                sha256hex=EXCLUDED.sha256hex,
+                md5hex=EXCLUDED.md5hex,
+                size_bytes=EXCLUDED.size_bytes,
+                mimetype=EXCLUDED.mimetype
+            """
         else:
             raise NotImplementedError("on_conflict: {}".format(on_conflict))
-        sql += " RETURNING 1;"
+        sql += " RETURNING xmax;"
 
         batch = [(d['sha1hex'],
                   d['sha256hex'],
                   d['md5hex'],
                   int(d['size_bytes']),
                   d['mimetype'])
                  for d in batch]
-        res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
-        return len(res)
+        resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+        return self._inserts_and_updates(resp, on_conflict)
 
     def insert_grobid(self, cur, batch, on_conflict="nothing"):
         sql = """
@@ -113,7 +128,7 @@ class SandcrawlerPostgresClient:
             """
         else:
             raise NotImplementedError("on_conflict: {}".format(on_conflict))
-        sql += " RETURNING 1;"
+        sql += " RETURNING xmax;"
         for r in batch:
             if r.get('metadata'):
                 r['metadata'] = json.dumps(r['metadata'], sort_keys=True)
@@ -126,8 +141,8 @@ class SandcrawlerPostgresClient:
                   d.get('metadata') or None,
                  )
                  for d in batch]
-        res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
-        return len(res)
+        resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+        return self._inserts_and_updates(resp, on_conflict)
 
     def insert_ingest_request(self, cur, batch, on_conflict="nothing"):
         sql = """
             INSERT INTO
@@ -140,7 +155,7 @@ class SandcrawlerPostgresClient:
             sql += " NOTHING"
         else:
             raise NotImplementedError("on_conflict: {}".format(on_conflict))
-        sql += " RETURNING 1;"
+        sql += " RETURNING xmax;"
         for r in batch:
             extra = dict()
             for k in ('ext_ids', 'fatcat_release', 'edit_extra'):
@@ -157,8 +172,8 @@ class SandcrawlerPostgresClient:
                   d.get('extra') or None,
                  )
                  for d in batch]
-        res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
-        return len(res)
+        resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+        return self._inserts_and_updates(resp, on_conflict)
 
     def insert_ingest_file_result(self, cur, batch, on_conflict="nothing"):
         sql = """
             INSERT INTO
@@ -181,7 +196,7 @@ class SandcrawlerPostgresClient:
             """
         else:
             raise NotImplementedError("on_conflict: {}".format(on_conflict))
-        sql += " RETURNING 1;"
+        sql += " RETURNING xmax;"
         batch = [(d['ingest_type'],
                   d['base_url'],
                   bool(d['hit']),
@@ -192,5 +207,5 @@ class SandcrawlerPostgresClient:
                   d.get('terminal_sha1hex'),
                  )
                  for d in batch]
-        res = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
-        return len(res)
+        resp = psycopg2.extras.execute_values(cur, sql, batch, page_size=250, fetch=True)
+        return self._inserts_and_updates(resp, on_conflict)
diff --git a/python/sandcrawler/persist.py b/python/sandcrawler/persist.py
index 9f8171c..3b9cde9 100644
--- a/python/sandcrawler/persist.py
+++ b/python/sandcrawler/persist.py
@@ -43,7 +43,8 @@ class PersistCdxWorker(SandcrawlerWorker):
     def push_batch(self, batch):
         self.counts['total'] += len(batch)
         resp = self.db.insert_cdx(self.cur, batch)
-        self.counts['insert-cdx'] += resp
+        self.counts['insert-cdx'] += resp[0]
+        self.counts['update-cdx'] += resp[1]
         self.db.commit()
         return []
 
@@ -85,7 +86,7 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
 
         for k in ('ingest_type', 'base_url', 'link_source', 'link_source_id'):
             if not k in raw:
-                self.counts['skip-fields'] += 1
+                self.counts['skip-request-fields'] += 1
                 return None
         if raw['ingest_type'] not in ('pdf', 'xml'):
             print(raw['ingest_type'])
@@ -120,10 +121,10 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
         """
         for k in ('request', 'hit', 'status'):
             if not k in raw:
-                self.counts['skip-fields'] += 1
+                self.counts['skip-result-fields'] += 1
                 return None
         if not 'base_url' in raw['request']:
-            self.counts['skip-fields'] += 1
+            self.counts['skip-result-fields'] += 1
             return None
         ingest_type = raw['request'].get('ingest_type')
         if ingest_type == 'file':
@@ -162,21 +163,26 @@ class PersistIngestFileResultWorker(SandcrawlerWorker):
 
         if requests:
             resp = self.db.insert_ingest_request(self.cur, requests)
-            self.counts['insert-requests'] += resp
+            self.counts['insert-requests'] += resp[0]
+            self.counts['update-requests'] += resp[1]
         if results:
            resp = self.db.insert_ingest_file_result(self.cur, results, on_conflict="update")
-            self.counts['insert-results'] += resp
+            self.counts['insert-results'] += resp[0]
+            self.counts['update-results'] += resp[1]
 
         # these schemas match, so can just pass through
-        # TODO: need to include warc_path etc in ingest-result
+        # TODO: need to include warc_path etc in ingest worker, when possible
         cdx_batch = [r['cdx'] for r in batch if r.get('hit') and r.get('cdx') and r['cdx'].get('warc_path')]
         if cdx_batch:
             resp = self.db.insert_cdx(self.cur, cdx_batch)
-            self.counts['insert-cdx'] += resp
+            self.counts['insert-cdx'] += resp[0]
+            self.counts['update-cdx'] += resp[1]
+
         file_meta_batch = [r['file_meta'] for r in batch if r.get('hit') and r.get('file_meta')]
         if file_meta_batch:
             resp = self.db.insert_file_meta(self.cur, file_meta_batch)
-            self.counts['insert-file_meta'] += resp
+            self.counts['insert-file_meta'] += resp[0]
+            self.counts['update-file_meta'] += resp[1]
 
         self.db.commit()
         return []
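The key mechanism in this commit is the switch from "RETURNING 1" to "RETURNING xmax" on the INSERT ... ON CONFLICT statements: PostgreSQL reports xmax = 0 for a freshly inserted row and a non-zero xmax for a row overwritten by the DO UPDATE branch, which is what lets _inserts_and_updates() split a batch response into separate insert and update counts. Below is a minimal standalone sketch of the same technique, not part of this commit: the kv table, its columns, and the connection string are made up for illustration, and since psycopg2.extras.execute_values(..., fetch=True) returns a list of row tuples whose xid-typed xmax value may arrive as a string, the sketch coerces it with int() before comparing.

# sketch_xmax_counts.py -- illustrative only; assumes a scratch Postgres database
import psycopg2
import psycopg2.extras

def inserts_and_updates(resp, on_conflict):
    # resp is the list of RETURNING rows; xmax == 0 means the row was newly
    # inserted, a non-zero xmax means an existing row was updated in place.
    values = [int(r[0]) for r in resp]
    inserts = len([v for v in values if v == 0])
    updates = len([v for v in values if v != 0]) if on_conflict == "update" else 0
    return (inserts, updates)

conn = psycopg2.connect("dbname=scratch")   # assumed DSN
cur = conn.cursor()
cur.execute("CREATE TABLE IF NOT EXISTS kv (k TEXT PRIMARY KEY, v TEXT)")

sql = """
    INSERT INTO kv (k, v) VALUES %s
    ON CONFLICT (k) DO UPDATE SET v=EXCLUDED.v
    RETURNING xmax;
"""

# First batch: both keys are new, expect (2, 0)
resp = psycopg2.extras.execute_values(cur, sql, [("a", "1"), ("b", "2")], page_size=250, fetch=True)
print(inserts_and_updates(resp, "update"))

# Second batch: "a" already exists and gets updated, "c" is new, expect (1, 1)
resp = psycopg2.extras.execute_values(cur, sql, [("a", "10"), ("c", "3")], page_size=250, fetch=True)
print(inserts_and_updates(resp, "update"))

conn.commit()

With ON CONFLICT ... DO NOTHING (the default on_conflict="nothing" path in this diff), conflicting rows return no row at all, so every returned row counts as an insert and the update count stays at zero, matching the else branch of the helper.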