Article:

バイトでgolangでdynamodbを叩くlambdaを書く機会があったので、そのときに調べたことを軽くメモしています。 goはほんと触りたてなので、あたまわるコードや誤りが存分に含まれている可能性高いです。

2020/11/27 query+batchの項を追加

前置き

便利なライブラリがいろいろあるようですが、ここでは公式のものを用いることにします。 詳細は公式ドキュメントで確認可能です。

様々なオペレーションが用意されています。ここではPut, Get, Delete, Update, Query, Scanについて紹介します。 テーブル自体の作成・削除やその設定の変更については紹介しません。

概要

どの関数も、基本は次の形になっています。

1
func (c *DynamoDB) PutItem(input *PutItemInput) (*PutItemOutput, error)

どの関数も構造体のメンバ関数として定義されているので、Dynamodbのクライアントをdbとして作成し(コードは後述)、

1
res, err := db.PutItem(param)

と呼び出す事ができます。

このPutItem()では引数の型が*PutItemInputとなっていますが、GetItem()では*GetItemInputと、 それぞれの関数専用の構造体が用意されています。 それぞれの構造体の値を書き換えて関数に渡すことで、クエリをカスタマイズすることができます。

それぞれのInputは個別に特徴的なパラメーターを持ってはいますが、基本は同じなのでまずはそれを紹介します。

TableName

ほぼすべてでRequiredなフィールドです。操作を行いたいテーブルの名前を指定します。型は*stringです。

ちなみに、aws.String(string)を用いると*stringになります。 これはaws-sdk-go全体で多用します。

Key

GetやDeleteなどのオペレーションは、操作を行うレコードのパーティションキー(及びソートキー)を指定します。 ソートキーを指定する可能性があるので型はmap[string]*dynamodb.AttributeValueです。

dynamodb.AttributeValueはdynamodbのデータ型記述子に沿ってデータを記述するためのもので、 これは実際に投げたり返ってきたりするデータと似ています。

リテラルではこんな感じです。

1
2
3
{
	S: aws.String("your_string"),
}

なんてことないですね。 ここの先頭のSはstringのことです。他にも次の種類があります。

  • S – 文字列
  • N – 数値
  • B – バイナリ
  • BOOL – ブール
  • NULL – Null
  • M – マップ
  • L – リスト
  • SS – 文字列セット
  • NS – 数値セット
  • BS – バイナリセット

注意しないといけないのは、値の型はstringなので、例えば数値を入力したいときは

1
	N: aws.String(strconv.Itoa( your_number )),

のようにstringにキャストしてあげなければならないことです。

なんちゃらExpression Condition

それぞれのオペレーションではConditionExpressionやFilterExpressionなどを指定することで、 書き換えを制御したり、読み込むデータを制限したりすることができます。 型は*stringです。

Expressionは関数として’attribute_exists’, ‘attribute_not_exists’, ‘attribute_type’, ‘contains’, ‘begins_with size’が、 演算子として ‘=’, ‘<>’, ‘<’, ‘>’, ‘<=’, ‘>=’, ‘BETWEEN’, ‘IN’, ‘AND’, ‘OR’, ‘NOT’が使えるそうですが、 対象の型やオペレーションによってこの中から限られます。

例として、一番シンプルな「my_keyがmy_valueにマッチする」というexpressionは

1
"my_key = my_value" // 未完成!!!

と気持ち的には書けるのですが、ここで注意しないといけないのは Item名及び値はプレースホルダを使用しないといけない ことです。 ので、一度my_keyを#MY_KEYに、my_valueを:my_valueと置いて、こう書きます。(#や:は慣習的なもののようです)

1
"#MY_KEY = :my_value"

そうしたら次はこの#MY_KEYや:my_keyに値を代入したいですよね。 このときに使うのが、次に紹介するExpressionAttributeNames, ExpressionAttributeValuesです。

ExpressionAttributeNames

Expressionで使用したプレースホルダのうち、アイテム名のみを代入することができるフィールドです。 型はmap[string]*stringです。 Expressionで使用したプレースホルダがすべてきっちり指定されていないとエラーが出ます。多すぎてもダメです!!!

