본문 바로가기
Side Project/AWS와 Python으로 페이스북 챗봇 개발하기

AWS와 python으로 페이스북 챗봇 만들기 (2) - python 코드

by 잇서니 2020. 9. 27.
반응형

 

AWS Lambda 함수를 Python으로 작성하여 Spotify 관련 데이터를 Facebook 챗봇 메신저로 제공해 봅시다!

 

(1) facebook app, spotify app, RDS에 연결하기

import sys
import logging
import requests
import pymysql
import fb_bot
import json
import base64
import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Credentials issued by the Spotify app (placeholders; fill in before use).
client_id = ""
client_secret = ""

# AWS RDS connection settings.
host = ""
port = 3306
database = "production"
username = ""
password = ""

# Tokens issued by the Facebook app.
PAGE_TOKEN = "****"
VERIFY_TOKEN = "verify_123"

# Connect to RDS at import time. Every argument is passed by keyword because
# PyMySQL 1.0+ no longer accepts positional connection arguments.
try:
    conn = pymysql.connect(
        host=host,
        user=username,
        password=password,
        database=database,
        port=port,
        use_unicode=True,
        charset='utf8',
    )
    cursor = conn.cursor()
except pymysql.MySQLError:
    # logger.exception records the traceback so the real cause is visible.
    logger.exception("could not connect to rds")
    sys.exit(1)

# Facebook Messenger client used to send replies.
bot = fb_bot.Bot(PAGE_TOKEN)

 

 

(2) RDS에 artist가 저장된 경우, 저장되지 않은 경우 프로세스 처리하기

import base64
import json
import logging
import os
import re
import sys
from time import sleep

import boto3
import pymysql
import requests
from googletrans import Translator

import fb_bot

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# SECURITY: secrets are read from the environment instead of being hard-coded
# in the source. NOTE(review): the values that were previously committed here
# should be treated as leaked and rotated.

# Credentials issued by the Spotify app.
client_id = os.environ["SPOTIFY_CLIENT_ID"]
client_secret = os.environ["SPOTIFY_CLIENT_SECRET"]

# AWS RDS connection settings.
host = os.environ["RDS_HOST"]
port = int(os.environ.get("RDS_PORT", "3306"))
database = os.environ.get("RDS_DATABASE", "production")
username = os.environ["RDS_USERNAME"]
password = os.environ["RDS_PASSWORD"]

# Tokens issued by the Facebook app.
PAGE_TOKEN = os.environ["FB_PAGE_TOKEN"]
VERIFY_TOKEN = os.environ["FB_VERIFY_TOKEN"]

# Connect to RDS at import time. Every argument is passed by keyword because
# PyMySQL 1.0+ no longer accepts positional connection arguments.
try:
    conn = pymysql.connect(
        host=host,
        user=username,
        password=password,
        database=database,
        port=port,
        use_unicode=True,
        charset='utf8',
    )
    cursor = conn.cursor()
except pymysql.MySQLError:
    # logger.exception records the traceback so the real cause is visible.
    logger.exception("could not connect to rds")
    sys.exit(1)

# Facebook Messenger client and translator instance shared by the handlers.
bot = fb_bot.Bot(PAGE_TOKEN)
trans = Translator()

# Artist name exactly as the user typed it; set by lambda_handler and read
# when building the info card title.
org_artist_name = ""

