258 lines
9.1 KiB
JavaScript
258 lines
9.1 KiB
JavaScript
import * as RA from "../../Array.js";
|
|
import * as Chunk from "../../Chunk.js";
|
|
import { dual, pipe } from "../../Function.js";
|
|
import * as Option from "../../Option.js";
|
|
import { hasProperty } from "../../Predicate.js";
|
|
import * as core from "./core.js";
|
|
import * as OpCodes from "./opCodes/strategy.js";
|
|
import * as stm from "./stm.js";
|
|
import * as tRef from "./tRef.js";
|
|
const TEnqueueSymbolKey = "effect/TQueue/TEnqueue";
|
|
/** @internal */
|
|
export const TEnqueueTypeId = /*#__PURE__*/Symbol.for(TEnqueueSymbolKey);
|
|
const TDequeueSymbolKey = "effect/TQueue/TDequeue";
|
|
/** @internal */
|
|
export const TDequeueTypeId = /*#__PURE__*/Symbol.for(TDequeueSymbolKey);
|
|
/** @internal */
|
|
export const BackPressure = {
|
|
_tag: OpCodes.OP_BACKPRESSURE_STRATEGY
|
|
};
|
|
/** @internal */
|
|
export const Dropping = {
|
|
_tag: OpCodes.OP_DROPPING_STRATEGY
|
|
};
|
|
/** @internal */
|
|
export const Sliding = {
|
|
_tag: OpCodes.OP_SLIDING_STRATEGY
|
|
};
|
|
/** @internal */
|
|
export const tDequeueVariance = {
|
|
/* c8 ignore next */
|
|
_Out: _ => _
|
|
};
|
|
/** @internal */
|
|
export const tEnqueueVariance = {
|
|
/* c8 ignore next */
|
|
_In: _ => _
|
|
};
|
|
class TQueueImpl {
|
|
ref;
|
|
requestedCapacity;
|
|
strategy;
|
|
[TDequeueTypeId] = tDequeueVariance;
|
|
[TEnqueueTypeId] = tEnqueueVariance;
|
|
constructor(ref, requestedCapacity, strategy) {
|
|
this.ref = ref;
|
|
this.requestedCapacity = requestedCapacity;
|
|
this.strategy = strategy;
|
|
}
|
|
capacity() {
|
|
return this.requestedCapacity;
|
|
}
|
|
size = /*#__PURE__*/core.withSTMRuntime(runtime => {
|
|
const queue = tRef.unsafeGet(this.ref, runtime.journal);
|
|
if (queue === undefined) {
|
|
return core.interruptAs(runtime.fiberId);
|
|
}
|
|
return core.succeed(queue.length);
|
|
});
|
|
isFull = /*#__PURE__*/core.map(this.size, size => size === this.requestedCapacity);
|
|
isEmpty = /*#__PURE__*/core.map(this.size, size => size === 0);
|
|
shutdown = /*#__PURE__*/core.withSTMRuntime(runtime => {
|
|
tRef.unsafeSet(this.ref, void 0, runtime.journal);
|
|
return stm.void;
|
|
});
|
|
isShutdown = /*#__PURE__*/core.effect(journal => {
|
|
const queue = tRef.unsafeGet(this.ref, journal);
|
|
return queue === undefined;
|
|
});
|
|
awaitShutdown = /*#__PURE__*/core.flatMap(this.isShutdown, isShutdown => isShutdown ? stm.void : core.retry);
|
|
offer(value) {
|
|
return core.withSTMRuntime(runtime => {
|
|
const queue = pipe(this.ref, tRef.unsafeGet(runtime.journal));
|
|
if (queue === undefined) {
|
|
return core.interruptAs(runtime.fiberId);
|
|
}
|
|
if (queue.length < this.requestedCapacity) {
|
|
queue.push(value);
|
|
tRef.unsafeSet(this.ref, queue, runtime.journal);
|
|
return core.succeed(true);
|
|
}
|
|
switch (this.strategy._tag) {
|
|
case OpCodes.OP_BACKPRESSURE_STRATEGY:
|
|
{
|
|
return core.retry;
|
|
}
|
|
case OpCodes.OP_DROPPING_STRATEGY:
|
|
{
|
|
return core.succeed(false);
|
|
}
|
|
case OpCodes.OP_SLIDING_STRATEGY:
|
|
{
|
|
if (queue.length === 0) {
|
|
return core.succeed(true);
|
|
}
|
|
queue.shift();
|
|
queue.push(value);
|
|
tRef.unsafeSet(this.ref, queue, runtime.journal);
|
|
return core.succeed(true);
|
|
}
|
|
}
|
|
});
|
|
}
|
|
offerAll(iterable) {
|
|
return core.withSTMRuntime(runtime => {
|
|
const as = Array.from(iterable);
|
|
const queue = tRef.unsafeGet(this.ref, runtime.journal);
|
|
if (queue === undefined) {
|
|
return core.interruptAs(runtime.fiberId);
|
|
}
|
|
if (queue.length + as.length <= this.requestedCapacity) {
|
|
tRef.unsafeSet(this.ref, [...queue, ...as], runtime.journal);
|
|
return core.succeed(true);
|
|
}
|
|
switch (this.strategy._tag) {
|
|
case OpCodes.OP_BACKPRESSURE_STRATEGY:
|
|
{
|
|
return core.retry;
|
|
}
|
|
case OpCodes.OP_DROPPING_STRATEGY:
|
|
{
|
|
const forQueue = as.slice(0, this.requestedCapacity - queue.length);
|
|
tRef.unsafeSet(this.ref, [...queue, ...forQueue], runtime.journal);
|
|
return core.succeed(false);
|
|
}
|
|
case OpCodes.OP_SLIDING_STRATEGY:
|
|
{
|
|
const forQueue = as.slice(0, this.requestedCapacity - queue.length);
|
|
const toDrop = queue.length + forQueue.length - this.requestedCapacity;
|
|
const newQueue = queue.slice(toDrop);
|
|
tRef.unsafeSet(this.ref, [...newQueue, ...forQueue], runtime.journal);
|
|
return core.succeed(true);
|
|
}
|
|
}
|
|
});
|
|
}
|
|
peek = /*#__PURE__*/core.withSTMRuntime(runtime => {
|
|
const queue = tRef.unsafeGet(this.ref, runtime.journal);
|
|
if (queue === undefined) {
|
|
return core.interruptAs(runtime.fiberId);
|
|
}
|
|
if (queue.length === 0) {
|
|
return core.retry;
|
|
}
|
|
return core.succeed(queue[0]);
|
|
});
|
|
peekOption = /*#__PURE__*/core.withSTMRuntime(runtime => {
|
|
const queue = tRef.unsafeGet(this.ref, runtime.journal);
|
|
if (queue === undefined) {
|
|
return core.interruptAs(runtime.fiberId);
|
|
}
|
|
return core.succeed(Option.fromNullable(queue[0]));
|
|
});
|
|
take = /*#__PURE__*/core.withSTMRuntime(runtime => {
|
|
const queue = tRef.unsafeGet(this.ref, runtime.journal);
|
|
if (queue === undefined) {
|
|
return core.interruptAs(runtime.fiberId);
|
|
}
|
|
if (queue.length === 0) {
|
|
return core.retry;
|
|
}
|
|
const dequeued = queue.shift();
|
|
tRef.unsafeSet(this.ref, queue, runtime.journal);
|
|
return core.succeed(dequeued);
|
|
});
|
|
takeAll = /*#__PURE__*/core.withSTMRuntime(runtime => {
|
|
const queue = tRef.unsafeGet(this.ref, runtime.journal);
|
|
if (queue === undefined) {
|
|
return core.interruptAs(runtime.fiberId);
|
|
}
|
|
tRef.unsafeSet(this.ref, [], runtime.journal);
|
|
return core.succeed(queue);
|
|
});
|
|
takeUpTo(max) {
|
|
return core.withSTMRuntime(runtime => {
|
|
const queue = tRef.unsafeGet(this.ref, runtime.journal);
|
|
if (queue === undefined) {
|
|
return core.interruptAs(runtime.fiberId);
|
|
}
|
|
const [toTake, remaining] = Chunk.splitAt(Chunk.unsafeFromArray(queue), max);
|
|
tRef.unsafeSet(this.ref, Array.from(remaining), runtime.journal);
|
|
return core.succeed(Array.from(toTake));
|
|
});
|
|
}
|
|
}
|
|
/** @internal */
|
|
export const isTQueue = u => {
|
|
return isTEnqueue(u) && isTDequeue(u);
|
|
};
|
|
/** @internal */
|
|
export const isTEnqueue = u => hasProperty(u, TEnqueueTypeId);
|
|
/** @internal */
|
|
export const isTDequeue = u => hasProperty(u, TDequeueTypeId);
|
|
/** @internal */
|
|
export const awaitShutdown = self => self.awaitShutdown;
|
|
/** @internal */
|
|
export const bounded = requestedCapacity => makeQueue(requestedCapacity, BackPressure);
|
|
/** @internal */
|
|
export const capacity = self => {
|
|
return self.capacity();
|
|
};
|
|
/** @internal */
|
|
export const dropping = requestedCapacity => makeQueue(requestedCapacity, Dropping);
|
|
/** @internal */
|
|
export const isEmpty = self => self.isEmpty;
|
|
/** @internal */
|
|
export const isFull = self => self.isFull;
|
|
/** @internal */
|
|
export const isShutdown = self => self.isShutdown;
|
|
/** @internal */
|
|
export const offer = /*#__PURE__*/dual(2, (self, value) => self.offer(value));
|
|
/** @internal */
|
|
export const offerAll = /*#__PURE__*/dual(2, (self, iterable) => self.offerAll(iterable));
|
|
/** @internal */
|
|
export const peek = self => self.peek;
|
|
/** @internal */
|
|
export const peekOption = self => self.peekOption;
|
|
/** @internal */
|
|
export const poll = self => pipe(self.takeUpTo(1), core.map(RA.head));
|
|
/** @internal */
|
|
export const seek = /*#__PURE__*/dual(2, (self, predicate) => seekLoop(self, predicate));
|
|
const seekLoop = (self, predicate) => core.flatMap(self.take, a => predicate(a) ? core.succeed(a) : seekLoop(self, predicate));
|
|
/** @internal */
|
|
export const shutdown = self => self.shutdown;
|
|
/** @internal */
|
|
export const size = self => self.size;
|
|
/** @internal */
|
|
export const sliding = requestedCapacity => makeQueue(requestedCapacity, Sliding);
|
|
/** @internal */
|
|
export const take = self => self.take;
|
|
/** @internal */
|
|
export const takeAll = self => self.takeAll;
|
|
/** @internal */
|
|
export const takeBetween = /*#__PURE__*/dual(3, (self, min, max) => stm.suspend(() => {
|
|
const takeRemainder = (min, max, acc) => {
|
|
if (max < min) {
|
|
return core.succeed(acc);
|
|
}
|
|
return pipe(self.takeUpTo(max), core.flatMap(taken => {
|
|
const remaining = min - taken.length;
|
|
if (remaining === 1) {
|
|
return pipe(self.take, core.map(a => pipe(acc, Chunk.appendAll(Chunk.unsafeFromArray(taken)), Chunk.append(a))));
|
|
}
|
|
if (remaining > 1) {
|
|
return pipe(self.take, core.flatMap(a => takeRemainder(remaining - 1, max - taken.length - 1, pipe(acc, Chunk.appendAll(Chunk.unsafeFromArray(taken)), Chunk.append(a)))));
|
|
}
|
|
return core.succeed(pipe(acc, Chunk.appendAll(Chunk.unsafeFromArray(taken))));
|
|
}));
|
|
};
|
|
return core.map(takeRemainder(min, max, Chunk.empty()), c => Array.from(c));
|
|
}));
|
|
/** @internal */
|
|
export const takeN = /*#__PURE__*/dual(2, (self, n) => pipe(self, takeBetween(n, n)));
|
|
/** @internal */
|
|
export const takeUpTo = /*#__PURE__*/dual(2, (self, max) => self.takeUpTo(max));
|
|
/** @internal */
|
|
export const unbounded = () => makeQueue(Number.MAX_SAFE_INTEGER, Dropping);
|
|
const makeQueue = (requestedCapacity, strategy) => core.map(tRef.make([]), ref => new TQueueImpl(ref, requestedCapacity, strategy));
|
|
//# sourceMappingURL=tQueue.js.map
|