大大毛
的筆記
  DDM's Note
哪怕沒有辦法一定有說法,
就算沒有鴿子一定有烏鴉,
固執無罪 夢想有價,
讓他們驚訝.
posts - 14, comments - 23, trackbacks - 0, articles - 58
::
首頁
:: ::
聯系
:: ::
管理
Kafka資料落地至MariaDB (帶Key的新增、修改和刪除)
Posted on 2019-04-11 15:40
大大毛
閱讀(284)
評論(0)
編輯
收藏
所屬分類:
Nifi
需求:
比較前一篇文章來說,僅加多Delete的行為。例如我僅需要Status=1的資料,所以對于資料落地來講,最合適的莫過于下面這樣,Table也可以自動保持最少量的有效資料
1. 新資料Status=0,執行Insert;
2. 資料修改Status=0,執行Update;
3. 資料狀態變更Status=1,執行Delete;
4. 若資料狀態重新變更為0,則又會執行Insert;
思路:
按理來說,只要通過分支Route就可以將Insert/Update與Delete作業分成兩條Nifi支流(看網上確實有很多這么整法的),但是用Route有一個問題處理不了,那就是資料順序的正確性你是無法保證的。對于小數據量的場景來說,每筆Key的多次操作間隔可能會比較長,所以它不會有什么問題,但大數據量的情況下,兩同相同Key值的資料走Route后被處理的順序混亂就會造成最終資料結果的異常(比如應該是先Insert再Delete,結果卻是發現資料還躺在Table中)。而大數據量在使用Kafka做為數據源時就不可避免會出現:即使業務數據量確實不大,但對于積累了好幾天的數據再進行接收時,那一瞬間的數據量也會是很大的。
所以我們能做的就是動態決定執行Delete和Insert。
解決方案:
雖然與
前一篇
來說差異不大,但Nifi流程上卻有很大不同,下面會詳細描述為什么要這樣做
Processor及其設定:
ConsumeKafkaRecord、SplitJson、Connection、EvaluateJsonPath
,
與前一章的一樣,只是不同數據下解析的屬性有所不同,這里不再詳述。
UpdateAttribute
,
作用是從Kafka中Consume出資料(以Record的形態),這里使用Record是因為源數據就是以Record的方式存上去的 (Avro Schema)
SQL1
:這才是這次花招的關鍵,在這里根據STATUS自行構建SQL語句
Status=0
:構建的就是Delete From xxx Where Key1=? and Key2=? 這樣的刪除語句
Status!=0
:構建的就是Replace Into xxx (Key1,Key2,col3,col4) Values ( ?, ?, ?, ?) 這樣的Insert or Update語句
經此Processor處理后,資料落地所需的SQL就構建好了,后續的問題就是如何去綁定參數和執行
ReplaceText
,
作用是對FlowFile進行文本替換,這里使用它來直接產生我所需的JSON內容
Search Value
:這里使用的是Default的
(?s)(^.*$),作用就是把原先的整份文件全部換掉
Replacement Value
:這里放的就是一個固定結構的JSON,可以看到里面的屬性值都是使用的Attribute (它們的值來源于前面的EvaluateJsonPath從源JSON文件中的提取)
細心的朋友可以發現這里是與前一篇文章的最大不同,這里沒有使用AttributeToJson去直接產生JSON文件,而使用的是更加笨拙的方式
前面的文章有提過,我們產生的Attribute以及AttributeToJson所生成JSON中各屬性的順序問題,結論是怎么搞它都不是我所想象到的順序。但是ConvertJsonToSQL這個東東卻很實在,它確確實實是按JSON中屬性的順序去生成的SQL以及參數名稱(還記得參數名稱sql.args.
1
.value中的這個順序
1
么),所以問題就來了:
SQL由于必須要有Delete和Replace,所以它們的參數個數一定是不同的,而Delete壓的參數又是我們的Key,所以就必須要保證ConvertJsonToSQL生成屬性的順序,這樣我們才能夠保證我們的兩個Key一定會是sql.args.1和sql.args.2
換句話說,如果AttributesToJson若是能夠保證JSON屬性順序的話,那就不用這么費勁
ConvertJsonToSQL
,
與前文一樣,以Insert的方式生成SQL和綁定參數即可
UpdateAttribute
,
終于用到了它的Delete功能,作用是清除掉多余的SQL綁定參數
Delete Attributes Expression
:這里我根據Delete的條件(STATUS=0)去刪除多余的SQL綁定參數
這里的寫法比較死,我Hard-code刪除掉大與2的其它所有參數("
*
"
是一個通配符,"
|
"是一個多條件的間隔符),感覺上還有更好的寫法
至此我們就可以保證綁定參數的數量與SQL語法參數個數一致 (不一致它死給你看)
PutSQL
,
這里仍然是執行SQL,這里使用配置參數的形式讓它執行我們的SQL
SQL Statement
:前面用UpdateAttribute產生的SQL1參數,它會根據STATUS=0去判斷是使用DELETE還是REPLACE語法
這個屬性壓上后,無論SQL1是不是為空,這個組件都不會再去管FlowFile的內容(空屬性時是把FlowFile的內容當成SQL去執行的)
新用戶注冊
刷新評論列表
只有注冊用戶
登錄
后才能發表評論。
網站導航:
博客園
IT新聞
Chat2DB
C++博客
博問
管理
相關文章:
Nifi同步數據的幾種方法
RouteOnAttribute的用法
Kafka資料落地至MariaDB (帶Key的新增、修改和刪除)
Kafka資料落地至MariaDB (帶Key的新增、修改)
Oracle資料推送MQTT
Powered by:
BlogJava
Copyright © 大大毛
日歷
<
2025年5月
>
日
一
二
三
四
五
六
27
28
29
30
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
1
2
3
4
5
6
7
公告
果然是不能想得太好。
隨筆分類
(4)
VB培訓(4)
文章分類
(59)
JAVA
(6)
Spring
(3)
Hibernate
Struts
(12)
NET
VB
(2)
ASP
(1)
ASP.NET
(6)
HTML
(3)
400
(2)
I4.0
Nifi
(5)
Angular
(1)
SQL
(15)
常用算法
(1)
其它
(2)
積分與排名
積分 - 60244
排名 - 871
最新評論
1.?re: 手工添加MyEclipse的XML文件模板[未登錄]
請問,eclipse下面有沒有呢?現在想要實現eclipse的xml的模板進行配置修改,簡單說,就是把新建時候的名字作為其中的一個tag;找了很久沒有找到方法
--allen
2.?re: 第二章 Visual Basic 基礎語法
受益匪淺,多謝!
--yuleself
3.?re: 數字填空
評論內容較長,點擊標題查看
--去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去去
4.?re: Checkbox聯動演示
dcdc
--dcd
5.?re: 利用TN5250NF下載檔案的自動化處理
請教若密碼要動態生成,是否有辦法呢?
謝謝
--江佳桂
i am ddm
主站蜘蛛池模板:
大学生一级毛片免费看
|
亚洲精品成人久久久
|
欧美亚洲国产SUV
|
国产精品亚洲аv无码播放
|
一本岛高清v不卡免费一三区
|
国产精品亚洲综合一区在线观看
|
亚洲第一页日韩专区
|
13小箩利洗澡无码视频网站免费
|
亚洲av成人综合网
|
亚洲一区二区三区偷拍女厕
|
成视频年人黄网站免费视频
|
永久免费精品影视网站
|
亚洲三级视频在线观看
|
久久99亚洲综合精品首页
|
黄页网站在线看免费
|
三上悠亚在线观看免费
|
亚洲欧美日韩中文无线码
|
婷婷亚洲久悠悠色悠在线播放
|
国产成人免费ā片在线观看
|
在线免费中文字幕
|
精品97国产免费人成视频
|
亚洲人成电影网站色www
|
亚洲人成电影在线天堂
|
亚洲?V乱码久久精品蜜桃
|
91网站免费观看
|
一个人免费视频在线观看www
|
亚洲av无码兔费综合
|
亚洲欧洲日韩在线电影
|
一本色道久久综合亚洲精品
|
蜜桃精品免费久久久久影院
|
手机看黄av免费网址
|
特级精品毛片免费观看
|
国产vA免费精品高清在线观看
|
亚洲一区二区三区在线观看网站
|
亚洲国产高清人在线
|
亚洲欧洲国产精品香蕉网
|
又粗又黄又猛又爽大片免费
|
妞干网免费视频在线观看
|
久久精品国产免费观看
|
日韩精品无码免费一区二区三区
|
51午夜精品免费视频
|