playdata/project

project 1. 영화 박스오피스 데이터 수집/처리/보관 및 활용

soojin1 2024. 8. 11. 23:33

프로젝트 내용

  • 영화 박스오피스 데이터 수집/처리/보관 및 활용에 대하여
  • 각각 단계에 대하여 파이썬 프로그램을 package(PIP설치) 단위로 개발
  • 개발 package 를 airflow 적용 및 운영

필수산출물

1.github ( 플레이데이터 레포 + TEAM REPO ) code ( Airflow + PIP )
플레이데이터 레포 : 통합 메인 AIRFLOW 코드 ( 팀레포 코드가 모두 연결 되도록 가이드 작성 필요)
TEAM REPO 각각 패키지 + 플레이데이터 메인 repo Fork ( 차후 팀 레포 만들로 완경성을 갖도록 최종 완료 후 포크)
2.github READMD.md(*) - 팀원에게 도움이 되는 문서 작성, 기존 작성 블로그 링크등 활용 OK
3.데이터 수집, 처리, 분석 PIP 팩키지 개발
4. Airflow 데이터 파이프라인 개발 / 운영
5. 회고록 - 프로젝트 마지막날 점심시간 20분간 진행 - 발표 자료 1 슬라이드 작성 포함
( 좋은점, 아쉬운점, 개선할점 - 다음에 한다면 이렇게)
6. 팀별로 지정된 년도의 수집 변환 처리 데이터 -> PARQUET
 

진행계획(1일차)