def lambda_handler(event, context):
    """Handle a Facebook Messenger webhook event delivered to this Lambda.

    An event containing a 'params' key is treated as the webhook verification
    GET request and answered with the hub.challenge value when the verify
    token matches. Any other event is treated as a Messenger message whose
    text is an artist name: the artist is looked up in RDS and, when found,
    its info card and genres are sent back; otherwise search_artist() is
    called to fetch and store it.

    Args:
        event: Webhook payload as a dict.
        context: Lambda context object (unused).

    Returns:
        The integer hub.challenge for a verification request, else None.

    Raises:
        SystemExit: If the verification token does not match VERIFY_TOKEN.
    """
    global org_artist_name

    logger.info(event)

    # event['params'] only exists for HTTPS GET (Facebook verification).
    if 'params' in event:
        querystring = event['params']['querystring']
        if querystring.get('hub.verify_token') == VERIFY_TOKEN:
            return int(querystring['hub.challenge'])
        logger.error('wrong validation token')
        raise SystemExit

    messaging = event['entry'][0]['messaging'][0]
    user_id = messaging['sender']['id']
    logger.info(messaging)

    # Events without a text message cannot be looked up; ignore them instead
    # of failing with a KeyError.
    artist_name = messaging.get('message', {}).get('text')
    if not artist_name:
        return None

    # Keep the name exactly as typed for the card title.
    org_artist_name = artist_name

    # Normalise the name so it can be compared with the DB value.
    hangul = re.compile('.*[ㄱ-ㅎㅏ-ㅣ가-힣]+.*')
    if hangul.search(artist_name):
        # Korean input: ask Spotify for the artist's registered name.
        artist_name = trans_korean_artist(cursor, user_id, artist_name)
        if not artist_name:
            # Return rather than sys.exit(): exiting the interpreter makes
            # the invocation fail although the event was handled.
            return None
    # Upper-case and strip spaces on both paths so the comparison does not
    # depend on the column's collation being case-insensitive.
    artist_name = artist_name.upper().replace(" ", "")

    # Parameterised query: the name comes straight from user input.
    query = "SELECT image_url, url FROM artists WHERE upper(replace(name,' ','')) = %s"
    logger.info("%s | %s", query, artist_name)
    cursor.execute(query, (artist_name,))
    rows = cursor.fetchall()

    # (1) The artist is not stored in RDS yet.
    if not rows:
        # Look the artist up through the Spotify API; search_artist sends
        # the appropriate chatbot messages itself.
        logger.info("Call search_artist")
        search_artist(cursor, user_id, artist_name)
        return None

    # (2) The artist is already stored in RDS.
    image_url, url = rows[0]

    payload = {
        'template_type': 'generic',
        'elements': [
            {
                'title': "Artist Info: '{}'".format(org_artist_name),
                'image_url': image_url,
                'subtitle': 'information',
                'default_action': {
                    'type': 'web_url',
                    'url': url,
                    'webview_height_ratio': 'full'
                }
            }
        ]
    }

    # Reply with the artist info card (template with an image attached).
    bot.send_attachment(user_id, "template", payload)

    query = (
        "SELECT t2.genre FROM artists t1 "
        "JOIN artist_genres t2 ON t2.artist_id = t1.id "
        "WHERE upper(replace(t1.name,' ','')) = %s"
    )
    logger.info("%s | %s", query, artist_name)
    cursor.execute(query, (artist_name,))
    genres = [genre for (genre,) in cursor.fetchall()]

    # Reply with the artist's genres.
    logger.info(genres)
    text = "Here are genres of {}".format(artist_name)
    bot.send_text(user_id, text)
    bot.send_text(user_id, ', '.join(genres))
    return None
        
        


def get_headers(client_id, client_secret):
    """Request a Spotify access token and return the Bearer auth header.

    Uses the client-credentials grant: the client id and secret are sent as
    HTTP Basic credentials to the Spotify token endpoint.

    Args:
        client_id: Spotify application client id.
        client_secret: Spotify application client secret.

    Returns:
        A dict with a single "Authorization: Bearer <token>" entry.

    Raises:
        requests.HTTPError: If the token endpoint answers with an error
            status (for example, invalid credentials).
    """
    endpoint = "https://accounts.spotify.com/api/token"
    credentials = "{}:{}".format(client_id, client_secret).encode('utf-8')
    encoded = base64.b64encode(credentials).decode('ascii')

    r = requests.post(
        endpoint,
        data={"grant_type": "client_credentials"},
        headers={"Authorization": "Basic {}".format(encoded)},
    )
    # Fail with the HTTP error itself rather than a later KeyError on
    # 'access_token' when the request was rejected.
    r.raise_for_status()

    access_token = r.json()['access_token']

    return {
        "Authorization": "Bearer {}".format(access_token)
    }


