Xây dựng một Realtime Dashboard sử dụng Spark Streaming, Kafka, Nodejs và MongoDB

Sharing is caring!

Hello! Sau mấy ngày cuối tuần vật vả với coding, không đi đâu cả (lý do là có bão thoai!) thì tui cũng đã thử nghiệm xong với cái việc đã đề ra ở bài trước đó là dùng Spark Streaming để lấy dữ liệu từ Kafka. Không chỉ dừng lại ở đó, tui cũng đã làm luôn cái Real-time Dashboard để hiển thị cái dữ liệu mà Spark đã lấy về và xử lý.

Kịch bản của tui cho bài tutorial này là như sau:

Giả sử tui có một website bán xe đạp vô cùng lớn đi ha (ví dụ là http://xedapnhat.vn Happy-Grin ), bán các nhãn hiệu xe đạp bao gồm: Trek, Giant, Jett, Cannondale, Surly. Do bán số lượng lớn mà, cho nên tui muốn xem coi là cứ 30s thì có bao nhiêu chiếc xe của từng nhãn hiệu này được bán và nó sẽ hiển thị thông tin đã được xử lý lên cái Dashboard.

Để giả lặp cái lượng xe đạp được mua online, tui sẽ viết một ứng dụng Producer để nó tự tạo cái dữ liệu này và push vào Kafka (chỉnh sửa lại cái WordListGenerator Producer ở bài trước là được).

Giải pháp xây dựng Real-time Dashboard như sau:

1. Tạo ứng dụng Bicycle Data Producer:

Cái này cũng không khó, các bạn chỉ cần lấy cái WordListGenerator ở bài trước ra, xong sửa lại như sau:

nội dung file pom.xml dùng Maven để build file jar:

Thằng này nó sẽ loop vô tận nếu như ta nhập tham số events là 0, tại mỗi lần lặp nó sẽ random số message sẽ send đi bằng vòng loop for, số lượng message này nằm trong khoản [rndStart, rndEnd], default là [0, 500]. GIữa mỗi lần lặp của vòng while, sẽ delay  30s

Message gửi đi có dạng: yyyy-mm-dd hh:mm:ss, bike_name

bike_name sẽ được random lấy ra từ biến bicyclesSrc:

2. Tạo Spark Streaming để consume data từ Kafka:

Sơ lược qua Spark Streaming một chút. Về cơ bản, Spark Streaming nó sẽ nhận dữ liệu từ các nguồn streaming data, cụ thể ở đây là Kafka tạm gọi là receiver data. Sau đó, nó chia nhỏ cái receiver data và chứa trong các batch data, mỗi batch được xem như là một tập các RDD. Một dãy các RDD thì Spark gọi đó là DStream.

Ở đây, tui sẽ tạo một StreamingContext với SparkConf, sau đó dựa vào StreamingContext này để tạo một DirectStream connect với Kafka, nhưng trước hết phải tạo SparkCommon Object cái đã:

SparkCommon này lưu cái SparkConf với các thông số về Spark cũng như thông số để connect với MongoDB. Các bạn chú ý cái “local[2]”. Bình thường thì tui hay để là local (có nghĩa là local[1]) hoặc local[*], do ta dùng streaming, và Spark khuyến cáo là nên dùng từ 2 thread trở lên để đảm bảo luôn có một thread làm nhiệm vụ nhận message từ Kafka.

Tiếp theo là tạo BicycleStreamingConsumer:

Các bạn chú ý cái đoạn code sau khi đã transform DStream bằng các hàm map, reduceByKey, các bạn phải gọi ít nhất một trong các hàm sau thuộc đối tượng DStream (nếu không nó sẽ báo lỗi):

  • print()
  • saveAsObjectFiles(prefix, [suffix])
  • saveAsTextFiles(prefix, [suffix])
  • saveAsHadoopFiles(prefix, [suffix])
  • foreachRDD(func)

Tham khảo thêm tại đây: http://spark.apache.org/docs/1.6.3/streaming-programming-guide.html#output-operations-on-dstreams

Chú ý thêm là ở chổ này các bạn gọi hàm print(), println() của Scala đều không có thấy gì ngoài console đâu nha.

Duyệt qua từng RDD của DStream để chuyển nó thành DataFrame bằng hàm toDF() sau đó mới insert DataFrame này xuống MongoDB. Khi insert thì MongoSpark nó sẽ tự tạo schema cho collection dựa vào cái class BikeAggreration.

Để dùng được các hàm chuyển từ RDD sang DataFrame hay DataSet thì các bạn phải tạo một SQLContext và import cái này:

Khai báo tên biến SQLContext là gì thì import với tên biến rồi theo sau là .implicits._

Không thể không nhắc đến file pom.xml:

3. Cấu hình MongoDB để sử dụng OpLog:

Khi Spark Streaming ở phần trên insert data xuống MongoDB thì thao tác insert này sẽ được lưu lại trong OpLog nếu MongoDB chạy ở dạng Replication khi Nodejs sẽ watching cái OpLog này để phát hiện khi nào thì sự kiện insert data xảy ra.

Cấu hình Replication:

Tạo 2 thư mục tên là rs1 và rs2

Mở lần lượt 2 Command Prompt lên và chạy 2 lệnh này ở thư mục <MONGO_DIR>\bin:

Tiếp theo, mở một Command Prompt khác lên, chạy lệnh mongo ở thư mục <MONGO_DIR>\bin:

Ngay cái dấu nhắc của mongo shell, chạy lệnh:

Sau đó, các bạn thoát ra bằng lệnh exit, sau đó chạy lại lệnh:

Nếu các bạn thấy ngay cái dấu nhắc của mongo shell là rs:PRIMARY> thì thôi, nếu là rs:SECONDARY> thì các bạn thoát shell và chạy lệnh mongo lại với tham số localhost:27018, phải làm vậy để connect vào đúng cái PRIMARY thôi.

Giờ là các bạn đang đứng ở PRIMARY shell nha. Chạy các lệnh sau tại dấu nhắc của mongo shell:

Cái này dùng để set cái thằng chạy ở port 27017 là PRIMARY, thằng kia là SECONDARY dựa vào cái tham số priority của nó. Xong, nếu lần sau có chạy dạng REplication nữa thì thằng chạy ở port 27017 sẽ là PRIMARY.

4. Tạo Nodejs Application dùng socket.io để làm Real-time Dashboard:

Các bạn download Nodejs ở đây: https://nodejs.org/en/download, chọn windows version, sau đó install theo hướng dẫn của nó, cứ để default rồi click Next.

Tiếp theo, các bạn tạo một thư mục tên là realtime-dashboard ở đâu cũng được. Trong bài này thì tui tạo ở đây: E:\projects\nodejs\realtime-dashboard.

Chuyển vào thư mục này:

Tạo file package.json:

nhập nội dung như file sau:

Đứng tại thư mục realtime-dashboard Install lần lượt các package sau:

Tạo thư mục web trong thư mục realtime-dashboard, sau đó tạo file index.html trong thư mục web với nội dung sau:

Mở file server-mongo.js lên, đổi nội dung như sau:

5. Run hệ thống:

Tui giả sử các bạn đã build BicycleDataProducerBicycleStreamingConsumer ra file jar bằng Maven rồi nha.

5.1. Start MongoDB:

Mở 3 Command Prompt, chuyển vào thư mục <MONGO_DIR>\bin, lần lượt chạy 3 lệnh sau:

5.2. Start ZooKeeper và Kafka:

Mở 2 Command Prompt khác lên, chuyển vào thư mục %KAFKA_HOME%\bin\windows, chạy 2 lệnh sau:

5.3. Tạo topic:

Mở 1 Command Prompt khác lên, chuyển vào thư mục %KAFKA_HOME%\bin\windows, chạy lệnh sau:

Tạo bike-data topic:

5.4. Chạy Nodejs Server:

Mở 1 Command Prompt lên, chuyển vào thư mục E:\projects\nodejs\realtime-dashboard, chạy lênh sau:

Đợi nodejs start xong thì mở web browser lên, vào http://localhost:8080 sẽ thấy trang Dashboard

5.5. Chạy BicycleStreamingConsumer:

Mở 1 Command Prompt lên, chạy lệnh sau (không cần chuyển vào thư mục dist của project này):

5.6. Chạy BicycleDataProducer:

Mở 1 Command Prompt lên,chuyển vào thư mục <BicycleDataProducer_PROJECT_DIR>\dist, chạy lệnh:

5.6. Xem video demo:

Cứ mỗi lần thằng BicycleStreamingConsumer nó insert data xuống MongoDB (kiểm tra bên mongoDB thì thấy số record đã thay đổi) là lặp tức Nodejs nó detect được và lấy những cái record mới nhất in console, đồng thời socket.io cũng emit data cho web browser làm cho biểu đồ nó thay đổi.

6. Trường hợp không dùng Nodejs watching MongoDB:

Với trường hợp này, ta không dùng package mongo-oplog-watch của Nodejs để kiểm tra khi nào thì có data insert vào MongoDB, ta có thể push cái data mà BicycleStreamingConsumer đã xử lý ngược lại vào Kafka với một topic khác, rồi bên Nodejs, dùng Kafka Consumer để hứng dự liệu này sau đó emit nó ra browser bằng socket.io

Các bạn xem cái workflow như sau:

 

Để làm theo workflow này, các bạn chỉ tay đổi code cho BicycleStreamingConsumer và Nodejs

6.1. Thêm một thread nữa để dùng cho việc send message:

Các bạn sửa lại local[2] thành local[3];

 

6.2. Thêm code cho BicycleStreamingConsumer:

chỉ thêm cái hàm pushBikeInfoInKafka() để xử lý việc push data vào lại Kafka

Thêm biến inTopic:

Bỏ 2 dòng:

Thay dòng:

Bằng dòng:

6.3. Tạo file server-kafka.js

Download thêm package kafka-node, đứng tại thư mục E:\projects\nodejs\realtime-dashboard, chạy lệnh:

Tạo file server-kafka.js:

6.4. Chạy lại Nodejs và BicycleStreamingConsumer:

Trước khi chạy, các bạn tạo thêm một topic nữa là bike-data-visualization, giả sử ZooKeeper và Kafka vẫn còn đang chạy:

với Nodejs:

với BicycleStreamingConsumer:

6.5. Xem video demo:

Các bạn sẽ thấy mổi khi thằng BicycleStreamingConsumer nó nhận data từ Kafka là nó xử lý và đẩy ngược lại vô Kafka sau đó thì thằng Nodejs nó nhận được liền ngay lập tức.

Tới đây là mệt lắm roài, trên đây là những việc nhỏ làm demo thôi. Nếu dựng hệ thống thực tế thì còn phải tối ưu code rất nhiều do phải xử lý dự liệu cực lớn, nhưng qua bài này tui cũng hy vọng các bạn sẽ giống tui, cũng sẽ lượm lặt được ít nhiều kiến thức cơ bản để sau này có thể triển khai thực tế. Thanks!

Sharing is caring!

Tạo một Kafka Producer để giả lập nguồn dữ liệu đầu vào bằng ngôn ngữ Scala

Vincent Le

Tui là Lê Minh Đạt, tên tiếng anh là Vincent(do thích nhân vật Vincent Valentine, ai từng là fan của trò Final Fantasy VII thì sẽ biết nhân vật này, hehe). Đang tập tành làm một blogger viết về mảng lập trình, mong muốn được chia sẻ những gì đã học được, tích góp được trong 10 năm đi làm thợ code.

shares