ExpressionAttributeValues

Expressionで使用したプレースホルダのうち、値のみを代入することができるフィールドです。 型はmap[string]*dynamodb.AttributeValueです。 こちらもExpressionで使用したプレースホルダがすべてきっちり指定されていないとエラーが出ます。多すぎてもダメです!!!

後述のコード例のところにそれぞれのパラメーターのドキュメントへのリンクを貼っておくので、あとはそちらを参考にしてください。 というより、ここまで書いておいて何ですが、このドキュメントがかなり丁寧なのでむしろここだけ見てればいい気がします。

前準備

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
import(
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
)


region := "ap-northeast-1" // 日本の首都は.tokyo
sess := session.Must(session.NewSession())
db := dynamodb.New(sess, aws.NewConfig().WithRegion(region))

Put

PutItemInput

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
params := &dynamodb.PutItemInput {
	TableName: aws.String("your_table_name"),
	Item: map[string]*dynamodb.AttributeValue {
		"your_item_name1": {
			S: aws.String("your_string"),
		},
		"youte_item_name2": {
			N: aws.String(strconv.Itoa( your_number )),
		},
	},
}

_, err := db.PutItem(params)
if putErr != nil {
	panic(err)
}

シンプルですね。概要で述べたことしか使われていませんが、付け加えるとすればPutオペレーターで既存の値の上書きが可能です(つまりupdate)。 その場合、ReturnValue: ALL_OLDを指定すれば古い値が帰ってくるようです。

Get

GetItemInput

APIから帰ってくるデータはデータ型記述子にエンコードされているので、 そのままだと非常に使いにくいです。なので、予めstructを用意しておいてdynamodbattribute.UnmarshalMapを 用いることで目的の構造体にunmarshalできます。

dynamodbavタグでdynamodb側のItem名を指定できます。しない場合はstructのメンバ変数名が利用されます。大文字小文字の区別はされないようです(たぶん)。

1
2
3
4
5
// unmarshal先のstruct例
type MyItems struct {
	Item1 string   `dynamodbav:"your_dynamo_item_name1"`
	Item2 number   `dynamodbav:"your_dynamo_item_name2"`
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
params := &dynamodb.GetItemInput {
	TableName: aws.String("your_table_name"),
	Key: map[string]*dynamodb.AttributeValue {
		"your_key_name": {
			S: aws.String("your_key_value"),
		},
	},
}

result, err:= db.GetItem(params)
if err != nil {
	panic(err)
}

myitems := MyItems{}
err = dynamodbattribute.UnmarshalMap(result.Item, &myitems)
if err != nil {
	panic(err)
}

fmt.Println(myitems)

Delete

DeleteItemInput

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
params := &dynamodb.DeleteItemInput {
	TableName: aws.String("your_table_name"),
	Key: map[string]*dynamodb.AttributeValue{
		"your_key_name": {
			S: aws.String("your_key_value"),
		},
	},
}

_, err:= db.DeleteItem(params)
if err != nil {
	panic(err)
}

キーを指定してデリート。これもシンプルですね。 これもReturnValue: ALL_OLDで消す直前の値を取得できそうです。

Update

UpdateItemInput

単純に値を更新するだけならPUTで事足りるので、ここではアトミックカウンタをインクリメントしてみます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
params := &dynamodb.UpdateItemInput {
	TableName: aws.String("your_table_name"),
	Key: map[string]*dynamodb.AttributeValue {
		"your_key_name": {
			S: aws.String("your_key_value"),
		},
	},
	UpdateExpression: aws.String("add #item :addend"),
	ExpressionAttributeNames: map[string]*string {
		"#item": aws.String(item),
	},
	ExpressionAttributeValues: map[string]*dynamodb.AttributeValue {
		":addend": {
			N: aws.String(strconv.Itoa(YOUR_ADDEND_NUMBER)),
		},
	},
}

_, err := db.UpdateItem(params)
if err != nil {
	panic(err)
}

これも概要で述べたフィールドしか利用していませんね。ReturnValueで 更新前のすべての要素・更新前の、これから更新する要素・更新後の〜といろいろ 取得する値を指定できるようです(DOCみて)。

Query

QueryInput

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
params := &dynamodb.QueryInput {
	TableName: aws.String("your_table_name"),
	IndexName: aws.String("your_INDEX_name"), // セカンダリインデックスを利用する場合
	KeyConditionExpression: aws.String("#Item_name = :item_name"),
	ExpressionAttributeNames: map[string]*string{
		"#Period": aws.String("Item_name"),
	},
	ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
		":item_name": {
			N: aws.String("your_item_value"),
		},
	},
}

