From d70abdd82955feba4eecdda24ff6d95f703e0598 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Fri, 2 Nov 2018 13:59:24 -0700 Subject: FatcatRelease: start wrapping entities with extra methods --- python/fatcat/__init__.py | 1 + python/fatcat/crossref_importer.py | 5 ++- python/fatcat/release_model.py | 85 ++++++++++++++++++++++++++++++++++++++ python/tests/release_model.py | 15 +++++++ 4 files changed, 104 insertions(+), 2 deletions(-) create mode 100644 python/fatcat/release_model.py create mode 100644 python/tests/release_model.py (limited to 'python') diff --git a/python/fatcat/__init__.py b/python/fatcat/__init__.py index aa12f972..b0492684 100644 --- a/python/fatcat/__init__.py +++ b/python/fatcat/__init__.py @@ -4,6 +4,7 @@ from flask_uuid import FlaskUUID from flask_debugtoolbar import DebugToolbarExtension from config import Config import fatcat_client +from fatcat.release_model import FatcatRelease toolbar = DebugToolbarExtension() app = Flask(__name__) diff --git a/python/fatcat/crossref_importer.py b/python/fatcat/crossref_importer.py index 37005965..fbf666a3 100644 --- a/python/fatcat/crossref_importer.py +++ b/python/fatcat/crossref_importer.py @@ -6,6 +6,7 @@ import datetime import itertools import fatcat_client from fatcat.importer_common import FatcatImporter +from fatcat import FatcatRelease class FatcatCrossrefImporter(FatcatImporter): @@ -38,7 +39,7 @@ class FatcatCrossrefImporter(FatcatImporter): def parse_crossref_dict(self, obj): """ obj is a python dict (parsed from json). - returns a ReleaseEntity + returns a FatcatRelease """ # This work is out of scope if it doesn't have authors and a title @@ -212,7 +213,7 @@ class FatcatCrossrefImporter(FatcatImporter): if release_date: release_date = release_date.isoformat() + "Z" - re = fatcat_client.ReleaseEntity( + re = FatcatRelease( work_id=None, title=obj['title'][0], contribs=contribs, diff --git a/python/fatcat/release_model.py b/python/fatcat/release_model.py new file mode 100644 index 00000000..a584c00b --- /dev/null +++ b/python/fatcat/release_model.py @@ -0,0 +1,85 @@ + +from fatcat_client.models import ReleaseEntity + +class FatcatRelease(ReleaseEntity): + """ + This is a wrapper class that extends the code-generated `ReleaseEntity` + class with extra methods. + """ + + def to_elastic_dict(self): + """ + Converts from an entity model/schema to elasticsearch oriented schema. + + Returns: dict + """ + + if self.state != 'active': + raise ValueError("Entity is not 'active'") + + # First, the easy ones (direct copy) + t = dict( + ident = self.ident, + revision = self.revision, + title = self.title, + release_date = self.release_date, + release_type = self.release_type, + release_status = self.release_status, + language = self.language, + doi = self.doi, + pmid = self.pmid, + pmcid = self.pmcid, + isbn13 = self.isbn13, + core_id = self.core_id, + wikidata_qid = self.wikidata_qid + ) + + container = self.container + container_is_kept = False + if container: + t['publisher'] = container.publisher + t['container_name'] = container.name + t['container_issnl'] = container.issnl + container_extra = container.extra + if container_extra: + t['container_is_oa'] = container_extra.get('is_oa') + container_is_kept = container_extra.get('is_kept', False) + t['container_is_longtail_oa'] = container_extra.get('is_longtail_oa') + else: + t['publisher'] = self.publisher + + files = self.files or [] + t['file_count'] = len(files) + in_wa = False + in_ia = False + t['file_pdf_url'] = None + for f in files: + is_pdf = 'pdf' in f.get('mimetype', '') + for url in f.get('urls', []): + if url.get('rel', '') == 'webarchive': + in_wa = True + if '//web.archive.org/' in url['url'] or '//archive.org/' in url['url']: + in_ia = True + if is_pdf: + t['file_pdf_url'] = url['url'] + if not t['file_pdf_url'] and is_pdf: + t['file_pdf_url'] = url['url'] + t['file_in_webarchive'] = in_wa + t['file_in_ia'] = in_ia + + extra = self.extra or dict() + if extra: + t['in_shadow'] = extra.get('in_shadow') + if extra.get('grobid') and extra['grobid'].get('is_longtail_oa'): + t['container_is_longtail_oa'] = True + t['any_abstract'] = bool(self.abstracts) + t['is_kept'] = container_is_kept or extra.get('is_kept', False) + + t['ref_count'] = len(self.refs or []) + t['contrib_count'] = len(self.contribs or []) + contrib_names = [] + for c in (self.contribs or []): + if c.raw_name: + contrib_names.append(c.raw_name) + t['contrib_names'] = contrib_names + return t diff --git a/python/tests/release_model.py b/python/tests/release_model.py new file mode 100644 index 00000000..4b9dddba --- /dev/null +++ b/python/tests/release_model.py @@ -0,0 +1,15 @@ + +import json +import pytest +from fatcat.crossref_importer import FatcatCrossrefImporter +from fatcat.release_model import FatcatRelease + +from crossref import crossref_importer + +def test_elastic_convert(crossref_importer): + with open('tests/files/crossref-works.single.json', 'r') as f: + # not a single line + raw = json.loads(f.read()) + (r, c) = crossref_importer.parse_crossref_dict(raw) + r.state = 'active' + r.to_elastic_dict() -- cgit v1.2.3 From f79491a4989902d376a1a26fe0f070345e6c9d62 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Sun, 4 Nov 2018 17:25:05 -0800 Subject: pipfile: pykafka --- python/Pipfile | 2 + python/Pipfile.lock | 384 ++++++++++++++++++++++++++++++---------------------- 2 files changed, 226 insertions(+), 160 deletions(-) (limited to 'python') diff --git a/python/Pipfile b/python/Pipfile index 86091c56..be62c111 100644 --- a/python/Pipfile +++ b/python/Pipfile @@ -24,6 +24,8 @@ flask-marshmallow = "*" flask-uuid = "*" "psycopg2" = "*" flask-debugtoolbar = "*" +pykafka = "*" +python-dateutil = "*" [requires] python_version = "3.5" diff --git a/python/Pipfile.lock b/python/Pipfile.lock index 83b7ebca..da96a24f 100644 --- a/python/Pipfile.lock +++ b/python/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "ab1ff8d09f4d9f779b1662e16feacadb82f179a76466efca078997b02cb64724" + "sha256": "c99945057fc87c7a825a76cfe4d1abdcb99d0e70dc71db770cf259b761c6835c" }, "pipfile-spec": 6, "requires": { @@ -24,10 +24,10 @@ }, "certifi": { "hashes": [ - "sha256:13e698f54293db9f89122b0581843a782ad0934a4fe0172d2a980ba77fc61bb7", - "sha256:9fa520c1bacfb634fa7af20a76bcbd3d5fb390481724c597da32c719a7dca4b0" + "sha256:339dc09518b07e2fa7eda5450740925974815557727d6bd35d319c1524a04a4c", + "sha256:6d58c986d22b038c8c0df30d639f23a3e6d172a05c3583e766f4c0b785c0986a" ], - "version": "==2018.4.16" + "version": "==2018.10.15" }, "chardet": { "hashes": [ @@ -38,10 +38,10 @@ }, "click": { "hashes": [ - "sha256:29f99fc6125fbc931b758dc053b3114e55c77a6e4c6c3a2674a2dc986016381d", - "sha256:f15516df478d5a56180fbf80e68f206010e6d160fc39fa508b65e035fd75130b" + "sha256:2335065e6395b9e67ca716de5f7526736bfa6ceead690adf616d925bdc622b13", + "sha256:5b94b49521f6456670fdb30cd82a4eca9412788a93fa6dd6df72c94d5a8ff2d7" ], - "version": "==6.7" + "version": "==7.0" }, "flask": { "hashes": [ @@ -84,16 +84,17 @@ }, "idna": { "hashes": [ - "sha256:2c6a5de3089009e3da7c5dde64a141dbc8551d5b7f6cf4ed7c2568d0cc520a8f", - "sha256:8c7309c718f94b3a625cb648ace320157ad16ff131ae0af362c9f21b80ef6ec4" + "sha256:156a6814fb5ac1fc6850fb002e0852d56c0c8d2531923a51032d1b70760e186e", + "sha256:684a38a6f903c1d71d6d5fac066b58d7768af4de2b832e426ec79c30daa94a16" ], - "version": "==2.6" + "version": "==2.7" }, "itsdangerous": { "hashes": [ - "sha256:cbb3fcf8d3e33df861709ecaf89d9e6629cff0a217bc2848f1b41cd30d360519" + "sha256:321b033d07f2a4136d3ec762eac9f16a10ccd60f53c0c91af90217ace7ba1f19", + "sha256:b12271b2047cb23eeb98c8b5622e2e5c5e9abd9784a153e9d8ef9cb4dd09d749" ], - "version": "==0.24" + "version": "==1.1.0" }, "jinja2": { "hashes": [ @@ -102,6 +103,13 @@ ], "version": "==2.10" }, + "kazoo": { + "hashes": [ + "sha256:8db774f7bdece7d0dc7decb21539ff0852e42c2ffe1c28d7f1ff6f9292a1c3a4", + "sha256:a5fa2e400c5068cfee9e86b35cf0dab8232b574152d8e3590d823b3e2426ab5e" + ], + "version": "==2.5.0" + }, "markupsafe": { "hashes": [ "sha256:a6be69091dac236ea9c6bc7d012beab42010fa914c459791d627dad4910eb665" @@ -110,51 +118,70 @@ }, "marshmallow": { "hashes": [ - "sha256:171f409d48b44786b7df2793cbd7f1a9062f0fe2c14d547da536b5010f671ade", - "sha256:c231784b5a5d2b26e50c90f3038004a3552ec27658cde6e0a5a7279d0c5a8e26" + "sha256:a2052f62b18f6dad520f465e437f63ab8812423975d48b9ebd30a735466e782a", + "sha256:e1b79eb3b815b49918c64114dda691b8767b48a1f66dd1d8c0cd5842b74257c2" ], - "version": "==2.15.3" + "version": "==2.16.3" }, "marshmallow-sqlalchemy": { "hashes": [ - "sha256:32ff19350a8892b3e8dc954eeeac796576bb89356512f9e1ccd33da63f856930", - "sha256:755b16b64c0273fd8083dbe1e938dff0d49359c76d6238898c9082e3fc55368d" + "sha256:a42cdbd6b623059fca601e1b572cab28f00d4acf36e2cef38094c88424b3dcf1", + "sha256:aacb0a7e0f6b5d489cdb3c10d1ab420f74c21538838026337738e4c6e8848fd8" ], "index": "pypi", - "version": "==0.14.0" + "version": "==0.14.1" }, "psycopg2": { "hashes": [ - "sha256:027ae518d0e3b8fff41990e598bc7774c3d08a3a20e9ecc0b59fb2aaaf152f7f", - "sha256:092a80da1b052a181b6e6c765849c9b32d46c5dac3b81bf8c9b83e697f3cdbe8", - "sha256:0b9851e798bae024ed1a2a6377a8dab4b8a128a56ed406f572f9f06194e4b275", - "sha256:179c52eb870110a8c1b460c86d4f696d58510ea025602cd3f81453746fccb94f", - "sha256:19983b77ec1fc2a210092aa0333ee48811fd9fb5f194c6cd5b927ed409aea5f8", - "sha256:1d90379d01d0dc50ae9b40c863933d87ff82d51dd7d52cea5d1cb7019afd72cd", - "sha256:27467fd5af1dcc0a82d72927113b8f92da8f44b2efbdb8906bd76face95b596d", - "sha256:32702e3bd8bfe12b36226ba9846ed9e22336fc4bd710039d594b36bd432ae255", - "sha256:33f9e1032095e1436fa9ec424abcbd4c170da934fb70e391c5d78275d0307c75", - "sha256:36030ca7f4b4519ee4f52a74edc4ec73c75abfb6ea1d80ac7480953d1c0aa3c3", - "sha256:363fbbf4189722fc46779be1fad2597e2c40b3f577dc618f353a46391cf5d235", - "sha256:6f302c486132f8dd11f143e919e236ea4467d53bf18c451cac577e6988ecbd05", - "sha256:733166464598c239323142c071fa4c9b91c14359176e5ae7e202db6bcc1d2eb5", - "sha256:7cbc3b21ce2f681ca9ad2d8c0901090b23a30c955e980ebf1006d41f37068a95", - "sha256:888bba7841116e529f407f15c6d28fe3ef0760df8c45257442ec2f14f161c871", - "sha256:8966829cb0d21a08a3c5ac971a2eb67c3927ae27c247300a8476554cc0ce2ae8", - "sha256:8bf51191d60f6987482ef0cfe8511bbf4877a5aa7f313d7b488b53189cf26209", - "sha256:8eb94c0625c529215b53c08fb4e461546e2f3fc96a49c13d5474b5ad7aeab6cf", - "sha256:8ebba5314c609a05c6955e5773c7e0e57b8dd817e4f751f30de729be58fa5e78", - "sha256:932a4c101af007cb3132b1f8a9ffef23386acc53dad46536dc5ba43a3235ae02", - "sha256:ad75fe10bea19ad2188c5cb5fc4cdf53ee808d9b44578c94a3cd1e9fc2beb656", - "sha256:aeaba399254ca79c299d9fe6aa811d3c3eac61458dee10270de7f4e71c624998", - "sha256:b178e0923c93393e16646155794521e063ec17b7cc9f943f15b7d4b39776ea2c", - "sha256:b68e89bb086a9476fa85298caab43f92d0a6af135a5f433d1f6b6d82cafa7b55", - "sha256:d74cf9234ba76426add5e123449be08993a9b13ff434c6efa3a07caa305a619f", - "sha256:f3d3a88128f0c219bdc5b2d9ccd496517199660cea021c560a3252116df91cbd", - "sha256:fe6a7f87356116f5ea840c65b032af17deef0e1a5c34013a2962dd6f99b860dd" + "sha256:0b9e48a1c1505699a64ac58815ca99104aacace8321e455072cee4f7fe7b2698", + "sha256:0f4c784e1b5a320efb434c66a50b8dd7e30a7dc047e8f45c0a8d2694bfe72781", + "sha256:0fdbaa32c9eb09ef09d425dc154628fca6fa69d2f7c1a33f889abb7e0efb3909", + "sha256:11fbf688d5c953c0a5ba625cc42dea9aeb2321942c7c5ed9341a68f865dc8cb1", + "sha256:19eaac4eb25ab078bd0f28304a0cb08702d120caadfe76bb1e6846ed1f68635e", + "sha256:3232ec1a3bf4dba97fbf9b03ce12e4b6c1d01ea3c85773903a67ced725728232", + "sha256:36f8f9c216fcca048006f6dd60e4d3e6f406afde26cfb99e063f137070139eaf", + "sha256:59c1a0e4f9abe970062ed35d0720935197800a7ef7a62b3a9e3a70588d9ca40b", + "sha256:6506c5ff88750948c28d41852c09c5d2a49f51f28c6d90cbf1b6808e18c64e88", + "sha256:6bc3e68ee16f571681b8c0b6d5c0a77bef3c589012352b3f0cf5520e674e9d01", + "sha256:6dbbd7aabbc861eec6b910522534894d9dbb507d5819bc982032c3ea2e974f51", + "sha256:6e737915de826650d1a5f7ff4ac6cf888a26f021a647390ca7bafdba0e85462b", + "sha256:6ed9b2cfe85abc720e8943c1808eeffd41daa73e18b7c1e1a228b0b91f768ccc", + "sha256:711ec617ba453fdfc66616db2520db3a6d9a891e3bf62ef9aba4c95bb4e61230", + "sha256:844dacdf7530c5c612718cf12bc001f59b2d9329d35b495f1ff25045161aa6af", + "sha256:86b52e146da13c896e50c5a3341a9448151f1092b1a4153e425d1e8b62fec508", + "sha256:985c06c2a0f227131733ae58d6a541a5bc8b665e7305494782bebdb74202b793", + "sha256:a86dfe45f4f9c55b1a2312ff20a59b30da8d39c0e8821d00018372a2a177098f", + "sha256:aa3cd07f7f7e3183b63d48300666f920828a9dbd7d7ec53d450df2c4953687a9", + "sha256:b1964ed645ef8317806d615d9ff006c0dadc09dfc54b99ae67f9ba7a1ec9d5d2", + "sha256:b2abbff9e4141484bb89b96eb8eae186d77bc6d5ffbec6b01783ee5c3c467351", + "sha256:cc33c3a90492e21713260095f02b12bee02b8d1f2c03a221d763ce04fa90e2e9", + "sha256:d7de3bf0986d777807611c36e809b77a13bf1888f5c8db0ebf24b47a52d10726", + "sha256:db5e3c52576cc5b93a959a03ccc3b02cb8f0af1fbbdc80645f7a215f0b864f3a", + "sha256:e168aa795ffbb11379c942cf95bf813c7db9aa55538eb61de8c6815e092416f5", + "sha256:e9ca911f8e2d3117e5241d5fa9aaa991cb22fb0792627eeada47425d706b5ec8", + "sha256:eccf962d41ca46e6326b97c8fe0a6687b58dfc1a5f6540ed071ff1474cea749e", + "sha256:efa19deae6b9e504a74347fe5e25c2cb9343766c489c2ae921b05f37338b18d1", + "sha256:f4b0460a21f784abe17b496f66e74157a6c36116fa86da8bf6aa028b9e8ad5fe", + "sha256:f93d508ca64d924d478fb11e272e09524698f0c581d9032e68958cfbdd41faef" + ], + "index": "pypi", + "version": "==2.7.5" + }, + "pykafka": { + "hashes": [ + "sha256:6b075909a52cb0c95325bc16ab797bbcdbb37386652ea460705ed4472ce91459", + "sha256:f0bbd394ae6970042a587c99fe4dc0966e67787249d963d4ce2f810dc9490577" ], "index": "pypi", - "version": "==2.7.4" + "version": "==2.8.0" + }, + "python-dateutil": { + "hashes": [ + "sha256:063df5763652e21de43de7d9e00ccf239f953a832941e37be541614732cdfc93", + "sha256:88f9287c0174266bb0d8cedd395cfba9c58e87e5ad86b2ce58859bc11be3cf02" + ], + "index": "pypi", + "version": "==2.7.5" }, "raven": { "hashes": [ @@ -166,11 +193,11 @@ }, "requests": { "hashes": [ - "sha256:6a1b267aa90cac58ac3a765d067950e7dbbf75b1da07e895d1f594193a40a38b", - "sha256:9c443e7324ba5b85070c4a818ade28bfabedf16ea10206da1132edaa6dda237e" + "sha256:99dcfdaaeb17caf6e526f32b6a7b780461512ab3f1d992187801694cba42770c", + "sha256:a84b8c9ab6239b578f22d1c21d51b696dcfe004032bb80ea832398d6909d7279" ], "index": "pypi", - "version": "==2.18.4" + "version": "==2.20.0" }, "six": { "hashes": [ @@ -181,17 +208,23 @@ }, "sqlalchemy": { "hashes": [ - "sha256:2d5f08f714a886a1382c18be501e614bce50d362384dc089474019ce0768151c" + "sha256:84412de3794acee05630e7788f25e80e81f78eb4837e7b71d0499129f660486a" ], "index": "pypi", - "version": "==1.2.8" + "version": "==1.2.13" + }, + "tabulate": { + "hashes": [ + "sha256:e4ca13f26d0a6be2a2915428dc21e732f1e44dad7f76d7030b2ef1ec251cf7f2" + ], + "version": "==0.8.2" }, "urllib3": { "hashes": [ - "sha256:06330f386d6e4b195fbfc736b297f58c5a892e4440e54d294d7004e3a9bbea1b", - "sha256:cc44da8e1145637334317feebd728bd869a35285b93cbb4cca2577da7e62db4f" + "sha256:61bf29cada3fc2fbefad4fdf059ea4bd1b4a86d2b6d15e1c7c0b582b9752fe39", + "sha256:de9529817c93f27c8ccbfead6985011db27bd0ddfcdb2d86f3f663385c6a9c22" ], - "version": "==1.22" + "version": "==1.24.1" }, "werkzeug": { "hashes": [ @@ -204,24 +237,24 @@ "develop": { "astroid": { "hashes": [ - "sha256:032f6e09161e96f417ea7fad46d3fac7a9019c775f202182c22df0e4f714cb1c", - "sha256:dea42ae6e0b789b543f728ddae7ddb6740ba33a49fb52c4a4d9cb7bb4aa6ec09" + "sha256:292fa429e69d60e4161e7612cb7cc8fa3609e2e309f80c224d93a76d5e7b58be", + "sha256:c7013d119ec95eb626f7a2011f0b63d0c9a095df9ad06d8507b37084eada1a8d" ], - "version": "==1.6.4" + "version": "==2.0.4" }, "atomicwrites": { "hashes": [ - "sha256:240831ea22da9ab882b551b31d4225591e5e447a68c5e188db5b89ca1d487585", - "sha256:a24da68318b08ac9c9c45029f4a10371ab5b20e4226738e150e6e7c571630ae6" + "sha256:0312ad34fcad8fac3704d441f7b317e50af620823353ec657a53e981f92920c0", + "sha256:ec9ae8adaae229e4f8446952d204a3e4b5fdd2d099f9be3aaf556120135fb3ee" ], - "version": "==1.1.5" + "version": "==1.2.1" }, "attrs": { "hashes": [ - "sha256:4b90b09eeeb9b88c35bc642cbac057e45a5fd85367b985bd2809c62b7b939265", - "sha256:e0d0eb91441a3b53dab4d9b743eafc1ac44476296a2053b6ca3af0b139faf87b" + "sha256:10cbf6e27dbce8c30807caf056c8eb50917e0eaafe86347671b57254006c3e69", + "sha256:ca4be454458f9dec299268d472aaa5a11f67a4ff70093396e1ceae9c76cf4bbb" ], - "version": "==18.1.0" + "version": "==18.2.0" }, "backcall": { "hashes": [ @@ -232,10 +265,10 @@ }, "certifi": { "hashes": [ - "sha256:13e698f54293db9f89122b0581843a782ad0934a4fe0172d2a980ba77fc61bb7", - "sha256:9fa520c1bacfb634fa7af20a76bcbd3d5fb390481724c597da32c719a7dca4b0" + "sha256:339dc09518b07e2fa7eda5450740925974815557727d6bd35d319c1524a04a4c", + "sha256:6d58c986d22b038c8c0df30d639f23a3e6d172a05c3583e766f4c0b785c0986a" ], - "version": "==2018.4.16" + "version": "==2018.10.15" }, "chardet": { "hashes": [ @@ -244,22 +277,19 @@ ], "version": "==3.0.4" }, - "cookies": { - "hashes": [ - "sha256:15bee753002dff684987b8df8c235288eb8d45f8191ae056254812dfd42c81d3", - "sha256:d6b698788cae4cfa4e62ef8643a9ca332b79bd96cb314294b864ae8d7eb3ee8e" - ], - "version": "==2.2.1" - }, "coverage": { "hashes": [ "sha256:03481e81d558d30d230bc12999e3edffe392d244349a90f4ef9b88425fac74ba", "sha256:0b136648de27201056c1869a6c0d4e23f464750fd9a9ba9750b8336a244429ed", + "sha256:0bf8cbbd71adfff0ef1f3a1531e6402d13b7b01ac50a79c97ca15f030dba6306", "sha256:104ab3934abaf5be871a583541e8829d6c19ce7bde2923b2751e0d3ca44db60a", + "sha256:10a46017fef60e16694a30627319f38a2b9b52e90182dddb6e37dcdab0f4bf95", "sha256:15b111b6a0f46ee1a485414a52a7ad1d703bdf984e9ed3c288a4414d3871dcbd", "sha256:198626739a79b09fa0a2f06e083ffd12eb55449b5f8bfdbeed1df4910b2ca640", "sha256:1c383d2ef13ade2acc636556fd544dba6e14fa30755f26812f54300e401f98f2", + "sha256:23d341cdd4a0371820eb2b0bd6b88f5003a7438bbedb33688cd33b8eae59affd", "sha256:28b2191e7283f4f3568962e373b47ef7f0392993bb6660d079c62bd50fe9d162", + "sha256:2a5b73210bad5279ddb558d9a2bfedc7f4bf6ad7f3c988641d83c40293deaec1", "sha256:2eb564bbf7816a9d68dd3369a510be3327f1c618d2357fa6b1216994c2e3d508", "sha256:337ded681dd2ef9ca04ef5d93cfc87e52e09db2594c296b4a0a3662cb1b41249", "sha256:3a2184c6d797a125dca8367878d3b9a178b6fdd05fdc2d35d758c3006a1cd694", @@ -287,6 +317,7 @@ "sha256:de4418dadaa1c01d497e539210cb6baa015965526ff5afc078c57ca69160108d", "sha256:e05cb4d9aad6233d67e0541caa7e511fa4047ed7750ec2510d466e806e0255d6", "sha256:e4d96c07229f58cb686120f168276e434660e4358cc9cf3b0464210b04913e77", + "sha256:f05a636b4564104120111800021a92e43397bc12a5c72fed7036be8556e0029e", "sha256:f3f501f345f24383c0000395b26b726e46758b71393267aeae0bd36f8b3ade80", "sha256:f8a923a85cb099422ad5a2e345fe877bbc89a8a8b23235824a93488150e45f6e" ], @@ -301,18 +332,18 @@ }, "idna": { "hashes": [ - "sha256:2c6a5de3089009e3da7c5dde64a141dbc8551d5b7f6cf4ed7c2568d0cc520a8f", - "sha256:8c7309c718f94b3a625cb648ace320157ad16ff131ae0af362c9f21b80ef6ec4" + "sha256:156a6814fb5ac1fc6850fb002e0852d56c0c8d2531923a51032d1b70760e186e", + "sha256:684a38a6f903c1d71d6d5fac066b58d7768af4de2b832e426ec79c30daa94a16" ], - "version": "==2.6" + "version": "==2.7" }, "ipython": { "hashes": [ - "sha256:a0c96853549b246991046f32d19db7140f5b1a644cc31f0dc1edc86713b7676f", - "sha256:eca537aa61592aca2fef4adea12af8e42f5c335004dfa80c78caf80e8b525e5c" + "sha256:a5781d6934a3341a1f9acb4ea5acdc7ea0a0855e689dbe755d070ca51e995435", + "sha256:b10a7ddd03657c761fc503495bc36471c8158e3fc948573fb9fe82a7029d8efd" ], "index": "pypi", - "version": "==6.4.0" + "version": "==7.1.1" }, "ipython-genutils": { "hashes": [ @@ -331,10 +362,10 @@ }, "jedi": { "hashes": [ - "sha256:1972f694c6bc66a2fac8718299e2ab73011d653a6d8059790c3476d2353b99ad", - "sha256:5861f6dc0c16e024cbb0044999f9cf8013b292c05f287df06d3d991a87a4eb89" + "sha256:0191c447165f798e6a730285f2eee783fff81b0d3df261945ecb80983b5c3ca7", + "sha256:b7493f73a2febe0dc33d51c99b474547f7f6c0b2c8fb2b21f453eef204c12148" ], - "version": "==0.12.0" + "version": "==0.13.1" }, "lazy-object-proxy": { "hashes": [ @@ -379,18 +410,26 @@ }, "more-itertools": { "hashes": [ - "sha256:2b6b9893337bfd9166bee6a62c2b0c9fe7735dcf85948b387ec8cba30e85d8e8", - "sha256:6703844a52d3588f951883005efcf555e49566a48afd4db4e965d69b883980d3", - "sha256:a18d870ef2ffca2b8463c0070ad17b5978056f403fb64e3f15fe62a52db21cc0" + "sha256:c187a73da93e7a8acc0001572aebc7e3c69daf7bf6881a2cea10650bd4420092", + "sha256:c476b5d3a34e12d40130bc2f935028b5f636df8f372dc2c1c01dc19681b2039e", + "sha256:fcbfeaea0be121980e15bc97b3817b5202ca73d0eae185b4550cbfce2a3ebb3d" ], - "version": "==4.2.0" + "version": "==4.3.0" }, "parso": { "hashes": [ - "sha256:cdef26e8adc10d589f3ec4eb444bd0a29f3f1eb6d72a4292ab8afcb9d68976a6", - "sha256:f0604a40b96e062b0fd99cf134cc2d5cdf66939d0902f8267d938b0d5b26707f" + "sha256:35704a43a3c113cce4de228ddb39aab374b8004f4f2407d070b6a2ca784ce8a2", + "sha256:895c63e93b94ac1e1690f5fdd40b65f07c8171e3e53cbd7793b5b96c0e0a7f24" ], - "version": "==0.2.1" + "version": "==0.3.1" + }, + "pathlib2": { + "hashes": [ + "sha256:8eb170f8d0d61825e09a95b38be068299ddeda82f35e96c3301a8a5e7604cb83", + "sha256:d1aa2a11ba7b8f7b21ab852b1fb5afb277e1bb99d5dfc663380b5015c0d80c5a" + ], + "markers": "python_version < '3.6'", + "version": "==2.3.2" }, "pexpect": { "hashes": [ @@ -409,73 +448,75 @@ }, "pickleshare": { "hashes": [ - "sha256:84a9257227dfdd6fe1b4be1319096c20eb85ff1e82c7932f36efccfe1b09737b", - "sha256:c9a2541f25aeabc070f12f452e1f2a8eae2abd51e1cd19e8430402bdf4c1d8b5" + "sha256:87683d47965c1da65cdacaf31c8441d12b8044cdec9aca500cd78fc2c683afca", + "sha256:9649af414d74d4df115d5d718f82acb59c9d418196b7b4290ed47a12ce62df56" ], - "version": "==0.7.4" + "version": "==0.7.5" }, "pluggy": { "hashes": [ - "sha256:7f8ae7f5bdf75671a718d2daf0a64b7885f74510bcd98b1a0bb420eb9a9d0cff", - "sha256:d345c8fe681115900d6da8d048ba67c25df42973bda370783cd58826442dcd7c", - "sha256:e160a7fcf25762bb60efc7e171d4497ff1d8d2d75a3d0df7a21b76821ecbf5c5" + "sha256:447ba94990e8014ee25ec853339faf7b0fc8050cdc3289d4d71f7f410fb90095", + "sha256:bde19360a8ec4dfd8a20dcb811780a30998101f078fc7ded6162f0076f50508f" ], - "version": "==0.6.0" + "version": "==0.8.0" }, "prompt-toolkit": { "hashes": [ - "sha256:1df952620eccb399c53ebb359cc7d9a8d3a9538cb34c5a1344bdbeb29fbcc381", - "sha256:3f473ae040ddaa52b52f97f6b4a493cfa9f5920c255a12dc56a7d34397a398a4", - "sha256:858588f1983ca497f1cf4ffde01d978a3ea02b01c8a26a8bbc5cd2e66d816917" + "sha256:c1d6aff5252ab2ef391c2fe498ed8c088066f66bc64a8d5c095bbf795d9fec34", + "sha256:d4c47f79b635a0e70b84fdb97ebd9a274203706b1ee5ed44c10da62755cf3ec9", + "sha256:fd17048d8335c1e6d5ee403c3569953ba3eb8555d710bfc548faf0712666ea39" ], - "version": "==1.0.15" + "version": "==2.0.7" }, "psycopg2": { "hashes": [ - "sha256:027ae518d0e3b8fff41990e598bc7774c3d08a3a20e9ecc0b59fb2aaaf152f7f", - "sha256:092a80da1b052a181b6e6c765849c9b32d46c5dac3b81bf8c9b83e697f3cdbe8", - "sha256:0b9851e798bae024ed1a2a6377a8dab4b8a128a56ed406f572f9f06194e4b275", - "sha256:179c52eb870110a8c1b460c86d4f696d58510ea025602cd3f81453746fccb94f", - "sha256:19983b77ec1fc2a210092aa0333ee48811fd9fb5f194c6cd5b927ed409aea5f8", - "sha256:1d90379d01d0dc50ae9b40c863933d87ff82d51dd7d52cea5d1cb7019afd72cd", - "sha256:27467fd5af1dcc0a82d72927113b8f92da8f44b2efbdb8906bd76face95b596d", - "sha256:32702e3bd8bfe12b36226ba9846ed9e22336fc4bd710039d594b36bd432ae255", - "sha256:33f9e1032095e1436fa9ec424abcbd4c170da934fb70e391c5d78275d0307c75", - "sha256:36030ca7f4b4519ee4f52a74edc4ec73c75abfb6ea1d80ac7480953d1c0aa3c3", - "sha256:363fbbf4189722fc46779be1fad2597e2c40b3f577dc618f353a46391cf5d235", - "sha256:6f302c486132f8dd11f143e919e236ea4467d53bf18c451cac577e6988ecbd05", - "sha256:733166464598c239323142c071fa4c9b91c14359176e5ae7e202db6bcc1d2eb5", - "sha256:7cbc3b21ce2f681ca9ad2d8c0901090b23a30c955e980ebf1006d41f37068a95", - "sha256:888bba7841116e529f407f15c6d28fe3ef0760df8c45257442ec2f14f161c871", - "sha256:8966829cb0d21a08a3c5ac971a2eb67c3927ae27c247300a8476554cc0ce2ae8", - "sha256:8bf51191d60f6987482ef0cfe8511bbf4877a5aa7f313d7b488b53189cf26209", - "sha256:8eb94c0625c529215b53c08fb4e461546e2f3fc96a49c13d5474b5ad7aeab6cf", - "sha256:8ebba5314c609a05c6955e5773c7e0e57b8dd817e4f751f30de729be58fa5e78", - "sha256:932a4c101af007cb3132b1f8a9ffef23386acc53dad46536dc5ba43a3235ae02", - "sha256:ad75fe10bea19ad2188c5cb5fc4cdf53ee808d9b44578c94a3cd1e9fc2beb656", - "sha256:aeaba399254ca79c299d9fe6aa811d3c3eac61458dee10270de7f4e71c624998", - "sha256:b178e0923c93393e16646155794521e063ec17b7cc9f943f15b7d4b39776ea2c", - "sha256:b68e89bb086a9476fa85298caab43f92d0a6af135a5f433d1f6b6d82cafa7b55", - "sha256:d74cf9234ba76426add5e123449be08993a9b13ff434c6efa3a07caa305a619f", - "sha256:f3d3a88128f0c219bdc5b2d9ccd496517199660cea021c560a3252116df91cbd", - "sha256:fe6a7f87356116f5ea840c65b032af17deef0e1a5c34013a2962dd6f99b860dd" + "sha256:0b9e48a1c1505699a64ac58815ca99104aacace8321e455072cee4f7fe7b2698", + "sha256:0f4c784e1b5a320efb434c66a50b8dd7e30a7dc047e8f45c0a8d2694bfe72781", + "sha256:0fdbaa32c9eb09ef09d425dc154628fca6fa69d2f7c1a33f889abb7e0efb3909", + "sha256:11fbf688d5c953c0a5ba625cc42dea9aeb2321942c7c5ed9341a68f865dc8cb1", + "sha256:19eaac4eb25ab078bd0f28304a0cb08702d120caadfe76bb1e6846ed1f68635e", + "sha256:3232ec1a3bf4dba97fbf9b03ce12e4b6c1d01ea3c85773903a67ced725728232", + "sha256:36f8f9c216fcca048006f6dd60e4d3e6f406afde26cfb99e063f137070139eaf", + "sha256:59c1a0e4f9abe970062ed35d0720935197800a7ef7a62b3a9e3a70588d9ca40b", + "sha256:6506c5ff88750948c28d41852c09c5d2a49f51f28c6d90cbf1b6808e18c64e88", + "sha256:6bc3e68ee16f571681b8c0b6d5c0a77bef3c589012352b3f0cf5520e674e9d01", + "sha256:6dbbd7aabbc861eec6b910522534894d9dbb507d5819bc982032c3ea2e974f51", + "sha256:6e737915de826650d1a5f7ff4ac6cf888a26f021a647390ca7bafdba0e85462b", + "sha256:6ed9b2cfe85abc720e8943c1808eeffd41daa73e18b7c1e1a228b0b91f768ccc", + "sha256:711ec617ba453fdfc66616db2520db3a6d9a891e3bf62ef9aba4c95bb4e61230", + "sha256:844dacdf7530c5c612718cf12bc001f59b2d9329d35b495f1ff25045161aa6af", + "sha256:86b52e146da13c896e50c5a3341a9448151f1092b1a4153e425d1e8b62fec508", + "sha256:985c06c2a0f227131733ae58d6a541a5bc8b665e7305494782bebdb74202b793", + "sha256:a86dfe45f4f9c55b1a2312ff20a59b30da8d39c0e8821d00018372a2a177098f", + "sha256:aa3cd07f7f7e3183b63d48300666f920828a9dbd7d7ec53d450df2c4953687a9", + "sha256:b1964ed645ef8317806d615d9ff006c0dadc09dfc54b99ae67f9ba7a1ec9d5d2", + "sha256:b2abbff9e4141484bb89b96eb8eae186d77bc6d5ffbec6b01783ee5c3c467351", + "sha256:cc33c3a90492e21713260095f02b12bee02b8d1f2c03a221d763ce04fa90e2e9", + "sha256:d7de3bf0986d777807611c36e809b77a13bf1888f5c8db0ebf24b47a52d10726", + "sha256:db5e3c52576cc5b93a959a03ccc3b02cb8f0af1fbbdc80645f7a215f0b864f3a", + "sha256:e168aa795ffbb11379c942cf95bf813c7db9aa55538eb61de8c6815e092416f5", + "sha256:e9ca911f8e2d3117e5241d5fa9aaa991cb22fb0792627eeada47425d706b5ec8", + "sha256:eccf962d41ca46e6326b97c8fe0a6687b58dfc1a5f6540ed071ff1474cea749e", + "sha256:efa19deae6b9e504a74347fe5e25c2cb9343766c489c2ae921b05f37338b18d1", + "sha256:f4b0460a21f784abe17b496f66e74157a6c36116fa86da8bf6aa028b9e8ad5fe", + "sha256:f93d508ca64d924d478fb11e272e09524698f0c581d9032e68958cfbdd41faef" ], "index": "pypi", - "version": "==2.7.4" + "version": "==2.7.5" }, "ptyprocess": { "hashes": [ - "sha256:e64193f0047ad603b71f202332ab5527c5e52aa7c8b609704fc28c0dc20c4365", - "sha256:e8c43b5eee76b2083a9badde89fd1bbce6c8942d1045146e100b7b5e014f4f1a" + "sha256:923f299cc5ad920c68f2bc0bc98b75b9f838b93b599941a6b63ddbc2476394c0", + "sha256:d7cc528d76e76342423ca640335bd3633420dc1366f258cb31d05e865ef5ca1f" ], - "version": "==0.5.2" + "version": "==0.6.0" }, "py": { "hashes": [ - "sha256:29c9fab495d7528e80ba1e343b958684f4ace687327e6f789a94bf3d1915f881", - "sha256:983f77f3331356039fdd792e9220b7b8ee1aa6bd2b25f567a963ff1de5a64f6a" + "sha256:bf92637198836372b520efcba9e020c330123be8ce527e535d185ed4b6f45694", + "sha256:e76826342cefe3c3d5f7e8ee4316b80d1dd8a300781612ddbc765c17ba25a6c6" ], - "version": "==1.5.3" + "version": "==1.7.0" }, "pygments": { "hashes": [ @@ -486,56 +527,50 @@ }, "pylint": { "hashes": [ - "sha256:aa519865f8890a5905fa34924fed0f3bfc7d84fc9f9142c16dac52ffecd25a39", - "sha256:c353d8225195b37cc3aef18248b8f3fe94c5a6a95affaf885ae21a24ca31d8eb" + "sha256:1d6d3622c94b4887115fe5204982eee66fdd8a951cf98635ee5caee6ec98c3ec", + "sha256:31142f764d2a7cd41df5196f9933b12b7ee55e73ef12204b648ad7e556c119fb" ], "index": "pypi", - "version": "==1.9.1" + "version": "==2.1.1" }, "pytest": { "hashes": [ - "sha256:39555d023af3200d004d09e51b4dd9fdd828baa863cded3fd6ba2f29f757ae2d", - "sha256:c76e93f3145a44812955e8d46cdd302d8a45fbfc7bf22be24fe231f9d8d8853a" + "sha256:a9e5e8d7ab9d5b0747f37740276eb362e6a76275d76cebbb52c6049d93b475db", + "sha256:bf47e8ed20d03764f963f0070ff1c8fda6e2671fc5dd562a4d3b7148ad60f5ca" ], "index": "pypi", - "version": "==3.6.0" + "version": "==3.9.3" }, "pytest-cov": { "hashes": [ - "sha256:03aa752cf11db41d281ea1d807d954c4eda35cfa1b21d6971966cc041bbf6e2d", - "sha256:890fe5565400902b0c78b5357004aab1c814115894f4f21370e2433256a3eeec" + "sha256:513c425e931a0344944f84ea47f3956be0e416d95acbd897a44970c8d926d5d7", + "sha256:e360f048b7dae3f2f2a9a4d067b2dd6b6a015d384d1577c994a43f3f7cbad762" ], "index": "pypi", - "version": "==2.5.1" + "version": "==2.6.0" }, "pytest-pythonpath": { "hashes": [ - "sha256:f3d46b0a8276e856f7dc4f70ca97b88be6fbcf52d57ce36e35057d502388265e" + "sha256:63fc546ace7d2c845c1ee289e8f7a6362c2b6bae497d10c716e58e253e801d62" ], "index": "pypi", - "version": "==0.7.2" + "version": "==0.7.3" }, "requests": { "hashes": [ - "sha256:6a1b267aa90cac58ac3a765d067950e7dbbf75b1da07e895d1f594193a40a38b", - "sha256:9c443e7324ba5b85070c4a818ade28bfabedf16ea10206da1132edaa6dda237e" + "sha256:99dcfdaaeb17caf6e526f32b6a7b780461512ab3f1d992187801694cba42770c", + "sha256:a84b8c9ab6239b578f22d1c21d51b696dcfe004032bb80ea832398d6909d7279" ], "index": "pypi", - "version": "==2.18.4" + "version": "==2.20.0" }, "responses": { "hashes": [ - "sha256:c6082710f4abfb60793899ca5f21e7ceb25aabf321560cc0726f8b59006811c9", - "sha256:f23a29dca18b815d9d64a516b4a0abb1fbdccff6141d988ad8100facb81cf7b3" + "sha256:682fafb124e799eeee67ec15c9678d955a88affda5613b09788ef80c03987cf0", + "sha256:9b1c14871c66329f509711627e3de5779a2ae50bd532ac162297623424288756" ], "index": "pypi", - "version": "==0.9.0" - }, - "simplegeneric": { - "hashes": [ - "sha256:dc972e06094b9af5b855b3df4a646395e43d1c9d0d39ed345b7393560d0b9173" - ], - "version": "==0.8.1" + "version": "==0.10.2" }, "six": { "hashes": [ @@ -551,12 +586,41 @@ ], "version": "==4.3.2" }, + "typed-ast": { + "hashes": [ + "sha256:0948004fa228ae071054f5208840a1e88747a357ec1101c17217bfe99b299d58", + "sha256:10703d3cec8dcd9eef5a630a04056bbc898abc19bac5691612acba7d1325b66d", + "sha256:1f6c4bd0bdc0f14246fd41262df7dfc018d65bb05f6e16390b7ea26ca454a291", + "sha256:25d8feefe27eb0303b73545416b13d108c6067b846b543738a25ff304824ed9a", + "sha256:29464a177d56e4e055b5f7b629935af7f49c196be47528cc94e0a7bf83fbc2b9", + "sha256:2e214b72168ea0275efd6c884b114ab42e316de3ffa125b267e732ed2abda892", + "sha256:3e0d5e48e3a23e9a4d1a9f698e32a542a4a288c871d33ed8df1b092a40f3a0f9", + "sha256:519425deca5c2b2bdac49f77b2c5625781abbaf9a809d727d3a5596b30bb4ded", + "sha256:57fe287f0cdd9ceaf69e7b71a2e94a24b5d268b35df251a88fef5cc241bf73aa", + "sha256:668d0cec391d9aed1c6a388b0d5b97cd22e6073eaa5fbaa6d2946603b4871efe", + "sha256:68ba70684990f59497680ff90d18e756a47bf4863c604098f10de9716b2c0bdd", + "sha256:6de012d2b166fe7a4cdf505eee3aaa12192f7ba365beeefaca4ec10e31241a85", + "sha256:79b91ebe5a28d349b6d0d323023350133e927b4de5b651a8aa2db69c761420c6", + "sha256:8550177fa5d4c1f09b5e5f524411c44633c80ec69b24e0e98906dd761941ca46", + "sha256:898f818399cafcdb93cbbe15fc83a33d05f18e29fb498ddc09b0214cdfc7cd51", + "sha256:94b091dc0f19291adcb279a108f5d38de2430411068b219f41b343c03b28fb1f", + "sha256:a26863198902cda15ab4503991e8cf1ca874219e0118cbf07c126bce7c4db129", + "sha256:a8034021801bc0440f2e027c354b4eafd95891b573e12ff0418dec385c76785c", + "sha256:bc978ac17468fe868ee589c795d06777f75496b1ed576d308002c8a5756fb9ea", + "sha256:c05b41bc1deade9f90ddc5d988fe506208019ebba9f2578c622516fd201f5863", + "sha256:c9b060bd1e5a26ab6e8267fd46fc9e02b54eb15fffb16d112d4c7b1c12987559", + "sha256:edb04bdd45bfd76c8292c4d9654568efaedf76fe78eb246dde69bdb13b2dad87", + "sha256:f19f2a4f547505fe9072e15f6f4ae714af51b5a681a97f187971f50c283193b6" + ], + "markers": "python_version < '3.7' and implementation_name == 'cpython'", + "version": "==1.1.0" + }, "urllib3": { "hashes": [ - "sha256:06330f386d6e4b195fbfc736b297f58c5a892e4440e54d294d7004e3a9bbea1b", - "sha256:cc44da8e1145637334317feebd728bd869a35285b93cbb4cca2577da7e62db4f" + "sha256:61bf29cada3fc2fbefad4fdf059ea4bd1b4a86d2b6d15e1c7c0b582b9752fe39", + "sha256:de9529817c93f27c8ccbfead6985011db27bd0ddfcdb2d86f3f663385c6a9c22" ], - "version": "==1.22" + "version": "==1.24.1" }, "wcwidth": { "hashes": [ -- cgit v1.2.3 From 87b8f1c710c7c94ed9c1b040f75c1601591a214b Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Sun, 4 Nov 2018 17:25:41 -0800 Subject: to_json() method for fatcatrelease --- python/fatcat/release_model.py | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'python') diff --git a/python/fatcat/release_model.py b/python/fatcat/release_model.py index a584c00b..aa0d2d9f 100644 --- a/python/fatcat/release_model.py +++ b/python/fatcat/release_model.py @@ -1,5 +1,6 @@ from fatcat_client.models import ReleaseEntity +from fatcat_client.api_client import ApiClient class FatcatRelease(ReleaseEntity): """ @@ -83,3 +84,7 @@ class FatcatRelease(ReleaseEntity): contrib_names.append(c.raw_name) t['contrib_names'] = contrib_names return t + + def to_json(self): + ac = ApiClient() + return ac.sanitize_for_serialization(self) -- cgit v1.2.3 From 881b46e3b1682974f48fc196f483c3fa2648b998 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Sun, 4 Nov 2018 17:26:28 -0800 Subject: first-draft kafka workers (changelog, release_update) --- python/fatcat/changelog_workers.py | 120 +++++++++++++++++++++++++++++++++++++ python/fatcat/worker_common.py | 25 ++++++++ python/fatcat_worker.py | 52 ++++++++++++++++ 3 files changed, 197 insertions(+) create mode 100644 python/fatcat/changelog_workers.py create mode 100644 python/fatcat/worker_common.py create mode 100755 python/fatcat_worker.py (limited to 'python') diff --git a/python/fatcat/changelog_workers.py b/python/fatcat/changelog_workers.py new file mode 100644 index 00000000..5f8621cf --- /dev/null +++ b/python/fatcat/changelog_workers.py @@ -0,0 +1,120 @@ + +import json +import time +from itertools import islice +from fatcat.worker_common import FatcatWorker +from pykafka.common import OffsetType + + +class FatcatChangelogWorker(FatcatWorker): + """ + Periodically polls the fatcat API looking for new changelogs. When they are + found, fetch them and push (as JSON) into a Kafka topic. + """ + + def __init__(self, api_host_url, kafka_hosts, produce_topic, poll_interval=10.0, offset=None): + # TODO: should be offset=0 + super().__init__(kafka_hosts=kafka_hosts, + produce_topic=produce_topic, + api_host_url=api_host_url) + self.poll_interval = poll_interval + self.offset = offset # the fatcat changelog offset, not the kafka offset + + def most_recent_message(self, topic): + """ + Tries to fetch the most recent message from a given topic. + This only makes sense for single partition topics, though could be + extended with "last N" behavior. + + Following "Consuming the last N messages from a topic" + from https://pykafka.readthedocs.io/en/latest/usage.html#consumer-patterns + """ + consumer = topic.get_simple_consumer( + auto_offset_reset=OffsetType.LATEST, + reset_offset_on_start=True) + offsets = [(p, op.last_offset_consumed - 1) + for p, op in consumer._partitions.items()] + offsets = [(p, (o if o > -1 else -2)) for p, o in offsets] + if -2 in [o for p, o in offsets]: + return None + else: + consumer.reset_offsets(offsets) + msg = islice(consumer, 1) + if msg: + return list(msg)[0].value + else: + return None + + def run(self): + topic = self.kafka.topics[self.produce_topic] + # On start, try to consume the most recent from the topic, and using + # that as the starting offset. Note that this is a single-partition + # topic + if self.offset is None: + print("Checking for most recent changelog offset...") + msg = self.most_recent_message(topic) + if msg: + self.offset = json.loads(msg.decode('utf-8'))['index'] + else: + self.offset = 1 + + with topic.get_sync_producer() as producer: + while True: + latest = int(self.api.get_changelog(limit=1)[0].index) + if latest > self.offset: + print("Fetching changelogs from {} through {}".format( + self.offset+1, latest)) + for i in range(self.offset+1, latest+1): + cle = self.api.get_changelog_entry(i) + obj = self.api.api_client.sanitize_for_serialization(cle) + producer.produce( + message=json.dumps(obj).encode('utf-8'), + partition_key=None, + timestamp=None, + #XXX: timestamp=cle.timestamp, + ) + self.offset = i + print("Sleeping {} seconds...".format(self.poll_interval)) + time.sleep(self.poll_interval) + + +class FatcatEntityUpdatesWorker(FatcatWorker): + """ + Consumes from the changelog topic and publishes expanded entities (fetched + from API) to update topics. + + For now, only release updates are published. + """ + + def __init__(self, api_host_url, kafka_hosts, consume_topic, release_topic): + super().__init__(kafka_hosts=kafka_hosts, + consume_topic=consume_topic, + api_host_url=api_host_url) + self.release_topic = release_topic + self.consumer_group = "entity-updates" + + def run(self): + changelog_topic = self.kafka.topics[self.consume_topic] + release_topic = self.kafka.topics[self.release_topic] + + consumer = changelog_topic.get_simple_consumer( + consumer_group=self.consumer_group, + auto_offset_reset=OffsetType.LATEST, + reset_offset_on_start=False, + ) + + with release_topic.get_sync_producer() as producer: + for msg in consumer: + cle = json.loads(msg.value.decode('utf-8')) + #print(cle) + release_edits = cle['editgroup']['edits']['releases'] + for re in release_edits: + ident = re['ident'] + release = self.api.get_release(ident, expand="files,container") + release_dict = self.api.api_client.sanitize_for_serialization(release) + producer.produce( + message=json.dumps(release_dict).encode('utf-8'), + partition_key=ident.encode('utf-8'), + timestamp=None, + ) + diff --git a/python/fatcat/worker_common.py b/python/fatcat/worker_common.py new file mode 100644 index 00000000..77ea2c15 --- /dev/null +++ b/python/fatcat/worker_common.py @@ -0,0 +1,25 @@ + +import re +import sys +import csv +import json +import itertools +import fatcat_client +from pykafka import KafkaClient +from fatcat_client.rest import ApiException + + +class FatcatWorker: + """ + Common code for for Kafka producers and consumers. + """ + + def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api_host_url=None): + if api_host_url: + conf = fatcat_client.Configuration() + conf.host = api_host_url + self.api = fatcat_client.DefaultApi(fatcat_client.ApiClient(conf)) + self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0") + self.produce_topic = produce_topic + self.consume_topic = consume_topic + diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py new file mode 100755 index 00000000..cc11beca --- /dev/null +++ b/python/fatcat_worker.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python3 + +import sys +import argparse +from fatcat.changelog_workers import FatcatChangelogWorker, FatcatEntityUpdatesWorker + +def run_changelog_worker(args): + topic = "fatcat-{}.changelog".format(args.env) + worker = FatcatChangelogWorker(args.api_host_url, args.kafka_hosts, topic, + args.poll_interval) + worker.run() + +def run_entity_updates_worker(args): + changelog_topic = "fatcat-{}.changelog".format(args.env) + release_topic = "fatcat-{}.release-updates".format(args.env) + worker = FatcatEntityUpdatesWorker(args.api_host_url, args.kafka_hosts, + changelog_topic, release_topic) + worker.run() + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--debug', + action='store_true', + help="enable debug logging") + parser.add_argument('--api-host-url', + default="http://localhost:9411/v0", + help="fatcat API host/port to use") + parser.add_argument('--kafka-hosts', + default="localhost:9092", + help="list of Kafka brokers (host/port) to use") + parser.add_argument('--env', + default="qa", + help="Kafka topic namespace to use (eg, prod, qa)") + subparsers = parser.add_subparsers() + + sub_changelog = subparsers.add_parser('changelog') + sub_changelog.set_defaults(func=run_changelog_worker) + sub_changelog.add_argument('--poll-interval', + help="how long to wait between polling (seconds)", + default=10.0, type=float) + + sub_entity_updates = subparsers.add_parser('entity-updates') + sub_entity_updates.set_defaults(func=run_entity_updates_worker) + + args = parser.parse_args() + if not args.__dict__.get("func"): + print("tell me what to do!") + sys.exit(-1) + args.func(args) + +if __name__ == '__main__': + main() -- cgit v1.2.3 From e5486378d8d7adf8974b1f1ebaf0400445ba8791 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Sun, 4 Nov 2018 18:52:22 -0800 Subject: elastic release worker --- python/fatcat/elastic_workers.py | 46 ++++++++++++++++++++++++++++++++++++++++ python/fatcat/release_model.py | 15 ++++++++++++- python/fatcat_worker.py | 10 +++++++++ 3 files changed, 70 insertions(+), 1 deletion(-) create mode 100644 python/fatcat/elastic_workers.py (limited to 'python') diff --git a/python/fatcat/elastic_workers.py b/python/fatcat/elastic_workers.py new file mode 100644 index 00000000..c3226981 --- /dev/null +++ b/python/fatcat/elastic_workers.py @@ -0,0 +1,46 @@ + +import json +import time +import requests +from fatcat.worker_common import FatcatWorker +from fatcat.release_model import FatcatRelease +from pykafka.common import OffsetType + + +class FatcatElasticReleaseWorker(FatcatWorker): + """ + Consumes from release-updates topic and pushes into (presumably local) + elasticsearch. + + Uses a consumer group to manage offset. + """ + + def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, + elastic_backend="http://localhost:9200", elastic_index="fatcat"): + super().__init__(kafka_hosts=kafka_hosts, + consume_topic=consume_topic, + api_host_url=None) + self.consumer_group = "elastic-updates" + self.elastic_backend = elastic_backend + self.elastic_index = elastic_index + + def run(self): + consume_topic = self.kafka.topics[self.consume_topic] + + consumer = consume_topic.get_balanced_consumer( + consumer_group=self.consumer_group, + managed=True, + ) + + for msg in consumer: + json_str = msg.value.decode('utf-8') + release = FatcatRelease.from_json(json_str) + #print(release) + elastic_endpoint = "{}/{}/release/{}".format( + self.elastic_backend, + self.elastic_index, + release.ident) + print("Updating document: {}".format(elastic_endpoint)) + resp = requests.post(elastic_endpoint, json=release.to_elastic_dict()) + assert resp.status_code in (200, 201) + consumer.commit_offsets() diff --git a/python/fatcat/release_model.py b/python/fatcat/release_model.py index aa0d2d9f..403fc671 100644 --- a/python/fatcat/release_model.py +++ b/python/fatcat/release_model.py @@ -1,4 +1,5 @@ +import collections from fatcat_client.models import ReleaseEntity from fatcat_client.api_client import ApiClient @@ -23,7 +24,6 @@ class FatcatRelease(ReleaseEntity): ident = self.ident, revision = self.revision, title = self.title, - release_date = self.release_date, release_type = self.release_type, release_status = self.release_status, language = self.language, @@ -35,6 +35,9 @@ class FatcatRelease(ReleaseEntity): wikidata_qid = self.wikidata_qid ) + if self.release_date: + t['release_date'] = self.release_date.strftime('%F') + container = self.container container_is_kept = False if container: @@ -88,3 +91,13 @@ class FatcatRelease(ReleaseEntity): def to_json(self): ac = ApiClient() return ac.sanitize_for_serialization(self) + + def from_json(json_str): + """ + Hack to take advantage of the code-generated deserialization code + """ + ac = ApiClient() + thing = collections.namedtuple('Thing', ['data']) + thing.data = json_str + return ac.deserialize(thing, FatcatRelease) + diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py index cc11beca..50ff0fb7 100755 --- a/python/fatcat_worker.py +++ b/python/fatcat_worker.py @@ -3,6 +3,7 @@ import sys import argparse from fatcat.changelog_workers import FatcatChangelogWorker, FatcatEntityUpdatesWorker +from fatcat.elastic_workers import FatcatElasticReleaseWorker def run_changelog_worker(args): topic = "fatcat-{}.changelog".format(args.env) @@ -17,6 +18,12 @@ def run_entity_updates_worker(args): changelog_topic, release_topic) worker.run() +def run_elastic_release_worker(args): + consume_topic = "fatcat-{}.release-updates".format(args.env) + worker = FatcatElasticReleaseWorker(args.kafka_hosts, + consume_topic) + worker.run() + def main(): parser = argparse.ArgumentParser() parser.add_argument('--debug', @@ -42,6 +49,9 @@ def main(): sub_entity_updates = subparsers.add_parser('entity-updates') sub_entity_updates.set_defaults(func=run_entity_updates_worker) + sub_elastic_release = subparsers.add_parser('elastic-release') + sub_elastic_release.set_defaults(func=run_elastic_release_worker) + args = parser.parse_args() if not args.__dict__.get("func"): print("tell me what to do!") -- cgit v1.2.3 From 53d91dbefeb598539b02d18fad33f79babe2bb94 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Sun, 4 Nov 2018 18:52:48 -0800 Subject: switch entity update worker to use balanced/manager consumer --- python/fatcat/changelog_workers.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'python') diff --git a/python/fatcat/changelog_workers.py b/python/fatcat/changelog_workers.py index 5f8621cf..e341ea32 100644 --- a/python/fatcat/changelog_workers.py +++ b/python/fatcat/changelog_workers.py @@ -97,8 +97,9 @@ class FatcatEntityUpdatesWorker(FatcatWorker): changelog_topic = self.kafka.topics[self.consume_topic] release_topic = self.kafka.topics[self.release_topic] - consumer = changelog_topic.get_simple_consumer( + consumer = changelog_topic.get_balanced_consumer( consumer_group=self.consumer_group, + managed=True, auto_offset_reset=OffsetType.LATEST, reset_offset_on_start=False, ) @@ -117,4 +118,5 @@ class FatcatEntityUpdatesWorker(FatcatWorker): partition_key=ident.encode('utf-8'), timestamp=None, ) + consumer.commit_offsets() -- cgit v1.2.3 From 0109c3a75e201e81036ad031d93602ba6c46ba08 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 12 Nov 2018 22:54:04 -0800 Subject: Revert "FatcatRelease: start wrapping entities with extra methods" This reverts commit d70abdd82955feba4eecdda24ff6d95f703e0598. Decided this wasn't the right approach. --- python/fatcat/__init__.py | 1 - python/fatcat/crossref_importer.py | 5 +- python/fatcat/release_model.py | 103 ------------------------------------- python/tests/release_model.py | 15 ------ 4 files changed, 2 insertions(+), 122 deletions(-) delete mode 100644 python/fatcat/release_model.py delete mode 100644 python/tests/release_model.py (limited to 'python') diff --git a/python/fatcat/__init__.py b/python/fatcat/__init__.py index b0492684..aa12f972 100644 --- a/python/fatcat/__init__.py +++ b/python/fatcat/__init__.py @@ -4,7 +4,6 @@ from flask_uuid import FlaskUUID from flask_debugtoolbar import DebugToolbarExtension from config import Config import fatcat_client -from fatcat.release_model import FatcatRelease toolbar = DebugToolbarExtension() app = Flask(__name__) diff --git a/python/fatcat/crossref_importer.py b/python/fatcat/crossref_importer.py index fbf666a3..37005965 100644 --- a/python/fatcat/crossref_importer.py +++ b/python/fatcat/crossref_importer.py @@ -6,7 +6,6 @@ import datetime import itertools import fatcat_client from fatcat.importer_common import FatcatImporter -from fatcat import FatcatRelease class FatcatCrossrefImporter(FatcatImporter): @@ -39,7 +38,7 @@ class FatcatCrossrefImporter(FatcatImporter): def parse_crossref_dict(self, obj): """ obj is a python dict (parsed from json). - returns a FatcatRelease + returns a ReleaseEntity """ # This work is out of scope if it doesn't have authors and a title @@ -213,7 +212,7 @@ class FatcatCrossrefImporter(FatcatImporter): if release_date: release_date = release_date.isoformat() + "Z" - re = FatcatRelease( + re = fatcat_client.ReleaseEntity( work_id=None, title=obj['title'][0], contribs=contribs, diff --git a/python/fatcat/release_model.py b/python/fatcat/release_model.py deleted file mode 100644 index 403fc671..00000000 --- a/python/fatcat/release_model.py +++ /dev/null @@ -1,103 +0,0 @@ - -import collections -from fatcat_client.models import ReleaseEntity -from fatcat_client.api_client import ApiClient - -class FatcatRelease(ReleaseEntity): - """ - This is a wrapper class that extends the code-generated `ReleaseEntity` - class with extra methods. - """ - - def to_elastic_dict(self): - """ - Converts from an entity model/schema to elasticsearch oriented schema. - - Returns: dict - """ - - if self.state != 'active': - raise ValueError("Entity is not 'active'") - - # First, the easy ones (direct copy) - t = dict( - ident = self.ident, - revision = self.revision, - title = self.title, - release_type = self.release_type, - release_status = self.release_status, - language = self.language, - doi = self.doi, - pmid = self.pmid, - pmcid = self.pmcid, - isbn13 = self.isbn13, - core_id = self.core_id, - wikidata_qid = self.wikidata_qid - ) - - if self.release_date: - t['release_date'] = self.release_date.strftime('%F') - - container = self.container - container_is_kept = False - if container: - t['publisher'] = container.publisher - t['container_name'] = container.name - t['container_issnl'] = container.issnl - container_extra = container.extra - if container_extra: - t['container_is_oa'] = container_extra.get('is_oa') - container_is_kept = container_extra.get('is_kept', False) - t['container_is_longtail_oa'] = container_extra.get('is_longtail_oa') - else: - t['publisher'] = self.publisher - - files = self.files or [] - t['file_count'] = len(files) - in_wa = False - in_ia = False - t['file_pdf_url'] = None - for f in files: - is_pdf = 'pdf' in f.get('mimetype', '') - for url in f.get('urls', []): - if url.get('rel', '') == 'webarchive': - in_wa = True - if '//web.archive.org/' in url['url'] or '//archive.org/' in url['url']: - in_ia = True - if is_pdf: - t['file_pdf_url'] = url['url'] - if not t['file_pdf_url'] and is_pdf: - t['file_pdf_url'] = url['url'] - t['file_in_webarchive'] = in_wa - t['file_in_ia'] = in_ia - - extra = self.extra or dict() - if extra: - t['in_shadow'] = extra.get('in_shadow') - if extra.get('grobid') and extra['grobid'].get('is_longtail_oa'): - t['container_is_longtail_oa'] = True - t['any_abstract'] = bool(self.abstracts) - t['is_kept'] = container_is_kept or extra.get('is_kept', False) - - t['ref_count'] = len(self.refs or []) - t['contrib_count'] = len(self.contribs or []) - contrib_names = [] - for c in (self.contribs or []): - if c.raw_name: - contrib_names.append(c.raw_name) - t['contrib_names'] = contrib_names - return t - - def to_json(self): - ac = ApiClient() - return ac.sanitize_for_serialization(self) - - def from_json(json_str): - """ - Hack to take advantage of the code-generated deserialization code - """ - ac = ApiClient() - thing = collections.namedtuple('Thing', ['data']) - thing.data = json_str - return ac.deserialize(thing, FatcatRelease) - diff --git a/python/tests/release_model.py b/python/tests/release_model.py deleted file mode 100644 index 4b9dddba..00000000 --- a/python/tests/release_model.py +++ /dev/null @@ -1,15 +0,0 @@ - -import json -import pytest -from fatcat.crossref_importer import FatcatCrossrefImporter -from fatcat.release_model import FatcatRelease - -from crossref import crossref_importer - -def test_elastic_convert(crossref_importer): - with open('tests/files/crossref-works.single.json', 'r') as f: - # not a single line - raw = json.loads(f.read()) - (r, c) = crossref_importer.parse_crossref_dict(raw) - r.state = 'active' - r.to_elastic_dict() -- cgit v1.2.3 From 942f23257bcf1059a2502a3b019ce5af1bde7de5 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 12 Nov 2018 23:12:55 -0800 Subject: refactor kafka branch to not use release_model --- python/fatcat/elastic_workers.py | 5 +- python/fatcat/entity_helpers.py | 100 +++++++++++++++++++++++++++++++++++++++ python/tests/entity_helpers.py | 15 ++++++ 3 files changed, 118 insertions(+), 2 deletions(-) create mode 100644 python/fatcat/entity_helpers.py create mode 100644 python/tests/entity_helpers.py (limited to 'python') diff --git a/python/fatcat/elastic_workers.py b/python/fatcat/elastic_workers.py index c3226981..3d2e9c39 100644 --- a/python/fatcat/elastic_workers.py +++ b/python/fatcat/elastic_workers.py @@ -3,7 +3,8 @@ import json import time import requests from fatcat.worker_common import FatcatWorker -from fatcat.release_model import FatcatRelease +from fatcat_client.models import ReleaseEntity +from fatcat.entity_helpers import * from pykafka.common import OffsetType @@ -34,7 +35,7 @@ class FatcatElasticReleaseWorker(FatcatWorker): for msg in consumer: json_str = msg.value.decode('utf-8') - release = FatcatRelease.from_json(json_str) + release = entity_from_json(json_str, ReleaseEntity) #print(release) elastic_endpoint = "{}/{}/release/{}".format( self.elastic_backend, diff --git a/python/fatcat/entity_helpers.py b/python/fatcat/entity_helpers.py new file mode 100644 index 00000000..c454536b --- /dev/null +++ b/python/fatcat/entity_helpers.py @@ -0,0 +1,100 @@ + +import collections +from fatcat_client.models import ReleaseEntity +from fatcat_client.api_client import ApiClient + +def entity_to_json(entity): + ac = ApiClient() + return ac.sanitize_for_serialization(entity) + +def entity_from_json(json_str, entity_type): + """ + Hack to take advantage of the code-generated deserialization code + """ + ac = ApiClient() + thing = collections.namedtuple('Thing', ['data']) + thing.data = json_str + return ac.deserialize(thing, entity_type) + +def release_elastic_dict(release): + """ + Converts from an entity model/schema to elasticsearch oriented schema. + + Returns: dict + """ + + if release.state != 'active': + raise ValueError("Entity is not 'active'") + + # First, the easy ones (direct copy) + t = dict( + ident = release.ident, + revision = release.revision, + title = release.title, + release_type = release.release_type, + release_status = release.release_status, + language = release.language, + doi = release.doi, + pmid = release.pmid, + pmcid = release.pmcid, + isbn13 = release.isbn13, + core_id = release.core_id, + wikidata_qid = release.wikidata_qid + ) + + if release.release_date: + # TODO: resolve why this can be either a string or datetime + if type(release.release_date) == str: + t['release_date'] = release.release_date + else: + t['release_date'] = release.release_date.strftime('%F') + + container = release.container + container_is_kept = False + if container: + t['publisher'] = container.publisher + t['container_name'] = container.name + t['container_issnl'] = container.issnl + container_extra = container.extra + if container_extra: + t['container_is_oa'] = container_extra.get('is_oa') + container_is_kept = container_extra.get('is_kept', False) + t['container_is_longtail_oa'] = container_extra.get('is_longtail_oa') + else: + t['publisher'] = release.publisher + + files = release.files or [] + t['file_count'] = len(files) + in_wa = False + in_ia = False + t['file_pdf_url'] = None + for f in files: + is_pdf = 'pdf' in f.get('mimetype', '') + for url in f.get('urls', []): + if url.get('rel', '') == 'webarchive': + in_wa = True + if '//web.archive.org/' in url['url'] or '//archive.org/' in url['url']: + in_ia = True + if is_pdf: + t['file_pdf_url'] = url['url'] + if not t['file_pdf_url'] and is_pdf: + t['file_pdf_url'] = url['url'] + t['file_in_webarchive'] = in_wa + t['file_in_ia'] = in_ia + + extra = release.extra or dict() + if extra: + t['in_shadow'] = extra.get('in_shadow') + if extra.get('grobid') and extra['grobid'].get('is_longtail_oa'): + t['container_is_longtail_oa'] = True + t['any_abstract'] = bool(release.abstracts) + t['is_kept'] = container_is_kept or extra.get('is_kept', False) + + t['ref_count'] = len(release.refs or []) + t['contrib_count'] = len(release.contribs or []) + contrib_names = [] + for c in (release.contribs or []): + if c.raw_name: + contrib_names.append(c.raw_name) + t['contrib_names'] = contrib_names + return t diff --git a/python/tests/entity_helpers.py b/python/tests/entity_helpers.py new file mode 100644 index 00000000..dd6fa00a --- /dev/null +++ b/python/tests/entity_helpers.py @@ -0,0 +1,15 @@ + +import json +import pytest +from fatcat.crossref_importer import FatcatCrossrefImporter +from fatcat.entity_helpers import * + +from crossref import crossref_importer + +def test_elastic_convert(crossref_importer): + with open('tests/files/crossref-works.single.json', 'r') as f: + # not a single line + raw = json.loads(f.read()) + (r, c) = crossref_importer.parse_crossref_dict(raw) + r.state = 'active' + release_elastic_dict(r) -- cgit v1.2.3