Airflow basics

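Install Airflow, using the constraints file that matches your Python version (3.11 here):
pip install "apache-airflow==2.9.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.11.txt"

Start a local all-in-one instance (it initializes the database, creates a user, and starts all components):
airflow standalone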

Troubleshooting

Problem:
The airflow command is not found after the installation.
Solution:
Check whether the directory containing the airflow executable is listed in the PATH variable. If it is not, append it, for example:
export PATH="$PATH:/home/david/.local/bin"
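
The same check can be done from Python. This is only a minimal sketch: the helper name find_airflow and its parameters are hypothetical, shutil.which is the standard-library lookup, and the extra directory is the example path from this post.

```python
import os
import shutil


def find_airflow(extra_dir=None, path=None):
    """Return the full path of the airflow executable, or None if it is not found.

    extra_dir (hypothetical parameter) is appended to the search path,
    which mimics PATH="$PATH:/home/david/.local/bin".
    """
    search_path = os.environ.get("PATH", "") if path is None else path
    if extra_dir:
        search_path = search_path + os.pathsep + extra_dir
    return shutil.which("airflow", path=search_path)


if __name__ == "__main__":
    found = find_airflow()
    if found is None:
        # Retry with the user-level bin directory used as an example in this post.
        found = find_airflow(extra_dir="/home/david/.local/bin")
    print(found or "airflow not found: add its directory to PATH")
```

If the second lookup succeeds while the first one fails, the directory is simply missing from PATH.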

Problem:
The code of a DAG was updated, but the changes do not take effect, even after restarting Airflow.
Solution:
The DAG file probably fails to import. List the DAG import errors with this command:
airflow dags list-import-errors

Airflow startup

Start individual components in the background (-D), specifying the web server port (-p):
airflow webserver -p 5006 -D
airflow scheduler -D

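Stop the web server and the scheduler (run from the directory that holds the pid files, $AIRFLOW_HOME by default):
kill -9 $(cat airflow-webserver.pid)
kill -9 $(cat airflow-scheduler.pid)
kill -9 $(cat airflow-webserver-monitor.pid)
If some processes are still running, kill them by name (the [a] and [g] brackets keep grep from matching its own process):
ps aux | grep -i '[a]irflow scheduler' | awk '{print $2}' | xargs -I% kill -9 %
ps aux | grep -i '[g]unicorn: master' | awk '{print $2}' | xargs -I% kill -9 %
ps aux | grep -i '[g]unicorn: worker' | awk '{print $2}' | xargs -I% kill -9 %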
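
kill -9 gives the processes no chance to clean up. A gentler variant is sketched here, assuming the pid files named above exist: it reads a pid file and sends SIGTERM instead. The function name stop_from_pid_file is hypothetical.

```python
import os
import signal


def stop_from_pid_file(pid_file, sig=signal.SIGTERM):
    """Send sig to the process whose PID is stored in pid_file.

    Returns the PID that was signalled, or None if the file is missing,
    unreadable as a number, or the process is already gone.
    """
    try:
        with open(pid_file) as handle:
            pid = int(handle.read().strip())
    except (FileNotFoundError, ValueError):
        return None
    try:
        os.kill(pid, sig)
    except ProcessLookupError:
        return None
    return pid
```

For example, stop_from_pid_file("airflow-webserver.pid") asks the web server to shut down; calling it again with sig=signal.SIGKILL is the equivalent of kill -9 if the process does not exit.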

Airflow components

By default, deploying Airflow with Helm creates these components:
– Celery workers (shown as worker pods)
– Redis as the Celery broker
– triggerer
– webserver
– scheduler
– statsd

Airflow CLI commands

List the DAGs:
airflow dags list

List the DAG import errors:
airflow dags list-import-errors
Example of output:

filepath                                                  | error
==========================================================+==================================================================================================
.........../airflow/example_dags/tutorial_taskflow_api.py | AirflowDagDuplicatedIdException: Ignoring DAG tutorial_taskflow_api from
                                                          | /home/david/.local/lib/python3.8/site-packages/airflow/example_dags/tutorial_taskflow_api.py - also found in
                                                          | /home/david/airflow/dags/tutorial_taskflow_api.py

Here the same DAG id is defined twice, in the bundled examples and in the user's dags folder, so one copy is ignored. Renaming the dag_id of the copy, or setting load_examples = False in airflow.cfg, removes the conflict.

Display the authentication backends configured for the API:
airflow config get-value api auth_backends

Enable basic authentication for the API:
In the [api] section of airflow.cfg, set:
auth_backends = airflow.api.auth.backend.basic_auth

Create a new user (the command prompts for the password unless --password is given):
airflow users create --username david --firstname Peter --lastname Parker --role Admin --email ebundy@gmail.com

Test that API authentication works:
curl -X GET --user "david:pass" "http://localhost:30100/api/v1/pools"
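
The same call can be made from Python with only the standard library. This is a sketch, not an official client: build_request and call are hypothetical helper names, and the host, port and credentials are those of the curl example.

```python
import base64
import json
import urllib.request

BASE_URL = "http://localhost:30100/api/v1"  # host and port taken from the curl examples


def build_request(path, user, password, method="GET", payload=None):
    """Build (without sending) a basic-auth request for the Airflow REST API."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    headers = {"Authorization": f"Basic {token}"}
    data = None
    if payload is not None:
        # A JSON body is only needed for POST/PATCH calls.
        headers["Content-Type"] = "application/json"
        data = json.dumps(payload).encode()
    return urllib.request.Request(f"{BASE_URL}{path}", data=data, headers=headers, method=method)


def call(request):
    """Send the request and decode the JSON response (needs a reachable Airflow)."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

Usage: call(build_request("/pools", "david", "pass")) should return the list of pools; a wrong password or a backend other than basic_auth should make urlopen raise an HTTPError with status 401.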

List the runs of a DAG:
curl -X GET --user "david:pass" "http://localhost:30100/api/v1/dags/execute_function_single_taskflow_api/dagRuns"

Trigger a new DAG run:
curl -H "Content-Type: application/json" -d '{}' -X POST --user "david:pass" http://localhost:30100/api/v1/dags/execute_function_single_taskflow_api/dagRuns
Example of response:

{
  "conf": {},
  "dag_id": "execute_function_single_taskflow_api",
  "dag_run_id": "manual__2024-08-02T14:04:44.110493+00:00",
  "data_interval_end": "2024-08-02T14:04:44.110493+00:00",
  "data_interval_start": "2024-08-02T14:04:44.110493+00:00",
  "end_date": null,
  "execution_date": "2024-08-02T14:04:44.110493+00:00",
  "external_trigger": true,
  "logical_date": "2024-08-02T14:04:44.110493+00:00",
  "note": null,
  "run_type": "manual",
  "start_date": null,
  "state": "queued"
}
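
To follow a run after triggering it, the useful parts of this response are dag_run_id and state. The sketch below builds the POST body and summarizes such a response; trigger_payload and summarize_run are hypothetical names, and treating success and failed as the final states is an assumption of this sketch.

```python
import json

# DAG run states after which the run no longer changes (assumption of this sketch).
FINAL_STATES = {"success", "failed"}


def trigger_payload(conf=None):
    """JSON body for POST .../dagRuns; conf is passed to the DAG run."""
    return json.dumps({"conf": conf or {}})


def summarize_run(response_text):
    """Keep the fields needed to poll the run later."""
    run = json.loads(response_text)
    return {
        "dag_run_id": run["dag_run_id"],
        "state": run["state"],
        "finished": run["state"] in FINAL_STATES,
    }
```

With the response above, summarize_run reports the state "queued" and finished as False, so the run list has to be queried again later to see the outcome.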

Get the logs of a specific task instance and try number.
To read the log from a specific character position, a continuation token built with URLSafeSerializer can be passed.
/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{task_try_number}
Example (the URL is quoted so that the shell does not interpret the ? character):
curl -X GET --user "david:pass" "http://localhost:30100/api/v1/dags/execute_function_single_taskflow_api/dagRuns/manual__2024-08-02T14:15:47.021558+00:00/taskInstances/execute_function/logs/1?full_content=false"
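
The dag_run_id contains : and + characters, so when the URL is built in code it is safer to percent-encode each path segment. A small sketch, where task_log_url is a hypothetical helper and the base URL is the one from the examples:

```python
from urllib.parse import quote, urlencode


def task_log_url(base_url, dag_id, dag_run_id, task_id, try_number, full_content=False):
    """Build the logs URL for one task try, percent-encoding each path segment."""
    segments = [quote(str(part), safe="") for part in (dag_id, dag_run_id, task_id)]
    path = "/dags/{}/dagRuns/{}/taskInstances/{}/logs/{}".format(*segments, int(try_number))
    query = urlencode({"full_content": str(full_content).lower()})
    return f"{base_url}{path}?{query}"


url = task_log_url(
    "http://localhost:30100/api/v1",
    "execute_function_single_taskflow_api",
    "manual__2024-08-02T14:15:47.021558+00:00",
    "execute_function",
    1,
)
print(url)
```

The run id is emitted as manual__2024-08-02T14%3A15%3A47.021558%2B00%3A00, which the server decodes back to the original value.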

Get the XCom return value of a task:
curl -X GET --user "david:pass" "http://localhost:30100/api/v1/dags/execute_function_single_taskflow_api/dagRuns/manual__2024-08-07T17:36:21.394783+00:00/taskInstances/execute_function/xcomEntries/return_value"
Example of response:

{
  "dag_id": "execute_function_single_taskflow_api",
  "execution_date": "2024-08-07T17:36:21.394783+00:00",
  "key": "return_value",
  "map_index": -1,
  "task_id": "execute_function",
  "timestamp": "2024-08-07T17:36:24.603657+00:00",
  "value": "{'error_type': None, 'error_msg': None, 'result': '12.3', 'stacktrace': None}"
}
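
Note that value is a string holding the repr of a Python dict (single quotes, None), not JSON, so json.loads fails on it. One way to turn it back into a dict is ast.literal_eval, sketched here with the hypothetical helper parse_xcom_value:

```python
import ast
import json


def parse_xcom_value(response_text):
    """Return the XCom value as a Python object when it is a literal, else the raw string."""
    entry = json.loads(response_text)
    value = entry["value"]
    try:
        # literal_eval only accepts Python literals, so it does not execute code.
        return ast.literal_eval(value)
    except (ValueError, SyntaxError):
        return value


response_text = json.dumps({
    "key": "return_value",
    "value": "{'error_type': None, 'error_msg': None, 'result': '12.3', 'stacktrace': None}",
})
result = parse_xcom_value(response_text)
print(result["result"])
```

Here result["result"] is the string '12.3'; it still has to be converted with float() if a number is needed.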

Macro commands

re-install-airflow-chart.sh: a script that uninstalls our Helm release, deletes the Kubernetes secrets and persistent volume claims that can cause errors during reinstallation, then reinstalls the chart.

#!/bin/bash
# Uninstall the release but keep its history.
helm uninstall modeler-airflow --keep-history

# Delete the secrets created by the chart.
kubectl delete secret modeler-airflow-broker-url
kubectl delete secret modeler-airflow-fernet-key
kubectl delete secret modeler-airflow-redis-password

# Delete the Helm release secrets, revisions 1 to 7 (missing ones are skipped).
for revision in 1 2 3 4 5 6 7; do
  kubectl delete secret "sh.helm.release.v1.modeler-airflow.v${revision}" --ignore-not-found
done

# Delete the persistent volume claims left behind by the stateful components.
kubectl delete pvc data-modeler-airflow-postgresql-0
kubectl delete pvc logs-modeler-airflow-triggerer-0
kubectl delete pvc logs-modeler-airflow-worker-0
kubectl delete pvc redis-db-modeler-airflow-redis-0

# Reinstall the chart from its directory.
cd modeler-airflow-chart/ || exit 1
helm upgrade -i modeler-airflow . --debug