Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .talismanrc
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,8 @@ fileignoreconfig:
checksum: 580932f192dd3fdd8bb2c55b7a7a78f1694f646ef5c5041f86c75668778f7ecb
- filename: packages/contentstack-bulk-operations/test/unit/utils/asset-uids-from-file.test.ts
checksum: 8123f7a675a0275795b59b15d0f2d5f8f1e57ccbecf3f97249a0dc5a037b9203
- filename: packages/contentstack-bulk-operations/src/utils/data-dir-asset-fetcher.ts
checksum: ddcf4601ac47be300eba0eb901e4d45d748c2c4eab676c56677cb9a802fe3db0
- filename: packages/contentstack-bulk-operations/src/commands/cm/stacks/bulk-assets.ts
checksum: c8767e79c1010cccf984d2124b2c6252b7cba1fb1b6a9f216fa7ddb5b195a935
version: '1.0'
12 changes: 10 additions & 2 deletions packages/contentstack-bulk-operations/src/base-bulk-command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ export abstract class BaseBulkCommand extends Command {
protected rateLimiter!: AdaptiveRateLimiter;
protected retryStrategy!: RetryStrategy;
protected operationExecutor!: OperationExecutor;
private batchResults: Map<string, BulkJobResult> = new Map();
protected batchResults: Map<string, BulkJobResult> = new Map();
protected parsedFlags: any;

/**
Expand Down Expand Up @@ -165,7 +165,7 @@ export abstract class BaseBulkCommand extends Command {
}

// Fill missing required flags via interactive prompts
flags = await fillMissingFlags(flags);
flags = await this.resolveFlagsInteractively(flags);
this.parsedFlags = flags;

await this.buildConfiguration(flags);
Expand Down Expand Up @@ -265,6 +265,14 @@ export abstract class BaseBulkCommand extends Command {
}
}


/**
* Resolve flags interactively — subclasses can override to skip prompts for specific modes.
*/
protected async resolveFlagsInteractively(flags: any): Promise<any> {
return await fillMissingFlags(flags);
}

/**
* Build operation configuration
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import { flags, handleAndLogError, FlagInput } from '@contentstack/cli-utilities';
import * as fs from 'fs';
import * as path from 'path';

import { ResourceType } from '../../../interfaces';
import { flags, handleAndLogError, log } from '@contentstack/cli-utilities';

import { AssetPublishData, BulkOperationResult, OperationType, ResourceType } from '../../../interfaces';
import { BaseBulkCommand } from '../../../base-bulk-command';
import { $t, messages, fetchAssets } from '../../../utils';
import { $t, messages, fetchAssets, scanDataDirStats, BATCH_CONSTANTS, categorizeByScanStatus } from '../../../utils';
import type { DataDirScanStats } from '../../../utils';
import { AssetService } from '../../../services';

/**
* Bulk operations command for assets
* Supports publish, unpublish, and cross publish operations
* Supports publish, unpublish, cross-publish, and data-dir publish operations
*/
export default class BulkAssets extends BaseBulkCommand {
static description = messages.BULK_ASSETS_DESCRIPTION;
Expand All @@ -32,44 +37,87 @@ export default class BulkAssets extends BaseBulkCommand {

// Revert (unpublish) previously published assets using success log
'<%= config.bin %> <%= command.id %> --revert ./bulk-operation -a myAlias',

// Publish assets from exported content folder (e.g. after asset scanning clears)
'<%= config.bin %> <%= command.id %> --data-dir ./content --operation publish -k blt123',
];

static flags: FlagInput = {
static flags = {
...BaseBulkCommand.baseFlags,
'folder-uid': flags.string({
description: messages.FOLDER_UID,
}),
'data-dir': flags.string({
char: 'd',
description: messages.DATA_DIR_FLAG_DESC,
}),
'dry-run': flags.boolean({
description: messages.DRY_RUN_FLAG_DESC,
default: false,
}),
};

protected resourceType: ResourceType = ResourceType.ASSET;

protected async resolveFlagsInteractively(flags: any): Promise<any> {
if (flags['data-dir']) {
return flags;
}
return super.resolveFlagsInteractively(flags);
}

async run(): Promise<void> {
try {
// Handle cross-publish separately if source-env is specified
if (this.bulkOperationConfig.sourceEnv) {
await this.handleCrossPublish(this.parsedFlags);
return;
}

if (this.bulkOperationConfig.dataDir) {
await this.runDataDirFlow();
return;
}

const assets = await this.fetchItems();

if (assets.length === 0) {
this.logger.warn($t(messages.NO_ITEMS_FOUND, { resourceType: ResourceType.ASSET }));
return;
}

this.logger.info(
$t(messages.FOUND_ASSETS_TO_OPERATE, { count: assets.length, operation: this.parsedFlags.operation || '' })
);
const { clean, pending, quarantined, noStatus } = categorizeByScanStatus(assets);
const scanningEnabled = clean.length + pending.length + quarantined.length > 0;
const publishable = scanningEnabled ? clean : [...clean, ...noStatus];

if (scanningEnabled) {
// Log individual skipped assets
pending.forEach((a) => this.logger.warn($t(messages.SCAN_STATUS_SKIPPED_PENDING, { uid: a.uid })));
quarantined.forEach((a) => this.logger.warn($t(messages.SCAN_STATUS_SKIPPED_QUARANTINED, { uid: a.uid })));

// Confirm operation
const confirmed = await this.confirmOperation(assets);
this.printScanningDashboard({
total: assets.length,
clean: clean.length,
pending: pending.length,
quarantined: quarantined.length,
});

if (publishable.length === 0) {
this.logger.warn($t(messages.NO_PUBLISHABLE_ASSETS));
return;
}
} else {
log.info(
$t(messages.FOUND_ASSETS_TO_OPERATE, { count: assets.length, operation: this.parsedFlags.operation || '' })
);
}

const confirmed = await this.confirmOperation(publishable);
if (!confirmed) {
this.logger.warn($t(messages.OPERATION_CANCELLED));
return;
}

const result = await this.executeBulkOperation(assets);
const result = await this.executeBulkOperation(publishable);
this.printOperationSummary(result);
} catch (error) {
handleAndLogError(error);
Expand All @@ -78,6 +126,218 @@ export default class BulkAssets extends BaseBulkCommand {
}
}

private async runDataDirFlow(): Promise<void> {
const { dataDir, dryRun } = this.bulkOperationConfig;

// Capture original CLI locales/envs before pass 1 overwrites them on the config.
const cliLocales = [...(this.bulkOperationConfig.locales || [])];
const cliEnvs = [...(this.bulkOperationConfig.environments || [])];

// Pass 1 — count-only scan: no AssetPublishData objects built, one chunk in memory at a time.
let stats: DataDirScanStats;
try {
stats = await scanDataDirStats(dataDir!, cliEnvs, cliLocales, this.logger);
} catch (err: any) {
this.logger.error($t(messages.DATA_DIR_READ_ERROR, { path: dataDir!, error: err.message || String(err) }));
return;
}

this.bulkOperationConfig.environments = stats.environments;
this.bulkOperationConfig.locales = stats.locales;

// Pass 1.5 — fetch scan status for all target UIDs (post-import UIDs on the destination stack).
const targetUids = Object.values(stats.assetUidMapper);
const assetService = new AssetService(this.managementStack, this.deliveryStack, this.logger);
const scanStatusMap = await assetService.fetchScanStatusByUIDs(targetUids);

let cleanCount = 0;
let pendingCount = 0;
let quarantinedCount = 0;
for (const uid of targetUids) {
const status = scanStatusMap.get(uid);
if (status === 'pending') pendingCount++;
else if (status === 'quarantined') quarantinedCount++;
else cleanCount++; // clean or undefined (scanning disabled) — both are publishable
}

this.printScanningDashboard({
total: stats.eligible + stats.skipped + stats.unmapped,
localSkipped: stats.skipped,
unmapped: stats.unmapped,
clean: cleanCount,
pending: pendingCount,
quarantined: quarantinedCount,
});

if (cleanCount === 0) {
this.logger.warn($t(messages.NO_PUBLISHABLE_ASSETS));
return;
}

// new Array(n) has .length === n but allocates no elements — just for the count.
const confirmed = await this.confirmOperation(new Array(cleanCount));
if (!confirmed) {
this.logger.warn($t(messages.OPERATION_CANCELLED));
return;
}

if (dryRun) {
log.info($t(messages.DATA_DIR_DRY_RUN));
return;
}

// Pass 2 — stream and publish: one chunk at a time, batches of ≤50 items enqueued directly.
// stats.assetUidMapper and stats.assetsIndex are reused from pass 1 — no second disk read.
const result = await this.streamAndPublish(
dataDir!,
cliLocales,
stats.totalItems,
stats.assetUidMapper,
stats.assetsIndex,
scanStatusMap
);
this.printOperationSummary(result);
}

/**
* Pass 2 of the data-dir flow.
* Reads chunk files one at a time, fills a working batch of ≤50 AssetPublishData items,
* and enqueues each batch directly into the queue manager without ever holding the full
* asset list in memory. Peak memory: one chunk file + one batch of ≤50 items.
*
* assetUidMapper and assetsIndex are passed in from pass 1 to avoid re-reading those files.
* scanStatusMap filters out non-clean assets before enqueueing.
*/
private async streamAndPublish(
dataDir: string,
cliLocales: string[],
totalItemCount: number,
assetUidMapper: Record<string, string>,
assetsIndex: Record<string, string>,
scanStatusMap: Map<string, string | undefined>
): Promise<BulkOperationResult> {
// Snapshot both arrays so in-flight mutations to bulkOperationConfig can't corrupt payloads.
const environments = [...this.bulkOperationConfig.environments!];
const locales = [...this.bulkOperationConfig.locales!];
const operation = this.bulkOperationConfig.operation as OperationType;
const startTime = Date.now();

// Warn early if the mapper is empty — all assets will be skipped and the user needs to know why.
if (Object.keys(assetUidMapper).length === 0) {
this.logger.warn(
'Asset UID mapper is empty — all assets will be skipped. Ensure the import completed successfully.'
);
}

const useOverrideLocales = cliLocales.length > 0;
const BATCH_SIZE = BATCH_CONSTANTS.maxItems;
// totalItemCount comes from pass 1 using identical counting logic — used as upper bound for totalBatches.
// Scan status filtering may reduce the actual count; the invariant check below will log any mismatch.
const totalBatches = Math.ceil(totalItemCount / BATCH_SIZE);

let workingBatch: AssetPublishData[] = [];
let batchNumber = 0;
let totalSubmitted = 0;

this.batchResults.clear();

const flushBatch = (): void => {
if (workingBatch.length === 0) return;
batchNumber++;
this.queueManager.enqueue(ResourceType.ASSET, operation, {
items: [...workingBatch],
environments,
locales,
batchNumber,
totalBatches,
operation,
});
totalSubmitted += workingBatch.length;
workingBatch = [];
};

for (const chunkFilename of Object.values(assetsIndex)) {
const chunkPath = path.join(dataDir, 'assets', chunkFilename);
const chunkData: Record<string, any> = JSON.parse(fs.readFileSync(chunkPath, 'utf-8'));

for (const asset of Object.values(chunkData)) {
if (!asset.publish_details || asset.publish_details.length === 0) continue;
const targetUid = assetUidMapper[asset.uid as string];
if (!targetUid) continue;

// Skip assets that did not pass scanning.
const scanStatus = scanStatusMap.get(targetUid);
if (scanStatus === 'quarantined') {
this.logger.warn($t(messages.SCAN_STATUS_SKIPPED_QUARANTINED, { uid: targetUid }));
continue;
}
if (scanStatus === 'pending') {
this.logger.warn($t(messages.SCAN_STATUS_SKIPPED_PENDING, { uid: targetUid }));
continue;
}

const assetLocales: string[] = useOverrideLocales
? cliLocales
: [...new Set<string>(asset.publish_details.map((pd: any) => pd.locale as string))];

for (const locale of assetLocales) {
workingBatch.push({ type: 'asset', uid: targetUid, locale, version: asset._version });
if (workingBatch.length >= BATCH_SIZE) {
flushBatch();
}
}
}
// chunkData falls out of scope here — GC can reclaim it before the next chunk is read.
}

flushBatch();

// Invariant: pass 1 and pass 2 use identical counting logic (excluding scan status filtering).
// If batchNumber < totalBatches, scan status filtering reduced the published count — expected.
if (batchNumber !== totalBatches) {
this.logger.debug(
`Batch count: predicted ${totalBatches}, actual ${batchNumber}. Difference is expected when assets are skipped due to scan status.`
);
}

await this.queueManager.waitForCompletion();

const duration = Date.now() - startTime;
const jobIds = [...this.batchResults.values()].map((r) => r.jobId).filter((id): id is string => !!id);

return { success: 0, failed: 0, total: totalSubmitted, duration, jobIds };
}

private printScanningDashboard(opts: {
total: number;
clean: number;
pending: number;
quarantined: number;
localSkipped?: number;
unmapped?: number;
}): void {
const { total, clean, pending, quarantined, localSkipped, unmapped } = opts;
const SEP = '─'.repeat(42);

log.info('');
log.info(` ${messages.DATA_DIR_ASSET_SCANNING_HEADER}`);
log.info(' ' + SEP);
log.info(` ${messages.DATA_DIR_TOTAL.padEnd(38)} ${total}`);
if (localSkipped !== undefined) {
log.warn(` ${messages.DATA_DIR_NO_PUBLISH_DETAILS.padEnd(38)} ${localSkipped}`);
}
if (unmapped !== undefined) {
log.warn(` ${messages.DATA_DIR_UNMAPPED.padEnd(38)} ${unmapped}`);
}
log.info(' ' + SEP);
log.info(` ${messages.SCAN_STATUS_CLEAN.padEnd(38)} ${clean}`);
if (pending > 0) log.warn(` ${messages.SCAN_STATUS_PENDING.padEnd(38)} ${pending}`);
if (quarantined > 0) log.warn(` ${messages.SCAN_STATUS_QUARANTINED.padEnd(38)} ${quarantined}`);
log.info(' ' + SEP);
log.info(` ${messages.DATA_DIR_WILL_PUBLISH.padEnd(38)} ${clean}`);
log.info('');
}

protected async fetchItems(): Promise<any[]> {
return await fetchAssets(this.bulkOperationConfig, this.managementStack, this.deliveryStack, this.logger);
}
Expand Down
Loading
Loading