/// <summary>
/// Settings for Komodo: the endpoint URL, the index GUID, and the API key.
/// </summary>
public class KomodoSettings : BlobSettings
{
    #region Public-Members

    /// <summary>
    /// Komodo endpoint URL, of the form http://[hostname]:[port]/.
    /// </summary>
    public string Endpoint { get; set; } = null;

    /// <summary>
    /// Komodo index GUID.
    /// </summary>
    public string IndexGUID { get; set; } = null;

    /// <summary>
    /// Komodo API key.
    /// </summary>
    public string ApiKey { get; set; } = null;

    #endregion

    /// <summary>
    /// Initialize the object.
    /// </summary>
    /// <param name="endpoint">Komodo endpoint, i.e. http://localhost:8000/</param>
    /// <param name="indexGuid">GUID of the index.</param>
    /// <param name="apiKey">API key with read, write, and delete permissions.</param>
    /// <exception cref="ArgumentNullException">Any argument is null or empty.</exception>
    public KomodoSettings(string endpoint, string indexGuid, string apiKey)
    {
        if (String.IsNullOrEmpty(endpoint)) throw new ArgumentNullException(nameof(endpoint));
        if (String.IsNullOrEmpty(indexGuid)) throw new ArgumentNullException(nameof(indexGuid));
        if (String.IsNullOrEmpty(apiKey)) throw new ArgumentNullException(nameof(apiKey));

        // Ordinal comparison: the trailing-slash check is on a URL, not on linguistic text.
        Endpoint = endpoint.EndsWith("/", StringComparison.Ordinal) ? endpoint : endpoint + "/";
        IndexGUID = indexGuid;
        ApiKey = apiKey;
    }
}
// Create the Komodo SDK client from the endpoint and API key held in the Komodo settings.
_Komodo = new KomodoSdk(_KomodoSettings.Endpoint, _KomodoSettings.ApiKey);
/// <summary>
/// Ask Komodo to delete the document identified by <paramref name="key"/> from the configured index.
/// </summary>
/// <param name="key">Document key.</param>
/// <param name="token">Cancellation token passed through to the SDK call.</param>
private async Task KomodoDelete(string key, CancellationToken token)
{
    string indexGuid = _KomodoSettings.IndexGUID;
    await _Komodo.DeleteDocument(indexGuid, key, token).ConfigureAwait(false);
}
/// <summary>
/// Fetch the source document for <paramref name="key"/> from the configured index and return its data.
/// </summary>
/// <param name="key">Document key.</param>
/// <param name="token">Cancellation token passed through to the SDK call.</param>
/// <returns>The <c>Data</c> member of the retrieved document.</returns>
private async Task<byte[]> KomodoGet(string key, CancellationToken token)
{
    string indexGuid = _KomodoSettings.IndexGUID;
    DocumentData document = await _Komodo.GetSourceDocument(indexGuid, key, token).ConfigureAwait(false);
    return document.Data;
}
/// <summary>
/// Fetch the source document for <paramref name="key"/> and expose it as a <c>BlobData</c>
/// carrying the document's content length and data stream.
/// </summary>
/// <param name="key">Document key.</param>
/// <param name="token">Cancellation token passed through to the SDK call.</param>
/// <returns>A <c>BlobData</c> populated from the retrieved document.</returns>
private async Task<BlobData> KomodoGetStream(string key, CancellationToken token)
{
    string indexGuid = _KomodoSettings.IndexGUID;
    DocumentData document = await _Komodo.GetSourceDocument(indexGuid, key, token).ConfigureAwait(false);

    return new BlobData
    {
        ContentLength = document.ContentLength,
        Data = document.DataStream
    };
}
/// <summary>
/// Check whether metadata can be retrieved for <paramref name="key"/> in the configured index.
/// </summary>
/// <param name="key">Document key.</param>
/// <param name="token">Cancellation token passed through to the SDK call.</param>
/// <returns>
/// True when the metadata request completes; false when it throws a <c>KomodoException</c>.
/// Other exception types propagate to the caller.
/// </returns>
private async Task<bool> KomodoExists(string key, CancellationToken token)
{
    try
    {
        // Only success or failure of the lookup matters; the metadata itself is not needed.
        await _Komodo.GetDocumentMetadata(_KomodoSettings.IndexGUID, key, token).ConfigureAwait(false);
    }
    catch (KomodoException)
    {
        return false;
    }

    return true;
}
/// <summary>
/// Add a document to the configured Komodo index from an in-memory byte array.
/// </summary>
/// <param name="key">Document key; passed to the SDK in three argument positions.</param>
/// <param name="contentType">Content type. Not referenced by this method.</param>
/// <param name="data">Document bytes.</param>
/// <param name="token">Cancellation token passed through to the SDK call.</param>
private async Task KomodoWrite(string key, string contentType, byte[] data, CancellationToken token)
{
    string indexGuid = _KomodoSettings.IndexGUID;

    // NOTE(review): contentType is never forwarded and the document type is always DocType.Unknown - confirm this is intended.
    await _Komodo.AddDocument(indexGuid, key, key, null, key, DocType.Unknown, data, null, token).ConfigureAwait(false);
}
/// <summary>
/// Add a document to Komodo from a stream by converting the stream to a byte array
/// and delegating to the byte-array overload.
/// </summary>
/// <param name="key">Document key.</param>
/// <param name="contentType">Content type, handed to the byte-array overload.</param>
/// <param name="contentLength">Declared content length. Not referenced by this method.</param>
/// <param name="stream">Stream supplying the document data.</param>
/// <param name="token">Cancellation token handed to the byte-array overload.</param>
private async Task KomodoWrite(string key, string contentType, long contentLength, Stream stream, CancellationToken token)
{
    byte[] buffered = Common.StreamToBytes(stream);
    Task pending = KomodoWrite(key, contentType, buffered, token);
    await pending.ConfigureAwait(false);
}
/// <summary>
/// Write each request in turn, awaiting one write before starting the next.
/// </summary>
/// <param name="objects">Write requests; a request with non-null <c>Data</c> is written from its bytes, otherwise from its <c>DataStream</c>.</param>
/// <param name="token">Cancellation token handed to every write.</param>
private async Task KomodoWriteMany(List<WriteRequest> objects, CancellationToken token)
{
    foreach (WriteRequest request in objects)
    {
        // Pick the overload that matches how the request carries its payload.
        Task pending = request.Data != null
            ? KomodoWrite(request.Key, request.ContentType, request.Data, token)
            : KomodoWrite(request.Key, request.ContentType, request.ContentLength, request.DataStream, token);

        await pending.ConfigureAwait(false);
    }
}
/// <summary>
/// Retrieve document metadata from Komodo and map its source record onto a <c>BlobMetadata</c>.
/// </summary>
/// <param name="key">Document key.</param>
/// <param name="token">Cancellation token passed through to the SDK call.</param>
/// <returns>
/// Metadata built from the source record: the record's MD5 becomes the ETag and its GUID becomes
/// the key. The last-access and last-update timestamps are always set to null.
/// </returns>
private async Task<BlobMetadata> KomodoGetMetadata(string key, CancellationToken token)
{
    DocumentMetadata document = await _Komodo.GetDocumentMetadata(_KomodoSettings.IndexGUID, key, token).ConfigureAwait(false);

    return new BlobMetadata
    {
        ContentLength = document.SourceRecord.ContentLength,
        ContentType = document.SourceRecord.ContentType,
        CreatedUtc = document.SourceRecord.Created,
        ETag = document.SourceRecord.Md5,
        Key = document.SourceRecord.GUID,
        LastAccessUtc = null,
        LastUpdateUtc = null
    };
}
/// <summary>
/// Enumerate documents in the configured Komodo index, one page per call.
/// </summary>
/// <param name="prefix">Optional prefix; when supplied, only documents whose GUID starts with it are requested.</param>
/// <param name="continuationToken">Token from a previous call, or null/empty to start at index 0 with a page size of 1000.</param>
/// <param name="token">Cancellation token passed through to the SDK call.</param>
/// <returns>
/// Metadata for the matched documents. <c>NextContinuationToken</c> is set only when a full page
/// was returned, and is null otherwise.
/// </returns>
/// <exception cref="ArgumentException">The continuation token cannot be parsed or describes an invalid position.</exception>
private async Task<EnumerationResult> KomodoEnumerate(string prefix, string continuationToken, CancellationToken token)
{
    int startIndex = 0;
    int count = 1000;

    if (!String.IsNullOrEmpty(continuationToken))
    {
        if (!KomodoParseContinuationToken(continuationToken, out startIndex, out count))
        {
            throw new ArgumentException("Unable to parse continuation token.");
        }

        // A negative offset or non-positive page size cannot describe a valid page.
        if (startIndex < 0 || count < 1)
        {
            throw new ArgumentException("Unable to parse continuation token.");
        }
    }

    EnumerationQuery eq = new EnumerationQuery();
    eq.StartIndex = startIndex;
    eq.MaxResults = count;

    if (!String.IsNullOrEmpty(prefix))
    {
        SearchFilter sf = new SearchFilter("GUID", SearchCondition.StartsWith, prefix);
        eq.Filters.Add(sf);
    }

    Komodo.Sdk.Classes.EnumerationResult ker = await _Komodo.Enumerate(_KomodoSettings.IndexGUID, eq, token).ConfigureAwait(false);

    EnumerationResult ret = new EnumerationResult();
    int matchCount = (ker != null && ker.Matches != null) ? ker.Matches.Count : 0;

    // Previously the token was built with start = startIndex + count, which is never less than
    // count, so the helper always returned null and callers could never advance past page one.
    // A full page is taken as the signal that more documents may follow; a short page ends it.
    ret.NextContinuationToken = matchCount >= count
        ? BuildContinuationToken((long)startIndex + count, count)
        : null;

    if (matchCount > 0)
    {
        foreach (SourceDocument curr in ker.Matches)
        {
            BlobMetadata md = new BlobMetadata
            {
                ContentLength = curr.ContentLength,
                ContentType = curr.ContentType,
                CreatedUtc = curr.Created,
                ETag = curr.Md5,
                Key = curr.GUID
            };

            ret.Blobs.Add(md);
        }
    }

    return ret;
}
/// <summary>
/// Delete every document in the configured Komodo index.
/// </summary>
/// <param name="token">Cancellation token handed to each enumerate and delete call.</param>
/// <returns>An <c>EmptyResult</c> listing the metadata of every document that was deleted.</returns>
private async Task<EmptyResult> KomodoEmpty(CancellationToken token)
{
    EmptyResult emptied = new EmptyResult();

    while (true)
    {
        // Always enumerate from the beginning (no prefix, no continuation token):
        // the items returned by the previous pass have just been deleted.
        EnumerationResult page = await KomodoEnumerate(null, null, token).ConfigureAwait(false);

        if (page.Blobs == null || page.Blobs.Count <= 0)
        {
            break;
        }

        foreach (BlobMetadata blob in page.Blobs)
        {
            await KomodoDelete(blob.Key, token).ConfigureAwait(false);
            emptied.Blobs.Add(blob);
        }
    }

    return emptied;
}
/// <summary>
/// Encode a start position and page size as a continuation token.
/// </summary>
/// <param name="start">Start index to encode.</param>
/// <param name="count">Page size to encode.</param>
/// <returns>Base64 of the UTF-8 text "<paramref name="start"/> <paramref name="count"/>" (single space separator).</returns>
private string BuildContinuationToken(long start, int count)
{
    string plain = string.Format("{0} {1}", start, count);
    return Convert.ToBase64String(Encoding.UTF8.GetBytes(plain));
}
/// <summary>
/// Parse a Komodo continuation token by delegating to <c>KvpbaseParseContinuationToken</c>.
/// </summary>
/// <param name="continuationToken">Token to parse.</param>
/// <param name="start">Receives the start value produced by the delegate.</param>
/// <param name="count">Receives the count value produced by the delegate.</param>
/// <returns>The delegate's result.</returns>
private bool KomodoParseContinuationToken(string continuationToken, out int start, out int count)
{
    bool parsed = KvpbaseParseContinuationToken(continuationToken, out start, out count);
    return parsed;
}
/// <summary>
/// Build a continuation token for Komodo enumeration, or return null when
/// <paramref name="start"/> is greater than or equal to <paramref name="count"/>.
/// </summary>
/// <param name="start">Start index for the next page.</param>
/// <param name="count">Page size.</param>
/// <returns>Null when <paramref name="start"/> &gt;= <paramref name="count"/>; otherwise the encoded token.</returns>
private string KomodoBuildContinuationToken(long start, int count)
{
    // NOTE(review): a caller passing start = offset + count with a non-negative offset always
    // satisfies start >= count and so always receives null - confirm the intended comparison.
    return start < count ? BuildContinuationToken(start, count) : null;
}
/// <summary>
/// Build the URL for a key as endpoint + index GUID + "/" + key.
/// </summary>
/// <param name="key">Document key appended to the URL as-is.</param>
/// <returns>The concatenated URL string.</returns>
private string KomodoGenerateUrl(string key)
{
    string endpoint = _KomodoSettings.Endpoint;

    if (!endpoint.EndsWith("/"))
    {
        // The normalized endpoint is written back to the settings object, as before.
        endpoint += "/";
        _KomodoSettings.Endpoint = endpoint;
    }

    return endpoint + _KomodoSettings.IndexGUID + "/" + key;
}
/// <summary>
/// Read a stream from its current position to the end and return the bytes read.
/// </summary>
/// <param name="input">Readable stream.</param>
/// <returns>The bytes read; an empty array when nothing remains to be read.</returns>
/// <exception cref="ArgumentNullException"><paramref name="input"/> is null.</exception>
/// <exception cref="InvalidOperationException"><paramref name="input"/> is not readable.</exception>
public static byte[] StreamToBytes(Stream input)
{
    if (input == null)
    {
        throw new ArgumentNullException(nameof(input));
    }

    if (!input.CanRead)
    {
        throw new InvalidOperationException("Input stream is not readable");
    }

    const int BufferSize = 16 * 1024;

    using (MemoryStream collected = new MemoryStream())
    {
        input.CopyTo(collected, BufferSize);
        return collected.ToArray();
    }
}