+-
[在ConcurrentDictionary.GetOrAdd?中使用lock语句时的已知副作用

当从DbSet ConcurrentDictionary方法内部的多个线程添加到实体框架ValueFactory时,我遇到了一些问题。我试图通过引入lock语句消除该问题。虽然这似乎有一些奇怪的副作用。在某些罕见和随机的情况下,即使编程应防止此情况发生,我的代码也会抛出KeyNotFoundException。我想我监督了一些事情。

using (ESBClient client = new ESBClient()) { // WCF SERVICE

    client.Open();

    // Limit the maximum number of parallel requests
    var esbLimiter = new SemaphoreSlim(4);

    ConcurrentDictionary<string, DataEntry> dataEntryDict  = new ConcurrentDictionary<string, DataEntry>(
        await db.DataEntries
            .Where(de => allObjIDs.Contains(de.PAObjID))
            .IncludeOptimized(de => de.WorkSchedules)
            .ToDictionaryAsync(a => a.PAObjID, a => a)
    );


    // Get WorkOrderDataSet02 for each data entry number
    await Task.WhenAll(allDataEntryNumbers.Batch(20).Select(async workOrderBatch => {
        await esbLimiter.WaitAsync();

        Debug.WriteLine($"Starting for new batch after {s.ElapsedMilliseconds} with parallel {esbLimiter.CurrentCount}");

        try {
            int retryCounter = 0;
            getWorkOrderDataSet02Response gwoResp;

            retryCurrentWorkOrderDataSetResp:
            try {
                gwoResp = await client.getWorkOrderDataSet02Async(
                    new getWorkOrderDataSet02Request(
                        "?",
                        companyGroup.Key,
                        string.Join(",", workOrderBatch.Select(wob => wob.DataEntryNumber)),
                        "WNTREIB",
                        "?",
                        "act,sales",
                        "D"
                    )
                );

            } catch (System.ServiceModel.CommunicationException ex) {
                // Retry up to 3 times before finally crashing
                if (retryCounter++ < 3) {
                    await HandleServiceRetryError("getWorkOrderDataSet02Async", retryCounter, s.ElapsedMilliseconds, ex);
                    goto retryCurrentWorkOrderDataSetResp;
                } else
                    throw;
            }

            // Iterate over all work orders returned by the ESB
            foreach (dsyWorkOrder01TtyWorkOrder currDetail in gwoResp.dsyWorkOrder01) { // dsyWorkOrder01 IS AN ARRAY OF OBJECTS. IT COMES FROM A WCF CALL. PAObjID IS UNIQUE.
                // Get or create element
                DataEntry currentEntry = dataEntryDict.GetOrAdd(
                    currDetail.Obj,
                    key => {
                        DataEntry newDe = new DataEntry();
                        lock (db.DataEntries) { // I INTRODUCED THOSE LOCK STATEMENTS
                            db.DataEntries.Add(newDe); // THIS IS THE LINE THAT WAS PROBLEMATIC IN THE FIRST PLACE
                        }
                        return newDe;
                    }
                );

                // Set regular fields
                currentEntry.ApplyTtyWorkOrder(currDetail, resourceDict); // THIS METHOD APPLIES THE PAObjID PROPERTY
            }

            // Delete all elements, that were not provided by the service anymore
            lock(db.DataEntries) { 
                workOrderBatch
                    .Where(wob => !gwoResp.dsyWorkOrder01
                        .Where(wo => wo.DataEntryNumber.HasValue)
                        .Select(wo => wo.DataEntryNumber.Value)
                        .Contains(wob.DataEntryNumber)
                    )
                    .ToArray()
                    .ForEach(dataEntry => {
                        try {
                            db.DataEntries.Remove(dataEntryDict[dataEntry.ObjID]); // THIS LINE THROWS THE KeyNotFoundException
                        } catch (Exception ex) {
                            throw new Exception($"Key {dataEntry.ObjID} not in list.", ex);
                        }

                    });
            }

            // Update progress
            progress.Report(.1f + totalSteps * Interlocked.Increment(ref currentStep) * .8f);

        } finally {
            Debug.WriteLine($"Finished for batch after {s.ElapsedMilliseconds} with parallel {esbLimiter.CurrentCount}");
            esbLimiter.Release();
        }

    }));
}

// HERE'S THE APPLY METHOD
public void ApplyTtyWorkOrder(dsyWorkOrder01TtyWorkOrder src, Dictionary<(string Name, byte ResourceType), int> resourceDict) {
    Deleted = false;

    DataEntryNumber = src.DataEntryNumber.Value;
    PAObjID = src.Obj; // PAObjID IS APPLIED HERE
    IsHeader = src.IsHeader;
    Pieces = Convert.ToInt16(src.ProductionQty);
    PartNo = src.Article;
    JobNo = src.WorkOrder;
    StartDate = src.StartDate;
    FinishDate = src.EndDate;
    FinishedPA = src.WorkOrderStatus == "R";

    // Update methods
    UpdateFromTtyCustomer(src.ttyCustomer?.FirstOrDefault());
    UpdateFromPart(src.ttyPart?.FirstOrDefault());
    UpdateFromSalesDocHeader(src.ttySalesDocHeader?.FirstOrDefault());
    UpdateWorkSchedules(src.ttyWorkOrderActivity, resourceDict);
}

我在我认为相关的每一行中都添加了大写注释。

我不知道为什么会发生此错误。根据我的理解,我只尝试从我在循环的同一迭代中添加的dataEntryDict词典dataEntry.ObjID键中获得一个条目。

[在介绍这两个锁语句之前,标有“这是在第一位置有问题的行”的行偶尔会抛出异常:“集合已修改;枚举操作可能不会执行。”深入研究EF的代码后,我意识到这应该与DbSet.Add方法的实现方式有关。

[lock中使用ValueFactory语句时是否有任何已知的副作用?

1
投票
lock (db.DataEntries) { // I INTRODUCED THOSE LOCK STATEMENTS
    db.DataEntries.Add(newDe); // THIS IS THE LINE THAT WAS PROBLEMATIC IN THE FIRST PLACE
}

问题在于,db.DataEntries不是线程安全的集合,但是它被多个线程同时访问。所有EF对象都不是线程安全的。

在这里使用锁定似乎是一个很好的解决方案。确保您抓住了所有地方。

通常最好将并发部分与顺序部分分开。同时进行[[0]]调用,并将结果收集到一个集合中。然后,顺序处理结果。