diff --git a/.talismanrc b/.talismanrc index 80639185..e22feb87 100644 --- a/.talismanrc +++ b/.talismanrc @@ -181,4 +181,8 @@ fileignoreconfig: checksum: 580932f192dd3fdd8bb2c55b7a7a78f1694f646ef5c5041f86c75668778f7ecb - filename: packages/contentstack-bulk-operations/test/unit/utils/asset-uids-from-file.test.ts checksum: 8123f7a675a0275795b59b15d0f2d5f8f1e57ccbecf3f97249a0dc5a037b9203 +- filename: packages/contentstack-bulk-operations/src/utils/data-dir-asset-fetcher.ts + checksum: ddcf4601ac47be300eba0eb901e4d45d748c2c4eab676c56677cb9a802fe3db0 +- filename: packages/contentstack-bulk-operations/src/commands/cm/stacks/bulk-assets.ts + checksum: c8767e79c1010cccf984d2124b2c6252b7cba1fb1b6a9f216fa7ddb5b195a935 version: '1.0' diff --git a/packages/contentstack-bulk-operations/src/base-bulk-command.ts b/packages/contentstack-bulk-operations/src/base-bulk-command.ts index e5f0bc62..a6b0a987 100644 --- a/packages/contentstack-bulk-operations/src/base-bulk-command.ts +++ b/packages/contentstack-bulk-operations/src/base-bulk-command.ts @@ -132,7 +132,7 @@ export abstract class BaseBulkCommand extends Command { protected rateLimiter!: AdaptiveRateLimiter; protected retryStrategy!: RetryStrategy; protected operationExecutor!: OperationExecutor; - private batchResults: Map = new Map(); + protected batchResults: Map = new Map(); protected parsedFlags: any; /** @@ -165,7 +165,7 @@ export abstract class BaseBulkCommand extends Command { } // Fill missing required flags via interactive prompts - flags = await fillMissingFlags(flags); + flags = await this.resolveFlagsInteractively(flags); this.parsedFlags = flags; await this.buildConfiguration(flags); @@ -265,6 +265,14 @@ export abstract class BaseBulkCommand extends Command { } } + + /** + * Resolve flags interactively — subclasses can override to skip prompts for specific modes. 
+ */ + protected async resolveFlagsInteractively(flags: any): Promise { + return await fillMissingFlags(flags); + } + /** * Build operation configuration */ diff --git a/packages/contentstack-bulk-operations/src/commands/cm/stacks/bulk-assets.ts b/packages/contentstack-bulk-operations/src/commands/cm/stacks/bulk-assets.ts index 71122a91..53bc50c1 100644 --- a/packages/contentstack-bulk-operations/src/commands/cm/stacks/bulk-assets.ts +++ b/packages/contentstack-bulk-operations/src/commands/cm/stacks/bulk-assets.ts @@ -1,12 +1,17 @@ -import { flags, handleAndLogError, FlagInput } from '@contentstack/cli-utilities'; +import * as fs from 'fs'; +import * as path from 'path'; -import { ResourceType } from '../../../interfaces'; +import { flags, handleAndLogError, log } from '@contentstack/cli-utilities'; + +import { AssetPublishData, BulkOperationResult, OperationType, ResourceType } from '../../../interfaces'; import { BaseBulkCommand } from '../../../base-bulk-command'; -import { $t, messages, fetchAssets } from '../../../utils'; +import { $t, messages, fetchAssets, scanDataDirStats, BATCH_CONSTANTS, categorizeByScanStatus } from '../../../utils'; +import type { DataDirScanStats } from '../../../utils'; +import { AssetService } from '../../../services'; /** * Bulk operations command for assets - * Supports publish, unpublish, and cross publish operations + * Supports publish, unpublish, cross-publish, and data-dir publish operations */ export default class BulkAssets extends BaseBulkCommand { static description = messages.BULK_ASSETS_DESCRIPTION; @@ -32,25 +37,47 @@ export default class BulkAssets extends BaseBulkCommand { // Revert (unpublish) previously published assets using success log '<%= config.bin %> <%= command.id %> --revert ./bulk-operation -a myAlias', + + // Publish assets from exported content folder (e.g. 
after asset scanning clears) + '<%= config.bin %> <%= command.id %> --data-dir ./content --operation publish -k blt123', ]; - static flags: FlagInput = { + static flags = { ...BaseBulkCommand.baseFlags, 'folder-uid': flags.string({ description: messages.FOLDER_UID, }), + 'data-dir': flags.string({ + char: 'd', + description: messages.DATA_DIR_FLAG_DESC, + }), + 'dry-run': flags.boolean({ + description: messages.DRY_RUN_FLAG_DESC, + default: false, + }), }; protected resourceType: ResourceType = ResourceType.ASSET; + protected async resolveFlagsInteractively(flags: any): Promise { + if (flags['data-dir']) { + return flags; + } + return super.resolveFlagsInteractively(flags); + } + async run(): Promise { try { - // Handle cross-publish separately if source-env is specified if (this.bulkOperationConfig.sourceEnv) { await this.handleCrossPublish(this.parsedFlags); return; } + if (this.bulkOperationConfig.dataDir) { + await this.runDataDirFlow(); + return; + } + const assets = await this.fetchItems(); if (assets.length === 0) { @@ -58,18 +85,39 @@ export default class BulkAssets extends BaseBulkCommand { return; } - this.logger.info( - $t(messages.FOUND_ASSETS_TO_OPERATE, { count: assets.length, operation: this.parsedFlags.operation || '' }) - ); + const { clean, pending, quarantined, noStatus } = categorizeByScanStatus(assets); + const scanningEnabled = clean.length + pending.length + quarantined.length > 0; + const publishable = scanningEnabled ? 
clean : [...clean, ...noStatus]; + + if (scanningEnabled) { + // Log individual skipped assets + pending.forEach((a) => this.logger.warn($t(messages.SCAN_STATUS_SKIPPED_PENDING, { uid: a.uid }))); + quarantined.forEach((a) => this.logger.warn($t(messages.SCAN_STATUS_SKIPPED_QUARANTINED, { uid: a.uid }))); - // Confirm operation - const confirmed = await this.confirmOperation(assets); + this.printScanningDashboard({ + total: assets.length, + clean: clean.length, + pending: pending.length, + quarantined: quarantined.length, + }); + + if (publishable.length === 0) { + this.logger.warn($t(messages.NO_PUBLISHABLE_ASSETS)); + return; + } + } else { + log.info( + $t(messages.FOUND_ASSETS_TO_OPERATE, { count: assets.length, operation: this.parsedFlags.operation || '' }) + ); + } + + const confirmed = await this.confirmOperation(publishable); if (!confirmed) { this.logger.warn($t(messages.OPERATION_CANCELLED)); return; } - const result = await this.executeBulkOperation(assets); + const result = await this.executeBulkOperation(publishable); this.printOperationSummary(result); } catch (error) { handleAndLogError(error); @@ -78,6 +126,218 @@ export default class BulkAssets extends BaseBulkCommand { } } + private async runDataDirFlow(): Promise { + const { dataDir, dryRun } = this.bulkOperationConfig; + + // Capture original CLI locales/envs before pass 1 overwrites them on the config. + const cliLocales = [...(this.bulkOperationConfig.locales || [])]; + const cliEnvs = [...(this.bulkOperationConfig.environments || [])]; + + // Pass 1 — count-only scan: no AssetPublishData objects built, one chunk in memory at a time. 
+ let stats: DataDirScanStats; + try { + stats = await scanDataDirStats(dataDir!, cliEnvs, cliLocales, this.logger); + } catch (err: any) { + this.logger.error($t(messages.DATA_DIR_READ_ERROR, { path: dataDir!, error: err.message || String(err) })); + return; + } + + this.bulkOperationConfig.environments = stats.environments; + this.bulkOperationConfig.locales = stats.locales; + + // Pass 1.5 — fetch scan status for all target UIDs (post-import UIDs on the destination stack). + const targetUids = Object.values(stats.assetUidMapper); + const assetService = new AssetService(this.managementStack, this.deliveryStack, this.logger); + const scanStatusMap = await assetService.fetchScanStatusByUIDs(targetUids); + + let cleanCount = 0; + let pendingCount = 0; + let quarantinedCount = 0; + for (const uid of targetUids) { + const status = scanStatusMap.get(uid); + if (status === 'pending') pendingCount++; + else if (status === 'quarantined') quarantinedCount++; + else cleanCount++; // clean or undefined (scanning disabled) — both are publishable + } + + this.printScanningDashboard({ + total: stats.eligible + stats.skipped + stats.unmapped, + localSkipped: stats.skipped, + unmapped: stats.unmapped, + clean: cleanCount, + pending: pendingCount, + quarantined: quarantinedCount, + }); + + if (cleanCount === 0) { + this.logger.warn($t(messages.NO_PUBLISHABLE_ASSETS)); + return; + } + + // new Array(n) has .length === n but allocates no elements — just for the count. + const confirmed = await this.confirmOperation(new Array(cleanCount)); + if (!confirmed) { + this.logger.warn($t(messages.OPERATION_CANCELLED)); + return; + } + + if (dryRun) { + log.info($t(messages.DATA_DIR_DRY_RUN)); + return; + } + + // Pass 2 — stream and publish: one chunk at a time, batches of ≤50 items enqueued directly. + // stats.assetUidMapper and stats.assetsIndex are reused from pass 1 — no second disk read. 
+ const result = await this.streamAndPublish( + dataDir!, + cliLocales, + stats.totalItems, + stats.assetUidMapper, + stats.assetsIndex, + scanStatusMap + ); + this.printOperationSummary(result); + } + + /** + * Pass 2 of the data-dir flow. + * Reads chunk files one at a time, fills a working batch of ≤50 AssetPublishData items, + * and enqueues each batch directly into the queue manager without ever holding the full + * asset list in memory. Peak memory: one chunk file + one batch of ≤50 items. + * + * assetUidMapper and assetsIndex are passed in from pass 1 to avoid re-reading those files. + * scanStatusMap filters out non-clean assets before enqueueing. + */ + private async streamAndPublish( + dataDir: string, + cliLocales: string[], + totalItemCount: number, + assetUidMapper: Record, + assetsIndex: Record, + scanStatusMap: Map + ): Promise { + // Snapshot both arrays so in-flight mutations to bulkOperationConfig can't corrupt payloads. + const environments = [...this.bulkOperationConfig.environments!]; + const locales = [...this.bulkOperationConfig.locales!]; + const operation = this.bulkOperationConfig.operation as OperationType; + const startTime = Date.now(); + + // Warn early if the mapper is empty — all assets will be skipped and the user needs to know why. + if (Object.keys(assetUidMapper).length === 0) { + this.logger.warn( + 'Asset UID mapper is empty — all assets will be skipped. Ensure the import completed successfully.' + ); + } + + const useOverrideLocales = cliLocales.length > 0; + const BATCH_SIZE = BATCH_CONSTANTS.maxItems; + // totalItemCount comes from pass 1 using identical counting logic — used as upper bound for totalBatches. + // Scan status filtering may reduce the actual count; the invariant check below will log any mismatch. 
+ const totalBatches = Math.ceil(totalItemCount / BATCH_SIZE); + + let workingBatch: AssetPublishData[] = []; + let batchNumber = 0; + let totalSubmitted = 0; + + this.batchResults.clear(); + + const flushBatch = (): void => { + if (workingBatch.length === 0) return; + batchNumber++; + this.queueManager.enqueue(ResourceType.ASSET, operation, { + items: [...workingBatch], + environments, + locales, + batchNumber, + totalBatches, + operation, + }); + totalSubmitted += workingBatch.length; + workingBatch = []; + }; + + for (const chunkFilename of Object.values(assetsIndex)) { + const chunkPath = path.join(dataDir, 'assets', chunkFilename); + const chunkData: Record = JSON.parse(fs.readFileSync(chunkPath, 'utf-8')); + + for (const asset of Object.values(chunkData)) { + if (!asset.publish_details || asset.publish_details.length === 0) continue; + const targetUid = assetUidMapper[asset.uid as string]; + if (!targetUid) continue; + + // Skip assets that did not pass scanning. + const scanStatus = scanStatusMap.get(targetUid); + if (scanStatus === 'quarantined') { + this.logger.warn($t(messages.SCAN_STATUS_SKIPPED_QUARANTINED, { uid: targetUid })); + continue; + } + if (scanStatus === 'pending') { + this.logger.warn($t(messages.SCAN_STATUS_SKIPPED_PENDING, { uid: targetUid })); + continue; + } + + const assetLocales: string[] = useOverrideLocales + ? cliLocales + : [...new Set(asset.publish_details.map((pd: any) => pd.locale as string))]; + + for (const locale of assetLocales) { + workingBatch.push({ type: 'asset', uid: targetUid, locale, version: asset._version }); + if (workingBatch.length >= BATCH_SIZE) { + flushBatch(); + } + } + } + // chunkData falls out of scope here — GC can reclaim it before the next chunk is read. + } + + flushBatch(); + + // Invariant: pass 1 and pass 2 use identical counting logic (excluding scan status filtering). + // If batchNumber < totalBatches, scan status filtering reduced the published count — expected. 
+ if (batchNumber !== totalBatches) { + this.logger.debug( + `Batch count: predicted ${totalBatches}, actual ${batchNumber}. Difference is expected when assets are skipped due to scan status.` + ); + } + + await this.queueManager.waitForCompletion(); + + const duration = Date.now() - startTime; + const jobIds = [...this.batchResults.values()].map((r) => r.jobId).filter((id): id is string => !!id); + + return { success: 0, failed: 0, total: totalSubmitted, duration, jobIds }; + } + + private printScanningDashboard(opts: { + total: number; + clean: number; + pending: number; + quarantined: number; + localSkipped?: number; + unmapped?: number; + }): void { + const { total, clean, pending, quarantined, localSkipped, unmapped } = opts; + const SEP = '─'.repeat(42); + + log.info(''); + log.info(` ${messages.DATA_DIR_ASSET_SCANNING_HEADER}`); + log.info(' ' + SEP); + log.info(` ${messages.DATA_DIR_TOTAL.padEnd(38)} ${total}`); + if (localSkipped !== undefined) { + log.warn(` ${messages.DATA_DIR_NO_PUBLISH_DETAILS.padEnd(38)} ${localSkipped}`); + } + if (unmapped !== undefined) { + log.warn(` ${messages.DATA_DIR_UNMAPPED.padEnd(38)} ${unmapped}`); + } + log.info(' ' + SEP); + log.info(` ${messages.SCAN_STATUS_CLEAN.padEnd(38)} ${clean}`); + if (pending > 0) log.warn(` ${messages.SCAN_STATUS_PENDING.padEnd(38)} ${pending}`); + if (quarantined > 0) log.warn(` ${messages.SCAN_STATUS_QUARANTINED.padEnd(38)} ${quarantined}`); + log.info(' ' + SEP); + log.info(` ${messages.DATA_DIR_WILL_PUBLISH.padEnd(38)} ${clean}`); + log.info(''); + } + protected async fetchItems(): Promise { return await fetchAssets(this.bulkOperationConfig, this.managementStack, this.deliveryStack, this.logger); } diff --git a/packages/contentstack-bulk-operations/src/interfaces/index.ts b/packages/contentstack-bulk-operations/src/interfaces/index.ts index b5a18fe7..13a4f295 100644 --- a/packages/contentstack-bulk-operations/src/interfaces/index.ts +++ 
b/packages/contentstack-bulk-operations/src/interfaces/index.ts @@ -58,6 +58,8 @@ export interface BulkOperationConfig { // Asset-specific options folderUid?: string; + dataDir?: string; + dryRun?: boolean; // Cross-publish sourceEnv?: string; @@ -135,6 +137,7 @@ export interface Asset { title?: string; _version?: number; publish_details?: PublishDetails[]; + _asset_scan_status?: 'pending' | 'clean' | 'quarantined'; [key: string]: any; } @@ -196,6 +199,8 @@ export interface CommandFlags { // Asset-specific flags 'folder-uid'?: string; + 'data-dir'?: string; + 'dry-run'?: boolean; /** AM bulk delete/move */ 'space-uid'?: string; @@ -254,6 +259,7 @@ export interface AssetPublishData { locale: string; version?: number; publish_details?: PublishDetails[]; + _asset_scan_status?: 'pending' | 'clean' | 'quarantined'; } /** One row for AM bulk-delete payload `{ uid, locale }[]`. */ diff --git a/packages/contentstack-bulk-operations/src/messages/index.ts b/packages/contentstack-bulk-operations/src/messages/index.ts index a285f734..bc963354 100644 --- a/packages/contentstack-bulk-operations/src/messages/index.ts +++ b/packages/contentstack-bulk-operations/src/messages/index.ts @@ -210,6 +210,25 @@ const bulkAssetsMsg = { CROSS_PUBLISHING: 'Cross-publishing from {sourceEnv} to {targetEnvs}', SYNCED_ASSETS: 'Synced {count} assets from {sourceEnv}', ASSETS_READY_FOR_CROSS_PUBLISH: '{count} assets ready for cross-publish', + + // Data-dir / scanning dashboard + DATA_DIR_ASSET_SCANNING_HEADER: 'Asset Scan Status', + DATA_DIR_TOTAL: 'Total assets found', + DATA_DIR_VALID: 'Clean (will publish)', + DATA_DIR_NO_PUBLISH_DETAILS: 'No publish details (skipped)', + DATA_DIR_UNMAPPED: 'Not imported / UID unmapped (skipped)', + DATA_DIR_WILL_PUBLISH: 'Will publish', + DATA_DIR_DRY_RUN: 'Dry run — no publish API calls will be made.', + DATA_DIR_FLAG_DESC: 'Path to exported content folder containing asset publish details.', + DRY_RUN_FLAG_DESC: 'Preview the publish plan without making any 
API calls.', + DATA_DIR_READ_ERROR: 'Failed to read data directory at {path}: {error}', + SCAN_STATUS_CLEAN: 'Clean (will publish)', + SCAN_STATUS_PENDING: 'Still scanning (skipped)', + SCAN_STATUS_QUARANTINED: 'Quarantined (skipped)', + SCAN_STATUS_SKIPPED_PENDING: 'Skipped (still scanning): {uid}', + SCAN_STATUS_SKIPPED_QUARANTINED: 'Skipped (quarantined): {uid}', + SCAN_STATUS_FETCHING: 'Checking asset scan status for {count} assets...', + NO_PUBLISHABLE_ASSETS: 'No publishable assets — all assets are either still scanning or quarantined.', }; /** diff --git a/packages/contentstack-bulk-operations/src/services/asset-service.ts b/packages/contentstack-bulk-operations/src/services/asset-service.ts index aba844fb..903ce373 100644 --- a/packages/contentstack-bulk-operations/src/services/asset-service.ts +++ b/packages/contentstack-bulk-operations/src/services/asset-service.ts @@ -37,7 +37,7 @@ export class AssetService { const batchUids = uids.slice(i, i + BATCH_CONSTANTS.assetFetchBatchSize); const batchPromises = batchUids.map(async (uid) => { try { - const asset = this.deliveryStack ? 
await this.deliveryStack.asset(uid).fetch() : undefined; + const asset = await this.deliveryStack?.asset(uid).fetch(); return asset; } catch (error: any) { // Asset might not exist or not be published to this environment @@ -125,7 +125,13 @@ export class AssetService { try { while (hasMore) { - const queryOptions: any = { skip, limit, include_count: true, include_publish_details: true }; + const queryOptions: any = { + skip, + limit, + include_count: true, + include_publish_details: true, + include_asset_scan_status: true, + }; // Add any filters from options if (options.query) { @@ -207,7 +213,14 @@ export class AssetService { while (hasMore) { const query = this.stack .asset() - .query({ skip, limit, include_count: true, include_publish_details: true, folder: folderUid }); + .query({ + skip, + limit, + include_count: true, + include_publish_details: true, + include_asset_scan_status: true, + folder: folderUid, + }); const response = await query.find(); const assets = response.items || []; @@ -273,4 +286,34 @@ export class AssetService { throw error; } } + + /** + * Fetch scan status for a specific list of asset UIDs. + * Batches requests at 100 UIDs per call to stay within API limits. + * Returns a Map — undefined means scanning is not enabled on the stack. 
+ */ + async fetchScanStatusByUIDs(uids: string[]): Promise> { + const statusMap = new Map(); + if (uids.length === 0) return statusMap; + + this.logger.info($t(messages.SCAN_STATUS_FETCHING, { count: uids.length })); + + const BATCH = 100; + for (let i = 0; i < uids.length; i += BATCH) { + const batch = uids.slice(i, i + BATCH); + try { + const response = await this.stack + .asset() + .query({ uid: { $in: batch }, include_asset_scan_status: true, limit: BATCH }) + .find(); + for (const asset of response.items || []) { + statusMap.set(asset.uid, asset._asset_scan_status); + } + } catch (error: any) { + this.logger.warn(`Failed to fetch scan status for batch starting at index ${i}: ${error?.message}`); + } + } + + return statusMap; + } } diff --git a/packages/contentstack-bulk-operations/src/services/bulk-operation-service.ts b/packages/contentstack-bulk-operations/src/services/bulk-operation-service.ts index 705d1a90..18c9618f 100644 --- a/packages/contentstack-bulk-operations/src/services/bulk-operation-service.ts +++ b/packages/contentstack-bulk-operations/src/services/bulk-operation-service.ts @@ -47,13 +47,15 @@ export class BulkOperationService { async executeBulkPublish( items: Array, operation: OperationType, - resourceType: ResourceType + resourceType: ResourceType, + environments?: string[], + locales?: string[] ): Promise { this.logger.info($t(messages.SUBMITTING_BULK_JOB, { operation, count: items.length })); try { // Step 1: Submit bulk job - const jobId = await this.submitBulkJob(items, operation, resourceType); + const jobId = await this.submitBulkJob(items, operation, resourceType, environments, locales); this.logger.debug($t(messages.BULK_JOB_CREATED, { jobId })); // Return immediate result after job submission @@ -78,10 +80,12 @@ export class BulkOperationService { private async submitBulkJob( items: Array, operation: OperationType, - resourceType: ResourceType + resourceType: ResourceType, + environments?: string[], + locales?: string[] ): Promise 
{ try { - const payload = this.prepareBulkPayload(items, operation, resourceType); + const payload = this.prepareBulkPayload(items, operation, resourceType, environments, locales); let response: any; switch (operation) { case OperationType.PUBLISH: @@ -203,16 +207,23 @@ export class BulkOperationService { private prepareBulkPayload( items: Array, operation: OperationType, - resourceType: ResourceType + resourceType: ResourceType, + environments?: string[], + locales?: string[] ): any { if (resourceType === ResourceType.ENTRY) { - return this.prepareEntryBulkPayload(items as EntryPublishData[], operation); + return this.prepareEntryBulkPayload(items as EntryPublishData[], operation, environments, locales); } else { - return this.prepareAssetBulkPayload(items as AssetPublishData[], operation); + return this.prepareAssetBulkPayload(items as AssetPublishData[], operation, environments, locales); } } - private prepareEntryBulkPayload(items: EntryPublishData[], operation: OperationType): any { + private prepareEntryBulkPayload( + items: EntryPublishData[], + operation: OperationType, + batchEnvironments?: string[], + batchLocales?: string[] + ): any { const entries = items.map((item) => { const entry: any = { uid: item.uid, @@ -233,8 +244,17 @@ export class BulkOperationService { return entry; }); - const environments = items[0]?.publish_details?.map((pd) => pd.environment) || []; - const locales = Array.from(new Set(items.map((item) => item.locale))); + const environments = batchEnvironments?.length + ? batchEnvironments + : items[0]?.publish_details?.map((pd) => pd.environment) || []; + const locales = batchLocales?.length ? batchLocales : Array.from(new Set(items.map((item) => item.locale))); + + if (!environments.length) { + throw new Error('No environments for bulk publish. Ensure entries have publish_details with environment data.'); + } + if (!locales.length) { + throw new Error('No locales for bulk publish. 
Ensure entries have a locale field.'); + } return { entries, @@ -244,14 +264,28 @@ export class BulkOperationService { }; } - private prepareAssetBulkPayload(items: AssetPublishData[], operation: OperationType): any { + private prepareAssetBulkPayload( + items: AssetPublishData[], + operation: OperationType, + batchEnvironments?: string[], + batchLocales?: string[] + ): any { const assets = items.map((item) => ({ uid: item.uid, version: item.version, })); - const environments = items[0]?.publish_details?.map((pd) => pd.environment) || []; - const locales = items[0]?.publish_details?.map((pd) => pd.locale) || []; + const environments = batchEnvironments?.length + ? batchEnvironments + : items[0]?.publish_details?.map((pd) => pd.environment) || []; + const locales = batchLocales?.length ? batchLocales : items[0]?.publish_details?.map((pd) => pd.locale) || []; + + if (!environments.length) { + throw new Error('No environments for bulk publish. Ensure assets have publish_details with environment data.'); + } + if (!locales.length) { + throw new Error('No locales for bulk publish. Ensure assets have publish_details with locale data.'); + } return { assets, diff --git a/packages/contentstack-bulk-operations/src/utils/batch-queue-handler.ts b/packages/contentstack-bulk-operations/src/utils/batch-queue-handler.ts index e32eadcd..4e624e5d 100644 --- a/packages/contentstack-bulk-operations/src/utils/batch-queue-handler.ts +++ b/packages/contentstack-bulk-operations/src/utils/batch-queue-handler.ts @@ -21,7 +21,7 @@ export function setupBatchQueueListeners(config: BatchQueueConfig) { } logger.info( - `Processing batch ${batch.batchNumber ?? 0}/${batch.totalBatches ?? 
0}: ` + + `Processing batch ${batch.batchNumber}/${batch.totalBatches}: ` + `${batch.items.length} items, ` + `${batch.locales.length} locales, ` + `${batch.environments.length} environments` @@ -29,7 +29,13 @@ export function setupBatchQueueListeners(config: BatchQueueConfig) { (async () => { try { - const result = await bulkService.executeBulkPublish(batch.items, batch.operation, resourceType); + const result = await bulkService.executeBulkPublish( + batch.items, + batch.operation, + resourceType, + batch.environments, + batch.locales + ); batchResults.set(item.id, result); queueManager.updateItemStatus(item.id, OperationStatus.SUCCESS); @@ -76,7 +82,7 @@ export function setupBatchQueueListeners(config: BatchQueueConfig) { if (!batch) return; handleAndLogError(error, { - batchNumber: `${batch.batchNumber ?? 0}/${batch.totalBatches ?? 0}`, + batchNumber: `${batch.batchNumber}/${batch.totalBatches}`, itemCount: batch.items.length, }); @@ -109,7 +115,7 @@ async function handleRetryOrFailure({ : retryStrategy.getDelay(item.retryCount); logger.warn( - `Batch ${batch.batchNumber ?? 0}/${batch.totalBatches ?? 0} failed with ${ + `Batch ${batch.batchNumber}/${batch.totalBatches} failed with ${ isRateLimit ? '429 Rate Limit' : getErrorCode(error) }, retrying in ${Math.ceil(delay / 1000)}s` ); diff --git a/packages/contentstack-bulk-operations/src/utils/config-builder.ts b/packages/contentstack-bulk-operations/src/utils/config-builder.ts index c53deb98..e92e9fb7 100644 --- a/packages/contentstack-bulk-operations/src/utils/config-builder.ts +++ b/packages/contentstack-bulk-operations/src/utils/config-builder.ts @@ -69,24 +69,28 @@ function validateConfig(config: BulkOperationConfig): string[] { errors.push(`Invalid operation type: ${config.operation}. 
Must be 'publish' or 'unpublish'`); } - // Environments validation - if ( - (operation === OperationType.PUBLISH || operation === OperationType.UNPUBLISH) && - (!config.environments || config.environments.length === 0) - ) { - errors.push('Environments are required for publish/unpublish operations'); - } - if (config.environments?.some((env) => !env || env.trim() === '')) { - errors.push('Environment list cannot contain empty values'); + // Environments validation — skipped when assets are read from a data directory + if (!config.dataDir) { + if ( + (operation === OperationType.PUBLISH || operation === OperationType.UNPUBLISH) && + (!config.environments || config.environments.length === 0) + ) { + errors.push('Environments are required for publish/unpublish operations'); + } + if (config.environments?.some((env) => !env || env.trim() === '')) { + errors.push('Environment list cannot contain empty values'); + } } - // Locales validation + // Locales validation — skipped when assets are read from a data directory const isNonLocalized = config.filter === FilterType.NON_LOCALIZED; - if (!isNonLocalized && (!config.locales || config.locales.length === 0)) { - errors.push('Locales are required'); - } - if (config.locales?.some((locale) => !locale || locale.trim() === '')) { - errors.push('Locale list cannot contain empty values'); + if (!config.dataDir) { + if (!isNonLocalized && (!config.locales || config.locales.length === 0)) { + errors.push('Locales are required'); + } + if (config.locales?.some((locale) => !locale || locale.trim() === '')) { + errors.push('Locale list cannot contain empty values'); + } } // Filter validation @@ -144,24 +148,28 @@ function validateCommandFlags(flags: CommandFlags): string[] { const operation = flags.operation as OperationType; - // Environment validation - if ( - (operation === OperationType.PUBLISH || operation === OperationType.UNPUBLISH) && - (!flags.environments || flags.environments.length === 0) - ) { - 
errors.push('Environments are required for publish/unpublish operations'); - } - if (flags.environments?.some((env) => !env || env.trim() === '')) { - errors.push('Environment list cannot contain empty values'); + // Environment validation — skipped when assets are read from a data directory + if (!flags['data-dir']) { + if ( + (operation === OperationType.PUBLISH || operation === OperationType.UNPUBLISH) && + (!flags.environments || flags.environments.length === 0) + ) { + errors.push('Environments are required for publish/unpublish operations'); + } + if (flags.environments?.some((env) => !env || env.trim() === '')) { + errors.push('Environment list cannot contain empty values'); + } } - // Locale validation + // Locale validation — skipped when assets are read from a data directory const isNonLocalized = flags.filter === FilterType.NON_LOCALIZED; - if (!isNonLocalized && (!flags.locales || flags.locales.length === 0)) { - errors.push('Locales are required'); - } - if (flags.locales?.some((locale) => !locale || locale.trim() === '')) { - errors.push('Locale list cannot contain empty values'); + if (!flags['data-dir']) { + if (!isNonLocalized && (!flags.locales || flags.locales.length === 0)) { + errors.push('Locales are required'); + } + if (flags.locales?.some((locale) => !locale || locale.trim() === '')) { + errors.push('Locale list cannot contain empty values'); + } } // Content types validation @@ -243,6 +251,8 @@ export function buildConfig(flags: CommandFlags): BulkOperationConfig { contentTypes: flags['content-types'] !== undefined ? 
import * as fs from 'fs';
import * as path from 'path';

/**
 * Result of the count-only first pass over an import backup directory.
 */
export interface DataDirScanStats {
  /** Number of assets eligible for publish (have publish_details + mapped UID). */
  eligible: number;
  /** Total AssetPublishData items that will be created (eligible × locale expansions). */
  totalItems: number;
  /** Assets ignored because they carry no usable (non-empty array) publish_details. */
  skipped: number;
  /** Assets that have publish_details but no entry in the UID mapper. */
  unmapped: number;
  /** Environment names: the override list when one is supplied, otherwise the names discovered while scanning. */
  environments: string[];
  /** Locale codes: the override list when one is supplied, otherwise the codes discovered while scanning. */
  locales: string[];
  /** Parsed `mapper/assets/uid-mapping.json` (asset uid → mapped uid); returned so a later pass can reuse it without a second disk read. */
  assetUidMapper: Record<string, string>;
  /** Parsed `assets/assets.json` (index key → chunk filename); returned so a later pass can reuse it without a second disk read. */
  assetsIndex: Record<string, string>;
}

/**
 * Read and parse a JSON file.
 *
 * On failure the original error object is rethrown (so its type and any `code` are preserved)
 * with the file path prefixed to its message; a bare JSON syntax error gives no hint about
 * which of the many chunk files is malformed.
 */
function readJsonFile(filePath: string): Record<string, any> {
  try {
    return JSON.parse(fs.readFileSync(filePath, 'utf-8'));
  } catch (error: unknown) {
    if (error instanceof Error) {
      error.message = `Failed to load ${filePath}: ${error.message}`;
      throw error;
    }
    throw new Error(`Failed to load ${filePath}: ${String(error)}`);
  }
}

/**
 * Pass 1: count-only scan of the data directory.
 *
 * Reads chunk files one at a time, counts eligible/skipped/unmapped assets, and discovers
 * environments and locales — without building any publish payload objects. Only the uid
 * mapper, the environment map and a single chunk are held at any moment.
 *
 * @param dataDir - Root of the import backup directory (expects `assets/assets.json`, and
 *   optionally `environments/environments.json` and `mapper/assets/uid-mapping.json`).
 * @param overrideEnvs - When non-empty, returned verbatim as `environments`; discovery is skipped.
 * @param overrideLocales - When non-empty, returned verbatim as `locales` and used as the
 *   per-asset locale multiplier for `totalItems`.
 * @param logger - Optional logger; only `warn(message)` is called, for missing optional files.
 * @returns Counters, the resolved environment/locale lists, and the parsed uid mapper and
 *   assets index so a later pass can reuse them instead of re-reading the files.
 * @throws Error when `assets/assets.json` does not exist; read/parse errors of any JSON file
 *   are rethrown with the offending path prefixed to the message.
 */
export async function scanDataDirStats(
  dataDir: string,
  overrideEnvs?: string[],
  overrideLocales?: string[],
  logger?: any
): Promise<DataDirScanStats> {
  const assetsIndexPath = path.join(dataDir, 'assets', 'assets.json');
  const environmentsPath = path.join(dataDir, 'environments', 'environments.json');
  const assetUidMapperPath = path.join(dataDir, 'mapper', 'assets', 'uid-mapping.json');

  if (!fs.existsSync(assetsIndexPath)) {
    throw new Error(
      `Asset index not found: ${assetsIndexPath}. Ensure --data-dir points to the import backup directory.`
    );
  }

  // A missing mapper is not fatal: every asset with publish_details is then reported as unmapped.
  let assetUidMapper: Record<string, string> = {};
  if (fs.existsSync(assetUidMapperPath)) {
    assetUidMapper = readJsonFile(assetUidMapperPath);
  } else {
    logger?.warn(
      `Asset UID mapper not found: ${assetUidMapperPath}. Ensure --data-dir points to the import backup directory.`
    );
  }

  // environment uid → display name, falling back to the uid when the record has no usable name.
  const environmentsMap: Record<string, string> = {};
  if (fs.existsSync(environmentsPath)) {
    for (const [uid, env] of Object.entries(readJsonFile(environmentsPath))) {
      environmentsMap[uid] = env?.name || uid;
    }
  } else {
    logger?.warn(`Environments file not found: ${environmentsPath}`);
  }

  const assetsIndex: Record<string, string> = readJsonFile(assetsIndexPath);

  const useOverrideEnvs = Boolean(overrideEnvs?.length);
  const useOverrideLocales = Boolean(overrideLocales?.length);

  let eligible = 0;
  let totalItems = 0;
  let skipped = 0;
  let unmapped = 0;
  const allEnvs = new Set<string>();
  const allLocales = new Set<string>();

  for (const chunkFilename of Object.values(assetsIndex)) {
    // Declared inside the loop so only one parsed chunk is referenced at a time.
    const chunkData = readJsonFile(path.join(dataDir, 'assets', chunkFilename));

    for (const asset of Object.values(chunkData)) {
      const publishDetails = asset?.publish_details;
      // Anything that is not a non-empty array cannot be iterated below, so count it as skipped
      // instead of letting a malformed record abort the whole scan.
      if (!Array.isArray(publishDetails) || publishDetails.length === 0) {
        skipped++;
        continue;
      }

      if (!assetUidMapper[asset.uid as string]) {
        unmapped++;
        continue;
      }

      eligible++;

      // NOTE(review): a publish detail without a locale still counts as one distinct value here,
      // although it is never added to `locales` — confirm this matches the publishing pass.
      const assetLocales = new Set<string>();
      for (const pd of publishDetails) {
        assetLocales.add(pd.locale as string);
        if (!useOverrideLocales && pd.locale) {
          allLocales.add(pd.locale as string);
        }
        if (!useOverrideEnvs) {
          const envName = environmentsMap[pd.environment] || pd.environment;
          if (envName) allEnvs.add(envName as string);
        }
      }

      totalItems += overrideLocales?.length || assetLocales.size;
    }
  }

  const environments = overrideEnvs?.length ? overrideEnvs : [...allEnvs];
  const locales = overrideLocales?.length ? overrideLocales : [...allLocales];

  return { eligible, totalItems, skipped, unmapped, environments, locales, assetUidMapper, assetsIndex };
}
/**
 * Split assets into buckets according to their `_asset_scan_status` field.
 *
 * The recognised values are `clean`, `pending` and `quarantined`. Every other value —
 * including a missing field — goes to `noStatus`; assets with no status field belong to
 * stacks where scanning is disabled and are treated as publishable.
 *
 * @param assets - Assets to partition; input order is preserved inside each bucket.
 * @returns The four buckets. Each asset appears in exactly one of them.
 */
export function categorizeByScanStatus(assets: Asset[]): {
  clean: Asset[];
  pending: Asset[];
  quarantined: Asset[];
  noStatus: Asset[];
} {
  const buckets: { clean: Asset[]; pending: Asset[]; quarantined: Asset[]; noStatus: Asset[] } = {
    clean: [],
    pending: [],
    quarantined: [],
    noStatus: [],
  };

  // A Map (rather than a plain-object lookup) so that odd status strings such as
  // "constructor" can never resolve to an inherited property.
  const bucketByStatus = new Map<unknown, Asset[]>([
    ['clean', buckets.clean],
    ['pending', buckets.pending],
    ['quarantined', buckets.quarantined],
  ]);

  assets.forEach((item) => {
    const target = bucketByStatus.get(item._asset_scan_status) ?? buckets.noStatus;
    target.push(item);
  });

  return buckets;
}
a/packages/contentstack-bulk-operations/src/utils/item-fetcher.ts +++ b/packages/contentstack-bulk-operations/src/utils/item-fetcher.ts @@ -221,6 +221,7 @@ export async function fetchAssets( environment: env, locale, })), + _asset_scan_status: asset._asset_scan_status, }); } }