LambdaからAmazon Elasticsearchにクエリを投げるためのテンプレメモ。LambdaはSAMで用意するが、Amazon Elasticsearchのドメインは事前に準備しておく。RuntimeはPython 3.8。Elasticsearchの接続にはelasticsearchのモジュールを利用する。
権限周りとかけっこうたいへん。
前提条件
- SAM CLI 1.6.2
- LambdaのRuntimeはPython 3.8
- 今回はSNSをトリガにしているがそこは本質ではない
- Amazon ELasticsearch 7.7
- これはドメインを事前に作っておくものとする
ディレクトリ構造
以下のようなディレクトリ構造で実行するものとする。
.
├── hello_world_function
│ ├── hello_world
│ │ ├── __init__.py
│ │ └── app.py
│ └── requirements.txt
├── samconfig.toml
└── template.yml
hello_world.py
テンプレだからhello_worldだけど好きにするよろし。
実行時間のために、ESに接続するのはコンテナ起動時にしているわけだが、これで安定的に動くのかは試行中。丁寧にやるなら、下記コードいうところのes_client.close()を毎回実行するべきだと思うが、それをするとMEM 256MBで実行時間が200ms -> 600ms程度に跳ね上がってしまう。。REST API投げるだけなら違うんだろうか。
import os
import json
import datetime
import boto3
import requests
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(
credentials.access_key,
credentials.secret_key,
'us-west-2',
'es',
session_token=credentials.token
)
es_client = Elasticsearch(
hosts=[{'host': os.environ['ES_ENDPOINT'], 'port': 443}],
http_auth=awsauth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection
)
def to_es(payload):
print(payload)
try:
es_client.create(index='test', id=datetime.datetime.now().strftime("%Y%m%d%H%M%S"), body=payload)
except Exception as err:
print(err)
def lambda_handler(event, context):
payload = {'test': 'test'}
to_es(payload)
とりあえずエラーは握りつぶしている。
requirements.txt
sixとregexはなくていいと思う。
requests
requests-aws4auth
elasticsearch
six
regex
template.yml
SNSトリガにしているが、そこはどうでもいい。
PoliciesでAmazonESFullAccessにしている。ES側のアクセスポリシーは特に設定しないでクエリ投げられた。
AWSTemplateFormatVersion: 2010-09-09
Description: >-
sam-app
Transform:
- AWS::Serverless-2016-10-31
Globals:
Function:
Timeout: 3
Resources:
SimpleTopic:
Type: AWS::SNS::Topic
SNSPayloadLogger:
Type: AWS::Serverless::Function
Properties:
Description: A Lambda function that logs the payload of messages sent to an associated SNS topic.
Runtime: python3.8
CodeUri: hello_world_function
Handler: hello_world/app.lambda_handler
Environment:
Variables:
ES_ENDPOINT: xxxx
Events:
SNSTopicEvent:
Type: SNS
Properties:
Topic: !Ref SimpleTopic
MemorySize: 256
Timeout: 100
Policies:
- AWSLambdaBasicExecutionRole
- arn:aws:iam::aws:policy/AmazonESFullAccess
samconfig.toml
sam build; sam deploy –guided で適当に生成。
ESで確認
Amazon Elasticsearchは最初からkibanaがあってとてもよい。
SQL実行文で、 select * from test でデータが投げられていることを確認する。
参考
なんだか色々と苦労したので、先人の足跡とてもありがたかったです。
- AWS LambdaからElasticsearch Serviceに接続するときに、IAM Roleを使う
- Lambda から elasticsearch service に何かする [cloudpack OSAKA blog] | cloudpack.media
- Amazon Elasticsearch Service とlambda(Python3) を一瞬で疎通するメモ – Qiita
- Amazon Elasticsearch Service への HTTP リクエストの署名 – Amazon Elasticsearch Service
- 本家
関連記事
aws の記事
- [2021年5月13日] Lambda関数でDeadLetterQueue(DLQ)を試す
- [2021年1月25日] AWS Lambdaのロギングを考える
- [2021年1月12日] AWS Lambdaをデプロイした時のダウンタイム
- [2020年12月8日] DynamoDB から DocumentClient で get() した StringSet型 の値を取り出す
- [2020年11月20日] AWS Lambdaでイベント起動した時にeventに渡されるパラメータあれこれ
- ---本記事---
- [2020年10月12日] AWS SAM CLIでSNSからLambdaを起動する(Python 3.8)、S3からSNSに通知する
- [2020年5月13日] Grafana on EC2 で Let’s Encrypt 使ってSSL
- [2020年4月28日] httpでPOSTする受け口をSAM CLI + API Gateway + Lambdaで作る
- [2020年4月26日] DynamoDBで多対多のテーブル設計
- [2020年4月17日] S3に指定のバケットにファイル生成したらSQSに通知
スポンサーリンク