passforios-gopenpgp/crypto/attachment_manual_test.go

176 lines
5.2 KiB
Go

package crypto
import (
"bytes"
"io"
"testing"
"github.com/pkg/errors"
)
// TestManualAttachmentProcessor encrypts the "att_cleartext" fixture through a
// manual attachment processor whose output buffer is twice the plaintext size,
// feeding the plaintext in 1 KiB chunks. It then rebuilds a PGP message from
// the produced key packet and data packet, decrypts it with the same keyring,
// and checks that the plaintext survived the round trip.
//
// Setup and precondition failures abort the test with t.Fatal: continuing
// after them would dereference nil values or slice past the buffer and turn a
// clean failure into a panic.
func TestManualAttachmentProcessor(t *testing.T) {
	// Override pgp.latestServerTime for this test and restore testTime on exit.
	// NOTE(review): presumably this keeps the fixture key valid — confirm.
	pgp.latestServerTime = 1615394034
	defer func() { pgp.latestServerTime = testTime }()

	passphrase := []byte("wUMuF/lkDPYWH/0ZqqY8kJKw7YJg6kS")
	pk, err := NewKeyFromArmored(readTestFile("att_key", false))
	if err != nil {
		t.Fatal("Expected no error while unarmoring private key, got:" + err.Error())
	}

	uk, err := pk.Unlock(passphrase)
	if err != nil {
		t.Fatal("Expected no error while unlocking private key, got:" + err.Error())
	}
	defer uk.ClearPrivateParams()

	ukr, err := NewKeyRing(uk)
	if err != nil {
		t.Fatal("Expected no error while building private keyring, got:" + err.Error())
	}

	inputPlaintext := readTestFile("att_cleartext", false)
	plaintextBytes := []byte(inputPlaintext)
	plaintextReader := bytes.NewReader(plaintextBytes)

	// The processor writes the encrypted data packet into this
	// caller-provided buffer.
	bufferLen := 2 * len(plaintextBytes)
	dataPacket := make([]byte, bufferLen)
	ap, err := ukr.NewManualAttachmentProcessor(
		len(plaintextBytes),
		"test.txt",
		dataPacket,
	)
	if err != nil {
		t.Fatal("Expected no error while building the attachment processor, got:" + err.Error())
	}

	chunkSize := 1 << 10
	inputBytes := make([]byte, chunkSize)
	for {
		nBytesRead, readErr := plaintextReader.Read(inputBytes)
		if readErr != nil && !errors.Is(readErr, io.EOF) {
			t.Fatal("Expected no error while reading plain data, got:" + readErr.Error())
		}
		// The chunk is processed even on EOF (it is then empty), matching the
		// sequence of Process calls the test has always made.
		if err := ap.Process(inputBytes[:nBytesRead]); err != nil {
			t.Fatal("Expected no error while writing plain data, got:" + err.Error())
		}
		if readErr != nil {
			// Only io.EOF can reach this point.
			break
		}
	}

	if err := ap.Finish(); err != nil {
		t.Fatal("Expected no error while calling finish, got:" + err.Error())
	}

	dataLength := ap.GetDataLength()
	keyPacket := ap.GetKeyPacket()
	if keyPacket == nil {
		t.Fatal("The key packet was nil")
	}
	if len(keyPacket) == 0 {
		t.Fatal("The key packet was empty")
	}

	t.Logf("buffer size : %d total written : %d", bufferLen, dataLength)
	if dataLength > bufferLen {
		// Must stop here: slicing dataPacket[:dataLength] below would panic.
		t.Fatalf("Wrote more than was allocated, buffer size : %d total written : %d", bufferLen, dataLength)
	}

	pgpMsg := NewPGPSplitMessage(keyPacket, dataPacket[:dataLength]).GetPGPMessage()
	plainMsg, err := ukr.Decrypt(pgpMsg, nil, 0)
	if err != nil {
		t.Fatal("Expected no error while decrypting, got:" + err.Error())
	}

	outputPlaintext := string(plainMsg.Data)
	if outputPlaintext != inputPlaintext {
		t.Errorf("Expected plaintext to be %s got %s", inputPlaintext, outputPlaintext)
	}
}
// TestManualAttachmentProcessorNotEnoughBuffer feeds the "att_cleartext"
// fixture, in 1 KiB chunks, to a manual attachment processor whose output
// buffer is only half the plaintext size. Every Process call is expected to
// succeed; the failure is expected to surface from Finish.
//
// Setup failures abort the test with t.Fatal so that later steps never run
// against nil values.
func TestManualAttachmentProcessorNotEnoughBuffer(t *testing.T) {
	// Override pgp.latestServerTime for this test and restore testTime on exit.
	// NOTE(review): presumably this keeps the fixture key valid — confirm.
	pgp.latestServerTime = 1615394034
	defer func() { pgp.latestServerTime = testTime }()

	passphrase := []byte("wUMuF/lkDPYWH/0ZqqY8kJKw7YJg6kS")
	pk, err := NewKeyFromArmored(readTestFile("att_key", false))
	if err != nil {
		t.Fatal("Expected no error while unarmoring private key, got:" + err.Error())
	}

	uk, err := pk.Unlock(passphrase)
	if err != nil {
		t.Fatal("Expected no error while unlocking private key, got:" + err.Error())
	}
	defer uk.ClearPrivateParams()

	ukr, err := NewKeyRing(uk)
	if err != nil {
		t.Fatal("Expected no error while building private keyring, got:" + err.Error())
	}

	inputPlaintext := readTestFile("att_cleartext", false)
	plaintextBytes := []byte(inputPlaintext)
	plaintextReader := bytes.NewReader(plaintextBytes)

	// Deliberately undersized output buffer: half the plaintext length.
	bufferLen := len(plaintextBytes) / 2
	dataPacket := make([]byte, bufferLen)
	ap, err := ukr.NewManualAttachmentProcessor(
		len(plaintextBytes),
		"test.txt",
		dataPacket,
	)
	if err != nil {
		t.Fatal("Expected no error while building the attachment processor, got:" + err.Error())
	}

	chunkSize := 1 << 10
	inputBytes := make([]byte, chunkSize)
	for {
		nBytesRead, readErr := plaintextReader.Read(inputBytes)
		if readErr != nil && !errors.Is(readErr, io.EOF) {
			t.Fatal("Expected no error while reading plain data, got:" + readErr.Error())
		}
		// The chunk is processed even on EOF (it is then empty), matching the
		// sequence of Process calls the test has always made.
		if err := ap.Process(inputBytes[:nBytesRead]); err != nil {
			t.Fatal("Expected no error while writing plain data, got:" + err.Error())
		}
		if readErr != nil {
			// Only io.EOF can reach this point.
			break
		}
	}

	if err := ap.Finish(); err == nil {
		t.Error("Expected an error while calling finish, got nil")
	}
}
// TestManualAttachmentProcessorEmptyBuffer checks that building a manual
// attachment processor with a zero-length output buffer returns an error.
//
// Setup failures abort the test with t.Fatal so that later steps never run
// against nil values.
func TestManualAttachmentProcessorEmptyBuffer(t *testing.T) {
	// Override pgp.latestServerTime for this test and restore testTime on exit.
	// NOTE(review): presumably this keeps the fixture key valid — confirm.
	pgp.latestServerTime = 1615394034
	defer func() { pgp.latestServerTime = testTime }()

	passphrase := []byte("wUMuF/lkDPYWH/0ZqqY8kJKw7YJg6kS")
	pk, err := NewKeyFromArmored(readTestFile("att_key", false))
	if err != nil {
		t.Fatal("Expected no error while unarmoring private key, got:" + err.Error())
	}

	uk, err := pk.Unlock(passphrase)
	if err != nil {
		t.Fatal("Expected no error while unlocking private key, got:" + err.Error())
	}
	defer uk.ClearPrivateParams()

	ukr, err := NewKeyRing(uk)
	if err != nil {
		t.Fatal("Expected no error while building private keyring, got:" + err.Error())
	}

	inputPlaintext := readTestFile("att_cleartext", false)
	plaintextBytes := []byte(inputPlaintext)

	// Zero-length output buffer: construction itself is expected to fail.
	bufferLen := 0
	dataPacket := make([]byte, bufferLen)
	_, err = ukr.NewManualAttachmentProcessor(
		len(plaintextBytes),
		"test.txt",
		dataPacket,
	)
	if err == nil {
		t.Error("Expected an error while building the attachment processor with an empty buffer got nil")
	}
}