Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lift and Shift google sync #54

Merged
merged 1 commit into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/future-google/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ export class NamedError extends Error {
}

export class GmailMessageNotFound extends NamedError {}
export class EmptyGmailHistory extends NamedError {}
231 changes: 230 additions & 1 deletion packages/future-google/src/events/sync.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { DataLayer, EventHandler } from 'core';

import { createGoogleMailFields } from '../helpers';
import { GoogleClient } from '../client';
import { Labels } from '../constants';
import { arrangeThreadMessagesByFirstMessageData, createGoogleMailFields } from '../helpers';
import { Connection, CreateEmailsParams, Email, MakeClient } from '../types';

export const emailSync = ({
name,
Expand Down Expand Up @@ -65,3 +68,229 @@ export const emailSync = ({
});
},
});

export const calendarSync = ({ name, event, dataLayer }: { event: string; dataLayer: DataLayer; name: string }) => {};

export const gmailSyncSyncTable = ({
name,
event,
createEmails,
makeClient,
dataLayer,
}: {
event: string;
name: string;
dataLayer: DataLayer;
createEmails: (props: CreateEmailsParams) => Promise<void>;
makeClient: MakeClient;
}): EventHandler => ({
id: `${name}-gmail-sync-worksheet`,
event,
executor: async ({ event, step }: any) => {
const { syncTableId, options } = event.data;
const { connectionId } = event.user;
const client = await makeClient({ connectionId });

const duration = options?.duration;
const dataIntegration = await dataLayer.getDataIntegrationByConnectionId(connectionId);

// load context for this worksheet
const { connectedEmail, startSyncFrom } = await step.run('load-gmail-sync-context', async () => {
if (!dataIntegration) throw new Error('Data Integration not found');

let startSyncFrom = dataIntegration?.lastSyncAt ? new Date(dataIntegration.lastSyncAt) : undefined;

if (!startSyncFrom) {
startSyncFrom = new Date();
startSyncFrom.setFullYear(startSyncFrom.getFullYear() - 1); // Set to a year ago by default
}

const { email } = await client.getTokenInfo();

return {
connectedEmail: email,
startSyncFrom,
};
});

await step.run('run-gmail-data-sync', async () => {
const inboxLabels: Exclude<keyof typeof Labels, 'SENT'>[] = ['CATEGORY_PERSONAL'];
const BATCH_SIZE = 50;
let inboxThreadsNextPageToken = 'skip';
let sentThreadsNextPageToken = 'skip';
let totalEmails = 0;

let seenThreadsCache = new Set<string>();
const recordSearchCache = new Set<string>();

let contacts: Record<string, Connection> = {};
try {
contacts = await client.findGoogleContactsHavingEmailAddress();
} catch (error) {
/* fail silently */
}

console.info('Processing inbox threads');
while (inboxThreadsNextPageToken != '') {
// TODO: Run these batched requests as steps.

const { threads, pageToken } = await client.getThreads({
pageToken: inboxThreadsNextPageToken,
labels: inboxLabels,
from: new Date(startSyncFrom),
to: duration?.end,
limit: BATCH_SIZE,
});

inboxThreadsNextPageToken = pageToken;

if (!threads) break;

/**
* Filter out threads that have been seen and send batched request to Google API
*/
const threadMessages = await Promise.all(
threads
.filter(({ id }) => !seenThreadsCache.has(id))
.map(({ id }) => {
seenThreadsCache.add(id);
return client.getThreadById({ threadId: id });
}),
);

// sort thread messages in order of creation to avoid collision when referencing replied threads
const rearrangedThreadMessages = arrangeThreadMessagesByFirstMessageData(threadMessages);

// aggregate all messages in rearranged thread messages
const messages = rearrangedThreadMessages.reduce((acc, { messages }) => {
acc.push(...(messages ?? []));
return acc;
}, [] as Email[]);

totalEmails += messages.length;

await createEmails({
// messageId: '',
emails: messages,
options: {
connectedEmail,
syncTableId,
recordSearchCache,
},
contacts,
connectionId,
});
}

console.info('Done processing inbox threads, Total emails:', totalEmails);

console.info('Processing sent threads');
while (sentThreadsNextPageToken != '') {
// TODO: Run these batched requests as steps.
const { threads, pageToken } = await client.getThreads({
pageToken: sentThreadsNextPageToken,
labels: ['SENT'],
from: new Date(startSyncFrom),
to: duration?.end,
limit: BATCH_SIZE,
});

sentThreadsNextPageToken = pageToken;

if (!threads) break;
/**
* Filter out threads that have been seen and send batched request to Google API
*/
const threadMessages = await Promise.all(
threads
.filter(({ id }) => !seenThreadsCache.has(id))
.map(({ id }) => {
seenThreadsCache.add(id);
return client.getThreadById({ threadId: id });
}),
);

// sort thread messages in order of creation to avoid collision when referencing replied threads
const rearrangedThreadMessages = arrangeThreadMessagesByFirstMessageData(threadMessages);

// aggregate all messages in rearranged thread messages
const messages = rearrangedThreadMessages.reduce((acc, { messages }) => {
acc.push(...(messages ?? []));
return acc;
}, [] as Email[]);

totalEmails += messages.length;

await createEmails({
emails: messages,
options: {
connectedEmail,
syncTableId,
recordSearchCache,
},
contacts,
connectionId,
});
}

console.log('Done processing sent threads. Total emails:', totalEmails);

console.log('Gmail sync done');
});

// TODO:
// await step.run('update-integration-lasted-synced-timestamp', async () => {
// await dataIntegrationService.updateDataIntegration({ id: dataIntegration.id, lastSyncAt: new Date() });
// });
},
});