- github 배포전략 수립 ( dev release 생성 및 머지 전략 )
- Create GitHub milestones and projects (Kanban board)
- 작성 팩키지 분배
- 분배 팩키지 개발 진행 ( 1일차 오후 ) - README.md + 동작 보다는 배포 중심 껍데기 제작 + 껍데기 테스트
- 1차 배포 ( branch d1.X.X/<abc> -> release/d1.0.0 merge ( PR 제출 )
- 로컬 AIRFLOW 24 * 7 가동 설정 ( 오후5시 까지)
- 1일차 회고 ( 좋음점, 아쉬운점, 개선방안 ) + 포스트잇 + 칸반 보드 생성
 

 

 
나는 Extract 패키지 단계를 맡아 영화진흥위원회에서 제공하는 데이터를 API를 통해 받아오는 과정의 코드를 작성했다.
 
 

Branch 전략

 

 위와 같이 브랜치를 생성하여 프로젝트를 진행하고자 했다.

따라서 내가 맡은 Extract repo에서 브랜치를 먼저 생성했다.

 

여기서 첫 번째 문제 발생 ❗️

원격에서 생성한 branch를 로컬에서 git pull하면 받아와지는 줄 알았다. => 안됨.

그래서 로컬에서 똑같은 이름으로 브랜치를 생성해보았다 => 원격 브랜치와 연결 x, 이름만 같음, 충돌 발생할수도(?)

해결법은 원격 브랜치와 로컬 브랜치를 연결하는 것 이다. (추적 관계를 설정)

# 원격 브랜치 확인
$ git branch -r

# 방법 1)
$ git checkout -b <로컬> <원격>

# 방법 2)
$ git branch -t <로컬> <원격>

 

특별 미션 - Ice_breaking()

  • 모든 pip 팩키지에 Ice_breaking() 를 생성합니다
  • Ice_breaking() 를 호출하면 아스키아트로 변환된 팀원 사진이 나옵니다.
  • 1일차 AIRFLOW DAG 작성 시 모든 DAG TASK 출력 결과는 Ice_breaking() 을 호출하도록 만듭니다.
  • 여러 PIP 모듈에 Ice_breaking() 함수를 중복으로 만들지 않는 방법도 고민해 봅니다

 

갑자기 강사님이 특별 미션을 주셨다.

우리 팀은 패키지(extract, transform, load) 별로 각각 아스키아트로 변환된 사진을 출력하는 함수를 작성하고,

마지막 load 패키지에서 모듈을 호출하여 세 개의 아스아트를 출력하는 과정을 진행해보기로 했다.

main -> release/d1.0.0 -> dev/d1.0.0 브랜치에서 수행했다.

출력 결과는 Airflow Dag에 ice.task를 생성하여 로그로 확인할 수 있도록 했다.

 

(ice_breaking은 내가 Load 패키지에서 수행했다.) 

load 디렉토리를 만들고 새 파이썬 초기화를 위한 pdm init 후 가상환경을 설정해주었다.

$ pdm init

# 가상환경 활성화
$ source .venv/bin/activate

 

그리고 print문을 수행하는 ice_sj()를 생성했다.

def ice_sj():


    p= """
    @@@@@@@@@@%%%%##*+======++#%%%%%%%######%%%%%%%%%%%%%%%%%%###**###%%@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@@@%%%%#*+===========+*#######%%%%%%%%%%%%%%%%%%#*+++++++++++++*%%@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@@@%%%%#*+====++========+*##%%%%%%%%%%%%%%%%%%%%*++++==++++++++++++*#@@@@@@@@@@@@@@@@@@@@@@@@@@@@
@@@@%%%%%#+===+++++++========+#%%%%%%%%%%%%%%%#%#*=====++++++++++**++++*#%@@@@@@@@@@@@@@@@@@@@@@@@@@
@@%%%%%#*===++++++++++===-====+*#*#####%%%%%##*++=======++++++++**********#%%%@@@@@@@@@@@@@@@@@@@@@@
@%%%%%#+==++++++++++++=====================++==+++=====++++++**************#%%%@@@@@@@@@@@@@@@@@@@@@
%%%%#*+=++++++****++++=============================+++++++++++++***********#%%%%%%@@@@@@@@@@@@@@@@@%
%%##*+=++++++*****+++==============================+++++++++=+++++++********#%%%%%@@@@@@@@@@@@@@@@@@
%#*+==+++++*****++======================================++===========++*****#%%%%%%@@@@@@@@@@@@@@@@@
#*+==+++++***++====================-------------=======================++***#%%%%%%%@@@@@@@@@@@@@@@@
+++==+++***++==============-----------------------=======================+**##%%%%%%%@@@@@@@@@@@@@@@
+=+*+++***+===========-------------------------------===========-----=====+++*%%%%%%%%@@@@@@@@@@@@@@
+==+++**++=============---------------------------------===-----------=====+++#%%%%%%%%@@@@@@@@@@@@@
+++++**++==++=============--------------------------------------------=====++**#%%%%%%%%%@@@@@@@@@@@
+++***+++======++==========-----==--------------------------------=========++***#%%%%%%%@@@@@@@@@@@@
+****++++++++++++++===---------==========-------------------------=========+****##%%%%%%@@@@@@@@@@@@
****+++++++++++++=====-----------===========----------------------==========++***#%%%%%%@@@@@@@@@@@%
**#*+++======+++=======--------===-=========---------------------=======-===++***##%%%%%@@@@@@@@@@@%
##*++====================+++**###***+++=====---------------------====++***++++****#%%%%%@@@@@@@@@@%#
##*+===============++++*###%%###*##%#***++===--------------------==+***#%%###+++**#%%%%%@@@@@@@@@@##
#*+==============++*#%###%%%%%*#%######**++++==--------------------===+#%%%#+++++**%%%%@@@@@@@@@@%##
#*==------------==+*#%##%%%%%%%%%########*++=------------------------=+*%%%*+++++**#%%@@@@@@@@@@%###
#*+=--------------==+#####%%%%##*###**+++++==---------------=======---==+*##*+++++**%%@@@@@@@@@%####
%*+==---------==----===++*******++====++====------------==================++++++++**#%@@@@@@@@@%####
%#*===----------------------------===========--------==============+++++++++++++++**#@@@@@@@@@%#####
%#*+===-----=--====----------------==========------============++***####*******+++**%@@%@@@@@%######
%##*+===-================---------============-----------===++*##%%%%%########%%#**#%@@@@@@@%#######
%%#**+===================----------=============---------==+*%%%%%%%%%####***###%##%@@@@@@@@%#######
%%%****+======+++============---========++========--------=+*#%%%%%%%%%@@%#***#%%%#%%@@@@@@@@@@%%###
%%%#****++++++++++++++++========================--------====+*#%%%%%%%@%%######%%%*#%%@@@@@@@@@%%%%#
#@%%##****++++++++++++++=============++====+======----=======++**#######**########**%@@@@@@@@@%%%%%%
#%@%####****+++*****+**++++++++=++++++++=====================++++***#######%####%***#%%%%%%%%%%%%%%%
%#@%#####********************++++++++++++=++==================+++***##%%%%%%%%#*+++*#%%%%%%%%%%%%%%%
%%%@%########**************************++++++================++****######%%##*+++++*#%%%%%%%%%%%%%%%
#%%@%##########*********#****######****++++++++==========++++**#######***##***++++*#%%%%%%%%%%%%%%%%
#%%%%####***#####*****###*****#######****++++++++++=====+++*****#****#####****++**#%%%%%%%%%%%%%%%%%
%%%%%%#####****###***######******######*****+*+*++********################***###%%%%%%%%%%%%%%%%%%%%
%%%%%%%####***********###************#**##***************#***########**++**##%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%#####*********************************************+++++*********##%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%####**********************#*******************************####%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%####******+++******************************************#%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%###*********++++**********************************+++*#%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%###*******+++++++++************************++++++++**%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%####****++++++++++++++++++++++++*+++++++++++++++++++*#%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%%%%%%%###****++++++++++++++++++++++++++++++++++++++++++++*#%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@%%%%%%%%%####***+++++++++++++++++++=++++++++++++++++++++++++**#%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%@
@@%%%%%%%%##******++++++++++++++++=========+++++=++++++++++++**#%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%@@@
@@@@%%%%%%%#****+++++++++++++++=+++====================+++++++**#%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%@@@@@
@@@@@@%%%%%##****+++++++++===++++=======================++++++**#%%%%%%%%%%%%%%%%%%%%%%%%%%%%%@@@@@@
@@@@@@@%%%%##****++++++++++=============================+++++++*#%%%%%%%%%%%%%%%%%%%%%%%%%%%@@@@@@@@"""

    print(p)

 

pyproject.toml 파일이다.

ice 명령을 통해 ice_sj()의 아스키아트가 출력되도록 구성하였다.

[project]
name = "Load"
version = "0.1.0"
description = "Default template for PDM package"
authors = [
    {name = "sooj1n", email = "sujin19371@gmail.com"},
]
dependencies = [
    "transform @ git+https://github.com/play-gogo/transform.git@sunwoo/d1.0.0",
]
requires-python = ">=3.9"
readme = "README.md"
license = {text = "MIT"}


[tool.pdm]
distribution = true

[project.scripts]
ice = "load.cli:ice_sj"

 

 

이 패키지를 사용하기 위한 방식을 README.md에 추가하였다.

다음은 transform 패키지에 load 패키지를 설치했다.

여기서 문제2 발생❗️

(README.md 작성 전 발생한 문제임.)

pip install git+ ~~ 를 통해 설치했기 때문에 dependency를 위한 pdm add는 설치된 pip명을 통해 수행하면 될 줄 알았다.

따라서

$ pdm add load 

를 수행했는데, 설치가 엄~~~청 오래걸렸다.

분명 가상환경 활성화를 통해 깨끗한 환경을 만들어놓았기 때문에 연관되어 설치될 pip 모듈이 없을건데, 내가 설치하지도 않은 것들이 다 설치되고 있었다.

이유는 pdm add 또한 github을 통해 설치해야하는 것 이었다.

load같은 흔한(?) 패키지명은 무조건 존재하는 것이 있을것이고, 다운받아진건

https://pypi.org/project/load/ 

 

load

load python module from a file

pypi.org

이것이었다...

 

결론은 pdm add도 github 주소를 활용하자 !

 

transform 파이썬 코드에서는 ice_sj 함수를 다음과 같은 방식으로 호출하여 사용할 수 있다. 이후 extract 패키지에서도 마찬가지.

from load.cli import ice_sj

 

마지막 단계로는 airflow에 extract 패키지를 태워 로그를 출력해보는 것 이었다.

DAG파일을 다음과 같이 작성하였다.

# The DAG object; we'll need this to instantiate a DAG

from airflow import DAG
from datetime import datetime, timedelta
from textwrap import dedent

# Operators; we need this to operate!

from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator,
    PythonVirtualenvOperator,
    is_venv_installed,
    PythonVirtualenvOperator,
    BranchPythonOperator,
)


