Phân tích dữ liệu lớn với Apache Pig trên AWS Elastic MapReduce

Bạn gặp khó khăn khi cấu hình Hadoop Cluster dưới on-premise để xử lý dữ liệu lớn?

Hãy đọc bài viết này để tìm hiểu cách có thể giúp bạn tối ưu hóa gấp 10x thời gian đó.

1. Giới thiệu về Apache Pig:

Pig là một dự án nghiên cứu được bắt đầu bởi Yahoo và năm 2006. Hai năm sau, vào năm 2008 Apache Pig được phát hành phiên bản đầu tiên.

Apache Pig là một nền tảng ngôn ngữ cấp cao. Để phân tích và truy vấn dữ liệu lớn được lưu trữ trong HDFS của một Hadoop Cluster.

Ngoài ra, Pig còn cung cấp một ngôn ngữ cấp cao để viết các chương trình phân tích dữ liệu gọi là Pig Latin, khá giống với SQL. Pig latin cho phép Data Analyst viết các câu truy vấn và phân tích. Thậm chí có thể sử dụng Pig latin để tạo ra ETL pipeline. 

Trong bài viết này, tôi sẽ tập trung vào việc xây dựng hệ thống Hadoop Cluster cài đặt Pig. Và sử dụng vài task Pig latin căn bản nhằm cung cấp một cái nhìn tổng quan về việc phân tích dữ liệu với Pig.

2. Giới thiệu AWS Elastic Mapreduce:

Việc ứng dụng Hadoop Cluster bên dưới on – premises có nhiều hạn chế, nên tôi đã triển khai phương án trên điện toán đám mây. Để bổ sung những khuyết điểm của Hadoop Cluster bên dưới on – premises. Việc triển khai Hadoop Cluster để sử dụng phân tích bằng Pig trên AWS hỗ trợ doanh nghiệp dễ dàng mở rộng, chịu lỗi cao và đáp ứng tính sẵn sàng.

AWS Elastic MapReduce là nền tảng dữ liệu lớn trên nền tảng đám mây hàng đầu. Để xử lý lượng lớn dữ liệu bằng các công cụ nguồn mở như Apache Spark, Apache Hive, Apache HBase, Apache Flink, Apache HudiPresto. Amazon EMR giúp chúng ta dễ dàng thiết lập, vận hành và mở rộng môi trường dữ liệu lớn bằng cách tự động hóa các tác vụ tốn thời gian như cung cấp dung lượng và tinh chỉnh các cluster. Với EMR, bạn có thể chạy phân tích ở cấp độ petabyte với chi phí ít hơn một nửa so với các giải pháp tại chỗ truyền thống và nhanh hơn gấp 3 lần so với Apache Spark tiêu chuẩn. Chúng ta có thể chạy khối lượng công việc trên các phiên bản Amazon EC2, trên các cụm Amazon Elastic Kubernetes Service (EKS) hoặc tại chỗ bằng cách sử dụng EMR trên AWS Outpost

3. Xây dựng hệ thống Hadoop Cluster

Chuẩn bị dữ liệu:

Dữ liệu về các loại bia được lấy từ nguồn Kaggle: https://www.kaggle.com/gauravharamkar/beer-data-analytics và upload lên AWS S3 Bucket. Việc sử dụng phân tích bia đang nổi lên và đương nhiên, các nhà sản xuất bia có lượng dữ liệu lớn nhất sẽ có lợi thế hơn. Các nhà máy bia đã và đang sử dụng dữ liệu để thúc đẩy các chiến lược tiếp thị của họ. Sử dụng các khía cạnh khác doanh số bán.

Bộ dữ liệu này chứa thông tin về các loại bia khác nhau và các khía cạnh khác nhau của nó. Như phong cách bia, khối lượng bia tuyệt đối, tên bia, tên nhà sản xuất bia. Dữ liệu bia giúp các nhà máy bia hiểu được hành vi của người tiêu dùng và có thể cải thiện do hình thức bia, hương vị bia, mùi thơm của nó, xếp hạng tổng thể, đánh giá, vv.

Kiến trúc hệ thống

