Add dta_service/core/consumers.py
This commit is contained in:
96
dta_service/core/consumers.py
Normal file
96
dta_service/core/consumers.py
Normal file
@@ -0,0 +1,96 @@
|
||||
import json
|
||||
from channels.generic.websocket import AsyncWebsocketConsumer
|
||||
from channels.db import database_sync_to_async
|
||||
from django.contrib.auth import get_user_model
|
||||
from rest_framework_simplejwt.tokens import AccessToken
|
||||
from .models import Conversation, Message
|
||||
from .serializers import MessageSerializer
|
||||
|
||||
User = get_user_model()
|
||||
|
||||
class ChatConsumer(AsyncWebsocketConsumer):
|
||||
async def connect(self):
|
||||
self.conversation_id = self.scope['url_route']['kwargs']['conversation_id']
|
||||
self.conversation_group_name = f'chat_{self.conversation_id}'
|
||||
|
||||
# Authenticate user via JWT
|
||||
token = self.scope.get('query_string').decode('utf-8').split('=')[1]
|
||||
|
||||
try:
|
||||
access_token = AccessToken(token)
|
||||
user_id = access_token['user_id']
|
||||
self.user = await self.get_user(user_id)
|
||||
|
||||
# Check if user is part of the conversation
|
||||
conversation = await self.get_conversation(self.conversation_id)
|
||||
if not await self.is_participant(conversation, self.user):
|
||||
await self.close()
|
||||
return
|
||||
|
||||
await self.channel_layer.group_add(
|
||||
self.conversation_group_name,
|
||||
self.channel_name
|
||||
)
|
||||
|
||||
await self.accept()
|
||||
except Exception as e:
|
||||
print(f"Error: {e}")
|
||||
await self.close()
|
||||
|
||||
async def disconnect(self, close_code):
|
||||
await self.channel_layer.group_discard(
|
||||
self.conversation_group_name,
|
||||
self.channel_name
|
||||
)
|
||||
|
||||
async def receive(self, text_data):
|
||||
text_data_json = json.loads(text_data)
|
||||
message = text_data_json['message']
|
||||
|
||||
# Save message to database
|
||||
conversation = await self.get_conversation(self.conversation_id)
|
||||
message_obj = await self.create_message(conversation, self.user, message)
|
||||
|
||||
# Serialize message
|
||||
serializer = MessageSerializer(message_obj)
|
||||
|
||||
# Send message to room group
|
||||
await self.channel_layer.group_send(
|
||||
self.conversation_group_name,
|
||||
{
|
||||
'type': 'chat_message',
|
||||
'message': serializer.data
|
||||
}
|
||||
)
|
||||
|
||||
async def chat_message(self, event):
|
||||
message = event['message']
|
||||
|
||||
# Send message to WebSocket
|
||||
await self.send(text_data=json.dumps({
|
||||
'message': message
|
||||
}))
|
||||
|
||||
@database_sync_to_async
|
||||
def get_user(self, user_id):
|
||||
return User.objects.get(id=user_id)
|
||||
|
||||
@database_sync_to_async
|
||||
def get_conversation(self, conversation_id):
|
||||
return Conversation.objects.get(id=conversation_id)
|
||||
|
||||
@database_sync_to_async
|
||||
def is_participant(self, conversation, user):
|
||||
if user.user_type == 'property_owner':
|
||||
return conversation.property_owner.user == user
|
||||
elif user.user_type == 'vendor':
|
||||
return conversation.vendor.user == user
|
||||
return False
|
||||
|
||||
@database_sync_to_async
|
||||
def create_message(self, conversation, user, text):
|
||||
return Message.objects.create(
|
||||
conversation=conversation,
|
||||
sender=user,
|
||||
text=text
|
||||
)
|
||||
Reference in New Issue
Block a user