Cassandra 的數(shù)據(jù)存儲(chǔ)結(jié)構(gòu)

Cassandra 的數(shù)據(jù)模型是基于列族(Column Family)的四維或五維模型。它借鑒了 Amazon 的 Dynamo 和 Google's BigTable 的數(shù)據(jù)結(jié)構(gòu)和功能特點(diǎn),采用 Memtable 和 SSTable 的方式進(jìn)行存儲(chǔ)。在 Cassandra 寫入數(shù)據(jù)之前,需要先記錄日志 ( CommitLog ),然后數(shù)據(jù)開始寫入到 Column Family 對應(yīng)的 Memtable 中,Memtable 是一種按照 key 排序數(shù)據(jù)的內(nèi)存結(jié)構(gòu),在滿足一定條件時(shí),再把 Memtable 的數(shù)據(jù)批量的刷新到磁盤上,存儲(chǔ)為 SSTable 。

圖 1. Cassandra 的數(shù)據(jù)模型圖:

  1. Cassandra 的數(shù)據(jù)模型的基本概念:
  2. 1. Cluster : Cassandra 的節(jié)點(diǎn)實(shí)例,它可以包含多個(gè) Keyspace
    2. Keyspace : 用于存放 ColumnFamily 的容器,相當(dāng)于關(guān)系數(shù)據(jù)庫中的 Schema 或 database3. ColumnFamily : 用于存放 Column 的容器,類似關(guān)系數(shù)據(jù)庫中的 table 的概念 4. SuperColumn :它是一個(gè)特列殊的 Column, 它的 Value 值可以包函多個(gè) Column5. Columns:Cassandra 的最基本單位。由 name , value , timestamp 組成

下面是關(guān)于數(shù)據(jù)模型實(shí)例分析 :


圖 2. 數(shù)據(jù)模型實(shí)例分析
圖 2. 數(shù)據(jù)模型實(shí)例分析


Cassandra 節(jié)點(diǎn)的安裝和配置

獲取 Cassandra

 # wget  http://labs.renren.com/apache-mirror/cassandra/0.6.0/apache- 
cassandra-0.6.0-rc1-bin.tar.gz
# tar -zxvf apache-cassandra-0.6.0-rc1-bin.tar.gz
# mv apache-cassandra-0.6.0-rc1 cassandra
# ls Cassandra



Cassandra 的目錄說明

bin 存放與 Cassandra 操作的相關(guān)腳本
conf 存放配置文件的目錄
interface Cassandra 的 Thrift 接口定義文件,可以用于生成各種編程語言的接口代碼
Javadoc 源代碼的 javadoc
lib Cassandra 運(yùn)行時(shí)所需的 jar 包

 

配制 Cassandra 節(jié)點(diǎn)的數(shù)據(jù)存儲(chǔ)目錄和日志目錄

修改配制文件 storage-conf.xml:


默認(rèn)的內(nèi)容

				
<CommitLogDirectory>/var/lib/cassandra/commitlog</CommitLogDirectory>
<DataFileDirectories>
<DataFileDirectory>/var/lib/cassandra/data</DataFileDirectory>
</DataFileDirectories>



配置后的內(nèi)容

				
<CommitLogDirectory>/data3/db/lib/cassandra/commitlog</CommitLogDirectory>
<DataFileDirectories>
<DataFileDirectory>/data3/db/lib/cassandra/data</DataFileDirectory>
</DataFileDirectories>

 

修改日志配制文件 log4j.properties:


log4j.properties 配置

				
# 日志路徑
#log4j.appender.R.File=/var/log/cassandra/system.log
# 配置后的日志路徑 :
log4j.appender.R.File=/data3/db/log/cassandra/system.log

 

創(chuàng)建文件存放數(shù)據(jù)和日志的目錄

 # mkdir – p /data3/db/lib/cassandra 
# mkdir – p /data3/db/log/Cassandra

 

配制完成后,啟動(dòng) Cassandra

 # bin/Cassandra 

 

顯示信息

 INFO 09:29:12,888 Starting up server gossip 
INFO 09:29:12,992 Binding thrift service to localhost/127.0.0.1:9160

 

看到這兩行啟動(dòng)回顯信息時(shí),說明 Cassandra 已啟動(dòng)成功。

連接到 Cassandra 并添加、獲取數(shù)據(jù)

