<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    大大毛 的筆記

      DDM's Note

    哪怕沒有辦法一定有說法,
    就算沒有鴿子一定有烏鴉,
    固執無罪 夢想有價,
    讓他們驚訝.

    posts - 14, comments - 23, trackbacks - 0, articles - 58
       :: 首頁 ::  :: 聯系 ::  :: 管理
    需求:
        比較前一篇文章來說,僅加多Delete的行為。例如我僅需要Status=1的資料,所以對于資料落地來講,最合適的莫過于下面這樣,Table也可以自動保持最少量的有效資料
        1. 新資料Status=0,執行Insert;
        2. 資料修改Status=0,執行Update;
        3. 資料狀態變更Status=1,執行Delete;
        4. 若資料狀態重新變更為0,則又會執行Insert;

    思路:
        按理來說,只要通過分支Route就可以將Insert/Update與Delete作業分成兩條Nifi支流(看網上確實有很多這么整法的),但是用Route有一個問題處理不了,那就是資料順序的正確性你是無法保證的。對于小數據量的場景來說,每筆Key的多次操作間隔可能會比較長,所以它不會有什么問題,但大數據量的情況下,兩同相同Key值的資料走Route后被處理的順序混亂就會造成最終資料結果的異常(比如應該是先Insert再Delete,結果卻是發現資料還躺在Table中)。而大數據量在使用Kafka做為數據源時就不可避免會出現:即使業務數據量確實不大,但對于積累了好幾天的數據再進行接收時,那一瞬間的數據量也會是很大的。
       所以我們能做的就是動態決定執行Delete和Insert。

    解決方案:
    雖然與前一篇來說差異不大,但Nifi流程上卻有很大不同,下面會詳細描述為什么要這樣做



    Processor及其設定:
        ConsumeKafkaRecord、SplitJson、Connection、EvaluateJsonPath與前一章的一樣,只是不同數據下解析的屬性有所不同,這里不再詳述。

        UpdateAttribute作用是從Kafka中Consume出資料(以Record的形態),這里使用Record是因為源數據就是以Record的方式存上去的 (Avro Schema)
    • SQL1:這才是這次花招的關鍵,在這里根據STATUS自行構建SQL語句
      • Status=0:構建的就是Delete From xxx Where Key1=? and Key2=? 這樣的刪除語句
      • Status!=0:構建的就是Replace Into xxx (Key1,Key2,col3,col4) Values ( ?, ?, ?, ?) 這樣的Insert or Update語句
    • 經此Processor處理后,資料落地所需的SQL就構建好了,后續的問題就是如何去綁定參數和執行

        ReplaceText作用是對FlowFile進行文本替換,這里使用它來直接產生我所需的JSON內容
    • Search Value:這里使用的是Default的(?s)(^.*$),作用就是把原先的整份文件全部換掉
    • Replacement Value:這里放的就是一個固定結構的JSON,可以看到里面的屬性值都是使用的Attribute (它們的值來源于前面的EvaluateJsonPath從源JSON文件中的提取)
      • 細心的朋友可以發現這里是與前一篇文章的最大不同,這里沒有使用AttributeToJson去直接產生JSON文件,而使用的是更加笨拙的方式
        • 前面的文章有提過,我們產生的Attribute以及AttributeToJson所生成JSON中各屬性的順序問題,結論是怎么搞它都不是我所想象到的順序。但是ConvertJsonToSQL這個東東卻很實在,它確確實實是按JSON中屬性的順序去生成的SQL以及參數名稱(還記得參數名稱sql.args.1.value中的這個順序1么),所以問題就來了:
          • SQL由于必須要有Delete和Replace,所以它們的參數個數一定是不同的,而Delete壓的參數又是我們的Key,所以就必須要保證ConvertJsonToSQL生成屬性的順序,這樣我們才能夠保證我們的兩個Key一定會是sql.args.1和sql.args.2
          • 換句話說,如果AttributesToJson若是能夠保證JSON屬性順序的話,那就不用這么費勁

          ConvertJsonToSQL與前文一樣,以Insert的方式生成SQL和綁定參數即可

          UpdateAttribute終于用到了它的Delete功能,作用是清除掉多余的SQL綁定參數

      • Delete Attributes Expression:這里我根據Delete的條件(STATUS=0)去刪除多余的SQL綁定參數
        • 這里的寫法比較死,我Hard-code刪除掉大與2的其它所有參數(" * "是一個通配符," | "是一個多條件的間隔符),感覺上還有更好的寫法
      • 至此我們就可以保證綁定參數的數量與SQL語法參數個數一致 (不一致它死給你看)

            PutSQL這里仍然是執行SQL,這里使用配置參數的形式讓它執行我們的SQL

        • SQL Statement:前面用UpdateAttribute產生的SQL1參數,它會根據STATUS=0去判斷是使用DELETE還是REPLACE語法
          • 這個屬性壓上后,無論SQL1是不是為空,這個組件都不會再去管FlowFile的內容(空屬性時是把FlowFile的內容當成SQL去執行的)

        i am ddm

        主站蜘蛛池模板: 亚洲国产高清人在线| 永久黄网站色视频免费| 亚洲精品乱码久久久久久不卡| 国产成人精品亚洲日本在线| 国产aⅴ无码专区亚洲av麻豆| 国产精品福利在线观看免费不卡| 亚洲六月丁香六月婷婷色伊人| 亚洲成aⅴ人片久青草影院| 亚洲AV无码国产丝袜在线观看| 四虎亚洲精品高清在线观看| 97在线视频免费播放| 最近最新中文字幕完整版免费高清 | 亚洲AV人无码激艳猛片| 国产免费拔擦拔擦8x| 免费观看理论片毛片| 亚洲精品第一国产综合亚AV| 亚洲天天做日日做天天看| 在线电影你懂的亚洲| 亚洲伊人久久精品影院| 免费人妻av无码专区| 亚洲国产精品无码久久久蜜芽| 哒哒哒免费视频观看在线www | 中文字幕亚洲第一在线| 久久久久亚洲av毛片大| 中文字幕亚洲激情| 又粗又大又猛又爽免费视频| 亚洲国产精品丝袜在线观看| 久久久久久a亚洲欧洲AV| 亚洲精品视频观看| 国产精品亚洲精品爽爽| 羞羞视频免费网站含羞草| 一级毛片在线免费播放| 99久久国产免费中文无字幕| 91成人在线免费观看| 30岁的女人韩剧免费观看| 国产一区二区三区在线观看免费 | 亚洲免费观看视频| 亚洲第一区在线观看| 亚洲宅男天堂a在线| 亚洲精品色播一区二区| a级毛片高清免费视频|