聚合管道

聚合管道是一个基于数据处理管道概念建模的数据聚合框架。文档进入多阶段管道,该管道将文档转换成汇总结果,其语法 db.collction.aggregate(<pipeline>, <options>)

  • pipeline: 类型array,定义文档使用的聚合管道和操作符
  • options: 传递给聚合命令的其他选项。

管道表达式

管道表达式只可以操作当前管道中的文档,不能访问其他的文档.表达式操作可以在内存中完成对文档的转换

常用表达式

表达式说明
$<field>声明字段路径,内嵌文档的路径$<field>.$<inline.field>
$$<var>$$来声明系统变量
$literal: <value>声明常量.当value = "$field".并不会当作字段路径解析.而是当作字符串"\$field"处理
$arrayElemAt返回指定数组索引处的元素。

聚合阶段

方法

名称说明
$collStats返回有关集合或视图的统计信息.
$project修改管道中的文档,对于每一个输入文档,相应输出一个文档.
$match对管道中的文档进行过滤,仅允许符合条件的文档进入下一个阶段.过滤操作不回修改文档.语法与查询文档语法完全相同
$limit限制管道中的文档数量
$skip跳过管道中前N篇文档
$group对管道中文档进行分组
$sample从输入中随机选择指定数量的文档
$sort对文档进行排序
$lookup对同一数据库中的另一个集合执行左外部联接,以从“联接”集合中筛选文档进行处理。
$out将管道内的文档输出到新的集合中,集合中原来的文档是不会被保存的.如果出现了错误并不会覆盖原集合
$unwind从输入文档中解构数组字段以输出每个元素的文档。每个输出文档用一个元素值替换数组。对于每个输入文档,输出n个文档,其中n是数组元素数,对于空数组,可以为零。
$count返回聚合阶段文档数量

示例

$project
# 修改管道中的文档.输出 隐藏_id, 显示name,生成新字段city值为location第2个值

> db.qvbilam.aggregate([{ $project: { _id: false, name: true, city: {$arrayElemAt: ["$location",0]} } } ])

# 输出,当location有字段为字符串时会报错
{ "name" : "angelQvbilam", "city" : null }
{ "name" : "PiaoYao", "city" : "HeBei" }
{ "name" : "ZhaoX", "city" : "ZheJiang" }
$match
# 匹配管道中location第一个值为"HeBei"的值

> db.qvbilam.aggregate({ $match: { "location.0": "HeBei"  }  })

# 返回

{ "_id" : ObjectId("602f7e8dc4f596e5b431db1d"), "name" : "PiaoYao", "num" : 21, "location" : [ "HeBei", "ShiJiaZhuang", "ShenZe" ] }
$unwind
语法
{
  $unwind:
    {
      path: <field path>,
      includeArrayIndex: <string>,
      preserveNullAndEmptyArrays: <boolean>
    }
}
说明
字段类型说明
pathstring数组字段的字段路径。要指定字段路径,请在字段名前面加上美元符号$,并用引号括起来
includeArrayIndexstring可选。用于保存元素数组索引的新字段的名称。名称不能以美元符号$开头。
preserveNullAndEmptyArraysbool可选。如果为true,则如果路径为null、缺少或空数组,$unwind将输出文档。如果为false,则如果路径为null、丢失或空数组,$unwind不会输出文档。

当字段为null,[]或者不存在都不回展开文档.即被排除.如果想显示可以添加preserverNullAndEmptyArrays: true显示.如下所示

db.qvbilam.aggregate({ $unwind: { path: "$field", preserverNullAndEmptyArrays: true } })
示例
# 获取location不为空的文档

> db.qvbilam.find({ location: {$ne: null} }, { _id:false, num: false })

# 返回

{ "name" : "PiaoYao", "location" : [ "HeBei", "ShiJiaZhuang", "ShenZe" ] }
{ "name" : "ZhaoX", "location" : [ "ZheJiang", "HangZhou", "XiaoShan" ] }
{ "name" : "ZhaoC", "location" : "ChongQing" }


# 展开数组

> db.qvbilam.aggregate({ $match: { location: {$ne: null} } }, {$unwind: { path: "$location" }}, {$project: { _id: false, num: false }})