Cassandra 的 bin 目錄已自帶了命令行連接工具 cassandra-cli,可使用它連接到 Cassandra,并添加、讀取數(shù)據(jù)。


連接到 Cassandra,并添加、讀取數(shù)據(jù)

				
# bin/cassandra-cli --host localhost --port 9160
Connected to: "Test Cluster" on localhost/9160
Welcome to cassandra CLI.
Type 'help' or '?' for help. Type 'quit' or 'exit' to quit.
cassandra>
cassandra> set Keyspace1.Standard2['studentA']['age'] = '18'
Value inserted
cassandra> get Keyspace1.Standard2['studentA']
=> (column=age, value=18, timestamp=1272357045192000)
Returned 1 results

 

停止 Cassandra 服務(wù)


查出 Cassandra 的 pid:16328

				
# ps -ef | grep cassandra
# kill 16328

 

Cassandra 配制文件 storage-conf.xml 相關(guān)配制介紹


清單 1. storage-conf.xml 節(jié)點(diǎn)配制說明清單

				
<!-- 集群時(shí)顯示的節(jié)點(diǎn)名稱 -->
<ClusterName>Test Cluster</ClusterName>
<!-- 節(jié)點(diǎn)啟動(dòng)時(shí),是否自動(dòng)加入到集群中,默認(rèn)為 false -->
<AutoBootstrap>false</AutoBootstrap>
<!-- 集群的節(jié)點(diǎn)配制 -->
<Seeds>
<Seed>127.0.0.1</Seed>
</Seeds>
<!-- 節(jié)點(diǎn)之間通迅的監(jiān)聽地址 -->
<ListenAddress>localhost</ListenAddress>
<!--
基于 Thrift 的 cassandra 客戶端監(jiān)聽地址,
集群時(shí)設(shè)為:0.0.0.0 表示偵聽所有客戶端 , 默認(rèn)為:localhost
-->
<ThriftAddress>localhost</ThriftAddress>
<!-- 客戶端連接的端口 -->
<ThriftPort>9160</ThriftPort>
<!--
FlushDataBufferSizeInMB 將 memtables 上的數(shù)據(jù)寫入在 Disk 上,
超過設(shè)定好的限制大小時(shí) ( 默認(rèn) 32M),則將數(shù)據(jù)寫入磁盤,
FlushIndexBufferSizeInMB 超過設(shè)定的時(shí)長(默認(rèn) 8 分鐘)后,
將 memtables 由的數(shù)據(jù)寫入磁盤中
-->
<FlushDataBufferSizeInMB>32</FlushDataBufferSizeInMB>
<FlushIndexBufferSizeInMB>8</FlushIndexBufferSizeInMB>
<!--
節(jié)點(diǎn)之間的日志記錄同步模式。
默認(rèn):periodic, 對應(yīng)配制 CommitLogSyncPeriodInMS
啟動(dòng) batch 時(shí),則對應(yīng)的配制 CommitLogSyncBatchWindowInMS
-->
<CommitLogSync>periodic</CommitLogSync>
<!-- 默認(rèn)為每 10 秒同步一次日志記錄 -->
<CommitLogSyncPeriodInMS>10000</CommitLogSyncPeriodInMS>
<!--
<CommitLogSyncBatchWindowInMS>1</CommitLogSyncBatchWindowInMS> -->

 


常用編程語言使用 Cassandra 來存儲(chǔ)數(shù)據(jù)

在使用 Cassandra 時(shí),通常情況下都需要使用第三方插件 Thrift 來生成與 Cassandra 相關(guān)的庫文件 , 您可以在 http://incubator.apache.org/thrift 下載此插件,并學(xué)習(xí)它的使用方法。以下是分別在 Java、PHP、Python、C#、Ruby 五種常用編程語言中使用 Cassandra:

Java 程序使用 Cassandra

把 libthrift-r917130.jar,apache-cassandra-0.6.0-rc1.jar 加入到 Eclipse 的編譯路徑中。

建立數(shù)據(jù)庫連接:使用 libthrift-r917130.jar 的 TTransport 的 open 方法建立起與 Cassandra 服務(wù)端 (IP:192.168.10.2 端口:9160) 的連接。

數(shù)據(jù)庫操作:使用 Cassandra.Client 創(chuàng)建一個(gè)客戶端實(shí)例。調(diào)用 Client 實(shí)例的 insert 方法寫入數(shù)據(jù),通過 get 方法獲取數(shù)據(jù)。

