# todo: lock in token program key and remove token_program parameters
from enum import IntEnum
from typing import NamedTuple, Optional
from agora.keys import PublicKey, ED25519_PUB_KEY_SIZE
from agora.solana import system
from agora.solana.instruction import Instruction, AccountMeta
from agora.solana.transaction import Message
# Reference: https://github.com/solana-labs/solana-program-library/blob/11b1e3eefdd4e523768d63f7c70a7aa391ea0d02/token/program/src/state.rs#L125 # noqa: E501
ACCOUNT_SIZE = 165
PROGRAM_KEY = PublicKey.from_base58("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA")
[docs]class Command(IntEnum):
INITIALIZE_MINT = 0
INITIALIZE_ACCOUNT = 1
INITIALIZE_MULTISIG = 2
TRANSFER = 3
APPROVE = 4
REVOKE = 5
SET_AUTHORITY = 6
MINT_TO = 7
BURN = 8
CLOSE_ACCOUNT = 9
FREEZE_ACCOUNT = 10
THAW_ACCOUNT = 11
TRANSFER_2 = 12
APPROVE_2 = 13
MINT_TO_2 = 14
BURN_2 = 15
[docs]class AuthorityType(IntEnum):
MINT_TOKENS = 0
FREEZE_ACCOUNT = 1
ACCOUNT_HOLDER = 2
CLOSE_ACCOUNT = 3
[docs]def get_command(m: Message, index: int) -> Command:
if index >= len(m.instructions):
raise ValueError(f"instruction doesn't exist at {index}")
i = m.instructions[index]
if m.accounts[i.program_index] != PROGRAM_KEY:
raise ValueError('incorrect program')
if len(i.data) == 0:
raise ValueError('token instruction missing data')
return Command(i.data[0])
# Reference: https://github.com/solana-labs/solana-program-library/blob/b011698251981b5a12088acba18fad1d41c3719a/token/program/src/instruction.rs#L41-L55 # noqa: e501
[docs]def initialize_account(account: PublicKey, mint: PublicKey, owner: PublicKey) -> Instruction:
"""
// Accounts expected by this instruction:
//
// 0. `[writable]` The account to initialize.
// 1. `[]` The mint this account will be associated with.
// 2. `[]` The new account's owner/multisignature.
// 3. `[]` Rent sysvar
:return:
"""
return Instruction(
PROGRAM_KEY,
bytes([Command.INITIALIZE_ACCOUNT]),
[
AccountMeta.new(account, False),
AccountMeta.new_read_only(mint, False),
AccountMeta.new_read_only(owner, False),
AccountMeta.new_read_only(system.RENT_SYS_VAR, False),
]
)
[docs]def transfer(source: PublicKey, dest: PublicKey, owner: PublicKey, amount: int) -> Instruction:
"""
// Accounts expected by this instruction:
//
// * Single owner/delegate
// 0. `[writable]` The source account.
// 1. `[writable]` The destination account.
// 2. `[signer]` The source account's owner/delegate.
//
// * Multisignature owner/delegate
// 0. `[writable]` The source account.
// 1. `[writable]` The destination account.
// 2. `[]` The source account's multisignature owner/delegate.
// 3. ..3+M `[signer]` M signer accounts.
:return:
"""
data = bytearray()
data.append(Command.TRANSFER)
data.extend(amount.to_bytes(8, 'little'))
return Instruction(
PROGRAM_KEY,
data,
[
AccountMeta.new(source, False),
AccountMeta.new(dest, False),
AccountMeta.new(owner, True),
]
)
[docs]def set_authority(
account: PublicKey, current_authority: PublicKey, authority_type: AuthorityType,
new_authority: Optional[PublicKey] = None
) -> Instruction:
data = bytearray([Command.SET_AUTHORITY, authority_type])
if not new_authority:
data.append(0)
else:
data.append(1)
data.extend(new_authority.raw)
return Instruction(
PROGRAM_KEY,
data,
[
AccountMeta.new(account, False),
AccountMeta.new_read_only(current_authority, True),
]
)
[docs]def close_account(account: PublicKey, dest: PublicKey, owner: PublicKey) -> Instruction:
return Instruction(
PROGRAM_KEY,
bytes([Command.CLOSE_ACCOUNT]),
[
AccountMeta.new(account, False),
AccountMeta.new(dest, False),
AccountMeta.new_read_only(owner, True),
]
)
[docs]class DecompiledInitializeAccount(NamedTuple):
account: PublicKey
mint: PublicKey
owner: PublicKey
[docs]def decompile_initialize_account(m: Message, index: int) -> DecompiledInitializeAccount:
if index >= len(m.instructions):
raise ValueError(f"instruction doesn't exist at {index}")
i = m.instructions[index]
if m.accounts[i.program_index] != PROGRAM_KEY:
raise ValueError('incorrect program')
if len(i.accounts) != 4:
raise ValueError(f'invalid number of accounts: {len(i.accounts)}')
if len(i.data) != 1:
raise ValueError(f'invalid instruction data size: {len(i.data)}')
if i.data[0] != Command.INITIALIZE_ACCOUNT:
raise ValueError(f'invalid instruction data: {i.data}')
return DecompiledInitializeAccount(
m.accounts[i.accounts[0]],
m.accounts[i.accounts[1]],
m.accounts[i.accounts[2]],
)
[docs]class DecompiledTransfer(NamedTuple):
source: PublicKey
dest: PublicKey
owner: PublicKey
amount: int
[docs]def decompile_transfer(m: Message, index: int) -> DecompiledTransfer:
if index >= len(m.instructions):
raise ValueError(f"instruction doesn't exist at {index}")
i = m.instructions[index]
if m.accounts[i.program_index] != PROGRAM_KEY:
raise ValueError('incorrect program')
if len(i.accounts) != 3:
raise ValueError(f'invalid number of accounts: {len(i.accounts)}')
if len(i.data) != 9:
raise ValueError(f'invalid instruction data size: {len(i.data)}')
if i.data[0] != Command.TRANSFER:
raise ValueError(f'invalid instruction data: {i.data}')
return DecompiledTransfer(
m.accounts[i.accounts[0]],
m.accounts[i.accounts[1]],
m.accounts[i.accounts[2]],
int.from_bytes(i.data[1:], 'little')
)
[docs]class DecompileSetAuthority(NamedTuple):
account: PublicKey
current_authority: PublicKey
authority_type: AuthorityType
new_authority: Optional[PublicKey]
[docs]def decompile_set_authority(m: Message, index: int) -> DecompileSetAuthority:
if index >= len(m.instructions):
raise ValueError(f"instruction doesn't exist at {index}")
i = m.instructions[index]
if m.accounts[i.program_index] != PROGRAM_KEY:
raise ValueError('incorrect program')
if len(i.accounts) != 2:
raise ValueError(f'invalid number of accounts: {len(i.accounts)}')
if len(i.data) < 3:
raise ValueError(f'invalid instruction data size: {len(i.data)}')
if i.data[0] != Command.SET_AUTHORITY:
raise ValueError(f'invalid instruction data: {i.data}')
if i.data[2] == 0 and len(i.data) != 3:
raise ValueError(f'invalid instruction data size: {len(i.data)}')
if i.data[2] == 1 and len(i.data) != 3 + ED25519_PUB_KEY_SIZE:
raise ValueError(f'invalid instruction data size: {len(i.data)}')
return DecompileSetAuthority(
m.accounts[i.accounts[0]],
m.accounts[i.accounts[1]],
AuthorityType(i.data[1]),
PublicKey(i.data[3:]) if i.data[2] == 1 else None,
)
[docs]class DecompileCloseAccount(NamedTuple):
account: PublicKey
destination: PublicKey
owner: PublicKey
[docs]def decompile_close_account(m: Message, index: int) -> DecompileCloseAccount:
if index >= len(m.instructions):
raise ValueError(f"instruction doesn't exist at {index}")
i = m.instructions[index]
if m.accounts[i.program_index] != PROGRAM_KEY:
raise ValueError('incorrect program')
if len(i.data) != 1 or i.data[0] != Command.CLOSE_ACCOUNT:
raise ValueError(f'invalid instruction data: {i.data}')
# note: we do < 3 instead of != 3 in order to support multisig cases.
if len(i.accounts) < 3:
raise ValueError(f'invalid number of accounts: {len(i.accounts)}')
return DecompileCloseAccount(
m.accounts[i.accounts[0]],
m.accounts[i.accounts[1]],
m.accounts[i.accounts[2]],
)