こんにちは。
アドテク領域のエンジニアをしています安部です。
こちらは TVer Advent Calendar 2023 の14日目の記事です。
13日目の記事で「ツールを作成した」という話をちらっと書きました。
今回はそのツールについて備忘として書きます。
ツールは作成時は半自動状態(起動トリガーが手動)、12月に全自動化となりました。
ツールを作ったきっかけ
4月には広告入稿システムのリプレイスがありました。
このシステムと対向システムでデータの同期しているのですが、データに差分が出ていないか確認する必要があります。
そのため対向システムから不整合が発生している可能性のあるデータを連携していただき、確認するという定常業務がありました。
不整合の可能性があるデータは平均20件ほどあり、毎日手動で確認することは現実的ではありませんでした。
どうにか自動化できないかとツールを作成することにしました。
ツール作成時の条件
- 連携されるデータはテキストデータとして受領する(Slackでメッセージの受信ができない)
- 対向システム側に対応をお願いすることはできない(S3に直接アップロードしてください、特定の形式のファイルでくださいなど)
- 項目が多すぎると全て連携されないことがある(途中で途切れたメッセージになる)
1と2がネックとなりデータを自動でLambdaへ取り込むことができませんでした。
そのため
- データを取得し.mdファイルを作成してS3に置く
→手動 - ファイルを読み込み、データを突合し、その結果をSlackの特定のチャンネルに投稿
→Lambdaで自動化
という形式の半自動化ツールを作成しました。
なぜAWS、Lambdaを選んだのか
システム構成図
・半自動
・全自動
ツールの詳細
①S3のバケットからファイルを取得
ライブラリ(boto3)をありがたく活用しました。
s3_client = boto3.client('s3') # ファイルを読み込み、使用できる状態にする response = s3_client.get_object(Bucket=MD_BUCKET_NAME, Key=md_key_name) lines = response['Body'].readlines()
②SQLの作成
ファイルを1行ずつ読み込みながらRDSへ投げるSQLを作成します。
③RDS接続・確認
RDSの接続情報についてはSecretsManagerに設定されているので、SecretsManagerから情報を取得しRDSへ接続します。
SecretsManagerからの取得はこんな感じ
REGION_NAMEにはリージョン名を、SECRETE_NAMEにはシークレットの名前を入れます。
シークレットキー(DB_USER_NAME,DB_USER_PASSWORD)を指定することでシークレットの値を取得することができます。
# Create a Secrets Manager client session = boto3.session.Session() client = session.client( service_name='secretsmanager', region_name=REGION_NAME ) try: get_secret_value_response = client.get_secret_value( SecretId=SECRETE_NAME ) except ClientError as e: raise e # Decrypts secret using the associated KMS key. secret_data = get_secret_value_response['SecretString'] secret = ast.literal_eval(secret_data) # 接続情報設定 db_user = secret['DB_USER_NAME'] db_pass = secret['DB_USER_PASSWORD']
この接続情報を用いてRDSへ接続します。
RDS接続の際はVPC設定が必要です。
こちらを参考に作成しました。
SQLの結果で不整合データの有り無しを確認します。
④Slackへ結果を送信
Incoming Webhook URLを使用してSlackへ結果をポストします。
他の投稿と差別化したかったので引用マークが付くように設定しています。
# SlackのwebhookURL WEB_HOOK_URL = 'WEBHOOK_URL' # alert-adcas-app-error宛 CHANNEL_ID = 'CHANNEL_ID' # Slack投稿の情報 send_data = { 'channel': CHANNEL_ID, 'username': 'CHECK_TOOL', 'icon_emoji': ':beer:', 'attachments':[{ #投稿の引用マーク部分の設定 'color': '#ffb6c1', 'text': '投稿テストです。', }], } send_text = json.dumps(send_data) request = urllib.request.Request( WEB_HOOK_URL, data=send_text.encode('utf-8'), method="POST" ) with urllib.request.urlopen(request) as response: response_body = response.read().decode('utf-8')
こんな感じで投稿されます。
⑤起動トリガーの設定
最後に起動トリガーを設定します。
S3にファイルを置いたタイミングで起動してほしいので、トリガーにS3を設定します。
関数の概要部分の「+トリガーを追加」からS3を選択し、ファイルを置くバケットを選択します。
ファイルの配置と同時になのでEvent typesはPUT(一応POSTも)、アップロードするファイルサイズが大きくなるとマルチパートになるらしいので、念のためMultipart upload completedも設定します。
Suffixに「.md」が設定してあるのは、mdファイルを読み込むという条件にしているためです。
これでファイルを配置と同時にツールが実行されるようになりました。
ファイルアップロードが手動というのがもったいないですが、しばらくこれで運用していきます。
全自動化
12月、ついにデータがSlackのメッセージで受信できるようになりました。
これで取り込みが自動化できます。
半自動化している部分は活用したいので、メッセージを取得しS3にファイルを配置するという関数を作成します。
①Slack Appの準備
Slack Appの作成方法は多くの方が書いていると思うのでそこに譲りまして、メッセージ取得・送信に必要なOAuthの設定だけご紹介します。
今回はBotとして使用するためBot Token Scopesに必要なScopeを追加します。
※画像の黒線部分はSlack Appのアプリ名が表示されています。
- groups:history
取得したいメッセージがプライベートチャンネルに投稿される場合の設定です。
パブリックチャンネルの場合は「channels:history」を追加します。
- chat:write
メッセージを投稿するための設定です。
- chat:write.customize
メッセージ投稿をする際にユーザー名やユーザーのアイコンを自由に設定できるようにするための設定です。
カスタマイズしない場合は「Basic Information」の「Display Information」で設定しているApp Nameとアイコンが表示されます。
②メッセージの取得
conversations.history を使用して取得します。
こんな感じ
def get_slack_message(): SLACK_BOT_TOKEN = os.environ['SLACK_BOT_TOKEN'] CHANNEL_ID = os.environ['CHANNEL_ID'] slackGetMessageUrl = 'https://slack.com/api/conversations.history' dt_today = datetime.combine(date.today(),time(0,0,0)) d_ut = datetime.timestamp(dt_today) data = { 'channel': CHANNEL_ID, 'oldest': d_ut } post_data = urllib.parse.urlencode(data) req = urllib.request.Request(slackGetMessageUrl, data=post_data.encode()) req.add_header('Authorization','Bearer ' + SLACK_BOT_TOKEN) with urllib.request.urlopen(req, timeout=1) as response: response_data = json.loads(response.read()) print(response_data)
トークンとメッセージを取得したいチャンネルのチャンネルIDはlambdaの環境変数に設定してそこから取得しています。
conversations.history には色々オプションを設定できますが、実行当日のメッセージのみ取得したいのでoldestに当日の0:00のunixtimeを設定しています。
オプションについてはドキュメントに書いてあるので必要に応じて追加します。
確認できるようにprintでメッセージを表示するようにしています。
③S3へ配置
取得したメッセージをこねこねして元のツールが読み取れるファイルを作成し、S3へ配置します。
実行ロールにS3のput権限をつけることをお忘れなく!
(設定タブ > アクセス権限 > ロール名を押すとIAMに飛ぶので許可を追加できます)
expost_data = ''; for line_data in reversed(response_data['messages']): #こねこねしてexport_dataに1行ずつ追加 expost_data += line_data['text']; S3_CLIENT = boto3.client('s3') response = S3_CLIENT.put_object( Body=expost_data, Bucket=MD_BUCKET_NAME, Key=MD_FILENAME )
この1行ずつ読み取るタイミングで
3.項目が多すぎると全て連携されないことがある(途中で途切れたメッセージになる)
のチェックも行っています。 連携の最後にendのプレフィックスを設定していただいているので、そのプレフィックスが存在しない場合はエラー通知をSlackに送信しファイル配置と読み取りの後続処理が動かないようにしています。
④Slack投稿部分のSlack API化
最後に、Slack投稿部分をwebhookからSlack APIに変更します。
権限は①で付与済みなのでリクエスト送信部分をちょっと変えるだけ。
slackPostMessageUrl = 'https://slack.com/api/chat.postMessage'; # Slack投稿の情報(ここは変わらず) send_data = { 'channel': CHANNEL_ID, 'username': 'CHECK_TOOL', 'icon_emoji': ':beer:', 'attachments':[{ #投稿の引用マーク部分の設定 'color': '#ffb6c1', 'text': '投稿テストです。', }], } send_text = json.dumps(send_data) request = urllib.request.Request( slackPostMessageUrl, data=send_text.encode('utf-8'), method="POST" ) # トークン情報だけ追加 req.add_header('Authorization','Bearer ' + SLACK_BOT_TOKEN) with urllib.request.urlopen(request) as response: response_body = response.read().decode('utf-8')
⑤起動トリガーの設定
EventBridgeで定時起動するように設定します。
Lambdaの設定タブ > トリガー > トリガーを追加 でEventBridgeを選択し、cron形式で書きました。
UTC時刻で記載することに注意です。
私はJSTで設定し、11:30に動かしたいのに20:30に起動したことがありました。
お恥ずかしい。
これで夢の全自動化完了です。
ツール作成してどうだった?
気持ちが楽になった
そもそも最初はSQL手動実行だったので、手作業でミスしないかというプレッシャーがありました。
ツール化することで手動でミスする不安から開放されました。 また、ファイル作って、AWSにアクセスして、S3にファイルを置いて…という時間にして3分もかからない対応ですがやるのとやらないのでは気持ちが違います。
長期休みのときもチェック漏れが起こらないので安心です。Python好きだなぁ
このツールでほぼ初めてPythonを触りましたが、個人的に書きやすくてもっと使いたいなと思いました。
Pythonを使っているシステムはあまりないのですが、ツール職人するときは積極的に使っていこうと思います。- 知見が広がった
業務で使わないAWSの部分(EventBridgeなど)も触ることができて勉強になりました。
もっと使いこなせるようになりたい!