Home Database MongoDB – สรุปแนะนำการใช้งาน Aggregation Framework เบื้องต้น

MongoDB – สรุปแนะนำการใช้งาน Aggregation Framework เบื้องต้น

by khomkrit

Aggregation framework ช่วยให้เราเขียน query เพื่อ transform document ให้อยู่ในรูปแบบที่เราต้องการได้ หรือแม้แต่สร้างข้อมูลใหม่ๆ ที่ไม่ได้มีอยู่แล้วใน document ไหนๆ เลยขึ้นมาได้อย่างง่ายดาย ยกตัวอย่างเช่น เราสามารถใช้มันพิจารณายอดขายในแต่ละเดือน ยอดขายของสินค้าแต่ละชนิด หรือยอดสั่งซื้อรวมในช่วงเวลาที่กำหนดได้ ถ้าเทียบกับฐานข้อมูลประเภท Relational Database แล้วก็จะคล้ายๆ กับ GROUP BY ในภาษา SQL นั่นเอง

ถึงแม้ว่าเราสามารถเขียนโค้ดจัดการข้อมูลที่ได้จากการ query ออกมาเพื่อออกรายงานสรุป หรือ ค้นหาค่าบางอย่าง หรือ transform document ให้อยู่ในรูปแบบที่ต้องการได้ แต่การใช้ aggregation framework นั้นทำให้เราทำได้ง่าย และมีประสิทธิภาพดีกว่ากันมาก ซึ่งเราสามารถนิยามขั้นตอนต่างๆ จากนั้นส่งไปรันได้ด้วยคำสั่งเดียวก็ได้ผลลัพธ์ออกมาแล้ว

Introduction

Aggregation Framework เป็นเครื่องมือช่วยให้เราประมวลผลข้อมูลโดยใช้แนวคิด data processing pipeline ซึ่งนั่นคือสายการทำงาน (pipeline) ที่ต่อกันด้วย stage โดยข้อมูลจะถูกส่งเข้าไปใน stage แรกสุดจากนั้นประมวลผลแล้วส่งไปยัง stage ถัดไปเรื่อยๆ จนถึง stage สุดท้ายจึงจบการทำงานของ pipeline ซึ่งนั่นแปลว่าสิ่งที่เราต้องสร้างและออกแบบก็คือ pipeline นั่นเอง

เวลาเราจะใช้ aggregation framework เราจะใช้มันผ่านฟังก์ชั่น aggregate() ที่รับ array ของ stage เข้าไป ยกตัวอย่างเช่น

db.orders.aggregate([
   { $match: { status: "A" } }, // Stage 1
   { $group: { _id: "$cust_id", total: { $sum: "$amount" } } } // Stage 2
])

ประกอบด้วย 2 stage (บรรทัดที่ 1 และ บรรทัดที่ 2)

  • stage ที่ 1 ใช้คำสั่ง $match เพื่อหา document ที่ field status มีค่า A จากนั้นส่ง document ที่หาเจอไปยัง stage ถัดไป
  • stage ที่ 2 ใช้คำสั่ง $group เพื่อจัดกลุ่ม document ที่มี field $cust_id เหมือนกันเข้าด้วยกัน จากนั้นใช้ฟังก์ชั่น $sum เพื่อรวมค่าที่เก็บไว้ใน field $amount ของแต่ละ document ในกลุ่มเดียวกัน เก็บไว้ใน field ใหม่ที่ชื่อ total

Operators

aggregation operators แบ่งเป็น 2 กลุ่มใหญ่ ได้แก่

Pipeline Stage

Pipeline Stage คือ operator สำหรับบอกว่า stage ไหนให้ทำอะไร เช่น บอกให้ค้นหา document ที่ match กับค่าที่ต้องการ, บอกให้จัดกลุ่มตาม field ที่กำหนด, บอกให้เรียงลำดับจากน้อยไปหามาก, บอกให้ดึงมาเฉพาะ field ที่ต้องการ เป็นต้น

Pipeline Operators

Pipeline Operators คือ operator ที่ใช้กับ field ต่างๆ ของ document ที่เข้ามาในแต่ละ stage โดยมากใช้สำหรับการ reshape document ให้อยู่ในรูปแบบที่ต้องการ เช่น การรวมค่าของ 2 field เข้าด้วยกัน เป็นต้น ให้มองว่า pipeline operators เป็นฟังก์ชั่นที่รับ arguments เข้ามาแล้ว return ค่ากลับ ซึ่งรูปแบบทั่วไปจะใช้งานแบบนี้

{ <operator>: [ <argument1>, <argument2> ... ] }