with DAG(
'ice_breaking',
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
'depends_on_past': True,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
},
description='ice breaking DAG',
schedule_interval=timedelta(days=1),
start_date=datetime(2024, 7, 20),
catchup=True,
tags=['ice'],
) as dag:


    def fun_ice(ds_nodash):
        from extrct.ice import ice_hun
        import os
        df = ice_hun()


    start = EmptyOperator(task_id='start')
    end = EmptyOperator(task_id='end', trigger_rule="all_done")




    ice = PythonVirtualenvOperator(
        task_id='ice.data',
        python_callable=fun_ice,
        system_site_packages=False,
        requirements=["git+https://github.com/play-gogo/Extract.git@d1/0.1.0"],
        )


    start >> ice >> end

 

 

여기서 문제3 발생❗️

이전 실습에서 계속해서 airflow를 사용하고 있었기 때문에 airflow dag 위치 이동 및 team 용 AIRFLOW 변경이 필요했다.

우선 airflow 모듈이 설치되어있는 환경에서 수행하였고, 공식가이드를 참고하여 수행할 수 있다.

AIRFLOW_HOME은 기본 디렉토리를 지정하는 부분인데 처음에 dag파일 바로 앞 경로까지 지정해서 오류가 발생하기도 했다. 

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html

 

Configuration Reference — Airflow Documentation

 

airflow.apache.org

$ tail -n 4 ~/.zshrc
# AIRFLOW
export AIRFLOW_HOME=~/airflow_team
export AIRFLOW__CORE__DAGS_FOLDER=~/code/team1/movie_airflow/dags
export AIRFLOW__CORE__LOAD_EXAMPLES=False

 

 

 (1일차 결과)

 

 