Kiến trúc hệ thống

Hệ thống Hadoop Cluster được tạo ra bằng AWS Elastic MapReduce tự động tạo ra các instances AWS EC2. Phân quyền tự động cho các EC2 thành Master node và Slave nodes. Hadoop Cluster sẽ được cấu hình tự động Pig 0.17.0 và Hadoop 2.10.1.

Hadoop Cluster này sẽ được bảo vệ bằng cách đặt trong lớp Public Subnet. (Cách đặt tên ngụ ý cho phép dữ liệu được phép ra vào lớp mạng ảo) nằm trong lớp mạng ảo VPC. Và việc tạo lớp mạng ảo VPC nhằm mục đích tách biệt các Hadoop Cluster khác nhau (trong trường hợp có nhiều Hadoop Cluster). 

Hadoop Cluster này sẽ được đặt tách ra từng Security Group riêng biệt cho Master và Slave. Nhằm mục đích giới hạn các Port và IPs được đi vào hệ thống trong Public Subnet.

Các dữ liệu lưu trữ của Hadoop Cluster, cụ thể là các file log, dữ liệu sử dụng và kết quả sẽ được lưu trữ tại S3 Bucket, là một features của dịch vụ lưu trữ dữ liệu dạng object, Simple Storage Service – AWS S3.

Phân vùng của máy chủ đang lưu trữ Hadoop Cluster là Asia Pacific (Singapore)ap-southeast-1

Cài đặt Hadoop Cluster

1 Thiết lập VPC

Tiến hành thiết lập VPC trong AWS VPC

big data apache pig hadoop thiet lap VPC

Bước 1 Thiết lập chọn Launch VPC Wizard

Chon chu de co san cua VPC. VPC voi 1 Public Subnet 3

Bước 2. Chọn chủ đề có sẵn của VPC. VPC với 1 Public Subnet, bấm chọn Select

VPC va cau hinh cac dia chi IPs 4

Bước 3 Đặt tên VPC và cấu hình các địa chỉ IPs (nếu cần). Chọn Create VPC để tạo

 

Da tao thanh cong VPC 1

Bước 4 Đã tạo thành công VPC.

 

Xac nhan lai Public Subnet da duoc tao

Bước 5 Xác nhận lại Public Subnet đã được tạo.

2 Thiết lập Key Pair cho Instance EC2 – Master.

Buoc 6 Tao Key Pair trong AWS EC2

Bước 6 Tạo Key Pair trong AWS EC2.

Buoc 7 Dat ten chon dinh dang file ppk de co de dang SSH vao EC2 Master. Chon tao Key Pair

Bước 7 Đặt tên, chọn định dạng file ppk để có dễ dàng SSH vào EC2 Master. Chọn tạo Key Pair

Buoc 8 Xac nhan file Key Pair

Bước 8 Xác nhận file Key Pair

3 Thiết lập Hadoop Cluster trong AWS EMR

Buoc 9 Chon tao Cluster

Bước 9 Chọn tạo Cluster

Buoc 10 Chon Go to Advanced options

Bước 10 Chọn Go to Advanced options

Buoc 11 Thiet lap chon phien ban emr 5.33.0

Bước 11 Thiết lập chọn phiên bản emr-5.33.0 sẽ ổn định hơn bản 6.xx.xx, chọn các phần mềm cần thiết. Chọn tiếp theo

Ở mục Multiple Master nodes, ta có thể tùy chỉnh để AWS tự động tạo ra một master node mới khi master nodes EC2 gặp vấn đề

Ở mục Step, ta có thể submit một tệp pig, hoặc hive để tự động cho cho Hadoop Cluster thực hiện 1 tác vụ đã được định nghĩa trước, ở bước này rất phù hợp cho nhu cầu sử dụng Hadoop Cluster chạy ETL Pipeline.

Buoc 12. Chon network la VPC da tao va cac cau hinh cua tung Master va Slave.

Bước 12. Chọn network là VPC đã tạo và các cấu hình của từng Master và Slave.

Tùy chỉnh cấu hình, lớp mạng của Hadoop Cluster tùy thuộc theo nhu cầu của doanh nghiệp/người dùng.

