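I'm trying to speed up some processes that publish large numbers of records (mostly several million) to Elasticsearch. In my C# code I have already implemented a multi-threaded solution using Dataflow, as sketched below: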
    var fetchRecords = new TransformBlock<?, ?>(() => { ... });
    var sendRecordsToElastic = new ActionBlock<List<?>>(records => sendBulkRequest(records));
    fetchRecords.LinkTo(sendRecordsToElastic, new DataflowLinkOptions { PropagateCompletion = true });
    fetchRecords.Post("Start");

    public IBulkResponse sendBulkRequest(List<?> records)
    {
        lock (SomeStaticObject)
        {
            // Execute several new threads to send records in bulk
        }
    }
Best answer
You may want to use BulkAll here, which implements the observable pattern to make concurrent bulk requests to Elasticsearch. Here's an example:
    void Main()
    {
        var pool = new SingleNodeConnectionPool(new Uri("http://localhost:9200"));
        var connectionSettings = new ConnectionSettings(pool);
        var client = new ElasticClient(connectionSettings);
        var indexName = "bulk-index";

        if (client.IndexExists(indexName).Exists)
            client.DeleteIndex(indexName);

        client.CreateIndex(indexName, c => c
            .Settings(s => s
                .NumberOfShards(3)
                .NumberOfReplicas(0)
            )
            .Mappings(m => m
                .Map<DeviceStatus>(p => p.AutoMap())
            )
        );

        var size = 500;

        // set up the observable
        var bulkAllObservable = client.BulkAll(GetDeviceStatus(), b => b
            .Index(indexName)
            .MaxDegreeOfParallelism(4)
            .RefreshOnCompleted()
            .Size(size)
        );

        var countdownEvent = new CountdownEvent(1);
        Exception exception = null;

        // set up an observer. Delegates passed are:
        // 1. onNext
        // 2. onError
        // 3. onCompleted
        var bulkAllObserver = new BulkAllObserver(
            response => Console.WriteLine($"Indexed {response.Page * size} with {response.Retries} retries"),
            ex =>
            {
                // capture exception for throwing outside Observer.
                // You may decide to do something different here
                exception = ex;
                countdownEvent.Signal();
            },
            () =>
            {
                Console.WriteLine("Finished");
                countdownEvent.Signal();
            });

        // subscribe to the observable
        bulkAllObservable.Subscribe(bulkAllObserver);

        // wait indefinitely for it to finish. May want to put a
        // max timeout on this
        countdownEvent.Wait();

        if (exception != null)
        {
            throw exception;
        }
    }

    // lazily enumerated collection
    private static IEnumerable<DeviceStatus> GetDeviceStatus()
    {
        for (var i = 0; i < DocumentCount; i++)
            yield return new DeviceStatus(i);
    }

    private const int DocumentCount = 20000;

    public class DeviceStatus
    {
        public DeviceStatus(int id) => Id = id;

        public int Id { get; set; }
    }
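The observable also has a .Wait() method, which can be used in place of the explicit observer and countdown event: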
    var bulkAllObservable = client.BulkAll(GetDeviceStatus(), b => b
            .Index(indexName)
            .MaxDegreeOfParallelism(4)
            .RefreshOnCompleted()
            .Size(size)
        )
        .Wait(
            TimeSpan.FromHours(1),
            response => Console.WriteLine($"Indexed {response.Page * size} with {response.Retries} retries")
        );
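For comparison with the Dataflow pipeline in the question, here is a minimal sketch of the same two knobs, a batch size and a bounded degree of parallelism, expressed with Dataflow blocks and no static lock. It is not how NEST implements BulkAll. The names BoundedBulkSender, RunAsync and SendBulkAsync are hypothetical, and SendBulkAsync only simulates a request with a short delay, so nothing here talks to Elasticsearch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedBulkSender
{
    // Hypothetical stand-in for a real bulk request; it only simulates latency.
    private static async Task SendBulkAsync(int[] batch)
    {
        await Task.Delay(10);
    }

    public static async Task<(int Documents, int Batches)> RunAsync(
        IEnumerable<int> source, int batchSize, int maxParallelism)
    {
        int documents = 0, batches = 0;

        // Groups incoming items into arrays of batchSize. The bounded capacity
        // makes SendAsync below wait when the senders fall behind.
        var batcher = new BatchBlock<int>(batchSize, new GroupingDataflowBlockOptions
        {
            BoundedCapacity = batchSize * maxParallelism
        });

        // Processes at most maxParallelism batches at a time; no lock is needed
        // because the block itself limits concurrency.
        var sender = new ActionBlock<int[]>(async batch =>
        {
            await SendBulkAsync(batch);
            Interlocked.Add(ref documents, batch.Length);
            Interlocked.Increment(ref batches);
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxParallelism,
            BoundedCapacity = maxParallelism
        });

        batcher.LinkTo(sender, new DataflowLinkOptions { PropagateCompletion = true });

        // The source is enumerated lazily, one item at a time.
        foreach (var item in source)
            await batcher.SendAsync(item);

        // Completing the batcher flushes any final partial batch.
        batcher.Complete();
        await sender.Completion;

        return (documents, batches);
    }

    public static async Task Main()
    {
        var (documents, batches) = await RunAsync(Enumerable.Range(0, 20000), 500, 4);
        Console.WriteLine($"Sent {documents} documents in {batches} batches");
    }
}
```

The bounded capacities are the part that matters for millions of records: without them the batcher would buffer the whole source in memory while the senders catch up.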
There are observable methods for BulkAll, ScrollAll and Reindex (although there is also ReindexOnServer, which reindexes within Elasticsearch and maps to the Reindex API; the Reindex method predates it).
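As a rough illustration of the server-side variant, a ReindexOnServer call might look like the sketch below. It assumes a NEST client of the same generation as the example above and a node at http://localhost:9200; the destination index name "bulk-index-copy" is made up for illustration.

```csharp
using System;
using Nest;

public static class ReindexOnServerExample
{
    public static void Main()
    {
        var client = new ElasticClient(
            new ConnectionSettings(new Uri("http://localhost:9200")));

        // Elasticsearch copies the documents itself; no documents pass
        // through the client. "bulk-index-copy" is a hypothetical name.
        var response = client.ReindexOnServer(r => r
            .Source(s => s.Index("bulk-index"))
            .Destination(d => d.Index("bulk-index-copy"))
            .WaitForCompletion()
        );

        Console.WriteLine($"Valid response: {response.IsValid}");
    }
}
```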
Regarding c# - multi-threading inside a thread lock, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/48202652/