playdata/weekly

[플레이데이터 데이터 엔지니어링 캠프 32기] 13주차 회고

soojin1 2024. 10. 6. 19:17

✏️ 학습내용

 

▶ 에어플로우와 데이터베이스 백엔드

 

 

Bitnami PostgreSQL 스택은 PostgreSQL 데이터베이스 서버를 쉽게 설치하고 실행할 수 있도록 Bitnami에서 제공하는 패키지입니다. 이 스택은 PostgreSQL 데이터베이스 뿐만 아니라, 이를 사용하기 위해 필요한 기본적인 설정과 다양한 유틸리티들을 포함하고 있어, 사용자가 PostgreSQL 환경을 쉽게 설정할 수 있도록 돕습니다.

 

 

$ docker pull postgres:12
$ docker run -d -e POSTGRES_DB=airflow_db \
-e POSTGRES_USER=airflow_user \
-e POSTGRES_PASSWORD=airflow_pass \
--name airpg \
-p 15432:5432 \
postgres:12

$ docker exec -it airpg bash
root@257d583d6d8d:/# psql -U airflow_user -d airflow_db
psql (12.20 (Debian 12.20-1.pgdg120+1))
Type "help" for help.

airflow_db=# airflow_db airflow_user
airflow_db-# \db
         List of tablespaces
    Name    |    Owner     | Location
------------+--------------+----------
 pg_default | airflow_user |
 pg_global  | airflow_user |
(2 rows)
 

 

에어플로우 새로 설치

air2 가상환경을 새로 만들었다.

$ pyenv virtualenv 3.11.9 air2
$ pyenv versions
	'
	'
	'

  air --> /home/sujin/.pyenv/versions/3.11.9/envs/air
  air2 --> /home/sujin/.pyenv/versions/3.11.9/envs/air2
 	'
 	'
 	'
    
$ pyenv global air2

 

 

$ pip install "apache-airflow[celery]==2.10.2" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.10.2/constraints-3.8.txt"

(제약조건을 사용하여 설치하는 명령어)

  • 이 명령어는 Celery를 통한 분산 작업 실행을 위해 Apache Airflow와 필요한 모든 종속성을, 호환 가능한 버전으로 설치하려는 의도를 가지고 있습니다.
  • Airflow Celery는 Apache Airflow에서 분산 작업 실행을 관리하기 위해 사용되는 기능입니다.

 

* Airflow에서 "Celery"는 분산 실행을 위해 사용되는 Executor의 일종으로, 작업(task)을 여러 작업자(worker) 노드에 분산시켜 병렬로 실행할 수 있는 기능을 제공합니다. Celery Executor는 특히 많은 작업을 동시에 실행해야 하거나 확장 가능성이 필요한 환경에서 사용됩니다.

 

[.zshrc 수정]

export AIRFLOW_HOME=~/airflow_celery
export AIRFLOW__CORE__LOAD_EXAMPLES=False
export AIRFLOW__CELERY__BROKER_URL=redis://:@localhost:16379/0
export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
export AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow_user:airflow_pass@localhost:15432/airflow_db
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow_user:airflow_pass@localhost:15432/airflow_db

 

Redis 서버를 실행

* Redis(Remote Dictionary Server)는 오픈 소스 인메모리 데이터베이스로, 데이터를 빠르게 저장하고 조회하기 위한 용도로 설계된 시스템입니다. 주로 캐시, 메시지 브로커, 세션 저장소 등으로 많이 사용됩니다. Redis는 데이터를 메모리에 저장하기 때문에 데이터 접근 속도가 매우 빠르며, 다양한 데이터 구조를 지원합니다.

$ sudo docker run -d \
--name airflow_rd \
-p 16379:6379 \
-e REDIS_PASSWORD=airflow \
redis:5.0

 

psycopg2 설치

*psycopg2**는 PostgreSQL 데이터베이스에 연결하고 이를 사용하기 위해 Python에서 제공되는 가장 인기 있는 라이브러리입니다. 이 라이브러리를 통해 Python 애플리케이션에서 PostgreSQL 데이터베이스와 연결하여 데이터를 삽입, 조회, 업데이트, 삭제 등의 작업을 할 수 있습니다.

$ brew install libpq
$ brew link --force libpq
$ pip install psycopg2

 

$ airflow db init

$ docker exec -it airpg bash
root@7ca96b19b7e0:/# psql -U airflow_user -d airflow_db
psql (12.20 (Debian 12.20-1.pgdg120+1))
Type "help" for help.

airflow_db=# /dt

 

$ airflow users create -u test -p test -f test -l user -e test@user.com -r Admin
$ airflow users list
id | username | email         | first_name | last_name | roles
===+==========+===============+============+===========+======
1  | test     | test@user.com | test       | user      | Admin

 

에어플로우 접속

 

[Celery 실행]

# 1. 
$ cd $AIRFLOW_HOME
$ airflow scheduler -D --pid scheduler.pid

# 2.
$ airflow webserver -D --pid webserver.pid

# 3. 
$ airflow celery worker
-D --pid worker.pid -q q_1

# 4. 
$ airflow celery flower
-D --pid flower.pid

[DAG]

from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator,
    PythonVirtualenvOperator,
    BranchPythonOperator,
)

REQUIREMENTS = "git+https://github.com/sooj1n/mnist.git@0.4/model"

with DAG(
        'ml_worker',
    default_args={
        'depends_on_past': False,
        'retries': 0,
        'retry_delay': timedelta(seconds=3)
    },
    max_active_runs=5,
    description='ml_worker Dag',
    schedule="*/3 * * * *",
    start_date=datetime(2024, 9, 25),
    end_date=datetime(2024, 9, 26),
    catchup=True,
    tags=['ml','worker'],
) as dag:
    def prediction():
        from mnist.worker import run
        a=run()
        return a

    ml_worker = PythonVirtualenvOperator(
        task_id="ml_worker",
        python_callable = prediction,
        requirements=REQUIREMENTS,
        system_site_packages = False,
    )

    start = EmptyOperator(task_id='start')
    end = EmptyOperator(task_id='end')

    start >> ml_worker >> end

$ pip install virtualenv

 

이렇게해서 실행은 했는데,

mnist 도커 다시 실행시켜서 fastapi에서 사진 올리고 나서 기다려봤는데도 라인 알람이 안왔다.

그래서 로그를 봤는데

올렸는데 왜 자꾸 경로에 사진이 없다는건지..............................................

아직 해결못하겟아요

 

<해결> 

$ docker run -d --name mnist \
-e LINE_NOTI_TOKEN=5Lxv8ydixOz8P9D3saImy4xYSoBo0xvjaulAVtCjvQg \
-e DB_IP=172.17.0.1 -e DB_PORT=53306 \
-v /home/sujin/code/mnist/img:/home/sujin/code/mnist/img -p 8023:8080 sooj1n/mnist:0.6.3

내 로컬의 /home ~~@@ 에는 경로가 존재하지만 도커 내에는 경로가 존재하지 않아서 그렇다.

도커 실행시킬 때 -v 옵션으로 저장소를 연동(?) 시켜주면 된다.

 

🩷 좋았던 점

엄청난 휴일

.. 끝!

 

🥹 아쉬웠던 점

PostgreSQL라던지 celery라던지 완벽하게 이해는 못했다.

또 실습해보면 이해 되겠지 ~ ~ ~ ~