export const gcalSyncSyncTable = ({
name,
event,
createCalendarEvents,
makeClient,
}: {
event: string;
name: string;
createCalendarEvents: (props: {
client?: GoogleClient;
person?: { email: string; recordId: string };
duration?: { minDate: Date; maxDate: Date };
options?: {
peopleRecordTypeId: string;
syncTableId: string;
};
connectedEmail?: string;
}) => Promise<void>;
makeClient: MakeClient;
}) => ({
id: `${name}-gcal-sync-worksheet`,
event,
executor: async ({ event, step }: any) => {
const { syncTableId } = event.data;
const { connectionId } = event.user;
const client = await makeClient({ connectionId });

const { peopleRecordType, connectedEmail } = await step.run('load-gcal-sync-context', async () => {
const { email } = await client.getTokenInfo();
return {
peopleRecordType,
connectedEmail: email,
};
});

await step.run('run-gcal-data-sync', async () => {
await createCalendarEvents({
client,
connectedEmail,
options: {
peopleRecordTypeId: peopleRecordType?.id,
syncTableId,
},
});
});

return { event, body: `sync completed` };
},
});
25 changes: 8 additions & 17 deletions packages/future-google/src/events/update.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,28 @@
// import { gmail_v1 } from '@googleapis/gmail';
// import { Worksheet } from '@prisma/client';
// import retry from 'async-retry-ng';
// import { DataLayer, EventHandler } from 'core';

// import { IntegrationAPI } from '@/lib/integrations-framework/api';

// import { BaseContext } from 'inngest/types';

// import { MakeAPI } from '../../types';
// import { EmptyGmailHistory, GmailMessageNotFound } from '../errors';
// import { Connection, MakeClient } from '../types';

// export const gmailSyncUpdate = ({
// name,
// event,
// makeClient,
// makeAPI,
// createWorksheet,
// datalayer,
// }: {
// name: string;
// event: string;
// makeAPI: MakeAPI;
// createWorksheet: ({ api, shouldSync }: { api: IntegrationAPI; shouldSync: boolean }) => Promise<Worksheet>;
// makeClient: MakeClient;
// }) => ({
// datalayer: DataLayer;
// }): EventHandler => ({
// id: `${name}-sync-gmail-update`,
// event,
// executor: async ({ event, step }: BaseContext<any>) => {
// executor: async ({ event, step }) => {
// const { emailAddress, historyId } = event.data as { emailAddress: string; historyId: string };
// const { workspaceId, userId } = event.user;
// const api = makeAPI({ context: { workspaceId, userId } });
// const client = await makeClient({ api });
// const { connectionId } = event.user;

// const client = await makeClient({ connectionId });

// try {
// const connectedEmail = (await client.getTokenInfo())?.email;
Expand Down Expand Up @@ -88,13 +81,11 @@

// export const gCalSyncUpdate = ({
// name,
// makeAPI,
// event,
// createWorksheet,
// }: {
// name: string;
// event: string;
// makeAPI: MakeAPI;
// createWorksheet: ({ api, shouldSync }: { api: IntegrationAPI; shouldSync: boolean }) => Promise<Worksheet>;
// }) => ({
// id: `${name}-sync-gcal-update`,
Expand Down
12 changes: 12 additions & 0 deletions packages/future-google/src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,3 +360,15 @@ export const createGoogleMailFields = () => [
order: 13,
},
];

export const arrangeThreadMessagesByFirstMessageData = (messagesByThread: MessagesByThread[]) => {
const filteredResults: MessagesByThread[] = messagesByThread.reduce(
(prev: MessagesByThread[], curr): MessagesByThread[] => {
if (curr) prev.push(curr);
return prev;
},
[],
);
filteredResults.sort((a, b) => (a?.firstMessageDate as Date).getTime() - (b?.firstMessageDate as Date).getTime());
return filteredResults;
};
41 changes: 35 additions & 6 deletions packages/future-google/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { DataIntegration, IntegrationAuth, IntegrationPlugin } from 'core';
import { DataIntegration, IntegrationAuth, IntegrationPlugin, MakeWebhookURL } from 'core';
import { z } from 'zod';

import { SEND_BULK_EMAIL, SEND_EMAIL } from './actions/send-email';
import { GoogleClient } from './client';
import { emailSync } from './events/sync';
import { gcalSubscribe, gmailSubscribe } from './events/subscribe';
import { emailSync, gmailSyncSyncTable } from './events/sync';
import {
createGooglePersonWorksheetFields,
getValidRecipientAddresses,
Expand Down Expand Up @@ -57,7 +58,6 @@ export class GoogleIntegration extends IntegrationPlugin {
emails: Email[];
connectionId: string;
options?: {
peopleRecordTypeId: string;
connectedEmail: string;
syncTableId: string;
recordSearchCache: Set<string>;
Expand Down Expand Up @@ -237,19 +237,48 @@ export class GoogleIntegration extends IntegrationPlugin {
syncTableId: z.string(),
}),
},
CALENDAR_SYNC: {
key: 'google.calendar/sync.table',
schema: z.object({
syncTableId: z.string(),
}),
},
};
return this.events;
}

defineEventHandlers(): void {}

getEventHandlers({}: {}) {
getEventHandlers({ makeWebhookUrl }: { makeWebhookUrl: MakeWebhookURL }) {
return [
emailSync({
name: this.name,
event: this.getEventKey('EMAIL_SYNC'),
dataLayer: this.dataLayer!,
}),
gmailSubscribe({
dataLayer: this.dataLayer!,
makeClient: this.makeClient,
event: this.getEventKey('GMAIL_SUBSCRIBE'),
name: this.name,
sendEvent: this.sendEvent,
testIntegration: this.test,
topic: this.config.TOPIC,
}),
gcalSubscribe({
dataLayer: this.dataLayer!,
makeClient: this.makeClient,
event: this.getEventKey('GCAL_SUBSCRIBE'),
makeWebhookURL: makeWebhookUrl,
name: this.name,
sendEvent: this.sendEvent,
testIntegration: this.test,
}),
gmailSyncSyncTable({
createEmails: this.createEmails,
dataLayer: this.dataLayer!,
event: this.getEventKey('GMAIL_SYNC'),
makeClient: this.makeClient,
name: this.name,
}),
];
}
async createSyncTable({
Expand Down
Loading