def insert_row(cursor, data, table):
    """Upsert one row into ``table`` using the columns and values in ``data``.

    Builds an ``INSERT ... ON DUPLICATE KEY UPDATE`` statement whose column
    list and update list both come from ``data``'s keys. The values are
    passed to ``cursor.execute`` as parameters, once for the INSERT part and
    once more for the UPDATE part.

    Args:
        cursor: DB cursor exposing ``execute(sql, params)``.
        data: Mapping of column name to value.
        table: Name of the target table.
    """
    column_names = list(data)
    column_list = ', '.join(column_names)
    value_marks = ', '.join('%s' for _ in column_names)
    update_list = ', '.join('{0}=%s'.format(name) for name in column_names)

    sql = "INSERT INTO %s ( %s ) VALUES ( %s ) ON DUPLICATE KEY UPDATE %s" % (
        table, column_list, value_marks, update_list)
    logger.info(sql)

    # The same values fill the VALUES markers and the UPDATE markers.
    row_values = list(data.values())
    cursor.execute(sql, row_values + row_values)


def send_artist_info_after_add(cursor, user_id, artist_name):
    """Send the info card and genres of an artist that was just stored.

    The artist is matched ignoring case and spaces on both sides, so the
    lookup works whether the caller passes the stored name or an already
    normalised one.

    Args:
        cursor: DB cursor used for the lookups.
        user_id: Messenger id of the user to reply to.
        artist_name: Name of the artist to look up.
    """
    # Parameterised query: the name may originate from user input.
    query = (
        "SELECT image_url, url FROM artists "
        "WHERE upper(replace(name,' ','')) = upper(replace(%s,' ',''))"
    )
    logger.info("send_artist_info_after_add 실행")
    logger.info("%s | %s", query, artist_name)

    cursor.execute(query, (artist_name,))
    rows = cursor.fetchall()

    # Without a matching row there is nothing to show; tell the user instead
    # of failing with an IndexError.
    if not rows:
        logger.error("artist '%s' not found after insert", artist_name)
        bot.send_text(user_id, "Could not find artist. Please Try Again!")
        return

    image_url, url = rows[0]

    payload = {
        'template_type': 'generic',
        'elements': [
            {
                'title': "Artist Info: '{}'".format(org_artist_name),
                'image_url': image_url,
                'subtitle': 'information',
                'default_action': {
                    'type': 'web_url',
                    'url': url,
                    'webview_height_ratio': 'full'
                }
            }
        ]
    }

    # Reply with the artist info card (template with an image attached).
    bot.send_attachment(user_id, "template", payload)

    # Reply with the artist's genres.
    query = (
        "SELECT t2.genre FROM artists t1 "
        "JOIN artist_genres t2 ON t2.artist_id = t1.id "
        "WHERE upper(replace(t1.name,' ','')) = upper(replace(%s,' ',''))"
    )
    logger.info("%s | %s", query, artist_name)
    cursor.execute(query, (artist_name,))
    genres = [genre for (genre,) in cursor.fetchall()]

    logger.info(genres)
    text = "Here are genres of {}".format(artist_name)
    bot.send_text(user_id, text)
    bot.send_text(user_id, ', '.join(genres))
    
    


def invoke_lambda(fxn_name, payload, invocation_type='Event'):
    """Invoke another Lambda function with a JSON payload.

    Args:
        fxn_name: Name of the Lambda function to invoke.
        payload: JSON-serialisable object sent as the invocation payload.
        invocation_type: Lambda invocation type; defaults to 'Event'.

    Returns:
        The response dict returned by the boto3 ``invoke`` call. A status
        code outside 200/202/204 is logged as an error but still returned.
    """
    lambda_client = boto3.client('lambda')

    invoke_response = lambda_client.invoke(
        FunctionName=fxn_name,
        InvocationType=invocation_type,
        Payload=json.dumps(payload)
    )

    if invoke_response['StatusCode'] not in (200, 202, 204):
        logger.error("ERROR: Invoking lambda function: '{0}' failed".format(fxn_name))

    return invoke_response


