Skip to content
105 changes: 62 additions & 43 deletions apps/orchestrator/src/activities/post.activity.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { WorkflowExecutionAlreadyStartedError } from '@temporalio/client';
import { Injectable } from '@nestjs/common';
import {
Activity,
Expand All @@ -17,7 +18,10 @@ import { RefreshIntegrationService } from '@gitroom/nestjs-libraries/integration
import { timer } from '@gitroom/helpers/utils/timer';
import { IntegrationService } from '@gitroom/nestjs-libraries/database/prisma/integrations/integration.service';
import { WebhooksService } from '@gitroom/nestjs-libraries/database/prisma/webhooks/webhooks.service';
import { TypedSearchAttributes } from '@temporalio/common';
import {
TypedSearchAttributes,
WorkflowIdConflictPolicy,
} from '@temporalio/common';
import {
organizationId,
postId as postIdSearchParam,
Expand All @@ -35,7 +39,7 @@ export class PostActivity {
private _refreshIntegrationService: RefreshIntegrationService,
private _webhookService: WebhooksService,
private _temporalService: TemporalService,
private _subscriptionService: SubscriptionService
private _subscriptionService: SubscriptionService,
) {}

@ActivityMethod()
Expand Down Expand Up @@ -86,7 +90,8 @@ export class PostActivity {
@ActivityMethod()
async getPostsList(orgId: string, postId: string) {
if (process.env.STRIPE_SECRET_KEY) {
const subscription = await this._subscriptionService.getSubscription(orgId);
const subscription =
await this._subscriptionService.getSubscription(orgId);
if (!subscription) {
return [];
}
Expand All @@ -95,7 +100,7 @@ export class PostActivity {
const getPosts = await this._postService.getPostsRecursively(
postId,
true,
orgId
orgId,
);
if (!getPosts || getPosts.length === 0 || getPosts[0].parentPostId) {
return [];
Expand All @@ -107,7 +112,7 @@ export class PostActivity {
@ActivityMethod()
async isCommentable(integration: Integration) {
const getIntegration = this._integrationManager.getSocialIntegration(
integration.providerIdentifier
integration.providerIdentifier,
);

return !!getIntegration.comment;
Expand All @@ -118,15 +123,15 @@ export class PostActivity {
postId: string,
lastPostId: string | undefined,
integration: Integration,
posts: Post[]
posts: Post[],
) {
const getIntegration = this._integrationManager.getSocialIntegration(
integration.providerIdentifier
integration.providerIdentifier,
);

const newPosts = await this._postService.updateTags(
integration.organizationId,
posts
posts,
);

return getIntegration.comment(
Expand All @@ -143,29 +148,29 @@ export class PostActivity {
true,
false,
!/<\/?[a-z][\s\S]*>/i.test(p.content),
getIntegration.mentionFormat
getIntegration.mentionFormat,
),
settings: JSON.parse(p.settings || '{}'),
media: await this._postService.updateMedia(
p.id,
JSON.parse(p.image || '[]'),
getIntegration?.convertToJPEG || false
getIntegration?.convertToJPEG || false,
),
}))
})),
),
integration
integration,
);
}

@ActivityMethod()
async postSocial(integration: Integration, posts: Post[]) {
const getIntegration = this._integrationManager.getSocialIntegration(
integration.providerIdentifier
integration.providerIdentifier,
);

const newPosts = await this._postService.updateTags(
integration.organizationId,
posts
posts,
);

const postNow = await getIntegration.post(
Expand All @@ -180,33 +185,48 @@ export class PostActivity {
true,
false,
!/<\/?[a-z][\s\S]*>/i.test(p.content),
getIntegration.mentionFormat
getIntegration.mentionFormat,
),
settings: JSON.parse(p.settings || '{}'),
media: await this._postService.updateMedia(
p.id,
JSON.parse(p.image || '[]'),
getIntegration?.convertToJPEG || false
getIntegration?.convertToJPEG || false,
),
}))
})),
),
integration
integration,
);

