playdata/project

project 2. Business-Chatting-System

soojin1 2024. 9. 5. 22:15

https://github.com/DE32-2nd-team4/Business-Chatting-System

 

GitHub - DE32-2nd-team4/Business-Chatting-System

Contribute to DE32-2nd-team4/Business-Chatting-System development by creating an account on GitHub.

github.com

 

주제 

업무용 메신저 만들기

 

배경

1) 사내 기술 유출 이슈

2) 감사팀 메신저 감찰 필요

 

활용 필수 기술 스택

  • Apache Kafka
  • Apache Spark
  • Apache Airflow
  • Apache Zeppelin

 

주요 요구사항

  • 업무 대화 기능
  • 업무 대화 감사 기능 (검색, 대화 주제 통계) - zeppelin
  • 영화 챗봇 기능 (@bot 파묘 감독은 누구야?)
  • 시스템 챗봇 기능 (airflow JOB ... 성공 실패)
  • 일정 챗봇 기능 (애자일 칸반 미팅 시간입니다)

< 진행 >

나는 이번 프로젝트에서 업무 대화 감사 기능을 맡았다. 

Airflow에 Kafka Consumer를 연결하여 Producer가 생성하는 메시지를 받고, 데이터를 쌓아서 Zeppelin에서 통계를 내었다.

 

먼저, Airflow에 Consumer를 연결하기 위한 방식으로 Apache Kafka Operator를 사용하기로 했다. 

t2 = ConsumeFromTopicOperator(
    kafka_config_id="team4",
    task_id="consume_from_topic",
    topics=["test_1"],
    apply_function="example_dag_hello_kafka.consumer_function",
    apply_function_kwargs={"prefix": "consumed:::"},
    commit_cadence="end_of_batch",
    max_messages=10,
    max_batch_size=2,
)

기본적으로 위와 같이 사용할 수 있고, kafka_config_id에 해당하는 내용은 두 번째 사진과 같이 Airflow UI에서 입력을 해야한다.

 

수업시간에 배웠던 Kafka Consumer는 발생하는 메시지를 for 문을 사용하여 하나씩 받아왔다. 여기서도 마찬가지로 for 문으로 받아온다고 생각하여 같은 방식으로 구현했었다.

for m in c:
	print(f"[{m.value['time']}] {m.value['username']} {m.value['message']}")

채팅방에서 발생하는 메시지를 받아오는 것 이고, 내가 직접 메시지를 생성하는 것 이기 때문에 분명히 1개 이상의 메시지가 쌓였을 것 인데, 받아온 메시지(위의 반복문에서는 c에 해당)가 cimpl.Message(단일객체)이기 때문에 반복문을 돌릴 수 없다는 오류가 발생했다. 

카프카에 대한 이해가 조금 부족하기도 했고, 위의 방식밖에 몰랐기 때문에 (ㅜㅜ) 이 문제로 프로젝트 첫째날 오후시간을 날렸다.

 

프로젝트 두 번째 날

단일 객체라는 것을 조금 더 생각해보았다. 데이터가 많이 쌓였음이 확실하기 때문에 단일객체로 받아온다는 것은 메시지를 하나씩 받아오는 것 이었다. (그러니까 For 문을 돌릴 필요 없이 지금까지 쌓인 데이터를 하나씩 받아오는 것) 이와 같은 사실은 ConsumeFromTopicOperator 요소 중 max_messages=3로 설정하여 보다 쉽게 이해할 수 있었다.

 

이렇게 진행 중, 우리 조는 감사 기능을 위해 메크로를 사용해서 채팅 데이터를 쌓기로 했다. 메크로가 3초에 한 번씩 채팅을 보내며 짧은 시간 내에 정말 많은 데이터를 쌓을 수 있었는데, 여기서 문제가 발생했다. (나만 ...ㅡㅜ)

아직 에어플로우 기능이 덜 완성되었기 때문에 계속해서 에어플로우를 구동시키며 테스트가 필요했는데, 데이터가 너무 많이 쌓이다보니 시간도 엄청 오래 걸렸다. 10분 넘게 기다려서 작동이 되는 경우도 있었고, 그 이상을 기다려도 안되는 경우도 있었다. 나는 max_messages로 조절할 수 있을 줄 알았는데, 우선 모두 받아온 후 max_messages만큼 처리하는 듯 하였다. 또 task 구동 시간에 제한을 두는 방법도 생각하여 구현해보았는데 이유는 아직 모르겠지만 로그가 발생하지 않고 내가 생각한 대로 작동되지 않았다. 

 