def search_artist(cursor, user_id, artist_name):
    """Find an artist on Spotify, store it in RDS and reply to the user.

    Takes the top Spotify search hit for ``artist_name``. When there is one,
    its genres and artist row are upserted and committed, and the artist
    info is sent to the user; otherwise the user gets a "could not find"
    message.

    Args:
        cursor: DB cursor used for the inserts and the follow-up lookup.
        user_id: Messenger id of the user to reply to.
        artist_name: Search text sent to Spotify.
    """
    headers = get_headers(client_id, client_secret)

    # Spotify Search API: only the best match is requested.
    params = {
        "q": artist_name,
        "type": "artist",
        "limit": "1"
    }

    r = requests.get("https://api.spotify.com/v1/search", params=params, headers=headers)

    raw = json.loads(r.text)

    # No hit, or a response without an 'artists' section (e.g. an error
    # body): treat both as "not found" rather than failing on a KeyError.
    items = raw.get('artists', {}).get('items') or []
    if not items:
        logger.info ("There are no artists in Spotify")
        bot.send_text(user_id, "Could not find artist. Please Try Again!")
        return

    artist_raw = items[0]
    logger.info(artist_raw['name'])

    # A hit without a usable name cannot be stored or shown.
    if not artist_raw['name']:
        bot.send_text(user_id, "Could not find artist. Please Try Again!")
        return

    # The image list may be empty; store no image URL in that case.
    # NOTE(review): assumes artists.image_url accepts NULL - confirm schema.
    images = artist_raw.get('images') or []
    artist = {
        'id': artist_raw['id'],
        'name': artist_raw['name'],
        'followers': artist_raw['followers']['total'],
        'popularity': artist_raw['popularity'],
        'url': artist_raw['external_urls']['spotify'],
        'image_url': images[0]['url'] if images else None
    }

    # Store one artist_genres row per genre.
    for genre in artist_raw['genres']:
        insert_row(cursor, {'artist_id': artist_raw['id'], 'genre': genre}, 'artist_genres')

    # Store the artists row and make both inserts visible.
    insert_row(cursor, artist, 'artists')
    conn.commit()

    # Another Lambda function (top_tracks) could be triggered here:
    # r = invoke_lambda('top_tracks', payload={'artist_id': artist_raw['id']})

    logger.info("we added artist in RDS!")
    bot.send_text(user_id, "We added artist. Please wait a second!")

    # Look the row up by the name that was just stored, not by the search
    # text, which may differ from it.
    send_artist_info_after_add(cursor, user_id, artist['name'])
    return


def trans_korean_artist (cursor, user_id, artist_name):
    """Resolve an artist name typed in Korean to the name Spotify returns.

    Args:
        cursor: DB cursor (unused here; kept for a uniform signature).
        user_id: Messenger id of the user to notify when nothing is found.
        artist_name: Artist name as typed by the user.

    Returns:
        The name of the top Spotify search hit, or None when there is no
        hit (the user is sent a "could not find" message in that case).
    """
    headers = get_headers(client_id, client_secret)

    # Spotify Search API: only the best match is requested.
    params = {
        "q": artist_name,
        "type": "artist",
        "limit": "1"
    }

    r = requests.get("https://api.spotify.com/v1/search", params=params, headers=headers)

    raw = json.loads(r.text)

    # No hit, or a response without an 'artists' section (e.g. an error
    # body): treat both as "not found" rather than failing on a KeyError.
    items = raw.get('artists', {}).get('items') or []
    if not items:
        bot.send_text(user_id, "Could not find artist. Please Try Again!")
        return None

    artist_raw = items[0]
    logger.info(artist_raw['name'])

    return artist_raw['name']

 

 

(참고) facebook 챗봇 응답용 함수들 (fb_bot.py)

import sys
import os
import requests
import base64
import json
import logging
from enum import Enum

# Graph API version used when Bot is created without an explicit
# 'api_version'; it is formatted into the base URL as "v4.0".
DEFAULT_API_VERSION = 4.0

## messaging types: "RESPONSE", "UPDATE", "MESSAGE_TAG"

class NotificationType(Enum):
    """Push-notification modes accepted by the Messenger Send API."""

    regular = "REGULAR"
    silent_push = "SILENT_PUSH"
    # Upper-case like the other members; the API value is "NO_PUSH".
    no_push = "NO_PUSH"