await this._temporalService.client
.getRawClient()
.workflow.start('streakWorkflow', {
args: [{ organizationId: integration.organizationId }],
workflowId: `streak_${integration.organizationId}`,
taskQueue: 'main',
workflowIdConflictPolicy: 'TERMINATE_EXISTING',
typedSearchAttributes: new TypedSearchAttributes([
{
key: organizationId,
value: integration.organizationId,
},
]),
});
try {
await this._temporalService.client
.getRawClient()
.workflow.start('streakWorkflow', {
args: [{ organizationId: integration.organizationId }],
workflowId: `streak_${integration.organizationId}`,
taskQueue: 'main',
workflowIdConflictPolicy: WorkflowIdConflictPolicy.TERMINATE_EXISTING,
typedSearchAttributes: new TypedSearchAttributes([
{
key: organizationId,
value: integration.organizationId,
},
]),
});
} catch (error) {
if (
error instanceof WorkflowExecutionAlreadyStartedError ||
(error as Error)?.name === 'WorkflowExecutionAlreadyStartedError'
) {
// Safely catch the idempotency collision so the social post activity succeeds
console.warn(
`[Ignored] streakWorkflow already started for org: ${integration.organizationId}`,
);
} else {
// Re-throw any genuine infrastructure errors so Temporal can retry appropriately
throw error;
}

This comment was marked as outdated.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The AI agent makes a great catch regarding the root cause of the streak failing to reset, but it misses the architectural priority.

The Root Cause: The bot is correct that 'TERMINATE_EXISTING' was originally passed as an invalid string literal, causing Temporal to silently default to FAIL_EXISTING. I will push an update to this PR to use the proper WorkflowIdConflictPolicy enum so the streak-reset gamification works as intended.

Why the try/catch MUST stay: The bot suggests fixing the root cause instead of catching the error. However, secondary gamification logic must have an error boundary. If the streak timer fails to start for any reason (e.g., Temporal server race conditions, network blips), it currently crashes postSocial after the payload is published to the social network, triggering an infinite double-posting loop.

The try/catch is a mandatory idempotency boundary to decouple external side-effects from internal metrics. I will push the enum fix to this branch to resolve both issues perfectly.

This comment was marked as outdated.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent catch by the Sentry bot. The original implementation passed a string ('TERMINATE_EXISTING') which caused Temporal to silently default to FAIL, throwing the WorkflowExecutionAlreadyStartedError and triggering the infinite retry loop.

Sentry is correct that WorkflowIdConflictPolicy.USE_EXISTING is the proper business logic for a streak timer (preventing the 24-hour clock from resetting on every post). By using the correct Enum, Temporal natively handles the idempotency collision without throwing an error, rendering the try/catch block unnecessary.

I am pushing an update to remove the try/catch and apply the USE_EXISTING policy. This provides a much cleaner, Temporal-native fix.

}

return postNow;
}
Expand All @@ -218,15 +238,15 @@ export class PostActivity {
message: string,
sendEmail = false,
digest = false,
type: NotificationType = 'success'
type: NotificationType = 'success',
) {
return this._notificationService.inAppNotification(
orgId,
subject,
message,
sendEmail,
digest,
type
type,
);
}

Expand All @@ -235,7 +255,7 @@ export class PostActivity {
return this._postService.checkPlugs(
integration.organizationId,
integration.providerIdentifier,
integration.id
integration.id,
);
}

Expand All @@ -250,7 +270,7 @@ export class PostActivity {
integration,
integration.organizationId,
integration.id,
settings
settings,
);
}

Expand All @@ -262,7 +282,7 @@ export class PostActivity {
f.integrations.length === 0 ||
f.integrations.some((i) => i.integration.id === integrationId)
);
}
},
);

const post = await this._postService.getPostByForWebhookId(postId);
Expand All @@ -279,7 +299,7 @@ export class PostActivity {
} catch (e) {
/**empty**/
}
})
}),
);
}
@ActivityMethod()
Expand Down Expand Up @@ -308,16 +328,15 @@ export class PostActivity {

@ActivityMethod()
async refreshToken(
integration: Integration
integration: Integration,
): Promise<false | AuthTokenDetails> {
const getIntegration = this._integrationManager.getSocialIntegration(
integration.providerIdentifier
integration.providerIdentifier,
);

try {
const refresh = await this._refreshIntegrationService.refresh(
integration
);
const refresh =
await this._refreshIntegrationService.refresh(integration);
if (!refresh) {
return false;
}
Expand Down
Loading