WaitForRowsAsync() Method
In This Topic
Asynchronously waits for new rows to be available in the internal queue of this real-time cursor. The returned
System.Threading.Tasks.Task will also complete if the state of this
RealtimeCursor (see
GetState) changes from
RealtimeCursorState.Subscribed to any other state. This method can be called on any thread.
Syntax
Return Value
A
System.Threading.Tasks.Task that will complete once this cursor becomes unsubscribed or new rows are available in the internal queue of this real-time cursor. The returned
System.Boolean value is
false if this cursor is unsubscribed and there are no more rows to be read. Otherwise it's
true.
Exceptions
Example
Submit a Graph Query
//Run this on the QueuedTask...
//It assumes a connection to a knowledge graph ("kg") has been established
//...
//Build an openCypher query. This one returns the first 10 entities
//(whatever they are...). The default limit is 100 if none is specified
var cypherText = "MATCH (n) RETURN n LIMIT 10";
//Other example queries...
//cypherText = "MATCH (a:Person) RETURN [a.name, a.age] ORDER BY a.age DESC LIMIT 50";
//cypherText = "MATCH (b:Person) RETURN { Xperson: { Xname: b.name, Xage: b.age } } ORDER BY b.name DESC";
//cypherText = "MATCH p = (c:Person)-[:HasCar]-() RETURN p ORDER BY c.name DESC";

//Create the query filter
//Note: OutputSpatialReference is currently ignored
var queryFilter = new KnowledgeGraphQueryFilter();
queryFilter.QueryText = cypherText;

//Optionally, provenance can be included in the results. Only request it
//when the KG actually has provenance - otherwise the query will fail
//(see the "Get Whether KG Supports Provenance" snippet)
if (includeProvenanceIfPresent && KnowledgeGraphSupportsProvenance(kg))
{
  //the default behavior is to exclude provenance
  queryFilter.ProvenanceBehavior = KnowledgeGraphProvenanceBehavior.Include;
}

//Submit the query - this returns a KnowledgeGraphCursor
using (var cursor = kg.SubmitQuery(queryFilter))
{
  //Wait for rows to come back from the server (note the "await")
  while (await cursor.WaitForRowsAsync())
  {
    //A "batch" of rows has been retrieved - process each one
    while (cursor.MoveNext())
    {
      //The current KnowledgeGraphRow
      using (var row = cursor.Current)
      {
        //A graph row is an array - visit every returned value
        var valueCount = (int)row.GetCount();
        for (var index = 0; index < valueCount; index++)
        {
          //Process the row value (note: recursive)
          //See the "Process a KnowledgeGraphRow Value" snippet
          ProcessKnowledgeGraphRowValue(row[index]);
        }
      }
    }
  }//WaitForRowsAsync
}//SubmitQuery
Search And Subscribe for Streaming Data
await QueuedTask.Run(async () =>
{
  //A null query filter searches and retrieves all rows
  //The 2nd param "true" requests a recycling cursor
  using (var cursor = streamLayer.SearchAndSubscribe(qfilter, true))
  {
    //Wait for new features to be streamed
    //(no cancellation token is passed - the default is no cancellation)
    while (await cursor.WaitForRowsAsync())
    {
      while (cursor.MoveNext())
      {
        using (var currentRow = cursor.Current)
        {
          //Find out where the row event originated
          switch (currentRow.GetRowSource())
          {
            case RealtimeRowSource.PreExisting:
              //the row already existed at the time of subscribe
              break;
            case RealtimeRowSource.EventInsert:
              //the row was inserted after subscribe
              break;
            case RealtimeRowSource.EventDelete:
              //the row was deleted after subscribe
              break;
          }
        }
      }
    }
  }//row cursor is disposed. row cursor is unsubscribed

  //....or....
  //Use the feature class instead of the layer
  using (var featureClass = streamLayer.GetFeatureClass())
  //non-recycling cursor - 2nd param "false"
  using (var cursor = featureClass.SearchAndSubscribe(qfilter, false))
  {
    //Wait for new features to be streamed
    //(the default is no cancellation)
    while (await cursor.WaitForRowsAsync())
    {
      //etc
    }
  }
});
Search And Subscribe With Cancellation
await QueuedTask.Run(async () =>
{
  //Recycling cursor - 2nd param "true"
  //or streamLayer.Subscribe(qfilter, true) to just subscribe
  using (var rc = streamLayer.SearchAndSubscribe(qfilter, true))
  //Auto-cancel after 20 seconds. Holding the token source in a "using"
  //guarantees it is disposed even if an unexpected exception escapes
  //the loop below
  using (var cancel = new CancellationTokenSource(TimeSpan.FromSeconds(20)))
  {
    try
    {
      //Wait for rows until the cursor is unsubscribed or the token fires
      while (await rc.WaitForRowsAsync(cancel.Token))
      {
        //check for row events
        while (rc.MoveNext())
        {
          using (var row = rc.Current)
          {
            //etc
          }
        }
      }
    }
    //TaskCanceledException derives from OperationCanceledException, so
    //catching the base type handles either form of cancellation
    catch (OperationCanceledException)
    {
      //Handle cancellation as needed
    }
  }//token source is disposed first, then the row cursor
});
Search Existing Data and Subscribe for Streaming Data
//Note: a System Task can be used with the Realtime feature class
//for subscribe (or use ... QueuedTask.Run() instead)
await System.Threading.Tasks.Task.Run(async () =>
{
  using (var featureClass = streamLayer.GetFeatureClass())
  //non-recycling cursor - 2nd param "false"
  using (var cursor = featureClass.SearchAndSubscribe(qfilter, false))
  {
    //Wait for new features to be streamed
    //(the default is no cancellation)
    while (await cursor.WaitForRowsAsync())
    {
      //the pre-existing rows that were searched are retrieved as well
      while (cursor.MoveNext())
      {
        using (var currentRow = cursor.Current)
        {
          //branch on the origin of the row event
          switch (currentRow.GetRowSource())
          {
            case RealtimeRowSource.PreExisting:
              //TODO handle pre-existing rows
              break;
            case RealtimeRowSource.EventInsert:
              //TODO handle inserts
              break;
            case RealtimeRowSource.EventDelete:
              //TODO - handle deletes
              break;
          }
        }
      }
    }
  }//row cursor is disposed. row cursor is unsubscribed
});
Requirements
Target Platforms: Windows 11, Windows 10
ArcGIS Pro version: 3.2 or higher.
See Also