playdata/weekly

[플레이데이터 데이터 엔지니어링 캠프 32기] 4주차 회고

soojin1 2024. 8. 4. 23:47

✏️ 학습내용

 

7.29(월)

[영화진흥위원회 습격사건🔫]

새로운 프로젝트 귀여운 이름을 지으셨다

이렇게 생긴 파이프라인을 생성했다.

영화진흥위원회 사이트에서 키를 발급받았다.

이 키를 활용하여 get.data 단계에서 데이터를 받아올 것 이고,

save.data에서는 ..! 잘 모르겠어서 나중에 알게되면 적겠다.

 

$ pdm add -dG test pytest pytest-cov

여기서도 pytest를 활용할 것 이기 때문에 테스트 환경 설정 먼저 해줬다.

$ pdm add request

그리고 requests 패키지를 프로젝트에 추가했다. requests는 HTTP 요청을 보내기 위해 사용되는 파이썬 라이브러리이다.

$ export MOVIE_API_KEY= " ~~~ "

export는 환경변수를 설정하는데 사용되는 명령어이다. export 없이 선언한 변수는 현재 셸 세션에서만 유효하고, 다른 프로세스에서는 읽을 수 없다. 내 키는 보안사항(?)이기 때문에 셸 설정 파일(.zshrc)에 코드를 작성하여 환경변수로 설정하고 셸을 다시 시작할 때마다 자동으로 실행되게하고 깃헙에 올리도록 한다. 

src/src/mov/api/call.py

def get_key():
    """영화진흥위원회 가입 및 API 키 생성 후 환경변수 선언 필요"""
    key = os.getenv('MOVIE_API_KEY')
    return key

def gen_url():
    base_url = "http://www.kobis.or.kr/kobisopenapi/webservice/rest/boxoffice/searchDailyBoxOfficeList.json"
    key = get_key()
    url= f"{base_url}?key={key}&targetDt={'20120101}"
    
    return url
    
import requests
def req():
    url = gen_url('20240720')
    r = requests.get(url)
    code = r.status_code
    data = r.json()
    print(data)
    return code, data
    
def req2list() -> list:
    _,data = req()
    l=data['boxOfficeResult']['dailyBoxOfficeList']
    return l
    
def list2df():
    l = req2list()
    df = pd.DataFrame(l)
    return df

mov/tests/test_call.py

: call.py 코드가 잘 작성되었는지, 오류가 없는지 pytest를 통해 확인할 코드

 

from src.mov.api.call import gen_url,get_key,req
  
def test_비밀키숨기기():
    key = get_key()
    assert key

def test_gen_url():
    url = gen_url()
    #assert url == "http://www.kobis.or.kr/kobisopenapi/webservice/rest/boxoffice/searchDailyBoxOfficeList.json?key=8ece39e938f850f4099b0d6f34095640&targetDt=20120101"
    assert "http" in url
    assert "kobis" in url
    
    url = gen_url('20120101')
    
def test_req():
    code, data = req()
    assert code == 200

def test_req2list():
    l = req2list()
    assert len(l) > 0
    v = l[0]
    assert 'rnum' in v.keys()
    assert v['rnum'] == '1'
    
def test_list2df():
    df = list2df()
    print(df)
    assert isinstance(df, pd.DataFrame)
    assert 'rnum' in df.columns
    assert 'openDt' in df.columns
  • test_로 시작하는 함수를 pytest가 실행한다.
  • 한국말로 함수 이름 작성 가능하다.

----------------------------------------------------------------------------------------------------------------------

def save2df():
    df = list2df()
    # df에 load_dt 컬럼 추가 (조회 일자 YYYYMMDD 형식으로)
    # 아래 파일 저장 시 load_dt 기분으로 파티셔닝
    df['load_dt'] = '20120101'
    print(df.head(5))
    df.to_parquet('~/tmp/test_parquet',partition_cols=['load_dt'])
    return df

 

from src.mov.api.call import gen_url, req, get_key, req2list, list2df, save2df

