aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2018-11-12 23:13:22 -0800
committerBryan Newbold <bnewbold@robocracy.org>2018-11-12 23:13:22 -0800
commit055c464deea8cdaccf3ed384995d4409b0f51409 (patch)
tree582e3d62d192d51fafed0c5e3321fd21a3a42d34
parent6828313ecf2fea06e500f447f3abb92e5651c1fa (diff)
parent942f23257bcf1059a2502a3b019ce5af1bde7de5 (diff)
downloadfatcat-055c464deea8cdaccf3ed384995d4409b0f51409.tar.gz
fatcat-055c464deea8cdaccf3ed384995d4409b0f51409.zip
Merge branch 'kafka'
-rw-r--r--python/Pipfile2
-rw-r--r--python/Pipfile.lock384
-rw-r--r--python/fatcat/changelog_workers.py122
-rw-r--r--python/fatcat/elastic_workers.py47
-rw-r--r--python/fatcat/entity_helpers.py100
-rw-r--r--python/fatcat/worker_common.py25
-rwxr-xr-xpython/fatcat_worker.py62
-rw-r--r--python/tests/entity_helpers.py15
8 files changed, 597 insertions, 160 deletions
diff --git a/python/Pipfile b/python/Pipfile
index 86091c56..be62c111 100644
--- a/python/Pipfile
+++ b/python/Pipfile
@@ -24,6 +24,8 @@ flask-marshmallow = "*"
flask-uuid = "*"
"psycopg2" = "*"
flask-debugtoolbar = "*"
+pykafka = "*"
+python-dateutil = "*"
[requires]
python_version = "3.5"
diff --git a/python/Pipfile.lock b/python/Pipfile.lock
index 83b7ebca..da96a24f 100644
--- a/python/Pipfile.lock
+++ b/python/Pipfile.lock
@@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
- "sha256": "ab1ff8d09f4d9f779b1662e16feacadb82f179a76466efca078997b02cb64724"
+ "sha256": "c99945057fc87c7a825a76cfe4d1abdcb99d0e70dc71db770cf259b761c6835c"
},
"pipfile-spec": 6,
"requires": {
@@ -24,10 +24,10 @@
},
"certifi": {
"hashes": [
- "sha256:13e698f54293db9f89122b0581843a782ad0934a4fe0172d2a980ba77fc61bb7",
- "sha256:9fa520c1bacfb634fa7af20a76bcbd3d5fb390481724c597da32c719a7dca4b0"
+ "sha256:339dc09518b07e2fa7eda5450740925974815557727d6bd35d319c1524a04a4c",
+ "sha256:6d58c986d22b038c8c0df30d639f23a3e6d172a05c3583e766f4c0b785c0986a"
],
- "version": "==2018.4.16"
+ "version": "==2018.10.15"
},
"chardet": {
"hashes": [
@@ -38,10 +38,10 @@
},
"click": {
"hashes": [
- "sha256:29f99fc6125fbc931b758dc053b3114e55c77a6e4c6c3a2674a2dc986016381d",
- "sha256:f15516df478d5a56180fbf80e68f206010e6d160fc39fa508b65e035fd75130b"
+ "sha256:2335065e6395b9e67ca716de5f7526736bfa6ceead690adf616d925bdc622b13",
+ "sha256:5b94b49521f6456670fdb30cd82a4eca9412788a93fa6dd6df72c94d5a8ff2d7"
],
- "version": "==6.7"
+ "version": "==7.0"
},
"flask": {
"hashes": [
@@ -84,16 +84,17 @@
},
"idna": {
"hashes": [
- "sha256:2c6a5de3089009e3da7c5dde64a141dbc8551d5b7f6cf4ed7c2568d0cc520a8f",
- "sha256:8c7309c718f94b3a625cb648ace320157ad16ff131ae0af362c9f21b80ef6ec4"
+ "sha256:156a6814fb5ac1fc6850fb002e0852d56c0c8d2531923a51032d1b70760e186e",
+ "sha256:684a38a6f903c1d71d6d5fac066b58d7768af4de2b832e426ec79c30daa94a16"
],
- "version": "==2.6"
+ "version": "==2.7"
},
"itsdangerous": {
"hashes": [
- "sha256:cbb3fcf8d3e33df861709ecaf89d9e6629cff0a217bc2848f1b41cd30d360519"
+ "sha256:321b033d07f2a4136d3ec762eac9f16a10ccd60f53c0c91af90217ace7ba1f19",
+ "sha256:b12271b2047cb23eeb98c8b5622e2e5c5e9abd9784a153e9d8ef9cb4dd09d749"
],
- "version": "==0.24"
+ "version": "==1.1.0"
},
"jinja2": {
"hashes": [
@@ -102,6 +103,13 @@
],
"version": "==2.10"
},
+ "kazoo": {
+ "hashes": [
+ "sha256:8db774f7bdece7d0dc7decb21539ff0852e42c2ffe1c28d7f1ff6f9292a1c3a4",
+ "sha256:a5fa2e400c5068cfee9e86b35cf0dab8232b574152d8e3590d823b3e2426ab5e"
+ ],
+ "version": "==2.5.0"
+ },
"markupsafe": {
"hashes": [
"sha256:a6be69091dac236ea9c6bc7d012beab42010fa914c459791d627dad4910eb665"
@@ -110,51 +118,70 @@
},
"marshmallow": {
"hashes": [
- "sha256:171f409d48b44786b7df2793cbd7f1a9062f0fe2c14d547da536b5010f671ade",
- "sha256:c231784b5a5d2b26e50c90f3038004a3552ec27658cde6e0a5a7279d0c5a8e26"
+ "sha256:a2052f62b18f6dad520f465e437f63ab8812423975d48b9ebd30a735466e782a",
+ "sha256:e1b79eb3b815b49918c64114dda691b8767b48a1f66dd1d8c0cd5842b74257c2"
],
- "version": "==2.15.3"
+ "version": "==2.16.3"
},
"marshmallow-sqlalchemy": {
"hashes": [
- "sha256:32ff19350a8892b3e8dc954eeeac796576bb89356512f9e1ccd33da63f856930",
- "sha256:755b16b64c0273fd8083dbe1e938dff0d49359c76d6238898c9082e3fc55368d"
+ "sha256:a42cdbd6b623059fca601e1b572cab28f00d4acf36e2cef38094c88424b3dcf1",
+ "sha256:aacb0a7e0f6b5d489cdb3c10d1ab420f74c21538838026337738e4c6e8848fd8"
],
"index": "pypi",
- "version": "==0.14.0"
+ "version": "==0.14.1"
},
"psycopg2": {
"hashes": [
- "sha256:027ae518d0e3b8fff41990e598bc7774c3d08a3a20e9ecc0b59fb2aaaf152f7f",
- "sha256:092a80da1b052a181b6e6c765849c9b32d46c5dac3b81bf8c9b83e697f3cdbe8",
- "sha256:0b9851e798bae024ed1a2a6377a8dab4b8a128a56ed406f572f9f06194e4b275",
- "sha256:179c52eb870110a8c1b460c86d4f696d58510ea025602cd3f81453746fccb94f",
- "sha256:19983b77ec1fc2a210092aa0333ee48811fd9fb5f194c6cd5b927ed409aea5f8",
- "sha256:1d90379d01d0dc50ae9b40c863933d87ff82d51dd7d52cea5d1cb7019afd72cd",
- "sha256:27467fd5af1dcc0a82d72927113b8f92da8f44b2efbdb8906bd76face95b596d",
- "sha256:32702e3bd8bfe12b36226ba9846ed9e22336fc4bd710039d594b36bd432ae255",
- "sha256:33f9e1032095e1436fa9ec424abcbd4c170da934fb70e391c5d78275d0307c75",
- "sha256:36030ca7f4b4519ee4f52a74edc4ec73c75abfb6ea1d80ac7480953d1c0aa3c3",
- "sha256:363fbbf4189722fc46779be1fad2597e2c40b3f577dc618f353a46391cf5d235",
- "sha256:6f302c486132f8dd11f143e919e236ea4467d53bf18c451cac577e6988ecbd05",
- "sha256:733166464598c239323142c071fa4c9b91c14359176e5ae7e202db6bcc1d2eb5",
- "sha256:7cbc3b21ce2f681ca9ad2d8c0901090b23a30c955e980ebf1006d41f37068a95",
- "sha256:888bba7841116e529f407f15c6d28fe3ef0760df8c45257442ec2f14f161c871",
- "sha256:8966829cb0d21a08a3c5ac971a2eb67c3927ae27c247300a8476554cc0ce2ae8",
- "sha256:8bf51191d60f6987482ef0cfe8511bbf4877a5aa7f313d7b488b53189cf26209",
- "sha256:8eb94c0625c529215b53c08fb4e461546e2f3fc96a49c13d5474b5ad7aeab6cf",
- "sha256:8ebba5314c609a05c6955e5773c7e0e57b8dd817e4f751f30de729be58fa5e78",
- "sha256:932a4c101af007cb3132b1f8a9ffef23386acc53dad46536dc5ba43a3235ae02",
- "sha256:ad75fe10bea19ad2188c5cb5fc4cdf53ee808d9b44578c94a3cd1e9fc2beb656",
- "sha256:aeaba399254ca79c299d9fe6aa811d3c3eac61458dee10270de7f4e71c624998",
- "sha256:b178e0923c93393e16646155794521e063ec17b7cc9f943f15b7d4b39776ea2c",
- "sha256:b68e89bb086a9476fa85298caab43f92d0a6af135a5f433d1f6b6d82cafa7b55",
- "sha256:d74cf9234ba76426add5e123449be08993a9b13ff434c6efa3a07caa305a619f",
- "sha256:f3d3a88128f0c219bdc5b2d9ccd496517199660cea021c560a3252116df91cbd",
- "sha256:fe6a7f87356116f5ea840c65b032af17deef0e1a5c34013a2962dd6f99b860dd"
+ "sha256:0b9e48a1c1505699a64ac58815ca99104aacace8321e455072cee4f7fe7b2698",
+ "sha256:0f4c784e1b5a320efb434c66a50b8dd7e30a7dc047e8f45c0a8d2694bfe72781",
+ "sha256:0fdbaa32c9eb09ef09d425dc154628fca6fa69d2f7c1a33f889abb7e0efb3909",
+ "sha256:11fbf688d5c953c0a5ba625cc42dea9aeb2321942c7c5ed9341a68f865dc8cb1",
+ "sha256:19eaac4eb25ab078bd0f28304a0cb08702d120caadfe76bb1e6846ed1f68635e",
+ "sha256:3232ec1a3bf4dba97fbf9b03ce12e4b6c1d01ea3c85773903a67ced725728232",
+ "sha256:36f8f9c216fcca048006f6dd60e4d3e6f406afde26cfb99e063f137070139eaf",
+ "sha256:59c1a0e4f9abe970062ed35d0720935197800a7ef7a62b3a9e3a70588d9ca40b",
+ "sha256:6506c5ff88750948c28d41852c09c5d2a49f51f28c6d90cbf1b6808e18c64e88",
+ "sha256:6bc3e68ee16f571681b8c0b6d5c0a77bef3c589012352b3f0cf5520e674e9d01",
+ "sha256:6dbbd7aabbc861eec6b910522534894d9dbb507d5819bc982032c3ea2e974f51",
+ "sha256:6e737915de826650d1a5f7ff4ac6cf888a26f021a647390ca7bafdba0e85462b",
+ "sha256:6ed9b2cfe85abc720e8943c1808eeffd41daa73e18b7c1e1a228b0b91f768ccc",
+ "sha256:711ec617ba453fdfc66616db2520db3a6d9a891e3bf62ef9aba4c95bb4e61230",
+ "sha256:844dacdf7530c5c612718cf12bc001f59b2d9329d35b495f1ff25045161aa6af",
+ "sha256:86b52e146da13c896e50c5a3341a9448151f1092b1a4153e425d1e8b62fec508",
+ "sha256:985c06c2a0f227131733ae58d6a541a5bc8b665e7305494782bebdb74202b793",
+ "sha256:a86dfe45f4f9c55b1a2312ff20a59b30da8d39c0e8821d00018372a2a177098f",
+ "sha256:aa3cd07f7f7e3183b63d48300666f920828a9dbd7d7ec53d450df2c4953687a9",
+ "sha256:b1964ed645ef8317806d615d9ff006c0dadc09dfc54b99ae67f9ba7a1ec9d5d2",
+ "sha256:b2abbff9e4141484bb89b96eb8eae186d77bc6d5ffbec6b01783ee5c3c467351",
+ "sha256:cc33c3a90492e21713260095f02b12bee02b8d1f2c03a221d763ce04fa90e2e9",
+ "sha256:d7de3bf0986d777807611c36e809b77a13bf1888f5c8db0ebf24b47a52d10726",
+ "sha256:db5e3c52576cc5b93a959a03ccc3b02cb8f0af1fbbdc80645f7a215f0b864f3a",
+ "sha256:e168aa795ffbb11379c942cf95bf813c7db9aa55538eb61de8c6815e092416f5",
+ "sha256:e9ca911f8e2d3117e5241d5fa9aaa991cb22fb0792627eeada47425d706b5ec8",
+ "sha256:eccf962d41ca46e6326b97c8fe0a6687b58dfc1a5f6540ed071ff1474cea749e",
+ "sha256:efa19deae6b9e504a74347fe5e25c2cb9343766c489c2ae921b05f37338b18d1",
+ "sha256:f4b0460a21f784abe17b496f66e74157a6c36116fa86da8bf6aa028b9e8ad5fe",
+ "sha256:f93d508ca64d924d478fb11e272e09524698f0c581d9032e68958cfbdd41faef"
+ ],
+ "index": "pypi",
+ "version": "==2.7.5"
+ },
+ "pykafka": {
+ "hashes": [
+ "sha256:6b075909a52cb0c95325bc16ab797bbcdbb37386652ea460705ed4472ce91459",
+ "sha256:f0bbd394ae6970042a587c99fe4dc0966e67787249d963d4ce2f810dc9490577"
],
"index": "pypi",
- "version": "==2.7.4"
+ "version": "==2.8.0"
+ },
+ "python-dateutil": {
+ "hashes": [
+ "sha256:063df5763652e21de43de7d9e00ccf239f953a832941e37be541614732cdfc93",
+ "sha256:88f9287c0174266bb0d8cedd395cfba9c58e87e5ad86b2ce58859bc11be3cf02"
+ ],
+ "index": "pypi",
+ "version": "==2.7.5"
},
"raven": {
"hashes": [
@@ -166,11 +193,11 @@
},
"requests": {
"hashes": [
- "sha256:6a1b267aa90cac58ac3a765d067950e7dbbf75b1da07e895d1f594193a40a38b",
- "sha256:9c443e7324ba5b85070c4a818ade28bfabedf16ea10206da1132edaa6dda237e"
+ "sha256:99dcfdaaeb17caf6e526f32b6a7b780461512ab3f1d992187801694cba42770c",
+ "sha256:a84b8c9ab6239b578f22d1c21d51b696dcfe004032bb80ea832398d6909d7279"
],
"index": "pypi",
- "version": "==2.18.4"
+ "version": "==2.20.0"
},
"six": {
"hashes": [
@@ -181,17 +208,23 @@
},
"sqlalchemy": {
"hashes": [
- "sha256:2d5f08f714a886a1382c18be501e614bce50d362384dc089474019ce0768151c"
+ "sha256:84412de3794acee05630e7788f25e80e81f78eb4837e7b71d0499129f660486a"
],
"index": "pypi",
- "version": "==1.2.8"
+ "version": "==1.2.13"
+ },
+ "tabulate": {
+ "hashes": [
+ "sha256:e4ca13f26d0a6be2a2915428dc21e732f1e44dad7f76d7030b2ef1ec251cf7f2"
+ ],
+ "version": "==0.8.2"
},
"urllib3": {
"hashes": [
- "sha256:06330f386d6e4b195fbfc736b297f58c5a892e4440e54d294d7004e3a9bbea1b",
- "sha256:cc44da8e1145637334317feebd728bd869a35285b93cbb4cca2577da7e62db4f"
+ "sha256:61bf29cada3fc2fbefad4fdf059ea4bd1b4a86d2b6d15e1c7c0b582b9752fe39",
+ "sha256:de9529817c93f27c8ccbfead6985011db27bd0ddfcdb2d86f3f663385c6a9c22"
],
- "version": "==1.22"
+ "version": "==1.24.1"
},
"werkzeug": {
"hashes": [
@@ -204,24 +237,24 @@
"develop": {
"astroid": {
"hashes": [
- "sha256:032f6e09161e96f417ea7fad46d3fac7a9019c775f202182c22df0e4f714cb1c",
- "sha256:dea42ae6e0b789b543f728ddae7ddb6740ba33a49fb52c4a4d9cb7bb4aa6ec09"
+ "sha256:292fa429e69d60e4161e7612cb7cc8fa3609e2e309f80c224d93a76d5e7b58be",
+ "sha256:c7013d119ec95eb626f7a2011f0b63d0c9a095df9ad06d8507b37084eada1a8d"
],
- "version": "==1.6.4"
+ "version": "==2.0.4"
},
"atomicwrites": {
"hashes": [
- "sha256:240831ea22da9ab882b551b31d4225591e5e447a68c5e188db5b89ca1d487585",
- "sha256:a24da68318b08ac9c9c45029f4a10371ab5b20e4226738e150e6e7c571630ae6"
+ "sha256:0312ad34fcad8fac3704d441f7b317e50af620823353ec657a53e981f92920c0",
+ "sha256:ec9ae8adaae229e4f8446952d204a3e4b5fdd2d099f9be3aaf556120135fb3ee"
],
- "version": "==1.1.5"
+ "version": "==1.2.1"
},
"attrs": {
"hashes": [
- "sha256:4b90b09eeeb9b88c35bc642cbac057e45a5fd85367b985bd2809c62b7b939265",
- "sha256:e0d0eb91441a3b53dab4d9b743eafc1ac44476296a2053b6ca3af0b139faf87b"
+ "sha256:10cbf6e27dbce8c30807caf056c8eb50917e0eaafe86347671b57254006c3e69",
+ "sha256:ca4be454458f9dec299268d472aaa5a11f67a4ff70093396e1ceae9c76cf4bbb"
],
- "version": "==18.1.0"
+ "version": "==18.2.0"
},
"backcall": {
"hashes": [
@@ -232,10 +265,10 @@
},
"certifi": {
"hashes": [
- "sha256:13e698f54293db9f89122b0581843a782ad0934a4fe0172d2a980ba77fc61bb7",
- "sha256:9fa520c1bacfb634fa7af20a76bcbd3d5fb390481724c597da32c719a7dca4b0"
+ "sha256:339dc09518b07e2fa7eda5450740925974815557727d6bd35d319c1524a04a4c",
+ "sha256:6d58c986d22b038c8c0df30d639f23a3e6d172a05c3583e766f4c0b785c0986a"
],
- "version": "==2018.4.16"
+ "version": "==2018.10.15"
},
"chardet": {
"hashes": [
@@ -244,22 +277,19 @@
],
"version": "==3.0.4"
},
- "cookies": {
- "hashes": [
- "sha256:15bee753002dff684987b8df8c235288eb8d45f8191ae056254812dfd42c81d3",
- "sha256:d6b698788cae4cfa4e62ef8643a9ca332b79bd96cb314294b864ae8d7eb3ee8e"
- ],
- "version": "==2.2.1"
- },
"coverage": {
"hashes": [
"sha256:03481e81d558d30d230bc12999e3edffe392d244349a90f4ef9b88425fac74ba",
"sha256:0b136648de27201056c1869a6c0d4e23f464750fd9a9ba9750b8336a244429ed",
+ "sha256:0bf8cbbd71adfff0ef1f3a1531e6402d13b7b01ac50a79c97ca15f030dba6306",
"sha256:104ab3934abaf5be871a583541e8829d6c19ce7bde2923b2751e0d3ca44db60a",
+ "sha256:10a46017fef60e16694a30627319f38a2b9b52e90182dddb6e37dcdab0f4bf95",
"sha256:15b111b6a0f46ee1a485414a52a7ad1d703bdf984e9ed3c288a4414d3871dcbd",
"sha256:198626739a79b09fa0a2f06e083ffd12eb55449b5f8bfdbeed1df4910b2ca640",
"sha256:1c383d2ef13ade2acc636556fd544dba6e14fa30755f26812f54300e401f98f2",
+ "sha256:23d341cdd4a0371820eb2b0bd6b88f5003a7438bbedb33688cd33b8eae59affd",
"sha256:28b2191e7283f4f3568962e373b47ef7f0392993bb6660d079c62bd50fe9d162",
+ "sha256:2a5b73210bad5279ddb558d9a2bfedc7f4bf6ad7f3c988641d83c40293deaec1",
"sha256:2eb564bbf7816a9d68dd3369a510be3327f1c618d2357fa6b1216994c2e3d508",
"sha256:337ded681dd2ef9ca04ef5d93cfc87e52e09db2594c296b4a0a3662cb1b41249",
"sha256:3a2184c6d797a125dca8367878d3b9a178b6fdd05fdc2d35d758c3006a1cd694",
@@ -287,6 +317,7 @@
"sha256:de4418dadaa1c01d497e539210cb6baa015965526ff5afc078c57ca69160108d",
"sha256:e05cb4d9aad6233d67e0541caa7e511fa4047ed7750ec2510d466e806e0255d6",
"sha256:e4d96c07229f58cb686120f168276e434660e4358cc9cf3b0464210b04913e77",
+ "sha256:f05a636b4564104120111800021a92e43397bc12a5c72fed7036be8556e0029e",
"sha256:f3f501f345f24383c0000395b26b726e46758b71393267aeae0bd36f8b3ade80",
"sha256:f8a923a85cb099422ad5a2e345fe877bbc89a8a8b23235824a93488150e45f6e"
],
@@ -301,18 +332,18 @@
},
"idna": {
"hashes": [
- "sha256:2c6a5de3089009e3da7c5dde64a141dbc8551d5b7f6cf4ed7c2568d0cc520a8f",
- "sha256:8c7309c718f94b3a625cb648ace320157ad16ff131ae0af362c9f21b80ef6ec4"
+ "sha256:156a6814fb5ac1fc6850fb002e0852d56c0c8d2531923a51032d1b70760e186e",
+ "sha256:684a38a6f903c1d71d6d5fac066b58d7768af4de2b832e426ec79c30daa94a16"
],
- "version": "==2.6"
+ "version": "==2.7"
},
"ipython": {
"hashes": [
- "sha256:a0c96853549b246991046f32d19db7140f5b1a644cc31f0dc1edc86713b7676f",
- "sha256:eca537aa61592aca2fef4adea12af8e42f5c335004dfa80c78caf80e8b525e5c"
+ "sha256:a5781d6934a3341a1f9acb4ea5acdc7ea0a0855e689dbe755d070ca51e995435",
+ "sha256:b10a7ddd03657c761fc503495bc36471c8158e3fc948573fb9fe82a7029d8efd"
],
"index": "pypi",
- "version": "==6.4.0"
+ "version": "==7.1.1"
},
"ipython-genutils": {
"hashes": [
@@ -331,10 +362,10 @@
},
"jedi": {
"hashes": [
- "sha256:1972f694c6bc66a2fac8718299e2ab73011d653a6d8059790c3476d2353b99ad",
- "sha256:5861f6dc0c16e024cbb0044999f9cf8013b292c05f287df06d3d991a87a4eb89"
+ "sha256:0191c447165f798e6a730285f2eee783fff81b0d3df261945ecb80983b5c3ca7",
+ "sha256:b7493f73a2febe0dc33d51c99b474547f7f6c0b2c8fb2b21f453eef204c12148"
],
- "version": "==0.12.0"
+ "version": "==0.13.1"
},
"lazy-object-proxy": {
"hashes": [
@@ -379,18 +410,26 @@
},
"more-itertools": {
"hashes": [
- "sha256:2b6b9893337bfd9166bee6a62c2b0c9fe7735dcf85948b387ec8cba30e85d8e8",
- "sha256:6703844a52d3588f951883005efcf555e49566a48afd4db4e965d69b883980d3",
- "sha256:a18d870ef2ffca2b8463c0070ad17b5978056f403fb64e3f15fe62a52db21cc0"
+ "sha256:c187a73da93e7a8acc0001572aebc7e3c69daf7bf6881a2cea10650bd4420092",
+ "sha256:c476b5d3a34e12d40130bc2f935028b5f636df8f372dc2c1c01dc19681b2039e",
+ "sha256:fcbfeaea0be121980e15bc97b3817b5202ca73d0eae185b4550cbfce2a3ebb3d"
],
- "version": "==4.2.0"
+ "version": "==4.3.0"
},
"parso": {
"hashes": [
- "sha256:cdef26e8adc10d589f3ec4eb444bd0a29f3f1eb6d72a4292ab8afcb9d68976a6",
- "sha256:f0604a40b96e062b0fd99cf134cc2d5cdf66939d0902f8267d938b0d5b26707f"
+ "sha256:35704a43a3c113cce4de228ddb39aab374b8004f4f2407d070b6a2ca784ce8a2",
+ "sha256:895c63e93b94ac1e1690f5fdd40b65f07c8171e3e53cbd7793b5b96c0e0a7f24"
],
- "version": "==0.2.1"
+ "version": "==0.3.1"
+ },
+ "pathlib2": {
+ "hashes": [
+ "sha256:8eb170f8d0d61825e09a95b38be068299ddeda82f35e96c3301a8a5e7604cb83",
+ "sha256:d1aa2a11ba7b8f7b21ab852b1fb5afb277e1bb99d5dfc663380b5015c0d80c5a"
+ ],
+ "markers": "python_version < '3.6'",
+ "version": "==2.3.2"
},
"pexpect": {
"hashes": [
@@ -409,73 +448,75 @@
},
"pickleshare": {
"hashes": [
- "sha256:84a9257227dfdd6fe1b4be1319096c20eb85ff1e82c7932f36efccfe1b09737b",
- "sha256:c9a2541f25aeabc070f12f452e1f2a8eae2abd51e1cd19e8430402bdf4c1d8b5"
+ "sha256:87683d47965c1da65cdacaf31c8441d12b8044cdec9aca500cd78fc2c683afca",
+ "sha256:9649af414d74d4df115d5d718f82acb59c9d418196b7b4290ed47a12ce62df56"
],
- "version": "==0.7.4"
+ "version": "==0.7.5"
},
"pluggy": {
"hashes": [
- "sha256:7f8ae7f5bdf75671a718d2daf0a64b7885f74510bcd98b1a0bb420eb9a9d0cff",
- "sha256:d345c8fe681115900d6da8d048ba67c25df42973bda370783cd58826442dcd7c",
- "sha256:e160a7fcf25762bb60efc7e171d4497ff1d8d2d75a3d0df7a21b76821ecbf5c5"
+ "sha256:447ba94990e8014ee25ec853339faf7b0fc8050cdc3289d4d71f7f410fb90095",
+ "sha256:bde19360a8ec4dfd8a20dcb811780a30998101f078fc7ded6162f0076f50508f"
],
- "version": "==0.6.0"
+ "version": "==0.8.0"
},
"prompt-toolkit": {
"hashes": [
- "sha256:1df952620eccb399c53ebb359cc7d9a8d3a9538cb34c5a1344bdbeb29fbcc381",
- "sha256:3f473ae040ddaa52b52f97f6b4a493cfa9f5920c255a12dc56a7d34397a398a4",
- "sha256:858588f1983ca497f1cf4ffde01d978a3ea02b01c8a26a8bbc5cd2e66d816917"
+ "sha256:c1d6aff5252ab2ef391c2fe498ed8c088066f66bc64a8d5c095bbf795d9fec34",
+ "sha256:d4c47f79b635a0e70b84fdb97ebd9a274203706b1ee5ed44c10da62755cf3ec9",
+ "sha256:fd17048d8335c1e6d5ee403c3569953ba3eb8555d710bfc548faf0712666ea39"
],
- "version": "==1.0.15"
+ "version": "==2.0.7"
},
"psycopg2": {
"hashes": [
- "sha256:027ae518d0e3b8fff41990e598bc7774c3d08a3a20e9ecc0b59fb2aaaf152f7f",
- "sha256:092a80da1b052a181b6e6c765849c9b32d46c5dac3b81bf8c9b83e697f3cdbe8",
- "sha256:0b9851e798bae024ed1a2a6377a8dab4b8a128a56ed406f572f9f06194e4b275",
- "sha256:179c52eb870110a8c1b460c86d4f696d58510ea025602cd3f81453746fccb94f",
- "sha256:19983b77ec1fc2a210092aa0333ee48811fd9fb5f194c6cd5b927ed409aea5f8",
- "sha256:1d90379d01d0dc50ae9b40c863933d87ff82d51dd7d52cea5d1cb7019afd72cd",
- "sha256:27467fd5af1dcc0a82d72927113b8f92da8f44b2efbdb8906bd76face95b596d",
- "sha256:32702e3bd8bfe12b36226ba9846ed9e22336fc4bd710039d594b36bd432ae255",
- "sha256:33f9e1032095e1436fa9ec424abcbd4c170da934fb70e391c5d78275d0307c75",
- "sha256:36030ca7f4b4519ee4f52a74edc4ec73c75abfb6ea1d80ac7480953d1c0aa3c3",
- "sha256:363fbbf4189722fc46779be1fad2597e2c40b3f577dc618f353a46391cf5d235",
- "sha256:6f302c486132f8dd11f143e919e236ea4467d53bf18c451cac577e6988ecbd05",
- "sha256:733166464598c239323142c071fa4c9b91c14359176e5ae7e202db6bcc1d2eb5",
- "sha256:7cbc3b21ce2f681ca9ad2d8c0901090b23a30c955e980ebf1006d41f37068a95",
- "sha256:888bba7841116e529f407f15c6d28fe3ef0760df8c45257442ec2f14f161c871",
- "sha256:8966829cb0d21a08a3c5ac971a2eb67c3927ae27c247300a8476554cc0ce2ae8",
- "sha256:8bf51191d60f6987482ef0cfe8511bbf4877a5aa7f313d7b488b53189cf26209",
- "sha256:8eb94c0625c529215b53c08fb4e461546e2f3fc96a49c13d5474b5ad7aeab6cf",
- "sha256:8ebba5314c609a05c6955e5773c7e0e57b8dd817e4f751f30de729be58fa5e78",
- "sha256:932a4c101af007cb3132b1f8a9ffef23386acc53dad46536dc5ba43a3235ae02",
- "sha256:ad75fe10bea19ad2188c5cb5fc4cdf53ee808d9b44578c94a3cd1e9fc2beb656",
- "sha256:aeaba399254ca79c299d9fe6aa811d3c3eac61458dee10270de7f4e71c624998",
- "sha256:b178e0923c93393e16646155794521e063ec17b7cc9f943f15b7d4b39776ea2c",
- "sha256:b68e89bb086a9476fa85298caab43f92d0a6af135a5f433d1f6b6d82cafa7b55",
- "sha256:d74cf9234ba76426add5e123449be08993a9b13ff434c6efa3a07caa305a619f",
- "sha256:f3d3a88128f0c219bdc5b2d9ccd496517199660cea021c560a3252116df91cbd",
- "sha256:fe6a7f87356116f5ea840c65b032af17deef0e1a5c34013a2962dd6f99b860dd"
+ "sha256:0b9e48a1c1505699a64ac58815ca99104aacace8321e455072cee4f7fe7b2698",
+ "sha256:0f4c784e1b5a320efb434c66a50b8dd7e30a7dc047e8f45c0a8d2694bfe72781",
+ "sha256:0fdbaa32c9eb09ef09d425dc154628fca6fa69d2f7c1a33f889abb7e0efb3909",
+ "sha256:11fbf688d5c953c0a5ba625cc42dea9aeb2321942c7c5ed9341a68f865dc8cb1",
+ "sha256:19eaac4eb25ab078bd0f28304a0cb08702d120caadfe76bb1e6846ed1f68635e",
+ "sha256:3232ec1a3bf4dba97fbf9b03ce12e4b6c1d01ea3c85773903a67ced725728232",
+ "sha256:36f8f9c216fcca048006f6dd60e4d3e6f406afde26cfb99e063f137070139eaf",
+ "sha256:59c1a0e4f9abe970062ed35d0720935197800a7ef7a62b3a9e3a70588d9ca40b",
+ "sha256:6506c5ff88750948c28d41852c09c5d2a49f51f28c6d90cbf1b6808e18c64e88",
+ "sha256:6bc3e68ee16f571681b8c0b6d5c0a77bef3c589012352b3f0cf5520e674e9d01",
+ "sha256:6dbbd7aabbc861eec6b910522534894d9dbb507d5819bc982032c3ea2e974f51",
+ "sha256:6e737915de826650d1a5f7ff4ac6cf888a26f021a647390ca7bafdba0e85462b",
+ "sha256:6ed9b2cfe85abc720e8943c1808eeffd41daa73e18b7c1e1a228b0b91f768ccc",
+ "sha256:711ec617ba453fdfc66616db2520db3a6d9a891e3bf62ef9aba4c95bb4e61230",
+ "sha256:844dacdf7530c5c612718cf12bc001f59b2d9329d35b495f1ff25045161aa6af",
+ "sha256:86b52e146da13c896e50c5a3341a9448151f1092b1a4153e425d1e8b62fec508",
+ "sha256:985c06c2a0f227131733ae58d6a541a5bc8b665e7305494782bebdb74202b793",
+ "sha256:a86dfe45f4f9c55b1a2312ff20a59b30da8d39c0e8821d00018372a2a177098f",
+ "sha256:aa3cd07f7f7e3183b63d48300666f920828a9dbd7d7ec53d450df2c4953687a9",
+ "sha256:b1964ed645ef8317806d615d9ff006c0dadc09dfc54b99ae67f9ba7a1ec9d5d2",
+ "sha256:b2abbff9e4141484bb89b96eb8eae186d77bc6d5ffbec6b01783ee5c3c467351",
+ "sha256:cc33c3a90492e21713260095f02b12bee02b8d1f2c03a221d763ce04fa90e2e9",
+ "sha256:d7de3bf0986d777807611c36e809b77a13bf1888f5c8db0ebf24b47a52d10726",
+ "sha256:db5e3c52576cc5b93a959a03ccc3b02cb8f0af1fbbdc80645f7a215f0b864f3a",
+ "sha256:e168aa795ffbb11379c942cf95bf813c7db9aa55538eb61de8c6815e092416f5",
+ "sha256:e9ca911f8e2d3117e5241d5fa9aaa991cb22fb0792627eeada47425d706b5ec8",
+ "sha256:eccf962d41ca46e6326b97c8fe0a6687b58dfc1a5f6540ed071ff1474cea749e",
+ "sha256:efa19deae6b9e504a74347fe5e25c2cb9343766c489c2ae921b05f37338b18d1",
+ "sha256:f4b0460a21f784abe17b496f66e74157a6c36116fa86da8bf6aa028b9e8ad5fe",
+ "sha256:f93d508ca64d924d478fb11e272e09524698f0c581d9032e68958cfbdd41faef"
],
"index": "pypi",
- "version": "==2.7.4"
+ "version": "==2.7.5"
},
"ptyprocess": {
"hashes": [
- "sha256:e64193f0047ad603b71f202332ab5527c5e52aa7c8b609704fc28c0dc20c4365",
- "sha256:e8c43b5eee76b2083a9badde89fd1bbce6c8942d1045146e100b7b5e014f4f1a"
+ "sha256:923f299cc5ad920c68f2bc0bc98b75b9f838b93b599941a6b63ddbc2476394c0",
+ "sha256:d7cc528d76e76342423ca640335bd3633420dc1366f258cb31d05e865ef5ca1f"
],
- "version": "==0.5.2"
+ "version": "==0.6.0"
},
"py": {
"hashes": [
- "sha256:29c9fab495d7528e80ba1e343b958684f4ace687327e6f789a94bf3d1915f881",
- "sha256:983f77f3331356039fdd792e9220b7b8ee1aa6bd2b25f567a963ff1de5a64f6a"
+ "sha256:bf92637198836372b520efcba9e020c330123be8ce527e535d185ed4b6f45694",
+ "sha256:e76826342cefe3c3d5f7e8ee4316b80d1dd8a300781612ddbc765c17ba25a6c6"
],
- "version": "==1.5.3"
+ "version": "==1.7.0"
},
"pygments": {
"hashes": [
@@ -486,56 +527,50 @@
},
"pylint": {
"hashes": [
- "sha256:aa519865f8890a5905fa34924fed0f3bfc7d84fc9f9142c16dac52ffecd25a39",
- "sha256:c353d8225195b37cc3aef18248b8f3fe94c5a6a95affaf885ae21a24ca31d8eb"
+ "sha256:1d6d3622c94b4887115fe5204982eee66fdd8a951cf98635ee5caee6ec98c3ec",
+ "sha256:31142f764d2a7cd41df5196f9933b12b7ee55e73ef12204b648ad7e556c119fb"
],
"index": "pypi",
- "version": "==1.9.1"
+ "version": "==2.1.1"
},
"pytest": {
"hashes": [
- "sha256:39555d023af3200d004d09e51b4dd9fdd828baa863cded3fd6ba2f29f757ae2d",
- "sha256:c76e93f3145a44812955e8d46cdd302d8a45fbfc7bf22be24fe231f9d8d8853a"
+ "sha256:a9e5e8d7ab9d5b0747f37740276eb362e6a76275d76cebbb52c6049d93b475db",
+ "sha256:bf47e8ed20d03764f963f0070ff1c8fda6e2671fc5dd562a4d3b7148ad60f5ca"
],
"index": "pypi",
- "version": "==3.6.0"
+ "version": "==3.9.3"
},
"pytest-cov": {
"hashes": [
- "sha256:03aa752cf11db41d281ea1d807d954c4eda35cfa1b21d6971966cc041bbf6e2d",
- "sha256:890fe5565400902b0c78b5357004aab1c814115894f4f21370e2433256a3eeec"
+ "sha256:513c425e931a0344944f84ea47f3956be0e416d95acbd897a44970c8d926d5d7",
+ "sha256:e360f048b7dae3f2f2a9a4d067b2dd6b6a015d384d1577c994a43f3f7cbad762"
],
"index": "pypi",
- "version": "==2.5.1"
+ "version": "==2.6.0"
},
"pytest-pythonpath": {
"hashes": [
- "sha256:f3d46b0a8276e856f7dc4f70ca97b88be6fbcf52d57ce36e35057d502388265e"
+ "sha256:63fc546ace7d2c845c1ee289e8f7a6362c2b6bae497d10c716e58e253e801d62"
],
"index": "pypi",
- "version": "==0.7.2"
+ "version": "==0.7.3"
},
"requests": {
"hashes": [
- "sha256:6a1b267aa90cac58ac3a765d067950e7dbbf75b1da07e895d1f594193a40a38b",
- "sha256:9c443e7324ba5b85070c4a818ade28bfabedf16ea10206da1132edaa6dda237e"
+ "sha256:99dcfdaaeb17caf6e526f32b6a7b780461512ab3f1d992187801694cba42770c",
+ "sha256:a84b8c9ab6239b578f22d1c21d51b696dcfe004032bb80ea832398d6909d7279"
],
"index": "pypi",
- "version": "==2.18.4"
+ "version": "==2.20.0"
},
"responses": {
"hashes": [
- "sha256:c6082710f4abfb60793899ca5f21e7ceb25aabf321560cc0726f8b59006811c9",
- "sha256:f23a29dca18b815d9d64a516b4a0abb1fbdccff6141d988ad8100facb81cf7b3"
+ "sha256:682fafb124e799eeee67ec15c9678d955a88affda5613b09788ef80c03987cf0",
+ "sha256:9b1c14871c66329f509711627e3de5779a2ae50bd532ac162297623424288756"
],
"index": "pypi",
- "version": "==0.9.0"
- },
- "simplegeneric": {
- "hashes": [
- "sha256:dc972e06094b9af5b855b3df4a646395e43d1c9d0d39ed345b7393560d0b9173"
- ],
- "version": "==0.8.1"
+ "version": "==0.10.2"
},
"six": {
"hashes": [
@@ -551,12 +586,41 @@
],
"version": "==4.3.2"
},
+ "typed-ast": {
+ "hashes": [
+ "sha256:0948004fa228ae071054f5208840a1e88747a357ec1101c17217bfe99b299d58",
+ "sha256:10703d3cec8dcd9eef5a630a04056bbc898abc19bac5691612acba7d1325b66d",
+ "sha256:1f6c4bd0bdc0f14246fd41262df7dfc018d65bb05f6e16390b7ea26ca454a291",
+ "sha256:25d8feefe27eb0303b73545416b13d108c6067b846b543738a25ff304824ed9a",
+ "sha256:29464a177d56e4e055b5f7b629935af7f49c196be47528cc94e0a7bf83fbc2b9",
+ "sha256:2e214b72168ea0275efd6c884b114ab42e316de3ffa125b267e732ed2abda892",
+ "sha256:3e0d5e48e3a23e9a4d1a9f698e32a542a4a288c871d33ed8df1b092a40f3a0f9",
+ "sha256:519425deca5c2b2bdac49f77b2c5625781abbaf9a809d727d3a5596b30bb4ded",
+ "sha256:57fe287f0cdd9ceaf69e7b71a2e94a24b5d268b35df251a88fef5cc241bf73aa",
+ "sha256:668d0cec391d9aed1c6a388b0d5b97cd22e6073eaa5fbaa6d2946603b4871efe",
+ "sha256:68ba70684990f59497680ff90d18e756a47bf4863c604098f10de9716b2c0bdd",
+ "sha256:6de012d2b166fe7a4cdf505eee3aaa12192f7ba365beeefaca4ec10e31241a85",
+ "sha256:79b91ebe5a28d349b6d0d323023350133e927b4de5b651a8aa2db69c761420c6",
+ "sha256:8550177fa5d4c1f09b5e5f524411c44633c80ec69b24e0e98906dd761941ca46",
+ "sha256:898f818399cafcdb93cbbe15fc83a33d05f18e29fb498ddc09b0214cdfc7cd51",
+ "sha256:94b091dc0f19291adcb279a108f5d38de2430411068b219f41b343c03b28fb1f",
+ "sha256:a26863198902cda15ab4503991e8cf1ca874219e0118cbf07c126bce7c4db129",
+ "sha256:a8034021801bc0440f2e027c354b4eafd95891b573e12ff0418dec385c76785c",
+ "sha256:bc978ac17468fe868ee589c795d06777f75496b1ed576d308002c8a5756fb9ea",
+ "sha256:c05b41bc1deade9f90ddc5d988fe506208019ebba9f2578c622516fd201f5863",
+ "sha256:c9b060bd1e5a26ab6e8267fd46fc9e02b54eb15fffb16d112d4c7b1c12987559",
+ "sha256:edb04bdd45bfd76c8292c4d9654568efaedf76fe78eb246dde69bdb13b2dad87",
+ "sha256:f19f2a4f547505fe9072e15f6f4ae714af51b5a681a97f187971f50c283193b6"
+ ],
+ "markers": "python_version < '3.7' and implementation_name == 'cpython'",
+ "version": "==1.1.0"
+ },
"urllib3": {
"hashes": [
- "sha256:06330f386d6e4b195fbfc736b297f58c5a892e4440e54d294d7004e3a9bbea1b",
- "sha256:cc44da8e1145637334317feebd728bd869a35285b93cbb4cca2577da7e62db4f"
+ "sha256:61bf29cada3fc2fbefad4fdf059ea4bd1b4a86d2b6d15e1c7c0b582b9752fe39",
+ "sha256:de9529817c93f27c8ccbfead6985011db27bd0ddfcdb2d86f3f663385c6a9c22"
],
- "version": "==1.22"
+ "version": "==1.24.1"
},
"wcwidth": {
"hashes": [
diff --git a/python/fatcat/changelog_workers.py b/python/fatcat/changelog_workers.py
new file mode 100644
index 00000000..e341ea32
--- /dev/null
+++ b/python/fatcat/changelog_workers.py
@@ -0,0 +1,122 @@
+
+import json
+import time
+from itertools import islice
+from fatcat.worker_common import FatcatWorker
+from pykafka.common import OffsetType
+
+
+class FatcatChangelogWorker(FatcatWorker):
+ """
+ Periodically polls the fatcat API looking for new changelogs. When they are
+ found, fetch them and push (as JSON) into a Kafka topic.
+ """
+
+ def __init__(self, api_host_url, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):
+ # TODO: should be offset=0
+ super().__init__(kafka_hosts=kafka_hosts,
+ produce_topic=produce_topic,
+ api_host_url=api_host_url)
+ self.poll_interval = poll_interval
+ self.offset = offset # the fatcat changelog offset, not the kafka offset
+
+ def most_recent_message(self, topic):
+ """
+ Tries to fetch the most recent message from a given topic.
+ This only makes sense for single partition topics, though could be
+ extended with "last N" behavior.
+
+ Following "Consuming the last N messages from a topic"
+ from https://pykafka.readthedocs.io/en/latest/usage.html#consumer-patterns
+ """
+ consumer = topic.get_simple_consumer(
+ auto_offset_reset=OffsetType.LATEST,
+ reset_offset_on_start=True)
+ offsets = [(p, op.last_offset_consumed - 1)
+ for p, op in consumer._partitions.items()]
+ offsets = [(p, (o if o > -1 else -2)) for p, o in offsets]
+ if -2 in [o for p, o in offsets]:
+ return None
+ else:
+ consumer.reset_offsets(offsets)
+ msg = islice(consumer, 1)
+ if msg:
+ return list(msg)[0].value
+ else:
+ return None
+
+ def run(self):
+ topic = self.kafka.topics[self.produce_topic]
+ # On start, try to consume the most recent from the topic, and using
+ # that as the starting offset. Note that this is a single-partition
+ # topic
+ if self.offset is None:
+ print("Checking for most recent changelog offset...")
+ msg = self.most_recent_message(topic)
+ if msg:
+ self.offset = json.loads(msg.decode('utf-8'))['index']
+ else:
+ self.offset = 1
+
+ with topic.get_sync_producer() as producer:
+ while True:
+ latest = int(self.api.get_changelog(limit=1)[0].index)
+ if latest > self.offset:
+ print("Fetching changelogs from {} through {}".format(
+ self.offset+1, latest))
+ for i in range(self.offset+1, latest+1):
+ cle = self.api.get_changelog_entry(i)
+ obj = self.api.api_client.sanitize_for_serialization(cle)
+ producer.produce(
+ message=json.dumps(obj).encode('utf-8'),
+ partition_key=None,
+ timestamp=None,
+ #XXX: timestamp=cle.timestamp,
+ )
+ self.offset = i
+ print("Sleeping {} seconds...".format(self.poll_interval))
+ time.sleep(self.poll_interval)
+
+
+class FatcatEntityUpdatesWorker(FatcatWorker):
+ """
+ Consumes from the changelog topic and publishes expanded entities (fetched
+ from API) to update topics.
+
+ For now, only release updates are published.
+ """
+
+ def __init__(self, api_host_url, kafka_hosts, consume_topic, release_topic):
+ super().__init__(kafka_hosts=kafka_hosts,
+ consume_topic=consume_topic,
+ api_host_url=api_host_url)
+ self.release_topic = release_topic
+ self.consumer_group = "entity-updates"
+
+ def run(self):
+ changelog_topic = self.kafka.topics[self.consume_topic]
+ release_topic = self.kafka.topics[self.release_topic]
+
+ consumer = changelog_topic.get_balanced_consumer(
+ consumer_group=self.consumer_group,
+ managed=True,
+ auto_offset_reset=OffsetType.LATEST,
+ reset_offset_on_start=False,
+ )
+
+ with release_topic.get_sync_producer() as producer:
+ for msg in consumer:
+ cle = json.loads(msg.value.decode('utf-8'))
+ #print(cle)
+ release_edits = cle['editgroup']['edits']['releases']
+ for re in release_edits:
+ ident = re['ident']
+ release = self.api.get_release(ident, expand="files,container")
+ release_dict = self.api.api_client.sanitize_for_serialization(release)
+ producer.produce(
+ message=json.dumps(release_dict).encode('utf-8'),
+ partition_key=ident.encode('utf-8'),
+ timestamp=None,
+ )
+ consumer.commit_offsets()
+
diff --git a/python/fatcat/elastic_workers.py b/python/fatcat/elastic_workers.py
new file mode 100644
index 00000000..3d2e9c39
--- /dev/null
+++ b/python/fatcat/elastic_workers.py
@@ -0,0 +1,47 @@
+
+import json
+import time
+import requests
+from fatcat.worker_common import FatcatWorker
+from fatcat_client.models import ReleaseEntity
+from fatcat.entity_helpers import *
+from pykafka.common import OffsetType
+
+
+class FatcatElasticReleaseWorker(FatcatWorker):
+ """
+ Consumes from release-updates topic and pushes into (presumably local)
+ elasticsearch.
+
+ Uses a consumer group to manage offset.
+ """
+
+ def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
+ elastic_backend="http://localhost:9200", elastic_index="fatcat"):
+ super().__init__(kafka_hosts=kafka_hosts,
+ consume_topic=consume_topic,
+ api_host_url=None)
+ self.consumer_group = "elastic-updates"
+ self.elastic_backend = elastic_backend
+ self.elastic_index = elastic_index
+
+ def run(self):
+ consume_topic = self.kafka.topics[self.consume_topic]
+
+ consumer = consume_topic.get_balanced_consumer(
+ consumer_group=self.consumer_group,
+ managed=True,
+ )
+
+ for msg in consumer:
+ json_str = msg.value.decode('utf-8')
+ release = entity_from_json(json_str, ReleaseEntity)
+ #print(release)
+ elastic_endpoint = "{}/{}/release/{}".format(
+ self.elastic_backend,
+ self.elastic_index,
+ release.ident)
+ print("Updating document: {}".format(elastic_endpoint))
+ resp = requests.post(elastic_endpoint, json=release.to_elastic_dict())
+ assert resp.status_code in (200, 201)
+ consumer.commit_offsets()
diff --git a/python/fatcat/entity_helpers.py b/python/fatcat/entity_helpers.py
new file mode 100644
index 00000000..c454536b
--- /dev/null
+++ b/python/fatcat/entity_helpers.py
@@ -0,0 +1,100 @@
+
+import collections
+from fatcat_client.models import ReleaseEntity
+from fatcat_client.api_client import ApiClient
+
+def entity_to_json(entity):
+ ac = ApiClient()
+ return ac.sanitize_for_serialization(entity)
+
+def entity_from_json(json_str, entity_type):
+ """
+ Hack to take advantage of the code-generated deserialization code
+ """
+ ac = ApiClient()
+ thing = collections.namedtuple('Thing', ['data'])
+ thing.data = json_str
+ return ac.deserialize(thing, entity_type)
+
+def release_elastic_dict(release):
+ """
+ Converts from an entity model/schema to elasticsearch oriented schema.
+
+ Returns: dict
+ """
+
+ if release.state != 'active':
+ raise ValueError("Entity is not 'active'")
+
+ # First, the easy ones (direct copy)
+ t = dict(
+ ident = release.ident,
+ revision = release.revision,
+ title = release.title,
+ release_type = release.release_type,
+ release_status = release.release_status,
+ language = release.language,
+ doi = release.doi,
+ pmid = release.pmid,
+ pmcid = release.pmcid,
+ isbn13 = release.isbn13,
+ core_id = release.core_id,
+ wikidata_qid = release.wikidata_qid
+ )
+
+ if release.release_date:
+ # TODO: resolve why this can be either a string or datetime
+ if type(release.release_date) == str:
+ t['release_date'] = release.release_date
+ else:
+ t['release_date'] = release.release_date.strftime('%F')
+
+ container = release.container
+ container_is_kept = False
+ if container:
+ t['publisher'] = container.publisher
+ t['container_name'] = container.name
+ t['container_issnl'] = container.issnl
+ container_extra = container.extra
+ if container_extra:
+ t['container_is_oa'] = container_extra.get('is_oa')
+ container_is_kept = container_extra.get('is_kept', False)
+ t['container_is_longtail_oa'] = container_extra.get('is_longtail_oa')
+ else:
+ t['publisher'] = release.publisher
+
+ files = release.files or []
+ t['file_count'] = len(files)
+ in_wa = False
+ in_ia = False
+ t['file_pdf_url'] = None
+ for f in files:
+ is_pdf = 'pdf' in f.get('mimetype', '')
+ for url in f.get('urls', []):
+ if url.get('rel', '') == 'webarchive':
+ in_wa = True
+ if '//web.archive.org/' in url['url'] or '//archive.org/' in url['url']:
+ in_ia = True
+ if is_pdf:
+ t['file_pdf_url'] = url['url']
+ if not t['file_pdf_url'] and is_pdf:
+ t['file_pdf_url'] = url['url']
+ t['file_in_webarchive'] = in_wa
+ t['file_in_ia'] = in_ia
+
+ extra = release.extra or dict()
+ if extra:
+ t['in_shadow'] = extra.get('in_shadow')
+ if extra.get('grobid') and extra['grobid'].get('is_longtail_oa'):
+ t['container_is_longtail_oa'] = True
+ t['any_abstract'] = bool(release.abstracts)
+ t['is_kept'] = container_is_kept or extra.get('is_kept', False)
+
+ t['ref_count'] = len(release.refs or [])
+ t['contrib_count'] = len(release.contribs or [])
+ contrib_names = []
+ for c in (release.contribs or []):
+ if c.raw_name:
+ contrib_names.append(c.raw_name)
+ t['contrib_names'] = contrib_names
+ return t
diff --git a/python/fatcat/worker_common.py b/python/fatcat/worker_common.py
new file mode 100644
index 00000000..77ea2c15
--- /dev/null
+++ b/python/fatcat/worker_common.py
@@ -0,0 +1,25 @@
+
+import re
+import sys
+import csv
+import json
+import itertools
+import fatcat_client
+from pykafka import KafkaClient
+from fatcat_client.rest import ApiException
+
+
+class FatcatWorker:
+ """
+ Common code for for Kafka producers and consumers.
+ """
+
+ def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api_host_url=None):
+ if api_host_url:
+ conf = fatcat_client.Configuration()
+ conf.host = api_host_url
+ self.api = fatcat_client.DefaultApi(fatcat_client.ApiClient(conf))
+ self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0")
+ self.produce_topic = produce_topic
+ self.consume_topic = consume_topic
+
diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py
new file mode 100755
index 00000000..50ff0fb7
--- /dev/null
+++ b/python/fatcat_worker.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python3
+
+import sys
+import argparse
+from fatcat.changelog_workers import FatcatChangelogWorker, FatcatEntityUpdatesWorker
+from fatcat.elastic_workers import FatcatElasticReleaseWorker
+
+def run_changelog_worker(args):
+ topic = "fatcat-{}.changelog".format(args.env)
+ worker = FatcatChangelogWorker(args.api_host_url, args.kafka_hosts, topic,
+ args.poll_interval)
+ worker.run()
+
+def run_entity_updates_worker(args):
+ changelog_topic = "fatcat-{}.changelog".format(args.env)
+ release_topic = "fatcat-{}.release-updates".format(args.env)
+ worker = FatcatEntityUpdatesWorker(args.api_host_url, args.kafka_hosts,
+ changelog_topic, release_topic)
+ worker.run()
+
+def run_elastic_release_worker(args):
+ consume_topic = "fatcat-{}.release-updates".format(args.env)
+ worker = FatcatElasticReleaseWorker(args.kafka_hosts,
+ consume_topic)
+ worker.run()
+
+def main():
+ parser = argparse.ArgumentParser()
+ parser.add_argument('--debug',
+ action='store_true',
+ help="enable debug logging")
+ parser.add_argument('--api-host-url',
+ default="http://localhost:9411/v0",
+ help="fatcat API host/port to use")
+ parser.add_argument('--kafka-hosts',
+ default="localhost:9092",
+ help="list of Kafka brokers (host/port) to use")
+ parser.add_argument('--env',
+ default="qa",
+ help="Kafka topic namespace to use (eg, prod, qa)")
+ subparsers = parser.add_subparsers()
+
+ sub_changelog = subparsers.add_parser('changelog')
+ sub_changelog.set_defaults(func=run_changelog_worker)
+ sub_changelog.add_argument('--poll-interval',
+ help="how long to wait between polling (seconds)",
+ default=10.0, type=float)
+
+ sub_entity_updates = subparsers.add_parser('entity-updates')
+ sub_entity_updates.set_defaults(func=run_entity_updates_worker)
+
+ sub_elastic_release = subparsers.add_parser('elastic-release')
+ sub_elastic_release.set_defaults(func=run_elastic_release_worker)
+
+ args = parser.parse_args()
+ if not args.__dict__.get("func"):
+ print("tell me what to do!")
+ sys.exit(-1)
+ args.func(args)
+
+if __name__ == '__main__':
+ main()
diff --git a/python/tests/entity_helpers.py b/python/tests/entity_helpers.py
new file mode 100644
index 00000000..dd6fa00a
--- /dev/null
+++ b/python/tests/entity_helpers.py
@@ -0,0 +1,15 @@
+
+import json
+import pytest
+from fatcat.crossref_importer import FatcatCrossrefImporter
+from fatcat.entity_helpers import *
+
+from crossref import crossref_importer
+
+def test_elastic_convert(crossref_importer):
+ with open('tests/files/crossref-works.single.json', 'r') as f:
+ # not a single line
+ raw = json.loads(f.read())
+ (r, c) = crossref_importer.parse_crossref_dict(raw)
+ r.state = 'active'
+ release_elastic_dict(r)