Save
Saving
  • Trung tran Trung tran

    Project airflow

    Kiến trúc hệ thống Airflow:

    Link github: https://github.com/eduhub123/ETL_Data
    Link present: https://docs.google.com/presentation/d/1YkFHazP5aX1QV3YO5wkiSlJtfCb1v4uB/edit#slide=id.g27fa31ae64b_0_17

    0c983b9a-0783-47bd-900b-e64b12b851e7-image.png
    46092a06-03c9-40d6-b188-e491a0d4aa74-image.png

    Cấu trúc project:

    Project Structure

    The folder structure for the Data Pipeline Framework can be organized as follows:

    ETL_DATA/
    │
    ├── dags/
    │   ├── etl_pipeline_example.py
    │   ├── etl_pipeline_example2.py
    │   └── ...
    │
    ├── operators/
    │   ├── __init__.py
    │   ├── data_extractor.py
    │   ├── data_transformer.py
    │   └── data_loader.py
    │
    ├── utils/
    │   ├── __init__.py
    │   ├── aws.py
    │   └── utils.py
    ├── tests/
    │   ├── __init__.py
    │   ├── operator_tests.py
    │   └── pipeline1_tests.py
    ├── requirements.txt
    ├── .env
    ├── .airflowignore
    ├── docker_file
    ├── docker-compose.yaml
    └── README.md
    

    Folder Structure Explanation:

    dags/ folder - Contains the main DAG files that define the ETL workflows.

    operators/ folder - Contains the custom Airflow operators specific to the ETL pipeline framework:

    • data_extractor.py: Implementation of the DataExtractorOperator that fetches data from an API and uploads it to an S3 bucket.

    • data_transformer.py: Implementation of the DataTransformerOperator that takes care of data cleansing, validation, and transformation tasks.

    • data_loader.py: Implementation of the RedshiftUpsertOperator that loads data into Redshift, and s3 staging tables and upserts the data into the appropriate target tables.

    utils/ folder - Contains utility functions to be used across the framework, e.g., interacting with Redshift.

    tests/ folder - Contains all the test files for custom operators, hooks, and utility functions.

    docker/ folder - Contains all the dockerfiles and docker-compose.yaml for deploy and run pipeline

    requirements.txt - A file that lists all the dependencies needed for the execution environment

    Các datapipeline hiện có:

    • Pipeline Event clevertap:
    • Pipeline Airbridge
    • Pipeline

    posted in Data_team read more
  • Trung tran Trung tran

    Hệ thống Đồng bộ và báo cáo:

    cd737fa7-0e0f-486c-aa36-05711ef5036d-image.png

    Hệ thống sử dụng kinesis để lắng nghe stream event từ người dùng, sau đó sử dụng lambda để đọc từ kinesis, và đọc từ DB là DynamoDB để xử lý dữ liệu đồng bộ và báo cáo

    Xử lý song song nhiều nhiều event

    d026b469-2d22-4d46-ba22-670377940a90-image.png
    Vài tại một thời điểm có thể đến cả 1000 event/s nên những event bắn lên cần phải xử lý song: Cần setup kinesis có nhiều shard, cho mỗi shard-id phải có nhiều hàm lambda song xử lý.
    https://aws.amazon.com/blogs/compute/new-aws-lambda-scaling-controls-for-kinesis-and-dynamodb-event-sources/

    Biểu đồ luồng:

    295d1841-0af3-4de4-9e7d-c8fb33e1b468-image.png

    Chi tiết luồng tương tác của: User, React, Unity, Backend, và Team data được trình bày ở hình trên.

    Thiết kế event:

    ac61e70d-dfb0-48eb-b27d-1f830586ad45-image.png

    Step1: Backend thiết kế event properties cần app bắn lên. Format:
    {
    "time_record": , //timestamp event is recorded
    "event_name": ,
    "properties": {
    // event properties
    }
    }
    Step2: Bên Unity /React nhận thiết kế để bắn lên Kinesis. Lưu ý về partion key của event.
    Step3: Check kinesis

    Cấu trúc của project:

    ├── README.md          <- The top-level README for developers using this project.
    ├── configs
    │   ├── configs.ini    <- Config for running mode
    │   ├── dev_configs.ini    <- Config for dev mode
    │   └── production_configs.ini  <- Production mode config
    │
    │
    ├── requirements.txt   <- The requirements file for reproducing the analysis environment
    │
    ├── src                <- Source code for use in this project.
    │   ├── __init__.py    <- Makes src a Python module
    │   │
    │   ├── api            <- Script for API
    │   │   ├── parent_report  <- Scripts for parent report 
    │   │   ├── sync_mj_app  <- Scripts for get data lesson 
    │   │   ├── homework    <- Scripts for homework report
    │   │   ├── upgrade    <- Scripts for upgrade MJ4 firestore to DynamoDB
    │   │   ├── homework    <- Scripts for homework report
    │   │   └── user_activity     <- Scripts for user activity report
    │   ├── event          <- Scripts to process event from user
    │   │   ├── event.py
    │   │   ├── lesson_event.py
    │   │   ├── change_topic.py
    │   │   ├── handler.py  <- Scripts for  handle event
    │   │   └── activity_event.py
    │   │
    │   ├── report         <- Scripts to process all objects of report  
    │   │   │          
    │   │   ├── lesson.py
    │   │   ├── lesson_learned.py
    │   │   ├── story.py
    │   │   ├── story_learned.py
    │   │   ├── activity.py
    │   │   ├── skill_learned.py
    │   │   └── parameters.py
    │   ├── sync_data         <- Scripts to process all objects for sync_data  
    │   │   │          
    │   │   ├── lesson.py
    │   │   ├── lesson_sync.py
    │   │   ├── course.py
    │   │   ├── homework.py
    │   │   ├── score.py
    │   │   ├── weely_challenge.py
    │   │   └── award.py
    │   │
    │   ├── services       <- Scripts to connect and work with  AWS services, Redis.   
    │   │   │          
    │   │   └── aws.py
    │   │
    │   └── utils          <- Scripts to some commo fucntions
    │       └── visualize.py
    │
    ├── lambda_function.py <- Lambda function handlers for processing streaming event
    ├── configs.py         <- Read and processing services 
    └── main.py            <- Main file for starting API server.
    

    Xử lý khi có event mới

    1. Khi có event mới trong folder src/event tạo 1 class mới cho event, class mới này kế thừa từ base class event.
      Ví dụ:
      722dc3be-4caa-40eb-ade8-6a8092e35d6a-image.png
    2. Trong file src/event/handler.py viết function để xử lý event:
      651ab0ee-90d5-4a1a-9e90-f438e5f95182-image.png
    3. Cập nhật trong hàm lambda_function.py, để khi có event này được bắn lên Lambda sẽ bắt và gọi đến logic code trong handler để xử lý:
      39439abe-abd0-4316-a687-67863989d823-image.png
      Ví dụ dòng code: 85-87

    Viết API, call API:

    Trong folder src/api có chia thành các folder tương ứng với api cho các project khác nhau sử dụng Fast API
    Trong trường hợp cần cập logic của báo học tập thì vào folder: src/api/parent_report để sửa.

    Api call:

    9a66d984-5f31-43fc-8381-dca9d30c600c-image.png

    Deployment:

    deploy lambda:

    Zip code and deploy lambda :
    +) Lambda dev: https://ap-southeast-1.console.aws.amazon.com/lambda/home?region=ap-southeast-1#/functions/handle_app_event?tab=code
    +) Labmda live: https://ap-southeast-1.console.aws.amazon.com/lambda/home?region=ap-southeast-1#/functions/handle_mj_app_event?tab=code
    CI/CD:
    Testing:

    deploy API:

    MJ 4.0 parent reporting

    This is repository of the Parent report of Monkey Junior 4.0

    How to use

    To run the local with dev mode please install docker before, after that use 'docker-compose-dev.yaml'

    docker-compose up -d --build
    

    Develop mode

    Running the source with develop model. Following step by step:

    1. Create new python venv
    python3 -m venv venv 
    
    source venv/bin/active 
    
    1. Install requirements
    pip3 instal -r requirements.txt
    
    1. Start services by running the main file

    Default will be ran with dev configs

    python3 main.py
    

    Production model

    To export model for production use docker. Please setup and install docker:

    docker-compose up -d --build 
    

    posted in Data_team read more