사실은 이 결과는 2일차 시작 직전 완성되었다.

왜냐하면 아직 진행되지도 않을 브랜치를 미리 만들고, 나는 그 브랜치에서 작업을 했더라고 다른 팀원은 메인에서 작업하고,,

merge에서 충돌이 발생하고 잘 해결하지 못했다.

협업 부족도 맞지만 브랜치에 대한 이해가 너무너무너무너무ㅜㅜㅜ부족했다.

그래서 우리 팀만 그 날 제출해야했던 릴리즈 노트를 제출하지 못했다..!

꼴지로 시작했으니 올라갈일만 남았다..!!! 라고 긍정적으로 생각했다 

진행계획(2일차)

- Daily 5-minute agile stand-up meeting ( 아침 )
- Pair programming ( 3인 1시간 진행 )
- 2차 배포 ( release/d2.0.0 ) - 오후 5시
- code review - focused on test cases ( v0.5 대상 )

 

 

2일차에는 패키지 개발을 시작했다.

먼저 최종 저장한 데이터로 어떤 인사이트를 도출해낼지 생각해보았다.

우리 조는 2020년 데이터를 맡았기 때문에, 전체적인 주제는 2020년동안 가장 인기가 많았던 영화로 정했다.

 

❗️사소한거긴 한데, 하나 짚고 넘어가자면 저번 실습에서도 헷갈려서 강사님께 도움을 청했다가 아차했던 것이 있었는데 또 실수했던!!

git clone 할 때 ssh 주소를 사용하지 않고 https를 사용하는 실수

나는 ssh key를 미리 등록해두었다. 이렇게 확인할 수 있다.

$ ls -al ~/.ssh

 따라서 git push를 할 때 추가적인 인증이 필요없는 것 이었는데, https 주소로 실행하면 push때 마다 추가 인증이 필요하다.

 

❗️그리고 branch 적용이 익숙하지 않아서 commit, merge 등을 되돌려야 하는 상황이 많이 있었다. 그래서 명령어를 정리해두었다.

# 로그 확인
$ git log

# pull 취소하고 되돌리기
$ git reset  --hard <log>

# merge 취소하고 되돌리기
$ git reset --merge <log>

# commit 취소하고 되돌리기
$ git reset --hard <log>

# add 취소하고 되돌리기
$ git reset HEAD <log>

 

 

그리고 내가 맡은 Extract 패키지 개발을 시작했다. 전체적인 코드는 다음과 같다.

import pandas as pd
import requests
import os
import json
from pprint import pprint
def get_key():
    """영화진흥위원회 가입 및 API 키 생성 후 환경변수 선언 필요"""
    key = os.getenv('MOVIE_API_KEY')
    return key

def gen_url(dt="20200101"):
    base_url = "http://www.kobis.or.kr/kobisopenapi/webservice/rest/boxoffice/searchDailyBoxOfficeList.json"
    key = get_key()
    url = f"{base_url}?key={key}&targetDt={dt}"

    return url

def req(load_dt="20200101"):
    url = gen_url(load_dt)
    r = requests.get(url)
    code = r.status_code
    data = r.json()
    return code, data

def req2list(load_dt='20200101') -> list:
    _,data = req(load_dt)
    l=data['boxOfficeResult']['dailyBoxOfficeList']
    return l

def list2df(load_dt='20120101'):
    l = req2list(load_dt)
    df = pd.DataFrame(l)
    df['load_dt']=load_dt
    return df

 

수업시간에 실습했던 코드를 거의 그대로 사용했다.

요약하자면 환경변수로 설정해놓은 API_KEY를 사용해 영화진흥위원회에 데이터를 요청하여 json 형식으로 받은 데이터를 데이터프레임형식으로 저장하여 최종적으로 리턴하였다.

 

pyproject.toml은 이렇게 작성했다.

[project]
name = "Extrct"
version = "0.2.0"
description = "Default template for PDM package"
authors = [
    {name = "hun0219", email = "gsw98000@gmail.com"},
]
requires-python = ">=3.9"
readme = "README.md"
license = {text = "MIT"}
dependencies = [
    "pandas>=2.2.2",
    "requests>=2.32.3",
]

[project.scripts]
#ice='extrct.ice:ice_hun'
data = 'extrct.mov:list2df'

[tool.pdm]
distribution = true

 

위를 바탕으로 READMD.md는 이렇게 작성했다.

Box Office Data Retrieval

  • 이 프로젝트는 영화진흥위원회(KOFIC) API를 사용하여 박스오피스 데이터를 가져오고 이를 Pandas 데이터프레임으로 변환하는 작업을 수행합니다. 이 문서에서는 사용 방법, 설치 요구 사항, 그리고 각 함수의 설명을 제공합니다.