Buoc 13 Dat ten va tro den dia chi file S3 bucket da duoc tao

Bước 13 Đặt tên và trỏ đến địa chỉ file S3 bucket đã được tạo (có thể để default)

Buoc 14 Chon Key Pair da tao va AWS EMR se tu dong tao cac security group cho Master va Slave

Bước 14 Chọn Key Pair đã tạo và AWS EMR sẽ tự động tạo các security group cho Master và Slave

4. Thiết lập cấu hình Security Group để có thể SSH vào từ địa chỉ IP cá nhân.

Buoc 15 Chon ElasticMapReduce master

Bước 15 Chọn ElasticMapReduce-master

 

Buoc 16 Edit Inbound Rules

Bước 16 Edit Inbound Rules

Buoc 17 Them Type SSH va chon source My IP.

Bước 17 Thêm Type SSH và chọn source My IP.

5. SSH vào Hadoop Cluster: EC2 Master

Sử dụng phần mềm PuTTY để SSH vào EC2 Master

Buoc 18 Dien Hostname vao bang dns cua EC2 Master

Bước 18 Điền Hostname vào bằng dns của EC2 Master

Buoc 19 Chon Key Pair de co the truy cap vao EC2 Master

Bước 19 Chọn Key Pair để có thể truy cập vào EC2 Master (có thể tải xuống file key pair trong EC2)

Buoc 20 Ket noi thanh cong.

Bước 20 Kết nối thành công.

21

Bước 21. Sử dụng các lệnh để tạo file pig latin.

Để tạo file: touch [File name].pig

Để truy cập vào file: vim [File name].pig

Để chạy pig: pig [File name].pig

4. Phân tích dữ liệu với Apache Pig

1.Phân tích từng loại beer được người dùng đánh giá tổng quát cao nhất và nồng độ cồn người dùng thích

Task1 – Code

–lấy dữ liệu gốc từ file csv

raw_data = LOAD ‘s3://demohungemr/BeerProject.csv’ using PigStorage(‘,’);

–tạo 1 bảng có cấu trúc 9 cột

structured_data = FOREACH raw_data  GENERATE  (float) $0 as beer_ABV, (int) $1 as beer_beerID, (int) $2 as beer_brewerID, $3 as beer_name, $4 as beer_style, (float) $5 as review_appearance, (float) $6 as review_palette, (float) $7 as review_overall, (float) $8 as review_taste, $9 as review_profileName;

–lọc dữ liệu trong bảng

filtered_data = FILTER structured_data BY review_overall is not null;

filtered_data = FILTER structured_data BY beer_ABV is not null;

filtered_data = FILTER structured_data BY beer_style is not null;

–chọn 100000 dòng đầu tiên trong bảng

filtered_data_limit = LIMIT filtered_data 100000;

–duyệt các thành phần trong filtered_data_limit sau đó sinh ra một bảng mới chứa các cột beer_ABV, beer_style, review_overall

A1 = FOREACH filtered_data GENERATE beer_ABV as beer_ABV, beer_style as beer_style, review_overall as review_overall;

–nhóm lại bảng theo beer_style

B1 = GROUP A1 by beer_style;

— đối với mỗi nhóm (beer_style), tìm dòng có tổng thể được đánh giá cao nhất, sắp xếp theo thứ tự giảm dần

C1 = FOREACH B1 {

         I1 = ORDER A1 BY review_overall DESC;

     — only take the row with max review_overall

         J1 = LIMIT I1 5;

         GENERATE group, J1.( beer_ABV, beer_style, review_overall);

};

— lưu trữ kết quả vào một folder

STORE C1 into ‘s3://demohungemr/task1_result.csv’ using PigStorage(‘,’);

File Pig latin: task1.pig

File Pig latin task1 pig

Kết quả: 

Ket qua File Pig latin task1 pig

2 Phân tích dữ liệu cho ra để vẽ histogram của các beerID theo được đánh giá từ 0-5

Task2 – Code

–lấy dữ liệu gốc từ file csv