# 返回
{ "name" : "PiaoYao", "location" : "HeBei" }
{ "name" : "PiaoYao", "location" : "ShiJiaZhuang" }
{ "name" : "PiaoYao", "location" : "ShenZe" }
{ "name" : "ZhaoX", "location" : "ZheJiang" }
{ "name" : "ZhaoX", "location" : "HangZhou" }
{ "name" : "ZhaoX", "location" : "XiaoShan" }
{ "name" : "ZhaoC", "location" : "ChongQing" }
$lookup
语法
{
   $lookup:
     {
       from: <collection to join>,
       localField: <field from the input documents>,
       foreignField: <field from the documents of the "from" collection>,
       as: <output array field>
     }
}
说明
字段说明
from指定同一数据库中要执行联接的集合
localField管道中的字段,如果输入文档不包含localField,$lookup会将该字段视为具有null值以进行匹配。
foreignField另一个集合的字段,$lookup对输入文档中的foreignField和localField执行相等匹配。如果from集合中的文档不包含foreignField,$lookup会将该值视为null以进行匹配。
as指定要添加到输入文档的新数组字段的名称。新数组字段包含来自集合的匹配文档。如果输入文档中已存在指定的名称,则覆盖现有字段
示例
# 为新的集合插入文档
db.location.insertMany([
    {
    name: "HeBei",
    longitude: 114.468664,
    Latitude: 38.037057
    },
    {
    name: "ShiJiaZhuang",
    longitude: 114.514859,
    Latitude: 38.042306
    },
    {
    name: "ShenZe",
    longitude: 115.20094,
    Latitude: 38.18417
    },
    {
    name: "ZheJiang",
    longitude: 120.152791,
    Latitude: 30.267446
    },
    {
    name: "HangZhou",
    longitude: 120.155070,
    Latitude: 30.274084
    },
    {
    name: "XiaoShan",
    longitude: 120.49328,
    Latitude: 30.283330
    },
    {
    name: "ChongQing",
    longitude: 106.551556,
    Latitude: 29.563009
    }
])

# 查询

db.qvbilam.aggregate([{ $match: { location: {$ne: null} } },{
  $lookup: {
    from: "location",
    localField: "location",
    foreignField: "name",
    as: "longitudeAndLatitude"
  }
},{ $project: { _id: false, num: false} }])

# 返回

{ "_id" : ObjectId("602f7e8dc4f596e5b431db1d"), "name" : "PiaoYao", "num" : 21, "location" : [ "HeBei", "ShiJiaZhuang", "ShenZe" ], "longitudeAndLatitude" : [ { "_id" : ObjectId("6034a13b0b4db02284f61ce9"), "name" : "HeBei", "longitude" : 114.468664, "Latitude" : 38.037057 }, { "_id" : ObjectId("6034a13b0b4db02284f61cea"), "name" : "ShiJiaZhuang", "longitude" : 114.514859, "Latitude" : 38.042306 }, { "_id" : ObjectId("6034a13b0b4db02284f61ceb"), "name" : "ShenZe", "longitude" : 115.20094, "Latitude" : 38.18417 } ] }
{ "_id" : ObjectId("602f7eccc4f596e5b431db1e"), "name" : "ZhaoX", "num" : 50, "location" : [ "ZheJiang", "HangZhou", "XiaoShan" ], "longitudeAndLatitude" : [ { "_id" : ObjectId("6034a13b0b4db02284f61cec"), "name" : "ZheJiang", "longitude" : 120.152791, "Latitude" : 30.267446 }, { "_id" : ObjectId("6034a13b0b4db02284f61ced"), "name" : "HangZhou", "longitude" : 120.15507, "Latitude" : 30.274084 }, { "_id" : ObjectId("6034a13b0b4db02284f61cee"), "name" : "XiaoShan", "longitude" : 120.49328, "Latitude" : 30.28333 } ] }
{ "_id" : ObjectId("6034969a0b4db02284f61ce8"), "name" : "ZhaoC", "num" : 51, "location" : "ChongQing", "longitudeAndLatitude" : [ { "_id" : ObjectId("6034a13b0b4db02284f61cef"), "name" : "ChongQing", "longitude" : 106.551556, "Latitude" : 29.563009 } ] }

在返回的三篇文档中的longitudeAndLatitude中的数组包含了三个值.分别对应的是省,城市,县的坐标.当想显示坐标与地一一对应时.可以使用$unwind将数组展开.如下

# 查询 隐藏管道内部分字段与链表部分字段

db.qvbilam.aggregate([
  { $match: { location: {$ne: null} } },
  { $unwind: { path: "$location" } },{
  $lookup: {
    from: "location",
    localField: "location",
    foreignField: "name",
    as: "longitudeAndLatitude"
  }
},{ $project: { _id: false, num: false, "longitudeAndLatitude._id": false, "longitudeAndLatitude.name": false} }])

# 返回

{ "name" : "PiaoYao", "location" : "HeBei", "longitudeAndLatitude" : [ { "longitude" : 114.468664, "Latitude" : 38.037057 } ] }
{ "name" : "PiaoYao", "location" : "ShiJiaZhuang", "longitudeAndLatitude" : [ { "longitude" : 114.514859, "Latitude" : 38.042306 } ] }
{ "name" : "PiaoYao", "location" : "ShenZe", "longitudeAndLatitude" : [ { "longitude" : 115.20094, "Latitude" : 38.18417 } ] }
{ "name" : "ZhaoX", "location" : "ZheJiang", "longitudeAndLatitude" : [ { "longitude" : 120.152791, "Latitude" : 30.267446 } ] }
{ "name" : "ZhaoX", "location" : "HangZhou", "longitudeAndLatitude" : [ { "longitude" : 120.15507, "Latitude" : 30.274084 } ] }
{ "name" : "ZhaoX", "location" : "XiaoShan", "longitudeAndLatitude" : [ { "longitude" : 120.49328, "Latitude" : 30.28333 } ] }
{ "name" : "ZhaoC", "location" : "ChongQing", "longitudeAndLatitude" : [ { "longitude" : 106.551556, "Latitude" : 29.563009 } ] }
$group
语法
{ 
  $group: { 
    _id: <expression>, 
    <field1>: { <accumulator1> : <expression1> }, 
    ... 
  }
}
  • _id字段是必需的.可以将_id值指定为null,以计算整个输入文档.
  • 其余的计算字段是可选的,使用<accumulator>操作符进行计算.
  • _id<accumulator>表达式可以接受任何有效的表达式.