def test_save2df():
    df = save2df(load_dt='20241231')
    assert isinstance(df, pd.DataFrame)
    assert 'load_dt' in df.columns

 

list2df 함수의 결과로부터 데이터프레임을 생성하고, 특정 컬럼을 추가하여 parquet 형식으로 저장하는 함수이다.

save2df() 함수가 잘 실행되었는지 확인할 test_save2df()도 생성했다.

만약 20241231 날짜에 해당하는 데이터에 대하여 save2df()를 실행시켰을 때 데이터프레임을 반환하고, 반환된 DataFrame에 load_dt 컬럼이 포함되어 있는지를 확인한다.

 

FAILED tests/test_call.py::test_save2df - ImportError: Unable to find a usable engine; tried using: 'pyarrow', 'fastparq...

$ pdm add pyarrow
$ pytest

 

pytest 수행 시 오류가 발생하여 pyarrow를 설치해주었다.

 

 

*임의의 날짜가 아닌 파이프라인 수행 날짜 적용 => load_dt 추가

import requests
import os
import pandas as pd

def save2df(load_dt='20120101'):
    """airflow 호출 지점"""
    df = list2df(load_dt)
    # df에 load_dt 컬럼 추가 (조회 일자 YYYYMMDD 형식으로)
    # 아래 파일 저장 시 load_dt 기분으로 파티셔닝
    df['load_dt'] = load_dt
    print(df.head(5))
    df.to_parquet('~/tmp/test_parquet',partition_cols=['load_dt'])
    return df

def list2df(load_dt='20120101'):
    l = req2list(load_dt)
    df = pd.DataFrame(l)
    return df

def req2list(load_dt='20120101') -> list:
    _,data = req(load_dt)
    l=data['boxOfficeResult']['dailyBoxOfficeList']
    return l

def get_key():
    """영화진흥위원회 가입 및 API 키 생성 후 환경변수 선언 필요"""
    key = os.getenv('MOVIE_API_KEY')
    return key

def req(load_dt="20120101"):
   # url = gen_url('20240720')
    url = gen_url(load_dt)
    r = requests.get(url)
    code = r.status_code
    data = r.json()
    print(data)
    return code, data

def gen_url(load_dt="20120101"):
    base_url = "http://www.kobis.or.kr/kobisopenapi/webservice/rest/boxoffice/searchDailyBoxOfficeList.json"
    key = get_key()
    url= f"{base_url}?key={key}&targetDt={load_dt}"
    
    return url

 

 

- PythonOperator

 

요약) Apache Airflow의 PythonOperator는 파이썬 함수를 워크플로의 태스크로 실행할 수 있게 해주는 Airflow의 핵심 컴포넌트 중 하나입니다. PythonOperator를 사용하면 Airflow DAG(Directed Acyclic Graph)에서 파이썬 코드를 실행하는 작업을 정의할 수 있습니다.

from airflow.operators.python import PythonOperator
$ vi ~/airflow/dags/mov

def get_data(ds, **kwargs):
        print(ds)
        print(kwargs)
        print(f"ds_nodash =>{kwargs['ds_nodash']}")
        print(f"kwargs type => {type(kwargs)}")
        from mov.api.call import get_key,save2df
        key=get_key()
        print(f"MOVIE_API_KEY => {key}")
        YYYYMMDD=kwargs['ds_nodash']
        df=save2df(YYYYMMDD)
        print(df.head(5))


    def print_context(ds=None, **kwargs):
        pprint(kwargs)
        print(ds)

 

**kwargs : 딕셔너리 형태로 전달된다. 함수가 호출될 때 전달되는 키워드 인자들을 모두 받아서 처리할 수 있다.

처음 이 코드를 실행시키면 from mov.api.call import get_key,save2df 부분에서 모듈이 없다는 에러가 발생한다.

$ pip install -I git+https://github.com/sooj1n/mov.git@0.2/api

 

패키지 설치를 통해 오류를 해결한다.

 