결국 강사님께 도움을 청했는데 나처럼 UI를 통해 개발하는 것은 코드 상 직접적으로 드러난 부분이 아니기 때문에 숙련된 사람이 아니라면 디버깅하기에 매우 까다로울 수 있다고 하셨다. 대신 수업시간에 배운 코드를 활용해보는 방법을 더 추천하셨다. 

여기서....약간 실수를 했다면......!  나는 Airflow와 kafka연결에 대한 기능을 검색해보았을 때 나온 공식문서를 보고 구현 중 이었기 때문에 왠지 Apache Kafka Operator를 통해 구현을 해보고 싶었다. 그래서 우선은 기존 방식대로 진행을 했었는데, 디버깅에 큰 불편함이 있었고 문제는 발생하는데 해결하기가 쉽지 않았다. 결국 둘째날이 다 끝나갈 때 쯤에서야 강사님의 말씀대로..... 방식을 바꿨다.

프로젝트 마지막 날

우선 완성된 코드는 다음과 같다.

import os
import json
import time
import pandas as pd
from kafka import KafkaConsumer,TopicPartition
from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator


from airflow.operators.python import (
        PythonOperator,
        PythonVirtualenvOperator,
        BranchPythonOperator
)

with DAG(
    'log2parquet',
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        'depends_on_past': True,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(seconds=3),
    },
    max_active_runs=1,
    max_active_tasks=3,
    description='chat audit',
    schedule_interval='@hourly',
    start_date=datetime(2024, 8, 28),
    catchup=True,
    tags=['logs','parquet'],

) as dag:
    start=EmptyOperator(task_id='start')
    end=EmptyOperator(task_id='end')     

    

    def fun_con(**kwargs):
        c = KafkaConsumer(
            'team4',
            bootstrap_servers=['ec2-43-203-210-250.ap-northeast-2.compute.amazonaws.com:9092'],
            auto_offset_reset='earliest',
            enable_auto_commit=True,
            group_id='E',
            consumer_timeout_ms=30000,
            fetch_max_wait_ms=30000,
            value_deserializer = lambda x: json.loads(x.decode('utf-8'))
        )

        partition = TopicPartition('team4', 0)  # 특정 파티션 지정
        #c.assign([partition])
        
        for partition in c.assignment():
            last_committed_offset = c.committed(partition)
            if last_committed_offset is not None:
                # 마지막 커밋된 오프셋으로 이동
                c.seek(partition, last_committed_offset)
            else:
                # 커밋된 오프셋이 없을 경우, 가장 오래된 오프셋으로 이동
                c.seek_to_beginning(partition)

        df = pd.DataFrame(columns=['nickname', 'message', 'time'])
        #cnt = 0
        for m in c:
            #cnt +=1
            #if cnt==5:
            #    break
            try:
                nickname = m.value.get('nickname', 'N/A')
                message = m.value.get('message', 'N/A')
                time_value = m.value.get('time', 'N/A')
                #print(f"------------------{m.value['nickname','message','time']}")          
                #print(f"------------------{m.value['nickname']}   {m.value['message']}    {m.value['time']}")           
                print(f"{nickname}   {message}   {time_value}")
                data = [[nickname], [message], [time_value]]
                
                df_data = pd.DataFrame([[nickname, message, time_value]], columns=['nickname', 'message', 'time'])
                df = pd.concat([df, df_data], ignore_index=True)
               
            except KeyError as e:
                print(f"KeyError: {e}")
            except UnicodeEncodeError as e:
                print(f"Unicode encoding error: {e}")
            except Exception as e:
                print(f"An unexpected error occurred: {e}")
        
            print("*"*1000)
            

        home_dir = os.path.expanduser("~")
        file_path = os.path.join(os.path.expanduser("~"), 'code','Business-Chatting-System', 'data', 'csv', 'data.csv')

        # 파일이 있는지 확인하고, 없으면 생성하여 내용 추가
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        df.to_csv(file_path, mode='a', header=not os.path.exists(file_path), index=False)

    def fun_parquet():
        print('parquet')
        home_dir = os.path.expanduser("~")

        #csv 파일 읽기
        csv_file_path = os.path.join(os.path.expanduser("~"), 'code','Business-Chatting-System', 'data', 'csv', 'data.csv')
        df = pd.read_csv(csv_file_path) 

        parquet_file_path = os.path.join(os.path.expanduser("~"), 'code','Business-Chatting-System', 'data', 'parquet', 'p.parquet')
        
        # 디렉토리가 존재하지 않으면 생성
        #os.makedirs(os.path.dirname(parquet_file_path), exist_ok=True)


        if os.path.exists(parquet_file_path):
            # 기존 Parquet 파일 읽기
            df_existing = pd.read_parquet(parquet_file_path, engine='pyarrow')
            # 기존 데이터프레임과 새 데이터프레임을 결합
            df_combined = pd.concat([df_existing, df], ignore_index=True)
        else:
            # 디렉토리가 존재하지 않으면 생성
            os.makedirs(os.path.dirname(parquet_file_path), exist_ok=True)

            # 새 파일을 위한 데이터프레임 사용
            df_combined = df

        # Parquet 파일로 저장
        df_combined.to_parquet(parquet_file_path, index=False, engine='pyarrow')
        
        
    consumer = PythonOperator(
        task_id="c",
        python_callable=fun_con,
    )

    to_parquet = PythonOperator(
        task_id="to.parquet",
        python_callable=fun_parquet

    )


    start >> consumer >> to_parquet >> end

 

