pip install "apache-airflow==2.9.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.11.txt"
Troubleshooting
Problem:
The airflow command is not found after the installation.
Solution:
Check whether the directory that contains the airflow executable is listed in the PATH variable. If it isn't, append it, for example:
:/home/david/.local/bin
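The PATH check described above can be sketched in Python as follows. This is a minimal sketch: the helper name dir_on_path is hypothetical, and it assumes the executable was installed in ~/.local/bin, as in the example path above.

```python
import os
import shutil
from pathlib import Path


def dir_on_path(directory: str, path_value: str) -> bool:
    """Return True if `directory` is one of the entries of a PATH-style string."""
    target = os.path.normpath(os.path.expanduser(directory))
    entries = [entry for entry in path_value.split(os.pathsep) if entry]
    return any(os.path.normpath(os.path.expanduser(e)) == target for e in entries)


if __name__ == "__main__":
    # Assumption: the airflow script was installed in ~/.local/bin.
    user_bin = str(Path.home() / ".local" / "bin")
    # shutil.which returns None when no `airflow` executable is found on PATH.
    print("airflow executable:", shutil.which("airflow"))
    print(user_bin, "on PATH:", dir_on_path(user_bin, os.environ.get("PATH", "")))
```

If the second line prints False, the directory still has to be added to PATH in the shell profile.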
Problem:
We updated the code of a DAG, but the changes do not take effect, even after restarting Airflow.
Solution:
The updated file may be failing to import. List the DAG import errors with this command:
airflow dags list-import-errors
Airflow startup
Start the individual components in the background (-D), specifying the web server port (-p):
airflow webserver -p 5006 -D
airflow scheduler -D
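Stop the web server and the scheduler, either through their PID files (stored in $AIRFLOW_HOME by default) or by process name: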
kill -9 $(cat airflow-webserver.pid)
kill -9 $(cat airflow-scheduler.pid)
kill -9 $(cat airflow-webserver-monitor.pid)
ps -aux | grep -i 'airflow scheduler' | awk '{print $2}' | xargs -I% kill -9 %
ps -aux | grep -i 'gunicorn: master' | awk '{print $2}' | xargs -I% kill -9 %
ps -aux | grep -i 'gunicorn: worker' | awk '{print $2}' | xargs -I% kill -9 %
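As a gentler alternative to kill -9, the PID-file approach above can be sketched in Python with SIGTERM, which gives the process a chance to shut down cleanly. This is only a sketch: the helper names read_pid and stop_from_pid_file are hypothetical, and the PID files are assumed to be the ones listed above.

```python
import os
import signal
from pathlib import Path
from typing import Optional


def read_pid(pid_file: Path) -> Optional[int]:
    """Return the PID stored in `pid_file`, or None if the file is missing or invalid."""
    try:
        return int(pid_file.read_text().strip())
    except (FileNotFoundError, ValueError):
        return None


def stop_from_pid_file(pid_file: Path, sig: int = signal.SIGTERM) -> bool:
    """Send `sig` to the process named in `pid_file`; return True if a signal was sent."""
    pid = read_pid(pid_file)
    if pid is None:
        return False
    try:
        os.kill(pid, sig)
    except ProcessLookupError:
        # Stale PID file: the process is already gone.
        return False
    return True
```

For example, stop_from_pid_file(Path("airflow-scheduler.pid")) would ask the scheduler to stop; signal.SIGKILL can still be passed as sig if the process does not react.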
Airflow components
By default, when we deploy Airflow with Helm, we get these components:
– celery workers (they appear as worker pods)
– redis as the Celery broker
– triggerer
– webserver
– scheduler
– statsd
Airflow CLI commands
airflow dags list
airflow dags list-import-errors
Example of output:
filepath                                                  | error
==========================================================+=============================================================================================================
.........../airflow/example_dags/tutorial_taskflow_api.py | AirflowDagDuplicatedIdException: Ignoring DAG tutorial_taskflow_api from
                                                          | /home/david/.local/lib/python3.8/site-packages/airflow/example_dags/tutorial_taskflow_api.py - also found in
                                                          | /home/david/airflow/dags/tutorial_taskflow_api.py
Display the configuration for the API:
airflow config get-value api auth_backends
Enable basic authentication for the API:
In the [api] section of airflow.cfg, set:
auth_backends = airflow.api.auth.backend.basic_auth
Create a new user:
airflow users create --username david --firstname Peter --lastname Parker --role Admin --email ebundy@gmail.com
Test that authentication against the API works:
curl -X GET --user "david:pass" "http://localhost:30100/api/v1/pools"
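What curl does with --user can be sketched in Python: the username and password are joined with a colon, base64-encoded, and sent in an Authorization header. This is a minimal sketch using only the standard library. The helper name build_request is hypothetical, and the URL and credentials are the ones from the curl example above.

```python
import base64
import urllib.request


def build_request(url: str, username: str, password: str) -> urllib.request.Request:
    """Build a GET request carrying an HTTP Basic Authorization header."""
    credentials = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    request = urllib.request.Request(url, method="GET")
    request.add_header("Authorization", f"Basic {token}")
    request.add_header("Accept", "application/json")
    return request


if __name__ == "__main__":
    req = build_request("http://localhost:30100/api/v1/pools", "david", "pass")
    print(req.get_method(), req.full_url)
    # To actually send it (requires a running Airflow API):
    # with urllib.request.urlopen(req) as resp:
    #     print(resp.status, resp.read().decode())
```

The network call is left commented out so that the sketch runs without an Airflow instance.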
List the runs of a DAG:
curl -X GET --user "david:pass" http://localhost:30100/api/v1/dags/execute_function_single_taskflow_api/dagRuns
Trigger a new DAG run:
curl -H "Content-Type: application/json" -d '{}' -X POST --user "david:pass" http://localhost:30100/api/v1/dags/execute_function_single_taskflow_api/dagRuns
Example of response:
{
  "conf": {},
  "dag_id": "execute_function_single_taskflow_api",
  "dag_run_id": "manual__2024-08-02T14:04:44.110493+00:00",
  "data_interval_end": "2024-08-02T14:04:44.110493+00:00",
  "data_interval_start": "2024-08-02T14:04:44.110493+00:00",
  "end_date": null,
  "execution_date": "2024-08-02T14:04:44.110493+00:00",
  "external_trigger": true,
  "logical_date": "2024-08-02T14:04:44.110493+00:00",
  "note": null,
  "run_type": "manual",
  "start_date": null,
  "state": "queued"
}
Get the logs for a specific task instance and try number.
To read the log from a specific character position, a token built with URLSafeSerializer can be passed as the token query parameter.
/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{task_try_number}
Example:
curl -X GET --user "david:pass" "http://localhost:30100/api/v1/dags/execute_function_single_taskflow_api/dagRuns/manual__2024-08-02T14:15:47.021558+00:00/taskInstances/execute_function/logs/1?full_content=false"
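The endpoint template above can be filled in programmatically. The sketch below percent-encodes each path segment, because a dag_run_id contains characters such as ":" and "+"; this encoding is a precaution on my part, not something the notes above require. The helper name task_log_url is hypothetical, and the base URL is the one from the curl examples.

```python
from urllib.parse import quote, urlencode

# Host and port taken from the curl examples above.
BASE_URL = "http://localhost:30100/api/v1"


def task_log_url(dag_id: str, dag_run_id: str, task_id: str,
                 try_number: int, full_content: bool = False) -> str:
    """Fill in the task-log endpoint template, percent-encoding each path segment."""
    segments = [
        "dags", quote(dag_id, safe=""),
        "dagRuns", quote(dag_run_id, safe=""),
        "taskInstances", quote(task_id, safe=""),
        "logs", str(try_number),
    ]
    # The API expects a lowercase boolean: full_content=false
    query = urlencode({"full_content": str(full_content).lower()})
    return f"{BASE_URL}/{'/'.join(segments)}?{query}"


if __name__ == "__main__":
    print(task_log_url(
        "execute_function_single_taskflow_api",
        "manual__2024-08-02T14:15:47.021558+00:00",
        "execute_function",
        1,
    ))
```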
Get the xcom return value of a task:
curl -X GET --user "david:pass" "http://localhost:30100/api/v1/dags/execute_function_single_taskflow_api/dagRuns/manual__2024-08-07T17:36:21.394783+00:00/taskInstances/execute_function/xcomEntries/return_value"
Example of response:
{
  "dag_id": "execute_function_single_taskflow_api",
  "execution_date": "2024-08-07T17:36:21.394783+00:00",
  "key": "return_value",
  "map_index": -1,
  "task_id": "execute_function",
  "timestamp": "2024-08-07T17:36:24.603657+00:00",
  "value": "{'error_type': None, 'error_msg': None, 'result': '12.3', 'stacktrace': None}"
}
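In the response above, the value field is not nested JSON but a string holding the Python representation of a dict (single quotes, None). One way to read it, sketched here under the assumption that the value always has this literal form, is ast.literal_eval. The helper name parse_xcom_value is hypothetical, and the sample is a shortened copy of the response above.

```python
import ast
import json

# Shortened copy of the example response above.
SAMPLE_RESPONSE = """
{
  "dag_id": "execute_function_single_taskflow_api",
  "key": "return_value",
  "value": "{'error_type': None, 'error_msg': None, 'result': '12.3', 'stacktrace': None}"
}
"""


def parse_xcom_value(response_text: str) -> object:
    """Decode the JSON body, then evaluate the Python-literal string in `value`."""
    entry = json.loads(response_text)
    # literal_eval only accepts literals (dict, str, None, ...), never arbitrary code.
    return ast.literal_eval(entry["value"])


if __name__ == "__main__":
    result = parse_xcom_value(SAMPLE_RESPONSE)
    print(result["result"], result["error_type"])
```

json.loads alone would not work on the value string, since single quotes and None are not valid JSON.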
Macro commands
re-install-airflow-chart.sh: a script that uninstalls our Helm release and deletes the Kubernetes secrets and persistent volume claims that can cause errors when the chart is reinstalled
#!/bin/bash
helm uninstall modeler-airflow --keep-history
kubectl delete secret modeler-airflow-broker-url
kubectl delete secret modeler-airflow-fernet-key
kubectl delete secret modeler-airflow-redis-password
kubectl delete secret sh.helm.release.v1.modeler-airflow.v1
kubectl delete secret sh.helm.release.v1.modeler-airflow.v2
kubectl delete secret sh.helm.release.v1.modeler-airflow.v3
kubectl delete secret sh.helm.release.v1.modeler-airflow.v4
kubectl delete secret sh.helm.release.v1.modeler-airflow.v5
kubectl delete secret sh.helm.release.v1.modeler-airflow.v6
kubectl delete secret sh.helm.release.v1.modeler-airflow.v7
kubectl delete pvc data-modeler-airflow-postgresql-0
kubectl delete pvc logs-modeler-airflow-triggerer-0
kubectl delete pvc logs-modeler-airflow-worker-0
kubectl delete pvc redis-db-modeler-airflow-redis-0
cd modeler-airflow-chart/
helm upgrade -i modeler-airflow . --debug
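The script hardcodes the release secrets for revisions v1 to v7. If the release has a different number of revisions, the list of commands can be generated instead. The sketch below only builds the command strings and does not run kubectl. The helper names are hypothetical, and the secret naming pattern is simply the one visible in the script.

```python
from typing import List

# Release name used in the script above.
RELEASE = "modeler-airflow"


def helm_release_secrets(release: str, last_revision: int) -> List[str]:
    """Secret names for revisions 1..last_revision, following the pattern in the script."""
    return [f"sh.helm.release.v1.{release}.v{n}" for n in range(1, last_revision + 1)]


def cleanup_commands(release: str, last_revision: int) -> List[str]:
    """Build one `kubectl delete secret` command per release revision."""
    return [f"kubectl delete secret {name}"
            for name in helm_release_secrets(release, last_revision)]


if __name__ == "__main__":
    for command in cleanup_commands(RELEASE, 7):
        print(command)
```

With last_revision set to 7, the output matches the seven release-secret lines of the script.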