raw_data = LOAD ‘s3://demohungemr/BeerProject.csv’ using PigStorage(‘,’);

–tạo 1 bảng có cấu trúc 9 cột

structured_data = FOREACH raw_data  GENERATE  (float) $0 as beer_ABV, (int) $1 as beer_beerID, (int) $2 as beer_brewerID, $3 as beer_name, $4 as beer_style, (float) $5 as review_appearance, (float) $6 as review_palette, (float) $7 as review_overall, (float) $8 as review_taste, $9 as review_profileName;

–lọc dữ liệu trong bảng

filtered_data = FILTER structured_data BY review_overall is not null;

filtered_data = FILTER structured_data BY beer_beerID is not null;

–chọn 100000 dòng dữ liệu đầu tiên trong bảng

filtered_data_limit = LIMIT filtered_data 100000;

A2 = FOREACH filtered_data_limit GENERATE (int) review_overall as review_overall_bin, beer_beerID as beer_beerID;

 

— show 30 dòng của dữ liệu

A2_limit = LIMIT A2 30;

— nhóm lại bảng theo review_overall_bin

B2 = GROUP A2 by review_overall_bin;

— chỉ lấy 5 giá trị trong review_overall

B2_limit = LIMIT B2 5;

— đếm số lượng của beer_beerID và chỉ lấy các beer_beerID có giá trị khác nhau

C2 = foreach B2_limit {

unique_IDs = DISTINCT A2.beer_beerID;

generate group, COUNT(unique_IDs) as number_beer;

};

— lưu trữ kết quả vào một folder

STORE C2 into ‘s3://demohungemr/task2_result.csv’ using PigStorage(‘,’);

File pig latin: task2.pig

Ket qua File Pig latin task2 pig

Kết quả:

Kqua File Pig latin task2 pig

 

3 Đếm từng loại beer style có bao nhiêu beer khác nhau

Task3 – Code

–lấy dữ liệu gốc từ file csv

raw_data = LOAD ‘s3://demohungemr/BeerProject.csv’ using PigStorage(‘,’);

–tạo 1 bảng có cấu trúc 9 cột

structured_data = FOREACH raw_data  GENERATE  (float) $0 as beer_ABV, (int) $1 as beer_beerID, (int) $2 as beer_brewerID, $3 as beer_name, $4 as beer_style, (float) $5 as review_appearance, (float) $6 as review_palette, (float) $7 as review_overall, (float) $8 as review_taste, $9 as review_profileName;

–lọc dữ liệu trong bảng

filtered_data = FILTER structured_data BY review_overall is not null;

filtered_data = FILTER structured_data BY beer_beerID is not null;

–chọn 100000 dòng đầu tiên trong bảng

filtered_data_limit = LIMIT filtered_data 100000;

 

— duyệt các thành phần trong filtered_data_limit sau đó sinh ra một bảng mới chứa các cột beer_style có kiểu dữ liệu là chararray, beer_beerID

A3 = FOREACH filtered_data_limit GENERATE (chararray) beer_style as beer_style, beer_beerID as beer_beerID;

 

— group các dòng theo beer_style

B3 = GROUP A3 by beer_style;

 

— với mỗi beer_style, đếm số lượng của beerID và chỉ lấy ra các beer_beerID có ID khác nhau

C3 = FOREACH B3 {

         beer_ids = DISTINCT A3. beer_beerID;

         GENERATE group, COUNT(beer_ids) as number_beer;

};

— lưu trữ kết quả vào một folder

STORE C3 into ‘s3://demohungemr/task3’ using PigStorage(‘,’);

STORE C3

Và đây là kết quả:

STORE C3 kqua

Chân thành cảm ơn mọi người đã dành thời gian cho bài chia sẻ của Hùng Lê –  Cloud Consultant tại Renova Cloud. Mong bạn đã nhận được nhiều hữu ích từ bài viết này và đón chờ bài viết theo về Machine Learning bằng PySpark với AWS EMR nhé.

Hãy theo dõi Renova Cloud ngay để được cập nhật những thông tin mới nhất!

Tác giả: Hùng Lê –  Cloud Consultant tại Renova Cloud