consumer Task에서는 발생하는 채팅 데이터를 받아와서 csv 파일로 저장한다. 커밋  시 오프셋을 지정하여 다음 consumer 작업이 시작될 때는 이전에 읽었던 부분을 제외하고 그 다음부터 데이터를 읽어들이도록 했다. 욕심을 버리고 PythonOperator로 작성했는데 확실히 디버깅하기 굉장히 수월해서 빠르게 에러를 해결해나갈 수 있었다.

 

to_parquet Task는 저장된 csv 파일을 읽어와 parquet 형식으로 저장하는 부분이다. 조금 아쉬웠던 점은 시간이 부족해서 멱등성을 위한 rm Task를 작성하지 못했다. 테스트를 할 때도 데이터가 중복되는 것을 방지하기 위해 수동으로 디렉토리를 지워가며 수행했었다. (ㅜ)

 

이렇게 완성한 코드로 에어플로우를 구동시켜 데이터를 수집하여 제플린에서 통계를 내어보았다.

제플린 파일은 디렉토리 내에 위치시켜서 감사팀이 해당 코드를 활용할 수 있도록 했다.

 

%sql
-- 닉네임 별 채팅 빈도
SELECT nickname, COUNT(*) AS count
FROM chat
GROUP BY nickname;

 

%spark.pyspark
# 날짜 형식 통일
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import TimestampType
import datetime
import re

# Spark 세션 생성
spark = SparkSession.builder.appName("DateConversion").getOrCreate()

# Unix 타임스탬프를 변환하는 함수
def unix_to_datetime(timestamp):
    try:
        return datetime.datetime.fromtimestamp(float(timestamp))
    except ValueError:
        return None

# 날짜 형식을 인식하고 변환하는 함수
def parse_date(date_str):
    formats = [
        "%Y-%m-%d %H:%M:%S",  # 형식: 2024-08-27 10:45:24
        "%a %b %d %H:%M:%S %Y",  # 형식: Tue Aug 27 11:04:45 2024
    ]
    
    # Unix 타임스탬프인지 확인
    if re.match(r'^\d+(\.\d+)?$', date_str):
        return unix_to_datetime(date_str)
    
    # 날짜 형식 시도
    for fmt in formats:
        try:
            return datetime.datetime.strptime(date_str, fmt)
        except ValueError:
            continue
    return None

# UDF 등록
parse_date_udf = udf(parse_date, TimestampType())

# 데이터프레임에 UDF 적용
df = df.withColumn("parsed_time", parse_date_udf(col("time")))
z.show(df)
%sql
-- 날짜 별 채팅 빈도
SELECT 
    to_date(parsed_time) AS date,
    COUNT(*) AS count
FROM 
    chat_time
GROUP BY 
    to_date(parsed_time)
ORDER BY 
    date

 

%sql
-- 날짜 + 시간 별 채팅 빈도
SELECT 
    date_format(parsed_time, 'yyyy-MM-dd HH:00:00') AS date_hour,
    COUNT(*) AS count
FROM 
    chat_time
GROUP BY 
    date_format(parsed_time, 'yyyy-MM-dd HH:00:00')
ORDER BY 
    date_hour

%spark.pyspark
# 단어 빈도 계산(상위 10개)

from pyspark.sql.functions import col, explode, split, lower, regexp_replace, trim

# 메시지가 빈 경우 필터링
df_filtered = df.filter(col('message').isNotNull() & (trim(col('message')) != ''))

# 메시지에서 특수문자 및 구두점 제거
df_cleaned = df_filtered.withColumn(
    'clean_message',
    regexp_replace(col('message'), '[^\\w\\s가-힣]', '')  # 영문자, 숫자, 공백, 한글만 허용
)