run_this = PythonOperator(
	task_id="print_the_context",
	python_callable=print_context
    )

get_data = PythonOperator(
	task_id='get_data',
	python_callable=get_data
    )

파이썬 코드를 실행시켜서 태스크를 수행하는 코드이다. python_callable은 실행할 파이썬 함수이다.

 

- PythonVirtualenvOperator

from airflow.operators.python import PythonVirtualenvOperator

요약) Airflow에서 파이썬 코드를 가상 환경에서 실행할 수 있게 해주는 오퍼레이터입니다. 이는 주로 특정 파이썬 패키지나 라이브러리의 버전을 고정해야 하는 경우에 유용합니다.

get_data = PythonVirtualenvOperator(
            task_id='get_data',
            python_callable=get_data,
            requirements=["git+https://github.com/sooj1n/mov.git@0.2/api"],
            system_site_packages=False
    )

 

- BranchPythonOperator

from airflow.operators.python import BranchPythonOperator

요약) 조건에 따라 실행 경로를 분기 처리할 수 있는 오퍼레이터입니다. 이를 사용하여 DAG의 실행 흐름을 동적으로 변경할 수 있습니다.

 

def branch_fun(**kwargs):
        ld = kwargs['ds_nodash']
        import os
        if os.path.exists(f'~/tmp/test_parquet/load_dt={ld}'):
            return rm_dir
        else:
            return get_data
            
            
branch_op = BranchPythonOperator(
            task_id="branch.op",
            python_callable=branch_fun
    )
    
rm_dir = BashOperator(
            task_id='rm.dir',                    
            bash_command='rm -rf ~/tmp/test_parquet/load_dt=={{ ds_nodash }}'
    )

브랜치가 체크해서 디렉토리가 있으면 rm, 없으면 rm 생략하고 get.data가 실행되도록.

일단 코드 작성하고 월요일 끝!!

 

 

7.30(화)

왠지 세뇌당하는 느낌이라 노션에 외계인 이모지와 함께 필기를 시작했다 ..

 

 

save2df()가 수행되는 get.data 테스크 결과, 데이터가 중복되어 저장되고 있었다. 

branch.op 테스크를 추가하여 조건을 검사하도록 했다.

 def branch_fun(**kwargs):
        ld = kwargs['ds_nodash']
        import os
        home_dir = os.path.expanduser("~")
        path = f('{home_dir}/tmp/test_parquet/load_dt={ld}')
        path = os.path.join(home_dir, f"tmp/test_parquet/load_dt={ld}")

        if os.path.exists(path):
            return "rm.dir"
        else:
            return "get.data"

path를 설정하는 방법은 위에 두가지가 다 가능한데, 주의할 점은 보통 홈 디렉토리를 ~/ 라고 표현하기도 하는데 이렇게 쓰면 경로 인식을 못한다. 문자열 ~를 사용자의 홈 디렉터리 경로로 변환해주는 os.path.expanduser("~") 단계가 필요하다.

 

만약 그 경로에 파일이 있으면 rm.dir 테스크를 수행하여 삭제 후 get_data로 진행하고, 아니면 get.data를 수행한다.

 

*os.path 관련 함수

https://docs.python.org/ko/3/library/os.path.html#module-os.path

 

os.path — Common pathname manipulations

Source code: Lib/genericpath.py, Lib/posixpath.py(for POSIX) and Lib/ntpath.py(for Windows). This module implements some useful functions on pathnames. To read or write files see open(), and for ac...

docs.python.org

- os.path.join(path*paths) : 두 번째 인자가 *paths이기 때문에 복수 개 받을 수 있다.

>>> os.path.join(home_dir,"tmp")
/home/sujin/tmp