操作符
字段说明
$num返回数值之和。忽略非数值。
$avg返回数值的平均值。忽略非数值。
$first从每个组的第一个文档返回一个值。仅当文档处于已定义的顺序时,才定义顺序。
$last从每个组的最后一个文档返回一个值。仅当文档处于已定义的顺序时,才定义顺序。
$max返回每个组的最高表达式值。
$min返回每个组的最低表达式值。
$push返回每个组的表达式值数组。
$addToSet返回每个组的唯一表达式值数组。数组元素的顺序未定义。
$stdDevPop返回输入值的总体标准偏差。
$stdDevSamp返回输入值的样本标准偏差。
示例
插入订单集合.包含用户名称,单价,购买数量以及商品id
db.order_info.insertMany([
{
     "name": "qvbilam",
  "price": 500,
  "num": 10,
  "goods_id": 1
},
{
     "name": "qvbilam",
  "price": 200,
  "num": 2,
  "goods_id": 2
},
{
     "name": "GuoOne",
  "price": 40,
  "num": 3,
  "goods_id": 2
},
{
     "name": "GuoOne",
  "price": 45,
  "num": 8,
  "goods_id": 1
},
])
通过用户姓名进行分组
# 通过$name字段作为主键进行分组

db.order_info.aggregate({ $group: {_id: "$name"} })

# 返回

{ "_id" : "GuoOne" }
{ "_id" : "qvbilam" }
对分组后的文档进行聚合运算
  • _id: 按照$name字段进行分组
  • totalNum: 计算$num数量
  • totalMoney: 计算总价钱(数量 * 单价的总和)
  • avgNum: 计算数量的平均值
  • countOrder: 统计订单数量
  • maxMoney: 计算每笔订单最大的总价
# 分组后聚合运算

db.order_info.aggregate({ 
  $group: {
    _id: "$name",
    totalNum: {$sum: "$num"},
    totalMoney: {$sum : { $multiply: ["$num","$price"] } },
    avgNum: { $avg: "$num" },
    countOrder: {$sum: 1},
    maxMoney: { $max: { $multiply: ["$num","$price"] } },
  }
})

# 返回

{ "_id" : "GuoOne", "totalNum" : 11, "totalMoney" : 480, "avgNum" : 5.5, "countOrder" : 2, "maxMoney" : 360 }
{ "_id" : "qvbilam", "totalNum" : 12, "totalMoney" : 5400, "avgNum" : 6, "countOrder" : 2, "maxMoney" : 5000 }
分组后生成新的数组字段
# 分组后设置新的字段

db.order_info.aggregate({ $group: { _id: "$name", numArr: { $push: { num: "$num", price: "$price" } } } })

# 返回
{ "_id" : "GuoOne", "numArr" : [ { "num" : 3, "price" : 40 }, { "num" : 8, "price" : 45 } ] }
{ "_id" : "qvbilam", "numArr" : [ { "num" : 10, "price" : 500 }, { "num" : 2, "price" : 200 } ] } 
不进行分组进行计算
# 对所有文档的num值进行求和

db.order_info.aggregate({ $group: {_id: null, num: {$sum : "$num"}} })

# 返回

{ "_id" : null, "num" : 23 } 

聚合选项

说明

{
  aggregate: "<collection>",
  pipeline: [ <stage>, <...> ],
  explain: <boolean>,
  allowDiskUse: <boolean>,
  cursor: <document>,
  bypassDocumentValidation: <boolean>,
  readConcern: <document>,
  collation: <document>
}

当聚合占用内存超过100mb时,会出现异常.可以通过设置allowDiskUse参数[/scode ]

参数类型说明
pipelinearray作为聚合管道的一部分处理和转换文档流的聚合管道阶段数组
explainboolean指定返回有关管道处理的信息
allowDiskUseboolean允许写入临时文件,当设置为true时,写入临时文件dbpath/_tmp(dbpath默认值为:"/data/db")
cursordocumentmongodb3.4不赞成使用不带游标选项的聚合命令,除非管道包含explain选项。使用aggregate命令内联返回聚合结果时,请使用默认批大小游标{}指定游标选项,或在游标选项游标{batchSize:<num>}中指定批大小。
bypassDocumentValidationboolean仅当指定$out聚合运算符时可用,允许聚合在操作期间绕过文档验证.允许插入不符合验证要求的文档
readConcerndocument可选。指定读取关注点。该选项具有以下语法:<br/>readConcern:{level:<value>}<br/>value:<br/>"local:这是默认的读取关注级别。<br/>"majority":可用于使用WiredTiger存储引擎的副本集。<br/>"linearizable":仅可用于主服务器上的读取操作。

管道优化

Last modification:February 24th, 2021 at 04:43 pm