# 메시지에서 단어 분리 및 소문자 변환
words_df = df_cleaned.withColumn('word', explode(split(lower(col('clean_message')), '\\s+')))

# 빈 단어 제거 (단어가 공백이거나 NULL인 경우)
words_df_filtered = words_df.filter(col('word') != '')

# 단어 빈도 계산
word_count_df = words_df_filtered.groupBy('word').count().orderBy(col('count').desc())

# 결과 출력

top_10_words_df = word_count_df.limit(10)
top_10_words_df.show(truncate=False)

top_10_words_df.createOrReplaceTempView("frequent")

%sql
-- 단어 빈도 시각화
select * from frequent

 

날짜 형식 통일이라던지, 단어 빈도는 챗지피티의 도움을 받았다 ㅎ ㅎ

 

<느낀점>

우선 팀원들에게 배울 점이 너무 많았다.

 

태영,정훈님은 아이디어도 많고, 그 아이디어를 실제로 구현할 방법을 알고 수행할 실력이 뒷받침 된 사람들이었다.

그래서 첫 날에 아이디어 회의를 하다가 나는 말을 못알아들어서 중간 중간 뇌가 멈췄다............(우울)

문장, 단어 하나 하나 이해를 하면서 듣느라  과부화가 온 것 같았다. 

특히 태영이는 너무 즐겁게 코딩을 하는 것 같아서 부럽고 보기 좋았다 ^__^

 

준형님은 이렇게 빠르고 어려운 대화 속 회의록을 항상 작성하셨는데, 처음에 진짜 놀랐다. 단순히 기록만 한 게 아니라 글이 정리가 되어있어서, 못 알아들은 부분을 회의록을 보고 이해할 정도였다. 글 작성을 정말 잘 하시는 것 같다. 그 외에도 깊은 이해도와 함께 내가 막히는 부분을 정말 많이 도와주셔서 감사했다.

 

정은님은 우리 팀에 꼭 필요한 존재셨다. 🥹

우리 팀은 열정과 의지가 넘쳐서 아이디어도 많고 구현하고 싶은 기능도 엄청 많았다. 당연히 (!!) 모두 구현할 수 있었다면 좋았겠지만 주어진 기간은 3일이라는 짧은 기간이었기 때문에 우선순위를 정할 필요가 있었는데, 그럴 때 마다 정은언니가 그 역할을 수행해주었다. 정은언니, 준형님의 의견에 따라 스트림릿과 CL를 동시에 진행했던 것이 신의 한수 (굿굿)

 

나의 좋았던 점은 에어플로우에 대한 이해를 좀 더 쌓았다는 것 이다. 이번 프로젝트로 나중에 취업을 했을 때 에어플로우를 사용하는 직무도 좋을 것 같다는 생각을 했다.

 

아쉬웠던 점은

1. 에어플로우 스케줄러를 한시간으로 설정했지만 구현이 늦어져서 결국은 한 번 수행시킨 데이터로 결과를 냈다.

물론 남은 시간이 더 많았다면 정상적으로 구동되도록 구현했고 데이터를 차곡차곡 쌓을 수 있었을 것 이다. 하지만 토픽오퍼레이터를 사용하여 완성하고싶은 마음에... ㅠ

또 한 번 생각한 빠른 포기의 중요성 👍 포기가 좋다는 게 아니라 안되면 되는 것으로 빠르게 전환하자는 뜻...!

 

2. 채팅 시스템을 만드는 프로젝트였지만 나는 채팅 시스템 코드는 한 번도 건들지 않았다(?)

나의 역할이 감사 부분이긴 했지만.. 프로젝트 주 목적은 채팅 시스템을 개발하는 것 이기 때문에 조금 아쉬운 마음은 있었다. 

근데 나는 프로젝트 시작 때 카프카에 대한 이해가 부족하다고 스스로 느끼긴 했다. 그래도 감사부분 하면서 consumer 부분을 건들였기 때문에 이제는 이해도가 더 쌓이긴 했다. 다른 코드는 개인적으로 보고 정리를 해 둘 생각이다.

 

3. 내 코드가 좋은 코드인지 모르겠다.

다른 분 들이 내가 한 부분 코드를 쭉 보는데 너무 긴장되고(ㅎ;;) 잘 하는 사람들이 보기에 내 코드가 효율적이지 않은(?) 깔끔하지 않은(?) 그런 코드일 것 같아서 자신감 하락 .............. 사실 내 코드라 할 것 도아니고 수업시간에 배웠던거 여기저기 긁어모은거다. 이런 건 어떻게 공부해야할지...1!!!!!! 고민이다.......