400 lines
No EOL
20 KiB
Python
Executable file
400 lines
No EOL
20 KiB
Python
Executable file
import urllib
|
|
import urllib2
|
|
import json
|
|
|
|
from io import StringIO
|
|
from django.contrib import messages
|
|
from django.http import HttpResponseRedirect, HttpResponse, Http404
|
|
from django.http.response import HttpResponseNotAllowed
|
|
from django.core.urlresolvers import reverse
|
|
from django.shortcuts import get_object_or_404, render, render_to_response
|
|
from django.utils import timezone
|
|
from django.views import generic
|
|
from django.conf import settings
|
|
from django.core import serializers
|
|
|
|
from .forms import EventForm, PollForm, OptionFormset, QuestionFormset, OrganiserFormSet, TrusteeFormSet, VoteForm, EventSetupForm, EventEditForm, DecryptionFormset, DecryptionFormSetHelper
|
|
from .models import Event, Poll, PollOption, EmailUser, Ballot, TrusteeKey, Decryption
|
|
from allauthdemo.auth.models import DemoUser
|
|
|
|
from .tasks import email_trustees_prep, update_EID, generate_combpk, generate_enc, tally_results
|
|
|
|
from .utils.EventModelAdaptor import EventModelAdaptor
|
|
|
|
class EventListView(generic.ListView):
|
|
|
|
model = Event
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super(EventListView, self).get_context_data(**kwargs)
|
|
#context['now'] = timezone.now()
|
|
return context
|
|
|
|
class EventDetailView(generic.DetailView):
|
|
template_name="polls/event_detail_details.html"
|
|
model = Event
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super(EventDetailView, self).get_context_data(**kwargs)
|
|
context['is_organiser'] = ((not self.request.user.is_anonymous()) and (self.object.users_organisers.filter(email=self.request.user.email).exists()))
|
|
|
|
#context['now'] = timezone.now()
|
|
return context
|
|
|
|
|
|
class EventDetailPollsView(EventDetailView):
|
|
template_name="polls/event_detail_polls.html"
|
|
|
|
class EventDetailOrganisersView(EventDetailView):
|
|
template_name="polls/event_detail_organisers.html"
|
|
|
|
class EventDetailLaunchView(EventDetailView):
|
|
template_name="polls/event_detail_launch.html"
|
|
|
|
class PollDetailView(generic.View):
|
|
|
|
model = Poll
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super(PollDetailView, self).get_context_data(**kwargs)
|
|
#context['now'] = timezone.now()
|
|
context['form'] = VoteForm(instance=self.object)
|
|
context['poll_count'] = self.object.event.polls.all().count()
|
|
return context
|
|
|
|
#my_value = self.kwargs.get('key', 'default_value')
|
|
|
|
def test_poll_detail(request, event_id, poll_num, key=None):
|
|
context = {}
|
|
context['form'] = VoteForm(instance=self.object)
|
|
context['poll_count'] = self.object.event.polls.all().count()
|
|
return render(request, "polls/event_setup.html", context)
|
|
|
|
def util_get_poll_by_event_index(event, poll_num):
|
|
try:
|
|
poll_num = int(poll_num)
|
|
if ((poll_num < 1) or (poll_num > event.polls.all().count())):
|
|
return None
|
|
poll = event.polls.filter().order_by('id')[poll_num-1] # index field eventually
|
|
except ValueError:
|
|
return None
|
|
return poll
|
|
|
|
def edit_poll(request, event_id, poll_num):
|
|
event = get_object_or_404(Event, pk=event_id)
|
|
event_poll_count = event.polls.all().count()
|
|
poll = util_get_poll_by_event_index(event, poll_num)
|
|
|
|
if (poll == None):
|
|
raise Http404("Poll does not exist")
|
|
|
|
form = PollForm(instance=poll, prefix="main")
|
|
formset = OptionFormset(instance=poll, prefix="formset_options")
|
|
return render(request, "polls/generic_form.html", {'form_title': "Edit Poll: " + poll.question_text, 'form': form, 'option_formset': formset})
|
|
|
|
def view_poll(request, event_id, poll_num):
|
|
#return HttpResponse(param("012345"))
|
|
#return HttpResponse(combpk(param("012345"), "ABzqvL+pqTi+DNLLRcM62RwCoaZTaXVbOs3sk4fc0+Dc 0 AAaQd6S1x+bcgnkDp2ev5mTt34ICQdZIzP9GaqG4x5sy 0" "ABhQay9jI4pZvkAETNwfo8iwJ8eBMkjqplqAiu/FZxMy 0 ABPxj0jVj3rt0VW54iv4tV02gYtujnR41t5gf97asrPs 0 ABfoiW03bsYIUgfAThmjurmOViKy9L89vfkIavhQIblm 1 ABhQay9jI4pZvkAETNwfo8iwJ8eBMkjqplqAiu/FZxMy 0 ABPxj0jVj3rt0VW54iv4tV02gYtujnR41t5gf97asrPs 0 ABfoiW03bsYIUgfAThmjurmOViKy9L89vfkIavhQIblm 1 ABhQay9jI4pZvkAETNwfo8iwJ8eBMkjqplqAiu/FZxMy 0 ABPxj0jVj3rt0VW54iv4tV02gYtujnR41t5gf97asrPs 0 ABfoiW03bsYIUgfAThmjurmOViKy9L89vfkIavhQIblm 1"))
|
|
#return HttpResponse(addec("ACMW70Yj3+mJ/FO+6VOSDGYPYHf7NoTXdpInbfzUqYpH 0 ABV4Mo496B0FW3AW/7gY6Fs+oz6BwfwilonMYeriUyV/ 0 AAg+bdGhs3sxSxAc/wcKdBNUy+el8A2b4yVYShNOb8uX 0 AAspJbn5V2AaY4CgLkzCkHwUWbC5nyxrBzw+o4Az8HVM 1 ABKI7o5Yhgi44XwpFnPpLnH0/czbXA8y5vM4ucV8vojo 1 AAwVrT9+dcQsqRZYoI7+QsJvWOgd7JaJpfI6envmC2jU 1 ABIZO0DK4OrdROD805of6iRk2RenonGYmo2qG2IB1sj/ 1 ACMUHQdjGN0wyCd2AgDHMk9u0TpnywNVtamHWopGho8L 0 ABNT5lbE4siC3QklQXRvTwSQPwtme91+UrIr9iXT3y84 1 ABib0mmQ9ZVCrErqFwDgoRp3jHPpjHGQR2vsMVlwM+vI 0 ABvf3cg1NSS8fn6EKJNnTomeoflcEY1WBxkPPKrBBFl+ 0 ACBUZAtolN4HNh+mw4jLZuHzD+/rYHKR5av16PUc6BJF 0", "2"))
|
|
#return HttpResponse(tally("ACNQLLQlh+lNm1Dc+X+dEI0ECVLTkxRHjRnzX1OA+HtW 0 AAWOsUZK/G/cjhUee/gPAXop3Bc0CTVG3iDdQxD6+XqV 0", "ACNQLLQlh+lNm1Dc+X+dEI0ECVLTkxRHjRnzX1OA+HtW 0 0 2", "2"))
|
|
event = get_object_or_404(Event, pk=event_id)
|
|
if (not event.prepared):
|
|
messages.add_message(request, messages.WARNING, "This Event isn\'t ready for voting yet.")
|
|
return HttpResponseRedirect(reverse("user_home"))
|
|
event_poll_count = event.polls.all().count()
|
|
prev_poll_index, next_poll_index = False, False
|
|
can_vote, has_voted, voter_email, vote_count = False, False, "", 0
|
|
poll = util_get_poll_by_event_index(event, poll_num)
|
|
|
|
if (poll == None):
|
|
raise Http404("Poll does not exist")
|
|
|
|
form = VoteForm(instance=poll)
|
|
poll_num = int(poll_num) # now known to be safe as it suceeded in the util function
|
|
|
|
if (poll_num > 1):
|
|
prev_poll_index = (poll_num - 1)
|
|
if (poll_num < event_poll_count):
|
|
next_poll_index = (poll_num + 1)
|
|
|
|
access_key = request.GET.get('key', None)
|
|
email_key = event.keys.filter(key=access_key)
|
|
vote_count = Ballot.objects.filter(poll=poll, cast=True).count()
|
|
|
|
if (email_key.exists() and event.voters.filter(email=email_key[0].user.email).exists()):
|
|
ballot = Ballot.objects.filter(voter=email_key[0].user, poll=poll)
|
|
if (ballot.exists() and ballot[0].cast):
|
|
has_voted = True
|
|
|
|
if (access_key and email_key.exists()): #or (can_vote(request.user, event))
|
|
voter_email = email_key[0].user.email
|
|
can_vote = True
|
|
|
|
if (request.method == "POST"):
|
|
form = VoteForm(request.POST, instance=poll)
|
|
if (email_key.exists()):
|
|
#return HttpResponse(email_key[0].key)
|
|
ballot = Ballot.objects.get_or_create(voter=email_key[0].user, poll=poll)[0]
|
|
|
|
if (form.is_valid()):
|
|
ballot.cipher_text_c1 = request.POST["cipher_text_c1"]
|
|
ballot.cipher_text_c2 = request.POST["cipher_text_c2"]
|
|
ballot.cast = True
|
|
ballot.save()
|
|
if (next_poll_index):
|
|
return HttpResponseRedirect(reverse('polls:view-poll', kwargs={'event_id': event.id, 'poll_num': next_poll_index }) + "?key=" + email_key[0].key)
|
|
else:
|
|
return HttpResponse("Voted successfully!") # finished all polls in event
|
|
|
|
return render(request, "polls/poll_detail.html",
|
|
{"object": poll, "poll_num": poll_num , "event": event, "form": form, "poll_count": event.polls.all().count(),
|
|
"prev_index": prev_poll_index , "next_index": next_poll_index,
|
|
"can_vote": can_vote, "voter_email": voter_email, "has_voted": has_voted, "vote_count": vote_count
|
|
})
|
|
|
|
def event_trustee_setup(request, event_id):
|
|
# Obtain the event and the event preparation access key that's been supplied
|
|
event = get_object_or_404(Event, pk=event_id)
|
|
access_key = request.GET.get('key', None)
|
|
|
|
# If the a_key is present, check it's valid and related to a trustee EmailUser instance for this event
|
|
if access_key:
|
|
email_key = event.keys.filter(key=access_key)
|
|
if email_key.exists() and event.users_trustees.filter(email=email_key[0].user.email).exists():
|
|
if TrusteeKey.objects.filter(event=event, user=email_key[0].user).exists():
|
|
messages.add_message(request, messages.WARNING, 'You have already submitted your key for this event')
|
|
return HttpResponseRedirect(reverse("user_home"))
|
|
if request.method == "POST":
|
|
form = EventSetupForm(request.POST)
|
|
|
|
# If form data is valid, create a TrusteeKey object with the supplied public key
|
|
if form.is_valid():
|
|
public_key = request.POST["public_key"]
|
|
key = TrusteeKey.objects.get_or_create(event=event, user=email_key[0].user)[0]
|
|
key.key = public_key
|
|
key.save()
|
|
|
|
# When all trustees have supplied their public key, we can combine them to create a master key
|
|
# The event will now be ready to receive votes on the various polls that have been defined -
|
|
# voters therefore need to be informed
|
|
if event.trustee_keys.count() == event.users_trustees.count():
|
|
generate_combpk.delay(event)
|
|
# TODO: Create Celery task that generates voting URLs for voters as well as creates the ballots
|
|
|
|
success_msg = 'You have successfully submitted your public key for this event'
|
|
messages.add_message(request, messages.SUCCESS, success_msg)
|
|
|
|
# This re-direct may not be appropriate for trustees that don't have logins
|
|
return HttpResponseRedirect(reverse("user_home"))
|
|
else:
|
|
form = EventSetupForm()
|
|
return render(request, "polls/event_setup.html", {"event": event, "form": form})
|
|
|
|
#if no key or is invalid?
|
|
messages.add_message(request, messages.WARNING, 'You do not have permission to access: ' + request.path)
|
|
return HttpResponseRedirect(reverse("user_home"))
|
|
|
|
def event_addec(request, event_id):
|
|
event = get_object_or_404(Event, pk=event_id)
|
|
for poll in event.polls.all():
|
|
generate_enc.delay(poll)
|
|
return HttpResponse("Generating enc.")
|
|
|
|
def event_trustee_decrypt(request, event_id):
|
|
event = get_object_or_404(Event, pk=event_id)
|
|
access_key = request.GET.get('key', None)
|
|
if (access_key):
|
|
email_key = event.keys.filter(key=access_key)
|
|
if (email_key.exists() and event.users_trustees.filter(email=email_key[0].user.email).exists()):
|
|
if (Decryption.objects.filter(event=event, user=email_key[0].user).exists()):
|
|
messages.add_message(request, messages.WARNING, 'You have already provided your decryptions for this event')
|
|
#if (event.decryptions.count() == (event.polls.count() * event.users_trustees.count())):
|
|
# tally_results.delay(event) # all keys are in
|
|
return HttpResponseRedirect(reverse("user_home"))
|
|
elif (request.method == "GET"):
|
|
initial = []
|
|
for poll in event.polls.all():
|
|
initial.append({'text': poll.enc })
|
|
formset = DecryptionFormset(initial=initial)
|
|
else:
|
|
formset = DecryptionFormset(request.POST)
|
|
data = []
|
|
for form in formset:
|
|
if form.is_valid():
|
|
data.append(form.cleaned_data.get('text'))
|
|
if (len(data) == event.polls.count()):
|
|
for dec, poll in zip(data, event.polls.all()):
|
|
Decryption.objects.get_or_create(user=email_key[0].user, event=event, poll=poll, text=dec)
|
|
messages.add_message(request, messages.SUCCESS, 'Decryption complete.')
|
|
if (event.decryptions.count() == (event.polls.count() * event.users_trustees.count())):
|
|
tally_results.delay(event) # all keys are in
|
|
else:
|
|
messages.add_message(request, messages.ERROR, 'You didn\'t provide decryptions for every poll. Please try again.')
|
|
return HttpResponseRedirect(reverse("user_home"))
|
|
return render(request, "polls/event_decrypt.html", {"event": event, "formset": formset, "helper": DecryptionFormSetHelper() })
|
|
|
|
messages.add_message(request, messages.WARNING, 'You do not have permission to decrypt this Event.')
|
|
return HttpResponseRedirect(reverse("user_home"))
|
|
|
|
def test_poll_vote(request, poll_id):
|
|
poll = get_object_or_404(Poll, pk=poll_id)
|
|
form = VoteForm(instance=poll)
|
|
return render(request, "polls/vote_poll.html", {"vote_form": form, "poll": poll})
|
|
|
|
def manage_questions(request, event_id):
|
|
|
|
event = get_object_or_404(Event, pk=event_id)
|
|
|
|
if (request.user.is_anonymous()) or (not event.users_organisers.filter(email=request.user.email).exists()):
|
|
messages.add_message(request, messages.WARNING, 'You do not have permission to access: ' + request.path)
|
|
return HttpResponseRedirect(reverse("user_home"))
|
|
|
|
poll = Poll()
|
|
formset = OptionFormset(instance=poll, prefix="formset_organiser")
|
|
|
|
if request.method == "POST":
|
|
form = PollForm(request.POST, prefix="main")
|
|
formset = OptionFormset(request.POST, prefix="formset_organiser") # incase form fails, we still want to retain formset data
|
|
if form.is_valid():
|
|
poll = form.save(commit=False)
|
|
poll.event_id = event_id
|
|
poll.save()
|
|
formset = OptionFormset(request.POST, prefix="formset_organiser", instance=poll)
|
|
if formset.is_valid():
|
|
formset.save()
|
|
#create_ballots.delay(poll)
|
|
messages.add_message(request, messages.SUCCESS, 'Poll created successfully')
|
|
return HttpResponseRedirect(reverse('polls:event-polls', args=[poll.event_id]))
|
|
|
|
return render(request, "polls/create_poll.html", {"event": event, "question_form": form, "option_formset": formset})
|
|
|
|
elif request.method == "GET":
|
|
form = PollForm(prefix="main") #, instance=poll
|
|
return render(request, "polls/create_poll.html", {"event": event, "question_form": form, "option_formset": formset})
|
|
else:
|
|
return HttpResponseNotAllowed()
|
|
|
|
def render_invalid(request, events, demo_users, invalid_fields):
|
|
return render(request,
|
|
"polls/create_event.html",
|
|
{
|
|
"G_R_SITE_KEY": settings.RECAPTCHA_PUBLIC_KEY,
|
|
"user_email": request.user.email,
|
|
"events": events,
|
|
"demo_users": demo_users,
|
|
"invalid_fields": invalid_fields
|
|
})
|
|
|
|
def create_event(request):
|
|
# Obtain context data for the rendering of the html template and validation
|
|
events = Event.objects.all()
|
|
demo_users = DemoUser.objects.all()
|
|
|
|
if request.method == "POST":
|
|
'''Perform Google reCAPTCHA validation'''
|
|
recaptcha_response = request.POST.get('g-recaptcha-response')
|
|
url = 'https://www.google.com/recaptcha/api/siteverify'
|
|
values = {
|
|
'secret': settings.RECAPTCHA_PRIVATE_KEY,
|
|
'response': recaptcha_response
|
|
}
|
|
data = urllib.urlencode(values)
|
|
req = urllib2.Request(url, data)
|
|
response = urllib2.urlopen(req)
|
|
result = json.load(response)
|
|
|
|
'''Perform form data validation'''
|
|
adaptor = EventModelAdaptor(request.POST, request.user)
|
|
form_data_valid = adaptor.isFormDataValid(events, demo_users)
|
|
|
|
'''Process form data based on above results'''
|
|
if result['success']:
|
|
if form_data_valid:
|
|
# Create the new event using the form data
|
|
adaptor.extractData()
|
|
new_event = adaptor.updateModel()
|
|
|
|
# Update the EID to include the GP in its EID
|
|
update_EID.delay(new_event)
|
|
|
|
# Send an email to all trustees for event preparation
|
|
trustees = new_event.users_trustees.all()
|
|
email_trustees_prep.delay(trustees, new_event)
|
|
|
|
adaptor.clear_data()
|
|
|
|
return HttpResponseRedirect(reverse('polls:index'))
|
|
else:
|
|
invalid_fields = adaptor.getInvalidFormFields()
|
|
adaptor.clear_data()
|
|
return render_invalid(request, events, demo_users, invalid_fields)
|
|
else:
|
|
invalid_fields = adaptor.getInvalidFormFields()
|
|
invalid_fields['recaptcha'] = {'error': 'The reCAPTCHA server validation failed, please try again.'}
|
|
adaptor.clear_data()
|
|
return render_invalid(request, events, demo_users, invalid_fields)
|
|
|
|
elif request.method == "GET":
|
|
# Render the template
|
|
return render(request,
|
|
"polls/create_event.html",
|
|
{
|
|
"G_R_SITE_KEY": settings.RECAPTCHA_PUBLIC_KEY,
|
|
"user_email": request.user.email,
|
|
"events": events,
|
|
"demo_users": demo_users
|
|
})
|
|
else:
|
|
return HttpResponseNotAllowed()
|
|
|
|
def edit_event(request, event_id):
|
|
event = get_object_or_404(Event, pk=event_id)
|
|
if request.method == "GET":
|
|
form = EventEditForm(instance=event, prefix="main")
|
|
'''
|
|
organiser_initial_data = [{'email': request.user.email}]
|
|
trustee_initial_data = []
|
|
for user in event.users_organisers.exclude(email=request.user.email):
|
|
organiser_initial_data.append({'email': user.email})
|
|
organiser_formset = OrganiserFormSet(prefix="formset_organiser", initial=organiser_initial_data)
|
|
for trustee in event.users_trustees.all():
|
|
trustee_initial_data.append({'email': trustee.email})
|
|
trustee_formset = TrusteeFormSet(prefix="formset_trustee", initial=trustee_initial_data)
|
|
'''
|
|
elif request.method == "POST":
|
|
form = EventEditForm(request.POST, instance=event, prefix="main")
|
|
#trustee_formset = TrusteeFormSet(request.POST, prefix="formset_trustee")
|
|
#organiser_formset = OrganiserFormSet(request.POST, prefix="formset_organiser") # incase form fails, we still want to retain formset data
|
|
if form.is_valid():
|
|
form.save()
|
|
'''
|
|
if organiser_formset.is_valid():
|
|
event.users_organisers.clear()
|
|
for oform in organiser_formset:
|
|
if (oform.cleaned_data.get('email')):
|
|
event.users_organisers.add(DemoUser.objects.get(email=oform.cleaned_data['email']))
|
|
event.users_organisers.add(request.user) # always add editor/creator
|
|
if trustee_formset.is_valid():
|
|
event.users_trustees.clear()
|
|
for tform in trustee_formset:
|
|
if (tform.cleaned_data.get('email')):
|
|
event.users_trustees.add(EmailUser.objects.get_or_create(email=tform.cleaned_data['email'])[0])
|
|
'''
|
|
return HttpResponseRedirect(reverse('polls:view-event', kwargs={'pk': event.id}))
|
|
return render(request, "polls/generic_form.html", {"form_title": "Edit Event: " + event.title, "form": form}) #"organiser_formset": organiser_formset, "trustee_formset": trustee_formset})
|
|
#trustee_formset = TrusteeFormSet(request.POST, prefix="formset_trustee", instance=event)
|
|
|
|
#class CreatePoll(generic.View):
|
|
|
|
def del_event(request, event_id):
|
|
event = get_object_or_404(Event, pk=event_id)
|
|
if request.method == "GET":
|
|
return render(request, "polls/del_event.html", {"event_title": event.title, "event_id": event.id})
|
|
elif request.method == "POST":
|
|
event.delete()
|
|
return HttpResponseRedirect(reverse('polls:index'))
|
|
|
|
def can_vote(user, event):
|
|
if event.voters.filter(email=user.email).exists():
|
|
return True
|
|
return False |