Operators แบ่งเป็น 2 กลุ่ม ได้แก่

  1. ใช้กำหนดหน้าที่ของ Stage
  2. ใช้เป็น Operators ในแต่ละ stage

เวลาออกแบบ pipeline เราจะเริ่มจากคิดว่า stage ไหนต้องทำอะไรบ้าง และแต่ละ stage จะต้อง transform document อย่างไรบ้าง เพื่อส่งต่อไปอีก stage

Pipeline Stages

pipeline ประกอบด้วย stage หลายๆ stage มาอยู่รวมกัน ทำงานต่อกันตั้งแต่ stage แรกไปจนถึง stage สุดท้าย เมื่อเราสร้างแต่ละ stage ขึ้นมา เราจำเป็นต้องระบุว่าแต่ละ stage ต้องทำอะไรโดยใช้ Pipeline Operators

ปัจจุบัน (MongoDB 4.0) มี pipeline operators ให้ใช้งานมากมาย ในบทความนี้จะยกตัวอย่าง ให้ดูบาง operators ที่มักถูกใช้อยู่เป็นประจำ ดังนี้

$match

ใช้ filter document มีมีเงื่อนไขตรงกับที่เรากำหนดไว้ ก่อนส่ง document ที่ match ไปยัง stage ถัดไป ซึ่ง $match มักถูกใช้ในช่วงต้นๆ ของ pipeline เพื่อลดจำนวน document ให้เหลือน้อยที่สุดเท่าที่ทำได้ ส่งผลให้ประสิทธิภาพของ pipeline ดีขึ้น เนื่องจากเหลือข้อมูลให้ process น้อยลง

$project

ใช้สำหรับเลือก field ที่จะส่งไปยัง stage ถัดไป คล้ายกับที่คำสั่ง $match ที่เราใช้เลือก document ที่จะส่งไปยัง stage ถัดไป โดยการที่เราเลือก document หรือเลือก field ก่อนที่จะส่งไปยัง stage ถัดไปนั้นส่งผลให้ขนาดของแต่ละ document ลดลงไปด้วย ทำให้ stage ถัดไป process document เฉพาะในส่วนที่ต้องใช้งานจริงๆ เท่านั้น และนี่ส่งผลดีต่อ performance ของระบบ

$group

เรามักเห็น operator $group อยู่เสมอ เมื่อเรา aggregate ข้อมูล เนื่องจาก operator นี้ใช้สำหรับจัดกลุ่ม document เข้าด้วยกัน แล้วนำมาใช้ร่วมกับฟังก์ชั่นบางอย่างเพื่อหวังผลทางสถิติ เช่น

  • $min$max หาค่าของที่น้อยที่สุด หรือมากที่สุดของ field ที่ระบุ
  • $avg หาค่าเฉลี่ยของ field ที่ระบุ
  • $addToSet จับ field ที่ระบุมารวมกันเพื่อสร้างเป็น array ขึ้นมาใหม่ โดยข้อมูลใน array นี้จะไม่ซ้ำกัน
  • $first$last กรณีข้อมูลเรียงมาก่อนหน้นานี้ สามารถหา document ตัวแรกสุด หรือตัวสุดท้ายมาใช้งานได้
  • $push, จับ field ที่ระบุมารวมกันเพื่อสร้างเป็น array ใหม่โดยไม่คัดตัวที่ซ้ำกันออกไป
  • $sum, หาผลรวมของข้อมูลในกลุ่ม (ตั้งแต่ MongoDB 3.2 $sum สามารถนำมาใช้ใน $project stage ได้ด้วย)

และยังมีฟังก์ชั่นอื่นๆ อีกมากมาย อ่านเพิ่มได้ที่ Aggregation Pipeline Operators

ตัวอย่างการใช้ $group โดยการจัดกลุ่มข้อมูลจาก เดือน วัน ปี และใช้ฟังก์ชั่นต่างๆ เพื่อหาข้อมูลทางสถิติ ดังนี้

  1. คำนวนยอดขายจากการการนำราคาขาย (price) คูณกับจำนวนที่ขายได้ (quantity) โดยใช้ฟังก์ชั่น $multiply ช่วยคูณให้
  2. หาค่าเฉลี่ยของจำนวนสินค้าที่ขายได้ โดยใช้ฟังก์ชั่น $avg
  3. นับจำนวน document ที่ถูกจัดกลุ่ม โดยใช้ฟังก์ชั่น $sum
db.sales.aggregate([
  {
    $group : {
        _id : { month: { $month: "$date" }, day: { $dayOfMonth: "$date" }, year: { $year: "$date" } },
        totalPrice: { $sum: { $multiply: [ "$price", "$quantity" ] } },
        averageQuantity: { $avg: "$quantity" },
        count: { $sum: 1 }
    }
  }
])