설치 요구 사항

  • python 3.9 이상
  • pandas 라이브러리
  • requests 라이브러리

이 라이브러리들은 'pip'을 사용하여 설치할 수 있습니다.

$ pip install pandas requests

환경 변수 설정

  • API 키를 사용하기 위해 환경 변수를 설정해야 합니다. API 키는 영화진흥위원회 API 서비스에서 가입 후 생성할 수 있습니다.
export MOVIE_API_KEY='your_api_key_here'

설치방법

$ pip install git+https://github.com/play-gogo/Extract.git@d2/0.1.0

가상환경에서 설치방법

$ pdm add git+https://github.com/play-gogo/Extract.git@d2/0.1.0

호출방법

$ data

 

 팀원 분들이 생성한 다른 패키지는 파이썬 코드만 기록해두겠다.

 

transform

from extrct.mov import list2df
import pandas as pd

def transform(load_dt):
    read_df = list2df(load_dt=load_dt)
    
    cols = ['movieCd', #영화의 대표코드를 출력합니다.
            'movieNm', #영화명(국문)을 출력합니다.
            'openDt', #영화의 개봉일을 출력합니다.
            'rank', #해당일자의 박스오피스 순위를 출력합니다.
            'showCnt', #해당일자에 상영된 횟수를 출력합니다.
            'audiCnt', #해당일의 관객수를 출력합니다.
            'salesAmt', #해당일의 매출액을 출력합니다.
            'audiAcc', #누적관객수를 출력합니다.
            'salesAcc', #누적매출액을 출력합니다.
            'salesShare', #해당일자 상영작의 매출총액 대비 해당 영화의 매출비율을 출력합니다.
            'load_dt'
            ]
    
    df = read_df[cols]
    
    #데이터타입 변환 
    #df['load_dt'] = pd.to_datetime(df['load_dt'])
    df['month'] = pd.to_datetime(df['load_dt']).dt.month
    df[['rank', 'showCnt', 'audiCnt', 'salesAmt', 'audiAcc', 'salesAcc', 'salesShare']] = df[['rank', 'showCnt', 'audiCnt', 'salesAmt', 'audiAcc', 'salesAcc', 'salesShare']].astype(float).astype(int)
    df = df.sort_values(by='audiCnt', ascending=False)
    return df

 

 

load

(1년치 데이터를 날짜를 파티션으로 저장하려니 정리가 되지 않는 느낌이기도 하고, 월 별 데이터 차이점을 알아보고 싶어서 파티션 컬럼을 month와 load_dt 두 가지를 사용했다.

from transform.transform import transform
import pandas as pd


def load(load_dt):
    df = transform(load_dt)
    df.to_parquet("~/code/playgogo/storage", partition_cols=['month','load_dt'])

 

airflow DAG

Extract 패키지 작성 후 팀원들이 다음 작업을 진행하는 동안 airflow 뼈대를 만들고 Extract에 대한 테스트를 airflow에 직접 태워보며 진행했다.

주요 작업 태스크는 PythonVirtualenvOperator로 구현했다.

연결된 파이썬 코드는 fun_<task> 형식으로 이름을 지정해주었다.

각각의 태스크에서 패키지를 호출하여 기능을 사용하고자 했다.

# The DAG object; we'll need this to instantiate a DAG

from airflow import DAG
from datetime import datetime, timedelta
from textwrap import dedent

# Operators; we need this to operate!

from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import (
    ExternalPythonOperator,
    PythonOperator,
    PythonVirtualenvOperator,
    is_venv_installed,
    PythonVirtualenvOperator,
    BranchPythonOperator,
)


with DAG(
'movie',
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
'depends_on_past': True,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
},
description='movie 2020 DAG',
schedule_interval=timedelta(days=1),
start_date=datetime(2020, 1, 1),
end_date=datetime(2020, 12, 31),
catchup=True,
tags=['movie'],
) as dag:

    def fun_ext(dt):
        print('*'*30)
        print(dt)
        print('*'*30)
        from extrct.mov import list2df
        df=list2df(load_dt=dt)
        print(df)


    def fun_trans(dt):
        print('*'*30)
        print(dt)
        print('*'*30)
        from transform.transform import transform
        df=transform(load_dt=dt)
        #return df
        #print(df['month'].head(5))
        print(df.head(10))

    def fun_load(dt):
        print('*'*30)
        print(dt)
        print('*'*30)

        from load.load import load
        load(load_dt=dt)
        

    start = EmptyOperator(task_id='start')
    end = EmptyOperator(task_id='end', trigger_rule="all_done")

    extract = PythonVirtualenvOperator(
            task_id='extract',
            python_callable=fun_ext,
            requirements=["git+https://github.com/play-gogo/Extract.git@d2/0.1.0"],
            system_site_packages=False,
            op_args=["{{ds_nodash}}"]
    )


    transform = PythonVirtualenvOperator(
            task_id='transform',
            python_callable=fun_trans,
            requirements=['git+https://github.com/play-gogo/transform.git@d2.0.0/test'],
            system_site_packages=False,
            op_args=["{{ds_nodash}}"]
    )

    load = PythonVirtualenvOperator(
            task_id='load',
            python_callable=fun_load,
            requirements=["git+https://github.com/play-gogo/load.git@d2/0.1.0"],
            system_site_packages=False,
            op_args=["{{ds_nodash}}"]
    )


    start >> extract >> transform >> load >> end

 