關(guān)閉數(shù)據(jù)庫連接:使用 TTransport 的 close 方法斷開與 Cassandra 服務(wù)端的連接。


清單 2. Java 連接 Cassandra,寫入并讀取數(shù)據(jù)。

				
package com.test.cassandra;|
import java.io.UnsupportedEncodingException;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.TException;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
/*
* 使 Java 客戶端連接 Cassandra 并進(jìn)行讀寫操作
* @author jimmy
* @date 2010-04-10
*/
public class JCassandraClient{
public static void main(String[] args) throws InvalidRequestException,
NotFoundException, UnavailableException, TimedOutException,
TException, UnsupportedEncodingException {

// 建立數(shù)據(jù)庫連接
TTransport tr = new TSocket("192.168.10.2", 9160);
TProtocol proto = new TBinaryProtocol(tr);
Cassandra.Client client = new Cassandra.Client(proto);
tr.open();
String keyspace = "Keyspace1";
String cf = "Standard2";
String key = "studentA";
// 插入數(shù)據(jù)
long timestamp = System.currentTimeMillis();
ColumnPath path = new ColumnPath(cf);
path.setColumn("age".getBytes("UTF-8"));
client.insert(keyspace,key,path,"18".getBytes("UTF-8"),
timestamp,ConsistencyLevel.ONE);
path.setColumn("height".getBytes("UTF-8"));
client.insert(keyspace,key,path,"172cm".getBytes("UTF-8"),
timestamp,ConsistencyLevel.ONE);
// 讀取數(shù)據(jù)
path.setColumn("height".getBytes("UTF-8"));
ColumnOrSuperColumn cc = client.get(keyspace, key, path, ConsistencyLevel.ONE);
Column c = cc.getColumn();
String v = new String(c.value, "UTF-8");
// 關(guān)閉數(shù)據(jù)庫連接
tr.close();
}
}

 

PHP 程序使用 Cassandra

在 PHP 代碼中使用 Cassandra,需要借助 Thrift 來生成需要的 PHP 文件,通過使用 thrift --gen php interface/cassandra.thrift 生成所需要的 PHP 文件,生成的 PHP 文件中提供了與 Cassandra 建立連接、讀寫數(shù)據(jù)時(shí)所需要的函數(shù)。


清單 3. PHP 連接 Cassandra,寫入并讀取數(shù)據(jù)。

				
<?php
$GLOBALS['THRIFT_ROOT'] = '/usr/share/php/Thrift';
require_once
$GLOBALS['THRIFT_ROOT'].'/packages/cassandra/Cassandra.php';
require_once
$GLOBALS['THRIFT_ROOT'].'/packages/cassandra/cassandra_types.php';
require_once $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php';
require_once $GLOBALS['THRIFT_ROOT'].'/protocol/TBinaryProtocol.php';
require_once
$GLOBALS['THRIFT_ROOT'].'/transport/TFramedTransport.php';
require_once
$GLOBALS['THRIFT_ROOT'].'/transport/TBufferedTransport.php';
try {
// 建立 Cassandra 連接
$socket = new TSocket('192.168.10.2', 9160);
$transport = new TBufferedTransport($socket, 1024, 1024);
$protocol = new TBinaryProtocolAccelerated($transport);
$client = new CassandraClient($protocol);
$transport->open();
$keyspace = 'Keyspace1';
$keyUser = "studentA";
$columnPath = new cassandra_ColumnPath();
$columnPath->column_family = 'Standard1';
$columnPath->super_column = null;
$columnPath->column = 'age';
$consistency_level = cassandra_ConsistencyLevel::ZERO;
$timestamp = time();
$value = "18";
// 寫入數(shù)據(jù)
$client->insert($keyspace, $keyUser, $columnPath, $value,
$timestamp, $consistency_level);
$columnParent = new cassandra_ColumnParent();
$columnParent->column_family = "Standard1";
$columnParent->super_column = NULL;
$sliceRange = new cassandra_SliceRange();
$sliceRange->start = "";
$sliceRange->finish = "";
$predicate = new cassandra_SlicePredicate();
list() = $predicate->column_names;
$predicate->slice_range = $sliceRange;
$consistency_level = cassandra_ConsistencyLevel::ONE;
$keyUser = studentA;
// 查詢數(shù)據(jù)
$result = $client->get_slice($keyspace, $keyUser, $columnParent,
$predicate, $consistency_level);
// 關(guān)閉連接
$transport->close();
} catch (TException $tx) {
}?>

 

