Truy vấn dữ liệu từ MongoDB với Apache Spark

Sharing is caring!

Chào các bạn. Đã nói là dân lập trình, viết chương trình thì dù là Application độc lập hay là Web thì điều đầu tiên là phải nghĩ ngay đến cái framework hay cái platform mình sử dụng nó có làm việc được với các hệ Database hay không bao gồm cả có quan hệ hay không có quan hệ. Đây cũng chính là câu hỏi tui tự đặt ra khi tìm hiểu Apache Spark.

Như trong bài đầu tiên tui có nói là đối với Big Data thì tui chọn 2 cái Database đó là MongoDB và Elasticsearch vậy nên trong bài hôm nay, tui sẽ thử dùng Apache Spark để kết nối và truy vấn data từ MongoDB (còn Elasticsearch thì sẽ viết một bài riêng sau). May mắn thay, MongoDB có tạo một thư viện để cho Spark nó “nói chuyện” được với MongoDB tên là MongoDB Spark Connector. Kịch bản của hôm nay cũng đơn giản thôi: đó là save một mớ document từ file json vào MongoDB, sau đó dùng câu SQL để lấy dữ liệu lên lại, sau đó hiển thị các record ra.

1. Chuẩn bị:

– Tui giả sử là các bạn đã cài Apache Spark 1.6.3 và MongoDB (với MongoDB thì version mới nhất là 3.4, donwload tại đây. Về cách sử dụng thì các bạn xem docs của MongoDB tại đây)

– Tiếp theo là cái MongoDB Spark Connector, do tui sài Apache Spark 1.6.3 nên cái connector này tui sẽ dùng version 1.1.0 (các bạn xem docs tại đây).

Cái cuối cùng là data, cái này mới căng nha. Lúc tui tìm hiểu tui cũng tự hỏi như Captain America Overjoy :

Sau một lúc lục lọi thì tui tìm được cái data sample về employee ở đây Employee50K Employyee100K. Nhưng đây là data cho Elasticsearch. Sau đó tui chế biến lại cái Employee50K thành data để import vào MongoDB (Mongo_Employees50K).

Rồi, vậy là đầy đủ các thứ, chuyển qua code thôi.

2. Kết nối giữa Spark và MongoDB bằng “MongoDB Spark Connector”:

Cách tạo project cho Spark bằng Maven tui đã làm mẫu ở bài WordCount, các bạn có thể xem lại nha. Sau khi tạo project mới xong, các bạn thêm vào file pom.xml phần <dependencies></dependencies> như sau:

đó là các bạn phải sử dụng thêm 2 thư viện nữa đó là Spark SQL (dùng để truy vấn bằng SQL) và MongoDB Spark Connector (dùng để Connect với MongoDB).

2.1. Tạo object SparkCommon:

Đầu tiên, tui tạo một object tên là SparkCommon với mục đích gom mấy cái Config, Context vào một chổ để tiện quản lý.

Giải thích code:

Tạo một biến để lưu cái tên collection là employee

Đây là cú pháp uri để connect tới mongoDB, với uri trên thì các bạn sẽ connect vào MongoDB chạy ở máy localhost, port là 27017, sử dụng databse là hrdb, trong database này sử dụng collection là employee (phân cách giữa database với collection là dấu chấm nha các bạn).

Hai cái hàm setAppName()setMaster() của SparkConf() thì các bạn đã biết ở bài WordCount rồi. Để ý cái hàm set(), ở đây tui set 2 cái key là “spark.mongodb.input.uri”“spark.mongodb.output.uri” với cùng giá trị là mongUri. Ý nghĩa của việc này là báo cho Spark biết là tui sẽ ghi data và đọc data của MongoDB ở máy nào, database là gì, collection là gì, tương ứng như trên là máy localhost:27017, database là hrdb, collection là employee.

Còn nhiều key khác nữa, các bạn xem thêm trong docs của MongoDB Spark Connector

Trong Scala, khai báo biến bằng lazy val có nghĩa là biến đó sẽ được gán giá trị ở lần gọi đầu tiên, chứ không gán giá trị ngay lúc khai báo

Ở đây, tui tạo một biến SQLContext() với cái sparkCtx (cái SparkContext() thì các bạn cũng biết rồi), dùng để truy vấn, đọc hoặc ghi data xuống MongoDB ứng với cái SparkContext(), mà SparkContext() thì nhận cái SparkConf() như trên.

Thứ tự tạo biến: SparkConfig -> SparkContext -> SQLContext

2.2. Truy vấn data từ MongoDB

Tạo file SparkMongoEmployee.scala, cùng package với file SparkCommon.scala ở trên

Giải thích code:

Ở hàm main, tui dùng hàm isCollectionExisted() với parameter là tên collection để kiểm tra xem cái collection này có chưa hoặc có rồi mà chưa có record nào thì tui sẽ import data vô collection này.

Tui dùng class MongoConnector để lấy biến db ra, biến db này là MongoDatabase. Sau đó tui lấy collection tên là employee bằng hàm getCollection(), sau đó gọi hàm count() của collection để kiểm tra. Các bạn chú ý cái ReadConfig, ở đây tui lấy cái sparkCtx từ SparkCommon để truyên vô, báo cho nó biết là tui muốn lấy collection employee trong database là hrdb

Các bạn có thể dùng lệnh mongoimport.exe trong thư mục bin của MongoDB để import data từ file json. Ở đây tui muốn test khả năng ghi data từ Spark xuống MongoDB với số lượng record lớn để xem như thế nào thôi nha.

Để import data từ file json, tui tạo hàm importDataToMongoDB() với 2 parameters là colName: tên collection cần import data, fileName: đường dẫn đến file json để import data vào.