2일차의 집에 늦게 간 이유!!!

우리 팀이 원했던 parquet 데이터 저장 tree 구조는

month =1

ㄴload_dt=20200101

    ㄴ .parquet

ㄴload_dt=20200102

    ㄴ .parquet

ㄴload_dt=20200103 

    ㄴ .parquet

 ...

 

month=12

ㄴload_dt=20201201

    ㄴ .parquet

ㄴload_dt=20201202

    ㄴ .parquet

ㄴload_dt=20201203

    ㄴ .parquet

 

인데

month=1로 테스트했을 때, load_dt디렉토리가 하나 생성될 때 마다  load_dt=20200101 디렉토리에 parquet 데이터가 생성됐다.

그러니까 20200103까지 airflow를 실행시켰을 때

month =1

ㄴload_dt=20200101

    ㄴ .parquet

    ㄴ .parquet

    ㄴ .parquet

ㄴload_dt=20200102

    ㄴ .parquet

ㄴload_dt=20200103 

    ㄴ .parquet

 

이렇게 데이터가 저장이 되어있었다.

 

결국 찾아낸 원인은 Load 패키지 파이썬 파일이었다.

from transform.transform import transform
import pandas as pd


def load(load_dt='20200101'):
    df = transform(load_dt)
    df.to_parquet("~/code/playgogo/storage", partition_cols=['month','load_dt'])


load()

 가장 마지막 줄인 load() 함수 호출 부분은 테스트를 위한 코드였는데 팀원 분이 지우지 않고 push하셨다.

(이래서 pytest 쓰는건가..!) 아무튼

나는 에어플로우의 def_load에서 임포트한거는 load 함수이기 때문에 def_load 바깥 영역인 함수호출 부분은 실행되지 않을 줄 알았는데 실행이 되고 있었던 것이다. 그리고 파라미터에 20200101로 디폴트 값이 정해져있었기 때문에 해당 디렉토리에 값이 쌓이는 것 이었다.

실행 과정

  1. 임포트 문: from load.load import load와 같은 문이 실행되면, 파이썬은 load.py 모듈을 임포트합니다.
  2. 모듈 로드: 파이썬은 load.py 파일을 읽습니다.
  3. 최상위 코드 실행:
    • 함수 정의: 파이썬은 load 함수를 정의합니다. 이 단계에서는 함수 코드가 실행되지 않고, 함수가 호출 가능한 상태가 됩니다.
    • 함수 호출: load() 함수 호출은 최상위 코드에 포함됩니다. 이 코드가 즉시 실행되기 때문에 load() 함수가 기본 매개변수 '20200101'으로 호출됩니다.

 다음은 임포트 문이 실행되는 과정이다.

요약하자면 load() 함수 호출이 임포트 시 실행되는 이유는 이 호출이 함수나 클래스 정의 외부에 위치해 있기 때문이었다. 파이썬에서는 함수나 클래스 정의 외의 코드를 모듈이 처음 로드될 때 실행합니다.

 

이렇게 또 어이없는 실수로 2시간 순삭

하지만 새로운 사실 알아냈다~! 2일차 끝..

 

진행계획(3일차)

-  Daily 5-minute agile stand-up meeting
- 3차 배포 ( release/d3.0.0 ) - 오전
- 회고 ( Having lunch together + )
- 4차 최종 배포 및 버그 픽스 ( 오후 3시 )
- 최종발표 및 시연 ( 오후 4시 )

 

 

마지막 3일차 우리 조의 목표였다.

  • 데이터 멱등성을 위한 BranchOperator 생성
  • 가능하다면 데이터 시각화 (Optional) 판다스 이용
  • 데이터 분석 > 인사이트 도출
  • 피피티 1장 작성 (발표용)

내가 멱등성 작업을 위한 코드를 맡았다.

airflow DAG 수정 사항만 기록해보겠다.