Python 程序使用 Cassandra

在 Python 中使用 Cassandra 需要 Thrift 來生成第三方 Python 庫,生成方式: thrift --gen py interface/cassandra.thrift, 然后在 Python 代碼中引入所需的 Python 庫,生成的 Python 庫提供了與 Cassandra 建立連接、讀寫數(shù)據(jù)時(shí)所需要的方法。


清單 4. Python 連接 Cassandra,寫入并讀取數(shù)據(jù)。

				
from thrift import Thrift
from thrift.transport import TTransport
from thrift.transport import TSocket
from thrift.protocol.TBinaryProtocol import
TBinaryProtocolAccelerated
from cassandra import Cassandra
from cassandra.ttypes import *
import time
import pprint
def main():
socket = TSocket.TSocket("192.168.10.2", 9160)
transport = TTransport.TBufferedTransport(socket)
protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
client = Cassandra.Client(protocol)
pp = pprint.PrettyPrinter(indent=2)
keyspace = "Keyspace1"
column_path = ColumnPath(column_family="Standard1", column="age")
key = "studentA"
value = "18 "
timestamp = time.time()
try:
# 打開數(shù)據(jù)庫連接
transport.open()
# 寫入數(shù)據(jù)
client.insert(keyspace,key,column_path,
value,timestamp,ConsistencyLevel.ZERO)
# 查詢數(shù)據(jù)
column_parent = ColumnParent(column_family="Standard1")
slice_range = SliceRange(start="", finish="")
predicate = SlicePredicate(slice_range=slice_range)
result = client.get_slice(keyspace,key,column_parent,
predicate,ConsistencyLevel.ONE)
pp.pprint(result)
except Thrift.TException, tx:
print 'Thrift: %s' % tx.message
finally:
# 關(guān)閉連接
transport.close()
if __name__ == '__main__':
main()

 

C# 使用 Cassandra

在 C# 中使用 Cassandra 需要 Thrift.exe 來生成動(dòng)態(tài)鏈接庫,使用 ./thrift.exe --gen csharp interface/cassandra.thrift 生成所需要的 DLL 文件,生成的 DLL 提供了與 Cassandra 建立連接,讀寫數(shù)據(jù)等所需的類和方法,在編程環(huán)境中引入生成的 DLL,即可使用。


清單 5. C# 連接 Cassandra,寫入并讀取數(shù)據(jù)。

				
namespace CshareCassandra{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using Apache.Cassandra;
using Thrift.Protocol;
using Thrift.Transport;
class CassandraClient{
static void Main(string[] args){
// 建立數(shù)據(jù)庫連接
TTransport transport = new TSocket("192.168.10.2", 9160);
TProtocol protocol = new TBinaryProtocol(transport);
Cassandra.Client client = new Cassandra.Client(protocol);
transport.Open();
System.Text.Encoding utf8Encoding = System.Text.Encoding.UTF8;
long timeStamp = DateTime.Now.Millisecond;
ColumnPath nameColumnPath = new ColumnPath(){
Column_family = "Standard1",
Column = utf8Encoding.GetBytes("age")};
// 寫入數(shù)據(jù)
client.insert("Keyspace1","studentA",nameColumnPath,
utf8Encoding.GetBytes("18"),timeStamp, ConsistencyLevel.ONE);
// 讀取數(shù)據(jù)
ColumnOrSuperColumn returnedColumn = client.get("Keyspace1",
"studentA", nameColumnPath, ConsistencyLevel.ONE);
Console.WriteLine("Keyspace1/Standard1: age: {0}, value: {1}",
utf8Encoding.GetString(returnedColumn.Column.Name),
utf8Encoding.GetString(returnedColumn.Column.Value));
// 關(guān)閉連接
transport.Close();
}
}}

 

Ruby 使用 Cassandra

在 Ruby 中使用 Cassandra 需要先安裝 gem,安裝命令:gem install cassandra

安裝完成后,打開 Ruby 的 irb 開始使用 Cassandra。


清單 6. Ruby 連接 Cassandra,寫入并讀取數(shù)據(jù)

				
> require 'rubygems'
> require 'cassandra'
# 建立數(shù)據(jù)庫連接
> cdb = Cassandra.new('Keyspace1',"192.168.10.1:9160", :retries => 3)
# 寫入數(shù)據(jù)
> cdb.insert(:Standard1, 'studentA', {'age' => '18'})
# 讀取數(shù)據(jù)
> cdb.get(:Standard1, :studentA)
# 關(guān)閉連接
> cdb.disconnect

 


