avatar
дата инженеретта
@data_engineerette
12.05.2026 08:50
Airflow для менеджеров

В Ariflow 3.1 появилась группа hitl-операторов, которая позволяет что-то вводить пользаку во время работы дага

HITL = Human-in-the-loop


Что делает пример на картинках?

Мы задаем даты, выбираем сервисы из списка, вводим почту и получаем отчет. Операторы по сути помогают менеджерам не дергать DA/DE своими адхоками, а пойти самому накликать и выгрузить то, что нужно

А как это происходит?

В базовых примерах после разворачивания airflow появится даг example_hitl_operator. Там есть несколько операторов из пакета airflow.providers.standard.operators.hitl:

emojiHITLOperator — выбрать одну или несколько опций из списка
emojiHITLEntryOperator — ввести любой текст
emojiHITLBranchOperator — выбрать следующую таску
emojiApprovalOperator — одобрить или отклонить

В коде выглядит это вот так:


wait_for_multiple_options = HITLOperator(
task_id="wait_for_multiple_options",
subject="Please choose option to proceed: ",
options=["option 1", "option 2", "option 3"],
multiple=True,
defaults=["option 1"],
)


Когда заходите в UI, после запуска дага появляется доп вкладка Required Actions (1), где и нужно прожать опцию. Это все потом отправляется в xcom:


{
"params_input": {},
"responded_at": "datetime.datetime@version=2(tz=(UTC,pendulum.tz.timezone.Timezone,1,True),timestamp=1778505898.269782)",
"chosen_options": [
"option 2",
"option 3"
],
"responded_by_user": {
"id": "1",
"name": "airflow"
}
}


Из xcom потом можно достать в других тасках. Сначала обращаетесь по названию таски, а потом работаете, как с обычным словарем:


{{ ti.xcom_pull(task_ids='wait_for_multiple_options')["chosen_options"] }}

{{ ti.xcom_pull(task_ids='wait_for_input')["params_input"]["information"] }}


Use case очень прикольный. Интересно, а вот на практике этим будут пользоваться?

@data_engineerette
12
🔥 7
👍 6
14 26 952

Обсуждение 14

Обсуждение не доступно в веб-версии. Чтобы написать комментарий, перейдите в приложение Telegram.

Обсудить в Telegram