Hive 的官方文檔中對查詢語言有了很詳細的描述,請參考:http://wiki.apache.org/hadoop/Hive/LanguageManual ,本文的內容大部分翻譯自該頁面,期間加入了一些在使用過程中需要注意到的事項。
Create Table
CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name
[(col_name data_type [COMMENT col_comment], ...)]
[COMMENT table_comment]
[PARTITIONED BY (col_name data_type
[COMMENT col_comment], ...)]
[CLUSTERED BY (col_name, col_name, ...)
[SORTED BY (col_name [ASC|DESC], ...)]
INTO num_buckets BUCKETS]
[ROW FORMAT row_format]
[STORED AS file_format]
[LOCATION hdfs_path]
CREATE TABLE 創建一個指定名字的表。如果相同名字的表已經存在,則拋出異常;用戶可以用 IF NOT EXIST 選項來忽略這個異常。
EXTERNAL 關鍵字可以讓用戶創建一個外部表,在建表的同時指定一個指向實際數據的路徑(LOCATION),Hive 創建內部表時,會將數據移動到數據倉庫指向的路徑;若創建外部表,僅記錄數據所在的路徑,不對數據的位置做任何改變。在刪除表的時候,內部表的元數據和數據會被一起刪除,而外部表只刪除元數據,不刪除數據。
LIKE 允許用戶復制現有的表結構,但是不復制數據。
用戶在建表的時候可以自定義 SerDe 或者使用自帶的 SerDe。如果沒有指定 ROW FORMAT 或者 ROW FORMAT DELIMITED,將會使用自帶的 SerDe。在建表的時候,用戶還需要為表指定列,用戶在指定表的列的同時也會指定自定義的 SerDe,Hive 通過 SerDe 確定表的具體的列的數據。
如果文件數據是純文本,可以使用 STORED AS TEXTFILE。如果數據需要壓縮,使用 STORED AS SEQUENCE 。
有分區的表可以在創建的時候使用 PARTITIONED BY 語句。一個表可以擁有一個或者多個分區,每一個分區單獨存在一個目錄下。而且,表和分區都可以對某個列進行 CLUSTERED BY 操作,將若干個列放入一個桶(bucket)中。也可以利用SORT BY 對數據進行排序。這樣可以為特定應用提高性能。
表名和列名不區分大小寫,SerDe 和屬性名區分大小寫。表和列的注釋是字符串。
Drop Table
刪除一個內部表的同時會同時刪除表的元數據和數據。刪除一個外部表,只刪除元數據而保留數據。
Alter Table
Alter table 語句允許用戶改變現有表的結構。用戶可以增加列/分區,改變serde,增加表和 serde 熟悉,表本身重命名。
Add Partitions
ALTER TABLE table_name ADD
partition_spec [ LOCATION 'location1' ]
partition_spec [ LOCATION 'location2' ] ...
partition_spec:
: PARTITION (partition_col = partition_col_value,
partition_col = partiton_col_value, ...)
用戶可以用 ALTER TABLE ADD PARTITION 來向一個表中增加分區。當分區名是字符串時加引號。
ALTER TABLE page_view ADD
PARTITION (dt='2008-08-08', country='us')
location '/path/to/us/part080808'
PARTITION (dt='2008-08-09', country='us')
location '/path/to/us/part080809';
DROP PARTITION
ALTER TABLE table_name DROP
partition_spec, partition_spec,...
用戶可以用 ALTER TABLE DROP PARTITION 來刪除分區。分區的元數據和數據將被一并刪除。
ALTER TABLE page_view
DROP PARTITION (dt='2008-08-08', country='us');
RENAME TABLE
ALTER TABLE table_name RENAME TO new_table_name
這個命令可以讓用戶為表更名。數據所在的位置和分區名并不改變。換而言之,老的表名并未“釋放”,對老表的更改會改變新表的數據。
Change Column Name/Type/Position/Comment
ALTER TABLE table_name CHANGE [COLUMN]
col_old_name col_new_name column_type
[COMMENT col_comment]
[FIRST|AFTER column_name]
這個命令可以允許用戶修改一個列的名稱、數據類型、注釋或者位置。
比如:
CREATE TABLE test_change (a int, b int, c int);
ALTER TABLE test_change CHANGE a a1 INT;
將 a 列的名字改為 a1.
ALTER TABLE test_change CHANGE a a1 STRING AFTER b;
將 a 列的名字改為 a1,a 列的數據類型改為 string,并將它放置在列 b 之后。新的表結構為: b int, a1 string, c int.
ALTER TABLE test_change CHANGE b b1 INT FIRST;
會將 b 列的名字修改為 b1, 并將它放在第一列。新表的結構為: b1 int, a string, c int.
注意:對列的改變只會修改 Hive 的元數據,而不會改變實際數據。用戶應該確定保證元數據定義和實際數據結構的一致性。
Add/Replace Columns
ALTER TABLE table_name ADD|REPLACE
COLUMNS (col_name data_type [COMMENT col_comment], ...)
ADD COLUMNS 允許用戶在當前列的末尾增加新的列,但是在分區列之前。
REPLACE COLUMNS 刪除以后的列,加入新的列。只有在使用 native 的 SerDE(DynamicSerDe or MetadataTypeColumnsetSerDe)的時候才可以這么做。
Alter Table Properties
ALTER TABLE table_name SET TBLPROPERTIES table_properties
table_properties:
: (property_name = property_value, property_name = property_value, ... )
用戶可以用這個命令向表中增加 metadata,目前 last_modified_user,last_modified_time 屬性都是由 Hive 自動管理的。用戶可以向列表中增加自己的屬性。可以使用 DESCRIBE EXTENDED TABLE 來獲得這些信息。
Add Serde Properties
ALTER TABLE table_name
SET SERDE serde_class_name
[WITH SERDEPROPERTIES serde_properties]
ALTER TABLE table_name
SET SERDEPROPERTIES serde_properties
serde_properties:
: (property_name = property_value,
property_name = property_value, ... )
這個命令允許用戶向 SerDe 對象增加用戶定義的元數據。Hive 為了序列化和反序列化數據,將會初始化 SerDe 屬性,并將屬性傳給表的 SerDe。如此,用戶可以為自定義的 SerDe 存儲屬性。
Alter Table File Format and Organization
ALTER TABLE table_name SET FILEFORMAT file_format
ALTER TABLE table_name CLUSTERED BY (col_name, col_name, ...)
[SORTED BY (col_name, ...)] INTO num_buckets BUCKETS
這個命令修改了表的物理存儲屬性。
Loading files into table
當數據被加載至表中時,不會對數據進行任何轉換。Load 操作只是將數據復制/移動至 Hive 表對應的位置。
Syntax:
LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE]
INTO TABLE tablename
[PARTITION (partcol1=val1, partcol2=val2 ...)]
Synopsis:
Load 操作只是單純的復制/移動操作,將數據文件移動到 Hive 表對應的位置。
- filepath 可以是:
- 相對路徑,例如:project/data1
- 絕對路徑,例如: /user/hive/project/data1
- 包含模式的完整 URI,例如:hdfs://namenode:9000/user/hive/project/data1
- 加載的目標可以是一個表或者分區。如果表包含分區,必須指定每一個分區的分區名。
- filepath 可以引用一個文件(這種情況下,Hive 會將文件移動到表所對應的目錄中)或者是一個目錄(在這種情況下,Hive 會將目錄中的所有文件移動至表所對應的目錄中)。
- 如果指定了 LOCAL,那么:
- load 命令會去查找本地文件系統中的 filepath。如果發現是相對路徑,則路徑會被解釋為相對于當前用戶的當前路徑。用戶也可以為本地文件指定一個完整的 URI,比如:file:///user/hive/project/data1.
- load 命令會將 filepath 中的文件復制到目標文件系統中。目標文件系統由表的位置屬性決定。被復制的數據文件移動到表的數據對應的位置。
- 如果沒有指定 LOCAL 關鍵字,如果 filepath 指向的是一個完整的 URI,hive 會直接使用這個 URI。 否則:
- 如果沒有指定 schema 或者 authority,Hive 會使用在 hadoop 配置文件中定義的 schema 和 authority,fs.default.name 指定了 Namenode 的 URI。
- 如果路徑不是絕對的,Hive 相對于 /user/ 進行解釋。
- Hive 會將 filepath 中指定的文件內容移動到 table (或者 partition)所指定的路徑中。
- 如果使用了 OVERWRITE 關鍵字,則目標表(或者分區)中的內容(如果有)會被刪除,然后再將 filepath 指向的文件/目錄中的內容添加到表/分區中。
- 如果目標表(分區)已經有一個文件,并且文件名和 filepath 中的文件名沖突,那么現有的文件會被新文件所替代。
SELECT
Syntax
SELECT [ALL | DISTINCT] select_expr, select_expr, ...
FROM table_reference
[WHERE where_condition]
[GROUP BY col_list]
[
CLUSTER BY col_list
| [DISTRIBUTE BY col_list]
[SORT BY col_list]
]
[LIMIT number]
- 一個SELECT語句可以是一個union查詢或一個子查詢的一部分。
- table_reference是查詢的輸入,可以是一個普通表、一個視圖、一個join或一個子查詢
- 簡單查詢。例如,下面這一語句從t1表中查詢所有列的信息。
SELECT * FROM t1
WHERE Clause
where condition 是一個布爾表達式。例如,下面的查詢語句只返回銷售記錄大于 10,且歸屬地屬于美國的銷售代表。Hive 不支持在WHERE 子句中的 IN,EXIST 或子查詢。
SELECT * FROM sales WHERE amount > 10 AND region = "US"
ALL and DISTINCT Clauses
使用ALL和DISTINCT選項區分對重復記錄的處理。默認是ALL,表示查詢所有記錄。DISTINCT表示去掉重復的記錄。
hive> SELECT col1, col2 FROM t1
1 3
1 3
1 4
2 5
hive> SELECT DISTINCT col1, col2 FROM t1
1 3
1 4
2 5
hive> SELECT DISTINCT col1 FROM t1
1
2
基于Partition的查詢
一般 SELECT 查詢會掃描整個表(除非是為了抽樣查詢)。但是如果一個表使用 PARTITIONED BY 子句建表,查詢就可以利用分區剪枝(input pruning)的特性,只掃描一個表中它關心的那一部分。Hive 當前的實現是,只有分區斷言出現在離 FROM 子句最近的那個WHERE 子句中,才會啟用分區剪枝。例如,如果 page_views 表使用 date 列分區,以下語句只會讀取分區為‘2008-03-01’的數據。
SELECT page_views.*
FROM page_views
WHERE page_views.date >= '2008-03-01'
AND page_views.date
HAVING Clause
Hive 現在不支持 HAVING 子句。可以將 HAVING 子句轉化為一個字查詢,例如:
SELECT col1 FROM t1 GROUP BY col1 HAVING SUM(col2) > 10
可以用以下查詢來表達:
SELECT col1 FROM (SELECT col1, SUM(col2) AS col2sum
FROM t1 GROUP BY col1) t2
WHERE t2.col2sum > 10
LIMIT Clause
Limit 可以限制查詢的記錄數。查詢的結果是隨機選擇的。下面的查詢語句從 t1 表中隨機查詢5條記錄:
SELECT * FROM t1 LIMIT 5
Top k 查詢。下面的查詢語句查詢銷售記錄最大的 5 個銷售代表。
SET mapred.reduce.tasks = 1
SELECT * FROM sales SORT BY amount DESC LIMIT 5
REGEX Column Specification
SELECT 語句可以使用正則表達式做列選擇,下面的語句查詢除了 ds 和 hr 之外的所有列:
SELECT `(ds|hr)?+.+` FROM sales
Join
Syntax
join_table:
table_reference JOIN table_factor [join_condition]
| table_reference {LEFT|RIGHT|FULL} [OUTER]
JOIN table_reference join_condition
| table_reference LEFT SEMI JOIN
table_reference join_condition
table_reference:
table_factor
| join_table
table_factor:
tbl_name [alias]
| table_subquery alias
| ( table_references )
join_condition:
ON equality_expression ( AND equality_expression )*
equality_expression:
expression = expression
Hive 只支持等值連接(equality joins)、外連接(outer joins)和(left semi joins???)。Hive 不支持所有非等值的連接,
因為非等值連接非常難轉化到 map/reduce 任務。另外,Hive 支持多于 2 個表的連接。
寫 join 查詢時,需要注意幾個關鍵點:
1. 只支持等值join,例如:
SELECT a.* FROM a JOIN b ON (a.id = b.id)
SELECT a.* FROM a JOIN b
ON (a.id = b.id AND a.department = b.department)
是正確的,然而:
SELECT a.* FROM a JOIN b ON (a.id b.id)
是錯誤的。
2. 可以 join 多于 2 個表,例如
SELECT a.val, b.val, c.val FROM a JOIN b
ON (a.key = b.key1) JOIN c ON (c.key = b.key2)
如果join中多個表的 join key 是同一個,則 join 會被轉化為單個 map/reduce 任務,例如:
SELECT a.val, b.val, c.val FROM a JOIN b
ON (a.key = b.key1) JOIN c
ON (c.key = b.key1)
被轉化為單個 map/reduce 任務,因為 join 中只使用了 b.key1 作為 join key。
SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1)
JOIN c ON (c.key = b.key2)
而這一 join 被轉化為 2 個 map/reduce 任務。因為 b.key1 用于第一次 join 條件,而 b.key2 用于第二次 join。
join 時,每次 map/reduce 任務的邏輯是這樣的:reducer 會緩存 join 序列中除了最后一個表的所有表的記錄,
再通過最后一個表將結果序列化到文件系統。這一實現有助于在 reduce 端減少內存的使用量。實踐中,應該把最大的那個表寫在最后
(否則會因為緩存浪費大量內存)。例如:
SELECT a.val, b.val, c.val FROM a
JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)
所有表都使用同一個 join key(使用 1 次 map/reduce 任務計算)。Reduce 端會緩存 a 表和 b 表的記錄,然后每次取得一個 c 表的記錄就計算一次 join 結果,類似的還有:
SELECT a.val, b.val, c.val FROM a
JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2)
這里用了 2 次 map/reduce 任務。第一次緩存 a 表,用 b 表序列化;第二次緩存第一次 map/reduce 任務的結果,然后用 c 表序列化。
LEFT,RIGHT 和 FULL OUTER 關鍵字用于處理 join 中空記錄的情況,例如:
SELECT a.val, b.val FROM a LEFT OUTER
JOIN b ON (a.key=b.key)
對應所有 a 表中的記錄都有一條記錄輸出。輸出的結果應該是 a.val, b.val,當 a.key=b.key 時,而當 b.key 中找不到等值的
a.key 記錄時也會輸出 a.val, NULL。“FROM a LEFT OUTER JOIN b”這句一定要寫在同一行——意思是 a 表在 b 表的左邊,所以
a 表中的所有記錄都被保留了;“a RIGHT OUTER JOIN b”會保留所有 b 表的記錄。OUTER JOIN 語義應該是遵循標準 SQL spec的。
Join 發生在 WHERE 子句之前。如果你想限制 join 的輸出,應該在 WHERE 子句中寫過濾條件——或是在 join 子句中寫。
這里面一個容易混淆的問題是表分區的情況:
SELECT a.val, b.val FROM a
LEFT OUTER JOIN b ON (a.key=b.key)
WHERE a.ds='2009-07-07' AND b.ds='2009-07-07'
會 join a 表到 b 表(OUTER JOIN),列出 a.val 和 b.val 的記錄。WHERE 從句中可以使用其他列作為過濾條件。
但是,如前所述,如果 b 表中找不到對應 a 表的記錄,b 表的所有列都會列出 NULL,包括 ds 列。也就是說,join
會過濾 b 表中不能找到匹配 a 表 join key 的所有記錄。這樣的話,LEFT OUTER 就使得查詢結果與 WHERE 子句無關了。
解決的辦法是在 OUTER JOIN 時使用以下語法:
SELECT a.val, b.val FROM a LEFT OUTER JOIN b
ON (a.key=b.key AND
b.ds='2009-07-07' AND
a.ds='2009-07-07')
這一查詢的結果是預先在 join 階段過濾過的,所以不會存在上述問題。這一邏輯也可以應用于 RIGHT 和 FULL 類型的 join 中。
Join 是不能交換位置的。無論是 LEFT 還是 RIGHT join,都是左連接的。
SELECT a.val1, a.val2, b.val, c.val
FROM a
JOIN b ON (a.key = b.key)
LEFT OUTER JOIN c ON (a.key = c.key)
先 join a 表到 b 表,丟棄掉所有 join key 中不匹配的記錄,然后用這一中間結果和 c 表做 join。這一表述有一個不太明顯的問題,
就是當一個 key 在 a 表和 c 表都存在,但是 b 表中不存在的時候:整個記錄在第一次 join,即 a JOIN b 的時候都被丟掉了
(包括a.val1,a.val2和a.key),然后我們再和 c 表 join 的時候,如果 c.key 與 a.key 或 b.key 相等,就會得到這樣的結果:
NULL, NULL, NULL, c.val。
LEFT SEMI JOIN 是 IN/EXISTS 子查詢的一種更高效的實現。Hive 當前沒有實現 IN/EXISTS 子查詢,所以你可以用 LEFT SEMI JOIN
重寫你的子查詢語句。LEFT SEMI JOIN 的限制是, JOIN 子句中右邊的表只能在 ON 子句中設置過濾條件,在 WHERE 子句、
SELECT 子句或其他地方過濾都不行。
SELECT a.key, a.value
FROM a
WHERE a.key in
(SELECT b.key
FROM B);
可以被重寫為:
SELECT a.key, a.val
FROM a LEFT SEMI JOIN b on (a.key = b.key
Hive 優化
Hive 針對不同的查詢進行了優化,優化可以通過配置進行控制,本文將介紹部分優化的策略以及優化控制選項。
列裁剪(Column Pruning)
在讀數據的時候,只讀取查詢中需要用到的列,而忽略其他列。例如,對于查詢:
SELECT a,b FROM T WHERE e
其中,T 包含 5 個列 (a,b,c,d,e),列 c,d 將會被忽略,只會讀取a, b, e 列
這個選項默認為真: hive.optimize.cp = true
分區裁剪(Partition Pruning)
在查詢的過程中減少不必要的分區。例如,對于下列查詢:
SELECT * FROM (SELECT c1, COUNT(1)
FROM T GROUP BY c1) subq
WHERE subq.prtn = 100;
SELECT * FROM T1 JOIN
(SELECT * FROM T2) subq ON (T1.c1=subq.c2)
WHERE subq.prtn = 100;
會在子查詢中就考慮 subq.prtn = 100 條件,從而減少讀入的分區數目。
此選項默認為真:hive.optimize.pruner=true
Join
在使用寫有 Join 操作的查詢語句時有一條原則:應該將條目少的表/子查詢放在 Join 操作符的左邊。原因是在 Join 操作的 Reduce
階段,位于 Join 操作符左邊的表的內容會被加載進內存,將條目少的表放在左邊,可以有效減少發生 OOM 錯誤的幾率。
對于一條語句中有多個 Join 的情況,如果 Join 的條件相同,比如查詢:
INSERT OVERWRITE TABLE pv_users
SELECT pv.pageid, u.age FROM page_view p
JOIN user u ON (pv.userid = u.userid)
JOIN newuser x ON (u.userid = x.userid);
- 如果 Join 的 key 相同,不管有多少個表,都會則會合并為一個 Map-Reduce
- 一個 Map-Reduce 任務,而不是 ‘n’ 個
- 在做 OUTER JOIN 的時候也是一樣
如果 Join 的條件不相同,比如:
INSERT OVERWRITE TABLE pv_users
SELECT pv.pageid, u.age FROM page_view p
JOIN user u ON (pv.userid = u.userid)
JOIN newuser x on (u.age = x.age);
Map-Reduce 的任務數目和 Join 操作的數目是對應的,上述查詢和以下查詢是等價的:
INSERT OVERWRITE TABLE tmptable
SELECT * FROM page_view p JOIN user u
ON (pv.userid = u.userid);
INSERT OVERWRITE TABLE pv_users
SELECT x.pageid, x.age FROM tmptable x
JOIN newuser y ON (x.age = y.age);
Map Join
Join 操作在 Map 階段完成,不再需要Reduce,前提條件是需要的數據在 Map 的過程中可以訪問到。比如查詢:
INSERT OVERWRITE TABLE pv_users
SELECT /*+ MAPJOIN(pv) */ pv.pageid, u.age
FROM page_view pv
JOIN user u ON (pv.userid = u.userid);
可以在 Map 階段完成 Join,如圖所示:

相關的參數為:
- hive.join.emit.interval = 1000 How many rows in the right-most join operand Hive should buffer before emitting the join result.
- hive.mapjoin.size.key = 10000
- hive.mapjoin.cache.numrows = 10000
Group By
- Map 端部分聚合:
- 并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端進行部分聚合,最后在 Reduce 端得出最終結果。
- 基于 Hash
- 參數包括:
- hive.map.aggr = true 是否在 Map 端進行聚合,默認為 True
- hive.groupby.mapaggr.checkinterval = 100000 在 Map 端進行聚合操作的條目數目
- 有數據傾斜的時候進行負載均衡
- hive.groupby.skewindata = false
- 當選項設定為 true,生成的查詢計劃會有兩個 MR Job。第一個 MR Job 中,Map 的輸出結果集合會隨機分布到 Reduce 中,每個 Reduce 做部分聚合操作,并輸出結果,這樣處理的結果是相同的 Group By Key 有可能被分發到不同的 Reduce 中,從而達到負載均衡的目的;第二個 MR Job 再根據預處理的數據結果按照 Group By Key 分布到 Reduce 中(這個過程可以保證相同的 Group By Key 被分布到同一個 Reduce 中),最后完成最終的聚合操作。
合并小文件
文件數目過多,會給 HDFS 帶來壓力,并且會影響處理效率,可以通過合并 Map 和 Reduce 的結果文件來消除這樣的影響:
- hive.merge.mapfiles = true 是否和并 Map 輸出文件,默認為 True
- hive.merge.mapredfiles = false 是否合并 Reduce 輸出文件,默認為 False
- hive.merge.size.per.task = 256*1000*1000 合并文件的大小
Hive 的擴展特性
CREATE TABLE mylog ( user_id BIGINT, page_url STRING, unix_time INT)
STORED AS TEXTFILE;
當用戶的數據文件格式不能被當前 Hive 所識別的時候,可以自定義文件格式。可以參考 contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64 中的例子。寫完自定義的格式后,在創建表的時候指定相應的文件格式就可以:
CREATE TABLE base64_test(col1 STRING, col2 STRING)
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.contrib.
fileformat.base64.Base64TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.contrib.
fileformat.base64.Base64TextOutputFormat';
SerDe
SerDe 是 Serialize/Deserilize 的簡稱,目的是用于序列化和反序列化。序列化的格式包括:
- 分隔符(tab、逗號、CTRL-A)
- Thrift 協議
反序列化(內存內):
- Java Integer/String/ArrayList/HashMap
- Hadoop Writable 類
- 用戶自定義類
目前存在的 Serde 見下圖:

其中,LazyObject 只有在訪問到列的時候才進行反序列化。 BinarySortable:保留了排序的二進制格式。
當存在以下情況時,可以考慮增加新的 SerDe:
- 用戶的數據有特殊的序列化格式,當前的 Hive 不支持,而用戶又不想在將數據加載至 Hive 前轉換數據格式。
- 用戶有更有效的序列化磁盤數據的方法。
用戶如果想為 Text 數據增加自定義 Serde ,可以參照 contrib/src/java/org/apache/hadoop/hive/contrib/serde2/RegexSerDe.java 中的例子。RegexSerDe 利用用戶提供的正則表倒是來反序列化數據,例如:
CREATE TABLE apache_log(
host STRING,
identity STRING,
user STRING,
time STRING,
request STRING,
status STRING,
size STRING,
referer STRING,
agent STRING)
ROW FORMAT
SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES
( "input.regex" = "([^ ]*) ([^ ]*) ([^ ]*) (-|\\\\[[^\\\\]]*\\\\])
([^ \\"]*|\\"[^\\"]*\\") (-|[0-9]*) (-|[0-9]*)(?: ([^ \\"]*|\\"[^\\"]*\\")
([^ \\"]*|\\"[^\\"]*\\"))?",
"output.format.string" = "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s";)
STORED AS TEXTFILE;
用戶如果想為 Binary 數據增加自定義的 SerDE,可以參考例子:serde/src/java/org/apache/hadoop/hive/serde2/binarysortable,例如:
CREATE TABLE mythrift_table
ROW FORMAT SERDE
'org.apache.hadoop.hive.contrib.serde2.thrift.ThriftSerDe'
WITH SERDEPROPERTIES (
"serialization.class" = "com.facebook.serde.tprofiles.full",
"serialization.format" = "com.facebook.thrift.protocol.TBinaryProtocol";);
Map/Reduce 腳本(Transform)
用戶可以自定義 Hive 使用的 Map/Reduce 腳本,比如:
FROM (
SELECT TRANSFORM(user_id, page_url, unix_time)
USING 'page_url_to_id.py'
AS (user_id, page_id, unix_time)
FROM mylog
DISTRIBUTE BY user_id
SORT BY user_id, unix_time)
mylog2
SELECT TRANSFORM(user_id, page_id, unix_time)
USING 'my_python_session_cutter.py' AS (user_id, session_info);
Map/Reduce 腳本通過 stdin/stdout 進行數據的讀寫,調試信息輸出到 stderr。
UDF(User-Defined-Function)
用戶可以自定義函數對數據進行處理,例如:
add jar build/ql/test/test-udfs.jar;
CREATE TEMPORARY FUNCTION testlength
AS 'org.apache.hadoop.hive.ql.udf.UDFTestLength';
SELECT testlength(src.value) FROM src;
DROP TEMPORARY FUNCTION testlength;
UDFTestLength.java 為:
package org.apache.hadoop.hive.ql.udf;
public class UDFTestLength extends UDF {
public Integer evaluate(String s) {
if (s == null) {
return null;
}
return s.length();
}
}
自定義函數可以重載:
add jar build/contrib/hive_contrib.jar;
CREATE TEMPORARY FUNCTION example_add
AS 'org.apache.hadoop.hive.contrib.udf.example.UDFExampleAdd';
SELECT example_add(1, 2) FROM src;
SELECT example_add(1.1, 2.2) FROM src;
UDFExampleAdd.java:
public class UDFExampleAdd extends UDF {
public Integer evaluate(Integer a, Integer b) {
if (a = null || b = null)
return null;
return a + b;
}
public Double evaluate(Double a, Double b) {
if (a = null || b = null)
return null;
return a + b;
}
}
%%
在使用 UDF 的時候,會自動進行類型轉換,這個 java 或者 C 中的類型轉換有些類似,比如:
SELECT example_add(1, 2.1) FROM src;
的結果是 3.1,這是因為 UDF 將類型為 Int 的參數 “1″ 轉換為 double。
類型的隱式轉換是通過 UDFResolver 來進行控制的,并且可以根據不同的 UDF 進行不同的控制。
UDF 還可以支持變長的參數,例如 UDFExampleAdd.java:
public class UDFExampleAdd extends UDF {
public Integer evaluate(Integer... a) {
int total = 0;
for (int i=0; i
使用例子為:
SELECT example_add(1, 2) FROM src;
SELECT example_add(1, 2, 3) FROM src;
SELECT example_add(1, 2, 3, 4.1) FROM src;
綜上,UDF 具有以下特性:
- 用 java 寫 UDF 很容易。
- Hadoop 的 Writables/Text 具有較高性能。
- UDF 可以被重載。
- Hive 支持隱式類型轉換。
- UDF 支持變長的參數。
- genericUDF 提供了較好的性能(避免了反射)。
UDAF(User-Defined Aggregation Funcation)
例子:
SELECT page_url, count(1), count(DISTINCT user_id) FROM mylog;
UDAFCount.java:
public class UDAFCount extends UDAF {
public static class Evaluator implements UDAFEvaluator {
private int mCount;
public void init() {
mcount = 0;
}
public boolean iterate(Object o) {
if (o!=null)
mCount++;
return true;
}
public Integer terminatePartial() {
return mCount;
}
public boolean merge(Integer o) {
mCount += o;
return true;
}
public Integer terminate() {
return mCount;
}
}
UDAF 總結:
- 編寫 UDAF 和 UDF 類似
- UDAF 可以重載
- UDAF 可以返回復雜類
- 在使用 UDAF 的時候可以禁止部分聚合功能
UDF,UDAF 和 MR 腳本的對比:
