liyonghelpme liyonghelpme - 1 month ago 21
C# Question

How can .NET TPL Dataflow send messages to many clients with a cache of more than one message?

There is a server task that uses TPL Dataflow to send messages to many client tasks.


  • Clients connect to the server at random times

  • A client can send messages to the server and receive messages from the server



The server uses a BufferBlock<string> to send messages to clients. When a client connects to the server, it receives a message from this BufferBlock<string>.

But this BufferBlock<string> can cache only one message, a client can't ask the server for more than one message, and a client can't set a condition to select which messages to receive.

I want a block type that can cache several messages, lets a client read more than one message, and lets the client choose which messages to receive.

I tried the other TPL Dataflow block types, but none of them has this ability. Is TPL Dataflow simply not suited to this requirement?

The condition is simple: each message has a timestamp, the client just sends a timestamp to the server, and the server returns the messages that were sent after that timestamp.

using System;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace TestHttp
{
    public class HttpServer
    {
        private HttpListener httpListener;
        public Task task;

        // Shared queue: /send adds a message, /receive takes one out.
        private BufferBlock<string> messages = new BufferBlock<string>();

        public HttpServer()
        {
            // Task.Run unwraps the inner task, so "task" completes when RunHttp does.
            task = Task.Run(() => RunHttp());
        }

        private async Task RunHttp()
        {
            var httpPort = 9090;
            httpListener = new HttpListener();
            httpListener.Prefixes.Add("http://*:" + httpPort + "/");
            httpListener.Start();
            while (httpListener.IsListening)
            {
                var context = await httpListener.GetContextAsync();
                var req = context.Request;
                // Not awaited on purpose, so the loop can accept the next request.
                Handle(context, req);
            }

            httpListener.Stop();
            httpListener.Close();
        }

        private async Task Handle(HttpListenerContext context, HttpListenerRequest req)
        {
            Console.WriteLine(req.RawUrl);

            var resp = await HandleGet(req);
            var buf = Encoding.UTF8.GetBytes(resp);
            // ContentEncoding carries the charset; the Content-Encoding header is for compression.
            context.Response.ContentEncoding = Encoding.UTF8;
            context.Response.ContentLength64 = buf.Length;
            try
            {
                context.Response.OutputStream.Write(buf, 0, buf.Length);
            }
            catch (Exception exp)
            {
                Console.WriteLine(exp.ToString());
            }
            finally
            {
                context.Response.OutputStream.Close();
            }
        }

        private async Task<string> HandleGet(HttpListenerRequest req)
        {
            var r = req.RawUrl.Split('?');
            if (r.Length < 2)
            {
                // Both routes expect a query string after '?'.
                return "Error";
            }

            if (r[0] == "/send")
            {
                await messages.SendAsync(r[1]);
                return "Suc";
            }
            else if (r[0] == "/receive")
            {
                // The timestamp is parsed but not used yet: filtering by it is what the question asks about.
                var timestamp = Convert.ToInt32(r[1]);
                // Waits for one message and removes it from the block.
                var ret = await messages.ReceiveAsync();
                return ret;
            }

            return "Error";
        }
    }
}

Answer

Why do you say that a BufferBlock can hold only one value? That's not true: it can hold as many messages as you want, depending on the block creation options and, specifically, the BoundedCapacity option. The default value of this option is -1, which stands for unlimited capacity.
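To illustrate the point about capacity, here is a minimal sketch, assuming the System.Threading.Tasks.Dataflow assembly is referenced. The class name BufferBlockDemo and the method PostAndDrain are made up for this example; BufferBlock<T>, Post, Count and TryReceiveAll are the library's own members.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not part of the question's code.
public static class BufferBlockDemo
{
    // Posts every item into a fresh BufferBlock, then drains the block in one call.
    public static IList<string> PostAndDrain(params string[] items)
    {
        // Default options: BoundedCapacity is -1 (DataflowBlockOptions.Unbounded).
        var block = new BufferBlock<string>();
        foreach (var item in items)
        {
            block.Post(item);
        }

        // Count reports how many messages are buffered right now.
        Console.WriteLine("Buffered: " + block.Count);

        // TryReceiveAll removes and returns everything currently in the block.
        IList<string> drained;
        if (block.TryReceiveAll(out drained))
        {
            return drained;
        }
        return new List<string>();
    }
}
```

Note that receiving from a BufferBlock removes the message from it, so each message reaches only one receiver. If every client must be able to see the same messages, the block alone is not enough and the messages have to be kept somewhere after they are received.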

So, at the moment a client connects, you can easily get all the messages filtered by timestamp and send them back to the client. This may mean changing the signature of the response to the client's request, since you must accept a timestamp parameter and return a List<T> of messages rather than a single one. Without more code, we can't say more about this issue.
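One way the timestamp filter could look is sketched below. This is only an assumption about the design, not something TPL Dataflow provides: MessageLog, Add and GetAfter are hypothetical names, and the log is a plain list guarded by a lock so that messages stay available to every client.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: keeps every message together with its timestamp.
public sealed class MessageLog
{
    private readonly object gate = new object();
    private readonly List<KeyValuePair<long, string>> entries =
        new List<KeyValuePair<long, string>>();

    // Stores a message under the given timestamp.
    public void Add(long timestamp, string text)
    {
        lock (gate)
        {
            entries.Add(new KeyValuePair<long, string>(timestamp, text));
        }
    }

    // Returns the messages whose timestamp is strictly greater than the given one,
    // in insertion order. Nothing is removed, so other clients can read them too.
    public List<string> GetAfter(long timestamp)
    {
        lock (gate)
        {
            return entries
                .Where(e => e.Key > timestamp)
                .Select(e => e.Value)
                .ToList();
        }
    }
}
```

With something like this, the /send branch would call Add with the current time, and the /receive branch would call GetAfter with the client's timestamp and serialize the resulting list into the response.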