<bdo id="ks4iu"><del id="ks4iu"></del></bdo>
  • 
    <pre id="ks4iu"></pre>
  • <bdo id="ks4iu"><del id="ks4iu"></del></bdo>
    <input id="ks4iu"><em id="ks4iu"></em></input>
    
    
  • <center id="ks4iu"><cite id="ks4iu"></cite></center>
  • 首頁 > 智能影音 >

    遷移工具 Air2phin 宣布開源,2 步遷移 Airflow 至 Dolphinscheduler

    近日,調度系統遷移工具 Air2phin 宣布開源。借助 Air2phin,用戶可 2 步將調度系統從 Airflow 遷移至 Apache DolphinScheduler,為有調度系統遷移需要的用戶帶來極大便利。

    Air2phin 是什么?

    Air2phin 是一個最近宣布開源的調度系統遷移工具,旨在將 Apache Airflow DAGs 文件轉換成 Apache DolphinScheduler Python SDK 定義文件,從而實現用戶將調度系統(Workflow orchestration)從 Airflow 遷移到 DolphinScheduler 的目的。它是一個基于多規則的 AST 轉換器,使用 LibCST 來解析和轉換 Airflow 的 DAG 代碼,其全部規則使用 Yaml 文件定義,并提供了一定的自定義規則擴展能力。


    (相關資料圖)

    近期,Air2phin 已經發布了0.0.12 版本,提供了豐富的功能,可以更好地幫助用戶完成 Airflow 到 Apache DolphinScheduler 的遷移。

    AST 是 Abstract Syntax Tree(抽象語法樹)的縮寫,它是一種以樹狀結構表示代碼語法結構的數據結構。在編譯器中,AST 是由詞法分析器和語法分析器生成的。詞法分析器將源代碼轉換成標記流(token stream),語法分析器將標記流轉換成抽象語法樹。AST 是一種樹狀結構,它由一系列節點組成,每個節點表示代碼中的一個語法結構(如表達式、語句、函數、類等),節點之間的關系表示語法結構之間的嵌套關系。

    為什么開源 Air2phin?

    可能有人會問,為什么我需要一個遷移工具?這是因為隨著業務的發展,企業或組織原來使用的工作流編排系統已經無法滿足當前的需求,需要將工作流編排系統遷移到新的平臺或者更新到新的版本。經過調研,很多用戶有了將調度系統從開源工作流編排系統 Airflow 遷移到 Apache DolphinScheduler 上來的需求。

    在遷移過程中,由于數據處理任務可能涉及多個系統之間的依賴關系,遷移過程需要確保在不影響業務運行的前提下完成。此時,調度系統遷移工具就可以發揮重要作用,它能減少人工干預,盡量自動化地完成兩個調度系統間的遷移工作,并且能兼容多個系統間的多個版本,幾乎可以做到用戶無干預完成遷移。

    為此,白鯨開源專門研發了開源遷移工具 Air2phin,可以讓用戶 2 步將調度系統從 Airflow 遷移至 Apache DolphinScheduler,為用戶帶來極大的便利。

    為了讓大家更好地理解 Air2phin 的重要性,我們先從調度系統的相關背景知識開始,了解將調度系統從 Airflow 遷移至 Apache DolphinScheduler 的好處。

    為什么要從 Airflow 遷移至 DolphinScheduler?

    什么是工作流編排系統?

    工作流編排系統,是以尊重編排規則和業務邏輯的方式管理數據流。工作流編排工具讓用戶可以將多個有關聯的任務轉換為可以安排、運行和觀測的工作流,幫助企業更好地管理和控制業務流程,從而提高業務效率。工作流編排是數據處理流程中不可或缺的組件之一,負責根據預先定義的規則和邏輯執行數據處理任務,確保數據處理流程按照預期順利執行,常見工作流編排系統包括 Apache DolphinScheduler、Apache Airflow、Apache Oozie, Azkaban 等。

    Airflow是什么?

    其中,Apache Airflow 是一個開源的工作流編排系統,它可以幫助用戶創建、調度和監控復雜的工作流程。Airflow 最初由 Airbnb 開發,并于 2016 年開源,現在由 Apache 軟件基金會維護。Airflow 使用 Python 語言編寫,具有高度的可擴展性和靈活性,支持多種任務類型,如計算、數據處理、通知、交互等。Airflow 的工作流程是通過編寫 Python 腳本來定義的,可以使用 Airflow 提供的操作符和鉤子,以及自定義操作符和鉤子來擴展其功能。但其有著不可忽視的缺陷,比如需要需要深度二次開發,脫離社區版本,升級成本高;Python 技術棧維護迭代成本高;scheduler loop 掃描 Dag folder 延遲降低性能的問題;以及在生產環境中使用穩定性差等。

    在新數據時代業務需求下誕生的 Apache DolphinScheduler 是一個開源的分布式工作流調度系統,彌補了以往調度系統的弱勢,旨在為企業用戶提供一種可靠、高效、易于使用的工作流調度平臺,支持多種任務類型,如計算、數據處理、ETL 等。

    與 Airflow 相比,DolphinScheduler 采用了分布式架構,提供了多種任務類型,用戶可以定義任務之間的依賴關系,設置任務的優先級和調度策略等,其使用可視化的界面來創建和管理工作流程的特性更是與 Airflow 形成鮮明對比,變得更加易于操作,對非編程人員來說更加友好。

    經過調研對比,對于很多用戶來說,將調度系統遷移至 Apache DolphinScheduler 是一個降本增效的更優選擇。

    Air2phin 如何安裝和使用

    Air2phin 是一個 python 的包,可以通過 Python 的包安裝工具 pip 完成安裝,詳見air2phin getting start。

    python -m pip install--upgrade air2phin

    一個簡單的例子

    我們通過一個簡單的例子,來說明如何使用 Air2phin 的。我們截取了 airflow tutorial.py 中的部分代碼作為 Air2phin 轉化的例子,來說明 Air2phin 如何逐步完成轉化成 dolphinscheduler python sdk。

    圖1:airflow tutorial.py 中的部分代碼

    圖2:Air2phin 如何逐步完成轉化成 dolphinscheduler python sdk

    假設將 airflow tutorial.py 部分內容保存至文件 tutorial_part.py,想要將其轉化成 dolphinscheduler python sdk 定義,只需要一行命令就能完成。結果如圖 2 所示,因為命令增加了 --inplace 參數,所以 Air2phin 會直接將原文件覆蓋,如果不需要覆蓋原問題,可以不使用 --inplace 參數,Air2phin 會新增一個 tutorial_part-air2phin.py 文件來保存轉化后的內容。

    air2phinmigrate--inplacetutorial_part.py

    通過觀察,我們發現這次轉化分別觸發了多條轉化規則,包括

    將 airflow.DAG 轉換成

    pydolphinscheduler.core.process_definition.ProcessDefinition,這個規則在第三行(import語句)以及第六行 DAG context

    將 airflow.operators.bash.BashOperator 轉換成

    pydolphinscheduler.tasks.shell.Shell,這個規則在任務 t1,t2 中都被使用

    除了對應的類轉化之外,我們需要將類的屬性進行轉化,如將

    airflow.DAG.schedule_interval 轉換成了 ProcessDefinition.schedule,同時修改了部分值的內容,如將 timedelta(days=1) 轉成 "0 0 0 * * ? *"

    最后,我們只需要安裝 pydolphinscheduler ,并且將轉化后的文件通過 python 運行,就能完成工作流的遷移了,詳見 pydolphinscheduler 使用(https://dolphinscheduler.apache.org/python/main/start.html#installing-pydolphinscheduler)。

    # 安裝 apache-dolphinschedulerpython-m pip install apache-dolphinscheduler# 將工作流提交到 dolphinschedulerpythontutorial_part.py

    在運行 python tutorial_part.py 時,需要保證 dolphinscheduler API 和 python gateway 服務已經啟動,并且開放了對應的端口,詳見啟動 python gateway service。

    至此,我們通過一個簡單的例子,說明了 Air2phin 是如何完成遷移的。

    工作原理

    Airflow 和 dolphinscheduler python sdk 如何工作?

    在了解 Air2phin 如果工作之前,先了解 Airflow 和 dolphinscheduler python sdk 如何工作是非常重要的前置條件,幫助我們更好地了解 Air2phin 的遷移步驟,當遇到問題的時候也能更加從容地應對。

    Airflow 如何工作 :A irflow 工作流相關的信息都保存在 DAG 文件中,之后將 DAG 文件放置到 Airflow 的指定目錄,Airflow 的 Scheduler 會間隔一定時間去掃描和解析 Airflow 的 DAG 文件,所以 DAG 文件是被動被掃描和更新的。 dolphinscheduler python sdk: 同 Airflow 類似,將全部工作流相關的信息都通過 Python 文件定義,但是 dolphinscheduler python sdk 是通過人為主動觸發的方式,將工作流信息提交,運行命令 python 工作流文件名 即可完成主動任務提交。

    Air2phin 工作流程

    了解完兩者是如何使用,如何提交/發現工作流的,將更加利于我們對 Air2phin 的工作原理的理解。因為 Airflow 的 DAG 文件以及 DolphinScheduler 的 Python sdk 定義文件都是 Python 編寫的,所以 Air2phin 的大部分代碼都是處理兩者間的差異,最后將 Airflow 的代碼轉化成 dolphinscheduler python sdk 和定義。

    Air2phin 使用了 LibCST(https://libcst.readthedocs.io/en/latest/) 來實現 airflow python DAG 代碼的抽象語法樹解析,然后通過 LibCST 的 Transformer(https://libcst.readthedocs.io/en/latest/tutorial.html#Build-Visitor-or-Transformer)結合轉化規則最后轉化成 dolphinscheduler python sdk 的定義。

    Air2phin 整體工作流程如下:

    從標準輸入或者文件中獲取原本的 Airflow DAG 內容

    從 Yaml 文件加載所有轉換規則

    將 Airflow DAG 內容通過 LibCST 解析成 CST 樹

    通過 LibCST Transformer 轉換 dolphinscheduler python sdk 定義內容

    Air2phin 最佳實踐

    遷移整個文件夾而不是單個文件

    當用戶想要遷移 Airflow 到 DolphinScheduler 的時候,都是想要整體做遷移而不是單個文件遷移的,Air2phin 提供整體文件夾遷移的能力,只需要將路徑從文件路徑改成文件夾即可。

    # 遷移整個 ~ /airflow/dags 文件夾 air2phin migrate --inplace ~ /airflow/dags 增加自定義的規則

    部分使用 Airflow 的用戶自定義 Hook 或者 Operator,用戶自定義的 Operator 無法通過 Air2phin 內置的轉化規則完成轉化,需要用戶增加自定義的規則,并告訴 Air2phin 規則的位置。例如我們有一個叫 MyCustomOperator 的算子是繼承 PostgresOperator 的大部分功能, 只是命名不一樣,其定義如下:

    from airflow.providers.postgres.operators.postgres import PostgresOperatorclassMyCustomOperator(PostgresOperator):def__init__(self,*,sql: str | Iterable[str],my_custom_conn_id: str = "postgres_default",autocommit: bool = False,parameters: Iterable | Mapping | None = None,database: str | None = None,runtime_parameters: Mapping | None = None,**kwargs,)-> None:super.__init__(sql= sql,postgres_conn_id= my_custom_conn_id,autocommit= autocommit,parameters= parameters,database= database,runtime_parameters= runtime_parameters,**kwargs,)

    它在 Airflow 的多個 DAG 中被使用,使用的方式如下:

    from custom.my_custom_operator import MyCustomOperatorwithDAG(dag_id= "my_custom_dag",default_args= default_args,schedule_interval= "@once",start_date= days_ago(2),tags= ["example"],)as dag:t1= MyCustomOperator(task_id= "my_custom_task",sql= "select * from table",my_custom_conn_id= "my_custom_conn_id",)

    現在需要對這個 Operator 進行轉化,我們可以自定義一個轉化規則,并將其命名為 MyCustomOperator.yaml,內容如下,最主要的內容是 migration.module 和 migration.parameter 的定義,其確定了轉化規則:

    name: MyCustomOperatordeion: The configuration for migrating airflow custom operator MyCustomOperator to DolphinScheduler SQL task.migration: module: -action: replacesrc: custom.my_custom_operator.MyCustomOperatordest: pydolphinscheduler.tasks.sql.Sqlparameter: -action: replacesrc: task_iddest: name-action: replacesrc: my_custom_conn_iddest: datasource_name

    再使用 --custom-rules 參數指定轉化自定義參數,就能應用自定義規則的轉化:

    # 指定自定義規則路徑為 /path/to/MyCustomOperator.yamlair2phinmigrate --inplace --custom-rules /path/to/MyCustomOperator.yaml ~/airflow/dags 讓 Air2phin 運行地更快

    Air2phin 默認是一個進程運行 DAG 文件的轉化的,當你有許多 DAG 文件時,Air2phin 轉化非常耗時,我們提供了一個啟動多進程運行 Air2phin 轉化的參數 --multiprocess,可以將其指定為用戶機器的 CPU 數量來縮短轉化時間:

    # 指定 air2phin 啟動 12 個進程同時進行轉化air2phinmigrate --inplace --custom-rules /path/to/MyCustomOperator.yaml --multiprocess 12~/airflow/dags

    存在的問題

    目前,作為一個轉化工具,Air2phin 的使用方式已經算比較完善了,能夠滿足用戶遷移調度系統的基本需求,但還有一些地方有待完善。

    內置規則還不夠多

    轉化規則還不夠多,目前只有五個,分別是:

    airflow.DAG

    airflow.operators.bash.BashOperator

    airflow.operators.dummy_operator.DummyOperator

    airflow.operators.python_operator.PythonOperator

    airflow.operators.spark_sql_operator.SparkSqlOperator

    如果有更多的規則,Air2phin 將成為一個更加好用的轉化工具,這里歡迎各位隨時提交轉化規則的PR(https://github.com/WhaleOps/air2phin/pulls)。

    部分Airflow的用法不能被遷移過來

    部分概念僅僅在 Airflow 中有,在 DolphinScheduler 中還沒有,如任務的成功、失敗、重試、觸發 callback,任務的 owner,variable,工作流并發數,tag 等,這部分 Airflow DAG 可以被遷移,但兼容的屬性將會丟失,無法遷移到 DolphinScheduler。

    Air2phin 常見問題解答

    Q:為什么選擇解析 Airflow DAG 文件而不是數據庫?

    A:因為 Airflow DAG 文件中才有完成的工作流信息,Airflow 的數據庫中只有工作流基本信息,沒有任務定義的信息,也沒有任務的關系,我們選擇通過解析 Airflow 的 DAG 文件而不是數據庫來完成轉化。

    Q:為什么要通過 dolphinscheduler python sdk 做中轉不自己提交到 DolphinScheduler?

    A:因為 Airflow DAG 就是 Python 定義的,在 Airflow DAG 中有很多 Python 的特性,我們不想將這部分特性轉化成結構化的數據(轉化可能存在信息丟失),恰好 DolphinScheduler 已經有了 Python 的 sdk,所以直接通過 LibCST 轉化是成本更加低的做法。

    Q:為什么使用 LibCST 而不是 python 內置的 AST?

    A:因為 LibCST 更加符合我們,Python 內置的 AST 庫解析成 AST 的時候會丟失掉 comment 的信息,但是我們呢希望保留著部分信息。且 LibCST 提供更加多 visitor 保證我們更加方便的實現替換。

    參考鏈接:air2phin(https://github.com/WhaleOps/air2phin)

    ?騰訊回應進軍類 ChatGPT;Meta 新語言模型能運行在單張顯卡上;OpenAI 創始人提出新摩爾定律|極客頭條

    ?ChatGPT 帶火的「提示工程師」崗,不用寫代碼,也能獲得年薪數百萬?

    ?“ChatGPT 正在取代員工”,最新 ChatGPT 調查報告發布!

    責任編輯:Rex_28

    推薦閱讀

    20萬左右的進口車哪個好

    · 2023-02-28 10:27:01

    ipad助手排行榜_ipad助手

    · 2023-02-28 10:28:26
    欧美国产在线一区,免费看成年视频网页,国产亚洲福利精品一区,亚洲一区二区约美女探花
    <bdo id="ks4iu"><del id="ks4iu"></del></bdo>
  • 
    <pre id="ks4iu"></pre>
  • <bdo id="ks4iu"><del id="ks4iu"></del></bdo>
    <input id="ks4iu"><em id="ks4iu"></em></input>
    
    
  • <center id="ks4iu"><cite id="ks4iu"></cite></center>
  • 主站蜘蛛池模板: 午夜私人影院在线观看| 激情内射亚州一区二区三区爱妻 | 国产精品va一级二级三级| 国产1区2区3区在线观看| 久久最近最新中文字幕大全| 99亚洲精品高清一二区| 直接进入免费看黄的网站| 日韩一区精品视频一区二区| 国产麻豆精品精东影业av网站 | 在公车上忘穿内裤嗯啊色h文 | 久久久999久久久精品| 宅男噜噜噜66| 激情五月婷婷网| 夜色私人影院永久入口| 亚洲精品成人网站在线播放| 99re最新地址精品视频| 欧美日韩在线观看一区二区| 女人与狥交下配a级正在播放| 国产三级免费观看| 亚洲aⅴ男人的天堂在线观看| a级高清观看视频在线看| 波多野结衣被绝伦在线观看| 国产高跟踩踏vk| 亚洲人成无码www久久久| chinese精品男同志浪小辉| 爱情岛论坛亚洲永久入口口| 性按摩xxxx| 国产三级电影在线观看| 中文字幕在线观看第一页| 黄色免费网站在线看| 欧美性极品hd高清视频| 国产爽的冒白浆的视频高清| 亚洲欧美成人综合久久久| 一个人看的www在线观看免费| 色综合久久91| 日本人与黑人videos系列| 国产69久久精品成人看| www.99精品视频在线播放| 精品人妻一区二区三区浪潮在线| 日本午夜免费福利视频| 含羞草实验研究所入口免费网站直接进入|