<rt id="bn8ez"></rt>
<label id="bn8ez"></label>

  • <span id="bn8ez"></span>

    <label id="bn8ez"><meter id="bn8ez"></meter></label>

    大大毛 的筆記

      DDM's Note

    哪怕沒有辦法一定有說法,
    就算沒有鴿子一定有烏鴉,
    固執無罪 夢想有價,
    讓他們驚訝.

    posts - 14, comments - 23, trackbacks - 0, articles - 58
       :: 首頁 ::  :: 聯系 ::  :: 管理

    Kafka資料落地至MariaDB (帶Key的新增、修改)

    Posted on 2019-04-11 14:14 大大毛 閱讀(462) 評論(0)  編輯  收藏 所屬分類: Nifi
    需求:
        接收Kafka資料,資料具有Key列(多列),有新增、修改但無刪除,需要同步落地至MariaDB
    解決方案(僅新增、修改):
        這個場景是最常見的,資料不會有被刪除的狀態,所有的更新就只有Insert Or Update這兩種狀態,先上實例的圖 (兩邊的LogMessage是為了接收Fail,有感嘆號是避免一起開啟的時候它也被開啟----這樣failure的訊息就不會再卡在Connection中了)


    思路:
        因為記錄只有新增和修改兩種狀態,理論上說這兩種的SQL非常接近,所以可以做以下考量
        1. Processor層面是否支援Update Or Insert
          > 查網上訊息有個叫Upsert,不過在Nifi中查找,只有一個支援Mongo的組件具有這個功能
        2. DB層面是否支援
          > Maria DB有個 "REPLACE INTO" 的語法是可以支持Insert Or Update,雖然簡單看了下介紹說是會依主鍵或唯一索引去先做定位,如果定位到已經存在則先做刪除再進行新增(偽Update),但確實可以達成我們的目的,不是嗎?

    Processor及其設定:
        ConsumeKafkaRecord,作用是從Kafka中Consume出資料(以Record的形態),這里使用Record是因為源數據就是以Record的方式存上去的 (Avro Schema)
    • Kafka Brokers:Kafka的Broker列表,多個Broker以逗號分隔,類似www.broker1.com:9193,www.broker2.com:9193這樣的形式配置
    • Topic Name:需要Consume的Kafka Topic名稱
    • Record Reader/Writer:關于Record所需要設定的Reader和Writer,要先行在Configure中設定,當然也要設定好Schema  
    • Group ID:Consumer所要設定的ID,這個的設定要依Kafka的配置來,現在我們一般就只有單個的Partition,所以會要求每個Processor都設定有不同
    • Offset Reset:需要設定為"earliest",這樣就會依GroupID沒有收過的資料來進行收取,否則就只會收新推上去的資料。第一次玩的兄弟經常坑在GroupID和Offset Reset這兩項上,若是收不到資料則有  可能就是GroupID沒有換成新的(舊的已經收過一次就不會重新再收),或者是Offset Reset = latest又沒有新資料推上去~~~
    • Max Poll Records和SCHEDULING中的Run Schedule:需要根據實際接收的速度來進行調整。經過觀察發現Consume的速度超快,但整個Nifi Flow的速度會卡在其它需要做解析或讀寫DB的Processor外 (通常解析JSON會是前面的關卡),所以任由Consumer的高速讀取就會造成整個Nifi流程在后段被卡住。造成這個的主要原因其實就在于kafka處理的高速上,所以當有新換GroupID或新流程時,Kafka上積累的海量資料就會在一瞬間被接收下來,然后就是各種紅 (其實紅了也沒事,它會自動向上推,讓前一個Processor停止處理)。
      • 若是常態下的資料推送量就已經超過了你的Nifi處理速度,那么就要考量使用多個線程處理或者是從源頭的Kafka上就把資料分割開來  
      • SCHEDULING的Cocurrent Tasks:這個Default=1,就是當前Processor需要開起來的線程數。但是這個設置需要當心,你需要仔細考量過你的資料流是否允許亂序 (多線程時當然不可能還能保證資料處理的順序),所以它是僅適用于不Care資料處理順序的場景,例如每筆Key就只會有一筆資料,而且哪筆資料先收后收無所謂

        SplitJson,作用就只是簡單的把一個JSON數組切開成單個的JSON。Consume出來的會是個數組,這跟你存放進去的單筆訊息是不是數組沒什么關系。

        Connection,就是Processor中的那根帶箭頭的連線,它的作用是連接不同的Processor并且它還具有緩存池的的一個用途,除了把數據從A導流向B外,還可以將B暫時處理不動的資料存放在自帶的緩存池中,若是緩存池達到上限,則Nifi會自動讓A暫停處理直至B緩過勁~~~
    • Back Pressure Object Threshold / Back Pressure Data Size Threshold:最大緩存的消息筆數 / 最大緩存消息的體積,兩者任一超過就會讓上游Processor處理暫停
    • Available Prioritizers:出入緩存池的順序控制,Default是空,通常來說都應該要設成FIFO先進先出的方式
      • 不設定這個經常會造成Nifi資料處理丟失的假象,A1,A2,A3,A4,最后看到的不是A4而是A3,會讓人以為A4被玩掉了,其實只是A4被先處理,而A3變成了最后一筆狀態。而且這種錯誤很難被發現!!


        EvaluateJsonPath1,這個元件的作用是解析JSON,它也只能簡單的解析,想在Value中對取出來的值做一些處理好象是不允許的....
    • Destination:表示解析出來的內容是成為Attribute,還是直接替換Flow File內容,這里設定是做為屬性,所以Processor處理后就可以在Flow File上看到多出自定義的那些屬性以及它們的值
    • Return Type:返回值的類型,這種簡單從JSON中取值的可以使用Auto-detect即可
    • Path Not Found Behavior:是說如果設定需要解析的JSON路徑不存在時的處理行為
    • Null Value Representation:這個對于Null值的處理, "empty string"會將null設為空字符串(MO=),另外一個"the string 'null'"則是會將null設為"null"這樣的字符串 (MO="null")
    • MO/MODELFAMILY/....:這些是我手工添加的屬性名稱,需要根據JSON長樣來設,對應Value設定的$.MO則是表示MO的值來源于JSON第一層的"MO"節點。
      • 需要注意的一點是屬性名稱貌似是會區分大小寫的,所以可以看到我全部使用的大寫
    • 截圖是運行時態的Procssor,停止運行時PROPERTIES上會有一個 + 號,點它即可以新增自己的屬性
      • 有一點比較奇怪的地方,就是通過+號維護進去的多個屬性,它們的排列順序卻不是你手工新增的順序,這點引發另外一處的一個疑問,會在下面講

        EvaluateJsonPath2,當然也是要從JSON中解析,只不過我是要把整個JSON的內容都保留下來,由于它們要求的設定不同,所以被迫要撕成兩個元件來做
    • Destination:這個設定仍然是屬性
    • Return Type:json,第一個解析元件雖然可以隨意設置,但把這兩種合并成一個元件并使用Auto時就會報錯,所以看起來第一種簡單屬性實際上只支持Scalar吧...
    • JSONDATA:我定義的一個屬性名稱,注意Value中設定的"@"符號,它表示整份FlowFile的內容(前面已經轉成一個JSON)
    • 這個JSONDATA是因為我的需求,因為Kafka上的資料來源于其它系統,而我其實只需要其中的少量幾個欄位 (前一個EvaluateJsonPath解析的那些),為了備查數據上的其它欄位以及在后續使用,所以才要把整份JSON都保留到DB中去 (說得這么高端,實際的原因卻是他們的JSON屬性是用程序硬拼字串拼出來的,有的東西實在是在Nifi中搞不出來......)

        UpdateAttribute,元件用途是對FlowFile的Attrubute進行修改,這里是拿來對解析出來的值進行再加工以及添加新屬性
    • Delete Attributes Expression:這個屬性如果有設置就表示該Processor為Delete屬性的狀態,會忽略你新加的那些屬性處理,只專心做好一件事"刪除符合條件的屬性"
    • PROVIDER:這是一個新的屬性,它并沒有包含在JSON中,是為表示數據來源而新加的
    • SO:這個就是前面
      EvaluateJsonPath1解析出來的某個值,那個元件無法直接加工,所以放在這里做的二次加工,去掉前導0

        AttributesToJson,作用是將一堆Attribute轉換為Json,當然就只能是那種簡單結構的Json,這里使用它是為了配合后面一個Processor的使用
    • Attributes List:拿來生成JSON的屬性列表,這里我其實把EvaluateJsonPath1、EvaluateJsonPath2和UpdateAttribute產生的屬性都放上去了 (它們就是我落地MariaDB的Table列)
      • 不得不說的一個灰常遺憾的結果:那就是生成的JSON屬性順序絕對不是你在List中寫的屬性順序,我比較懷疑是在前面幾個組件生成Attribute的順序,但更讓人遺憾的是它們的順序也不會是你維護它們的順序。這個結果會導致我們在另外的Case 2中會碰到一個不可逾越的障礙~~~~
    • Attributes Regular Expression:符合條件的正則表達式
    • Destination: 這個屬性在 
      EvaluateJsonPath上
      就有, 它可以讓結果成為一個新的屬性還是直接替換FlowFile的內容, Default是直接換掉FlowFile的內容。

        ConvertJsonToSQL,作用是根據JSON內容轉換成SQL語句以及語句所要的參數,經過這一關后FlowFile的內容就變成SQL語句,然后Attribute中多出一些參數

    • JDBC Connection Pool:Configrue中指定的MariaDB連接字符串,那里直接有指定Schema
    • Statement Type:這個有INSERT、UPDATE、DELETE這3個選項,若是Mongo的那個組件就會看到有UPSERT(Update or Insert),其它各類的都木有~~~,這里我使用的是INSERT選項,后面通過玩的一點小花招把它再折騰為REPLACE INTO
    • Update Keys: 這個屬性可以不填,它是For Update時使用的
    • SQL Parameter Attribute Prefix: default = sql,它其實影響到組件處理后生成的SQL語句參數叫什么,設為sql,最后就會看到生成出來
      。如下圖就是處理后的Attribute樣式,它會產生sql.args.X.type和sql.args.X.value,這一組合起來就對應于SQL中第一個?參數的類型及值,”sql"就是我們這里設置的前綴名稱 (充分考慮到大家會想要搞事)

        ReplaceText,作用是文本替換,這里就是我們處理Update Or Insert的關鍵,直接把SQL語句換掉它
    • Search Value:在FlowFile中查找的字符串,它支持正則
    • Replacement Value:替換的值,這里就是簡單的把Insert Into (x1,x2,x3) values (?,?,?)處理為Replace Into (x1,x2,x3) values (?,?,?)而已,Replace Or Insert的行為交給DB去做

        PutSQL,作用是在DB上執行一段SQL語句
    • JDBC Connection Pool:前面ConvertJsonToSQL轉換SQL時就有用過,指定數據庫的連接
    • SQL Statement:需執行的SQL,為空時表示使用前面傳遞過來的FlowFile的內容(已經是一個SQL語句)


    總結:
        這是一個帶有Key值(多個Key列)的無刪除行為的資料接收,所以可以利用AttributeToJSON去將提取出來的有用屬性重新生成JSON文件,并直接利用ConvertJsonToSQL轉換為Insert語句及對應的綁定參數,這里借用了MariaDB提供的Replace Into機制去自動使用表上的Key鍵去做Update更新,所以整個Nifi Flow還是比較簡單。在后續文章中會講到帶Delete行為的資料接收方法以及無Key更新的解決方案

    i am ddm

    主站蜘蛛池模板: 在线观看日本亚洲一区| 亚洲精品在线播放视频| 精品特级一级毛片免费观看| 成人免费淫片在线费观看| 亚洲精品一区二区三区四区乱码| 无码人妻一区二区三区免费看| 国产AV无码专区亚洲A∨毛片| 免费无码黄网站在线看| 亚洲成AV人片在WWW色猫咪| 日本免费人成网ww555在线| 亚洲av色福利天堂| 久久99国产综合精品免费| 亚洲神级电影国语版| 久久精品a一国产成人免费网站| 在线aⅴ亚洲中文字幕| 日韩毛片免费在线观看| 免费国产污网站在线观看不要卡 | 国内精品久久久久影院亚洲| 无码乱肉视频免费大全合集| ASS亚洲熟妇毛茸茸PICS| 成人毛片18岁女人毛片免费看| 亚洲av无码一区二区三区人妖 | 永久免费av无码网站大全| 欧美日韩亚洲精品| 久久久久亚洲AV成人网人人软件| 国产免费一区二区三区不卡| 久久亚洲精品无码VA大香大香| 夜夜嘿视频免费看| 黄床大片免费30分钟国产精品| 亚洲av鲁丝一区二区三区| 黄页网站免费观看| 免费国产在线精品一区 | 亚洲一区二区三区在线| 国产一级淫片a免费播放口之| jizz免费在线观看| 亚洲精品中文字幕无乱码| 嫩草影院免费观看| 怡红院免费的全部视频| 亚洲免费网站在线观看| 亚洲精品黄色视频在线观看免费资源| 免费视频精品一区二区三区|