Celery tasks: scraping and processing

Celery runs scheduled tasks, primarily to scrape and process data. To run Celery, install Redis, which Celery uses as its message broker and as the database for its schedule. The other dependencies are installed when you initially install the app's requirements with pip install -r requirements.txt.

Redis: install and start

  • On Ubuntu:

Install

$ sudo apt update
$ sudo apt install redis-server
Reading package lists... Done
Fetched 634 kB in 0s (24.3 MB/s)...

Start (or restart) the service: sudo systemctl restart redis.service

To confirm that it is running: sudo systemctl status redis

If necessary, edit /etc/redis/redis.conf. Our set-up should not require any special settings.
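
You can also verify connectivity from Python using redis-py (installed with the app's requirements); this is a minimal check, assuming the default localhost:6379 set-up:

import redis

# Returns True if Redis is reachable; raises ConnectionError otherwise.
r = redis.Redis(host='localhost', port=6379)
print(r.ping())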

How to run Celery tasks

Development

For a Celery worker, open one terminal, go to the Django project root directory (in our case …/Flatgov/server_py/flatgov), then activate the virtual environment.

Run the Celery worker with the command below (-Q bill subscribes the worker to the bill queue; -n sets the node name):

celery worker -Q bill -A flatgov.celery:app -n flatgov.%%h --loglevel=info

For the Celery scheduler, open another terminal, go to the Django project root directory (in our case …/Flatgov/server_py/flatgov), then activate the virtual environment.

Run the Celery beat scheduler (RedBeat, which stores its schedule in Redis):

celery beat -S redbeat.RedBeatScheduler -A flatgov.celery:app --loglevel=info

The background tasks (Celery tasks) then run daily at midnight.
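
For reference, a nightly run like this is typically declared as a beat_schedule entry that RedBeat picks up from the Celery app configuration. The sketch below is illustrative only: the entry name is made up, and the actual schedule lives in the project's Celery config.

# flatgov/celery.py (illustrative sketch, not the project's actual file contents)
from celery import Celery
from celery.schedules import crontab

app = Celery('flatgov')

app.conf.beat_schedule = {
    'nightly-uscongress-update': {                    # entry name is hypothetical
        'task': 'uscongress.tasks.update_bill_task',  # first task of the pipeline
        'schedule': crontab(hour=0, minute=0),        # daily at midnight
        'options': {'queue': 'bill'},                 # matches the worker's -Q bill
    },
}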

Production: Run Celeryd as a daemon on the Ubuntu server

  1. Init-script: celeryd

Before configuring it, edit deployment_scripts/conf_celeryd and update all paths to absolute paths.

Copy the deployment_scripts/bill_celeryd file to /etc/init.d/celeryd.

Make celeryd executable by running the following commands from the terminal:

sudo chmod 755 /etc/init.d/celeryd

sudo chown root:root /etc/init.d/celeryd

For configuration, copy the deployment_scripts/conf_celeryd file to /etc/default/celeryd.

Before starting the worker, fix ownership of the pid and log directories, then start it and confirm the node comes up:

(flatgov) ubuntu:/opt/flatgov/FlatGov/server_py/flatgov/flatgov$  sudo chown -R ubuntu:root /var/run/celery/
(flatgov) ubuntu:/opt/flatgov/FlatGov/server_py/flatgov/flatgov$ sudo chown -R ubuntu:root /var/log/celery/
(flatgov) ubuntu:/opt/flatgov/FlatGov/server_py/flatgov/flatgov$ sudo /etc/init.d/celeryd start
celery init v10.1.
Using config script: /etc/default/celeryd
celery multi v4.4.2 (cliffs)
> Starting nodes...
	> celery@ip-172-31-58-205: OK
Note
On Ubuntu, using the default ubuntu user, the settings are as follows.
CELERY_BIN="/home/ubuntu/.pyenv/versions/flatgov/bin/celery"
CELERY_APP="flatgov.celery:app"

CELERYD_CHDIR="/opt/flatgov/FlatGov/server_py/flatgov/"
CELERYD_OPTS="--time-limit=300 --concurrency=3 -Q bill -l INFO"
CELERYD_LOG_FILE="/var/log/celery/link_%n%I.log"
CELERYD_PID_FILE="/var/run/celery/link_%n.pid"
CELERYD_USER="ubuntu"
CELERYD_GROUP="ubuntu"
CELERY_CREATE_DIRS=1

To test:

(flatgov) ubuntu:/opt/flatgov/FlatGov/server_py/flatgov/flatgov$ sudo /etc/init.d/celeryd status
celery init v10.1.
Using config script: /etc/default/celeryd
celeryd (node link_celery) (pid 26679) is up...

  2. Init-script: celerybeat

Before configuring it, edit deployment_scripts/celerybeat and update all paths to absolute paths.

Copy deployment_scripts/celerybeat file to /etc/init.d/celerybeat.

Make celerybeat executable by running the following commands from the terminal:

sudo chmod 755 /etc/init.d/celerybeat

sudo chown root:root /etc/init.d/celerybeat

For configuration, copy the deployment_scripts/conf_celerybeat file to /etc/default/celerybeat.

Start the beat daemon and check that it is active:

sudo /etc/init.d/celerybeat start

sudo /etc/init.d/celerybeat status

  3. Maintenance

As shown above, the following commands control the worker and the beat scheduler:

/etc/init.d/celeryd {start|stop|restart}

/etc/init.d/celerybeat {start|stop|restart}

  4. Run a task manually

If you need to run a task manually (e.g. to test, or to get data off schedule), run a separate Celery worker:

(flatgov) ubuntu:/opt/flatgov/FlatGov/server_py/flatgov$ celery worker -Q bill -A flatgov.celery:app -n flatgov.%%h --loglevel=info

 -------------- celery@flatgov.%ip-... v4.4.2 (cliffs)
--- ***** -----
-- ******* ---- Linux-5.4.0-1041-aws-x86_64-with-glibc2.27 2021-04-01 18:18:04
- *** --- * ---
- ** ---------- [config]

Then, in a separate terminal, run pyenv activate flatgov and open the Django shell:

(flatgov) ubuntu:/opt/flatgov/FlatGov/server_py/flatgov$ python manage.py shell
Python 3.8.3 (default, Sep 24 2020, 22:52:34)
[GCC 7.5.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> from bills.tasks import sap_scrapy_task
>>> from celery import current_app
>>> current_app.send_task('bills.tasks.sap_scrapy_task', queue='bill')
<AsyncResult: a5d7d336-0125-4bdf-8819-5628b2341081>
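
Equivalently, since sap_scrapy_task is already imported in the shell, you can call it directly; apply_async with an explicit queue routes the task the same way as send_task:

>>> sap_scrapy_task.apply_async(queue='bill')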

Or, for the uscongress update task:

>>> from uscongress.tasks import update_bill_task
>>> from celery import current_app
>>> current_app.send_task('uscongress.tasks.update_bill_task', queue='bill')
<AsyncResult: f05d3449-d473-498f-b6f0-87f663cd20e3>

You can then track the task in the Celery logs, or on the original Celery terminal, e.g.:

[2021-04-01 18:27:47,069: WARNING/ForkPoolWorker-1] 2021-04-01 18:27:47 [scrapy.statscollectors] INFO: Dumping Scrapy stats:
{'downloader/request_bytes': 486,
 'downloader/request_count': 2,
 'downloader/request_method_count/GET': 2,
 'downloader/response_bytes': 27570,
 'downloader/response_count': 2,
 'downloader/response_status_count/200': 2,
 'elapsed_time_seconds': 0.393221,
 'finish_reason': 'finished',
 'finish_time': datetime.datetime(2021, 4, 1, 18, 27, 47, 68878),
 'item_scraped_count': 10,
 'log_count/DEBUG': 12,
 'log_count/INFO': 10,
 'log_count/WARNING': 22,
 'memusage/max': 83107840,
 'memusage/startup': 83107840,
 'response_received_count': 2,
 'robotstxt/request_count': 1,
 'robotstxt/response_count': 1,
 'robotstxt/response_status_count/200': 1,
 'scheduler/dequeued': 1,
 'scheduler/dequeued/memory': 1,
 'scheduler/enqueued': 1,
 'scheduler/enqueued/memory': 1,
 'start_time': datetime.datetime(2021, 4, 1, 18, 27, 46, 675657)}
[2021-04-01 18:27:47,069: INFO/ForkPoolWorker-1] Spider closed (finished)

For a different task, e.g. CommitteeDocument, the commands are the same. Start the worker:

celery worker -Q bill -A flatgov.celery:app -n flatgov.%%h --loglevel=info

  • Open another shell and run the Django shell:

python manage.py shell
from celery import current_app
current_app.send_task("bills.tasks.committee_report_scrapy_task", queue="bill")

You can then keep track of the task status on the terminal where Celery is running, or view the CommitteeDocument records in the Django admin dashboard. The initial data load takes a long time; there are about 17,000 records.
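
To check a task's state programmatically rather than watching the terminal, you can use Celery's AsyncResult from the Django shell (a minimal sketch; this assumes a result backend is configured):

from celery.result import AsyncResult
from flatgov.celery import app

# Use the task id printed when the task was sent, e.g. the AsyncResult ids above.
result = AsyncResult('a5d7d336-0125-4bdf-8819-5628b2341081', app=app)
print(result.state)  # PENDING, STARTED, SUCCESS, or FAILURE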

Celery tasks: what they do

The tasks are:

  • uscongress : updates uscongress bills and metadata

  • statementAdminPolicy : updates Statements of Administration Policy from the current White House (OMB) website

  • committeereport : updates committee reports associated with bills

  • cbo : updates CBO reports associated with bills

  • crs : updates CRS reports associated with bills

US Congress Task

Within the US Congress task there are six Celery tasks that run sequentially: update_bill_task, bill_data_task, process_bill_meta_task, related_bill_task, elastic_load_task, and bill_similarity_task.

  1. Download uscongress bills and metadata, using sitemaps to efficiently determine what needs to be updated.

With the open source scraper itself, we run ./run govinfo --collections=BILLS --congress=117 --extract=mods,xml,premis --bulkdata=BILLSTATUS

Then ./run bills

It will create data.json out of data.xml and text_versions.

Instead of the open source scraper commands, we use a Celery task named update_bill_task to download new bill metadata and text.

The task runs daily at midnight in the background on the server.

Whenever the task starts running, it creates a UscongressUpdateJob record in the database to track the task status.

The fields in UscongressUpdateJob (a minimal model sketch follows the list):

  • job_id : celery task id

  • fdsys_status : choice field (pending, success, failed). Represents the status of the uscongress bill download (fdsys and text versions) performed by the Celery task update_bill_task; once the download finishes, the value changes to success or failed.

  • saved : the list of bill congress numbers downloaded by the Celery task update_bill_task.

  • skips : the list of bill congress numbers skipped by the Celery task update_bill_task.

  • data_status : choice field (pending, success, failed). Once the download is finished, update_bill_task creates data.json out of data.xml and text_versions, exactly as ./run bills does; this field tracks that step.

  • bill_status : choice field (pending, success, failed). After data.json is created, billList.json and billsMeta.json are created by another Celery task named bill_data_task; this field tracks whether that task succeeded or failed.

  • meta_status : choice field (pending, success, failed). After billList.json and billsMeta.json are created, the metadata is processed by the Celery task process_bill_meta_task; this field tracks that task's status.

  • related_status : choice field (pending, success, failed). Once process_bill_meta_task is finished, the Celery task related_bill_task runs to get related bills; this field tracks its status.

  • elastic_status : choice field (pending, success, failed). Once related_bill_task is finished, the Celery task elastic_load_task runs to load the new bills into Elasticsearch; this field tracks its status.

  • similarity : choice field (pending, success, failed). After elastic_load_task finishes, bill_similarity_task updates the es_similarity field of each bill in the database; this field tracks that task's status.

  • created : a datetime field recording when the record was created.
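
A minimal sketch of what the UscongressUpdateJob model might look like, based on the fields above (the actual definition lives in uscongress/models.py; the field types here are assumptions):

from django.db import models


class UscongressUpdateJob(models.Model):
    PENDING = 'pending'
    SUCCESS = 'success'
    FAILED = 'failed'
    STATUS_CHOICES = [(PENDING, 'Pending'), (SUCCESS, 'Success'), (FAILED, 'Failed')]

    job_id = models.CharField(max_length=255)  # celery task id
    fdsys_status = models.CharField(max_length=10, choices=STATUS_CHOICES, default=PENDING)
    saved = models.JSONField(default=list)     # bill numbers downloaded (type assumed)
    skips = models.JSONField(default=list)     # bill numbers skipped (type assumed)
    data_status = models.CharField(max_length=10, choices=STATUS_CHOICES, default=PENDING)
    bill_status = models.CharField(max_length=10, choices=STATUS_CHOICES, default=PENDING)
    meta_status = models.CharField(max_length=10, choices=STATUS_CHOICES, default=PENDING)
    related_status = models.CharField(max_length=10, choices=STATUS_CHOICES, default=PENDING)
    elastic_status = models.CharField(max_length=10, choices=STATUS_CHOICES, default=PENDING)
    similarity = models.CharField(max_length=10, choices=STATUS_CHOICES, default=PENDING)
    created = models.DateTimeField(auto_now_add=True)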

  2. Create billList.json and billsMeta.json

Once the Celery task update_bill_task is finished (the bill text and metadata are downloaded), the next Celery task, bill_data_task, runs. This is triggered in the save method of uscongress/models.py:

def save(self, *args, **kwargs):
    super().save(*args, **kwargs)
    # Each stage of the pipeline is kicked off only when the previous
    # stage has succeeded and this stage is still pending.
    if self.pk and self.data_status == self.SUCCESS and self.bill_status == self.PENDING:
        current_app.send_task(
            'uscongress.tasks.bill_data_task',
            args=(self.pk,),
            queue='bill'
        )
    if self.pk and self.bill_status == self.SUCCESS and self.meta_status == self.PENDING:
        current_app.send_task(
            'uscongress.tasks.process_bill_meta_task',
            args=(self.pk,),
            queue='bill'
        )
    ...

As mentioned above, whenever update_bill_task runs, it creates a UscongressUpdateJob record in the database.

The saved field holds the list of bill congress numbers that were newly downloaded.

The task bill_data_task creates the billList.json and billsMeta.json files from the list in the saved field and dumps the related bill JSON files.

The data.json files at the top level (not the data.json in the text versions) are used to create the metadata.

We will provide an option, which will be the default, to get bill XML from the directory structure created by the uscongress open source scraper. (Alternatively, the flat structure shown below could be used to get bill XML.)

  3. Process bill metadata

After bill_data_task completes, process_bill_meta_task runs; it processes and organizes the metadata for bills.

  4. Create related bills

Next, related_bill_task runs. It creates bill instances in the Bill table in the database.

  5. Load the new bills into Elasticsearch

The Celery task elastic_load_task loads the new bills into Elasticsearch.

The XML used for bill similarity is in text_versions; it is the bill document itself, and that is what we index.
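
As an illustrative sketch only (the index name, document shape, and file path below are assumptions, not the project's actual mapping), loading one bill text version into Elasticsearch with elasticsearch-py looks like:

from elasticsearch import Elasticsearch

es = Elasticsearch()  # assumes a local node on :9200

# Hypothetical file path and document shape.
with open('text_versions/BILLS-117hr1500ih.xml') as f:
    doc = {'congress': '117', 'billnumber': 'hr1500', 'text': f.read()}

es.index(index='bills', id='117hr1500ih', body=doc)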

  6. Update the bill similarity

The Celery task bill_similarity_task updates the bill similarity.

It updates only the new bills, since the list of new bills is in the saved field of the UscongressUpdateJob record.
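
A hypothetical sketch of how similar bills can be retrieved with Elasticsearch's more_like_this query (the project's actual similarity computation may differ; the index and field names are assumptions):

from elasticsearch import Elasticsearch

es = Elasticsearch()

# Find indexed bills whose 'text' field resembles the given bill text.
with open('text_versions/BILLS-117hr1500ih.xml') as f:
    bill_text = f.read()

query = {
    'query': {
        'more_like_this': {
            'fields': ['text'],
            'like': bill_text,
            'min_term_freq': 1,
            'max_query_terms': 50,
        }
    }
}
for hit in es.search(index='bills', body=query)['hits']['hits']:
    print(hit['_id'], hit['_score'])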

Flat structure

├── 110
│   ├── dtd
│   └── pdf
├── 111
│   ├── dtd
│   └── pdf
├── 112
│   ├── dtd
│   └── pdf
├── 113
│   ├── dtd
│   └── pdf
├── 114
│   ├── dtd
│   └── pdf
├── 115
│   ├── dtd
│   └── pdf
├── 115-bk
│   ├── dtd
│   └── pdf
├── 116
│   ├── dtd
│   └── pdf

Statements of Administration Policy

Found in server_py/bills/tasks, the Statements of Administration Policy task scrapes SAP links from the White House website and stores them in the database, using original_pdf_link as a unique field to avoid duplicates.
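
The deduplication might look like the following sketch; only original_pdf_link as the unique key is documented above, and the model name and helper function are hypothetical:

from bills.models import StatementOfAdminPolicy  # hypothetical model name


def store_sap(original_pdf_link, **fields):
    # get_or_create is a no-op for links already stored, so re-running
    # the scraper does not create duplicate records.
    obj, created = StatementOfAdminPolicy.objects.get_or_create(
        original_pdf_link=original_pdf_link,
        defaults=fields,
    )
    return created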