搭建 Cassandra 集群環(huán)境

Cassandra 的集群是沒有中心節(jié)點(diǎn)的,各個(gè)節(jié)點(diǎn)的地位完全相同,節(jié)點(diǎn)之間是通過 gossip 的協(xié)議來維護(hù)集群的狀態(tài)。


以下是兩臺(tái)安裝了 Linux 系統(tǒng)的服務(wù)器,且初步設(shè)置了 Cassandra 環(huán)境和啟用了端口 7000,9160:

服務(wù)器名 端口 IP 地址
ServiceA 7000,9160 192.168.10.3
ServiceB 7000,9160 192.168.10.2

 

配制服務(wù)器 ServiceA、ServiceB 的 storage-conf.xml 文件


ServiceA 的配置

				
<Seeds>
<Seed>192.168.10.3</Seed>
</Seeds>
<ListenAddress>192.168.10.2</ListenAddress>
<ThriftAddress>0.0.0.0</ThriftAddress>



ServiceB 的配置

				
<Seeds>
<Seed>192.168.10.3</Seed>
<Seed>192.168.10.2</Seed>
</Seeds>
<ListenAddress>192.168.10.2</ListenAddress>
<ThriftAddress>0.0.0.0</ThriftAddress>

 

配制完成后,分別啟動(dòng) ServiceA 和 ServiceB 上的 Cassandra 服務(wù)。

查看 ServiceA 和 ServiceB 是否集群成功,可使用 Cassandra 自帶的客戶端命令

 bin/nodetool --host 192.168.10.2 ring 



集群成功則會(huì)返回以下類似信息:

				
Address Status Load Range Ring
106218876142754404016344802054916108445
192.168.10.2 Up 2.55 KB 31730917190839729088079827277059909532 |<--|
192.168.10.3 Up 3.26 KB 106218876142754404016344802054916108445 |-->|

 

使用 Cassandra 命令行工具進(jìn)行集群測試

從 ServiceB 連接到 ServiceA,可使用命令:

 cassandra-cli -host 192.168.10.3 -port 9160 



集群測試一

				
寫入集群數(shù)據(jù)
ServiceA 連接到 ServiceA:

# set Keyspace1.Standard2['studentAA']['A2A'] = 'a2a'

ServiceB 連接到 ServiceA:

# set Keyspace1.Standard2['studentBA']['B2A'] = 'b2a'

ServiceA 連接到 ServiceB:
# set Keyspace1.Standard2['studentAB']['A2B'] = 'a2b'

 

獲取集群數(shù)據(jù):

 ServiceA 連接到 ServiceA : 
# get Keyspace1.Standard2['studentAA'],
get Keyspace1.Standard2['studentBA'],
get Keyspace1.Standard2['studentAB']

ServiceB 連接到 ServiceA :
# get Keyspace1.Standard2['studentAA'],
get Keyspace1.Standard2['studentBA'],
get Keyspace1.Standard2['studentAB']

ServiceA 連接到 ServiceB :
# get Keyspace1.Standard2['studentAA'],
get Keyspace1.Standard2['studentBA'],
get Keyspace1.Standard2['studentAB']

 

清單 8. 集群測試清單二

ServiceA 停止 Cassandra 服務(wù),ServiceA 連接到 ServiceB 并寫入數(shù)據(jù)

 # set Keyspace1.Standard2['studentAR']['A2R'] = 'a2R'

 

啟動(dòng) ServiceA,并鏈接到 ServiceA 本身,讀取剛才在 ServiceB 寫入的數(shù)據(jù)

 # bin/cassandra-cli -host 192.168.10.3 -port 9160 
# get Keyspace1.Standard2['studentAR']

 


總結(jié)

以上我們介紹了 Cassandra 的數(shù)據(jù)模型、節(jié)點(diǎn)安裝和配置、常用編程語言中使用 Cassandra 以及 Cassandra 的集群和測試。Cassandra 是一個(gè)高性能的 P2P 去中心化的非關(guān)系型數(shù)據(jù)庫,可以分布式進(jìn)行讀寫操作。在系統(tǒng)運(yùn)行時(shí)可以隨意的添加或刪降字段,是 SNS 應(yīng)用的理想數(shù)據(jù)庫。