$unwind

ใช้สำหรับคลี่ Array ออกมาแล้วจับคู่เข้ากับ document ที่เป็น owner เป็นรายตัวไป ยกตัวอย่างเช่น ถ้าเรามีข้อมูลแบบนี้

{ "_id" : 1, "item" : "ABC1", sizes: [ "S", "M", "L"] }

แล้วเราลอง $unwind แบบนี้

db.inventory.aggregate( [ { $unwind : "$sizes" } ] )

จะได้ผลลัพธ์ดังนี้

{ "_id" : 1, "item" : "ABC1", "sizes" : "S" }
{ "_id" : 1, "item" : "ABC1", "sizes" : "M" }
{ "_id" : 1, "item" : "ABC1", "sizes" : "L" }

ถ้า stage ถัดไปเรา group by size และ count เราก็จะทราบจำนวนว่าแต่ละ size นั้นมี item กี่ชิ้น ดังนี้

db.inventory.aggregate([
 { $unwind: '$sizes' },
 { $group: { _id: '$sizes', count: { $sum:1 } } }
])

$out

ใช้เมื่อเราต้องการบันทึกผลลัพธ์จาก pipeline ไปยัง collection ที่ต้องการ ถ้ายังไม่มี collection ปลายทางคำสั่งนี้จะสร้าง collection ปลายทางขึ้นมาใหม่ แต่หากมีแล้วก็จะบันทึกทับไปเลย และหากการเขียนข้อมูลลงไปใน collection ปลายทางไม่สำเร็จ ข้อมูลใน collection จะยังอยู่เหมือนเดิมไม่ถูกทับด้วยคำสั่ง $out

ตัวอย่างการใช้งารน

db.reviews.aggregate([
  { $group: { _id: '$product_id', count: { $sum:1 } } },
  { $out: 'resultCollection' }
])

นอกจาก operators ดังที่กล่าวมาข้างต้น ก็ยังมี operators ที่น่าสนใจอื่นๆ อีก เช่น

  • $limit – จำกัดจำนวน document ที่จะถูกส่งต่อไปยัง step ถัดไป
  • $skip – skip document
  • $sort – เรียง document
  • $geoNear – เลือก document ที่อยู่ใกล้กับ location ที่ระบุ
  • $out – เขียนผลลัพธ์จาก pipeline ลงใน collection

อย่างไรก็ตาม เราควรเข้าไปทำความรู้จักกับ pipeline stage operators อื่นๆ เพิ่มเติมว่า MongoDB มีอะไรมาให้เราได้ใช้บ้าง ซึ่งจะทำให้เราสามารถใช้ Aggregation Framework ได้อย่างเต็มประสิทธิภาพได้ที่นี่ Aggregation Pipeline Stages

Reshaping documents

มีฟังก์ชั่นให้เราใช้ reshape document เยอะมากจนจำไม่หมด เวลาจะทำอะไรสักอย่างอันดับแรกให้คิดไว้ก่อนเลยว่า Aggregation Framework นั้นมีฟังก์ชั่นที่ทำให้เราได้เลยหรือไม่ โดยเริ่มจากการเข้ามาดูที่คู่มืออย่างเป็นทางการของ MongoDB

การ reshape document อย่างง่ายที่สุดที่มักถูกใช้ก็คือการเปลี่ยนชื่อ field หรือสร้าง field ใหม่ ยกตัวอย่างเช่น เราต้องการ สร้าง field ชื่อ name และ field นี้มี อีก 2 field ข้างในคือ firstlast เราสามารถทำได้ดังนี้

db.users.aggregate([
  { $match: { username: 'khomkrit' } },
  { $project: { name: { first: '$first_name',
                        last: '$last_name'}}
  }
])

String functions

String Expression Operators คือกลุ่มฟังก์ชั่นที่ใช้จัดการกับ string ซึ่งโดยส่วนใหญ่แล้วจะเป็นพังก์ชั่นที่เราคุ้นเคยดีอยู่แล้ว เช่น $split$concat$dateToString$substr$trim เป็นต้น

หากเราต้องการนำ first_name รวมกับ last_name มารวมกันแล้วสร้างเป็น field ใหม่ชื่อ name เราสามารถทำได้ดังนี้

db.users.aggregate([
  {
    $project: { name: { $concat: ['$first_name', ' ', '$last_name'] } }
  }
])

จากตัวอย่างด้านบนใช้ Pipeline Operators ชื่อ $concat เพื่อรวม string เข้าด้วยกัน ทำให้ได้ผลลัพธ์ออกมาแบบนี้

