Tạo một Kafka Producer để giả lập nguồn dữ liệu đầu vào bằng ngôn ngữ Scala

Sharing is caring!

Hi all, như đã nói trong phần cuối của bài cài đặt Kafka, hôm nay tui sẽ thử viết một cái custom Producer để tạo nguồn dữ liệu đầu vào cho Kafka Single Node. Trong bài trước, tui đã thử chạy cái producer mặc định của Kafka đó là mỗi lần nhập một đoạn text từ bàn phím thì bên Consumer sẽ nhận được cái đoạn text đó ngay tức thì. Vậy, cái event xảy ra đó là nhập liệu từ bàn phím và data phát sinh ra từ cái event này chính là nội dung text mà tui đã nhập. Do đó, tạo một custom Producer chẳng qua là mình sẽ làm cái việc là cho cái custom Producer này collect dữ liệu từ các event khác nhau rồi xào nấu gì đó với cái data này sau đó send nó đi.

Do tui vẫn muốn sử dụng lại cái chương trình WordCount lúc trước khi tìm hiểu Spark, thay vì WordCount nó sẽ đọc text từ file rồi thống kê từ thì bây giờ tui muốn là chương trình WordCount nó sẽ hứng dữ liệu từ Kafka Streaming, có nghĩa là chương trình WordCount của Spark sẽ làm Consumer. Vậy nên, cái custom Producer này nó sẽ đọc data Words ở đâu đó, cụ thể là từ một file chứa 500 từ, mỗi lần nó chỉ lấy ramdom 10 từ, sau đó send 10 từ này đi, WordCount sẽ hứng lấy 10 từ này và thông kê. Trong phạm vi bài hôm nay, tui sẽ tạo cái Producer này, bài kế tiếp tui sẽ chỉnh sửa cái chương trình WordCount lại thành Consumer của Kafka. Các bạn xem workflow như sau:

Tuy là làm demo thì 3 cái block này nằm trên một máy localhost, nhưng trên thực tế nếu có điều kiện thì 3 block này có thể nằm trên 3 hệ thống khác nhau.

1. Tạo chương trình WordListGenerator làm Producer:

Để viết WordListGenerator, tui dùng Scala với Intellij IDEA để viết nha các bạn. Đầu tiên là các bạn tạo một project Maven rỗng:

Tại màn hình Welcome, click “Create New Project”.

Ở cửa sổ New Project, chọn Maven (bỏ chọn Create from archetype) rồi click Next.

GroupId: Các bạn nhập gì cũng được, không cần giống tui

ArtifactId: word-list-generator

Sau đó click Next.

Nhập tên Project là WordListGeneratorProducer, sau đó chọn đường dẫn để lưu project. Click Finish

Sau khi Intellij IDEA nó load Project mới xong,các bạn expand cái thư mục src ra, sau đó lần lược right click vào thư mục maintest, chọn New -> Directory, đặt tên là scala

Sau đó, chọn cái thư mục main/scala này, right click, chọn “Mark Directory as” -> Source Root

Làm tương tự cho thư mục test/scala, right click, chọn “Mark Directory as” -> Test Source Root

Các bạn mở cái file pom.xml ra, đổi nội dung như sau:

Sau đó click “Import Changes” ở góc phải dưới của Intellij IDEA để nó cập nhật các dependencies

Tiếp theo, right click vào thư mục src/main/scala chọn New ->Package để tạo một package là com.vincentle.kafka.training. Sau đó right click vào package này, chọn New -> Scala Class, chọn mục object và nhập tên object tên là WordListGeneratorProducer.

Đầu tiên là lấy các parameter sẽ truyền vào khi chạy chương trình này theo thứ tự từ 0 -> 4:

Hàm getOrElse() sẽ trả về giá trị default nếu như chúng ta không truyền paramater nào cả. Tiếp theo là tạo đối tượng Producer của Kafka.

biến clientId dùng để xác định coi là data khi truyền qua cho Kafka là của ai để không bị trùng lặp. Ở đây tui chỉ thay đổi 4 cái biến sẽ quy định cái kiểu data của mình khi truyền là:

bootstrap.servers: cái này cho biết là ta sẽ connect với Kafka/Broker ở node nào, mặc định tui để là “localhost:9092”

client.id: giá trị là biến clientId

key.serializervalue.serializer: cái này dùng để quy định kiểu dữ liệu của cặp key/value khi gửi đi là gì. Ở đây, tui để đều là String hết bằng: org.apache.kafka.common.serialization.StringSerializer

Tạo producer và truyền cái biến props này vào. Các bạn chú ý là chổ cái Constructor của class KafkaProducer:

Tiếp theo là load word list từ file:

Lúc này biến wordList sẽ là một Seq gồm 500 từ (mỗi từ một dòng). Các bạn có thể tạo list từ tại đây: https://randomwordgenerator.com/

Sau khi có nguồn list từ, tui sẽ loop nó với số lượng events nhận từ parameter và random 10 từ sẽ send nó đi:

Ở đây tui tạo một hàm là getRandomWordList() để lấy random 10 từ cho mỗi event:

Sau đó tui tạo biến keyvalue đều là kiểu String hết. Hàm mkString() sẽ tạo cái chuổi các từ được phân cách bằng dấu “,”. Cứ mỗi một lần gửi đi là tui sẽ tạo một ProducerRecord với cái biến topic nhận từ parameter, biến key và  biến value, sau đó dùng hàm send() của KafkaProducer để gửi nó đi.

Chú ý là cái KafkaProducer constructor các bạn để kiểu giá trị của Key/Value là gì thì trong constructor của ProducerRecord cái Key/Value của phải để kiểu giá trị y như vậy.

Để ngưng một khoản thời gian là interval time giữa các lần send, tui dùng Thread.sleep() để delay, nếu không khi chạy, nó chạy một cái ào luôn ấy.

Sau khi chạy xong, đóng kết nối:

Tổng hợp lại thì code đầy đủ như sau:

Dùng maven để build ra file jar. các bạn chọn menu View -> Tool Windows -> Maven Projects, ở của sổ này, các bạn chọn Lifecycle, double click package để build file jar.

2. Test chương trình WordListGeneratorProducer:

Đến đây, xem như các bạn đã xem bài cài đặt Kafka rồi nha. Bắt đầu khởi động Kafka Single Node với Single Broker. Các lệnh sau được chạy ở thư mục %KAFKA_HOME%\bin\windows:

2.1. Khởi động ZooKeeper:

2.2. Khởi động Kafka:

2.3. Chạy chương trình Consumer mặc định của Kafka, sử dụng topic là welcome-messasge:

2.4. Chạy chương trình WordListGeneratorProducer của tui:

Sau khi build file jar thành công, file word-list-generator-1.0.jar của tui sẽ nằm ở thư mục: <PROJECT_DIR>\dist

Dùng lệnh java với tham số -cp để chạy file jar này:

Các tham số lần lượt như sau:

brokers: localhost:9092

topic: welcome-message

events: 20

intervalEvent: 5 seconds

filePath: E:\temp\data\word-list.txt

Kết quả các bạn sẽ thấy như sau:

Nó lặp 20 lần, cứ mỗi 5 giây thì nó send data một lần. Ứng dụng thực tế thì các bạn có thể biến cái chương trình này thành một cái Application WebServer (nhúng Jetty Server vào chẳng hạn ) để nó nhận data từ broswer sau đó dùng producer để đẩy data đi. Bài sau tui sẽ trình bày dùng Spark Streaming để hứng data. Xin cảm ơn!

Sharing is caring!

Tìm hiểu và cài đặt Apache Kafka Single Node/Single Broker trên Windows 10
Xây dựng một Realtime Dashboard sử dụng Spark Streaming, Kafka, Nodejs và MongoDB

Vincent Le

Tui là Lê Minh Đạt, tên tiếng anh là Vincent(do thích nhân vật Vincent Valentine, ai từng là fan của trò Final Fantasy VII thì sẽ biết nhân vật này, hehe). Đang tập tành làm một blogger viết về mảng lập trình, mong muốn được chia sẻ những gì đã học được, tích góp được trong 10 năm đi làm thợ code.

shares