def branch_fun(ds_nodash):
	import os
	home_dir = os.path.expanduser("~")
	month=int(ds_nodash[4:6])
	path = os.path.join(home_dir, f"code/playgogo/storage/month={month}/load_dt={ds_nodash}")
	print('*' * 30)
	print(path)
	print('*' * 30)   

	if os.path.exists(path):
		print('존재')
		return "rm.dir" #rmdir.task_id
	else:
		print('존재x')
   		return "load"

branch_op = BranchPythonOperator(
		task_id="branch.op",
        python_callable=branch_fun
    	)
        
rm_dir = BashOperator(
		task_id='rm.dir',
		bash_command="""
        month=$(echo "{{ ds_nodash[4:6] }}" | awk '{print $1+0}');
        echo $month
 		echo code/playgogo/storage/month=$month/load_dt={{ds_nodash}}
		rm -rf ~/code/playgogo/storage/month=$month/load_dt={{ds_nodash}}
        """
    	)

start >> extract >> transform 
transform >> branch_op
branch_op >> rm_dir
branch_op >> load

rm_dir >> load >> end

요약하자면

branch_op와 rm_dir 디렉토리를 생성해서 branch_op에서 지정한 경로에 파일이 있으면 rm_dir 태스크를 통해 디렉토리를 삭제하고 Load 태스크를 통해 다시 데이터를 받아왔다. 만약 파일이 없으면 rm_dir을 생략하고 load 를 수행한다.

 

month=int(ds_nodash[4:6])

branch_fun의 일부인데, 이거는 20200101 형식인 ds_nodash 변수 값을 먼저 월 부분으로 자르고[4:6] , int로 변환해준 이유는 내 디렉토리는 month=1, month=2 이런식으로 구성되어있는데 [4:6]으로 월 부분을 추출하면 1월은 01월, 2월은 02월 이런식으로 되기 때문에 정확한 경로를 지정할 수 없기 때문이다. 

 

❗️ 그리고 해결하는데 오래걸렸던 부분이다.

 month=$(echo "{{ ds_nodash[4:6] }}" | awk '{print $1+0}');

이거는 rm_dir의 일부이다. 처음에는 간단하게 생각해서 branch_fun에서 지정한 month 변수값을 그냥 받아올 수 있을 줄 알았다. 근데 rm_dir 태스크가 실행되어도 실제로 디렉토리가 삭제되지는 않고 파일이 쌓였다. 이유는 branch_fun은 파이썬 환경의 변수이고 rm_dir은 bash 커맨드 부분이기 때문이다.

 

이렇게 위와 같은 플로우로 DAG 작성을 완료했고, 2020년의 데이터를 받아와 /home/sujin/code/playgogo/storage 위치에 월, 일로 파티셔닝된 parquet 데이터가 저장되었다.

 

이렇게 저장한 데이터를 활용하여 판다스 시각화를 통해 인사이트를 도출하고자 했다

나는 '관객수 기준 월별 가장 인기있었던 영화'를 시각화를 통해 나타내었다.

import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import font_manager, rc

import seaborn as sns

#monthly_top_movies= monthly_top_movies.sort_values('month')
path= '/usr/share/fonts/truetype/nanum/NanumGothic.ttf'
font_name = font_manager.FontProperties(fname=path).get_name()
rc('font', family=font_name)
#print(font_name)

from matplotlib import font_manager
fontprop = font_manager.FontProperties(fname=path,size=0.5)
#시각화
plt.figure(figsize=(12, 10))

#월별로 가장 인기 있는 영화의 인기도를 막대 그래프로 시각화
sns.barplot(data=sort_cnt, x='movieNm', y='audiCnt')

plt.title('2020년 관객수 Top 10 영화', fontsize=15)
plt.xlabel('영화명', fontsize=15)
plt.ylabel('총 관객수', fontsize=15)
plt.xticks(rotation=45,fontsize=15)
plt.yticks(ticks=[0, 2500000, 5000000], fontsize=15)
plt.ticklabel_format(style='plain', axis='y')

#plt.legend(title='movieNm', bbox_to_anchor=(1.05, 1), loc='upper left', prop=fontprop)
plt.tight_layout()

plt.show()

matplotlib, seaborn을 통해서 시각화하는 부분은 어렵지 않게 수행할 수 있었다.

 

간단하게 코드에 대한 설명을 적어놓겠다.

plt.figure(figsize=(12, 10)): 새로운 그림(figure)을 생성하고 크기를 설정한다. 여기서 크기는 12x10 인치이다.

