近日,調度系統遷移工具 Air2phin 宣布開源。借助 Air2phin,用戶可 2 步將調度系統從 Airflow 遷移至 Apache DolphinScheduler,為有調度系統遷移需要的用戶帶來極大便利。
Air2phin 是什么?
Air2phin 是一個最近宣布開源的調度系統遷移工具,旨在將 Apache Airflow DAGs 文件轉換成 Apache DolphinScheduler Python SDK 定義文件,從而實現用戶將調度系統(Workflow orchestration)從 Airflow 遷移到 DolphinScheduler 的目的。它是一個基于多規則的 AST 轉換器,使用 LibCST 來解析和轉換 Airflow 的 DAG 代碼,其全部規則使用 Yaml 文件定義,并提供了一定的自定義規則擴展能力。
(相關資料圖)
近期,Air2phin 已經發布了0.0.12 版本,提供了豐富的功能,可以更好地幫助用戶完成 Airflow 到 Apache DolphinScheduler 的遷移。
AST 是 Abstract Syntax Tree(抽象語法樹)的縮寫,它是一種以樹狀結構表示代碼語法結構的數據結構。在編譯器中,AST 是由詞法分析器和語法分析器生成的。詞法分析器將源代碼轉換成標記流(token stream),語法分析器將標記流轉換成抽象語法樹。AST 是一種樹狀結構,它由一系列節點組成,每個節點表示代碼中的一個語法結構(如表達式、語句、函數、類等),節點之間的關系表示語法結構之間的嵌套關系。
為什么開源 Air2phin?
可能有人會問,為什么我需要一個遷移工具?這是因為隨著業務的發展,企業或組織原來使用的工作流編排系統已經無法滿足當前的需求,需要將工作流編排系統遷移到新的平臺或者更新到新的版本。經過調研,很多用戶有了將調度系統從開源工作流編排系統 Airflow 遷移到 Apache DolphinScheduler 上來的需求。
在遷移過程中,由于數據處理任務可能涉及多個系統之間的依賴關系,遷移過程需要確保在不影響業務運行的前提下完成。此時,調度系統遷移工具就可以發揮重要作用,它能減少人工干預,盡量自動化地完成兩個調度系統間的遷移工作,并且能兼容多個系統間的多個版本,幾乎可以做到用戶無干預完成遷移。
為此,白鯨開源專門研發了開源遷移工具 Air2phin,可以讓用戶 2 步將調度系統從 Airflow 遷移至 Apache DolphinScheduler,為用戶帶來極大的便利。
為了讓大家更好地理解 Air2phin 的重要性,我們先從調度系統的相關背景知識開始,了解將調度系統從 Airflow 遷移至 Apache DolphinScheduler 的好處。
為什么要從 Airflow 遷移至 DolphinScheduler?
什么是工作流編排系統?
工作流編排系統,是以尊重編排規則和業務邏輯的方式管理數據流。工作流編排工具讓用戶可以將多個有關聯的任務轉換為可以安排、運行和觀測的工作流,幫助企業更好地管理和控制業務流程,從而提高業務效率。工作流編排是數據處理流程中不可或缺的組件之一,負責根據預先定義的規則和邏輯執行數據處理任務,確保數據處理流程按照預期順利執行,常見工作流編排系統包括 Apache DolphinScheduler、Apache Airflow、Apache Oozie, Azkaban 等。
Airflow是什么?
其中,Apache Airflow 是一個開源的工作流編排系統,它可以幫助用戶創建、調度和監控復雜的工作流程。Airflow 最初由 Airbnb 開發,并于 2016 年開源,現在由 Apache 軟件基金會維護。Airflow 使用 Python 語言編寫,具有高度的可擴展性和靈活性,支持多種任務類型,如計算、數據處理、通知、交互等。Airflow 的工作流程是通過編寫 Python 腳本來定義的,可以使用 Airflow 提供的操作符和鉤子,以及自定義操作符和鉤子來擴展其功能。但其有著不可忽視的缺陷,比如需要需要深度二次開發,脫離社區版本,升級成本高;Python 技術棧維護迭代成本高;scheduler loop 掃描 Dag folder 延遲降低性能的問題;以及在生產環境中使用穩定性差等。
在新數據時代業務需求下誕生的 Apache DolphinScheduler 是一個開源的分布式工作流調度系統,彌補了以往調度系統的弱勢,旨在為企業用戶提供一種可靠、高效、易于使用的工作流調度平臺,支持多種任務類型,如計算、數據處理、ETL 等。
與 Airflow 相比,DolphinScheduler 采用了分布式架構,提供了多種任務類型,用戶可以定義任務之間的依賴關系,設置任務的優先級和調度策略等,其使用可視化的界面來創建和管理工作流程的特性更是與 Airflow 形成鮮明對比,變得更加易于操作,對非編程人員來說更加友好。
經過調研對比,對于很多用戶來說,將調度系統遷移至 Apache DolphinScheduler 是一個降本增效的更優選擇。
Air2phin 如何安裝和使用
Air2phin 是一個 python 的包,可以通過 Python 的包安裝工具 pip 完成安裝,詳見air2phin getting start。
python -m pip install--upgrade air2phin一個簡單的例子
我們通過一個簡單的例子,來說明如何使用 Air2phin 的。我們截取了 airflow tutorial.py 中的部分代碼作為 Air2phin 轉化的例子,來說明 Air2phin 如何逐步完成轉化成 dolphinscheduler python sdk。
圖1:airflow tutorial.py 中的部分代碼
圖2:Air2phin 如何逐步完成轉化成 dolphinscheduler python sdk
假設將 airflow tutorial.py 部分內容保存至文件 tutorial_part.py,想要將其轉化成 dolphinscheduler python sdk 定義,只需要一行命令就能完成。結果如圖 2 所示,因為命令增加了 --inplace 參數,所以 Air2phin 會直接將原文件覆蓋,如果不需要覆蓋原問題,可以不使用 --inplace 參數,Air2phin 會新增一個 tutorial_part-air2phin.py 文件來保存轉化后的內容。
air2phinmigrate--inplacetutorial_part.py通過觀察,我們發現這次轉化分別觸發了多條轉化規則,包括
將 airflow.DAG 轉換成
pydolphinscheduler.core.process_definition.ProcessDefinition,這個規則在第三行(import語句)以及第六行 DAG context
將 airflow.operators.bash.BashOperator 轉換成
pydolphinscheduler.tasks.shell.Shell,這個規則在任務 t1,t2 中都被使用
除了對應的類轉化之外,我們需要將類的屬性進行轉化,如將
airflow.DAG.schedule_interval 轉換成了 ProcessDefinition.schedule,同時修改了部分值的內容,如將 timedelta(days=1) 轉成 "0 0 0 * * ? *"
最后,我們只需要安裝 pydolphinscheduler ,并且將轉化后的文件通過 python 運行,就能完成工作流的遷移了,詳見 pydolphinscheduler 使用(https://dolphinscheduler.apache.org/python/main/start.html#installing-pydolphinscheduler)。
# 安裝 apache-dolphinschedulerpython-m pip install apache-dolphinscheduler# 將工作流提交到 dolphinschedulerpythontutorial_part.py在運行 python tutorial_part.py 時,需要保證 dolphinscheduler API 和 python gateway 服務已經啟動,并且開放了對應的端口,詳見啟動 python gateway service。
至此,我們通過一個簡單的例子,說明了 Air2phin 是如何完成遷移的。
工作原理
Airflow 和 dolphinscheduler python sdk 如何工作?
在了解 Air2phin 如果工作之前,先了解 Airflow 和 dolphinscheduler python sdk 如何工作是非常重要的前置條件,幫助我們更好地了解 Air2phin 的遷移步驟,當遇到問題的時候也能更加從容地應對。
Airflow 如何工作 :A irflow 工作流相關的信息都保存在 DAG 文件中,之后將 DAG 文件放置到 Airflow 的指定目錄,Airflow 的 Scheduler 會間隔一定時間去掃描和解析 Airflow 的 DAG 文件,所以 DAG 文件是被動被掃描和更新的。 dolphinscheduler python sdk: 同 Airflow 類似,將全部工作流相關的信息都通過 Python 文件定義,但是 dolphinscheduler python sdk 是通過人為主動觸發的方式,將工作流信息提交,運行命令 python 工作流文件名 即可完成主動任務提交。Air2phin 工作流程
了解完兩者是如何使用,如何提交/發現工作流的,將更加利于我們對 Air2phin 的工作原理的理解。因為 Airflow 的 DAG 文件以及 DolphinScheduler 的 Python sdk 定義文件都是 Python 編寫的,所以 Air2phin 的大部分代碼都是處理兩者間的差異,最后將 Airflow 的代碼轉化成 dolphinscheduler python sdk 和定義。
Air2phin 使用了 LibCST(https://libcst.readthedocs.io/en/latest/) 來實現 airflow python DAG 代碼的抽象語法樹解析,然后通過 LibCST 的 Transformer(https://libcst.readthedocs.io/en/latest/tutorial.html#Build-Visitor-or-Transformer)結合轉化規則最后轉化成 dolphinscheduler python sdk 的定義。
Air2phin 整體工作流程如下:
從標準輸入或者文件中獲取原本的 Airflow DAG 內容
從 Yaml 文件加載所有轉換規則
將 Airflow DAG 內容通過 LibCST 解析成 CST 樹
通過 LibCST Transformer 轉換 dolphinscheduler python sdk 定義內容
Air2phin 最佳實踐
遷移整個文件夾而不是單個文件當用戶想要遷移 Airflow 到 DolphinScheduler 的時候,都是想要整體做遷移而不是單個文件遷移的,Air2phin 提供整體文件夾遷移的能力,只需要將路徑從文件路徑改成文件夾即可。
# 遷移整個 ~ /airflow/dags 文件夾 air2phin migrate --inplace ~ /airflow/dags 增加自定義的規則部分使用 Airflow 的用戶自定義 Hook 或者 Operator,用戶自定義的 Operator 無法通過 Air2phin 內置的轉化規則完成轉化,需要用戶增加自定義的規則,并告訴 Air2phin 規則的位置。例如我們有一個叫 MyCustomOperator 的算子是繼承 PostgresOperator 的大部分功能, 只是命名不一樣,其定義如下:
from airflow.providers.postgres.operators.postgres import PostgresOperatorclassMyCustomOperator(PostgresOperator):def__init__(self,*,sql: str | Iterable[str],my_custom_conn_id: str = "postgres_default",autocommit: bool = False,parameters: Iterable | Mapping | None = None,database: str | None = None,runtime_parameters: Mapping | None = None,**kwargs,)-> None:super.__init__(sql= sql,postgres_conn_id= my_custom_conn_id,autocommit= autocommit,parameters= parameters,database= database,runtime_parameters= runtime_parameters,**kwargs,)它在 Airflow 的多個 DAG 中被使用,使用的方式如下:
from custom.my_custom_operator import MyCustomOperatorwithDAG(dag_id= "my_custom_dag",default_args= default_args,schedule_interval= "@once",start_date= days_ago(2),tags= ["example"],)as dag:t1= MyCustomOperator(task_id= "my_custom_task",sql= "select * from table",my_custom_conn_id= "my_custom_conn_id",)現在需要對這個 Operator 進行轉化,我們可以自定義一個轉化規則,并將其命名為 MyCustomOperator.yaml,內容如下,最主要的內容是 migration.module 和 migration.parameter 的定義,其確定了轉化規則:
name: MyCustomOperatordeion: The configuration for migrating airflow custom operator MyCustomOperator to DolphinScheduler SQL task.migration: module: -action: replacesrc: custom.my_custom_operator.MyCustomOperatordest: pydolphinscheduler.tasks.sql.Sqlparameter: -action: replacesrc: task_iddest: name-action: replacesrc: my_custom_conn_iddest: datasource_name再使用 --custom-rules 參數指定轉化自定義參數,就能應用自定義規則的轉化:
# 指定自定義規則路徑為 /path/to/MyCustomOperator.yamlair2phinmigrate --inplace --custom-rules /path/to/MyCustomOperator.yaml ~/airflow/dags 讓 Air2phin 運行地更快Air2phin 默認是一個進程運行 DAG 文件的轉化的,當你有許多 DAG 文件時,Air2phin 轉化非常耗時,我們提供了一個啟動多進程運行 Air2phin 轉化的參數 --multiprocess,可以將其指定為用戶機器的 CPU 數量來縮短轉化時間:
# 指定 air2phin 啟動 12 個進程同時進行轉化air2phinmigrate --inplace --custom-rules /path/to/MyCustomOperator.yaml --multiprocess 12~/airflow/dags存在的問題
目前,作為一個轉化工具,Air2phin 的使用方式已經算比較完善了,能夠滿足用戶遷移調度系統的基本需求,但還有一些地方有待完善。
內置規則還不夠多
轉化規則還不夠多,目前只有五個,分別是:
airflow.DAG
airflow.operators.bash.BashOperator
airflow.operators.dummy_operator.DummyOperator
airflow.operators.python_operator.PythonOperator
airflow.operators.spark_sql_operator.SparkSqlOperator
如果有更多的規則,Air2phin 將成為一個更加好用的轉化工具,這里歡迎各位隨時提交轉化規則的PR(https://github.com/WhaleOps/air2phin/pulls)。
部分Airflow的用法不能被遷移過來
部分概念僅僅在 Airflow 中有,在 DolphinScheduler 中還沒有,如任務的成功、失敗、重試、觸發 callback,任務的 owner,variable,工作流并發數,tag 等,這部分 Airflow DAG 可以被遷移,但兼容的屬性將會丟失,無法遷移到 DolphinScheduler。
Air2phin 常見問題解答
Q:為什么選擇解析 Airflow DAG 文件而不是數據庫?
A:因為 Airflow DAG 文件中才有完成的工作流信息,Airflow 的數據庫中只有工作流基本信息,沒有任務定義的信息,也沒有任務的關系,我們選擇通過解析 Airflow 的 DAG 文件而不是數據庫來完成轉化。
Q:為什么要通過 dolphinscheduler python sdk 做中轉不自己提交到 DolphinScheduler?
A:因為 Airflow DAG 就是 Python 定義的,在 Airflow DAG 中有很多 Python 的特性,我們不想將這部分特性轉化成結構化的數據(轉化可能存在信息丟失),恰好 DolphinScheduler 已經有了 Python 的 sdk,所以直接通過 LibCST 轉化是成本更加低的做法。
Q:為什么使用 LibCST 而不是 python 內置的 AST?
A:因為 LibCST 更加符合我們,Python 內置的 AST 庫解析成 AST 的時候會丟失掉 comment 的信息,但是我們呢希望保留著部分信息。且 LibCST 提供更加多 visitor 保證我們更加方便的實現替換。
參考鏈接:air2phin(https://github.com/WhaleOps/air2phin)
?騰訊回應進軍類 ChatGPT;Meta 新語言模型能運行在單張顯卡上;OpenAI 創始人提出新摩爾定律|極客頭條
?ChatGPT 帶火的「提示工程師」崗,不用寫代碼,也能獲得年薪數百萬?
?“ChatGPT 正在取代員工”,最新 ChatGPT 調查報告發布!
責任編輯:Rex_28