MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念Map(映射)和Reduce(归约)是它们的主要思想,是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。
一、概念
MapReduce是一个并行程序设计模型与方法(Programming Model & Methodology),它借助于函数式程序设计语言Lisp的设计思想,提供了一种简便的并行程序设计方法,用Map和Reduce两个函数编程实现基本的并行计算任务,提供了抽象的操作和并行编程接口,以简单方便地完成大规模数据的编程和计算处理。
同样的,MongoDB支持使用javascript编写mapreduce函数来做分布式数据处理,将大问题通过分批次处理最终转换为一个个小问题,并将每个小文的计算结果合并为最终的结果。伴随而来的是对机器资源的消耗,整个过程非常缓慢,应避免在实时的数据分析中使用。
二、使用
语法
- ①使用内置函数
mapReduce
1
2
3
4
5
6
7
8
9
10
11
12
13
14db.collection.mapReduce(
//map函数
function() {emit(key, value);},
//reduce函数
function(key,values) {return reduceFunction},
{
out: collection,
query: document,
sort: document,
limit: number
}
)- ②使用内置函数
runCommand
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23var m = function() {
emit(this.age, this.name);
}
var r = function(key, values) {
var ret = {age:key, names:values};
return ret;
}
var f = function(key,rval){
if(key == 0){
rval.msg = "a new life,baby!";
}
return rval
}
db.runCommand({
mapreduce: collection,
map: m,
reduce: r,
finalize: f,
query: document,
sort: document,
limit: number,
out: collection
})- ①使用内置函数
使用
准备数据源,推荐
- 下载
ml-latest-small.zip
unzip ml-latest-small.zip
cd ml-latest-small
,笔者文件路径/Users/your_name/phpweb/mongo
- 编写php代码(需要安装mongodb扩展)
Write batch sizes must be between 1 and 100000. Got 100837 operations.
,批量写入一次最多写100000条数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51$moviefile = "/Users/your_name/phpweb/mongo/ml-latest-small/movies.csv";
$moviedata = getMovies($moviefile);
insertMongo('study', 'movies', $moviedata);
function getMovies($file) {
if (!is_file($file)) {
exit('none file');
}
$handle = fopen($file, 'r');
if (!$handle) {
exit('read file fail');
}
$res = [];
while (($data = fgetcsv($handle)) !== false) {
$tmp = [];
$tmp['movieId'] = $data[0];
$tmp['title'] = $data[1];
$tmp['genres'] = $data[2];
$tmp['tags'] = explode('|', $data[2]);
$res[] = $tmp;
}
fclose($handle);
return $res;
}
function insertMongo($db, $coll, $data){
try {
$manager = new MongoDB\Driver\Manager('mongodb://localhost:27017');
$total = count($data);
if ($total >= 10000) {
$page = ceil($total/10000);
for($i=1; $i<=$page; $i++) {
$offset = ($i - 1) * 10000;
$tmp = array_slice($data, $offset, 10000);
$command = new MongoDB\Driver\Command([
'insert' => $coll,
'documents' => $tmp
]);
$cursor = $manager->executeCommand($db, $command);
}
} else {
$command = new MongoDB\Driver\Command([
'insert' => $coll,
'documents' => $data
]);
$cursor = $manager->executeCommand($db, $command);
}
} catch(Exception $e) {
print_r($e->getMessage());
}
}- 下载
连接mongo查询数据
db.movies.find().limit(5).pretty()
,主要看一下存储的数据格式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54{
"_id" : ObjectId("60c3105b5200f12dedb6843d"),
"movieId" : "movieId",
"title" : "title",
"genres" : "genres",
"tags" : [
"genres"
]
}
{
"_id" : ObjectId("60c3105b5200f12dedb6843e"),
"movieId" : "1",
"title" : "Toy Story (1995)",
"genres" : "Adventure|Animation|Children|Comedy|Fantasy",
"tags" : [
"Adventure",
"Animation",
"Children",
"Comedy",
"Fantasy"
]
}
{
"_id" : ObjectId("60c3105b5200f12dedb6843f"),
"movieId" : "2",
"title" : "Jumanji (1995)",
"genres" : "Adventure|Children|Fantasy",
"tags" : [
"Adventure",
"Children",
"Fantasy"
]
}
{
"_id" : ObjectId("60c3105b5200f12dedb68440"),
"movieId" : "3",
"title" : "Grumpier Old Men (1995)",
"genres" : "Comedy|Romance",
"tags" : [
"Comedy",
"Romance"
]
}
{
"_id" : ObjectId("60c3105b5200f12dedb68441"),
"movieId" : "4",
"title" : "Waiting to Exhale (1995)",
"genres" : "Comedy|Drama|Romance",
"tags" : [
"Comedy",
"Drama",
"Romance"
]
}- 查询tags有交集的记录
- 使用聚合查询aggregate
- 使用MapReduce