>>>os.path.join(home_dir,"tmp","make"')
/home/sujin/tmp/make

 

-get_data 수정

 

 get_data = PythonVirtualenvOperator(
            task_id='get_data',
            python_callable=get_data,
            requirements=["git+https://github.com/sooj1n/mov.git@0.2/api"],
            system_site_packages=False,
            trigger_rule="all_done"
            venv_cache_path="/home/sujin/tmp2/air_venv/get_data"
)


def get_data(ds_nodash):
from mov.api.call import get_key,save2df
df=save2df(ds_nodash)
print(df.head(5))

경로에 데이터가 없으면 rm.dir을 스킵하고 get.data를 수행하면 되는데, 그 설정이 되어있지 않았다.

trigger_rule="all_done"을 통해 get.data 이전의 작업들이 성공/실패/스킵 상관없이 완료되면 수행되도록 했다.

 

PythonVirtualenvOperator는 파이썬 코드를 가상 환경에서 실행할 수 있게 하는 오퍼레이트였다.

venv_cache_path에서 경로를 지정하면 이 경로에 가상 환경이 생성되고 저장되어, 같은 가상 환경을 반복적으로 사용할 수 있게 한다.

만약 가상환경에서 수행하지 않으면 매번 pip ~@@@~를 통해 패키지를 설치하여 mov를 최신화시켜야한다. 

PythonVirtualenvOperator에서 생성되는 가상환경 python 버전은 기본적으로 작업이 실행되는 환경에서의 버전을 따른다.(따로 지정할 수도 있음). 그래서 mov가 생성된 가상환경을 설치하면 되는 것 같은 그런느낌..? 사실 아직 100% 이해 못했다

 

***

문자열-> 숫자

코드 복사를 안해옴

 

7/31(수)

기본 데이터에서 요청 변수를 사용하여 추가적인 데이터를 얻을 수 있었다. 

multiMovieYn과 repNationCd 요청변수를 추가적으로 사용하기로 했다.

 

기존의 get.data 단계에 4개의 task를 추가했다. (multiMovieYn -Y/N, repNationCd-K/F)

multi_y = EmptyOperator(task_id='multi.y') # 다양성 영화 유무
multi_n = EmptyOperator(task_id='multi.n')
nation_k = EmptyOperator(task_id='nation.k') # 한국영화                            
nation_f = EmptyOperator(task_id='nation.f') # 외국영화


task_start >> branch_op
task_start >> throw_err >> task_save

rm_dir >> get_start >> [get_data, multi_y, multi_n, nation_k, nation_f]

branch_op >> rm_dir
branch_op >> get_start
branch_op >> echo_task

[get_data, multi_y, multi_n, nation_k, nation_f] >> get_end >> task_save >> task_end

 

참고로 join_task -> throw_err 이름 변경함.

get_start = EmptyOperator(task_id='get.start', trigger_rule='all_done')

그리고 get_start에 trigger_rule을 추가해주었다.

echo_test가 스킵됐을 때 get_start도 스킵되버리기 때문에

 

그리고 mov 모듈을 수정했다.

src/mov/api/call.py

def gen_url(load_dt="20120101", req_val={ "multiMovieYn" : "N"}):
    base_url = "http://www.kobis.or.kr/kobisopenapi/webservice/rest/boxoffice/searchDailyBoxOfficeList.json"
    key = get_key()
    url= f"{base_url}?key={key}&targetDt={load_dt}"
    for k,v in req_val.items():
         #url = url + f"&multiMovieYn=N"
         url = url + f"&{k}={v}"
         
         
         
 def save2df(load_dt='20120101', url_param={}):
    """airflow 호출 지점"""
    df = list2df(load_dt)
    # df에 load_dt 컬럼 추가 (조회 일자 YYYYMMDD 형식으로)
    # 아래 파일 저장 시 load_dt 기분으로 파티셔닝
    df['load_dt'] = load_dt
    print(df.head(5))
    df.to_parquet('~/tmp/test_parquet',partition_cols=['load_dt'])
    return df

요청변수를 url에 추가하여 사용하는 방식이기 때문에 gen_url을 수정했다.

req_val 파라미터를 생성하여 딕셔너리 형식으로 받았다. 

 

save2df에도 url_param 파라미터를 추가했다. 밑에랑 같이 이해해야댐.

movie.py

#다양성 영화 유무
    multi_y = PythonVirtualenvOperator(
        task_id="multi.y",
        python_callable=fun_multi_y,
        system_site_packages=Fasle,
        requirements=["git+https://github.com/sooj1n/mov.git@0.3/api"]
    )
    
def fun_multi_y(ds_nodash):
        from mov.api.call import save2df
        p = {"multiMovieYn" : "Y"}
        df = save2df(load_dt=ds_nodash, url_param=p)
        print(df.head(5))

새로 만든 4개의 task 중 하나인 multi_y이다. 파이썬 함수를 적용하기 위해 emptyOperator에서 PythonVirtualenvOperator로 바꿨다. 함수는 fun_multi_y이다. 함수에서 딕셔너리 형태로 요청변수를 선언했다. 이걸 save2df에 같이 넘기기 위해 위에서 save2df에 파람티러를 추가한것임@@

 

이렇게 똑같이 나머지 3개를 만들면된다.

근데 강사님이 새로운 과제를 주심.

  • [ ] movie.py DAG 의 아래 더미 오퍼레이터를 파이썬 오퍼레이터로(각각 함수를 생성) 변경 => 지금보니 이게 위의 내용임
  • [ ] mov 모듈을 변경해서 url_param 의 값이 save_data() -> gen_url() 모두 전달 되도록
  • [ ] 오퍼레이터로(각각 함수를 생성) 한 부분을 여러 오퍼레이터에서 하나의 함수를 재활용 하는 방법(같이) -op_param... 키워드로 검색해 보세요.
  • [ ] save_data() 라는 함수를 적절한 이름으로 변경하고 이름에 맞는 동작으로 변경 -> get_data() -> 기존함수에서 df 리턴 + 저장 함께 있던 부분을 DF 리턴만 남기기 -> 저장은 AIRFLOW 에서 하도록 변경
  • [ ] 변경된 모듈의 test 도 만들면 더 좋겠다.

나는 오늘 3번까지 해결했다..!

 

먼저 1번 코드

위에서 설명한 것 처럼 파이썬 함수를 적용하기 위해 emptyOperator에서 PythonVirtualenvOperator로 바꾸고

각각 함수도 생성하고 요청변수 값인 딕셔너리형태의 p 변수도 각각 선언해주었다.

multi_y = PythonVirtualenvOperator(
        task_id="multi.y",
        python_callable=fun_multi_y,
        system_site_packages=False,
        requirements=["git+https://github.com/sooj1n/mov.git@0.3/api"],
    )
    multi_n = PythonVirtualenvOperator(
        task_id="multi.n",
        python_callable=fun_multi_n,
        system_site_packages=False,
        requirements=["git+https://github.com/sooj1n/mov.git@0.3/api"],
    )
    #한국영화 
    nation_k = PythonVirtualenvOperator(
        task_id="nation.k",
        python_callable=fun_nation_k,
        system_site_packages=False,
        requirements=["git+https://github.com/sooj1n/mov.git@0.3/api"],
    )

    nation_f = PythonVirtualenvOperator(
        task_id="nation.f",
        python_callable=fun_nation_f,
        system_site_packages=False,
        requirements=["git+https://github.com/sooj1n/mov.git@0.3/api"],
    )
    
    
    
    def fun_multi_y(ds_nodash):
        from mov.api.call import save2df
        p = {"multiMovieYn" : "Y"}
        df = save2df(load_dt=ds_nodash, url_param=p)
        print(df.head(5))

    def fun_multi_n(ds_nodash):
        from mov.api.call import save2df
        p = {"multiMovieYn" : "N"}
        df = save2df(load_dt=ds_nodash, url_param=p)
        print(df.head(5))

    def fun_nation_k(ds_nodash):
        from mov.api.call import save2df
        p = {"repNationCd" : "K"}
        df = save2df(load_dt=ds_nodash, url_param=p)
        print(df.head(5))

    def fun_nation_f(ds_nodash):
        from mov.api.call import save2df
        p = {"repNationCd" : "F"}
        df = save2df(load_dt=ds_nodash, url_param=p)
        print(df.head(5))

 

 

2번 mov 모듈을 변경해서 url_param 의 값이 save_data() -> gen_url() 모두 전달 되도록 

여기서 내가 틀렸던거는, gen_url이 전달되도록

save2df<-list2df <- req2list …..등등

순차적으로 돌아가는거기 때문에 각각의 함수에 url_param()을 넣어 새로운 url이 생성되도록 했어야했는데

나는 바로 save2df에서 gen_url을 호출하고 df와 url을 리턴하도록 했다.

ㅎㅎ;

수정하기

def save2df(load_dt='20120101', url_param={}):
    """airflow 호출 지점"""
    df = list2df(load_dt)
    # df에 load_dt 컬럼 추가 (조회 일자 YYYYMMDD 형식으로)
    # 아래 파일 저장 시 load_dt 기분으로 파티셔닝
    df['load_dt'] = load_dt
    print(df.head(5))
    df.to_parquet('~/tmp/test_parquet',partition_cols=['load_dt'])
    url = gen_url(load_dt, url_param)
    return df, url

 

df,url = save2df(load_dt=ds_nodash, url_param=p)
print(df.head(5))
print(url)

그리고 movie.py에서 두 개의 리턴 값을 받도록 함수를 수정했었다

print문을 통해 각각에 맞는 값이 출력되는 것을 보고 맞게 했다고 생각했었~

 

 

 

3번 .오퍼레이터로(각각 함수를 생성) 한 부분을 여러 오퍼레이터에서 하나의 함수를 재활용 하는 방법(같이) -op_param... 키워드로 검색해 보세요.

 

중복되는 내용을 각각 선언하고 사용하여서 수행하는 작업인 것 같다. common_get_data를 선언했다.

def common_get_data(dt, p={}):
        from mov.api.call import save2df
        df,url = save2df(load_dt=dt, url_param=p)
        print(df.head(5))
        print(url)

 

태스크 내용도 중복이니 multi_y 설명만 적겠다 조금귀찮..

추가된 사항은 op_args, op_kwargs이다. python_callable의 함수에 인자를 넘겨주는 부분이다.

op_args값인 ds_nodash(날짜)가 common_get_data의 첫 번째 파라미터

op_kwargs값인 딕셔너리 값이 두 번째 파라미터로 전달된다.

 

    multi_y = PythonVirtualenvOperator(
        task_id="multi.y",
        python_callable=common_get_data,
        system_site_packages=False,
        requirements=["git+https://github.com/sooj1n/mov.git@0.3/api"],
        op_args=["{{ds_nodash}}"],
        op_kwargs={"p" : {"multiMovieYn": "Y"}}
    )

 

이렇게 적으니까 얼마 안되는 양 같지만 무려 3시간동안.. 해결했다 🐢🐢

 

마지막 1시간은 강사님께서 정답코드를 보여주셨는데 내일 수정하고 회고에 적겠다~!

중요했던거는 파티셔닝을 통해 성능을 향상시킬 수 있다는 것1!!

그래서 날짜별 아래 키 별 파티셔닝을 결과로 내야한다

 

 

 

먼저 어제 마무리못한 실습 코드 정리

def common_get_data(ds_nodash, url_param):
	from mov.api.call import save2df
    df = save2df(load_dt=ds_nodash, url_param=url_param)
    
    print(df[['movieCd', 'movieNm']].head(5))
    
    for k, v in url_param.items():
    	df[k] = v
        
    p_cols = ['load_dt'] + list(url_param.keys())
    df.to_parquet('~/tmp/test_parquet', partition_cols=p_cols)
def save2df(load_dt='20120101', url_param={}):
    """airflow 호출 지점"""
    df = list2df(url_param=url_param, load_dt=load_dt)
    # df에 load_dt 컬럼 추가 (조회 일자 YYYYMMDD 형식으로)
    # 아래 파일 저장 시 load_dt 기분으로 파티셔닝
    df['load_dt'] = load_dt
    #df.to_parquet('~/tmp/test_parquet',partition_cols=['load_dt'])
    return df

아무튼 최종 목표는 날짜별 + 키 별 파티셔닝을 한 형태를 만드는 것 이었다.

성능을 향상시킬 수 있기 때문에!!

p_cols = ['load_dt'] + list(url_param.keys())
df.to_parquet('~/tmp/test_parquet', partition_cols=p_cols)

이 부분이 날짜+키 파티셔닝 부분이다.

이해가 안돼서 간단하게 생각해봤다.

이해 완 🙆‍♀️

 

그리고 마지막까지 헤맸던거는 

정상적으로 작동했다면 이렇게 parquet 파일이 저장되어야하는데 나는 load_dt= ''' 아래에 parquet파일이 또 생성됐다.

알고보니 save2df에서는 df 리턴까지 수행하고 common_get_data에서 저장 단계를 수행하도록 수정하기로 했는데 내가 이 부분을 못해서 저장하는 코드가 여전히 save2df에도 남아있어서 두 번 저장이 된 것 이었다. to_parquet 코드 주석 처리 후 해결했다.

 

오늘은 movie_summary.py DAG를 새로 생성했다. todo 리스트이다

  • [ ] movie_summary.py DAG 생성
  • [ ] type 적용 - apply_type TASK
  • [ ] 4개 df 합치기(merge) merge_df TASK 생성
  • [ ] 합친 df 중복제거 de_dup TASK 생성
  • [ ] summary 데이터 생성 summary_df TASK 생성

- **kwargs 키워드인수

가변 인자를 위한 변수이고, 딕셔너리 형태로 저장된다.

>>>def abc(**kwargs):
...     print(kwargs)

>>>abc(1,2,3)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: abc() takes 0 positional arguments but 3 were given

>>>abc(a=1,b=2,c=3)
{'a': 1, 'b': 2, 'c': 3}



>>>def abc(a, b, **kwargs):
...     print(kwargs)

>>> abc(a=1,b=2,c=3,d=4)
{'c': 3, 'd': 4}
>>> abc(a=1,b=2,c=3)
{'c': 3}
>>> abc(a=1,b=2)
{}
>>> abc(a=1)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: abc() missing 1 required positional argument: 'b'

abc(**kwargs)는 키워드 인수만 허용하는 반면, abc(a, b, **kwargs)는 위치 인수 a 및 b가 필요하며 추가 키워드 인수를 혀용한다.

 

op_kwargs={
	"url_param": {"multiMovieYn" : "Y"}
    "ds" : "2024-11-11"
    "ds_nodash" : "20241111"
}
   
   
#받을 때
a=kwargs['url_param']
b=kwargs['ds']
...

PythonVirtualenvOperator의 op_kwargs로 값을 전달할 때 이렇게 값을 받아올 수 있다.

 

이거를 어디에 사용할거냐면, PythonVirtualenvOperator 태스크를 4개 만들건데 공통 함수(gen_vpython)을 선언해서 각각의 task_id, python_callable 값을 인자로 받아올거다.

def gen_vpython(**kw):
	task = PythonVirtualenvOperator(
    	task_id=kw['id'],
        python_callable=kw['fun_obj'],
        requirements=REQUIREMENTS,
        system_site_packages=False,
        op_kwargs=kw['op_kw']
    )
	return task
    
    
    apply_type = gen_vpython(
        id= "apply.type",
        fun_obj = pro_data,
    )

    merge_df =  gen_vpython(
        id= "merge.df",
        fun_obj = pro_data2,
    )

    de_dup =  gen_vpython(
        id= "de_dup",
        fun_obj = pro_data,
    )

    summary_df= gen_vpython(
        id= "summary_df",
        fun_obj = pro_data2,
    )

 

-pprint (pretty-print)

읽기 쉽고 미적으로 보기 좋은 방식으로 데이터 구조의 형식을 출력하는 Python 모듈이다.

어떤 데이터였는지 기억이 잘 안나지만..! pprint로 보기 좋게 print해서 테스트할 수 있었다.

from pprint import pprint as pp
...
pp(data)

 

 

-merge_df TASK

import pandas as pd

def merge(load_dt="20240724"):
    read_df = pd.read_parquet('~/tmp/test_parquet2')
    cols = ['movieCd', #영화의 대표코드를 출력합니다.
       'movieNm', #영화명(국문)을 출력합니다.
        #'openDt', #영화의 개봉일을 출력합니다.
        #'audiCnt', #해당일의 관객수를 출력합니다.
        'load_dt', # 입수일자
        'multiMovieYn', #다양성영화 유무
        'repNationCd', #한국외국영화 유무
       ]

    df = read_df[cols]
    #print(df.head(5))
    #울버린만 조회
    #df_where = df[df['movieCd']=='20247781'].copy()
    df_where = df[(df['movieCd']=='20247781') & (df['load_dt']==int(load_dt))].copy()
    print(df_where)

    #카테고리 타입->Object
    df_where['load_dt']= df_where['load_dt'].astype('object')
    df_where['multiMovieYn'] = df_where['multiMovieYn'].astype('object')
    df_where['repNationCd'] = df_where['repNationCd'].astype('object')
    print(df_where.dtypes)

    # NaN값 unknown으로 변경
    df_where['multiMovieYn'] = df_where['multiMovieYn'].fillna('unknown')
    df_where['repNationCd'] = df_where['repNationCd'].fillna('unknown')
    print(df_where.dtypes)
    print(df_where)

    # 머지
    u_mul = df_where[df_where['multiMovieYn'] == 'unknown']
    u_nat = df_where[df_where['repNationCd'] == 'unknown']
    m_df = pd.merge(u_mul, u_nat, on='movieCd', suffixes=('_m', '_n'))

    print("머지 DF")
    print(m_df)
    return m_df                                                               
merge()

먼저 merge 태스크에 적용할 파이썬 코드를 작성했다.

movie 파이프라인 수행 결과 저장된 parquet를 읽어서 원하는 열로 구성하고 (cols = [ ] 부분)

영화별 날짜별 중복 데이터를 출력해봤다. 

 

예를 들면 

movieCd    movieNm           load_dt          multiMovieYn     repNatrionCd

11111           인사이드아웃        20240711           Y                         None

11111           인사이드아웃        20240711           None                      F

같은 영화이지만 추가 키에 의해 2행이 생성된 경우이다.

 

m_df = pd.merge(u_mul, u_nat, on='movieCd', suffixes=('_m', '_n'))

중복 행을 merge할 때 suffixes 옵션을 주었다. 겹치는 열 이름에 접미사 _m과 _n을 추가한다.

 

결과

끝!

 

8.2(금)

프로젝트 시작

 

 

🩷 좋았던 점

1. 한 번씩 git push를 할 때마다 인증을 요구하는 경우가 있었다. 해결 방법을 검색해서 인증키를 받아도 잘 해결되지 않았고, 해결되었어도 push를 할 때마다 인증해야해서 번거로웠는데 처음 github를 연결할 때 ssh가 아닌 https로 해서 그런것이였다. 이미 내 계정의 ssh 인증을 마친 상태이기 때문에 ssh 주소로 연동을 진행하면 된다는 것을 알게되었다.

 

2. 매일 블로그에 기록하며 복습했다!

 

 

🥹 아쉬웠던 점

강사님의 진행과정을 따라할수는 있겠는데, 시간을 주고 실습을 시키시면 조금 헤매고 해결하는데 시간이 오래걸렸던 것 같다.

프로젝트 1일차 완성 못했다..ㅜ  

 

 

💪 이 상태에서 다음 일주일을 더 잘 보내려면 어떻게 해야 할까?

월요일에는 저번 주 금요일에 완성하지 못한 부분까지 무조건 끝내고, 프로젝트 시간 관리에 좀 더 신경쓰기.