Let’s work with DynamoDB with the Python Boto3 library.
We can
- list tables
- retrieve table information including keys and indexes
- create a table
- read, write, and delete items
- delete a table
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html
List Tables
import boto3
client = boto3.client('dynamodb')
# list of tables
dynamoDbTables = client.list_tables()
print('======= Tables =========')
print(dynamoDbTables['TableNames'])
Create Table
import boto3
client = boto3.client('dynamodb')
# create a table
response = client.create_table(
AttributeDefinitions=[
{
'AttributeName': 'Name',
'AttributeType': 'S',
},
{
'AttributeName': 'Code',
'AttributeType': 'S',
},
{
'AttributeName': 'Price',
'AttributeType': 'N',
},
],
KeySchema=[
{
'AttributeName': 'Name',
'KeyType': 'HASH',
},
{
'AttributeName': 'Code',
'KeyType': 'RANGE',
},
],
ProvisionedThroughput={
'ReadCapacityUnits': 2,
'WriteCapacityUnits': 2,
},
LocalSecondaryIndexes=[
{
'IndexName': 'PriceIndex',
'KeySchema': [
{
'AttributeName': 'Name',
'KeyType': 'HASH'
},
{
'AttributeName': 'Price',
'KeyType': 'RANGE'
},
],
'Projection': {
'ProjectionType': 'ALL'
}
},
],
TableName='Product',
)
print(response)
# list of tables
dynamoDbTables = client.list_tables()
print('======= Tables =========')
print(dynamoDbTables['TableNames'])
Retrieve Table Information
import boto3
def keyExists(tableDictioray, key):
return key in tableDictioray.keys()
client = boto3.client('dynamodb')
# list of tables
dynamoDbTables = client.list_tables()
print('======= Tables =========')
print(dynamoDbTables['TableNames'])
for table in dynamoDbTables['TableNames']:
tableInfo = client.describe_table(TableName = table)
# general table info
print()
print('\t======= Table General Info =========')
print('\tName:', tableInfo['Table']['TableName'])
print('\tARN:', tableInfo['Table']['TableArn'])
print('\tStatus:', tableInfo['Table']['TableStatus'])
print('\tCreated:', tableInfo['Table']['CreationDateTime'])
print('\tProvisioned RCUs:', tableInfo['Table']['ProvisionedThroughput']['ReadCapacityUnits'])
print('\tProvisioned WCUs:', tableInfo['Table']['ProvisionedThroughput']['WriteCapacityUnits'])
print('\tItem Count:', tableInfo['Table']['ItemCount'])
print('\tTable Size [Bytes]:', tableInfo['Table']['TableSizeBytes'])
print('\tDeletion Protection Enabled:', tableInfo['Table']['DeletionProtectionEnabled'])
# Primary Key: Partition key & Sort Key
print('\t======= Primary Key =========')
for key in tableInfo['Table']['KeySchema']:
print('\t', key['AttributeName'], key['KeyType'])
# Local Secondary Indexes
print('\t======= Local Secondary Indexes =========')
if keyExists(tableInfo['Table'], 'LocalSecondaryIndexes'):
for lsi in tableInfo['Table']['LocalSecondaryIndexes']:
print('\t', lsi['IndexName'])
for key in lsi['KeySchema']:
print('\t\t', key['AttributeName'], key['KeyType'])
# Global Secondary Indexes
print('\t======= Global Secondary Indexes =========')
if keyExists(tableInfo['Table'], 'GlobalSecondaryIndexes'):
for gsi in tableInfo['Table']['GlobalSecondaryIndexes']:
print('\t', gsi['IndexName'])
for key in gsi['KeySchema']:
print('\t\t', key['AttributeName'], key['KeyType'])
Put Items
import boto3
client = boto3.client('dynamodb')
# create a table
response = client.put_item(
Item={
'Name': { 'S': 'Bread' },
'Code': { 'S': 'B1' },
'Price': { 'N': '4.99' },
},
TableName='Product',
)
print(response)
response = client.put_item(
Item={
'Name': { 'S': 'Bread' },
'Code': { 'S': 'B2' },
'Price': { 'N': '7.00' },
},
TableName='Product',
)
print(response)
response = client.put_item(
Item={
'Name': { 'S': 'Toy Car' },
'Code': { 'S': 'TC1' },
'Price': { 'N': '39.00' },
},
TableName='Product',
)
print(response)
response = client.put_item(
Item={
'Name': { 'S': 'Laptop' },
'Code': { 'S': 'LPT1' },
'Price': { 'N': '890.00' },
},
TableName='Product',
)
print(response)
# list of tables
productTables = client.describe_table(TableName='Product')
print('======= Product Table =========')
print('Item Count:', productTables['Table']['ItemCount'])
Scan Items
import boto3
client = boto3.client('dynamodb')
# scan all
response = client.scan(
TableName = 'Product',
Limit = 10
)
print('==== Sacn - All ====')
print('Count:', response['Count'])
for item in response['Items']:
print(f'\tName: {item["Name"]["S"]}, Code: {item["Code"]["S"]}, Price: ${float(item["Price"]["N"]):,.2f}')
# scan projection
response = client.scan(
TableName = 'Product',
ExpressionAttributeNames={
'#ProductCode': 'Code',
},
ProjectionExpression = '#ProductCode, Price',
Limit = 10
)
print()
print('==== Sacn - Projection ====')
print('Count:', response['Count'])
for item in response['Items']:
print(f'\tCode: {item["Code"]["S"]}, Price: ${float(item["Price"]["N"]):,.2f}')
# scan filtering
response = client.scan(
TableName = 'Product',
ExpressionAttributeValues={
':price': {
'N': '100',
},
},
FilterExpression = 'Price >= :price',
Limit = 10
)
print()
print('==== Sacn - Filtering ====')
print('Count:', response['Count'])
for item in response['Items']:
print(f'\tName: {item["Name"]["S"]}, Code: {item["Code"]["S"]}, Price: ${float(item["Price"]["N"]):,.2f}')
Query Items
import boto3
client = boto3.client('dynamodb')
# query all
response = client.query(
TableName = 'Product',
ExpressionAttributeValues={
':productName': {
'S': 'Bread',
},
},
ExpressionAttributeNames={
'#ProductName': 'Name',
},
KeyConditionExpression = '#ProductName = :productName',
Limit = 10
)
print('==== Query - All ====')
print('Count:', response['Count'])
for item in response['Items']:
print(f'\tName: {item["Name"]["S"]}, Code: {item["Code"]["S"]}, Price: ${float(item["Price"]["N"]):,.2f}')
# query projection
response = client.query(
TableName = 'Product',
ExpressionAttributeValues={
':productName': {
'S': 'Laptop',
},
},
ExpressionAttributeNames={
'#ProductName': 'Name',
},
KeyConditionExpression = '#ProductName = :productName',
ProjectionExpression = 'Code, Price',
Limit = 10
)
print()
print('==== Query - Projection ====')
print('Count:', response['Count'])
for item in response['Items']:
print(f'\Code: {item["Code"]["S"]}, Price: ${float(item["Price"]["N"]):,.2f}')
# query filtering
response = client.query(
TableName = 'Product',
ExpressionAttributeValues={
':productName': {
'S': 'Bread',
},
':price': {
'N': '5',
},
},
ExpressionAttributeNames={
'#ProductName': 'Name',
},
KeyConditionExpression = '#ProductName = :productName',
FilterExpression = 'Price >= :price',
Limit = 10
)
print()
print('==== Query - Filtering ====')
print('Count:', response['Count'])
for item in response['Items']:
print(f'\tName: {item["Name"]["S"]}, Code: {item["Code"]["S"]}, Price: ${float(item["Price"]["N"]):,.2f}')
Delete Table
import boto3
client = boto3.client('dynamodb')
# list of tables
dynamoDbTables = client.list_tables()
print('======= Tables =========')
print(dynamoDbTables['TableNames'])
# create a table
response = client.delete_table(
TableName='Product',
)
print(response)
# list of tables
dynamoDbTables = client.list_tables()
print('======= Tables =========')
print(dynamoDbTables['TableNames'])
for table in dynamoDbTables['TableNames']:
tableInfo = client.describe_table(TableName = table)
print('\tName:', tableInfo['Table']['TableName'])
print('\tStatus:', tableInfo['Table']['TableStatus'])