sns.barplot: Seaborn 라이브러리를 사용하여 막대 그래프를 생성한다.

 

  • data=sort_cnt: 시각화할 데이터프레임을 지정
  • x='movieNm': x축에 표시할 데이터 열을 지정, 여기서는 영화명이다.
  • y='audiCnt': y축에 표시할 데이터 열을 지정, 여기서는 총 관객수이다.

 

  • plt.title: 그래프의 제목을 설정한다. 제목의 폰트 크기는 15로 설정되었다.
  • plt.xlabel: x축의 레이블을 설정한다.
  • plt.ylabel: y축의 레이블을 설정한다.

 

  • plt.xticks(rotation=45, fontsize=15): x축의 레이블을 45도 회전시키고 폰트 크기를 15로 설정한다. 긴 영화 제목을 읽기 쉽게 하기 위해 회전시켰다.
  • plt.yticks(ticks=[0, 2500000, 5000000], fontsize=15): y축의 레이블을 사용자 지정 값으로 설정한다. 여기에선 0, 250만, 500만으로 설정했다.
  • plt.ticklabel_format(style='plain', axis='y'): y축의 숫자 표기를 지수 형태가 아닌 일반 숫자로 설정한다. 대규모 수치를 보다 쉽게 읽을 수 있도록 했다.

 

  • plt.tight_layout(): 그래프의 레이아웃을 자동으로 조정하여 요소들이 겹치지 않도록 한다.
  • plt.show(): 그래프를 화면에 표시한다.

❗️ 정말 애를 먹었던 부분은 사실 폰트 설정이다..

from matplotlib import font_manager, rc
path = '/usr/share/fonts/truetype/nanum/NanumGothic.ttf'
font_name = font_manager.FontProperties(fname=path).get_name()
rc('font', family=font_name)

 

 

 

 

맥 사용하는 팀원분은 Apple 폰트로 손 쉽게 가능했는데 윈도우로 작업하는 나는 우선 적절(?)적당(?)한 폰트 먼저 다운받았다. (나눔고딕체) 이거 원래 기본으로 있는거 아닌가.. 아무튼

 

$ curl -o nanumfont.zip http://cdn.naver.com/naver/NanumFont/fontfiles/NanumFont_TTF_ALL.zip
$ sudo unzip -d /usr/share/fonts/nanum nanumfont.zip

$ cd usr/share/fonts
$ sudo apt-get update
$ sudo apt-get install fonts-nanum*

 

분명히!!오류없이 설치가 완료됐다는데 계속해서 path를 불러들이지 못했다. 삭제설치삭제설치삭제설치무한반복

그러다가 한 블로그 글을 봤는데 캐시를 삭제했더니 됐다고 해서 정말 마지막 희망으로 따라해보았다.

 

$ cd ~/.cache/matplotlib
$ rm ~/.cache/matplotlib/fontlist-v390.json

 

이게됐다.............. 한글 깨짐 이슈로 한시간 넘게 투자했다.거의 두시간

그렇게 완성된 그래프@

 

그리고 발표를 위한 ppt 파일을 만들었다.

나는 회고 부분을 맡아 발표했다. 역시 발표는 너무 떨린다.

 

그리고 팀원 중 한 분인 선우님은 마케팅 경력이 있으신 분이었는데, 마지막 페이지의 내용을 챗지피티 없이 머릿속에서 나온 아이디라고 하셨다.. 정말 좋은 아이디어에 퀄리티도 짱이었는데 챗지피티가 아니라고 하셔서 넘 놀랬던 기억이..

 

아무튼 이렇게 3일간의 첫 번째 프로젝트 끝!!

 

<느낀점>

한달 간 배웠던 내용이 나에게 익숙한 내용도 아니고 쉽지는 않았지만 계속 반복해가면서 익혔다고 생각햇는데, 역시나 강사님의 도움없이 혼자 해보려니 버벅거리는 부분도 많고 복붙한 부분도 많았다. 그래도 내용을 다 이해했으니 복붙해서 활용할 수 있는거긴 하니 그건 만족

 

또 git은 언제 익숙해질련지, 그래도 이번에 새로 알고 익힌 부분이 많은 것 같아서 그것도 만족

 

그리고 1일차에 우리 팀만 제출못한건 아직까지 약간 우울.. 오류가 너무 많이나고 해결책을 수행해도 해결이 안되니 너무 답답하고 의욕이 떨어졌었다. 오류가 안날 순 없지만 실력을 더 쌓아서 오류를 해결할 수 있는 사람이 되고싶다1!

 

README 파일 작성에 대한 중요성을 많이 말씀하셨는데!! 역시나 쓰기 어려웠다. 인터넷에 '리드미 잘 작성하는 방법'같은 글도 많이 있어서 시간내서 읽어보고 연습을 많이 해야겠다.

 

 

 

 

'playdata > project' 카테고리의 다른 글

project 3. AI Control System ( AI 관제 시스템 )  (5) 2024.10.14
project 2. Business-Chatting-System  (2) 2024.09.05