class Bot:
    """Small client for the Facebook Graph API Messenger endpoints.

    Every request carries the access token as the ``access_token`` query
    parameter.
    """

    def __init__(self, access_token, **kwargs):
        """Store the token and derive the versioned Graph API base URL.

        Args:
            access_token: Token sent with every request.
            **kwargs: Optional ``api_version``; a missing or falsy value
                falls back to DEFAULT_API_VERSION.
        """
        requested_version = kwargs.get('api_version')

        self.access_token = access_token
        self.api_version = requested_version or DEFAULT_API_VERSION
        self.graph_url = 'https://graph.facebook.com/v{0}'.format(self.api_version)

    @property
    def auth_args(self):
        """Query parameters carrying the token; built once, then cached."""
        try:
            return self._auth_args
        except AttributeError:
            self._auth_args = {
                'access_token': self.access_token
            }
            return self._auth_args

    def _url(self, template, *extra):
        """Fill ``template`` with the base URL ({0}) and any extra fields."""
        return template.format(self.graph_url, *extra)

    def _post(self, template, body):
        """POST ``body`` as JSON to the endpoint described by ``template``."""
        return requests.post(
            self._url(template),
            params = self.auth_args,
            json = body
        )

    def _get(self, template, *extra):
        """GET the endpoint described by ``template``."""
        return requests.get(
            self._url(template, *extra),
            params = self.auth_args
        )

    def send_message(self, recipient_id, payload, notification_type, messaging_type, tag):
        """Complete ``payload`` with routing fields and post it to /me/messages.

        ``payload`` is modified in place. ``notification_type`` is accepted
        but not added to the request. Returns the decoded JSON response.
        """
        payload['recipient'] = {'id': recipient_id}
        payload['messaging_type'] = messaging_type
        if tag is not None:
            payload['tag'] = tag

        response = self._post('{0}/me/messages', payload)

        logging.info(payload)
        return response.json()

    def send_text(self, recipient_id, text, notification_type = NotificationType.regular, messaging_type = 'RESPONSE', tag = None):
        """Send a plain text message."""
        body = {
            "message": {
                "text": text
            }
        }
        return self.send_message(recipient_id, body, notification_type, messaging_type, tag)

    def send_quick_replies(self, recipient_id, text, quick_replies, notification_type = NotificationType.regular, messaging_type = 'RESPONSE', tag = None):
        """Send a text message together with quick-reply options."""
        body = {
            "message": {
                "text": text,
                "quick_replies": quick_replies
            }
        }
        return self.send_message(recipient_id, body, notification_type, messaging_type, tag)

    def send_attachment(self, recipient_id, attachment_type, payload, notification_type = NotificationType.regular, messaging_type = 'RESPONSE', tag = None):
        """Send an attachment of ``attachment_type`` described by ``payload``."""
        attachment = {
            "type": attachment_type,
            "payload": payload
        }
        body = {
            "message": {
                "attachment": attachment
            }
        }
        return self.send_message(recipient_id, body, notification_type, messaging_type, tag)

    def send_action(self, recipient_id, action, notification_type = NotificationType.regular, messaging_type = 'RESPONSE', tag = None):
        """Send a sender action."""
        body = {
            "sender_action": action
        }
        return self.send_message(recipient_id, body, notification_type, messaging_type, tag)

    def whitelist_domain(self, domain_list, domain_action_type):
        """Post a domain-whitelisting setting; returns the decoded JSON."""
        setting = {
            "setting_type": "domain_whitelisting",
            "whitelisted_domains": domain_list,
            "domain_action_type": domain_action_type
        }
        return self._post('{0}/me/thread_settings', setting).json()

    def set_greeting(self, template):
        """Post a greeting setting with ``template`` as its text.

        Returns the raw response object.
        """
        setting = {
            "setting_type": "greeting",
            "greeting": {
                "text": template
            }
        }
        return self._post('{0}/me/thread_settings', setting)

    def set_get_started(self, text):
        """Post a get_started payload; returns the raw response object."""
        profile = {
            "get_started": {
                "payload": text
            }
        }
        return self._post('{0}/me/messenger_profile', profile)

    def get_get_started(self):
        """Fetch the get_started profile field; returns the raw response."""
        return self._get('{0}/me/messenger_profile?fields=get_started')

    def get_messenger_profile(self, field):
        """Fetch one messenger-profile field; returns the raw response."""
        return self._get('{0}/me/messenger_profile?fields={1}', field)

    def upload_attachment(self, url):
        """Post the image at ``url`` as a reusable attachment.

        Returns the raw response object.
        """
        image = {
            "type": "image",
            "payload": {
                "is_reusable": True,
                "url": url
            }
        }
        body = {
            "message": {
                "attachment": image
            }
        }
        return self._post('{0}/me/message_attachments', body)
반응형

댓글