result, err := db.Query(queryParams)
if err != nil {
	fmt.Println(err)
	panic(err)
}

myitems := []MyGSIItems{}
err = dynamodbattribute.UnmarshalListOfMaps(result.Items, &myitems)
if err != nil {
	fmt.Println(err.Error())
}

Queryではセカンダリインデックスを利用可能なので、その設定項目があります。 IndexNameはセカンダリインデックスが設定されたItemの名前ではなく、AWSコンソールのDynamodbの該当テーブルの インデックスタブないに記載されているインデックス名でなければならない点に注意です。 デフォルトだと"id-date-index"みたいな命名のやつです。

また、Queryでは複数要素が帰ってくるのでデータのunmarshalにはdynamodbattribute.UnmarshalListOfMapsを使います。

Scan

ScanInput

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
params := &dynamodb.ScanInput{
	TableName: aws.String("your_table_name"),
	ProjectionExpression: aws.String("#YOUR_PLACE1, #YOUR_PLACE2, #YOUR_PLACE3"),
	ExpressionAttributeNames: map[string]*string {
		"#YOUR_PLACE1": aws.String("your_item_name1"),
		"#YOUR_PLACE2": aws.String("your_item_name2"),
		"#YOUR_PLACE3": aws.String("your_item_name3"),
	},
}

result, err:= db.Scan(params)
if err != nil {
	panic(err)
}

myitems := []My3Items{}
err = dynamodbattribute.UnmarshalListOfMaps(result.Items, &items)
if err != nil {
	panic(err)
}

fmt.Println(items)

ここではProjectionExpressionを使ってみました。個々で指定したItemのみを取得することができます。後は同じですね。

Query + Batch

2020/11/27追記

DynamoDBは一度のQueryで取得できるデータの件数が非常に小さく、大量のデータを取得したい場合は 複数回のクエリに分ける必要があります。と言っても特に小難しいことをする必要はありません。

前述したQueryのコードで通常のクエリを投げた場合に、今回のクエリで返しきれないデータがあった場合に そのデータの始まりのキーを"LastEvaluatedKey"というフィールドで返してくれます。そして、QueryInputの"ExclusiveSmartKey" にそのLastEvaluatedKeyを設定するだけで、そこからのデータを渡してくれます。これをLastEvaluatedKeyがnilになるまで繰り返せば データを全件取得することができます。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
params := &dynamodb.QueryInput {
	TableName: aws.String("your_table_name"),
	IndexName: aws.String("your_INDEX_name"), // セカンダリインデックスを利用する場合
	KeyConditionExpression: aws.String("#Item_name = :item_name"),
	ExpressionAttributeNames: map[string]*string{
		"#Period": aws.String("Item_name"),
	},
	ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
		":item_name": {
			N: aws.String("your_item_value"),
		},
	},
}

myitems := []MySGIItems{} //ここに追加していく
var lek map[string]*dynamodb.AttributeValue = nil //lastEvaluatedKey

for {
	if lek != nil { // 初回以外は開始位置を設定する
		queryParams.ExclusiveStartKey = lek
	}

	result, err := d.db.Query(queryParams)
	if err != nil {
		log.Println(err)
		panic(err)
	}

	tmp := []MySGIItems{}
	err = dynamodbattribute.UnmarshalListOfMaps(result.Items, &tmp)
	if err != nil {
		log.Println(err.Error())
	}

	myitems = append(myitems, tmp...) // 今回のループで読み込んだぶんを追加

	if result.LastEvaluatedKey == nil { // lastEvaluatedKeyがnilならすべて読みこんだ
		break
	}

	lek = result.LastEvaluatedKey
}