Skip to content

Commit

Permalink
Sync wals to listen/backward #1
Browse files Browse the repository at this point in the history
  • Loading branch information
beastoin committed Oct 6, 2024
1 parent 9bd32b8 commit af41ae8
Show file tree
Hide file tree
Showing 6 changed files with 327 additions and 11 deletions.
5 changes: 5 additions & 0 deletions app/lib/backend/schema/message_event.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ enum MessageEventType {
memoryProcessingStarted('memory_processing_started'),
processingMemoryStatusChanged('processing_memory_status_changed'),
ping('ping'),
memoyBackwardSynced('memory_backward_synced'),
unknown('unknown'),
;

Expand All @@ -28,6 +29,7 @@ class ServerMessageEvent {
ServerMemory? memory;
List<ServerMessage>? messages;
// ServerProcessingMemoryStatus? processingMemoryStatus;
String? name;

ServerMessageEvent(
this.type,
Expand All @@ -36,6 +38,7 @@ class ServerMessageEvent {
this.memory,
this.messages,
// this.processingMemoryStatus,
this.name,
);

static ServerMessageEvent fromJson(Map<String, dynamic> json) {
Expand All @@ -48,6 +51,8 @@ class ServerMessageEvent {
// json['processing_memory_status'] != null
// ? ServerProcessingMemoryStatus.valuesFromString(json['processing_memory_status'])
// : null,

json['name'],
);
}
}
2 changes: 1 addition & 1 deletion app/lib/providers/memory_provider.dart
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ class MemoryProvider extends ChangeNotifier implements IWalServiceListener {
}

@override
void onWalSynced(Wal wal, ServerMemory memory) async {
void onWalSynced(Wal wal, {ServerMemory? memory}) async {
_missingWals = await _wal.getMissingWals();
notifyListeners();
}
Expand Down
150 changes: 150 additions & 0 deletions app/lib/services/sockets/wal_connection.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import 'dart:async';
import 'dart:convert';

import 'package:flutter/material.dart';
import 'package:friend_private/backend/preferences.dart';
import 'package:friend_private/backend/schema/message_event.dart';
import 'package:friend_private/env/env.dart';
import 'package:friend_private/services/sockets/pure_socket.dart';
import 'package:friend_private/services/sockets/transcription_connection.dart';

abstract interface class IWalSocketServiceListener {
void onMessageEventReceived(ServerMessageEvent event);

void onError(Object err);

void onConnected();

void onClosed();
}

class WalSocketService implements IPureSocketListener {
late PureSocket _socket;
final Map<Object, IWalSocketServiceListener> _listeners = {};

SocketServiceState get state =>
_socket.status == PureSocketStatus.connected ? SocketServiceState.connected : SocketServiceState.disconnected;

List<String> fileNames;

WalSocketService.create(
this.fileNames,
) {
final recordingsLanguage = SharedPreferencesUtil().recordingsLanguage;
var params = '?language=$recordingsLanguage&uid=${SharedPreferencesUtil().uid}&file_names=${fileNames.join(",")}';
String url = '${Env.apiBaseUrl!.replaceAll('https', 'wss')}v2/listen/backward$params';

_socket = PureSocket(url);
_socket.setListener(this);
}

void subscribe(Object context, IWalSocketServiceListener listener) {
_listeners.remove(context.hashCode);
_listeners.putIfAbsent(context.hashCode, () => listener);
}

void unsubscribe(Object context) {
_listeners.remove(context.hashCode);
}

Future start() async {
bool ok = await _socket.connect();
if (!ok) {
debugPrint("Can not connect to websocket");
}
}

Future stop({String? reason}) async {
await _socket.stop();
_listeners.clear();

if (reason != null) {
debugPrint(reason);
}
}

Future send(dynamic message) async {
_socket.send(message);
return;
}

@override
void onClosed() {
_listeners.forEach((k, v) {
v.onClosed();
});
}

@override
void onError(Object err, StackTrace trace) {
_listeners.forEach((k, v) {
v.onError(err);
});
}

@override
void onMessage(event) {
if (event == 'ping') return;

// Decode json
dynamic jsonEvent;
try {
jsonEvent = jsonDecode(event);
} on FormatException catch (e) {
debugPrint(e.toString());
}
if (jsonEvent == null) {
debugPrint("Can not decode message event json $event");
return;
}

// Message event
if (jsonEvent.containsKey("type")) {
var event = ServerMessageEvent.fromJson(jsonEvent);
_listeners.forEach((k, v) {
v.onMessageEventReceived(event);
});
return;
}

debugPrint(event.toString());
}

@override
void onInternetConnectionFailed() {
debugPrint("onInternetConnectionFailed");

/*
// Send notification
NotificationService.instance.clearNotification(3);
NotificationService.instance.createNotification(
notificationId: 3,
title: 'Internet Connection Lost',
body: 'Your device is offline. Transcription is paused until connection is restored.',
);
*/
}

@override
void onMaxRetriesReach() {
debugPrint("onMaxRetriesReach");

/*
// Send notification
NotificationService.instance.clearNotification(2);
NotificationService.instance.createNotification(
notificationId: 2,
title: 'Connection Issue 🚨',
body: 'Unable to connect to the transcript service.'
' Please restart the app or contact support if the problem persists.',
);
*/
}

@override
void onConnected() {
_listeners.forEach((k, v) {
v.onConnected();
});
}
}
92 changes: 83 additions & 9 deletions app/lib/services/wals.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ import 'dart:io';
import 'package:flutter/foundation.dart';
import 'package:friend_private/backend/preferences.dart';
import 'package:friend_private/backend/schema/memory.dart';
import 'package:friend_private/backend/schema/message_event.dart';
import 'package:friend_private/providers/message_provider.dart';
import 'package:friend_private/services/sockets/transcription_connection.dart';
import 'package:friend_private/services/sockets/wal_connection.dart';
import 'package:path_provider/path_provider.dart';

const ChunkSizeInSeconds = 7; // 30
Expand Down Expand Up @@ -108,10 +111,12 @@ abstract class IWalSyncProgressListener {
abstract class IWalServiceListener {
void onStatusChanged(WalServiceStatus status);
void onNewMissingWal(Wal wal);
void onWalSynced(Wal wal, ServerMemory memory);
void onWalSynced(Wal wal, {ServerMemory? memory});
}

class WalService implements IWalService {
class WalService implements IWalService, IWalSocketServiceListener {
WalSocketService? _socket;

List<Wal> _wals = const [];

List<List<int>> _frames = [];
Expand Down Expand Up @@ -256,6 +261,8 @@ class WalService implements IWalService {

@override
Future stop() async {
_socket?.stop();

debugPrint("wal service stop");
_chunkingTimer?.cancel();
_flushingTimer?.cancel();
Expand All @@ -278,8 +285,7 @@ class WalService implements IWalService {
}
}

@override
Future<bool> deleteWal(Wal wal) async {
Future<bool> _deleteWal(Wal wal) async {
// Delete file
if (wal.filePath != null) {
try {
Expand All @@ -295,6 +301,11 @@ class WalService implements IWalService {
return true;
}

@override
Future<bool> deleteWal(Wal wal) async {
return _deleteWal(wal);
}

@override
Future<List<Wal>> getMissingWals() async {
return _wals.where((w) => w.status == WalStatus.miss).toList();
Expand All @@ -321,11 +332,26 @@ class WalService implements IWalService {
@override
Future syncAll({IWalSyncProgressListener? progress}) async {
_wals.removeWhere((wal) => wal.status == WalStatus.synced);

await _flush();
var wals = _wals.where((w) => w.status == WalStatus.miss && w.storage == WalStorage.disk).toList();
if (wals.isEmpty) {
debugPrint("All synced!");
return;
}

// Establish connection
_socket?.stop();
_socket = WalSocketService.create(wals.map<String>((wal) => wal.getFileName()).toList());
await _socket?.start();
if (_socket?.state != SocketServiceState.connected) {
_socket?.stop();
debugPrint("Cant not connect to socket!");
return;
}
_socket?.subscribe(this, this);

var wals = _wals.where((w) => w.status == WalStatus.miss && w.storage == WalStorage.disk);
for (var wal in wals) {
for (var i = 0; i < wals.length; i++) {
var wal = wals[i];
debugPrint("sync id ${wal.id}");
if (wal.filePath == null) {
debugPrint("sync error: file path is not found. wal id ${wal.id}");
Expand All @@ -336,13 +362,61 @@ class WalService implements IWalService {
File file = File(wal.filePath!);
var bytes = await file.readAsBytes();

// TODO: sync to socket
final byteFrame = ByteData(12 + bytes.length);
byteFrame.setUint32(0, 1, Endian.big); // 0001, start new file
byteFrame.setUint32(4, i, Endian.big); // index
byteFrame.setUint32(8, bytes.length, Endian.big); // length
for (int i = 0; i < bytes.length; i++) {
byteFrame.setUint8(i + 12, bytes[i]);
}
if (_socket?.state != SocketServiceState.connected) {
debugPrint("sync error: socket is closed. wal id ${wal.id}");
break;
}
_socket?.send(byteFrame.buffer.asUint8List());

debugPrint("sync wal ${wal.id} file ${wal.filePath} length ${bytes.length}");
debugPrint("[${bytes.join(", ")}]");
debugPrint("[${bytes.sublist(0, 100).join(", ")}]");
} catch (e) {
debugPrint(e.toString());
continue;
}
}

// End
final byteFrame = ByteData(4);
byteFrame.setUint32(0, 0, Endian.big); // 0000, end
_socket?.send(byteFrame.buffer.asUint8List());
}

@override
void onClosed() {}

@override
void onConnected() {}

@override
void onError(Object err) {}

@override
void onMessageEventReceived(ServerMessageEvent event) async {
if (event.type == MessageEventType.memoyBackwardSynced) {
debugPrint("onMessageEventReceived > memoyBackwardSynced");
int? timestamp = int.tryParse(event.name?.split("_")[1] ?? "");
final idx = _wals.indexWhere((w) => w.timestamp == timestamp);
if (idx <= 0) {
return;
}
var wal = _wals[idx];

// send
for (var sub in _subscriptions.values) {
sub.onWalSynced(wal);
}

// update
await _deleteWal(wal);
SharedPreferencesUtil().wals = _wals;
}
}
}
8 changes: 8 additions & 0 deletions backend/models/message_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,11 @@ def to_json(self):
j = self.model_dump(mode="json")
j["type"] = self.event_type
return j

class MemoryBackwardSycnedEvent(MessageEvent):
name: Optional[str] = None

def to_json(self):
j = self.model_dump(mode="json")
j["type"] = self.event_type
return j
Loading

0 comments on commit af41ae8

Please sign in to comment.