Dùng sqlCtx để đọc file json vào DataFrame bằng hàm json của class DataFrameReader (chính là biến member read của SQLContext).

Dùng hàm save() của MongoSpark lưu data xuống với parameter là df.write chính là DataFrameWriter. Ở đây, tui hơi cẩn thận là chỉ ra cái collection name nào cần import data luôn. Nếu các bạn bỏ cái option đi thì MongoSpark nó hiểu là lấy cái collection name mình đã chỉ ra từ đầu trong lúc tạo SparkConf (các bạn thử sau hen)

Tóm lại như sau:
Ghi data Dùng DataFrameWriter
Đọc data Dùng DataFrameReader

Import xong thì tui dùng SQLContext để truy vấn data thử xem coi nó có hay chưa.

Dùng hàm load của MongoSpark với parameter là SQLContext để tạo DataFrame

Để sửa dụng câu SQL, tui phải đặt tạm cái collection này một cái table name (giống như trong CSDL truyền thống như MySQL vậy) bằng hàm registerTempTable() với parameter là tên table (cái tên này các bạn không bắt buộc phải đặt trùng với tên collection của MongDB nha, ở đây tui đặt trùng luôn cho tiện).

Sau đó, viết câu truy vấn (cơ bản là không khác gì với câu truy vấn như của MySQL), dùng hàm sql() để chạy câu truy vấn này. Ở đây tui select tất cả các nhân viên có tuổi nhỏ hơn 40 và order tất cả record tăng dần theo tuổi.

Công việc còn lại là in tổng số record có được bằng hàm count() và in tất cả record ra màn hình console bằng hàm show() của DataFrame

Cuối cùng là stop cái SparkContext

Các bạn phải start MongoDB trước rồi hãy run file SparkMongoEmployee.scala

Run cái file SparkMongoEmployee (run như thế nào thì các bạn xem lại bài WordCount nha), kết quả nó ra như vầy:

Total records:

.Record list:

Do print record ở console cho nên Spark nó chỉ lấy 20 records đầu tiên thôi nha các bạn mặc dù tổng số là 48186.

3. Dùng Maven để build file jar với các dependencies:

Ở bài WordCount, tui có dùng Artifacts để build file jar, nhưng lúc đó tui không có nhiều thư viện bên ngoài import vào cho nên khi build bằng artifacts, chỉ tạo có một file jar thôi. Nhưng trong bài này, tui có dùng thêm một số thư viện đi kèm (Maven gọi là các dependency), để khi tui submit file jar chính của tui lên Spark thì nó cũng hiểu được các dependency đi kèm đang được tham chiếu ở đâu, nếu không nó sẽ “la làng” lên là “Class Not Found”.

Khi các bạn run một file Scala trong IDE mà file này có import một số thư viện bên ngoài vào thì IDE nó giúp các bạn set classpath các file thư viện khi run rồi. Trên thực tế, khi các bạn run bên ngoài console thì các bạn phải tự set classpath cho các file thư viện này, thì Maven sẽ giúp các bạn làm việc đó, nó sẽ tự giúp các bạn copy các dependency từ repo ra thư mục mà bạn chỉ định, sau đó nó sẽ tự set classpath cho các thư viện này luôn.

Các bạn mở file pom.xml, mục <plugins></plugins> thêm các maven plugin sau vào:

Cái plugin “maven-dependency-plugin”, các bạn xem cái tag

${basedir} tạm gọi là cái biến của Maven, nó sẽ tham chiếu đến thư mục gốc của project thì ${basedir}/dist/lib là thư mục sẽ chứa tất cả các file dependency nào mà ta đang sử dung, Maven nó copy hết vào đây.

Cái plugin “maven-jar-plugin”, các bạn xem các cái tag:

outputDirectory: ${basedir}/dist chính là thư mục chứa cái file jar chính của các bạn. Cái tên file jar thì nó nằm ở đây:

Lúc các bạn tạo project bằng Maven thì nó có cho các bạn nhập những thông tin trên.

addClassPath: nếu là true thì Maven nó sẽ thêm classpath của các file jar dependency vào file MANIFEST.MF

classPathPrefix: các file dependency các bạn config nó được copy ở đâu trong cái mục “maven-dependency-plugin” thì ở đây các bạn phải ghi y như vậy

mainClass: tên class nào chứa hàm main và package của nó.

Để build jar bằng Maven trong Intellij IDEA, các bạn chọn menu View -> Tool Windows -> Maven Projects. Các bạn sẽ thấy cửa sổ Maven Project nó hiện ra ở bên phải:

Các bạn click package trong mục Lifecycle để tạo file jar. Sau khi tạo xong, các bạn sẽ thấy thư mục dist nó được tạo ở bên của sổ Project, trong thư mục dist sẽ là file jar tên là spark-mongo-1.0.jar:

Các file jar trong thư mục dist/lib:

Sau khi có file jar, bước cuối cùng là submit lên Spark (nhớ là MongoDB cũng đang chạy nha các bạn), mở Command Prompt lên và chạy lệnh sau:

Kết quả ra sao? các bạn tự đoán nha.  Happy-Grin

Sharing is caring!

Viết ứng dụng Word Count trên Spark bằng Scala, sử dụng Intellij IDEA Community
Cài đặt Apache Spark cluster trên hệ điều hành Linux/Ubuntu

Vincent Le

Tui là Lê Minh Đạt, tên tiếng anh là Vincent(do thích nhân vật Vincent Valentine, ai từng là fan của trò Final Fantasy VII thì sẽ biết nhân vật này, hehe). Đang tập tành làm một blogger viết về mảng lập trình, mong muốn được chia sẻ những gì đã học được, tích góp được trong 10 năm đi làm thợ code.

shares