{ 
  "_id" : ObjectId("5d4fe3bea6cbfb4102f49f74"), 
  "name" : "khomkrit sriwichai"
}

คราวนี้เรามาลองใช้ฟังก์ชั่นให้ซับซ้อนยิ่งขึ้นไปอีก โดยเราต้องการนำตัวอักษรตัวแรกของ first_name และ last_name มารวมกัน แล้วทำให้มันเป็นตัวพิมพ์ใหญ่ เราสามารถทำได้ดังนี้

db.users.aggregate([
  {
    $project: { name: { $concat: ['$first_name', ' ', '$last_name'] },
                abbreviate: { 
                  $toUpper: { 
                    $concat: [
                        { $substr: ['$first_name', 0, 1] },  
                        { $substr: ['$last_name', 0, 1] }
                    ]
                  }
                }
              }
  }
])

เราก็จะได้ผลลัพธ์ออกมาแบบนี้

{
  "_id" : ObjectId("5d4fe3bea6cbfb4102f49f74"), 
  "name" : "khomkrit sriwichai", 
  "abbreviate" : "KS"
}

นอกจาก String functions ที่ยกมาดังกล่าวแล้ว MongoDB ยังเตรียมฟังก์ชั่นกลุ่มต่างๆ ไว้ให้เราได้ใช้อีกเป็นจำนวนมาก และที่มักถูกใช้เป็นประจำ ซึ่งผมคิดว่าเราควรเริ่มทำความรู้จักกับฟังก์ชั่นกลุ่มเหล่านี้ก่อนเพราะมีโอกาสใช้บ่อย ได้แก่

  • Arithemetic เกี่ยวกับการคำนวนทางคณิตศาสตร์
  • Array จัดการข้อมูลชนิด array
  • Date จัดการวันที่
  • Set จัดการ array แบบ set เช่น เทียบว่า array 2 ชุดนี้เหมือนกันหรือไม่ หรือ array 2 ชุดนี้ มีอะไรบ้างที่ไม่อยู่ใน array อีกชุด หรือมีอะไรบ้างที่มีเหมือนกันทั้ง 2 array เป็นต้น
  • Logical เป็น boolean operator สำหรับกำหนดเงื่อนไขว่าให้ทำ หรือไม่ทำสิ่งใดในแต่ละ stage
  • Miscellaneous ฟังก์ชั่นอำนวยความสะดวกต่างๆ

และอื่นๆ อีกอีกมากมาย สามารถอ่านเอกสารอย่างเป็นทางการเพิ่มเติมได้ที่นี่

Pipeline performance

มีบางสิ่งที่เราต้องคำนึงอยู่เสมอเมื่อเราออกแบบ pipeline เพื่อประสิทธิภาพการทำงานที่ดี ได้แก่

  • พยายามลดจำนวน document ก่อนนำไปคำนวน หรือก่อนส่งต่อไปยัง stage ถัดไป ยิ่งเหลือ document จำนวนน้อยเท่าไหร่ การทำงานใน stage ถัดไปยิ่งเร็วขึ้นเท่านั้น
  • ใช้ index กับ $match และ $sort
  • ถ้าเราทำ sharding $match และ $project จะรันแยก shard กันของใครของมัน แต่สำหรับ operator อื่นๆ จะรันบน primary shard

Options

ฟังก์ชั่น aggregate() สามารถรับ parameter ตัวที่สองได้ (ตัวแรกคือ array ของ stage) ซึ่งก็คือ options ที่เราสามารถกำหนดได้ 3 ค่า ได้แก่ explainallowDiskUse และ cursor

explain

ผลลัพธ์จะคล้ายกับฟังก์ชั่น .explain() ตอนใช้คำสั่ง .find().explain() เมื่อต้องการใช้ให้กำหนดค่าเป็น true ดังนี้

db.reviews.aggregate([
  { $match: { ... } },
  { $group: { ... }}
], { explain: true })

allowDiskUse

เมื่อเราต้องเข้าไป query ข้อมูลขนาดใหญ่ๆ แล้วเราพบ error ประมาณนี้

assert: command failed: {
  "errmsg": "exception: Exceeded memory limit for $group, but didn't allow external sort. Pass allowDiskUse:true to opt in.",
  "code": 16945,
  "ok", 0
} : aggregate failed

โดยทั่วไปแล้ว error นี้จะเกิดขึ้นหลังจากมีการรอที่นานเกินไปในขณะที่ aggregation pipeline process document จำนวนมาก ในกรณีนี้ pipeline ใช้ memory มากเกินไปจนเกิดปัญหา ซึ่งเราสามารถแก้ได้โดยใช้ option allowDiskUse:true

db.reviews.aggregate([
  { $match: { ... } },
  { $group: { ... }}
], { allowDiskUse: true })

โดยทั่วไปแล้วการใช้ allowDiskUse มักทำให้ pipeline ทำงานช้าลง ดังนั้นเราควรใช้มันเท่าที่จำเป็นเท่านั้น โดยอาจเริ่มจากการพยายาม optimize pipeline ให้เหลือ document ที่เราต้องทำงานด้วยน้อยที่สุดเท่าที่เราทำได้ก่อนโดยใช้ $match และ $project เข้าช่วย

cursor

ตั้งแต่ MongoDB 2.6 เป็นต้นมา result ของ pipeline จะ return กลับออกมาเป็น cursor (ถ้าเรา access ผ่าน Mongo shell) ซึ่ง cursor ที่เราได้กลับมา เราสามารถเรียกใช้ method ต่างๆ ได้ เช่น .hasNext().next()toArray().pretty() และอื่นๆ อีกมากมาย

จุดประสงค์ของ cursor ก็คือ ทำให้เราสามารถ stream data จำนวนมากๆ ได้ ทำให้เราสามารถ process data จำนวนมากๆ ได้โดยสนใจข้อมูลชุดหนึ่ง ณ เวลาหนึ่งๆ โดยไม่จำเป็นต้องโหลดข้อมูลเข้ามา process พร้อม กันทีเดียวทั้งหมดทำให้เราไม่ต้องใช้ memory จำนวนมากจนเกินไป แต่เมื่อไรก็ตามที่เราใช้ method toArray() และ pretty() result document ที่เราได้ทุกอันจะถูกอ่านจาก memory ทันที

itcount() จะอ่าน document ทุกอันเพื่อนับจำนวน document จากนั้นส่งจำนวนกลับไปยัง client และทิ้ง document เหล่านั้นไป ดังนั้นถ้า application ของเราอยากรู้จำนวนของ document เราสามารถใช้ $group แล้ว count แทนได้ ซึ่งมีประสิทธิภาพที่ดีกว่าการอ่านจาก itcount()

Other capabilities

Aggregation pipeline นั้นนับเป็นทางที่เราควรเลือกใช้ก่อนเสมอเมื่อต้องทำ aggregation ใน MongoDB แต่ก็มีบางทางเลือกที่ดูดีกว่าและสั้นกว่าการใช้ aggregation pipeline เช่น .count() และ .distinct() ที่เป็นฟังก์ชั่นสำเร็จรูปที่ MongoDB เตรียมมาให้ใช้

db.users.count()
db.users.distinct('first_name')

map-reduce

ทำให้เราสามารถใช้ JavaScript ในการนิยาม process ทั้งหมดเองได้ มีความยืดหยุ่นค่อนข้างสูง เพราะเราเป็นคนเขียน logic ต่างๆ เองทั้งหมด แต่ก็แลกมากับการทำงานที่อาจช้าลง การอ่านที่ค่อนข้างเข้าใจยากเพราะต้องไล่โค้ด JavaScript เพื่อทำความเข้าใจ ไม่ใช่ฟังก์ชั่นสำเร็จรูปเหมือนที่ MongoDB เตรียมไว้ให้

aggregation operation โดยส่วนมากแล้ว MongoDB ได้เตรียม operations ต่างๆ ไว้ให้ค่อนข้างพร้อมใช้ในแทบทุกสถานการณ์ทั่วไป ดังนั้นก่อนคิดจะเขียน map-reduce ฟังก์ชั่นเองก็ควรมองหาจาก operator ต่างๆ ที่มีอยู่แล้วก่อน แต่อย่างไรก็ตาม map-reduce ก็อาจยังจำเป็นอยู่บ้างในกรณีที่ไม่มี operators ใดๆ สามารถทำงานได้อย่างที่เราต้องการ

Map-Reduce

สรุป

  • operator แบ่งออกเป็น 2 กลุ่ม สำหรับใช้กำหนดหน้าที่ให้ stage และใช้สำหรับ transform document
  • ควรใช้ $match และ $project ตั้งแต่ต้นๆ pipeline เพื่อลดจำนวนข้อมูลที่ต้อง process ใน pipeline ลง
  • เราสามารถสร้าง aggregation operator โดยใช้ JavaScript เองได้ผ่าน mapReduce แต่ก่อนใช้ควรดูก่อนว่า MongoDB นั้นสามารถทำได้อยู่แล้วหรือไม่

You may also like